Skip to content

Commit b20661c

Browse files
author
palPalani
committed
Support single and multiple message
1 parent eea8274 commit b20661c

File tree

3 files changed

+80
-29
lines changed

3 files changed

+80
-29
lines changed

config/sqs-queue-reader.php

+18-7
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,25 @@
55
*/
66
return [
77

8-
/**
9-
* Separate queue handle with corresponding queue name as key.
10-
*/
8+
// Separate queue handler with corresponding queue name as key.
119
'handlers' => [
12-
//'stripe-webhooks' => App\Jobs\StripeHandler::class,
13-
//'mailgun-webhooks' => App\Jobs\MailgunHandler::class,
14-
//'shopify-webhooks' => App\Jobs\ShopifyHandler::class,
10+
'stripe-webhooks' => [
11+
'class' => App\Jobs\StripeHandler::class,
12+
'count' => 10,
13+
],
14+
'mailgun-webhooks' => [
15+
'class' => App\Jobs\MailgunHandler::class,
16+
'count' => 500,
17+
]
1518
],
1619

17-
'default-handler' => App\Jobs\SqsHandler::class
20+
// If no handlers specified then default handler will be executed.
21+
'default-handler' => [
22+
23+
// Name of the handler class
24+
'class' => App\Jobs\SqsHandler::class,
25+
26+
// Number of messages need to read from SQS.
27+
'count' => 1,
28+
]
1829
];

src/Sqs/Queue.php

+35-4
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ private function getClass($queue = null): string
4848
return Config::get('sqs-queue-reader.default-handler');
4949
}
5050

51-
$queue = end(explode('/', $queue));
51+
$queueArray = explode('/', $queue);
52+
$queue = end($queueArray);
5253

5354
return (array_key_exists($queue, Config::get('sqs-queue-reader.handlers')))
5455
? Config::get('sqs-queue-reader.handlers')[$queue]
@@ -64,12 +65,15 @@ private function getClass($queue = null): string
6465
public function pop($queue = null)
6566
{
6667
$queue = $this->getQueue($queue);
68+
$count = (array_key_exists($queue, Config::get('sqs-queue-reader.handlers')))
69+
? Config::get('sqs-queue-reader.handlers')[$queue]['count']
70+
: Config::get('sqs-queue-reader.default-handler')['count'];
6771

6872
try {
6973
$response = $this->sqs->receiveMessage([
7074
'QueueUrl' => $queue,
7175
'AttributeNames' => ['ApproximateReceiveCount'],
72-
'MaxNumberOfMessages' => 5,
76+
'MaxNumberOfMessages' => $count,
7377
'MessageAttributeNames' => ['All'],
7478
]);
7579

@@ -82,7 +86,11 @@ public function pop($queue = null)
8286
? $this->container['config']->get('sqs-queue-reader.handlers')[$queueId]
8387
: $this->container['config']->get('sqs-queue-reader.default-handler');
8488

85-
$response = $this->modifyPayload($response['Messages'], $class);
89+
if($count === 1) {
90+
$response = $this->modifySinglePayload($response['Messages'][0], $class);
91+
} else {
92+
$response = $this->modifyMultiplePayload($response['Messages'], $class);
93+
}
8694
Log::debug('New $responseV2==', [$response]);
8795

8896
return new SqsJob($this->container, $this->sqs, $response, $this->connectionName, $queue);
@@ -99,7 +107,30 @@ public function pop($queue = null)
99107
* @param string $class
100108
* @return array
101109
*/
102-
private function modifyPayload($payload, $class)
110+
private function modifySinglePayload($payload, $class)
111+
{
112+
if (! is_array($payload)) {
113+
$payload = json_decode($payload, true);
114+
}
115+
116+
$body = json_decode($payload['Body'], true);
117+
118+
$body = [
119+
'job' => $class . '@handle',
120+
'data' => isset($body['data']) ? $body['data'] : $body,
121+
];
122+
123+
$payload['Body'] = json_encode($body);
124+
125+
return $payload;
126+
}
127+
128+
/**
129+
* @param string|array $payload
130+
* @param string $class
131+
* @return array
132+
*/
133+
private function modifyMultiplePayload($payload, $class)
103134
{
104135
if (! is_array($payload)) {
105136
$payload = json_decode($payload, true);

src/SqsQueueReaderServiceProvider.php

+27-18
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace palPalani\SqsQueueReader;
44

55
use Illuminate\Queue\Events\JobProcessed;
6+
use Illuminate\Support\Facades\Config;
67
use Illuminate\Support\Facades\Log;
78
use Illuminate\Support\Facades\Queue;
89
use Illuminate\Support\ServiceProvider;
@@ -18,29 +19,37 @@ public function boot(): void
1819
], 'config');
1920

2021
Queue::after(static function (JobProcessed $event) {
21-
$data = $event->job->payload();
22-
Log::debug('Job payload==', [$data]);
23-
//$event->job->delete();
22+
$queue = $event->job->getQueue();
23+
$count = (array_key_exists($queue, Config::get('sqs-queue-reader.handlers')))
24+
? Config::get('sqs-queue-reader.handlers')[$queue]['count']
25+
: Config::get('sqs-queue-reader.default-handler')['count'];
2426

25-
$batchIds = array_column($data, 'batchIds');
27+
if($count === 1) {
28+
$event->job->delete();
29+
} else {
30+
$data = $event->job->payload();
31+
Log::debug('Job payload==', [$data]);
2632

27-
$batchIds = array_chunk($batchIds, 10);
33+
$batchIds = array_column($data, 'batchIds');
2834

29-
foreach ($batchIds as $batch) {
30-
//Deletes up to ten messages from the specified queue.
31-
$result = $event->job->deleteMessageBatch([
32-
'Entries' => $batch,
33-
'QueueUrl' => $event->job->getQueue(),
34-
]);
35+
$batchIds = array_chunk($batchIds, 10);
3536

36-
if ($result['Failed']) {
37-
$msg = '';
38-
foreach ($result['Failed'] as $failed) {
39-
$msg .= sprintf("Deleting message failed, code = %s, id = %s, msg = %s, senderfault = %s", $failed['Code'], $failed['Id'], $failed['Message'], $failed['SenderFault']);
40-
}
41-
Log::error('Cannot delete some SQS messages: ', [$msg]);
37+
foreach ($batchIds as $batch) {
38+
//Deletes up to ten messages from the specified queue.
39+
$result = $event->job->deleteMessageBatch([
40+
'Entries' => $batch,
41+
'QueueUrl' => $queue,
42+
]);
43+
44+
if ($result['Failed']) {
45+
$msg = '';
46+
foreach ($result['Failed'] as $failed) {
47+
$msg .= sprintf("Deleting message failed, code = %s, id = %s, msg = %s, senderfault = %s", $failed['Code'], $failed['Id'], $failed['Message'], $failed['SenderFault']);
48+
}
49+
Log::error('Cannot delete some SQS messages: ', [$msg]);
4250

43-
throw new \RuntimeException("Cannot delete some messages, consult log for more info!");
51+
throw new \RuntimeException("Cannot delete some messages, consult log for more info!");
52+
}
4453
}
4554
}
4655
});

0 commit comments

Comments
 (0)