16 private $messageStatusProcessor;
23 $this->messageStatusProcessor = $messageStatusProcessor;
33 array $messagesToAcknowledge,
36 $this->messageStatusProcessor->acknowledgeMessages(
$queue, $messagesToAcknowledge);
48 private function dispatchMessages(
52 array $originalMessages
54 $originalMessagesIds = [];
57 foreach ($messageList as $topicName => $messages) {
63 $originalMessagesIds =
$message->getOriginalMessagesIds();
65 foreach ($callbacks as $callback) {
69 $originalMessages = $this->getOriginalMessages($originalMessages, $originalMessagesIds);
70 $this->messageStatusProcessor->acknowledgeMessages(
$queue, $originalMessages);
73 }
catch (\Exception $e) {
74 $originalMessages = $this->getOriginalMessages($originalMessages, $originalMessagesIds);
75 $this->messageStatusProcessor->rejectMessages(
$queue, $originalMessages);
86 private function getOriginalMessages(array $messages, array $messagesIds)
88 $originalMessages = [];
90 foreach ($messagesIds as $messageId) {
91 if (isset($messages[$messageId])) {
92 $originalMessages[] = $messages[$messageId];
96 return $originalMessages;
__construct(MessageStatusProcessor $messageStatusProcessor)
call_user_func($callable, $param)
process(QueueInterface $queue, ConsumerConfigurationInterface $configuration, array $messages, array $messagesToAcknowledge, array $mergedMessages)