Skip to content

feat: Queue::push() to return QueuePushResult instead of boolean #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion docs/basic-usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ service('queue')->push('emails', 'email', ['message' => 'Email message goes here

We will be pushing `email` job to the `emails` queue.

As a result of calling the `push()` method, you will receive a `QueuePushResult` object, which you can inspect if needed. It provides the following information:

- `getStatus()`: Indicates whether the job was successfully added to the queue.
- `getJobId()`: Returns the ID of the job that was added to the queue.
- `getError()`: Returns any error that occurred if the job was not added.

### Sending chained jobs to the queue

Sending chained jobs is also simple and lets you specify the particular order of the job execution.
Expand All @@ -172,9 +178,11 @@ service('queue')->chain(function($chain) {
});
```

In the example above, we will send jobs to the `reports` and `emails` queue. First, we will generate a report for given user with the `generate-report` job, after this, we will send an email with `email` job.
In the example above, we will send jobs to the `reports` and `emails` queues. First, we will generate a report for given user with the `generate-report` job, after this, we will send an email with `email` job.
The `email` job will be executed only if the `generate-report` job was successful.

As with the `push()` method, calling the `chain()` method also returns a `QueuePushResult` object.

### Consuming the queue

Since we sent our sample job to queue `emails`, then we need to run the worker with the appropriate queue:
Expand Down
5 changes: 3 additions & 2 deletions src/Handlers/BaseHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use CodeIgniter\Queue\Models\QueueJobFailedModel;
use CodeIgniter\Queue\Payloads\ChainBuilder;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use CodeIgniter\Queue\QueuePushResult;
use CodeIgniter\Queue\Traits\HasQueueValidation;
use ReflectionException;
use Throwable;
Expand All @@ -39,7 +40,7 @@ abstract class BaseHandler

abstract public function name(): string;

abstract public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool;
abstract public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): QueuePushResult;

abstract public function pop(string $queue, array $priorities): ?QueueJob;

Expand Down Expand Up @@ -153,7 +154,7 @@ public function setPriority(string $priority): static
*
* @param Closure $callback Chain definition callback
*/
public function chain(Closure $callback): bool
public function chain(Closure $callback): QueuePushResult
{
$chainBuilder = new ChainBuilder($this);
$callback($chainBuilder);
Expand Down
17 changes: 13 additions & 4 deletions src/Handlers/DatabaseHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use CodeIgniter\Queue\Models\QueueJobModel;
use CodeIgniter\Queue\Payloads\Payload;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use CodeIgniter\Queue\QueuePushResult;
use ReflectionException;
use Throwable;

Expand All @@ -44,10 +45,8 @@ public function name(): string

/**
* Add job to the queue.
*
* @throws ReflectionException
*/
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): QueuePushResult
{
$this->validateJobAndPriority($queue, $job);

Expand All @@ -62,7 +61,17 @@ public function push(string $queue, string $job, array $data, ?PayloadMetadata $

$this->priority = $this->delay = null;

return $this->jobModel->insert($queueJob, false);
try {
$jobId = $this->jobModel->insert($queueJob);
} catch (Throwable $e) {
return QueuePushResult::failure($e->getMessage());
}

if ($jobId === 0) {
return QueuePushResult::failure('Failed to insert job into the database.');
}

return QueuePushResult::success($jobId);
}

/**
Expand Down
18 changes: 14 additions & 4 deletions src/Handlers/PredisHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use CodeIgniter\Queue\Interfaces\QueueInterface;
use CodeIgniter\Queue\Payloads\Payload;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use CodeIgniter\Queue\QueuePushResult;
use Exception;
use Predis\Client;
use Throwable;
Expand Down Expand Up @@ -59,16 +60,17 @@ public function name(): string
/**
* Add job to the queue.
*/
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): QueuePushResult
{
$this->validateJobAndPriority($queue, $job);

helper('text');

$jobId = (int) random_string('numeric', 16);
$availableAt = Time::now()->addSeconds($this->delay ?? 0);

$queueJob = new QueueJob([
'id' => random_string('numeric', 16),
'id' => $jobId,
'queue' => $queue,
'payload' => new Payload($job, $data, $metadata),
'priority' => $this->priority,
Expand All @@ -77,11 +79,19 @@ public function push(string $queue, string $job, array $data, ?PayloadMetadata $
'available_at' => $availableAt,
]);

$result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => $availableAt->timestamp]);
try {
$result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => $availableAt->timestamp]);
} catch (Throwable $e) {
return QueuePushResult::failure('Unexpected Redis error: ' . $e->getMessage());
} finally {
$this->priority = $this->delay = null;
}

$this->priority = $this->delay = null;

return $result > 0;
return $result > 0
? QueuePushResult::success($jobId)
: QueuePushResult::failure('Job already exists in the queue.');
}

/**
Expand Down
22 changes: 17 additions & 5 deletions src/Handlers/RedisHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use CodeIgniter\Queue\Interfaces\QueueInterface;
use CodeIgniter\Queue\Payloads\Payload;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use CodeIgniter\Queue\QueuePushResult;
use Redis;
use RedisException;
use Throwable;
Expand Down Expand Up @@ -76,16 +77,17 @@ public function name(): string
*
* @throws RedisException
*/
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): QueuePushResult
{
$this->validateJobAndPriority($queue, $job);

helper('text');

$availableAt = Time::now()->addSeconds($this->delay ?? 0);
$jobId = (int) random_string('numeric', 16);

$queueJob = new QueueJob([
'id' => random_string('numeric', 16),
'id' => $jobId,
'queue' => $queue,
'payload' => new Payload($job, $data, $metadata),
'priority' => $this->priority,
Expand All @@ -94,11 +96,21 @@ public function push(string $queue, string $job, array $data, ?PayloadMetadata $
'available_at' => $availableAt,
]);

$result = (int) $this->redis->zAdd("queues:{$queue}:{$this->priority}", $availableAt->timestamp, json_encode($queueJob));
try {
$result = $this->redis->zAdd("queues:{$queue}:{$this->priority}", $availableAt->timestamp, json_encode($queueJob));
} catch (Throwable $e) {
return QueuePushResult::failure('Unexpected Redis error: ' . $e->getMessage());
} finally {
$this->priority = $this->delay = null;
}

$this->priority = $this->delay = null;
if ($result === false) {
return QueuePushResult::failure('Failed to add job to Redis.');
}

return $result > 0;
return (int) $result > 0
? QueuePushResult::success($jobId)
: QueuePushResult::failure('Job already exists in the queue.');
}

/**
Expand Down
5 changes: 3 additions & 2 deletions src/Payloads/ChainBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace CodeIgniter\Queue\Payloads;

use CodeIgniter\Queue\Handlers\BaseHandler;
use CodeIgniter\Queue\QueuePushResult;

class ChainBuilder
{
Expand Down Expand Up @@ -44,10 +45,10 @@ public function push(string $queue, string $jobName, array $data = []): ChainEle
/**
* Dispatch the chain of jobs
*/
public function dispatch(): bool
public function dispatch(): QueuePushResult
{
if ($this->payloads->count() === 0) {
return true;
return QueuePushResult::failure('No jobs to dispatch.');
}

$current = $this->payloads->shift();
Expand Down
49 changes: 49 additions & 0 deletions src/QueuePushResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);

/**
* This file is part of CodeIgniter Queue.
*
* (c) CodeIgniter Foundation <[email protected]>
*
* For the full copyright and license information, please view
* the LICENSE file that was distributed with this source code.
*/

namespace CodeIgniter\Queue;

class QueuePushResult
{
public function __construct(
protected readonly bool $success,
protected readonly ?int $jobId = null,
protected readonly ?string $error = null,
) {
}

public static function success(int $jobId): self
{
return new self(true, $jobId);
}

public static function failure(?string $error = null): self
{
return new self(false, null, $error);
}

public function getStatus(): bool
{
return $this->success;
}

public function getJobId(): ?int
{
return $this->jobId;
}

public function getError(): ?string
{
return $this->error;
}
}
14 changes: 7 additions & 7 deletions tests/DatabaseHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public function testPush(): void
$handler = new DatabaseHandler($this->config);
$result = $handler->push('queue', 'success', ['key' => 'value']);

$this->assertTrue($result);
$this->assertTrue($result->getStatus());
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue',
'payload' => json_encode(['job' => 'success', 'data' => ['key' => 'value'], 'metadata' => []]),
Expand All @@ -103,7 +103,7 @@ public function testPushWithPriority(): void
$handler = new DatabaseHandler($this->config);
$result = $handler->setPriority('high')->push('queue', 'success', ['key' => 'value']);

$this->assertTrue($result);
$this->assertTrue($result->getStatus());
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue',
'payload' => json_encode(['job' => 'success', 'data' => ['key' => 'value'], 'metadata' => []]),
Expand All @@ -122,7 +122,7 @@ public function testPushAndPopWithPriority(): void
$handler = new DatabaseHandler($this->config);
$result = $handler->push('queue', 'success', ['key1' => 'value1']);

$this->assertTrue($result);
$this->assertTrue($result->getStatus());
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue',
'payload' => json_encode(['job' => 'success', 'data' => ['key1' => 'value1'], 'metadata' => []]),
Expand All @@ -132,7 +132,7 @@ public function testPushAndPopWithPriority(): void

$result = $handler->setPriority('high')->push('queue', 'success', ['key2' => 'value2']);

$this->assertTrue($result);
$this->assertTrue($result->getStatus());
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue',
'payload' => json_encode(['job' => 'success', 'data' => ['key2' => 'value2'], 'metadata' => []]),
Expand Down Expand Up @@ -161,7 +161,7 @@ public function testPushWithDelay(): void
$handler = new DatabaseHandler($this->config);
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key' => 'value']);

$this->assertTrue($result);
$this->assertTrue($result->getStatus());

$availableAt = 1703859376;

Expand All @@ -188,7 +188,7 @@ public function testChain(): void
->push('queue', 'success', ['key2' => 'value2']);
});

$this->assertTrue($result);
$this->assertTrue($result->getStatus());
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue',
'payload' => json_encode([
Expand Down Expand Up @@ -221,7 +221,7 @@ public function testChainWithPriorityAndDelay(): void
->setDelay(120);
});

$this->assertTrue($result);
$this->assertTrue($result->getStatus());
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue',
'payload' => json_encode([
Expand Down
8 changes: 4 additions & 4 deletions tests/Payloads/ChainBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public function testChainWithSingleJob(): void
$chain->push('queue', 'success', ['key' => 'value']);
});

$this->assertTrue($result);
$this->assertTrue($result->getStatus());
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue',
'payload' => json_encode([
Expand All @@ -84,7 +84,7 @@ public function testEmptyChain(): void
// No jobs added
});

$this->assertTrue($result);
$this->assertFalse($result->getStatus());
$this->seeInDatabase('queue_jobs', []);
}

Expand All @@ -99,7 +99,7 @@ public function testMultipleDifferentQueues(): void
->push('queue2', 'success', ['key2' => 'value2']);
});

$this->assertTrue($result);
$this->assertTrue($result->getStatus());
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue1',
'payload' => json_encode([
Expand Down Expand Up @@ -132,7 +132,7 @@ public function testChainWithManyJobs(): void
->push('queue', 'success', ['key3' => 'value3']);
});

$this->assertTrue($result);
$this->assertTrue($result->getStatus());
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue',
'payload' => json_encode([
Expand Down
Loading
Loading