Skip to content

Commit

Permalink
fix: add metrics for wf engine
Browse files: browse the repository at this point in the history
  • Loading branch information
MasterPtato authored and NathanFlurry committed Feb 26, 2025
1 parent ed680a5 commit dba8d32
Show file tree
Hide file tree
Showing 69 changed files with 797 additions and 169 deletions.
4 changes: 3 additions & 1 deletion packages/common/chirp-workflow/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ features = [

[dev-dependencies]
anyhow = "1.0.82"
-rand = "0.8"
+rand = "0.8"
+statrs = "0.18"
+dirs = "5.0.1"
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-use std::fmt::Display;
+use std::{fmt::Display, time::Instant};

use global_error::GlobalResult;
use serde::Serialize;
Expand Down Expand Up @@ -71,6 +71,8 @@ impl<M: Message> MessageBuilder<M> {

tracing::debug!(msg_name=%M::NAME, tags=?self.tags, "dispatching message");

+let start_instant = Instant::now();
+
let tags = serde_json::Value::Object(self.tags);

if self.wait {
Expand All @@ -79,8 +81,12 @@ impl<M: Message> MessageBuilder<M> {
self.msg_ctx.message(tags, self.body).await?;
}

+let dt = start_instant.elapsed().as_secs_f64();
+metrics::MESSAGE_SEND_DURATION
+.with_label_values(&["", M::NAME])
+.observe(dt);
metrics::MESSAGE_PUBLISHED
-.with_label_values(&[M::NAME])
+.with_label_values(&["", M::NAME])
.inc();

Ok(())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-use std::fmt::Display;
+use std::{fmt::Display, time::Instant};

use global_error::{GlobalError, GlobalResult};
use serde::Serialize;
Expand Down Expand Up @@ -88,6 +88,7 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
}

let signal_id = Uuid::new_v4();
+let start_instant = Instant::now();

// Serialize input
let input_val = serde_json::value::to_raw_value(&self.body)
Expand Down Expand Up @@ -162,8 +163,12 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
}
}

+let dt = start_instant.elapsed().as_secs_f64();
+metrics::SIGNAL_SEND_DURATION
+.with_label_values(&["", T::NAME])
+.observe(dt);
metrics::SIGNAL_PUBLISHED
-.with_label_values(&[T::NAME])
+.with_label_values(&["", T::NAME])
.inc();

Ok(signal_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-use std::fmt::Display;
+use std::{fmt::Display, time::Instant};

use global_error::{GlobalError, GlobalResult};
use serde::Serialize;
Expand Down Expand Up @@ -86,6 +86,7 @@ where

let workflow_name = I::Workflow::NAME;
let workflow_id = Uuid::new_v4();
+let start_instant = Instant::now();

let no_tags = self.tags.is_empty();
let tags = serde_json::Value::Object(self.tags);
Expand Down Expand Up @@ -145,8 +146,12 @@ where
}

if workflow_id == actual_workflow_id {
+let dt = start_instant.elapsed().as_secs_f64();
+metrics::WORKFLOW_DISPATCH_DURATION
+.with_label_values(&["", workflow_name])
+.observe(dt);
metrics::WORKFLOW_DISPATCHED
-.with_label_values(&[workflow_name])
+.with_label_values(&["", workflow_name])
.inc();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-use std::fmt::Display;
+use std::{fmt::Display, time::Instant};

use global_error::{GlobalError, GlobalResult};
use serde::Serialize;
Expand Down Expand Up @@ -97,6 +97,8 @@ impl<'a, M: Message> MessageBuilder<'a, M> {
else {
tracing::debug!(name=%self.ctx.name(), id=%self.ctx.workflow_id(), msg_name=%M::NAME, tags=?self.tags, "dispatching message");

+let start_instant = Instant::now();
+
// Serialize body
let body_val = serde_json::value::to_raw_value(&self.body)
.map_err(WorkflowError::SerializeMessageBody)
Expand Down Expand Up @@ -131,8 +133,12 @@ impl<'a, M: Message> MessageBuilder<'a, M> {
msg.map_err(GlobalError::raw)?;
write.map_err(GlobalError::raw)?;

+let dt = start_instant.elapsed().as_secs_f64();
+metrics::MESSAGE_SEND_DURATION
+.with_label_values(&[self.ctx.name(), M::NAME])
+.observe(dt);
metrics::MESSAGE_PUBLISHED
-.with_label_values(&[M::NAME])
+.with_label_values(&[self.ctx.name(), M::NAME])
.inc();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-use std::fmt::Display;
+use std::{fmt::Display, time::Instant};

use global_error::{GlobalError, GlobalResult};
use serde::Serialize;
Expand Down Expand Up @@ -117,6 +117,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
// Send signal
else {
let signal_id = Uuid::new_v4();
+let start_instant = Instant::now();

// Serialize input
let input_val = serde_json::value::to_raw_value(&self.body)
Expand Down Expand Up @@ -234,8 +235,12 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
}
}

+let dt = start_instant.elapsed().as_secs_f64();
+metrics::SIGNAL_SEND_DURATION
+.with_label_values(&[self.ctx.name(), T::NAME])
+.observe(dt);
metrics::SIGNAL_PUBLISHED
-.with_label_values(&[T::NAME])
+.with_label_values(&[self.ctx.name(), T::NAME])
.inc();

signal_id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-use std::{fmt::Display, sync::Arc};
+use std::{fmt::Display, sync::Arc, time::Instant};

use global_error::{GlobalError, GlobalResult};
use serde::Serialize;
Expand Down Expand Up @@ -136,6 +136,7 @@ where
else {
let sub_workflow_name = I::Workflow::NAME;
let sub_workflow_id = Uuid::new_v4();
+let start_instant = Instant::now();

if unique {
tracing::debug!(
Expand Down Expand Up @@ -201,8 +202,12 @@ where
}

if sub_workflow_id == actual_sub_workflow_id {
+let dt = start_instant.elapsed().as_secs_f64();
+metrics::WORKFLOW_DISPATCH_DURATION
+.with_label_values(&[ctx.name(), sub_workflow_name])
+.observe(dt);
metrics::WORKFLOW_DISPATCHED
-.with_label_values(&[sub_workflow_name])
+.with_label_values(&[ctx.name(), sub_workflow_name])
.inc();
}

Expand Down
28 changes: 28 additions & 0 deletions packages/common/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,8 @@ impl WorkflowCtx {
.map_err(WorkflowError::SerializeLoopOutput)
.map_err(GlobalError::raw)?;

+let start_instant = Instant::now();
+
// Insert event before loop is run so the history is consistent
self.db
.upsert_workflow_loop_event(
Expand All @@ -815,6 +817,11 @@ impl WorkflowCtx {
)
.await?;

+let dt = start_instant.elapsed().as_secs_f64();
+metrics::BRANCH_UPSERT_DURATION
+.with_label_values(&[&self.name])
+.observe(dt);
+
(0, state, None)
};

Expand All @@ -833,6 +840,8 @@ impl WorkflowCtx {
tracing::debug!(name=%self.name, id=%self.workflow_id, "running loop");

loop {
+let start_instant = Instant::now();
+
// Create a new branch for each iteration of the loop at location {...loop location, iteration idx}
let mut iteration_branch = loop_branch.branch_inner(
self.input.clone(),
Expand Down Expand Up @@ -863,8 +872,10 @@ impl WorkflowCtx {
}

// Run loop
+let start_instant2 = Instant::now();
match cb(&mut iteration_branch, &mut state).await? {
Loop::Continue => {
+let dt2 = start_instant2.elapsed().as_secs_f64();
iteration += 1;

let state_val = serde_json::value::to_raw_value(&state)
Expand All @@ -888,8 +899,14 @@ impl WorkflowCtx {
.cursor
.check_clear()
.map_err(GlobalError::raw)?;
+
+let dt = start_instant.elapsed().as_secs_f64();
+metrics::BRANCH_UPSERT_DURATION
+.with_label_values(&[&self.name])
+.observe(dt - dt2);
}
Loop::Break(res) => {
+let dt2 = start_instant2.elapsed().as_secs_f64();
iteration += 1;

let state_val = serde_json::value::to_raw_value(&state)
Expand Down Expand Up @@ -917,6 +934,11 @@ impl WorkflowCtx {
.check_clear()
.map_err(GlobalError::raw)?;

+let dt = start_instant.elapsed().as_secs_f64();
+metrics::BRANCH_UPSERT_DURATION
+.with_label_values(&[&self.name])
+.observe(dt - dt2);
+
break res;
}
}
Expand Down Expand Up @@ -1020,6 +1042,7 @@ impl WorkflowCtx {
.map_err(GlobalError::raw)?;
let history_res2 = history_res.equivalent();
let sleep_location = self.cursor.current_location_for(&history_res);
+let start_instant = Instant::now();

// Slept before
let (deadline_ts, state) = if let HistoryResult::Event(sleep) = history_res {
Expand Down Expand Up @@ -1131,6 +1154,11 @@ impl WorkflowCtx {
})
.await;

+let dt = start_instant.elapsed().as_secs_f64();
+metrics::LISTEN_WITH_TIMEOUT_DURATION
+.with_label_values(&[&self.name])
+.observe(dt);
+
match res {
Ok(res) => Some(res.map_err(GlobalError::raw)?),
Err(_) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/common/chirp-workflow/core/src/db/crdb_nats/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
NULL AS tags,
1 AS event_type,
version,
-NULL as create_ts,
+ack_ts AS create_ts,
signal_name AS name,
signal_id::UUID AS auxiliary_id,
NULL AS auxiliary_id2,
Expand Down Expand Up @@ -282,7 +282,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
tags,
3 AS event_type,
version,
-NULL as create_ts,
+NULL AS create_ts,
message_name AS name,
NULL AS auxiliary_id,
NULL AS auxiliary_id2,
Expand Down
24 changes: 17 additions & 7 deletions packages/common/chirp-workflow/core/src/db/crdb_nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,10 +640,14 @@ impl Database for DatabaseCrdbNats {
})
.await?;

+let worker_instance_id_str = worker_instance_id.to_string();
let dt = start_instant.elapsed().as_secs_f64();
-metrics::PULL_WORKFLOWS_DURATION
-.with_label_values(&[&worker_instance_id.to_string()])
+metrics::LAST_PULL_WORKFLOWS_DURATION
+.with_label_values(&[&worker_instance_id_str])
.set(dt);
+metrics::PULL_WORKFLOWS_DURATION
+.with_label_values(&[&worker_instance_id_str])
+.observe(dt);

if workflow_rows.is_empty() {
return Ok(Vec::new());
Expand Down Expand Up @@ -895,15 +899,21 @@ impl Database for DatabaseCrdbNats {
.await?;

let workflows = build_histories(workflow_rows, events)?;

let dt = start_instant2.elapsed().as_secs_f64();
-metrics::PULL_WORKFLOWS_HISTORY_DURATION
-.with_label_values(&[&worker_instance_id.to_string()])
+metrics::LAST_PULL_WORKFLOWS_HISTORY_DURATION
+.with_label_values(&[&worker_instance_id_str])
.set(dt);
+metrics::PULL_WORKFLOWS_HISTORY_DURATION
+.with_label_values(&[&worker_instance_id_str])
+.observe(dt);
let dt = start_instant.elapsed().as_secs_f64();
-metrics::PULL_WORKFLOWS_FULL_DURATION
-.with_label_values(&[&worker_instance_id.to_string()])
+metrics::LAST_PULL_WORKFLOWS_FULL_DURATION
+.with_label_values(&[&worker_instance_id_str])
.set(dt);
+metrics::PULL_WORKFLOWS_FULL_DURATION
+.with_label_values(&[&worker_instance_id_str])
+.observe(dt);

Ok(workflows)
}
Expand Down
24 changes: 17 additions & 7 deletions packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1218,10 +1218,14 @@ impl Database for DatabaseFdbSqliteNats {
})
.await?;

+let worker_instance_id_str = worker_instance_id.to_string();
let dt = start_instant.elapsed().as_secs_f64();
-metrics::PULL_WORKFLOWS_DURATION
-.with_label_values(&[&worker_instance_id.to_string()])
+metrics::LAST_PULL_WORKFLOWS_DURATION
+.with_label_values(&[&worker_instance_id_str])
.set(dt);
+metrics::PULL_WORKFLOWS_DURATION
+.with_label_values(&[&worker_instance_id_str])
+.observe(dt);

if partial_workflows.is_empty() {
return Ok(Vec::new());
Expand Down Expand Up @@ -1453,15 +1457,21 @@ impl Database for DatabaseFdbSqliteNats {
.buffer_unordered(512)
.try_collect()
.await?;

let dt = start_instant2.elapsed().as_secs_f64();
-metrics::PULL_WORKFLOWS_HISTORY_DURATION
-.with_label_values(&[&worker_instance_id.to_string()])
+metrics::LAST_PULL_WORKFLOWS_HISTORY_DURATION
+.with_label_values(&[&worker_instance_id_str])
.set(dt);
+metrics::PULL_WORKFLOWS_HISTORY_DURATION
+.with_label_values(&[&worker_instance_id_str])
+.observe(dt);
let dt = start_instant.elapsed().as_secs_f64();
-metrics::PULL_WORKFLOWS_FULL_DURATION
-.with_label_values(&[&worker_instance_id.to_string()])
+metrics::LAST_PULL_WORKFLOWS_FULL_DURATION
+.with_label_values(&[&worker_instance_id_str])
.set(dt);
+metrics::PULL_WORKFLOWS_FULL_DURATION
+.with_label_values(&[&worker_instance_id_str])
+.observe(dt);

Ok(pulled_workflows)
}
Expand Down
Loading

0 comments on commit dba8d32

Please sign in to comment.