Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
PublisherPool.php
Go to the documentation of this file.
1 <?php
8 
10 use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
12 
20 {
21  const MODE_SYNC = 'sync';
22  const MODE_ASYNC = 'async';
23 
27  const TYPE = 'type';
28 
32  const CONNECTION_NAME = 'connectionName';
33 
40  protected $publishers = [];
41 
49 
53  private $publisherConfig;
54 
58  private $connectionTypeResolver;
59 
70  public function __construct(
71  CommunicationConfig $communicationConfig,
72  QueueConfig $queueConfig,
73  array $publishers
74  ) {
75  $this->communicationConfig = $communicationConfig;
76  $this->initializePublishers($publishers);
77  }
78 
83  public function publish($topicName, $data)
84  {
85  $publisherType = $this->communicationConfig->getTopic($topicName)[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]
88  $connectionName = $this->getPublisherConfig()->getPublisher($topicName)->getConnection()->getName();
89  $publisher = $this->getPublisherForConnectionNameAndType($publisherType, $connectionName);
90  return $publisher->publish($topicName, $data);
91  }
92 
99  private function initializePublishers(array $publishers)
100  {
101  $asyncPublishers = isset($publishers[self::MODE_ASYNC]) ? $publishers[self::MODE_ASYNC] : [];
102  $syncPublishers = isset($publishers[self::MODE_SYNC]) ? $publishers[self::MODE_SYNC] : [];
103  foreach ($asyncPublishers as $connectionType => $publisher) {
104  $this->addPublisherToPool(
105  self::MODE_ASYNC,
106  $connectionType,
107  $publisher
108  );
109  }
110  foreach ($syncPublishers as $connectionType => $publisher) {
111  $this->addPublisherToPool(
112  self::MODE_SYNC,
113  $connectionType,
114  $publisher
115  );
116  }
117  }
118 
127  private function addPublisherToPool($type, $connectionType, PublisherInterface $publisher)
128  {
129  $this->publishers[$type][$connectionType] = $publisher;
130  return $this;
131  }
132 
142  private function getPublisherForConnectionNameAndType($type, $connectionName)
143  {
144  $connectionType = $this->getConnectionTypeResolver()->getConnectionType($connectionName);
145  if (!isset($this->publishers[$type])) {
146  throw new \InvalidArgumentException('Unknown publisher type ' . $type);
147  }
148 
149  if (!isset($this->publishers[$type][$connectionType])) {
150  throw new \LogicException(
151  sprintf(
152  'Could not find an implementation type for type "%s" and connection "%s".',
153  $type,
154  $connectionName
155  )
156  );
157  }
158  return $this->publishers[$type][$connectionType];
159  }
160 
168  private function getPublisherConfig()
169  {
170  if ($this->publisherConfig === null) {
171  $this->publisherConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(PublisherConfig::class);
172  }
173  return $this->publisherConfig;
174  }
175 
183  private function getConnectionTypeResolver()
184  {
185  if ($this->connectionTypeResolver === null) {
186  $this->connectionTypeResolver = \Magento\Framework\App\ObjectManager::getInstance()
187  ->get(ConnectionTypeResolver::class);
188  }
189  return $this->connectionTypeResolver;
190  }
191 }
$type
Definition: item.phtml:13
__construct(CommunicationConfig $communicationConfig, QueueConfig $queueConfig, array $publishers)