Skip to content

Commit

Permalink
fix: various bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato authored and NathanFlurry committed Feb 26, 2025
1 parent 2e3c189 commit ed680a5
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 36 deletions.
13 changes: 13 additions & 0 deletions packages/common/chirp-workflow/core/src/db/crdb_nats/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
NULL AS tags,
0 AS event_type,
version,
create_ts,
activity_name AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -233,6 +234,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
NULL AS tags,
1 AS event_type,
version,
NULL as create_ts,
signal_name AS name,
signal_id::UUID AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -254,6 +256,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
s.tags,
2 AS event_type,
version,
NULL as create_ts,
se.signal_name AS name,
se.signal_id AS auxiliary_id,
s2.workflow_id AS auxiliary_id2,
Expand All @@ -279,6 +282,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
tags,
3 AS event_type,
version,
NULL as create_ts,
message_name AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -300,6 +304,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
COALESCE(w.tags, '{}'::JSONB) AS tags,
4 AS event_type,
version,
sw.create_ts,
w.workflow_name AS name,
sw.sub_workflow_id AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -323,6 +328,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
NULL AS tags,
5 AS event_type,
version,
NULL as create_ts,
NULL AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -343,6 +349,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
NULL AS tags,
6 AS event_type,
version,
NULL as create_ts,
NULL AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -363,6 +370,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
NULL AS tags,
7 AS event_type,
version,
NULL as create_ts,
NULL AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -383,6 +391,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
NULL AS tags,
8 AS event_type,
1 AS version,
NULL as create_ts,
NULL AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -403,6 +412,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
NULL AS tags,
9 AS event_type,
version,
NULL as create_ts,
NULL AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand Down Expand Up @@ -642,6 +652,7 @@ struct AmalgamEventRow {
location2: Option<Location>,
tags: Option<serde_json::Value>,
version: i64,
create_ts: Option<i64>,
event_type: i64,
name: Option<String>,
auxiliary_id: Option<Uuid>,
Expand Down Expand Up @@ -676,6 +687,7 @@ impl TryFrom<AmalgamEventRow> for Event {
Ok(Event {
location,
version: value.version.try_into().context("integer conversion")?,
create_ts: value.create_ts.unwrap_or_default(),
forgotten: value.forgotten,
data: match event_type {
EventType::Activity => EventData::Activity(value.try_into()?),
Expand Down Expand Up @@ -919,6 +931,7 @@ fn build_history(
.map(move |i| Event {
location: root.join(Coordinate::simple(last_coord_head + i)),
version: 0,
create_ts: 0,
forgotten: false,
data: EventData::Empty,
})
Expand Down
1 change: 1 addition & 0 deletions packages/common/chirp-workflow/core/src/db/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct HistoryData {
pub struct Event {
pub location: Location,
pub version: usize,
pub create_ts: i64,
pub forgotten: bool,
pub data: EventData,
}
Expand Down
52 changes: 32 additions & 20 deletions packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
NULL AS tags,
0 AS event_type,
version,
create_ts,
activity_name AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -561,6 +562,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
NULL AS tags,
1 AS event_type,
version,
ack_ts AS create_ts,
signal_name AS name,
signal_id AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -580,6 +582,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
json(tags) AS tags,
2 AS event_type,
version,
create_ts,
signal_name AS name,
signal_id AS auxiliary_id,
workflow_id AS auxiliary_id2,
Expand All @@ -599,6 +602,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
json(tags) AS tags,
3 AS event_type,
version,
create_ts,
message_name AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -618,6 +622,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
COALESCE(json(tags), '{}') AS tags,
4 AS event_type,
version,
create_ts,
sub_workflow_name AS name,
sub_workflow_id AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -637,6 +642,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
NULL AS tags,
5 AS event_type,
version,
create_ts,
NULL AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -655,6 +661,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
NULL AS tags,
6 AS event_type,
version,
create_ts,
NULL AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -673,6 +680,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
NULL AS tags,
7 AS event_type,
version,
create_ts,
NULL AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -691,6 +699,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
NULL AS tags,
8 AS event_type,
1 AS version,
create_ts,
NULL AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand All @@ -709,6 +718,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
NULL AS tags,
9 AS event_type,
version,
create_ts,
NULL AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand Down Expand Up @@ -810,28 +820,28 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
.unpack::<JustUuid>(entry.key())
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;

if current_signal_id
.map(|x| signal_id != x)
.unwrap_or_default()
{
// Save if matches query
if matching_tags == tags.len()
&& name_matches && workflow_id_matches
&& state_matches
{
signal_ids.push(signal_id);

if signal_ids.len() >= 100 {
current_signal_id = None;
break;
if let Some(curr) = current_signal_id {
if signal_id != curr {
// Save if matches query
if matching_tags == tags.len()
&& name_matches && workflow_id_matches
&& state_matches
{
signal_ids.push(curr);

if signal_ids.len() >= 100 {
current_signal_id = None;
break;
}
}
}

// Reset state
matching_tags = 0;
name_matches = name.is_none();
workflow_id_matches = workflow_id.is_none();
state_matches = state.is_none() || state == Some(SignalState::Pending);
// Reset state
matching_tags = 0;
name_matches = name.is_none();
workflow_id_matches = workflow_id.is_none();
state_matches =
state.is_none() || state == Some(SignalState::Pending);
}
}

current_signal_id = Some(signal_id);
Expand Down Expand Up @@ -931,6 +941,7 @@ struct AmalgamEventRow {
location: Location,
tags: Option<serde_json::Value>,
version: i64,
create_ts: i64,
event_type: i64,
name: Option<String>,
auxiliary_id: Option<Uuid>,
Expand All @@ -955,6 +966,7 @@ impl TryFrom<AmalgamEventRow> for Event {
Ok(Event {
location: value.location.clone(),
version: value.version.try_into().context("integer conversion")?,
create_ts: value.create_ts,
forgotten: value.forgotten,
data: match event_type {
EventType::Activity => EventData::Activity(value.try_into()?),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,8 @@ impl Database for DatabaseFdbSqliteNats {
let first_tag = tag_iter.next().transpose()?;
let rest_of_tags = tag_iter.collect::<WorkflowResult<Vec<_>>>()?;

let start_instant = Instant::now();

let workflow_id = self
.pools
.fdb()?
Expand Down Expand Up @@ -916,6 +918,11 @@ impl Database for DatabaseFdbSqliteNats {
})
.await?;

let dt = start_instant.elapsed().as_secs_f64();
metrics::FIND_WORKFLOW_DURATION
.with_label_values(&[workflow_name])
.observe(dt);

Ok(workflow_id)
}

Expand Down
7 changes: 7 additions & 0 deletions packages/common/chirp-workflow/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ lazy_static::lazy_static! {
&["worker_instance_id"],
*REGISTRY,
).unwrap();
pub static ref FIND_WORKFLOW_DURATION: HistogramVec = register_histogram_vec_with_registry!(
"chirp_find_workflows_duration",
"Duration to find a workflow with a given name and tags.",
&["workflow_name"],
BUCKETS.to_vec(),
*REGISTRY,
).unwrap();

pub static ref WORKFLOW_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!(
"chirp_workflow_total",
Expand Down
6 changes: 5 additions & 1 deletion packages/common/server-cli/src/commands/wf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub enum SubCommand {
/// Includes location numbers for events in graph.
#[clap(short = 'l', long)]
print_location: bool,
/// Includes create timestamps for events in graph.
#[clap(short = 't', long)]
print_ts: bool,
},
Signal {
#[clap(subcommand)]
Expand Down Expand Up @@ -100,11 +103,12 @@ impl SubCommand {
exclude_json,
include_forgotten,
print_location,
print_ts,
} => {
let history = db
.get_workflow_history(workflow_id, include_forgotten)
.await?;
util::wf::print_history(history, exclude_json, print_location).await
util::wf::print_history(history, exclude_json, print_location, print_ts).await
}
Self::Signal { command } => command.execute(db).await,
}
Expand Down
15 changes: 14 additions & 1 deletion packages/common/server-cli/src/util/wf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub async fn print_history(
history: Option<HistoryData>,
exclude_json: bool,
print_location: bool,
print_ts: bool,
) -> Result<()> {
let Some(history) = history else {
rivet_term::status::success("No workflow found", "");
Expand Down Expand Up @@ -175,7 +176,19 @@ pub async fn print_history(

println!();

// TODO: Color code each (make header white instead of yellow)
if print_ts {
// Indent
print!("{}{c} ", " ".repeat(indent));

let datetime = Utc
.timestamp_millis_opt(event.create_ts)
.single()
.context("invalid ts")?;
let date = datetime.format("%Y-%m-%d %H:%M:%S");

println!("created {}", style(date).magenta());
}

match &event.data {
EventData::Activity(data) => {
if !exclude_json {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub fn install(initialize_immediately: bool) -> String {
.replace("__FDB_VERSION__", FDB_VERSION)
.replace(
"__PROMETHEUS_PROXY_SCRIPT__",
include_str!("../files/fdp_prometheus_proxy.py"),
include_str!("../files/fdb_prometheus_proxy.py"),
);

if initialize_immediately {
Expand Down
6 changes: 3 additions & 3 deletions packages/edge/api/actor/src/route/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ pub async fn create(
ds::types::ServerResources::default_isolate()
}
};

tracing::info!(?tags, "creating server with tags");


let server_id = Uuid::new_v4();

tracing::info!(?server_id, ?tags, "creating server with tags");

let mut ready_sub = ctx
.subscribe::<ds::workflows::server::Ready>(("server_id", server_id))
.await?;
Expand Down
3 changes: 3 additions & 0 deletions packages/edge/infra/edge-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use std::{
sync::Arc,
};




use anyhow::*;
use clap::Parser;
use rivet_edge_server::run_config;
Expand Down
4 changes: 2 additions & 2 deletions packages/edge/services/pegboard/src/workflows/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul
Draining { .. } | Undrained => false,
};

state.ignore_future_state |= sig.ignore_future_state;

// Forward signal to owner
if !state.ignore_future_state {
state.ignore_future_state = sig.ignore_future_state;

match owner {
protocol::ActorOwner::DynamicServer { workflow_id, .. } => {
ctx.signal(sig).to_workflow_id(workflow_id).send().await?;
Expand Down
Loading

0 comments on commit ed680a5

Please sign in to comment.