9 use PhpAmqpLib\Message\AMQPMessage;
26 private $communicationConfig;
31 private $publisherConfig;
48 PublisherConfig $publisherConfig,
49 CommunicationConfigInterface $communicationConfig,
52 $this->amqpConfig = $amqpConfig;
53 $this->communicationConfig = $communicationConfig;
54 $this->publisherConfig = $publisherConfig;
55 $this->exchange = $exchange;
61 public function enqueue($topic, array $envelopes)
63 $topicData = $this->communicationConfig->getTopic($topic);
64 $isSync = $topicData[CommunicationConfigInterface::TOPIC_IS_SYNCHRONOUS];
68 foreach ($envelopes as $envelope) {
69 $responses[] = $this->exchange->enqueue($topic, $envelope);
74 $channel = $this->amqpConfig->getChannel();
75 $publisher = $this->publisherConfig->getPublisher($topic);
76 $exchange = $publisher->getConnection()->getExchange();
78 foreach ($envelopes as $envelope) {
79 $msg =
new AMQPMessage($envelope->getBody(), $envelope->getProperties());
80 $channel->batch_basic_publish($msg, $exchange, $topic);
82 $channel->publish_batch();
enqueue($topic, array $envelopes)
__construct(\Magento\Framework\Amqp\Config $amqpConfig, PublisherConfig $publisherConfig, CommunicationConfigInterface $communicationConfig, \Magento\Framework\Amqp\Exchange $exchange)