vendor/shopware/core/Framework/MessageQueue/ScheduledTask/Scheduler/TaskScheduler.php line 54

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\ScheduledTask\Scheduler;
  3. use Shopware\Core\Defaults;
  4. use Shopware\Core\Framework\Context;
  5. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
  6. use Shopware\Core\Framework\DataAbstractionLayer\Search\Aggregation\Metric\MinAggregation;
  7. use Shopware\Core\Framework\DataAbstractionLayer\Search\AggregationResult\AggregationResult;
  8. use Shopware\Core\Framework\DataAbstractionLayer\Search\AggregationResult\Metric\MinResult;
  9. use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
  10. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsAnyFilter;
  11. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsFilter;
  12. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\NotFilter;
  13. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\RangeFilter;
  14. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTask;
  15. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTaskDefinition;
  16. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTaskEntity;
  17. use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
  18. use Symfony\Component\Messenger\MessageBusInterface;
  /**
   * Queues scheduled tasks that are due and exposes scheduling metadata
   * (next execution time, smallest run interval) read from the scheduled task repository.
   *
   * @package core
   *
   * @deprecated tag:v6.5.0 - reason:becomes-final - Will be final starting with v6.5.0.0
   */
  class TaskScheduler
  {
      private EntityRepositoryInterface $scheduledTaskRepository;

      private MessageBusInterface $bus;

      private ParameterBagInterface $parameterBag;

      /**
       * @internal
       */
      public function __construct(
          EntityRepositoryInterface $scheduledTaskRepository,
          MessageBusInterface $bus,
          ParameterBagInterface $parameterBag
      ) {
          $this->scheduledTaskRepository = $scheduledTaskRepository;
          $this->bus = $bus;
          $this->parameterBag = $parameterBag;
      }

      /**
       * Finds every task matched by buildCriteriaForAllScheduledTask(), marks each one as
       * queued in the repository and then hands it to queueTask().
       *
       * Does nothing when the search returns no tasks.
       *
       * @throws \RuntimeException when a task's class is not a ScheduledTask (raised by queueTask())
       */
      public function queueScheduledTasks(): void
      {
          $criteria = $this->buildCriteriaForAllScheduledTask();
          $context = Context::createDefaultContext();
          $tasks = $this->scheduledTaskRepository->search($criteria, $context)->getEntities();

          if (\count($tasks) === 0) {
              return;
          }

          // Tasks **must not** be queued before their state in the database has been updated. Otherwise,
          // a worker could have already fetched the task and set its state to running before it gets set to
          // queued, thus breaking the task.
          /** @var ScheduledTaskEntity $task */
          foreach ($tasks as $task) {
              $this->scheduledTaskRepository->update([
                  [
                      'id' => $task->getId(),
                      'status' => ScheduledTaskDefinition::STATUS_QUEUED,
                  ],
              ], $context);

              $this->queueTask($task);
          }
      }

      /**
       * Returns the minimum "nextExecutionTime" over the tasks selected by
       * buildCriteriaForNextScheduledTask().
       *
       * @return \DateTimeInterface|null null when the aggregation result is not a MinResult
       *                                 or when it carries no minimum
       */
      public function getNextExecutionTime(): ?\DateTimeInterface
      {
          $criteria = $this->buildCriteriaForNextScheduledTask();

          $aggregation = $this->scheduledTaskRepository
              ->aggregate($criteria, Context::createDefaultContext())
              ->get('nextExecutionTime');

          if (!$aggregation instanceof MinResult) {
              return null;
          }

          if ($aggregation->getMin() === null) {
              return null;
          }

          return new \DateTime((string) $aggregation->getMin());
      }

      /**
       * Returns the minimum "runInterval" over the tasks selected by
       * buildCriteriaForMinRunInterval(), cast to int.
       *
       * @return int|null null when the aggregation result is not a MinResult
       *                  or when it carries no minimum
       */
      public function getMinRunInterval(): ?int
      {
          $criteria = $this->buildCriteriaForMinRunInterval();

          $aggregation = $this->scheduledTaskRepository
              ->aggregate($criteria, Context::createDefaultContext())
              ->get('runInterval');

          if (!$aggregation instanceof MinResult) {
              return null;
          }

          if ($aggregation->getMin() === null) {
              return null;
          }

          return (int) $aggregation->getMin();
      }

      /**
       * Builds the criteria used to pick the tasks that should be queued now: a range filter
       * for "nextExecutionTime" lower than the current time and a status filter for
       * scheduled or skipped tasks.
       */
      private function buildCriteriaForAllScheduledTask(): Criteria
      {
          $criteria = new Criteria();
          $criteria->addFilter(
              new RangeFilter(
                  'nextExecutionTime',
                  [
                      // The current time is rendered in the storage format before it is compared.
                      RangeFilter::LT => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
                  ]
              ),
              new EqualsAnyFilter('status', [
                  ScheduledTaskDefinition::STATUS_SCHEDULED,
                  ScheduledTaskDefinition::STATUS_SKIPPED,
              ])
          );

          return $criteria;
      }

      /**
       * Instantiates the task class stored on the entity, assigns the entity id to it and
       * dispatches it on the message bus.
       *
       * @throws \RuntimeException when the stored class name is not ScheduledTask or a subclass of it
       */
      private function queueTask(ScheduledTaskEntity $taskEntity): void
      {
          $taskClass = $taskEntity->getScheduledTaskClass();

          // Third argument "true" lets is_a() accept a class name string instead of an object.
          if (!\is_a($taskClass, ScheduledTask::class, true)) {
              throw new \RuntimeException(\sprintf(
                  'Tried to schedule "%s", but class does not extend ScheduledTask',
                  $taskClass
              ));
          }

          // NOTE(review): queueScheduledTasks() has already set this task to "queued" before calling
          // here; when shouldRun() declines, nothing in this class resets that status - confirm that
          // this is handled elsewhere.
          if (!$taskClass::shouldRun($this->parameterBag)) {
              return;
          }

          $task = new $taskClass();
          $task->setTaskId($taskEntity->getId());

          $this->bus->dispatch($task);
      }

      /**
       * Builds the criteria for getNextExecutionTime(): scheduled or skipped tasks, with a
       * minimum aggregation named "nextExecutionTime" over the field of the same name.
       */
      private function buildCriteriaForNextScheduledTask(): Criteria
      {
          $criteria = new Criteria();
          $criteria->addFilter(
              new EqualsAnyFilter('status', [
                  ScheduledTaskDefinition::STATUS_SCHEDULED,
                  ScheduledTaskDefinition::STATUS_SKIPPED,
              ])
          )
          ->addAggregation(new MinAggregation('nextExecutionTime', 'nextExecutionTime'));

          return $criteria;
      }

      /**
       * Builds the criteria for getMinRunInterval(): every task whose status is not inactive,
       * with a minimum aggregation named "runInterval" over the field of the same name.
       */
      private function buildCriteriaForMinRunInterval(): Criteria
      {
          $criteria = new Criteria();
          $criteria->addFilter(
              new NotFilter(NotFilter::CONNECTION_AND, [
                  new EqualsFilter('status', ScheduledTaskDefinition::STATUS_INACTIVE),
              ])
          )
          ->addAggregation(new MinAggregation('runInterval', 'runInterval'));

          return $criteria;
      }
  }