vendor/sroze/messenger-enqueue-transport/QueueInteropTransport.php line 117

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Enqueue\MessengerAdapter;
  11. use Enqueue\AmqpTools\DelayStrategyAware;
  12. use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
  13. use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
  14. use Enqueue\MessengerAdapter\EnvelopeItem\InteropMessageStamp;
  15. use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
  16. use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException;
  17. use Enqueue\MessengerAdapter\Exception\SendingMessageFailedException;
  18. use Interop\Queue\Consumer;
  19. use Interop\Queue\Exception as InteropQueueException;
  20. use Interop\Queue\Message;
  21. use Symfony\Component\Messenger\Envelope;
  22. use Symfony\Component\Messenger\Exception\LogicException;
  23. use Symfony\Component\Messenger\Stamp\DelayStamp;
  24. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  25. use Symfony\Component\Messenger\Transport\TransportInterface;
  26. use Symfony\Component\OptionsResolver\Options;
  27. use Symfony\Component\OptionsResolver\OptionsResolver;
  28. /**
  29.  * Symfony Messenger transport.
  30.  *
  31.  * @author Samuel Roze <samuel.roze@gmail.com>
  32.  * @author Max Kotliar <kotlyar.maksim@gmail.com>
  33.  */
  34. class QueueInteropTransport implements TransportInterface
  35. {
  36.     private $serializer;
  37.     private $contextManager;
  38.     private $options;
  39.     private $debug;
  40.     public function __construct(
  41.         SerializerInterface $serializer,
  42.         ContextManager $contextManager,
  43.         array $options = array(),
  44.         $debug false
  45.     ) {
  46.         $this->serializer $serializer;
  47.         $this->contextManager $contextManager;
  48.         $this->debug $debug;
  49.         $resolver = new OptionsResolver();
  50.         $this->configureOptions($resolver);
  51.         $this->options $resolver->resolve($options);
  52.     }
  53.     /**
  54.      * {@inheritdoc}
  55.      */
  56.     public function get(): iterable
  57.     {
  58.         $destination $this->getDestination(null);
  59.         if ($this->debug) {
  60.             $this->contextManager->ensureExists($destination);
  61.         }
  62.         try {
  63.             if (null === ($interopMessage $this->getConsumer()->receive($this->options['receiveTimeout'] ?? 30000))) {
  64.                 return array();
  65.             }
  66.         } catch (\Exception $e) {
  67.             if ($this->contextManager->recoverException($e$destination)) {
  68.                 return array();
  69.             }
  70.             throw $e;
  71.         }
  72.         $envelope $this->serializer->decode(array(
  73.             'body' => $interopMessage->getBody(),
  74.             'headers' => $interopMessage->getHeaders(),
  75.             'properties' => $interopMessage->getProperties(),
  76.         ));
  77.         $envelope $envelope->with(new InteropMessageStamp($interopMessage));
  78.         return array($envelope);
  79.     }
  80.     /**
  81.      * {@inheritdoc}
  82.      */
  83.     public function ack(Envelope $envelope): void
  84.     {
  85.         $interopMessage $this->findMessage($envelope);
  86.         $this->getConsumer()->acknowledge($interopMessage);
  87.     }
  88.     /**
  89.      * {@inheritdoc}
  90.      */
  91.     public function reject(Envelope $envelope): void
  92.     {
  93.         $interopMessage $this->findMessage($envelope);
  94.         $this->getConsumer()->reject($interopMessage);
  95.     }
  96.     /**
  97.      * {@inheritdoc}
  98.      */
  99.     public function send(Envelope $envelope): Envelope
  100.     {
  101.         $context $this->contextManager->context();
  102.         $destination $this->getDestination($envelope);
  103.         $topic $context->createTopic($destination['topic']);
  104.         if ($this->debug) {
  105.             $this->contextManager->ensureExists($destination);
  106.         }
  107.         $interopMessage $this->encodeMessage($envelope);
  108.         $this->setMessageMetadata($interopMessage$envelope);
  109.         $producer $context->createProducer();
  110.         $delay 0;
  111.         $delayStamp $envelope->last(DelayStamp::class);
  112.         if (null !== $delayStamp) {
  113.             $delay $delayStamp->getDelay();
  114.         } elseif (isset($this->options['deliveryDelay'])) {
  115.             $delay $this->options['deliveryDelay'];
  116.         }
  117.         if ($delay 0) {
  118.             if ($producer instanceof DelayStrategyAware) {
  119.                 $producer->setDelayStrategy($this->options['delayStrategy']);
  120.             }
  121.             $producer->setDeliveryDelay($delay);
  122.         }
  123.         if (isset($this->options['priority'])) {
  124.             $producer->setPriority($this->options['priority']);
  125.         }
  126.         if (isset($this->options['timeToLive'])) {
  127.             $producer->setTimeToLive($this->options['timeToLive']);
  128.         }
  129.         try {
  130.             $producer->send($topic$interopMessage);
  131.         } catch (InteropQueueException $e) {
  132.             if (!$this->contextManager->recoverException($e$destination)) {
  133.                 throw new SendingMessageFailedException($e->getMessage(), null$e);
  134.             }
  135.             // The context manager recovered the exception, we re-try.
  136.             $envelope $this->send($envelope);
  137.         }
  138.         return $envelope;
  139.     }
  140.     public function configureOptions(OptionsResolver $resolver): void
  141.     {
  142.         $resolver->setDefaults(array(
  143.             'transport_name' => null,
  144.             'receiveTimeout' => null,
  145.             'deliveryDelay' => null,
  146.             'delayStrategy' => RabbitMqDelayPluginDelayStrategy::class,
  147.             'priority' => null,
  148.             'timeToLive' => null,
  149.             'topic' => array('name' => 'messages'),
  150.             'queue' => array('name' => 'messages'),
  151.         ));
  152.         $resolver->setAllowedTypes('transport_name', array('null''string'));
  153.         $resolver->setAllowedTypes('receiveTimeout', array('null''int'));
  154.         $resolver->setAllowedTypes('deliveryDelay', array('null''int'));
  155.         $resolver->setAllowedTypes('priority', array('null''int'));
  156.         $resolver->setAllowedTypes('timeToLive', array('null''int'));
  157.         $resolver->setAllowedTypes('delayStrategy', array('null''string'));
  158.         $resolver->setAllowedValues('delayStrategy', array(
  159.                 null,
  160.                 RabbitMqDelayPluginDelayStrategy::class,
  161.                 RabbitMqDlxDelayStrategy::class,
  162.             )
  163.         );
  164.         $resolver->setNormalizer('delayStrategy', function (Options $options$value) {
  165.             return null !== $value ? new $value() : null;
  166.         });
  167.     }
  168.     private function getDestination(?Envelope $envelope): array
  169.     {
  170.         $configuration $envelope $envelope->last(TransportConfiguration::class) : null;
  171.         $topic null !== $configuration $configuration->getTopic() : null;
  172.         return array(
  173.             'topic' => $topic ?? $this->options['topic']['name'],
  174.             'topicOptions' => $this->options['topic'],
  175.             'queue' => $this->options['queue']['name'],
  176.             'queueOptions' => $this->options['queue'],
  177.         );
  178.     }
  179.     private function setMessageMetadata(Message $interopMessageEnvelope $envelope): void
  180.     {
  181.         $configuration $envelope->last(TransportConfiguration::class);
  182.         if (null === $configuration) {
  183.             return;
  184.         }
  185.         $metadata $configuration->getMetadata();
  186.         $class = new \ReflectionClass($interopMessage);
  187.         foreach ($metadata as $key => $value) {
  188.             $setter sprintf('set%s'ucfirst($key));
  189.             if (!$class->hasMethod($setter)) {
  190.                 throw new MissingMessageMetadataSetterException($key$setter$class->getName());
  191.             }
  192.             $interopMessage->{$setter}($value);
  193.         }
  194.     }
  195.     private function encodeMessage(Envelope $envelope): Message
  196.     {
  197.         $context $this->contextManager->context();
  198.         $encodedMessage $this->serializer->encode($envelope);
  199.         $interopMessage $context->createMessage(
  200.             $encodedMessage['body'],
  201.             $encodedMessage['properties'] ?? array(),
  202.             $encodedMessage['headers'] ?? array()
  203.         );
  204.         return $interopMessage;
  205.     }
  206.     private function findMessage(Envelope $envelope): Message
  207.     {
  208.         /** @var InteropMessageStamp $interopStamp */
  209.         $interopStamp $envelope->last(InteropMessageStamp::class);
  210.         if (null === $interopStamp) {
  211.             throw new LogicException('No InteropMessageStamp found in the Envelope.');
  212.         }
  213.         return $interopStamp->getMessage();
  214.     }
  215.     private function getConsumer(): Consumer
  216.     {
  217.         $context $this->contextManager->context();
  218.         $destination $this->getDestination(null);
  219.         $queue $context->createQueue($destination['queue']);
  220.         return $context->createConsumer($queue);
  221.     }
  222. }