Skip to content

Commit 56b03e0

Browse files
authored
Merge pull request #103 from chrivers/chrivers/svc-crate
Implement a new crate ("svc") to manage rust-based micro-services. Bifrost consists of a number of "things" running independently, while performing work either on request, on a timer, or some combination of the two. Synchronizing, coordinating and sharing information between all these various parts of Bifrost, takes up a non-trivial amount of the codebase. That's why we are introducing a dedicates library (a "crate", in rust terminology) for managing services that implement a standardized interface, specifically, the `Service` trait. This makes it possible to share start/run/stop handling for all services, as well as more advanced features like policies ("if a service fails, what should happen?"), error handling, dependencies, and liveness checks. This PR introduces and builds up the "svc" crate, but does not change Bifrost to use it, yet.
2 parents cfb9ec2 + 5c7fae0 commit 56b03e0

13 files changed

+1401
-0
lines changed

crates/svc/Cargo.toml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
[package]
2+
name = "svc"
3+
version = "0.1.0"
4+
edition.workspace = true
5+
authors.workspace = true
6+
rust-version.workspace = true
7+
description.workspace = true
8+
readme.workspace = true
9+
repository.workspace = true
10+
license.workspace = true
11+
categories.workspace = true
12+
keywords.workspace = true
13+
14+
[dependencies]
15+
async-trait = "0.1.86"
16+
futures = { version = "0.3.31", default-features = false, features = ["alloc"] }
17+
log = "0.4.26"
18+
serde = { version = "1.0.218", features = ["derive"] }
19+
thiserror = "2.0.11"
20+
tokio = { version = "1.43.0", features = ["io-util", "macros", "process", "rt", "rt-multi-thread", "sync", "time", "tokio-macros"] }
21+
uuid = { version = "1.14.0", features = ["v4"] }
22+
23+
[lints]
24+
workspace = true
25+
26+
[dev-dependencies]
27+
pretty_env_logger = "0.5.0"

crates/svc/examples/from_async_fn.rs

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use std::time::Duration;
2+
3+
use thiserror::Error;
4+
use tokio::time::sleep;
5+
6+
use svc::error::SvcResult;
7+
use svc::manager::ServiceManager;
8+
9+
#[derive(Error, Debug)]
10+
pub enum SimpleError {
11+
#[error("That didn't work")]
12+
Nope,
13+
}
14+
15+
async fn run() -> Result<(), SimpleError> {
16+
let dur = Duration::from_millis(800);
17+
18+
println!("Hello");
19+
20+
println!("1");
21+
sleep(dur).await;
22+
println!("2");
23+
sleep(dur).await;
24+
println!("3");
25+
26+
Ok(())
27+
}
28+
29+
#[tokio::main]
30+
async fn main() -> SvcResult<()> {
31+
pretty_env_logger::formatted_builder()
32+
.filter_level(log::LevelFilter::Debug)
33+
.parse_default_env()
34+
.init();
35+
36+
let (mut client, future) = ServiceManager::spawn();
37+
38+
client.register_function("foo", run()).await?;
39+
client.start("foo").await?;
40+
println!("main: service configured");
41+
42+
client.wait_for_start("foo").await?;
43+
println!("main: service started");
44+
45+
client.shutdown().await?;
46+
future.await??;
47+
println!("main: service stopped");
48+
49+
Ok(())
50+
}

crates/svc/examples/policy.rs

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use std::time::Duration;
2+
3+
use async_trait::async_trait;
4+
use svc::policy::{Policy, Retry};
5+
use svc::runservice::StandardService;
6+
use thiserror::Error;
7+
8+
use svc::error::SvcResult;
9+
use svc::manager::ServiceManager;
10+
use svc::traits::{Service, ServiceState};
11+
12+
#[derive(Clone)]
13+
struct PolicyService {
14+
counter: u32,
15+
}
16+
17+
#[derive(Error, Debug)]
18+
pub enum Error {
19+
#[error("Not done yet")]
20+
MoreToDo,
21+
}
22+
23+
#[async_trait]
24+
impl Service for PolicyService {
25+
type Error = Error;
26+
27+
async fn run(&mut self) -> Result<(), Error> {
28+
println!("Hello {}", self.counter);
29+
self.counter += 1;
30+
31+
// returning an Err will invoke the policy for the Running state
32+
Err(Error::MoreToDo)
33+
}
34+
}
35+
36+
#[tokio::main]
37+
async fn main() -> SvcResult<()> {
38+
const NAME: &str = "policy-service";
39+
40+
pretty_env_logger::formatted_builder()
41+
.filter_level(log::LevelFilter::Debug)
42+
.parse_default_env()
43+
.init();
44+
45+
let (mut client, future) = ServiceManager::spawn();
46+
47+
let svc = PolicyService { counter: 0 };
48+
49+
// Manually construct a ServiceRunner, and set a specific policy for
50+
// handling errors during .run()
51+
let svcr = StandardService::new(NAME, svc).with_run_policy(
52+
// Try up to 5 times, waiting 300ms between each attempt
53+
Policy::new()
54+
.with_retry(Retry::Limit(5))
55+
.with_delay(Duration::from_millis(300)),
56+
);
57+
58+
let uuid = client.register(NAME, svcr).await?;
59+
client.start(uuid).await?;
60+
println!("main: service will attempt to run 5 times");
61+
62+
client.wait_for_start(uuid).await?;
63+
println!("main: service started");
64+
65+
client.wait_for_state(uuid, ServiceState::Failed).await?;
66+
client.shutdown().await?;
67+
future.await??;
68+
69+
Ok(())
70+
}

crates/svc/examples/restart.rs

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use std::time::Duration;
2+
3+
use tokio::time::sleep;
4+
5+
use svc::error::{RunSvcError, SvcResult};
6+
use svc::manager::ServiceManager;
7+
8+
async fn run() -> Result<(), RunSvcError> {
9+
let dur = Duration::from_millis(200);
10+
11+
println!("Hello");
12+
13+
let mut counter = 0;
14+
15+
loop {
16+
println!("{counter}");
17+
sleep(dur).await;
18+
counter += 1;
19+
}
20+
}
21+
22+
#[tokio::main]
23+
async fn main() -> SvcResult<()> {
24+
pretty_env_logger::formatted_builder()
25+
.filter_level(log::LevelFilter::Debug)
26+
.parse_default_env()
27+
.init();
28+
29+
let (mut client, future) = ServiceManager::spawn();
30+
31+
client.register_function("foo", run()).await?;
32+
33+
client.start("foo").await?;
34+
println!("main: service configured");
35+
36+
client.wait_for_start("foo").await?;
37+
println!("main: service started");
38+
39+
sleep(Duration::from_millis(1000)).await;
40+
41+
client.stop("foo").await?;
42+
client.wait_for_stop("foo").await?;
43+
44+
println!("main: service stopped");
45+
46+
client.start("foo").await?;
47+
client.wait_for_start("foo").await?;
48+
println!("main: service started");
49+
50+
sleep(Duration::from_millis(1000)).await;
51+
client.shutdown().await?;
52+
53+
future.await??;
54+
55+
Ok(())
56+
}

crates/svc/examples/simple.rs

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
use std::time::Duration;
2+
3+
use async_trait::async_trait;
4+
use thiserror::Error;
5+
use tokio::time::sleep;
6+
7+
use svc::error::SvcResult;
8+
use svc::manager::ServiceManager;
9+
use svc::traits::Service;
10+
11+
#[derive(Clone)]
12+
struct Simple {
13+
name: String,
14+
counter: u32,
15+
}
16+
17+
#[derive(Error, Debug)]
18+
pub enum SimpleError {
19+
#[error("That didn't work..")]
20+
Nope,
21+
}
22+
23+
#[async_trait]
24+
impl Service for Simple {
25+
type Error = SimpleError;
26+
27+
async fn run(&mut self) -> Result<(), SimpleError> {
28+
let dur = Duration::from_millis(300);
29+
30+
println!("Hello from {}", self.name);
31+
32+
println!("1");
33+
sleep(dur).await;
34+
println!("2");
35+
sleep(dur).await;
36+
println!("3");
37+
38+
println!("Done running. Now going to stop (this will fail the first time)");
39+
Ok(())
40+
}
41+
42+
async fn stop(&mut self) -> Result<(), SimpleError> {
43+
self.counter += 1;
44+
45+
// pretend this service doesn't succeed at stopping right away
46+
if self.counter > 1 {
47+
Ok(())
48+
} else {
49+
Err(SimpleError::Nope)
50+
}
51+
}
52+
}
53+
54+
#[tokio::main]
55+
async fn main() -> SvcResult<()> {
56+
pretty_env_logger::formatted_builder()
57+
.filter_level(log::LevelFilter::Debug)
58+
.parse_default_env()
59+
.init();
60+
61+
let (mut client, future) = ServiceManager::spawn();
62+
63+
let svc = Simple {
64+
name: "Simple Service".to_string(),
65+
counter: 0,
66+
};
67+
68+
client.register_service("foo", svc).await?;
69+
client.start("foo").await?;
70+
71+
println!("main: service configured");
72+
73+
client.wait_for_start("foo").await?;
74+
75+
println!("main: service started");
76+
77+
client.wait_for_stop("foo").await?;
78+
79+
println!("main: service stopped");
80+
81+
client.shutdown().await?;
82+
83+
future.await??;
84+
85+
Ok(())
86+
}

crates/svc/src/error.rs

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
use std::error::Error;
2+
3+
use thiserror::Error;
4+
5+
use crate::manager::{ServiceEvent, SvmRequest};
6+
use crate::serviceid::ServiceId;
7+
use crate::traits::ServiceState;
8+
9+
#[derive(Error, Debug)]
10+
pub enum SvcError {
11+
/* mapped errors */
12+
#[error(transparent)]
13+
FromUtf8Error(#[from] std::string::FromUtf8Error),
14+
15+
#[error(transparent)]
16+
IOError(#[from] std::io::Error),
17+
18+
#[error(transparent)]
19+
TryFromIntError(#[from] std::num::TryFromIntError),
20+
21+
#[error(transparent)]
22+
UuidError(#[from] uuid::Error),
23+
24+
#[error(transparent)]
25+
JoinError(#[from] tokio::task::JoinError),
26+
27+
#[error(transparent)]
28+
MpscSendError(#[from] tokio::sync::mpsc::error::SendError<SvmRequest>),
29+
30+
#[error(transparent)]
31+
MpscSendEventError(#[from] tokio::sync::mpsc::error::SendError<ServiceEvent>),
32+
33+
#[error(transparent)]
34+
WatchSendError(#[from] tokio::sync::watch::error::SendError<ServiceState>),
35+
36+
#[error(transparent)]
37+
WatchRecvError(#[from] tokio::sync::watch::error::RecvError),
38+
39+
#[error(transparent)]
40+
OneshotRecvError(#[from] tokio::sync::oneshot::error::RecvError),
41+
42+
#[error("Service {0:?} not registered")]
43+
ServiceNotFound(ServiceId),
44+
45+
#[error("Service {0} already exists")]
46+
ServiceAlreadyExists(String),
47+
48+
#[error("All services stopped")]
49+
Shutdown,
50+
51+
#[error("Service has failed")]
52+
ServiceFailed,
53+
}
54+
55+
#[derive(Error, Debug)]
56+
pub enum RunSvcError {
57+
/* mapped errors */
58+
#[error(transparent)]
59+
MpscSendError(#[from] tokio::sync::mpsc::error::SendError<SvmRequest>),
60+
61+
#[error(transparent)]
62+
WatchSendError(#[from] tokio::sync::watch::error::SendError<ServiceState>),
63+
64+
#[error(transparent)]
65+
MpscSendEventError(#[from] tokio::sync::mpsc::error::SendError<ServiceEvent>),
66+
67+
#[error(transparent)]
68+
WatchRecvError(#[from] tokio::sync::watch::error::RecvError),
69+
70+
/* errors from run service */
71+
#[error(transparent)]
72+
ServiceError(Box<dyn Error + Send>),
73+
}
74+
75+
pub type SvcResult<T> = Result<T, SvcError>;

crates/svc/src/lib.rs

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pub mod error;
2+
pub mod manager;
3+
pub mod policy;
4+
pub mod rpc;
5+
pub mod runservice;
6+
pub mod serviceid;
7+
pub mod traits;

0 commit comments

Comments
 (0)