vendor/shopware/core/Framework/MessageQueue/ScheduledTask/Scheduler/TaskScheduler.php line 69

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\ScheduledTask\Scheduler;
  3. use Shopware\Core\Defaults;
  4. use Shopware\Core\Framework\Context;
  5. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
  6. use Shopware\Core\Framework\DataAbstractionLayer\Search\Aggregation\Metric\MinAggregation;
  7. use Shopware\Core\Framework\DataAbstractionLayer\Search\AggregationResult\AggregationResult;
  8. use Shopware\Core\Framework\DataAbstractionLayer\Search\AggregationResult\Metric\MinResult;
  9. use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
  10. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsAnyFilter;
  11. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsFilter;
  12. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\NotFilter;
  13. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\RangeFilter;
  14. use Shopware\Core\Framework\Log\Package;
  15. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTask;
  16. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTaskDefinition;
  17. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTaskEntity;
  18. use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
  19. use Symfony\Component\Messenger\MessageBusInterface;
  20. /**
  21.  * @deprecated tag:v6.5.0 - reason:becomes-final - Will be final starting with v6.5.0.0
  22.  */
  23. #[Package('core')]
  24. class TaskScheduler
  25. {
  26.     /**
  27.      * @var EntityRepositoryInterface
  28.      */
  29.     private $scheduledTaskRepository;
  30.     /**
  31.      * @var MessageBusInterface
  32.      */
  33.     private $bus;
  34.     private ParameterBagInterface $parameterBag;
  35.     /**
  36.      * @internal
  37.      */
  38.     public function __construct(
  39.         EntityRepositoryInterface $scheduledTaskRepository,
  40.         MessageBusInterface $bus,
  41.         ParameterBagInterface $parameterBag
  42.     ) {
  43.         $this->scheduledTaskRepository $scheduledTaskRepository;
  44.         $this->bus $bus;
  45.         $this->parameterBag $parameterBag;
  46.     }
  47.     public function queueScheduledTasks(): void
  48.     {
  49.         $criteria $this->buildCriteriaForAllScheduledTask();
  50.         $context Context::createDefaultContext();
  51.         $tasks $this->scheduledTaskRepository->search($criteria$context)->getEntities();
  52.         if (\count($tasks) === 0) {
  53.             return;
  54.         }
  55.         // Tasks **must not** be queued before their state in the database has been updated. Otherwise,
  56.         // a worker could have already fetched the task and set its state to running before it gets set to
  57.         // queued, thus breaking the task.
  58.         /** @var ScheduledTaskEntity $task */
  59.         foreach ($tasks as $task) {
  60.             $this->queueTask($task$context);
  61.         }
  62.     }
  63.     public function getNextExecutionTime(): ?\DateTimeInterface
  64.     {
  65.         $criteria $this->buildCriteriaForNextScheduledTask();
  66.         /** @var AggregationResult $aggregation */
  67.         $aggregation $this->scheduledTaskRepository
  68.             ->aggregate($criteriaContext::createDefaultContext())
  69.             ->get('nextExecutionTime');
  70.         /** @var MinResult $aggregation */
  71.         if (!$aggregation instanceof MinResult) {
  72.             return null;
  73.         }
  74.         if ($aggregation->getMin() === null) {
  75.             return null;
  76.         }
  77.         return new \DateTime((string) $aggregation->getMin());
  78.     }
  79.     public function getMinRunInterval(): ?int
  80.     {
  81.         $criteria $this->buildCriteriaForMinRunInterval();
  82.         $aggregation $this->scheduledTaskRepository
  83.             ->aggregate($criteriaContext::createDefaultContext())
  84.             ->get('runInterval');
  85.         /** @var MinResult $aggregation */
  86.         if (!$aggregation instanceof MinResult) {
  87.             return null;
  88.         }
  89.         if ($aggregation->getMin() === null) {
  90.             return null;
  91.         }
  92.         return (int) $aggregation->getMin();
  93.     }
  94.     private function buildCriteriaForAllScheduledTask(): Criteria
  95.     {
  96.         $criteria = new Criteria();
  97.         $criteria->addFilter(
  98.             new RangeFilter(
  99.                 'nextExecutionTime',
  100.                 [
  101.                     RangeFilter::LT => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
  102.                 ]
  103.             ),
  104.             new EqualsAnyFilter('status', [
  105.                 ScheduledTaskDefinition::STATUS_SCHEDULED,
  106.                 ScheduledTaskDefinition::STATUS_SKIPPED,
  107.             ])
  108.         );
  109.         return $criteria;
  110.     }
  111.     private function queueTask(ScheduledTaskEntity $taskEntityContext $context): void
  112.     {
  113.         $taskClass $taskEntity->getScheduledTaskClass();
  114.         if (!\is_a($taskClassScheduledTask::class, true)) {
  115.             throw new \RuntimeException(sprintf(
  116.                 'Tried to schedule "%s", but class does not extend ScheduledTask',
  117.                 $taskClass
  118.             ));
  119.         }
  120.         if (!$taskClass::shouldRun($this->parameterBag)) {
  121.             $this->scheduledTaskRepository->update([
  122.                 [
  123.                     'id' => $taskEntity->getId(),
  124.                     'nextExecutionTime' => $this->calculateNextExecutionTime($taskEntity),
  125.                     'status' => ScheduledTaskDefinition::STATUS_SKIPPED,
  126.                 ],
  127.             ], $context);
  128.             return;
  129.         }
  130.         $this->scheduledTaskRepository->update([
  131.             [
  132.                 'id' => $taskEntity->getId(),
  133.                 'status' => ScheduledTaskDefinition::STATUS_QUEUED,
  134.             ],
  135.         ], $context);
  136.         $task = new $taskClass();
  137.         $task->setTaskId($taskEntity->getId());
  138.         $this->bus->dispatch($task);
  139.     }
  140.     private function buildCriteriaForNextScheduledTask(): Criteria
  141.     {
  142.         $criteria = new Criteria();
  143.         $criteria->addFilter(
  144.             new EqualsAnyFilter('status', [
  145.                 ScheduledTaskDefinition::STATUS_SCHEDULED,
  146.                 ScheduledTaskDefinition::STATUS_SKIPPED,
  147.             ])
  148.         )
  149.         ->addAggregation(new MinAggregation('nextExecutionTime''nextExecutionTime'));
  150.         return $criteria;
  151.     }
  152.     private function buildCriteriaForMinRunInterval(): Criteria
  153.     {
  154.         $criteria = new Criteria();
  155.         $criteria->addFilter(
  156.             new NotFilter(NotFilter::CONNECTION_AND, [
  157.                 new EqualsFilter('status'ScheduledTaskDefinition::STATUS_INACTIVE),
  158.             ])
  159.         )
  160.         ->addAggregation(new MinAggregation('runInterval''runInterval'));
  161.         return $criteria;
  162.     }
  163.     private function calculateNextExecutionTime(ScheduledTaskEntity $taskEntity): \DateTimeImmutable
  164.     {
  165.         $now = new \DateTimeImmutable();
  166.         $nextExecutionTimeString $taskEntity->getNextExecutionTime()->format(Defaults::STORAGE_DATE_TIME_FORMAT);
  167.         $nextExecutionTime = new \DateTimeImmutable($nextExecutionTimeString);
  168.         $newNextExecutionTime $nextExecutionTime->modify(sprintf('+%d seconds'$taskEntity->getRunInterval()));
  169.         return $newNextExecutionTime $now $now $newNextExecutionTime;
  170.     }
  171. }