Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Added command for retrying jobs.
- Checked http codes when sending packets to loki.

## [1.0.1] - 2025-03-06

- Handled webform elements not present in submission data
Expand Down
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ composer require os2web/os2web_audit
drush pm:enable os2web_audit
```

### Drush
## Drush

### Test audit log

The module provides a Drush command named audit:log. This command enables you
to log a test message to the configured logger. The audit:log command accepts a
Expand All @@ -39,6 +41,24 @@ and once as an error message.
drush audit:log 'This is a test message'
```

### Retry jobs

The module also comes with methods for retrying failed jobs in the
`os2web_audit` queue.

```shell
drush audit:retry-failed-jobs
```

Per default, it simply retries all failed jobs however it comes with
the following options:

```shell
--id[=ID] Retry a specific job by ID (e.g. 1245.)
--ignore-state Retry job regardless of state. This only effects the --id option.
--limit[=LIMIT] Retry (up to) a limited number of jobs. Minimum: 1, Maximum: 5000, Default 1000.
```

## Usage

The module exposes a simple `Logger` service which can log an `info` and `error`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

namespace Drupal\os2web_audit\Drush\Commands;

use Drush\Attributes\Command;
use Drupal\os2web_audit\Service\Logger;
use Drush\Attributes\Argument;
use Drush\Attributes\Command;
use Drush\Commands\DrushCommands;
use Drush\Exceptions\CommandFailedException;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
use Drush\Attributes\Argument;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drush\Exceptions\CommandFailedException;

/**
* Simple command to send log message into audit log.
*/
class Commands extends DrushCommands {
class LogMessageCommand extends DrushCommands {

/**
* Commands constructor.
Expand Down
175 changes: 175 additions & 0 deletions src/Drush/Commands/RetryFailedQueueCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
<?php

namespace Drupal\os2web_audit\Drush\Commands;

use Drupal\Core\Database\Connection;
use Drupal\advancedqueue\Job;
use Drush\Attributes\Command;
use Drush\Attributes\Option;
use Drush\Commands\DrushCommands;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
* Simple command to send log message into audit log.
*/
class RetryFailedQueueCommand extends DrushCommands {

private const OS2WEB_AUDIT_QUEUE_ID = 'os2web_audit';
private const ADVANCEDQUEUE_TABLE = 'advancedqueue';

/**
* Commands constructor.
*
* @param \Drupal\Core\Database\Connection $connection
* The database connection.
*/
public function __construct(
protected Connection $connection,
) {
parent::__construct();
}

/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container): self {
return new static(
$container->get('database'),
);
}

/**
* Retries all failed jobs in the os2web_audit queue.
*
* @param array<string, mixed> $options
* The options array.
*/
#[Command(name: 'audit:retry-failed-jobs')]
#[Option(name: 'id', description: "Retry a specific job by ID (e.g. 1245.)")]
#[Option(name: 'ignore-state', description: 'Retries job regardless of state. This only effects the --id option.')]
#[Option(name: 'limit', description: "Retry (up to) a limited number of jobs. Minimum: 1, Maximum: 5000, Default 1000.")]
public function retryFailedJobs($options = ['id' => NULL, 'ignore-state' => FALSE, 'limit' => NULL]): void {

if (TRUE === $options['id']) {
$this->writeln('Please specify a job ID, e.g. --id=1245.');
return;
}
elseif (is_string($options['id'])) {
$this->retryJob((int) $options['id'], $options['ignore-state']);
return;
}

if (TRUE === $options['limit']) {
// We use the default 1000.
$this->retryJobs(1000);
return;
}
elseif (is_string($options['limit'])) {
$this->retryJobs((int) $options['limit']);
return;
}

$this->retryAllFailedJobs();

}

/**
* Retries all failed jobs in os2web_audit.
*/
private function retryAllFailedJobs(): void {
try {
$this->connection->update('advancedqueue')
->fields(['state' => Job::STATE_QUEUED])
->condition('queue_id', 'os2web_audit')
->condition('state', Job::STATE_FAILURE)
->execute();

$this->output()->writeln('Successfully retried all failed jobs.');
}
catch (\Exception $e) {
$this->output()->writeln($e->getMessage());
}

}

/**
* Retries jobs in the os2web_audit queue.
*/
private function retryJobs(int $limit): void {
if ($limit < 1 || $limit > 5000) {
$this->output()->writeln('Limit should be an integer between 1 and 5000.');
return;
}

try {
$ids = $this->connection->select(self::ADVANCEDQUEUE_TABLE, 'a')
->fields('a', ['job_id'])
->condition('queue_id', self::OS2WEB_AUDIT_QUEUE_ID)
->condition('state', Job::STATE_FAILURE)
->range(0, $limit)
->execute()
->fetchCol();

$this->connection->update(self::ADVANCEDQUEUE_TABLE)
->fields(['state' => Job::STATE_QUEUED])
->condition('queue_id', self::OS2WEB_AUDIT_QUEUE_ID)
->condition('state', Job::STATE_FAILURE)
->condition('job_id', $ids, 'IN')
->execute();

$this->output()->writeln('Successfully retried failed jobs.');
}
catch (\Exception $e) {
$this->output()->writeln($e->getMessage());
}
}

/**
* Retries failed job in the os2web_audit queue.
*/
private function retryJob(int $id, bool $ignoreState): void {

try {
// Check that job exists by fetching its state.
$query = $this->connection->select(self::ADVANCEDQUEUE_TABLE, 'a')
->fields('a', ['state'])
->condition('job_id', $id);

$result = $query->execute()->fetchAssoc();

if (!$result) {
$this->output()->writeln('Job not found.');
return;
}

// State check.
if (!$ignoreState && $result['state'] !== Job::STATE_FAILURE) {
$this->output()->writeln('Job is not in a failed state.');
return;
}

$query = $this->connection->update(self::ADVANCEDQUEUE_TABLE)
->fields(['state' => Job::STATE_QUEUED])
->condition('queue_id', self::OS2WEB_AUDIT_QUEUE_ID)
->condition('job_id', $id);

if (!$ignoreState) {
$query->condition('state', Job::STATE_FAILURE);
}

$result = $query->execute();

if ($result) {
$this->output()->writeln('Successfully retried job.');
}
else {
$this->output()->writeln('Failed retrying job.');
}

}
catch (\Exception $e) {
$this->output()->writeln($e->getMessage());
}
}

}
76 changes: 58 additions & 18 deletions src/Service/LokiClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,15 @@ private function sendPacket(array $packet): void {
$payload = json_encode($packet, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
}
catch (\JsonException $e) {
throw new AuditException(
message: 'Payload could not be encoded.',
previous: $e,
pluginName: 'Loki',
);
throw $this->auditException('Payload could not be encoded.', 0, $e);
}

if (NULL === $this->connection) {
$url = sprintf('%s/loki/api/v1/push', $this->entrypoint);
$this->connection = curl_init($url);

if (FALSE === $this->connection) {
throw new ConnectionException(
message: 'Unable to connect to ' . $url,
pluginName: 'Loki',
);
throw $this->connectionException('Unable to initialize curl connection to ' . $url);
}
}

Expand Down Expand Up @@ -163,20 +156,67 @@ private function sendPacket(array $packet): void {
$result = curl_exec($this->connection);

if (FALSE === $result) {
throw new ConnectionException(
message: 'Error sending packet to Loki',
pluginName: 'Loki',
);
throw $this->connectionException('Error sending packet to Loki');
}

if (curl_errno($this->connection)) {
throw new AuditException(
message: curl_error($this->connection),
code: curl_errno($this->connection),
pluginName: 'Loki',
);
throw $this->auditException(curl_error($this->connection), curl_errno($this->connection));
}

$code = curl_getinfo($this->connection, CURLINFO_HTTP_CODE);

if (!in_array($code, [200, 204], TRUE)) {
throw $this->auditException('Error sending packet to Loki', $code);
}
}
}

/**
* Creates an audit exception.
*
* @param string $message
* The log message.
* @param int $code
* The error code.
* @param ?\Throwable $previous
* The previous throwable.
* @param string $pluginName
* The plugin name.
*
* @return \Drupal\os2web_audit\Exception\AuditException
* The created exception.
*/
private function auditException(string $message = '', int $code = 0, ?\Throwable $previous = NULL, string $pluginName = 'Loki'): AuditException {
return new AuditException(
message: $message,
code: $code,
previous: $previous,
pluginName: $pluginName,
);
}

/**
* Creates a connection exception.
*
* @param string $message
* The log message.
* @param int $code
* The error code.
* @param ?\Throwable $previous
* The previous throwable.
* @param string $pluginName
* The plugin name.
*
* @return \Drupal\os2web_audit\Exception\ConnectionException
* The created exception.
*/
private function connectionException(string $message = '', int $code = 0, ?\Throwable $previous = NULL, string $pluginName = 'Loki'): ConnectionException {
return new ConnectionException(
message: $message,
code: $code,
previous: $previous,
pluginName: $pluginName,
);
}

}
Loading