29 private $xmlValidator;
39 private $communicationConfig;
51 CommunicationConfig $communicationConfig
53 $this->methodsMap = $methodsMap;
54 $this->xmlValidator = $xmlValidator;
55 $this->communicationConfig = $communicationConfig;
63 $topics = $this->extractTopics(
$source);
64 $topics = $this->processWildcard($topics);
65 $publishers = $this->buildPublishers($topics);
66 $binds = $this->buildBinds($topics);
67 $map = $this->buildExchangeTopicToQueue($topics);
68 $consumers = $this->buildConsumers($topics);
84 private function buildTopicsConfiguration($topics)
87 foreach ($topics as $topicName => $topicConfig) {
88 $topicDefinition = $this->communicationConfig->getTopic($topicName);
90 $topicDefinition[
'request_type'] == CommunicationConfig::TOPIC_REQUEST_TYPE_CLASS
91 ? QueueConfig::TOPIC_SCHEMA_TYPE_OBJECT
92 : QueueConfig::TOPIC_SCHEMA_TYPE_METHOD;
93 $schemaValue = $topicDefinition[CommunicationConfig::TOPIC_REQUEST];
97 'schema_type' => $schemaType,
98 'schema_value' => $schemaValue
100 'response_schema' => [
101 'schema_type' => isset($topicDefinition[
'response']) ? QueueConfig::TOPIC_SCHEMA_TYPE_OBJECT :
null,
102 'schema_value' => $topicDefinition[
'response']
104 'is_synchronous' => $topicDefinition[CommunicationConfig::TOPIC_IS_SYNCHRONOUS],
105 'publisher' => $topicConfig[
'type'] .
'-' . $topicConfig[
'exchange']
117 private function buildConsumers($topics)
120 foreach ($topics as $topicName => $topicConfig) {
121 $topic = $this->communicationConfig->getTopic($topicName);
122 foreach ($topicConfig[
'queues'] as $queueName => $queueConfig) {
124 foreach ($queueConfig[
'handlers'] as
$handler) {
125 if (!isset(
$handler[QueueConfig::CONSUMER_CLASS])) {
126 $handlerExploded = explode(
'::',
$handler);
128 $handler[QueueConfig::CONSUMER_CLASS] = $handlerExploded[0];
129 $handler[QueueConfig::CONSUMER_METHOD] = $handlerExploded[1];
133 $queueConfig[
'handlers'] = $handlers;
135 $output[$queueConfig[
'consumer']] = [
136 'name' => $queueConfig[
'consumer'],
137 'queue' => $queueName,
138 'handlers' => [$topicName => $queueConfig[
'handlers']],
139 'instance_type' => $queueConfig[
'consumerInstance'] !=
null 141 'consumer_type' => $topic[CommunicationConfig::TOPIC_IS_SYNCHRONOUS] ?
'sync' :
'async',
142 'max_messages' => $queueConfig[
'maxMessages'],
143 'connection' => $topicConfig[
'type']
156 private function processWildcard($topics)
158 $topicDefinitions = $this->communicationConfig->getTopics();
160 $topicNames = array_keys($topics);
161 foreach ($topicNames as $topicName) {
162 if (strpos($topicName,
'*') !==
false || strpos($topicName,
'#') !==
false) {
163 $wildcardKeys[] = $topicName;
166 foreach (array_unique($wildcardKeys) as $wildcardKey) {
167 $pattern = $this->xmlValidator->buildWildcardPattern($wildcardKey);
168 foreach (array_keys($topicDefinitions) as $topicName) {
169 if (preg_match(
$pattern, $topicName)) {
170 if (isset($topics[$topicName])) {
171 $topics[$topicName] = array_merge($topics[$topicName], $topics[$wildcardKey]);
173 $topics[$topicName] = $topics[$wildcardKey];
177 unset($topics[$wildcardKey]);
188 private function buildPublishers($topics)
191 foreach ($topics as $topicConfig) {
192 $publisherName = $topicConfig[
'type'] .
'-' . $topicConfig[
'exchange'];
194 'name' => $publisherName,
195 'connection' => $topicConfig[
'type'],
196 'exchange' => $topicConfig[
'exchange']
208 private function buildBinds($topics)
211 foreach ($topics as $topicName => $topicConfig) {
212 $queueNames = array_keys($topicConfig[
'queues']);
213 foreach ($queueNames as $queueName) {
214 $name = $topicName .
'--' . $topicConfig[
'exchange'].
'--' .$queueName;
216 'queue' => $queueName,
217 'exchange' => $topicConfig[
'exchange'],
218 'topic' => $topicName,
231 private function buildExchangeTopicToQueue($topics)
234 foreach ($topics as $topicName => $topicConfig) {
235 $key = $topicConfig[
'type'] .
'-' . $topicConfig[
'exchange'] .
'--' . $topicName;
236 $queueNames = array_keys($topicConfig[
'queues']);
237 foreach ($queueNames as $queueName) {
251 private function extractTopics(
$config)
255 foreach (
$config->getElementsByTagName(
'broker') as $brokerNode) {
260 'exchange' => $this->
getAttributeValue($brokerNode,
'exchange', self::DEFAULT_EXCHANGE),
263 'queues' => $this->extractQueuesFromBroker($brokerNode, $topicName)
276 protected function extractQueuesFromBroker(\DOMElement $brokerNode, $topicName)
279 $topicConfig = $this->communicationConfig->getTopic($topicName);
281 foreach ($brokerNode->getElementsByTagName(
'queue') as $queueNode) {
287 'handlers' =>
$handler ? [
'default' =>
$handler] : $topicConfig[
'handlers'],
310 $item = $node->attributes->getNamedItem($attributeName);
const EXCHANGE_TOPIC_TO_QUEUES_MAP
__construct(MethodsMap $methodsMap, Validator $xmlValidator, CommunicationConfig $communicationConfig)
getAttributeValue(\DOMNode $node, $attributeName, $default=null)
catch(\Exception $e) $handler
if(!isset($_GET['name'])) $name