vendor/shopware/core/Framework/MessageQueue/Middleware/RetryMiddleware.php line 49

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Middleware;
  3. use Shopware\Core\Framework\Context;
  4. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
  5. use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
  6. use Shopware\Core\Framework\Log\Package;
  7. use Shopware\Core\Framework\MessageQueue\DeadMessage\DeadMessageEntity;
  8. use Shopware\Core\Framework\MessageQueue\Exception\MessageFailedException;
  9. use Shopware\Core\Framework\MessageQueue\Message\RetryMessage;
  10. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTask;
  11. use Shopware\Core\Framework\MessageQueue\Stamp\DecryptedStamp;
  12. use Shopware\Core\Framework\Uuid\Uuid;
  13. use Shopware\Core\Framework\Webhook\Event\RetryWebhookMessageFailedEvent;
  14. use Shopware\Core\Framework\Webhook\Message\WebhookEventMessage;
  15. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  16. use Symfony\Component\Messenger\Envelope;
  17. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  18. use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
  19. use Symfony\Component\Messenger\Middleware\StackInterface;
  /**
   * Messenger middleware that persists handler failures as dead-message records.
   *
   * It lets the rest of the middleware stack handle the envelope and catches a
   * HandlerFailedException coming out of it. Every nested MessageFailedException
   * is written to the dead-message repository: a new record is created for a
   * regular message, while for a RetryMessage the referenced record is either
   * updated (same failure again) or replaced (different failure). Nested
   * exceptions of any other type are re-thrown to the caller.
   *
   * @deprecated tag:v6.5.0 - reason:remove-decorator - will be removed, as we use default symfony retry mechanism
   */
  #[Package('core')]
  class RetryMiddleware implements MiddlewareInterface
  {
      private EntityRepositoryInterface $deadMessageRepository;

      /**
       * Default context created once in the constructor and used for every repository call.
       */
      private Context $context;

      private EventDispatcherInterface $eventDispatcher;

      /**
       * @internal
       */
      public function __construct(EntityRepositoryInterface $deadMessageRepository, EventDispatcherInterface $eventDispatcher)
      {
          $this->deadMessageRepository = $deadMessageRepository;
          $this->context = Context::createDefaultContext();
          $this->eventDispatcher = $eventDispatcher;
      }

      /**
       * Passes the envelope down the stack and records handler failures.
       *
       * @return Envelope the envelope returned by the next middleware, or the incoming
       *                  envelope when a failure was caught and fully recorded here
       *
       * @throws HandlerFailedException containing only the nested exceptions that are
       *                                not MessageFailedException instances
       */
      public function handle(Envelope $envelope, StackInterface $stack): Envelope
      {
          try {
              return $stack->next()->handle($envelope, $stack);
          } catch (HandlerFailedException $e) {
              // Only set when the failed message is a RetryMessage pointing at a stored record.
              $deadMessage = $this->getExistingDeadMessage($envelope);
              $unhandledExceptions = [];

              foreach ($e->getNestedExceptions() as $nestedException) {
                  if (!($nestedException instanceof MessageFailedException)) {
                      $unhandledExceptions[] = $nestedException;

                      continue;
                  }

                  if ($deadMessage !== null) {
                      $this->handleExistingDeadMessage($deadMessage, $nestedException);
                      $this->handleRetryWebhookMessageFailed($deadMessage);
                  } else {
                      $this->createDeadMessageFromEnvelope($envelope, $nestedException);
                  }
              }

              if (\count($unhandledExceptions) > 0) {
                  throw new HandlerFailedException($envelope, $unhandledExceptions);
              }
          }

          return $envelope;
      }

      /**
       * Creates a new dead-message record for a message that failed for the first time.
       *
       * The write runs inside Context::SYSTEM_SCOPE.
       */
      private function createDeadMessageFromEnvelope(Envelope $envelope, MessageFailedException $e): void
      {
          $this->context->scope(Context::SYSTEM_SCOPE, function () use ($envelope, $e): void {
              $message = $envelope->getMessage();
              $exception = $e->getException();

              // The record is flagged as encrypted when the envelope carries at least one DecryptedStamp.
              $encrypted = \count($envelope->all(DecryptedStamp::class)) > 0;

              $scheduledTaskId = null;
              if ($message instanceof ScheduledTask) {
                  $scheduledTaskId = $message->getTaskId();
              }

              $params = [
                  'id' => Uuid::randomHex(),
                  'originalMessageClass' => \get_class($message),
                  'serializedOriginalMessage' => serialize($message),
                  'handlerClass' => $e->getHandlerClass(),
                  'encrypted' => $encrypted,
                  'nextExecutionTime' => DeadMessageEntity::calculateNextExecutionTime(1),
                  'exception' => \get_class($exception),
                  'exceptionMessage' => $exception->getMessage(),
                  'exceptionFile' => $exception->getFile(),
                  'exceptionLine' => $exception->getLine(),
                  'scheduledTaskId' => $scheduledTaskId,
              ];

              try {
                  $this->deadMessageRepository->create([$params], $this->context);
              } catch (\Throwable $writeException) {
                  // Best-effort second attempt with the exception message blanked out.
                  // NOTE(review): presumably the message text is what can make the first write fail - confirm.
                  $params['exceptionMessage'] = ' ';
                  $this->deadMessageRepository->create([$params], $this->context);
              }
          });
      }

      /**
       * Records a repeated failure of a message that already has a dead-message record.
       *
       * If the new exception matches the stored one, only the error count is
       * incremented. Otherwise the stored record is deleted and a fresh one is
       * created for the new exception.
       */
      private function handleExistingDeadMessage(DeadMessageEntity $deadMessage, MessageFailedException $e): void
      {
          if ($this->isExceptionEqual($deadMessage, $e->getException())) {
              $this->incrementErrorCount($deadMessage);

              return;
          }

          // NOTE(review): unlike the create/update calls, this delete is not wrapped in SYSTEM_SCOPE - confirm this is intended.
          $this->deadMessageRepository->delete([
              [
                  'id' => $deadMessage->getId(),
              ],
          ], $this->context);

          $this->createDeadMessageFromExistingMessage($deadMessage, $e);
      }

      /**
       * Tells whether the stored failure and the given exception agree on class, message, file and line.
       */
      private function isExceptionEqual(DeadMessageEntity $deadMessage, \Throwable $e): bool
      {
          return $deadMessage->getException() === \get_class($e)
              && $deadMessage->getExceptionMessage() === $e->getMessage()
              && $deadMessage->getExceptionFile() === $e->getFile()
              && $deadMessage->getExceptionLine() === $e->getLine();
      }

      /**
       * Increases the stored error count by one and recalculates the next execution time from the new count.
       */
      private function incrementErrorCount(DeadMessageEntity $deadMessage): void
      {
          $this->context->scope(Context::SYSTEM_SCOPE, function () use ($deadMessage): void {
              $errorCount = $deadMessage->getErrorCount() + 1;

              $this->deadMessageRepository->update([
                  [
                      'id' => $deadMessage->getId(),
                      'errorCount' => $errorCount,
                      'nextExecutionTime' => DeadMessageEntity::calculateNextExecutionTime($errorCount),
                  ],
              ], $this->context);
          });
      }

      /**
       * Creates a new dead-message record that reuses the original message of an
       * existing record but stores the details of a different exception.
       */
      private function createDeadMessageFromExistingMessage(DeadMessageEntity $message, MessageFailedException $e): void
      {
          $this->context->scope(Context::SYSTEM_SCOPE, function () use ($message, $e): void {
              $exception = $e->getException();

              $this->deadMessageRepository->create([
                  [
                      'id' => Uuid::randomHex(),
                      'originalMessageClass' => $message->getOriginalMessageClass(),
                      'serializedOriginalMessage' => serialize($message->getOriginalMessage()),
                      'handlerClass' => $e->getHandlerClass(),
                      'encrypted' => $message->isEncrypted(),
                      'nextExecutionTime' => DeadMessageEntity::calculateNextExecutionTime(1),
                      'exception' => \get_class($exception),
                      'exceptionMessage' => $exception->getMessage(),
                      'exceptionFile' => $exception->getFile(),
                      'exceptionLine' => $exception->getLine(),
                  ],
              ], $this->context);
          });
      }

      /**
       * Loads the dead-message record referenced by a RetryMessage.
       *
       * @return DeadMessageEntity|null null when the envelope does not wrap a RetryMessage
       *                                or the search result holds no entry for the id
       */
      private function getExistingDeadMessage(Envelope $envelope): ?DeadMessageEntity
      {
          $message = $envelope->getMessage();
          if (!($message instanceof RetryMessage)) {
              return null;
          }

          $deadMessageId = $message->getDeadMessageId();

          /** @var DeadMessageEntity|null $deadMessage */
          $deadMessage = $this->deadMessageRepository
              ->search(new Criteria([$deadMessageId]), $this->context)
              ->get($deadMessageId);

          return $deadMessage;
      }

      /**
       * Dispatches a RetryWebhookMessageFailedEvent when the original message of the record is a WebhookEventMessage.
       */
      private function handleRetryWebhookMessageFailed(DeadMessageEntity $deadMessage): void
      {
          if (!($deadMessage->getOriginalMessage() instanceof WebhookEventMessage)) {
              return;
          }

          $this->eventDispatcher->dispatch(
              new RetryWebhookMessageFailedEvent($deadMessage, $this->context)
          );
      }
  }