Skip to content

Commit 3d0ebc2

Browse files
author
palPalani
committed
Delete bulk message
1 parent 5f5bd59 commit 3d0ebc2

File tree

2 files changed

+59
-22
lines changed

2 files changed

+59
-22
lines changed

src/Sqs/Queue.php

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace palPalani\SqsQueueReader\Sqs;
44

5+
use Aws\Exception\AwsException;
56
use Illuminate\Queue\Jobs\SqsJob;
67
use Illuminate\Queue\SqsQueue;
78
use Illuminate\Support\Facades\Config;
@@ -64,26 +65,31 @@ public function pop($queue = null)
6465
{
6566
$queue = $this->getQueue($queue);
6667

67-
$response = $this->sqs->receiveMessage([
68-
'QueueUrl' => $queue,
69-
'AttributeNames' => ['ApproximateReceiveCount'],
70-
'MaxNumberOfMessages' => 5,
71-
'MessageAttributeNames' => ['All'],
72-
]);
73-
74-
if (isset($response['Messages']) && count($response['Messages']) > 0) {
75-
Log::debug('Messages==', [$response['Messages']]);
76-
$queueId = explode('/', $queue);
77-
$queueId = array_pop($queueId);
78-
79-
$class = (array_key_exists($queueId, $this->container['config']->get('sqs-queue-reader.handlers')))
80-
? $this->container['config']->get('sqs-queue-reader.handlers')[$queueId]
81-
: $this->container['config']->get('sqs-queue-reader.default-handler');
82-
83-
$response = $this->modifyPayload($response['Messages'], $class);
84-
Log::debug('New $responseV2==', [$response]);
85-
86-
return new SqsJob($this->container, $this->sqs, $response, $this->connectionName, $queue);
68+
try {
69+
$response = $this->sqs->receiveMessage([
70+
'QueueUrl' => $queue,
71+
'AttributeNames' => ['ApproximateReceiveCount'],
72+
'MaxNumberOfMessages' => 5,
73+
'MessageAttributeNames' => ['All'],
74+
]);
75+
76+
if (isset($response['Messages']) && count($response['Messages']) > 0) {
77+
Log::debug('Messages==', [$response['Messages']]);
78+
$queueId = explode('/', $queue);
79+
$queueId = array_pop($queueId);
80+
81+
$class = (array_key_exists($queueId, $this->container['config']->get('sqs-queue-reader.handlers')))
82+
? $this->container['config']->get('sqs-queue-reader.handlers')[$queueId]
83+
: $this->container['config']->get('sqs-queue-reader.default-handler');
84+
85+
$response = $this->modifyPayload($response['Messages'], $class);
86+
Log::debug('New $responseV2==', [$response]);
87+
88+
return new SqsJob($this->container, $this->sqs, $response, $this->connectionName, $queue);
89+
}
90+
} catch (AwsException $e) {
91+
$msg = 'Line: '. $e->getLine() .', '. $e->getFile() . ', '. $e->getMessage();
92+
throw new \RuntimeException("Aws SQS error: " . $msg);
8793
}
8894
}
8995

@@ -111,9 +117,19 @@ private function modifyPayload($payload, $class)
111117

112118
$body = [];
113119
$attributes = [];
120+
$batchIds = [];
121+
114122
foreach ($payload as $item) {
115123
//Log::debug('Each Messages==', [$item]);
116-
$body[] = json_decode($item['Body'], true);
124+
//$body[] = json_decode($item['Body'], true);
125+
$body[] = [
126+
'messages' => json_decode($item['Body'], true),
127+
'attributes' => $item['Attributes'],
128+
'batchIds' => [
129+
'Id' => $item['MessageId'],
130+
'ReceiptHandle' => $item['ReceiptHandle'],
131+
]
132+
];
117133
$attributes = $item['Attributes'];
118134
$messageId = $item['MessageId'];
119135
$receiptHandle = $item['ReceiptHandle'];

src/SqsQueueReaderServiceProvider.php

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,28 @@ public function boot(): void
2020
Queue::after(static function (JobProcessed $event) {
2121
$data = $event->job->payload();
2222
Log::debug('Job payload==', [$data]);
23-
$event->job->delete();
23+
//$event->job->delete();
24+
25+
$batchIds = array_column($data, 'batchIds');
26+
27+
$batchIds = array_chunk($batchIds, 10);
28+
29+
foreach ($batchIds as $batch) {
30+
//Deletes up to ten messages from the specified queue.
31+
$result = $event->job->deleteMessageBatch([
32+
'Entries' => $batch,
33+
'QueueUrl' => $event->job->getQueue(),
34+
]);
35+
36+
if ($result['Failed']) {
37+
$msg = '';
38+
foreach ($result['Failed'] as $failed) {
39+
$msg .= sprintf("Deleting message failed, code = %s, id = %s, msg = %s, senderfault = %s", $failed['Code'], $failed['Id'], $failed['Message'], $failed['SenderFault']);
40+
}
41+
Log::error('Cannot delete some SQS messages: ', [$msg]);
42+
throw new \RuntimeException("Cannot delete some messages, consult log for more info!");
43+
}
44+
}
2445
});
2546
}
2647
}

0 commit comments

Comments
 (0)