vendor/shopware/core/Framework/MessageQueue/Monitoring/MonitoringBusDecorator.php line 43

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Monitoring;
  3. use Shopware\Core\Framework\Increment\Exception\IncrementGatewayNotFoundException;
  4. use Shopware\Core\Framework\Increment\IncrementGatewayRegistry;
  5. use Shopware\Core\Framework\Log\Package;
  6. use Symfony\Component\Messenger\Envelope;
  7. use Symfony\Component\Messenger\MessageBusInterface;
  8. use Symfony\Component\Messenger\Stamp\ReceivedStamp;
  9. use Symfony\Component\Messenger\Stamp\SentStamp;
  10. /**
  11.  * @deprecated tag:v6.5.0 - reason:remove-decorator - will be removed, as we use default symfony messenger
  12.  */
  13. #[Package('core')]
  14. class MonitoringBusDecorator implements MessageBusInterface
  15. {
  16.     private MessageBusInterface $innerBus;
  17.     private string $defaultTransportName;
  18.     private IncrementGatewayRegistry $gatewayRegistry;
  19.     /**
  20.      * @internal
  21.      */
  22.     public function __construct(
  23.         MessageBusInterface $inner,
  24.         string $defaultTransportName,
  25.         IncrementGatewayRegistry $gatewayRegistry
  26.     ) {
  27.         $this->innerBus $inner;
  28.         $this->defaultTransportName $defaultTransportName;
  29.         $this->gatewayRegistry $gatewayRegistry;
  30.     }
  31.     /**
  32.      * Dispatches the given message to the inner Bus and Logs it.
  33.      *
  34.      * @param object|Envelope $message
  35.      */
  36.     public function dispatch($message, array $stamps = []): Envelope
  37.     {
  38.         $message $this->innerBus->dispatch(Envelope::wrap($message$stamps), $stamps);
  39.         if ($this->wasSentToDefaultTransport($message)) {
  40.             $this->incrementMessageQueueSize($message);
  41.         }
  42.         if ($this->wasReceivedByDefaultTransport($message)) {
  43.             $this->decrementMessageQueueSize($message);
  44.         }
  45.         return $message;
  46.     }
  47.     private function incrementMessageQueueSize(Envelope $message): void
  48.     {
  49.         try {
  50.             $gateway $this->gatewayRegistry->get(IncrementGatewayRegistry::MESSAGE_QUEUE_POOL);
  51.         } catch (IncrementGatewayNotFoundException $exception) {
  52.             // In case message_queue pool is disabled
  53.             return;
  54.         }
  55.         $gateway->increment('message_queue_stats'\get_class($message->getMessage()));
  56.     }
  57.     private function decrementMessageQueueSize(Envelope $message): void
  58.     {
  59.         try {
  60.             $gateway $this->gatewayRegistry->get(IncrementGatewayRegistry::MESSAGE_QUEUE_POOL);
  61.         } catch (IncrementGatewayNotFoundException $exception) {
  62.             // In case message_queue pool is disabled
  63.             return;
  64.         }
  65.         $gateway->decrement('message_queue_stats'\get_class($message->getMessage()));
  66.     }
  67.     private function wasSentToDefaultTransport(Envelope $message): bool
  68.     {
  69.         foreach ($message->all(SentStamp::class) as $stamp) {
  70.             if ($stamp instanceof SentStamp && $stamp->getSenderAlias() === $this->defaultTransportName) {
  71.                 return true;
  72.             }
  73.         }
  74.         return false;
  75.     }
  76.     private function wasReceivedByDefaultTransport(Envelope $message): bool
  77.     {
  78.         foreach ($message->all(ReceivedStamp::class) as $stamp) {
  79.             if ($stamp instanceof ReceivedStamp && $stamp->getTransportName() === $this->defaultTransportName) {
  80.                 return true;
  81.             }
  82.         }
  83.         return false;
  84.     }
  85. }