Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
TopicConfig.php
Go to the documentation of this file.
1 <?php
7 
11 use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
14 
21 {
/** Connection type used when a broker node has no "type" attribute. */
22  const DEFAULT_TYPE = 'amqp';
/** Exchange name used when a broker node has no "exchange" attribute. */
23  const DEFAULT_EXCHANGE = 'magento';
/** Consumer instance type used when a queue does not declare a consumerInstance. */
24  const DEFAULT_INSTANCE = ConsumerInterface::class;
25 
/** Validator; used here to build the regex pattern for wildcard topic names. */
29  private $xmlValidator;
30 
/** Methods map injected through the constructor; not referenced by the methods visible in this listing. */
34  private $methodsMap;
35 
/** Communication config; source of topic definitions (getTopic() / getTopics()). */
39  private $communicationConfig;
40 
/**
 * Keep references to the collaborators needed while converting the queue configuration.
 *
 * @param MethodsMap $methodsMap
 * @param Validator $xmlValidator
 * @param CommunicationConfig $communicationConfig
 */
public function __construct(
    MethodsMap $methodsMap,
    Validator $xmlValidator,
    CommunicationConfig $communicationConfig
) {
    // The three assignments are independent of each other.
    $this->communicationConfig = $communicationConfig;
    $this->xmlValidator = $xmlValidator;
    $this->methodsMap = $methodsMap;
}
57 
/**
 * Convert the parsed queue configuration into the configuration array.
 *
 * Topics are extracted from $source, wildcard topic names are expanded against the
 * known topic definitions, and the publishers, binds and consumers sections are
 * derived from the expanded topic list.
 *
 * NOTE(review): $map is computed below but is not referenced by the returned array as
 * shown in this listing. The embedded listing numbering jumps from 73 to 75, so an
 * array entry that used $map may be missing from this view - confirm against the
 * original file before changing this method.
 *
 * @param mixed $source presumably a \DOMDocument, since extractTopics() calls
 *                      getElementsByTagName() on it - TODO confirm
 * @return array
 */
61  public function convert($source)
62  {
63  $topics = $this->extractTopics($source);
64  $topics = $this->processWildcard($topics);
65  $publishers = $this->buildPublishers($topics);
66  $binds = $this->buildBinds($topics);
67  $map = $this->buildExchangeTopicToQueue($topics);
68  $consumers = $this->buildConsumers($topics);
69  return [
70  ConfigInterface::TOPICS => $this->buildTopicsConfiguration($topics),
71  ConfigInterface::PUBLISHERS => $publishers,
72  ConfigInterface::BINDS => $binds,
73  ConfigInterface::CONSUMERS => $consumers,
75  ];
76  }
77 
/**
 * Build the topics section of the configuration.
 *
 * For every topic the entry contains its name, the request schema (type and value
 * taken from the communication config), the response schema, the synchronous flag
 * and the publisher name ("<type>-<exchange>").
 *
 * @param array $topics topic configs keyed by topic name; each needs 'type' and 'exchange'
 * @return array
 */
private function buildTopicsConfiguration($topics)
{
    $output = [];
    foreach ($topics as $topicName => $topicConfig) {
        $topicDefinition = $this->communicationConfig->getTopic($topicName);
        $schemaType =
            $topicDefinition['request_type'] == CommunicationConfig::TOPIC_REQUEST_TYPE_CLASS
                ? QueueConfig::TOPIC_SCHEMA_TYPE_OBJECT
                : QueueConfig::TOPIC_SCHEMA_TYPE_METHOD;

        // Evaluate the response presence once so that both response_schema fields agree
        // and a definition without a 'response' key no longer triggers an undefined-index
        // notice (the value was already null in that case).
        $hasResponse = isset($topicDefinition['response']);

        $output[$topicName] = [
            'name' => $topicName,
            'schema' => [
                'schema_type' => $schemaType,
                'schema_value' => $topicDefinition[CommunicationConfig::TOPIC_REQUEST],
            ],
            'response_schema' => [
                'schema_type' => $hasResponse ? QueueConfig::TOPIC_SCHEMA_TYPE_OBJECT : null,
                'schema_value' => $hasResponse ? $topicDefinition['response'] : null,
            ],
            'is_synchronous' => $topicDefinition[CommunicationConfig::TOPIC_IS_SYNCHRONOUS],
            'publisher' => $topicConfig['type'] . '-' . $topicConfig['exchange'],
        ];
    }
    return $output;
}
110 
/**
 * Build the consumers section of the configuration.
 *
 * Each queue of each topic contributes to the consumer named by the queue's
 * 'consumer' value. Handlers given as "Class::method" strings are normalized into
 * arrays keyed by QueueConfig::CONSUMER_CLASS / QueueConfig::CONSUMER_METHOD.
 *
 * When the same consumer is reached through several topics (for example after a
 * wildcard topic has been expanded), the handlers of every topic are kept in the
 * topic-keyed 'handlers' map instead of being replaced by the last topic processed.
 * All other fields keep the value of the last topic/queue processed, as before.
 *
 * @param array $topics topic configs keyed by topic name; each needs 'type' and 'queues'
 * @return array consumer configs keyed by consumer name
 */
private function buildConsumers($topics)
{
    $output = [];
    foreach ($topics as $topicName => $topicConfig) {
        $topic = $this->communicationConfig->getTopic($topicName);
        foreach ($topicConfig['queues'] as $queueName => $queueConfig) {
            $handlers = [];
            foreach ($queueConfig['handlers'] as $handler) {
                if (!isset($handler[QueueConfig::CONSUMER_CLASS])) {
                    // Handler is a "Class::method" string; turn it into the array form.
                    $handlerExploded = explode('::', $handler);
                    $handler = [
                        QueueConfig::CONSUMER_CLASS => $handlerExploded[0],
                        QueueConfig::CONSUMER_METHOD => $handlerExploded[1],
                    ];
                }
                $handlers[] = $handler;
            }

            $consumerName = $queueConfig['consumer'];

            // Preserve handlers already collected for this consumer under other topics.
            $handlersByTopic = isset($output[$consumerName])
                ? $output[$consumerName]['handlers']
                : [];
            $handlersByTopic[$topicName] = $handlers;

            $output[$consumerName] = [
                'name' => $consumerName,
                'queue' => $queueName,
                'handlers' => $handlersByTopic,
                'instance_type' => $queueConfig['consumerInstance'] != null
                    ? $queueConfig['consumerInstance'] : self::DEFAULT_INSTANCE,
                'consumer_type' => $topic[CommunicationConfig::TOPIC_IS_SYNCHRONOUS] ? 'sync' : 'async',
                'max_messages' => $queueConfig['maxMessages'],
                'connection' => $topicConfig['type'],
            ];
        }
    }
    return $output;
}
149 
/**
 * Replace wildcard topic entries with entries for every matching defined topic.
 *
 * A topic key is treated as a wildcard when it contains '*' or '#'. Its config is
 * copied to each topic known to the communication config whose name matches the
 * pattern built by the validator; if that topic already has a config, the wildcard
 * config is merged over it. The wildcard entry itself is removed afterwards.
 *
 * @param array $topics topic configs keyed by topic name
 * @return array
 */
private function processWildcard($topics)
{
    $topicDefinitions = $this->communicationConfig->getTopics();

    // Array keys are already unique, so filtering them yields each wildcard key once.
    $wildcardKeys = array_filter(
        array_keys($topics),
        static function ($candidate) {
            return strpos($candidate, '*') !== false || strpos($candidate, '#') !== false;
        }
    );

    foreach ($wildcardKeys as $wildcardKey) {
        $pattern = $this->xmlValidator->buildWildcardPattern($wildcardKey);
        foreach (array_keys($topicDefinitions) as $definedName) {
            if (!preg_match($pattern, $definedName)) {
                continue;
            }
            $topics[$definedName] = isset($topics[$definedName])
                ? array_merge($topics[$definedName], $topics[$wildcardKey])
                : $topics[$wildcardKey];
        }
        unset($topics[$wildcardKey]);
    }

    return $topics;
}
181 
/**
 * Build the publishers section of the configuration.
 *
 * One publisher is produced per distinct "<type>-<exchange>" combination found
 * among the topics; later topics with the same combination overwrite the entry
 * with identical data.
 *
 * @param array $topics topic configs; each needs 'type' and 'exchange'
 * @return array publisher configs keyed by publisher name
 */
private function buildPublishers($topics)
{
    $publishers = [];
    foreach ($topics as $topicConfig) {
        $connection = $topicConfig['type'];
        $exchange = $topicConfig['exchange'];
        $publisherName = $connection . '-' . $exchange;

        $publishers[$publisherName] = [
            'name' => $publisherName,
            'connection' => $connection,
            'exchange' => $exchange,
        ];
    }
    return $publishers;
}
201 
/**
 * Build the binds section of the configuration.
 *
 * Every (topic, queue) pair yields one bind keyed by
 * "<topic>--<exchange>--<queue>".
 *
 * @param array $topics topic configs keyed by topic name; each needs 'exchange' and 'queues'
 * @return array bind configs keyed by bind name
 */
private function buildBinds($topics)
{
    $binds = [];
    foreach ($topics as $topicName => $topicConfig) {
        foreach (array_keys($topicConfig['queues']) as $queueName) {
            $exchange = $topicConfig['exchange'];
            $bindName = sprintf('%s--%s--%s', $topicName, $exchange, $queueName);
            $binds[$bindName] = [
                'queue' => $queueName,
                'exchange' => $exchange,
                'topic' => $topicName,
            ];
        }
    }
    return $binds;
}
224 
/**
 * Build the map from "<type>-<exchange>--<topic>" to the list of queue names.
 *
 * Queue names are de-duplicated per map key. De-duplication is done once per topic
 * rather than after every single append; the resulting list holds the first
 * occurrence of each queue name in insertion order with sequential indexes.
 * Topics without queues produce no map entry.
 *
 * @param array $topics topic configs keyed by topic name; each needs 'type', 'exchange' and 'queues'
 * @return array lists of queue names keyed by "<type>-<exchange>--<topic>"
 */
private function buildExchangeTopicToQueue($topics)
{
    $output = [];
    foreach ($topics as $topicName => $topicConfig) {
        $key = $topicConfig['type'] . '-' . $topicConfig['exchange'] . '--' . $topicName;
        $queueNames = array_keys($topicConfig['queues']);
        if ($queueNames === []) {
            // Nothing to register; do not create an empty entry for this key.
            continue;
        }
        $known = isset($output[$key]) ? $output[$key] : [];
        $output[$key] = array_values(array_unique(array_merge($known, $queueNames)));
    }
    return $output;
}
244 
/**
 * Read every "broker" element of the configuration into a topic config array.
 *
 * The 'type' and 'exchange' attributes fall back to DEFAULT_TYPE and
 * DEFAULT_EXCHANGE; 'consumerInstance' and 'maxMessages' are null when absent.
 * A later broker element with the same topic replaces the earlier one.
 *
 * @param mixed $config object exposing getElementsByTagName(); presumably a \DOMDocument - TODO confirm
 * @return array topic configs keyed by topic name
 */
private function extractTopics($config)
{
    $topics = [];
    $brokerNodes = $config->getElementsByTagName('broker');
    foreach ($brokerNodes as $brokerNode) {
        $topicName = $this->getAttributeValue($brokerNode, 'topic');

        $topicData = [ConfigInterface::TOPIC_NAME => $topicName];
        $topicData['type'] = $this->getAttributeValue($brokerNode, 'type', self::DEFAULT_TYPE);
        $topicData['exchange'] = $this->getAttributeValue($brokerNode, 'exchange', self::DEFAULT_EXCHANGE);
        $topicData['consumerInstance'] = $this->getAttributeValue($brokerNode, 'consumerInstance');
        $topicData['maxMessages'] = $this->getAttributeValue($brokerNode, 'maxMessages');
        $topicData['queues'] = $this->extractQueuesFromBroker($brokerNode, $topicName);

        $topics[$topicName] = $topicData;
    }
    return $topics;
}
268 
/**
 * Read every "queue" element below a broker element into a queue config array.
 *
 * When a queue declares a truthy "handler" attribute it becomes the single
 * 'default' handler; otherwise the handlers of the topic definition from the
 * communication config are used.
 *
 * @param \DOMElement $brokerNode
 * @param string $topicName
 * @return array queue configs keyed by queue name
 */
protected function extractQueuesFromBroker(\DOMElement $brokerNode, $topicName)
{
    $topicConfig = $this->communicationConfig->getTopic($topicName);
    $queues = [];
    foreach ($brokerNode->getElementsByTagName('queue') as $queueNode) {
        $handler = $this->getAttributeValue($queueNode, 'handler');
        if ($handler) {
            $handlers = ['default' => $handler];
        } else {
            $handlers = $topicConfig['handlers'];
        }

        $queueName = $this->getAttributeValue($queueNode, 'name');
        $queues[$queueName] = [
            'name' => $queueName,
            'handlerName' => $this->getAttributeValue($queueNode, 'handlerName'),
            'handlers' => $handlers,
            'exchange' => $this->getAttributeValue($queueNode, 'exchange'),
            'consumer' => $this->getAttributeValue($queueNode, 'consumer'),
            'consumerInstance' => $this->getAttributeValue($queueNode, 'consumerInstance'),
            'maxMessages' => $this->getAttributeValue($queueNode, 'maxMessages', null),
            'type' => $this->getAttributeValue($queueNode, 'type'),
        ];
    }
    return $queues;
}
299 
/**
 * Return the value of a node attribute, or the default when it is not present.
 *
 * The parameter type allows any \DOMNode, but only element nodes carry an
 * attribute map; for other node types the attribute map is null and the default
 * is returned instead of calling a method on null.
 *
 * @param \DOMNode $node
 * @param string $attributeName
 * @param mixed $default
 * @return string|mixed
 */
protected function getAttributeValue(\DOMNode $node, $attributeName, $default = null)
{
    if ($node->attributes === null) {
        return $default;
    }
    $item = $node->attributes->getNamedItem($attributeName);
    return $item ? $item->nodeValue : $default;
}
313 }
$queue
Definition: queue.php:21
$pattern
Definition: website.php:22
$config
Definition: fraud_order.php:17
$source
Definition: source.php:23
__construct(MethodsMap $methodsMap, Validator $xmlValidator, CommunicationConfig $communicationConfig)
Definition: TopicConfig.php:48
$queues
Definition: queues.php:8
getAttributeValue(\DOMNode $node, $attributeName, $default=null)
catch(\Exception $e) $handler
Definition: index.php:30
if(!isset($_GET['name'])) $name
Definition: log.php:14