Skip to content

Commit 8137d56

Browse files
committed
Add $seconds to keepalive methods
1 parent 4bf5868 commit 8137d56

File tree

5 files changed

+10
-8
lines changed

5 files changed

+10
-8
lines changed

Command/ConsumeMessagesCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
286286
if (\SIGALRM === $signal) {
287287
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
288288

289-
$this->worker->keepalive();
289+
$this->worker->keepalive($this->getApplication()->getAlarmInterval());
290290

291291
return false;
292292
}

Command/FailedMessagesRetryCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
156156
if (\SIGALRM === $signal) {
157157
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
158158

159-
$this->worker->keepalive();
159+
$this->worker->keepalive($this->getApplication()->getAlarmInterval());
160160

161161
return false;
162162
}

Tests/WorkerTest.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ public function testKeepalive()
641641

642642
try {
643643
$oldAsync = pcntl_async_signals(true);
644-
pcntl_signal(\SIGALRM, fn () => $worker->keepalive());
644+
pcntl_signal(\SIGALRM, fn () => $worker->keepalive(2));
645645
pcntl_alarm(2);
646646

647647
$worker->run();
@@ -654,7 +654,7 @@ public function testKeepalive()
654654
$this->assertSame($expectedEnvelopes, $receiver->keepaliveEnvelopes);
655655

656656
$receiver->keepaliveEnvelopes = [];
657-
$worker->keepalive();
657+
$worker->keepalive(2);
658658

659659
$this->assertCount(0, $receiver->keepaliveEnvelopes);
660660
}
@@ -672,7 +672,7 @@ class DummyKeepaliveReceiver extends DummyReceiver implements KeepaliveReceiverI
672672
{
673673
public array $keepaliveEnvelopes = [];
674674

675-
public function keepalive(Envelope $envelope): void
675+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
676676
{
677677
$this->keepaliveEnvelopes[] = $envelope;
678678
}

Transport/Receiver/KeepaliveReceiverInterface.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ interface KeepaliveReceiverInterface extends ReceiverInterface
1919
/**
2020
* Informs the transport that the message is still being processed to avoid a timeout on the transport's side.
2121
*
22+
* @param int|null $seconds The minimum duration the message should be kept alive
23+
*
2224
* @throws TransportException If there is an issue communicating with the transport
2325
*/
24-
public function keepalive(Envelope $envelope): void;
26+
public function keepalive(Envelope $envelope, ?int $seconds = null): void;
2527
}

Worker.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public function stop(): void
290290
$this->shouldStop = true;
291291
}
292292

293-
public function keepalive(): void
293+
public function keepalive(?int $seconds): void
294294
{
295295
foreach ($this->keepalives as $message) {
296296
[$transportName, $envelope] = $this->keepalives[$message];
@@ -303,7 +303,7 @@ public function keepalive(): void
303303
'transport' => $transportName,
304304
'message_id' => $envelope->last(TransportMessageIdStamp::class)?->getId(),
305305
]);
306-
$this->receivers[$transportName]->keepalive($envelope);
306+
$this->receivers[$transportName]->keepalive($envelope, $seconds);
307307
}
308308
}
309309

0 commit comments

Comments
 (0)