Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ im = { workspace = true }
job = { workspace = true }
async-trait = { workspace = true }
derive_builder = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
anyhow = { workspace = true }
Expand Down
47 changes: 47 additions & 0 deletions src/handle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::future::Future;
use std::panic::AssertUnwindSafe;

use futures::FutureExt;
use tokio::task::JoinHandle;

#[derive(Debug)]
Expand All @@ -16,3 +20,46 @@ impl Drop for OwnedTaskHandle {
}
}
}

/// Spawn a background task whose exit and panics surface as OTEL spans.
///
/// All long-lived obix tasks (cache loop, pg-listener forwarder) must go
/// through here so that any silent death — normal exit or panic — emits a
/// short-lived error span (queryable in Honeycomb) instead of leaving the
/// outbox half-alive with no signal.
///
/// * `task_name` — static label recorded on the emitted span.
/// * `fut` — the task body; it is polled to completion on the Tokio runtime.
///
/// Returns the `JoinHandle` of the spawned wrapper task. A panic inside `fut`
/// is recorded and then re-raised, so the handle still reports the panic to
/// anyone awaiting it, exactly as a bare `tokio::spawn` would.
pub(crate) fn spawn_supervised<F>(task_name: &'static str, fut: F) -> JoinHandle<()>
where
    F: Future<Output = ()> + Send + 'static,
{
    tokio::spawn(async move {
        // `AssertUnwindSafe` is sound here: `fut` is consumed by this call and
        // never polled again once it has panicked.
        let payload = match AssertUnwindSafe(fut).catch_unwind().await {
            Ok(()) => {
                record_task_exited(task_name);
                return;
            }
            Err(payload) => payload,
        };

        // `panic!` payloads are `&'static str` (literal message) or `String`
        // (formatted message); anything else came from `panic_any`.
        let msg = payload
            .downcast_ref::<&str>()
            .map(|s| (*s).to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_else(|| "<unknown panic payload>".to_string());
        record_task_panicked(task_name, &msg);

        // Re-raise so the JoinHandle resolves to a panic `JoinError` instead of
        // a misleading `Ok(())`. `resume_unwind` does not invoke the panic hook
        // a second time.
        std::panic::resume_unwind(payload);
    })
}

/// Emits the `obix.supervisor.task_exited` span at error level.
///
/// Called by `spawn_supervised` when the supervised future returns normally.
/// The body is intentionally empty: the `instrument` attribute opens the span
/// on entry and it closes as soon as the function returns.
///
/// `skip_all` disables automatic argument capture; `task` is recorded
/// explicitly through its `Display` form.
// NOTE(review): `otel.status_code = "ERROR"` is presumably read by the
// OpenTelemetry tracing layer to mark the span as failed — confirm.
#[tracing::instrument(
name = "obix.supervisor.task_exited",
level = "error",
skip_all,
fields(otel.status_code = "ERROR", task = %task),
)]
fn record_task_exited(task: &'static str) {}

/// Emits the `obix.supervisor.task_panicked` span at error level.
///
/// Called by `spawn_supervised` after it has caught a panic from the
/// supervised future. `task` is the task label and `panic` is the extracted
/// panic message; both are recorded through their `Display` form.
///
/// The body is intentionally empty: the span exists only for the duration of
/// this call. `skip_all` disables automatic argument capture so only the
/// explicitly listed fields appear.
// NOTE(review): `otel.status_code = "ERROR"` is presumably read by the
// OpenTelemetry tracing layer to mark the span as failed — confirm.
#[tracing::instrument(
name = "obix.supervisor.task_panicked",
level = "error",
skip_all,
fields(otel.status_code = "ERROR", task = %task, panic = %panic),
)]
fn record_task_panicked(task: &'static str, panic: &str) {}
41 changes: 38 additions & 3 deletions src/out/ephemeral/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use tokio::sync::{broadcast, mpsc, oneshot};
use std::sync::Arc;

use crate::out::event::*;
use crate::{config::*, handle::OwnedTaskHandle};
use crate::{
config::*,
handle::{OwnedTaskHandle, spawn_supervised},
};

pub struct CacheHandle<P>
where
Expand Down Expand Up @@ -181,7 +184,7 @@ where
) -> Result<OwnedTaskHandle, sqlx::Error> {
let pool = pool.clone();

let handle = tokio::spawn(async move {
let handle = spawn_supervised("obix::ephemeral_cache_loop", async move {
let mut ephemeral_cache: im::HashMap<EphemeralEventType, Arc<EphemeralOutboxEvent<P>>> =
im::HashMap::new();

Expand All @@ -195,6 +198,7 @@ where
let _ = sender.send(ephemeral_cache.clone());
}
None => {
record_backfill_channel_closed();
break;
}
}
Expand All @@ -218,10 +222,12 @@ where
);
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {
Err(broadcast::error::RecvError::Lagged(n)) => {
record_cache_fill_lagged(n);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
record_cache_fill_closed();
break;
}
}
Expand Down Expand Up @@ -260,6 +266,7 @@ where
}
}
None => {
record_notification_channel_closed();
break;
}
}
Expand All @@ -270,3 +277,31 @@ where
Ok(OwnedTaskHandle::new(handle))
}
}

/// Emits the `obix.ephemeral_cache.backfill_channel_closed` span at error level.
///
/// Called from the ephemeral cache loop in a `None` arm immediately before the
/// loop breaks. The body is intentionally empty: the span is opened and closed
/// by the `instrument` attribute alone.
// NOTE(review): presumably the `None` comes from the backfill request
// receiver having all its senders dropped — confirm against the loop body.
#[tracing::instrument(
name = "obix.ephemeral_cache.backfill_channel_closed",
level = "error",
fields(otel.status_code = "ERROR"),
)]
fn record_backfill_channel_closed() {}

/// Emits the `obix.ephemeral_cache.cache_fill_lagged` span at error level.
///
/// Called from the ephemeral cache loop when its broadcast receiver returns
/// `RecvError::Lagged(n)`; the loop then continues. `dropped` is that `n`.
///
/// No `skip_all` is set, so `dropped` is captured automatically as a span
/// field in addition to the explicit `otel.status_code`. The body is
/// intentionally empty.
#[tracing::instrument(
name = "obix.ephemeral_cache.cache_fill_lagged",
level = "error",
fields(otel.status_code = "ERROR"),
)]
fn record_cache_fill_lagged(dropped: u64) {}

/// Emits the `obix.ephemeral_cache.cache_fill_closed` span at error level.
///
/// Called from the ephemeral cache loop when its broadcast receiver returns
/// `RecvError::Closed`, immediately before the loop breaks. The body is
/// intentionally empty: the span is opened and closed by the `instrument`
/// attribute alone.
#[tracing::instrument(
name = "obix.ephemeral_cache.cache_fill_closed",
level = "error",
fields(otel.status_code = "ERROR"),
)]
fn record_cache_fill_closed() {}

/// Emits the `obix.ephemeral_cache.notification_channel_closed` span at error
/// level.
///
/// Called from the ephemeral cache loop in a `None` arm immediately before the
/// loop breaks. The body is intentionally empty: the span is opened and closed
/// by the `instrument` attribute alone.
// NOTE(review): presumably the `None` comes from the notification receiver
// fed by the pg listener — confirm against the loop body.
#[tracing::instrument(
name = "obix.ephemeral_cache.notification_channel_closed",
level = "error",
fields(otel.status_code = "ERROR"),
)]
fn record_notification_channel_closed() {}
10 changes: 8 additions & 2 deletions src/out/ephemeral/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::Stream;
use serde::{Serialize, de::DeserializeOwned};
use std::{pin::Pin, sync::Arc, task::Poll};
use tokio::sync::oneshot;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};

use super::cache::CacheHandle;
use crate::out::event::{EphemeralEventType, EphemeralOutboxEvent};
Expand Down Expand Up @@ -92,10 +92,16 @@ where
loop {
match Pin::new(&mut this.event_receiver).poll_next(cx) {
Poll::Ready(Some(Ok(event))) => return Poll::Ready(Some(event)),
Poll::Ready(Some(Err(_))) => continue, // Skip lagged, try next
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) => {
record_lagged(n);
continue;
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
}
}
}
}

/// Emits the `obix.ephemeral_listener.lagged` span at warn level.
///
/// Called from the listener's `poll_next` when the wrapped broadcast stream
/// yields `BroadcastStreamRecvError::Lagged(n)`; polling then continues with
/// the next item. `dropped` is that `n` and is captured automatically as a
/// span field. The body is intentionally empty.
#[tracing::instrument(name = "obix.ephemeral_listener.lagged", level = "warn")]
fn record_lagged(dropped: u64) {}
71 changes: 67 additions & 4 deletions src/out/persistent/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use std::sync::{
};

use crate::out::event::*;
use crate::{config::*, handle::OwnedTaskHandle, sequence::EventSequence};
use crate::{
config::*,
handle::{OwnedTaskHandle, spawn_supervised},
sequence::EventSequence,
};

pub struct CacheHandle<P>
where
Expand Down Expand Up @@ -145,9 +149,15 @@ where
for (seq, evt) in cache.range((Bound::Excluded(last_broadcast_sequence), Bound::Unbounded))
{
if *seq != last_broadcast_sequence.next() {
record_sequence_gap(
u64::from(last_broadcast_sequence),
u64::from(*seq),
highest_known_sequence.load(Ordering::Relaxed),
);
break;
}
if persistent_event_sender.send(evt.clone()).is_err() {
record_no_receivers(u64::from(*seq));
break;
}
last_broadcast_sequence = *seq;
Expand Down Expand Up @@ -188,7 +198,10 @@ where
current_sequence = seq;
}
}
Err(_) => break,
Err(e) => {
record_backfill_failed(&e, u64::from(current_sequence));
break;
}
}
}

Expand Down Expand Up @@ -267,7 +280,7 @@ where

let initial_sequence = EventSequence::from(highest_known_sequence.load(Ordering::Relaxed));

let handle = tokio::spawn(async move {
let handle = spawn_supervised("obix::persistent_cache_loop", async move {
let mut persistent_cache: im::OrdMap<EventSequence, Arc<PersistentOutboxEvent<P>>> =
im::OrdMap::new();
let mut last_broadcast_sequence = initial_sequence;
Expand Down Expand Up @@ -295,6 +308,7 @@ where
));
}
None => {
record_backfill_channel_closed();
break;
}
}
Expand Down Expand Up @@ -326,10 +340,16 @@ where
);
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {
Err(broadcast::error::RecvError::Lagged(n)) => {
record_cache_fill_lagged(
n,
u64::from(last_broadcast_sequence),
highest_known_sequence.load(Ordering::Relaxed),
);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
record_cache_fill_closed();
break;
}
}
Expand Down Expand Up @@ -375,6 +395,7 @@ where
}
}
None => {
record_notification_channel_closed();
break;
}
}
Expand All @@ -393,3 +414,45 @@ where
Ok(OwnedTaskHandle::new(handle))
}
}

/// Emits the `obix.persistent_cache.sequence_gap` span at warn level.
///
/// Called while broadcasting cached events when the next cached sequence is
/// not `last_broadcast_sequence.next()`; broadcasting stops at that point.
///
/// All three arguments are captured automatically as span fields:
/// * `last_broadcast_sequence` — last sequence that was broadcast.
/// * `next_in_cache` — the non-contiguous sequence found in the cache.
/// * `highest_known` — value loaded from the shared highest-known counter.
///
/// The body is intentionally empty.
#[tracing::instrument(name = "obix.persistent_cache.sequence_gap", level = "warn")]
fn record_sequence_gap(last_broadcast_sequence: u64, next_in_cache: u64, highest_known: u64) {}

/// Emits the `obix.persistent_cache.no_receivers` span at warn level.
///
/// Called while broadcasting cached events when
/// `persistent_event_sender.send(..)` returns an error; broadcasting stops at
/// that point. `sequence` is the sequence of the event that could not be sent
/// and is captured automatically as a span field. The body is intentionally
/// empty.
#[tracing::instrument(name = "obix.persistent_cache.no_receivers", level = "warn")]
fn record_no_receivers(sequence: u64) {}

/// Emits the `obix.persistent_cache.backfill_failed` span at warn level.
///
/// Called from the `Err(e)` arm of the backfill loop immediately before it
/// breaks. `error` is recorded through its `Display` form and
/// `current_sequence` as a plain value; `skip_all` disables automatic argument
/// capture so only these explicit fields appear. The body is intentionally
/// empty.
// NOTE(review): presumably `error` is the failure of the database query that
// loads the next backfill batch — confirm against the caller.
#[tracing::instrument(
name = "obix.persistent_cache.backfill_failed",
level = "warn",
skip_all,
fields(error = %error, current_sequence = current_sequence),
)]
fn record_backfill_failed(error: &sqlx::Error, current_sequence: u64) {}

/// Emits the `obix.persistent_cache.backfill_channel_closed` span at error
/// level.
///
/// Called from the persistent cache loop in a `None` arm immediately before
/// the loop breaks. The body is intentionally empty: the span is opened and
/// closed by the `instrument` attribute alone.
// NOTE(review): presumably the `None` comes from the backfill request
// receiver having all its senders dropped — confirm against the loop body.
#[tracing::instrument(
name = "obix.persistent_cache.backfill_channel_closed",
level = "error",
fields(otel.status_code = "ERROR"),
)]
fn record_backfill_channel_closed() {}

/// Emits the `obix.persistent_cache.cache_fill_lagged` span at error level.
///
/// Called from the persistent cache loop when its broadcast receiver returns
/// `RecvError::Lagged(n)`; the loop then continues.
///
/// No `skip_all` is set, so all arguments are captured automatically as span
/// fields in addition to the explicit `otel.status_code`:
/// * `dropped` — the `n` reported by `Lagged`.
/// * `last_broadcast_sequence` — last sequence the loop had broadcast.
/// * `highest_known` — value loaded from the shared highest-known counter.
///
/// The body is intentionally empty.
#[tracing::instrument(
name = "obix.persistent_cache.cache_fill_lagged",
level = "error",
fields(otel.status_code = "ERROR"),
)]
fn record_cache_fill_lagged(dropped: u64, last_broadcast_sequence: u64, highest_known: u64) {}

/// Emits the `obix.persistent_cache.cache_fill_closed` span at error level.
///
/// Called from the persistent cache loop when its broadcast receiver returns
/// `RecvError::Closed`, immediately before the loop breaks. The body is
/// intentionally empty: the span is opened and closed by the `instrument`
/// attribute alone.
#[tracing::instrument(
name = "obix.persistent_cache.cache_fill_closed",
level = "error",
fields(otel.status_code = "ERROR"),
)]
fn record_cache_fill_closed() {}

/// Emits the `obix.persistent_cache.notification_channel_closed` span at error
/// level.
///
/// Called from the persistent cache loop in a `None` arm immediately before
/// the loop breaks. The body is intentionally empty: the span is opened and
/// closed by the `instrument` attribute alone.
// NOTE(review): presumably the `None` comes from the notification receiver
// fed by the pg listener — confirm against the loop body.
#[tracing::instrument(
name = "obix.persistent_cache.notification_channel_closed",
level = "error",
fields(otel.status_code = "ERROR"),
)]
fn record_notification_channel_closed() {}
11 changes: 10 additions & 1 deletion src/out/persistent/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ where
Poll::Ready(Some(Ok(event))) => {
this.maybe_add_to_cache(event);
}
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_)))) => (),
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) => {
record_lagged(
n,
u64::from(this.last_returned_sequence),
u64::from(this.latest_known),
);
}
Poll::Pending => break,
}
}
Expand Down Expand Up @@ -128,3 +134,6 @@ where
Poll::Pending
}
}

/// Emits the `obix.persistent_listener.lagged` span at warn level.
///
/// Called from the listener's polling loop when the wrapped broadcast stream
/// yields `BroadcastStreamRecvError::Lagged(n)`.
///
/// All three arguments are captured automatically as span fields:
/// * `dropped` — the `n` reported by `Lagged`.
/// * `last_returned_sequence` — the listener's `last_returned_sequence`.
/// * `latest_known` — the listener's `latest_known`.
///
/// The body is intentionally empty.
#[tracing::instrument(name = "obix.persistent_listener.lagged", level = "warn")]
fn record_lagged(dropped: u64, last_returned_sequence: u64, latest_known: u64) {}
36 changes: 32 additions & 4 deletions src/out/pg_notify.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use tokio::sync::mpsc;

use crate::{handle::OwnedTaskHandle, tables::MailboxTables};
use crate::{
handle::{OwnedTaskHandle, spawn_supervised},
tables::MailboxTables,
};

pub async fn spawn_pg_listener<Tables>(
pool: &sqlx::PgPool,
Expand All @@ -19,8 +22,16 @@ where
])
.await?;

let handle = tokio::spawn(async move {
while let Ok(notification) = listener.recv().await {
let handle = spawn_supervised("obix::pg_listener", async move {
loop {
let notification = match listener.recv().await {
Ok(notification) => notification,
Err(e) => {
record_recv_error(&e);
break;
}
};

// Route notification to appropriate channel with backpressure
let result = if notification.channel() == Tables::persistent_outbox_events_channel() {
persistent_notification_tx.send(notification).await
Expand All @@ -32,11 +43,28 @@ where
};

// If send fails, receiver is dropped, so break
if result.is_err() {
if let Err(e) = result {
record_forward_failed(&e);
break;
}
}
});

Ok(OwnedTaskHandle::new(handle))
}

/// Emits the `obix.pg_listener.recv_error` span at error level.
///
/// Called from the pg listener task when `listener.recv().await` returns an
/// error, immediately before the forwarding loop breaks. `error` is recorded
/// through its `Display` form; `skip_all` disables automatic argument capture.
/// The body is intentionally empty.
#[tracing::instrument(
name = "obix.pg_listener.recv_error",
level = "error",
skip_all,
fields(otel.status_code = "ERROR", error = %error),
)]
fn record_recv_error(error: &sqlx::Error) {}

/// Emits the `obix.pg_listener.forward_failed` span at error level.
///
/// Called from the pg listener task when forwarding a notification over one
/// of its mpsc senders fails, immediately before the forwarding loop breaks.
/// `error` is recorded through its `Display` form; `skip_all` disables
/// automatic argument capture, so the undelivered payload `T` itself is not
/// recorded. The body is intentionally empty.
#[tracing::instrument(
name = "obix.pg_listener.forward_failed",
level = "error",
skip_all,
fields(otel.status_code = "ERROR", error = %error),
)]
fn record_forward_failed<T>(error: &tokio::sync::mpsc::error::SendError<T>) {}
Loading