|
8 | 8 | use Proklung\RabbitMQ\RabbitMq\Consumer;
|
9 | 9 | use Symfony\Component\DependencyInjection\Container;
|
10 | 10 | use Symfony\Component\DependencyInjection\ContainerBuilder;
|
11 |
| -use Proklung\RabbitMq\RabbitMq\AmqpPartsHolder; |
12 | 11 | use Proklung\RabbitMq\RabbitMq\Binding;
|
13 | 12 | use Proklung\RabbitMQ\RabbitMq\DequeuerAwareInterface;
|
14 |
| -use Proklung\RabbitMQ\RabbitMq\Producer; |
15 | 13 | use Proklung\RabbitMq\Utils\BitrixSettingsDiAdapter;
|
16 | 14 | use Symfony\Component\DependencyInjection\ContainerInterface;
|
17 | 15 | use Symfony\Component\DependencyInjection\Definition;
|
@@ -182,36 +180,24 @@ public function getContainer(): Container
|
182 | 180 | */
|
183 | 181 | private function loadPartsHolder() : void
|
184 | 182 | {
|
185 |
| - $holder = function () { |
186 |
| - $className = $this->parameters['rabbitmq.parts_holder.class']; |
187 |
| - |
188 |
| - /** @var AmqpPartsHolder $instance */ |
189 |
| - $instance = new $className(); |
190 |
| - |
191 |
| - foreach ($this->config['bindings'] as $binding) { |
192 |
| - ksort($binding); |
193 |
| - $key = md5(json_encode($binding)); |
194 |
| - |
195 |
| - $part = $this->container->get("rabbitmq.binding.{$key}"); |
196 |
| - $instance->addPart('rabbitmq.binding', $part); |
197 |
| - } |
198 |
| - |
199 |
| - foreach ($this->config['producers'] as $key => $producer) { |
200 |
| - $part = $this->container->get("rabbitmq.{$key}_producer"); |
201 |
| - $instance->addPart('rabbitmq.base_amqp', $part); |
202 |
| - $instance->addPart('rabbitmq.producer', $part); |
203 |
| - } |
204 |
| - |
205 |
| - foreach ($this->config['consumers'] as $key => $consumer) { |
206 |
| - $part = $this->container->get("rabbitmq.{$key}_consumer"); |
207 |
| - $instance->addPart('rabbitmq.base_amqp', $part); |
208 |
| - $instance->addPart('rabbitmq.consumer', $part); |
209 |
| - } |
210 |
| - |
211 |
| - return $instance; |
212 |
| - }; |
| 183 | + if ($this->config['sandbox']) { |
| 184 | + return; |
| 185 | + } |
| 186 | + foreach ($this->config['bindings'] as $binding) { |
| 187 | + ksort($binding); |
| 188 | + $definition = new Definition($binding['class']); |
| 189 | + $definition->addTag('rabbitmq.binding'); |
| 190 | + $definition->addMethodCall('setArguments', array($binding['arguments'])); |
| 191 | + $definition->addMethodCall('setDestination', array($binding['destination'])); |
| 192 | + $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange'])); |
| 193 | + $definition->addMethodCall('setExchange', array($binding['exchange'])); |
| 194 | + $definition->addMethodCall('isNowait', array($binding['nowait'])); |
| 195 | + $definition->addMethodCall('setRoutingKey', array($binding['routing_key'])); |
| 196 | + $this->injectConnection($definition, $binding['connection']); |
| 197 | + $key = md5(json_encode($binding)); |
213 | 198 |
|
214 |
| - $this->container->set('rabbitmq.parts_holder', $holder()); |
| 199 | + $this->container->setDefinition(sprintf('rabbitmq.binding.%s', $key), $definition); |
| 200 | + } |
215 | 201 | }
|
216 | 202 |
|
217 | 203 | /**
|
@@ -302,56 +288,43 @@ private function loadBindings() : void
|
302 | 288 | */
|
303 | 289 | private function loadProducers() : void
|
304 | 290 | {
|
305 |
| - if (!isset($this->config['sandbox']) || $this->config['sandbox'] === false) { |
| 291 | + if ($this->config['sandbox'] == false) { |
306 | 292 | foreach ($this->config['producers'] as $key => $producer) {
|
307 |
| - $producerServiceName = "rabbitmq.{$key}_producer"; |
308 |
| - |
309 |
| - if (!isset($producer['class'])) { |
310 |
| - $producer['class'] = $this->parameters['rabbitmq.producer.class']; |
311 |
| - } |
312 |
| - |
313 |
| - // this producer doesn't define an exchange -> using AMQP Default |
| 293 | + $definition = new Definition($producer['class'] ?? $this->container->getParameter('rabbitmq.producer.class')); |
| 294 | + $definition->setPublic(true); |
| 295 | + $definition->addTag('rabbitmq.base_amqp'); |
| 296 | + $definition->addTag('rabbitmq.producer'); |
| 297 | + //this producer doesn't define an exchange -> using AMQP Default |
314 | 298 | if (!isset($producer['exchange_options'])) {
|
315 | 299 | $producer['exchange_options'] = $this->getDefaultExchangeOptions();
|
316 | 300 | }
|
317 |
| - |
318 |
| - // this producer doesn't define a queue -> using AMQP Default |
| 301 | + $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options']))); |
| 302 | + //this producer doesn't define a queue -> using AMQP Default |
319 | 303 | if (!isset($producer['queue_options'])) {
|
320 | 304 | $producer['queue_options'] = $this->getDefaultQueueOptions();
|
321 | 305 | }
|
| 306 | + $definition->addMethodCall('setQueueOptions', array($producer['queue_options'])); |
| 307 | + $this->injectConnection($definition, $producer['connection']); |
322 | 308 |
|
323 |
| - $producers = function () use ($producer) { |
324 |
| - $className = $producer['class']; |
325 |
| - $connectionName = "rabbitmq.connection.{$producer['connection']}"; |
326 |
| - |
327 |
| - /** @var Producer $instance */ |
328 |
| - $instance = new $className($this->container->get($connectionName)); |
329 |
| - |
330 |
| - $instance->setExchangeOptions($producer['exchange_options']); |
331 |
| - $instance->setQueueOptions($producer['queue_options']); |
332 |
| - |
333 |
| - if (isset($producer['auto_setup_fabric']) && !$producer['auto_setup_fabric']) { |
334 |
| - $instance->disableAutoSetupFabric(); |
335 |
| - } |
| 309 | + if (!$producer['auto_setup_fabric']) { |
| 310 | + $definition->addMethodCall('disableAutoSetupFabric'); |
| 311 | + } |
336 | 312 |
|
337 |
| - if (isset($producer['enable_logger']) && $producer['enable_logger']) { |
338 |
| - $instance->setLogger($this->container->get($producer['logger'])); |
339 |
| - } |
| 313 | + if ($producer['enable_logger']) { |
| 314 | + $this->injectLogger($definition); |
| 315 | + } |
340 | 316 |
|
341 |
| - return $instance; |
342 |
| - }; |
| 317 | + $producerServiceName = sprintf('rabbitmq.%s_producer', $key); |
343 | 318 |
|
344 |
| - $this->container->set( |
345 |
| - $producerServiceName, |
346 |
| - $producers() |
347 |
| - ); |
| 319 | + $this->container->setDefinition($producerServiceName, $definition); |
| 320 | + if (null !== $producer['service_alias']) { |
| 321 | + $this->container->setAlias($producer['service_alias'], $producerServiceName); |
| 322 | + } |
348 | 323 | }
|
349 | 324 | } else {
|
350 | 325 | foreach ($this->config['producers'] as $key => $producer) {
|
351 |
| - $this->container->register( |
352 |
| - "rabbitmq.{$key}_producer", |
353 |
| - $this->parameters['rabbitmq.fallback.class'] |
354 |
| - )->setPublic(true); |
| 326 | + $definition = new Definition('%rabbitmq.fallback.class%'); |
| 327 | + $this->container->setDefinition(sprintf('rabbitmq.%s_producer', $key), $definition); |
355 | 328 | }
|
356 | 329 | }
|
357 | 330 | }
|
|
0 commit comments