53 private $publisherConfig;
58 private $connectionTypeResolver;
72 QueueConfig $queueConfig,
85 $publisherType = $this->communicationConfig->getTopic($topicName)[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]
88 $connectionName = $this->getPublisherConfig()->getPublisher($topicName)->getConnection()->getName();
89 $publisher = $this->getPublisherForConnectionNameAndType($publisherType, $connectionName);
90 return $publisher->publish($topicName,
$data);
99 private function initializePublishers(array
$publishers)
103 foreach ($asyncPublishers as $connectionType => $publisher) {
104 $this->addPublisherToPool(
110 foreach ($syncPublishers as $connectionType => $publisher) {
111 $this->addPublisherToPool(
127 private function addPublisherToPool(
$type, $connectionType, PublisherInterface $publisher)
129 $this->publishers[
$type][$connectionType] = $publisher;
142 private function getPublisherForConnectionNameAndType(
$type, $connectionName)
144 $connectionType = $this->getConnectionTypeResolver()->getConnectionType($connectionName);
145 if (!isset($this->publishers[
$type])) {
146 throw new \InvalidArgumentException(
'Unknown publisher type ' .
$type);
149 if (!isset($this->publishers[
$type][$connectionType])) {
150 throw new \LogicException(
152 'Could not find an implementation type for type "%s" and connection "%s".',
158 return $this->publishers[
$type][$connectionType];
168 private function getPublisherConfig()
170 if ($this->publisherConfig ===
null) {
173 return $this->publisherConfig;
183 private function getConnectionTypeResolver()
185 if ($this->connectionTypeResolver ===
null) {
187 ->get(ConnectionTypeResolver::class);
189 return $this->connectionTypeResolver;
publish($topicName, $data)
__construct(CommunicationConfig $communicationConfig, QueueConfig $queueConfig, array $publishers)