Skip to content

Commit 771f214

Browse files
ngaspariStyleCIBot
andauthored
worker per single queue (#39)
* option to define queue name Option to define queue name to listen; artisan queue:work --queue=<queue_name> * Apply fixes from StyleCI * . * auto * Apply fixes from StyleCI --------- Co-authored-by: StyleCI Bot <[email protected]>
1 parent dc51d10 commit 771f214

8 files changed

+166
-22
lines changed

README.md

+8-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,14 @@ Subscribing with client acknowledgement option (ENV variables):
4646

4747
```
4848
STOMP_CONSUMER_WIN_SIZE=819200 // number of bytes that Broker will send to client before it expects ACK
49-
STOMP_CONSUMER_ACK_MODE=client // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case)
49+
STOMP_CONSUMER_ACK_MODE=auto // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case)
50+
```
51+
52+
Options for Laravel worker:
53+
54+
```
55+
STOMP_CONSUMER_ALL_QUEUES = default; // which queue name(s) represent that all queues from Config should be read
56+
STOMP_READ_MESSAGE_DB_LOG = false // write POP-ed events in DB table `stomp_event_logs`
5057
```
5158

5259
You can see all other available ``.env`` variables, their defaults and usage explanation within

config/asseco-stomp.php

+4
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,8 @@
44

55
return [
66
'log_manager' => LogManager::class,
7+
8+
'migrations' => [
9+
'run' => true,
10+
],
711
];

config/stomp.php

+13-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@
3333

3434
/** If all messages should fail on timeout. Set to false in order to revert to default (looking in event payload) */
3535
'fail_on_timeout' => env('STOMP_FAIL_ON_TIMEOUT', true),
36-
/** Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly. */
36+
37+
/**
38+
* Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly.
39+
*/
3740
'timeout' => env('STOMP_TIMEOUT', 45),
3841

3942
/**
@@ -56,6 +59,8 @@
5659
*/
5760
'default_queue' => env('STOMP_DEFAULT_QUEUE'),
5861

62+
'enable_read_events_DB_logs' => env('STOMP_READ_MESSAGE_DB_LOG', false) === true,
63+
5964
/**
6065
* Use Laravel logger for outputting logs.
6166
*/
@@ -89,7 +94,13 @@
8994
/**
9095
* Subscribe mode: auto, client.
9196
*/
92-
'consumer_ack_mode' => env('STOMP_CONSUMER_ACK_MODE', 'client'),
97+
'consumer_ack_mode' => env('STOMP_CONSUMER_ACK_MODE', 'auto'),
98+
99+
/**
100+
* Queue name(s) that represent that all queues should be read
101+
* If no queue is specified, Laravel puts 'default' - so this should be entered here.
102+
*/
103+
'worker_queue_name_all' => explode(';', env('STOMP_CONSUMER_ALL_QUEUES', 'default;')),
93104

94105
/**
95106
* Array of supported versions.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
use Illuminate\Database\Migrations\Migration;
4+
use Illuminate\Database\Schema\Blueprint;
5+
use Illuminate\Support\Facades\Schema;
6+
7+
return new class extends Migration
8+
{
9+
/**
10+
* Run the migrations.
11+
*
12+
* @return void
13+
*/
14+
public function up()
15+
{
16+
Schema::create('stomp_event_logs', function (Blueprint $table) {
17+
$table->id();
18+
$table->string('session_id')->nullable();
19+
$table->string('queue_name')->nullable();
20+
$table->string('subscription_id')->nullable();
21+
$table->string('message_id')->nullable();
22+
23+
$table->text('payload')->nullable();
24+
25+
$table->timestamp('created_at')->useCurrent();
26+
});
27+
}
28+
29+
/**
30+
* Reverse the migrations.
31+
*
32+
* @return void
33+
*/
34+
public function down()
35+
{
36+
Schema::dropIfExists('stomp_event_logs');
37+
}
38+
};

src/Queue/Jobs/StompJob.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ protected function fireLaravelJob(): void
137137
{
138138
if ($this->laravelJobClassExists()) {
139139
[$class, $method] = JobName::parse($this->payload['job']);
140-
($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']);
140+
($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data'] ?? []);
141141
} else {
142142
$this->log->error("$this->session [STOMP] Laravel job class does not exist!");
143143
}
@@ -253,7 +253,7 @@ protected function failed($e)
253253

254254
try {
255255
if (method_exists($this->instance = $this->resolve($class), 'failed')) {
256-
$this->instance->failed($this->payload['data'], $e, $this->payload['uuid']);
256+
$this->instance->failed($this->payload['data'] ?? [], $e, $this->payload['uuid']);
257257
}
258258
} catch (\Exception $e) {
259259
$this->log->error('Exception in job failing: ' . $e->getMessage());

src/Queue/Stomp/Config.php

+10
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,14 @@ protected static function appName(): string
3232
{
3333
return Str::snake(config('app.name', 'localhost'));
3434
}
35+
36+
public static function queueNamesForProcessAllQueues()
37+
{
38+
return self::get('worker_queue_name_all');
39+
}
40+
41+
public static function shouldReadMessagesBeLoggedToDB()
42+
{
43+
return self::get('enable_read_events_DB_logs');
44+
}
3545
}

src/Queue/StompQueue.php

+83-17
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Illuminate\Queue\InvalidPayloadException;
1919
use Illuminate\Queue\Queue;
2020
use Illuminate\Support\Arr;
21+
use Illuminate\Support\Facades\DB;
2122
use Illuminate\Support\Str;
2223
use Psr\Log\LoggerInterface;
2324
use Stomp\Exception\ConnectionException;
@@ -33,6 +34,7 @@ class StompQueue extends Queue implements QueueInterface
3334
const CORRELATION = 'X-Correlation-ID';
3435

3536
const ACK_MODE_CLIENT = 'client';
37+
const ACK_MODE_AUTO = 'auto';
3638

3739
/**
3840
* Stomp instance from stomp-php repo.
@@ -53,8 +55,15 @@ class StompQueue extends Queue implements QueueInterface
5355
protected int $circuitBreaker = 0;
5456
protected string $session;
5557

58+
/** @var null|Frame */
5659
protected $_lastFrame = null;
57-
protected $_ackMode = 'client';
60+
61+
protected string $_ackMode = '';
62+
63+
protected array $_queueNamesForProcessAllQueues = [''];
64+
protected bool $_customReadQueusDefined = false;
65+
66+
protected bool $_readMessagesLogToDb = false;
5867

5968
public function __construct(ClientWrapper $stompClient)
6069
{
@@ -65,26 +74,31 @@ public function __construct(ClientWrapper $stompClient)
6574

6675
$this->session = $this->client->getClient()->getSessionId();
6776

68-
$this->_ackMode = strtolower(Config::get('consumer_ack_mode') ?? 'client');
77+
$this->_ackMode = strtolower(Config::get('consumer_ack_mode') ?? self::ACK_MODE_AUTO);
78+
79+
// specify which queue names should be considered as "All queues from Config"
80+
// "default" & ""
81+
$this->_queueNamesForProcessAllQueues = Config::queueNamesForProcessAllQueues();
82+
$this->_readMessagesLogToDb = Config::shouldReadMessagesBeLoggedToDB();
6983
}
7084

7185
/**
7286
* Append queue name to topic/address to avoid random hashes in broker.
7387
*
88+
* @param string|null $queuesString
7489
* @return array
7590
*/
76-
protected function setReadQueues(): array
91+
protected function setReadQueues(?string $queuesString = ''): array
7792
{
78-
$queues = $this->parseQueues(Config::readQueues());
93+
$queuesString = $queuesString ?: Config::readQueues();
94+
$queues = $this->parseQueues($queuesString);
7995

8096
foreach ($queues as &$queue) {
8197
$default = Config::defaultQueue();
82-
8398
if (!str_contains($queue, self::AMQ_QUEUE_SEPARATOR)) {
8499
$queue .= self::AMQ_QUEUE_SEPARATOR . $default . '_' . substr(Str::uuid(), -5);
85100
continue;
86101
}
87-
88102
if (Config::get('prepend_queues')) {
89103
$topic = Str::before($queue, self::AMQ_QUEUE_SEPARATOR);
90104
$queueName = Str::after($queue, self::AMQ_QUEUE_SEPARATOR);
@@ -218,7 +232,7 @@ protected function writeToMultipleQueues(array $writeQueues, Message $payload):
218232
$this->log->info("$this->session [STOMP] Pushing stomp payload to queue: " . print_r([
219233
'body' => $payload->getBody(),
220234
'headers' => $payload->getHeaders(),
221-
'queue' => $writeQueues,
235+
'queues' => $writeQueues,
222236
], true));
223237

224238
$allEventsSent = true;
@@ -349,6 +363,8 @@ protected function hasEvent($job): bool
349363
*/
350364
public function pop($queue = null)
351365
{
366+
$this->setReadQueuesForWorker($queue);
367+
352368
$this->ackLastFrameIfNecessary();
353369

354370
$frame = $this->read($queue);
@@ -366,21 +382,21 @@ public function pop($queue = null)
366382
$queueFromFrame = $this->getQueueFromFrame($frame);
367383

368384
if (!$queueFromFrame) {
369-
$this->log->error("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true));
385+
$this->log->warning("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true));
370386
$this->_lastFrame = null;
371387

372388
return null;
373389
}
374390

375391
$this->_lastFrame = $frame;
376392

393+
$this->writeMessageToDBIfNeeded($frame, $queueFromFrame);
394+
377395
return new StompJob($this->container, $this, $frame, $queueFromFrame);
378396
}
379397

380398
protected function read($queue)
381399
{
382-
// This will read from queue, then push on same session ID if there are events following, then delete event which was read
383-
// If job fails, it will be re-pushed on same session ID but with additional headers for redelivery
384400
try {
385401
$this->log->info("$this->session [STOMP] POP");
386402

@@ -457,7 +473,11 @@ protected function reconnect(bool $subscribe = true)
457473

458474
try {
459475
$this->client->getClient()->connect();
476+
$newSessionId = $this->client->getClient()->getSessionId();
477+
460478
$this->log->info("$this->session [STOMP] Reconnected successfully.");
479+
$this->log->info("$this->session [STOMP] Switching session to: $newSessionId");
480+
$this->session = $newSessionId;
461481
} catch (Exception $e) {
462482
$this->circuitBreaker++;
463483

@@ -475,7 +495,7 @@ protected function reconnect(bool $subscribe = true)
475495
}
476496

477497
// By this point it should be connected, so it is safe to subscribe
478-
if ($this->client->getClient()->isConnected() && $subscribe) {
498+
if ($subscribe && $this->client->getClient()->isConnected()) {
479499
$this->log->info("$this->session [STOMP] Connected, subscribing...");
480500
$this->subscribedTo = [];
481501
$this->subscribeToQueues();
@@ -497,23 +517,29 @@ public function disconnect()
497517
}
498518
}
499519

520+
/**
521+
* Subscribe to queues.
522+
*
523+
* @return void
524+
*/
500525
protected function subscribeToQueues(): void
501526
{
527+
$winSize = Config::get('consumer_window_size');
528+
if ($this->_ackMode != self::ACK_MODE_CLIENT) {
529+
// New Artemis version can't work without this as it will consume only first message otherwise.
530+
$winSize = -1;
531+
}
532+
502533
foreach ($this->readQueues as $queue) {
503534
$alreadySubscribed = in_array($queue, $this->subscribedTo);
504535

505536
if ($alreadySubscribed) {
506537
continue;
507538
}
508539

509-
$winSize = Config::get('consumer_window_size') ?: 8192000;
510-
if ($this->_ackMode != self::ACK_MODE_CLIENT) {
511-
$winSize = -1;
512-
}
540+
$this->log->info("$this->session [STOMP] subscribeToQueue `$queue` with ack-mode: {$this->_ackMode} & window-size: $winSize");
513541

514542
$this->client->subscribe($queue, null, $this->_ackMode, [
515-
// New Artemis version can't work without this as it will consume only first message otherwise.
516-
//'consumer-window-size' => '-1',
517543
// we can define this if we are using ack mode = client
518544
'consumer-window-size' => (string) $winSize,
519545
]);
@@ -530,8 +556,48 @@ protected function subscribeToQueues(): void
530556
public function ackLastFrameIfNecessary()
531557
{
532558
if ($this->_ackMode == self::ACK_MODE_CLIENT && $this->_lastFrame) {
559+
$this->log->debug("$this->session [STOMP] ACK-ing last frame. Msg #" . $this->_lastFrame->getMessageId());
533560
$this->client->ack($this->_lastFrame);
534561
$this->_lastFrame = null;
535562
}
536563
}
564+
565+
/**
566+
* Set read queues for queue worker, if queue parameter is defined
567+
* > php artisan queue:work --queue=eloquent::live30.
568+
*
569+
* @param $queue
570+
* @return void
571+
*/
572+
protected function setReadQueuesForWorker($queue)
573+
{
574+
if ($this->_customReadQueusDefined) {
575+
// already setup
576+
return;
577+
}
578+
579+
$queue = (string) $queue;
580+
if (!in_array($queue, $this->_queueNamesForProcessAllQueues)) {
581+
// one or more queue
582+
$this->readQueues = $this->setReadQueues($queue);
583+
}
584+
585+
$this->_customReadQueusDefined = true;
586+
}
587+
588+
protected function writeMessageToDBIfNeeded(Frame $frame, $queueFromFrame)
589+
{
590+
if ($this->_readMessagesLogToDb) {
591+
DB::table('stomp_event_logs')->insert(
592+
[
593+
'session_id' => $this->session,
594+
'queue_name' => $queueFromFrame,
595+
'subscription_id' => $frame['subscription'],
596+
'message_id' => $frame->getMessageId(),
597+
'payload' => print_r($frame, true),
598+
'created_at' => date('Y-m-d H:i:s.u'),
599+
]
600+
);
601+
}
602+
}
537603
}

src/StompServiceProvider.php

+8
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,13 @@ public function boot()
5050

5151
return $logsEnabled ? new $logManager($app) : new NullLogger();
5252
});
53+
54+
if (config('asseco-stomp.migrations.run')) {
55+
$this->loadMigrationsFrom(__DIR__ . '/../migrations');
56+
}
57+
58+
$this->publishes([
59+
__DIR__ . '/../migrations' => database_path('migrations'),
60+
], 'asseco-stomp');
5361
}
5462
}

0 commit comments

Comments
 (0)