Skip to content

Commit 30575f8

Browse files
committed
Batch consumer
1 parent 8b780da commit 30575f8

File tree

3 files changed

+88
-14
lines changed

3 files changed

+88
-14
lines changed

.settings.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
'rabbitmq.consumer.class' => 'Proklung\RabbitMq\RabbitMq\Consumer',
1717
'rabbitmq.multi_consumer.class' => '',
1818
'rabbitmq.dynamic_consumer.class' => '',
19-
'rabbitmq.batch_consumer.class' => '',
20-
'rabbitmq.anon_consumer.class' => '',
19+
'rabbitmq.batch_consumer.class' => 'Proklung\RabbitMq\RabbitMq\BatchConsumer',
20+
'rabbitmq.anon_consumer.class' => 'Proklung\RabbitMq\RabbitMq\AnonConsumer',
2121
'rabbitmq.rpc_client.class' => 'Proklung\RabbitMq\RabbitMq\RpcClient',
2222
'rabbitmq.rpc_server.class' => 'Proklung\RabbitMq\RabbitMq\RpcServer',
2323
'rabbitmq.logged.channel.class' => '',

README.md

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,13 @@
77

88
## Отличия
99

10-
Выпилил битриксовый сервис-локатор в пользу отдельного симфонического контейнера.
11-
12-
Исправил некоторое количество ошибок.
13-
14-
Добавил к командам префикс `bitrix-`, чтобы избежать конфликта с командами оригинального бандла `RabbitMqBundle`.
15-
16-
Добавлен отдельный раннер консольных команд.
17-
18-
Адаптирована работа с `RPC_Server` и `RPC_Clients`.
19-
20-
Адаптирована работа с `Anon consumer`.
10+
- Выпилил битриксовый сервис-локатор в пользу отдельного симфонического контейнера
11+
- Исправил некоторое количество ошибок
12+
- Добавил к командам префикс `bitrix-`, чтобы избежать конфликта с командами оригинального бандла `RabbitMqBundle`
13+
- Добавлен отдельный раннер консольных команд
14+
- Адаптирована работа с `RPC_Server` и `RPC_Clients`
15+
- Адаптирована работа с `Anon consumer`
16+
- Адаптирована работа с `Batch consumer`
2117

2218
# Оригинальное readme.MD с некоторыми корректировками
2319

@@ -266,7 +262,7 @@ class RandomIntServer
266262
- [x] Fallback producer
267263
- [ ] Multi-consumer
268264
- [ ] Dynamic consumer
269-
- [ ] Batch consumer
265+
- [x] Batch consumer
270266
- [x] Anon consumer
271267
- [x] Rpc client
272268
- [x] Rpc server

lib/Integration/DI/Services.php

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use Proklung\RabbitMQ\RabbitMq\DequeuerAwareInterface;
1414
use Proklung\RabbitMQ\RabbitMq\Producer;
1515
use Proklung\RabbitMq\Utils\BitrixSettingsDiAdapter;
16+
use Symfony\Component\DependencyInjection\ContainerInterface;
1617
use Symfony\Component\DependencyInjection\Definition;
1718
use Symfony\Component\DependencyInjection\Reference;
1819

@@ -114,6 +115,7 @@ public function load() : void
114115
$this->loadProducers();
115116
$this->loadConsumers();
116117
$this->loadAnonConsumers();
118+
$this->loadBatchConsumers();
117119
$this->loadRpcClients();
118120
$this->loadRpcServers();
119121

@@ -496,6 +498,82 @@ private function loadAnonConsumers() : void
496498
}
497499
}
498500

501+
/**
502+
* @return void
503+
*/
504+
private function loadBatchConsumers() : void
505+
{
506+
foreach ($this->config['batch_consumers'] as $key => $consumer) {
507+
$this->registerCallbackAsService($consumer['callback']);
508+
509+
$definition = new Definition('%rabbitmq.batch_consumer.class%');
510+
511+
if (!isset($consumer['exchange_options'])) {
512+
$consumer['exchange_options'] = $this->getDefaultExchangeOptions();
513+
}
514+
515+
$definition
516+
->setPublic(true)
517+
->addTag('rabbitmq.base_amqp')
518+
->addTag('rabbitmq.batch_consumer')
519+
->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
520+
->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
521+
->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
522+
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
523+
->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
524+
->addMethodCall('setQosOptions', array(
525+
$consumer['qos_options']['prefetch_size'],
526+
$consumer['qos_options']['prefetch_count'],
527+
$consumer['qos_options']['global']
528+
))
529+
;
530+
531+
if (isset($consumer['idle_timeout_exit_code'])) {
532+
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
533+
}
534+
535+
if (isset($consumer['idle_timeout'])) {
536+
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
537+
}
538+
539+
if (isset($consumer['graceful_max_execution'])) {
540+
$definition->addMethodCall(
541+
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
542+
array($consumer['graceful_max_execution']['timeout'])
543+
);
544+
}
545+
546+
if (!$consumer['auto_setup_fabric']) {
547+
$definition->addMethodCall('disableAutoSetupFabric');
548+
}
549+
550+
if ($consumer['keep_alive']) {
551+
$definition->addMethodCall('keepAlive');
552+
}
553+
554+
$this->injectConnection($definition, $consumer['connection']);
555+
556+
if ($consumer['enable_logger']) {
557+
$this->injectLogger($definition);
558+
}
559+
560+
$this->container->setDefinition(sprintf('rabbitmq.%s_batch', $key), $definition);
561+
}
562+
}
563+
564+
/**
565+
* @param Definition $definition
566+
*
567+
* @return void
568+
*/
569+
private function injectLogger(Definition $definition)
570+
{
571+
$definition->addTag('monolog.logger', array(
572+
'channel' => 'phpamqplib'
573+
));
574+
$definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
575+
}
576+
499577
/**
500578
* Регистрация класса сервисом.
501579
*

0 commit comments

Comments
 (0)