18 private $messageStatusProcessor;
33 $this->messageStatusProcessor = $messageStatusProcessor;
44 array $messagesToAcknowledge,
48 $this->resource->getConnection()->beginTransaction();
49 $this->messageStatusProcessor->acknowledgeMessages(
$queue, $messagesToAcknowledge);
51 $this->resource->getConnection()->commit();
52 $this->messageStatusProcessor->acknowledgeMessages(
$queue, $messages);
54 $this->resource->getConnection()->rollBack();
55 }
catch (\Exception $e) {
56 $this->resource->getConnection()->rollBack();
57 $this->messageStatusProcessor->rejectMessages(
$queue, $messages);
69 foreach ($messageList as $topicName => $messages) {
72 foreach ($callbacks as $callback) {
__construct(MessageStatusProcessor $messageStatusProcessor, ResourceConnection $resource)
call_user_func($callable, $param)
process(QueueInterface $queue, ConsumerConfigurationInterface $configuration, array $messages, array $messagesToAcknowledge, array $mergedMessages)