Skip to content

Commit 4bf5868

Browse files
committed
[Messenger] Notify transports which messages are still being processed, using pcntl_alarm()
1 parent 0f8bb86 commit 4bf5868

7 files changed

+176
-4
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ CHANGELOG
1010
* Add `--format` option to the `messenger:stats` command
1111
* Add `getRetryDelay()` method to `RecoverableExceptionInterface`
1212
* Add `skip` option to `messenger:failed:retry` command when run interactively to skip message and requeue it
13+
* Add the ability to asynchronously notify transports about which messages are still being processed by the worker, using `pcntl_alarm()`
1314

1415
7.1
1516
---

Command/ConsumeMessagesCommand.php

+19-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
#[AsCommand(name: 'messenger:consume', description: 'Consume messages')]
4444
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
4545
{
46+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
47+
4648
private ?Worker $worker = null;
4749

4850
public function __construct(
@@ -75,6 +77,7 @@ protected function configure(): void
7577
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
7678
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
7779
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
80+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
7881
])
7982
->setHelp(<<<'EOF'
8083
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -124,6 +127,13 @@ protected function configure(): void
124127
;
125128
}
126129

130+
protected function initialize(InputInterface $input, OutputInterface $output): void
131+
{
132+
if ($input->hasParameterOption('--keepalive')) {
133+
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
134+
}
135+
}
136+
127137
protected function interact(InputInterface $input, OutputInterface $output): void
128138
{
129139
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
@@ -264,7 +274,7 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
264274

265275
public function getSubscribedSignals(): array
266276
{
267-
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT] : []);
277+
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT, \SIGALRM] : []);
268278
}
269279

270280
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
@@ -273,6 +283,14 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
273283
return false;
274284
}
275285

286+
if (\SIGALRM === $signal) {
287+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
288+
289+
$this->worker->keepalive();
290+
291+
return false;
292+
}
293+
276294
$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);
277295

278296
$this->worker->stop();

Command/FailedMessagesRetryCommand.php

+19-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
#[AsCommand(name: 'messenger:failed:retry', description: 'Retry one or more messages from the failure transport')]
4242
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface
4343
{
44+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
45+
4446
private bool $shouldStop = false;
4547
private bool $forceExit = false;
4648
private ?Worker $worker = null;
@@ -64,6 +66,7 @@ protected function configure(): void
6466
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
6567
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
6668
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
69+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
6770
])
6871
->setHelp(<<<'EOF'
6972
The <info>%command.name%</info> retries message in the failure transport.
@@ -87,6 +90,13 @@ protected function configure(): void
8790
;
8891
}
8992

93+
protected function initialize(InputInterface $input, OutputInterface $output): void
94+
{
95+
if ($input->hasParameterOption('--keepalive')) {
96+
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
97+
}
98+
}
99+
90100
protected function execute(InputInterface $input, OutputInterface $output): int
91101
{
92102
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
@@ -134,7 +144,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
134144

135145
public function getSubscribedSignals(): array
136146
{
137-
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT] : []);
147+
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT, \SIGALRM] : []);
138148
}
139149

140150
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
@@ -143,6 +153,14 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
143153
return false;
144154
}
145155

156+
if (\SIGALRM === $signal) {
157+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
158+
159+
$this->worker->keepalive();
160+
161+
return false;
162+
}
163+
146164
$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);
147165

148166
$this->worker->stop();

Tests/WorkerTest.php

+80
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Psr\EventDispatcher\EventDispatcherInterface;
1616
use Psr\Log\LoggerInterface;
17+
use Symfony\Bridge\PhpUnit\ClockMock;
1718
use Symfony\Component\Clock\MockClock;
1819
use Symfony\Component\EventDispatcher\EventDispatcher;
1920
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
@@ -43,6 +44,7 @@
4344
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
4445
use Symfony\Component\Messenger\Tests\Fixtures\DummyReceiver;
4546
use Symfony\Component\Messenger\Tests\Fixtures\ResettableDummyReceiver;
47+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
4648
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
4749
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
4850
use Symfony\Component\Messenger\Worker;
@@ -598,6 +600,64 @@ public function testGcCollectCyclesIsCalledOnMessageHandle()
598600

599601
$this->assertGreaterThan(0, $gcStatus['runs']);
600602
}
603+
604+
/**
605+
* @requires extension pcntl
606+
*/
607+
public function testKeepalive()
608+
{
609+
ClockMock::withClockMock(false);
610+
611+
$expectedEnvelopes = [
612+
new Envelope(new DummyMessage('Hey')),
613+
new Envelope(new DummyMessage('Bob')),
614+
];
615+
616+
$receiver = new DummyKeepaliveReceiver([
617+
[$expectedEnvelopes[0]],
618+
[$expectedEnvelopes[1]],
619+
]);
620+
621+
$handler = new DummyBatchHandler(3);
622+
623+
$middleware = new HandleMessageMiddleware(new HandlersLocator([
624+
DummyMessage::class => [new HandlerDescriptor($handler)],
625+
]));
626+
627+
$bus = new MessageBus([$middleware]);
628+
629+
$dispatcher = new EventDispatcher();
630+
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use ($receiver) {
631+
static $i = 0;
632+
if (1 < ++$i) {
633+
$event->getWorker()->stop();
634+
$this->assertSame(2, $receiver->getAcknowledgeCount());
635+
} else {
636+
$this->assertSame(0, $receiver->getAcknowledgeCount());
637+
}
638+
});
639+
640+
$worker = new Worker([$receiver], $bus, $dispatcher, clock: new MockClock());
641+
642+
try {
643+
$oldAsync = pcntl_async_signals(true);
644+
pcntl_signal(\SIGALRM, fn () => $worker->keepalive());
645+
pcntl_alarm(2);
646+
647+
$worker->run();
648+
} finally {
649+
pcntl_async_signals($oldAsync);
650+
pcntl_signal(\SIGALRM, \SIG_DFL);
651+
}
652+
653+
$this->assertCount(2, $receiver->keepaliveEnvelopes);
654+
$this->assertSame($expectedEnvelopes, $receiver->keepaliveEnvelopes);
655+
656+
$receiver->keepaliveEnvelopes = [];
657+
$worker->keepalive();
658+
659+
$this->assertCount(0, $receiver->keepaliveEnvelopes);
660+
}
601661
}
602662

603663
class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface
@@ -608,12 +668,26 @@ public function getFromQueues(array $queueNames): iterable
608668
}
609669
}
610670

671+
class DummyKeepaliveReceiver extends DummyReceiver implements KeepaliveReceiverInterface
672+
{
673+
public array $keepaliveEnvelopes = [];
674+
675+
public function keepalive(Envelope $envelope): void
676+
{
677+
$this->keepaliveEnvelopes[] = $envelope;
678+
}
679+
}
680+
611681
class DummyBatchHandler implements BatchHandlerInterface
612682
{
613683
use BatchHandlerTrait;
614684

615685
public array $processedMessages;
616686

687+
public function __construct(private ?int $delay = null)
688+
{
689+
}
690+
617691
public function __invoke(DummyMessage $message, ?Acknowledger $ack = null)
618692
{
619693
return $this->handle($message, $ack);
@@ -628,6 +702,12 @@ private function process(array $jobs): void
628702
{
629703
$this->processedMessages = array_column($jobs, 0);
630704

705+
if (null !== $this->delay) {
706+
for ($i = 0; $i < $this->delay; ++$i) {
707+
sleep(1);
708+
}
709+
}
710+
631711
foreach ($jobs as [$job, $ack]) {
632712
$ack->ack($job);
633713
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Receiver;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\TransportException;
16+
17+
interface KeepaliveReceiverInterface extends ReceiverInterface
18+
{
19+
/**
20+
* Informs the transport that the message is still being processed to avoid a timeout on the transport's side.
21+
*
22+
* @throws TransportException If there is an issue communicating with the transport
23+
*/
24+
public function keepalive(Envelope $envelope): void;
25+
}

Worker.php

+30
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
3232
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
3333
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
34+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
3435
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
3536
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3637
use Symfony\Component\RateLimiter\LimiterInterface;
@@ -47,6 +48,10 @@ class Worker
4748
private WorkerMetadata $metadata;
4849
private array $acks = [];
4950
private \SplObjectStorage $unacks;
51+
/**
52+
* @var \SplObjectStorage<object, array{0: string, 1: Envelope}>
53+
*/
54+
private \SplObjectStorage $keepalives;
5055

5156
/**
5257
* @param ReceiverInterface[] $receivers Where the key is the transport name
@@ -63,6 +68,7 @@ public function __construct(
6368
'transportNames' => array_keys($receivers),
6469
]);
6570
$this->unacks = new \SplObjectStorage();
71+
$this->keepalives = new \SplObjectStorage();
6672
}
6773

6874
/**
@@ -105,6 +111,10 @@ public function run(array $options = []): void
105111
foreach ($envelopes as $envelope) {
106112
$envelopeHandled = true;
107113

114+
if ($receiver instanceof KeepaliveReceiverInterface) {
115+
$this->keepalives[$envelope->getMessage()] = [$transportName, $envelope];
116+
}
117+
108118
$this->rateLimit($transportName);
109119
$this->handleMessage($envelope, $transportName);
110120
$this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false));
@@ -186,6 +196,7 @@ private function ack(): bool
186196
if ($rejectFirst = $e instanceof RejectRedeliveredMessageException) {
187197
// redelivered messages are rejected first so that continuous failures in an event listener or while
188198
// publishing for retry does not cause infinite redelivery loops
199+
unset($this->keepalives[$envelope->getMessage()]);
189200
$receiver->reject($envelope);
190201
}
191202

@@ -199,6 +210,7 @@ private function ack(): bool
199210
$envelope = $failedEvent->getEnvelope();
200211

201212
if (!$rejectFirst) {
213+
unset($this->keepalives[$envelope->getMessage()]);
202214
$receiver->reject($envelope);
203215
}
204216

@@ -218,6 +230,7 @@ private function ack(): bool
218230
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
219231
}
220232

233+
unset($this->keepalives[$envelope->getMessage()]);
221234
$receiver->ack($envelope);
222235
}
223236

@@ -277,6 +290,23 @@ public function stop(): void
277290
$this->shouldStop = true;
278291
}
279292

293+
public function keepalive(): void
294+
{
295+
foreach ($this->keepalives as $message) {
296+
[$transportName, $envelope] = $this->keepalives[$message];
297+
298+
if (!$this->receivers[$transportName] instanceof KeepaliveReceiverInterface) {
299+
throw new RuntimeException(\sprintf('Receiver for "%s" does not implement "%s".', $transportName, KeepaliveReceiverInterface::class));
300+
}
301+
302+
$this->logger?->info('Sending keepalive request.', [
303+
'transport' => $transportName,
304+
'message_id' => $envelope->last(TransportMessageIdStamp::class)?->getId(),
305+
]);
306+
$this->receivers[$transportName]->keepalive($envelope);
307+
}
308+
}
309+
280310
public function getMetadata(): WorkerMetadata
281311
{
282312
return $this->metadata;

composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
},
2424
"require-dev": {
2525
"psr/cache": "^1.0|^2.0|^3.0",
26-
"symfony/console": "^6.4|^7.0",
26+
"symfony/console": "^7.2",
2727
"symfony/dependency-injection": "^6.4|^7.0",
2828
"symfony/event-dispatcher": "^6.4|^7.0",
2929
"symfony/http-kernel": "^6.4|^7.0",
@@ -37,7 +37,7 @@
3737
"symfony/validator": "^6.4|^7.0"
3838
},
3939
"conflict": {
40-
"symfony/console": "<6.4",
40+
"symfony/console": "<7.2",
4141
"symfony/event-dispatcher": "<6.4",
4242
"symfony/event-dispatcher-contracts": "<2.5",
4343
"symfony/framework-bundle": "<6.4",

0 commit comments

Comments
 (0)