Skip to content

Commit 2f331d0

Browse files
committed
use lua script to pop the task atomically
1 parent 847079f commit 2f331d0

File tree

3 files changed

+61
-33
lines changed

3 files changed

+61
-33
lines changed

src/Handlers/PredisHandler.php

+22-20
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
namespace CodeIgniter\Queue\Handlers;
1515

16+
use CodeIgniter\Autoloader\FileLocator;
1617
use CodeIgniter\Exceptions\CriticalError;
1718
use CodeIgniter\I18n\Time;
1819
use CodeIgniter\Queue\Config\Queue as QueueConfig;
@@ -27,12 +28,20 @@
2728
class PredisHandler extends BaseHandler implements QueueInterface
2829
{
2930
private readonly Client $predis;
31+
private readonly string $luaScript;
3032

3133
public function __construct(protected QueueConfig $config)
3234
{
3335
try {
3436
$this->predis = new Client($config->predis, ['prefix' => $config->predis['prefix']]);
3537
$this->predis->time();
38+
39+
$locator = new FileLocator(service('autoloader'));
40+
$luaScript = $locator->locateFile('CodeIgniter\Queue\Lua\pop_task', null, 'lua');
41+
if ($luaScript === false) {
42+
throw new CriticalError('Queue: LUA script for Predis is not available.');
43+
}
44+
$this->luaScript = file_get_contents($luaScript);
3645
} catch (Exception $e) {
3746
throw new CriticalError('Queue: Predis connection refused (' . $e->getMessage() . ').');
3847
}
@@ -77,30 +86,23 @@ public function push(string $queue, string $job, array $data): bool
7786
*/
7887
public function pop(string $queue, array $priorities): ?QueueJob
7988
{
80-
$tasks = [];
81-
$now = Time::now()->timestamp;
82-
83-
foreach ($priorities as $priority) {
84-
$tasks = $this->predis->zrangebyscore(
85-
"queues:{$queue}:{$priority}",
86-
'-inf',
87-
$now,
88-
['LIMIT' => [0, 1]]
89-
);
90-
if ($tasks !== []) {
91-
$removed = $this->predis->zrem("queues:{$queue}:{$priority}", ...$tasks);
92-
if ($removed !== 0) {
93-
break;
94-
}
95-
$tasks = [];
96-
}
97-
}
89+
$now = Time::now()->timestamp;
90+
91+
// Prepare the arguments for the Lua script
92+
$args = [
93+
'queues:' . $queue, // KEYS[1]
94+
$now, // ARGV[2]
95+
json_encode($priorities), // ARGV[3]
96+
];
97+
98+
// Execute the Lua script
99+
$task = $this->predis->eval($this->luaScript, 1, ...$args);
98100

99-
if ($tasks === []) {
101+
if ($task === null) {
100102
return null;
101103
}
102104

103-
$queueJob = new QueueJob(json_decode((string) $tasks[0], true));
105+
$queueJob = new QueueJob(json_decode((string) $task, true));
104106

105107
// Set the actual status as in DB.
106108
$queueJob->status = Status::RESERVED->value;

src/Handlers/RedisHandler.php

+22-13
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
namespace CodeIgniter\Queue\Handlers;
1515

16+
use CodeIgniter\Autoloader\FileLocator;
1617
use CodeIgniter\Exceptions\CriticalError;
1718
use CodeIgniter\I18n\Time;
1819
use CodeIgniter\Queue\Config\Queue as QueueConfig;
@@ -27,6 +28,7 @@
2728
class RedisHandler extends BaseHandler implements QueueInterface
2829
{
2930
private readonly Redis $redis;
31+
private readonly string $luaScript;
3032

3133
public function __construct(protected QueueConfig $config)
3234
{
@@ -48,6 +50,13 @@ public function __construct(protected QueueConfig $config)
4850
if (isset($config->redis['prefix']) && ! $this->redis->setOption(Redis::OPT_PREFIX, $config->redis['prefix'])) {
4951
throw new CriticalError('Queue: Redis setting prefix failed.');
5052
}
53+
54+
$locator = new FileLocator(service('autoloader'));
55+
$luaScript = $locator->locateFile('CodeIgniter\Queue\Lua\pop_task', null, 'lua');
56+
if ($luaScript === false) {
57+
throw new CriticalError('Queue: LUA script for Redis is not available.');
58+
}
59+
$this->luaScript = file_get_contents($luaScript);
5160
} catch (RedisException $e) {
5261
throw new CriticalError('Queue: RedisException occurred with message (' . $e->getMessage() . ').');
5362
}
@@ -96,23 +105,23 @@ public function push(string $queue, string $job, array $data): bool
96105
*/
97106
public function pop(string $queue, array $priorities): ?QueueJob
98107
{
99-
$tasks = [];
100-
$now = Time::now()->timestamp;
101-
102-
foreach ($priorities as $priority) {
103-
if ($tasks = $this->redis->zRangeByScore("queues:{$queue}:{$priority}", '-inf', (string) $now, ['limit' => [0, 1]])) {
104-
if ($this->redis->zRem("queues:{$queue}:{$priority}", ...$tasks)) {
105-
break;
106-
}
107-
$tasks = [];
108-
}
109-
}
108+
$now = Time::now()->timestamp;
109+
110+
// Prepare the arguments for the Lua script
111+
$args = [
112+
'queues:' . $queue, // KEYS[1]
113+
$now, // ARGV[2]
114+
json_encode($priorities), // ARGV[3]
115+
];
116+
117+
// Execute the Lua script
118+
$task = $this->redis->eval($this->luaScript, $args, 1);
110119

111-
if ($tasks === []) {
120+
if ($task === false) {
112121
return null;
113122
}
114123

115-
$queueJob = new QueueJob(json_decode((string) $tasks[0], true));
124+
$queueJob = new QueueJob(json_decode((string) $task, true));
116125

117126
// Set the actual status as in DB.
118127
$queueJob->status = Status::RESERVED->value;

src/Lua/pop_task.lua

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
local queue = KEYS[1]
2+
local now = tonumber(ARGV[1])
3+
local priorities = cjson.decode(ARGV[2])
4+
local task = nil
5+
6+
for _, priority in ipairs(priorities) do
7+
local key = queue .. ':' .. priority
8+
local tasks = redis.call('ZRANGEBYSCORE', key, '-inf', tostring(now), 'LIMIT', 0, 1)
9+
10+
if #tasks > 0 then
11+
redis.call('ZREM', key, tasks[1])
12+
task = tasks[1]
13+
break
14+
end
15+
end
16+
17+
return task

0 commit comments

Comments
 (0)