Skip to content

Commit f1711be

Browse files
author
Thibaut Chieux
committed
[Messenger] Allow to skip message in FailedMessagesRetryCommand
Update CHANGELOG message and help message
1 parent 3e34b41 commit f1711be

5 files changed

+86
-3
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CHANGELOG
99
* Add `#[AsMessage]` attribute with `$transport` parameter for message routing
1010
* Add `--format` option to the `messenger:stats` command
1111
* Add `getRetryDelay()` method to `RecoverableExceptionInterface`
12+
* Add `skip` option to `messenger:failed:retry` command when run interactively to skip message and requeue it
1213

1314
7.1
1415
---

Command/FailedMessagesRetryCommand.php

+10-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
use Symfony\Component\Console\Style\SymfonyStyle;
2424
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
2525
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
26+
use Symfony\Component\Messenger\Event\WorkerMessageSkipEvent;
2627
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
2728
use Symfony\Component\Messenger\MessageBusInterface;
2829
use Symfony\Component\Messenger\Stamp\MessageDecodingFailedStamp;
30+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
2931
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
3032
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3133
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
@@ -68,8 +70,8 @@ protected function configure(): void
6870
6971
<info>php %command.full_name%</info>
7072
71-
The command will interactively ask if each message should be retried
72-
or discarded.
73+
The command will interactively ask if each message should be retried,
74+
discarded or skipped.
7375
7476
Some transports support retrying a specific message id, which comes
7577
from the <info>messenger:failed:show</info> command.
@@ -204,7 +206,8 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece
204206

205207
$this->forceExit = true;
206208
try {
207-
$shouldHandle = $shouldForce || 'retry' === $io->choice('Please select an action', ['retry', 'delete'], 'retry');
209+
$choice = $io->choice('Please select an action', ['retry', 'delete', 'skip'], 'retry');
210+
$shouldHandle = $shouldForce || 'retry' === $choice;
208211
} finally {
209212
$this->forceExit = false;
210213
}
@@ -213,6 +216,10 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece
213216
return;
214217
}
215218

219+
if ('skip' === $choice) {
220+
$this->eventDispatcher->dispatch(new WorkerMessageSkipEvent($envelope, $envelope->last(SentToFailureTransportStamp::class)->getOriginalReceiverName()));
221+
}
222+
216223
$messageReceivedEvent->shouldHandle(false);
217224
$receiver->reject($envelope);
218225
};

Event/WorkerMessageSkipEvent.php

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
/**
15+
* Dispatched when a message was skip.
16+
*
17+
* The event name is the class name.
18+
*/
19+
final class WorkerMessageSkipEvent extends AbstractWorkerMessageEvent
20+
{
21+
}

EventListener/SendFailedMessageToFailureTransportListener.php

+17
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Psr\Log\LoggerInterface;
1616
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
1717
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageSkipEvent;
1819
use Symfony\Component\Messenger\Stamp\DelayStamp;
1920
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2021
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
@@ -65,10 +66,26 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void
6566
$failureSender->send($envelope);
6667
}
6768

69+
public function onMessageSkip(WorkerMessageSkipEvent $event): void
70+
{
71+
if (!$this->failureSenders->has($event->getReceiverName())) {
72+
return;
73+
}
74+
75+
$failureSender = $this->failureSenders->get($event->getReceiverName());
76+
$envelope = $event->getEnvelope()->with(
77+
new SentToFailureTransportStamp($event->getReceiverName()),
78+
new DelayStamp(0),
79+
);
80+
81+
$failureSender->send($envelope);
82+
}
83+
6884
public static function getSubscribedEvents(): array
6985
{
7086
return [
7187
WorkerMessageFailedEvent::class => ['onMessageFailed', -100],
88+
WorkerMessageSkipEvent::class => ['onMessageSkip', -100],
7289
];
7390
}
7491
}

Tests/Command/FailedMessagesRetryCommandTest.php

+37
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
2020
use Symfony\Component\Messenger\Envelope;
2121
use Symfony\Component\Messenger\MessageBusInterface;
22+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
2223
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2324
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
2425

@@ -223,4 +224,40 @@ public function testCompleteIdWithSpecifiedTransport()
223224

224225
$this->assertSame(['2ab50dfa1fbf', '78c2da843723'], $suggestions);
225226
}
227+
228+
public function testSkipRunWithServiceLocator()
229+
{
230+
$failureTransportName = 'failure_receiver';
231+
$originalTransportName = 'original_receiver';
232+
233+
$serviceLocator = $this->createMock(ServiceLocator::class);
234+
$receiver = $this->createMock(ListableReceiverInterface::class);
235+
236+
$dispatcher = new EventDispatcher();
237+
$bus = $this->createMock(MessageBusInterface::class);
238+
239+
$serviceLocator->method('has')->willReturn(true);
240+
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
241+
242+
$receiver->expects($this->once())->method('find')
243+
->willReturn(Envelope::wrap(new \stdClass(), [
244+
new SentToFailureTransportStamp($originalTransportName)
245+
]));
246+
247+
$receiver->expects($this->never())->method('ack');
248+
$receiver->expects($this->once())->method('reject');
249+
250+
$command = new FailedMessagesRetryCommand(
251+
$failureTransportName,
252+
$serviceLocator,
253+
$bus,
254+
$dispatcher
255+
);
256+
257+
$tester = new CommandTester($command);
258+
$tester->setInputs(['skip']);
259+
260+
$tester->execute(['id' => ['10']]);
261+
$this->assertStringContainsString('[OK]', $tester->getDisplay());
262+
}
226263
}

0 commit comments

Comments
 (0)