Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
MessageProcessor.php
Go to the documentation of this file.
1 <?php
7 
9 
14 {
18  private $messageStatusProcessor;
19 
23  private $resource;
24 
29  public function __construct(
30  MessageStatusProcessor $messageStatusProcessor,
31  ResourceConnection $resource
32  ) {
33  $this->messageStatusProcessor = $messageStatusProcessor;
34  $this->resource = $resource;
35  }
36 
40  public function process(
43  array $messages,
44  array $messagesToAcknowledge,
45  array $mergedMessages
46  ) {
47  try {
48  $this->resource->getConnection()->beginTransaction();
49  $this->messageStatusProcessor->acknowledgeMessages($queue, $messagesToAcknowledge);
50  $this->dispatchMessages($configuration, $mergedMessages);
51  $this->resource->getConnection()->commit();
52  $this->messageStatusProcessor->acknowledgeMessages($queue, $messages);
53  } catch (ConnectionLostException $e) {
54  $this->resource->getConnection()->rollBack();
55  } catch (\Exception $e) {
56  $this->resource->getConnection()->rollBack();
57  $this->messageStatusProcessor->rejectMessages($queue, $messages);
58  }
59  }
60 
67  private function dispatchMessages(ConsumerConfigurationInterface $configuration, array $messageList)
68  {
69  foreach ($messageList as $topicName => $messages) {
70  foreach ($messages as $message) {
71  $callbacks = $configuration->getHandlers($topicName);
72  foreach ($callbacks as $callback) {
73  call_user_func($callback, $message);
74  }
75  }
76  }
77  }
78 }
$queue
Definition: queue.php:21
__construct(MessageStatusProcessor $messageStatusProcessor, ResourceConnection $resource)
$configuration
Definition: index.php:33
$resource
Definition: bulk.php:12
$message
process(QueueInterface $queue, ConsumerConfigurationInterface $configuration, array $messages, array $messagesToAcknowledge, array $mergedMessages)