diff --git a/java/outbox.md b/java/outbox.md index 7a400f983..3bac8b19b 100644 --- a/java/outbox.md +++ b/java/outbox.md @@ -355,29 +355,65 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for ::: -### Filter for Dead Entries +### Reading Dead Entries -This filtering can't be done on the database since the maximum number of attempts is only available from the CDS properties. - -To ensure that only dead outbox entries are returned when reading `DeadOutboxMessages`, the following code provides the handler for the `DeadLetterQueueService` and the `@After-READ` handler that filters for the dead outbox entries: +This filtering of dead entries is done on the database by adding `where` clauses for each outbox and their maximum number of retries. The following code provides the handler for the `DeadLetterQueueService` that modifies the where clause by adding +additional conditions for filtering the outbox entries: ```java @Component @ServiceName(OutboxDeadLetterQueueService_.CDS_NAME) public class DeadOutboxMessagesHandler implements EventHandler { - @After(entity = DeadOutboxMessages_.CDS_NAME) - public void filterDeadEntries(CdsReadEventContext context) { - CdsProperties.Outbox outboxConfigs = context.getCdsRuntime().getEnvironment().getCdsProperties().getOutbox(); - List deadEntries = context - .getResult() - .listOf(DeadOutboxMessages.class) - .stream() - .filter(entry -> entry.getAttempts() >= outboxConfigs.getService(entry.getTarget()).getMaxAttempts()) - .toList(); - - context.setResult(deadEntries); - } + private final PersistenceService db; + + public DeadOutboxMessagesHandler(@Qualifier(PersistenceService.DEFAULT_NAME) PersistenceService db) { + this.db = db; + } + + @Before(entity = DeadOutboxMessages_.CDS_NAME) + public void modifyWhereClause(CdsReadEventContext context) { + CqnSelect cqn = context.getCqn(); + Optional outboxFilters = this.createOutboxFilters(context.getCdsRuntime()); + CqnSelect modifiedCqn = copy( + cqn, + new Modifier() { + @Override + public CqnPredicate where(Predicate where) { + if (where != null && outboxFilters.isPresent()) { + return where.and(outboxFilters.get()); + } else if (where == null && outboxFilters.isPresent()) { + return outboxFilters.get(); + } else if (where != null && !outboxFilters.isPresent()) { + return where; + } else { + return null; + } + } + }); + + context.setCqn(modifiedCqn); + } + + private Optional createOutboxFilters(CdsRuntime runtime) { + List outboxServices = runtime.getServiceCatalog().getServices(OutboxService.class) + .filter(s -> !s.getName().equals(OutboxService.INMEMORY_NAME)).toList(); + CdsProperties.Outbox outboxConfigs = runtime.getEnvironment().getCdsProperties().getOutbox(); + + Predicate where = null; + for(OutboxService service : outboxServices) { + OutboxServiceConfig config = outboxConfigs.getService(service.getName()); + Predicate targetPredicate = CQL.get(Messages.TARGET).eq(service.getName()).and(CQL.get(Messages.ATTEMPTS).ge(config.getMaxAttempts())); + + if (where == null) { + where = targetPredicate; + } else { + where = where.or(targetPredicate); + } + } + + return Optional.ofNullable(where); + } } ``` @@ -419,8 +455,7 @@ The injected `PersistenceService` instance is used to perform the operations on [Learn more about CQL statement inspection.](./working-with-cql/query-introspection#cqnanalyzer){.learn-more} ::: tip Use paging logic -Avoid to read all entries of the `cds.outbox.Messages` or `OutboxDeadLetterQueueService.DeadOutboxMessages` table at once, as the size of an entry is unpredictable -and depends on the size of the payload. Prefer paging logic instead. +Avoid to read all entries of the `cds.outbox.Messages` or `OutboxDeadLetterQueueService.DeadOutboxMessages` table at once, as the size of an entry is unpredictable and depends on the size of the payload. Prefer paging logic instead. ::: ## Observability using Open Telemetry