Skip to content

Commit

Permalink
feat(wasi-observe): A WASI Observe host component
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Schoepp <[email protected]>
  • Loading branch information
calebschoepp committed Jan 31, 2025
1 parent 24b8f0a commit ce91f58
Show file tree
Hide file tree
Showing 52 changed files with 2,098 additions and 134 deletions.
196 changes: 161 additions & 35 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ openssl = { version = "0.10" }
anyhow = { workspace = true, features = ["backtrace"] }
conformance = { path = "tests/conformance-tests" }
conformance-tests = { workspace = true }
fake-opentelemetry-collector = "0.21.1"
hex = "0.4"
http-body-util = { workspace = true }
hyper = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/factor-key-value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true }
spin-core = { path = "../core" }
spin-factor-observe = { path = "../factor-observe" }
spin-factors = { path = "../factors" }
spin-locked-app = { path = "../locked-app" }
spin-resource-table = { path = "../table" }
Expand Down
24 changes: 23 additions & 1 deletion crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{Cas, SwapError};
use anyhow::{Context, Result};
use spin_core::{async_trait, wasmtime::component::Resource};
use spin_factor_observe::ObserveContext;
use spin_resource_table::Table;
use spin_world::v2::key_value;
use spin_world::wasi::keyvalue as wasi_keyvalue;
Expand Down Expand Up @@ -48,23 +49,26 @@ pub struct KeyValueDispatch {
manager: Arc<dyn StoreManager>,
stores: Table<Arc<dyn Store>>,
compare_and_swaps: Table<Arc<dyn Cas>>,
observe_context: Option<ObserveContext>,
}

impl KeyValueDispatch {
pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY)
Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY, None)
}

pub fn new_with_capacity(
allowed_stores: HashSet<String>,
manager: Arc<dyn StoreManager>,
capacity: u32,
observe_context: Option<ObserveContext>,
) -> Self {
Self {
allowed_stores,
manager,
stores: Table::new(capacity),
compare_and_swaps: Table::new(capacity),
observe_context,
}
}

Expand Down Expand Up @@ -110,6 +114,9 @@ impl key_value::Host for KeyValueDispatch {}
impl key_value::HostStore for KeyValueDispatch {
#[instrument(name = "spin_key_value.open", skip(self), err(level = Level::INFO), fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
Ok(async {
if self.allowed_stores.contains(&name) {
let store = self.manager.get(&name).await?;
Expand All @@ -132,6 +139,9 @@ impl key_value::HostStore for KeyValueDispatch {
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<Option<Vec<u8>>, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.get(&key).await)
}
Expand All @@ -143,6 +153,9 @@ impl key_value::HostStore for KeyValueDispatch {
key: String,
value: Vec<u8>,
) -> Result<Result<(), Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.set(&key, &value).await)
}
Expand All @@ -153,6 +166,9 @@ impl key_value::HostStore for KeyValueDispatch {
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<(), Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.delete(&key).await)
}
Expand All @@ -163,6 +179,9 @@ impl key_value::HostStore for KeyValueDispatch {
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<bool, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.exists(&key).await)
}
Expand All @@ -172,6 +191,9 @@ impl key_value::HostStore for KeyValueDispatch {
&mut self,
store: Resource<key_value::Store>,
) -> Result<Result<Vec<String>, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.get_keys().await)
}
Expand Down
8 changes: 7 additions & 1 deletion crates/factor-key-value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use anyhow::ensure;
use spin_factor_observe::ObserveContext;
use spin_factors::{
ConfigureAppContext, Factor, FactorInstanceBuilder, InitContext, PrepareContext, RuntimeFactors,
};
Expand Down Expand Up @@ -84,17 +85,19 @@ impl Factor for KeyValueFactor {

fn prepare<T: RuntimeFactors>(
&self,
ctx: PrepareContext<T, Self>,
mut ctx: PrepareContext<T, Self>,
) -> anyhow::Result<InstanceBuilder> {
let app_state = ctx.app_state();
let allowed_stores = app_state
.component_allowed_stores
.get(ctx.app_component().id())
.expect("component should be in component_stores")
.clone();
let observe_context = ObserveContext::from_prepare_context(&mut ctx)?;
Ok(InstanceBuilder {
store_manager: app_state.store_manager.clone(),
allowed_stores,
observe_context,
})
}
}
Expand Down Expand Up @@ -174,6 +177,7 @@ pub struct InstanceBuilder {
store_manager: Arc<AppStoreManager>,
/// The allowed stores for this component instance.
allowed_stores: HashSet<String>,
observe_context: ObserveContext,
}

impl FactorInstanceBuilder for InstanceBuilder {
Expand All @@ -183,11 +187,13 @@ impl FactorInstanceBuilder for InstanceBuilder {
let Self {
store_manager,
allowed_stores,
observe_context,
} = self;
Ok(KeyValueDispatch::new_with_capacity(
allowed_stores,
store_manager,
u32::MAX,
Some(observe_context),
))
}
}
1 change: 1 addition & 0 deletions crates/factor-llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ llm-cublas = ["llm", "spin-llm-local/cublas"]
anyhow = { workspace = true }
async-trait = { workspace = true }
serde = { workspace = true }
spin-factor-observe = { path = "../factor-observe" }
spin-factors = { path = "../factors" }
spin-llm-local = { path = "../llm-local", optional = true }
spin-llm-remote-http = { path = "../llm-remote-http" }
Expand Down
4 changes: 4 additions & 0 deletions crates/factor-llm/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ impl v2::Host for InstanceState {
prompt: String,
params: Option<v2::InferencingParams>,
) -> Result<v2::InferencingResult, v2::Error> {
self.observe_context.reparent_tracing_span();

if !self.allowed_models.contains(&model) {
return Err(access_denied_error(&model));
}
Expand Down Expand Up @@ -42,6 +44,8 @@ impl v2::Host for InstanceState {
model: v1::EmbeddingModel,
data: Vec<String>,
) -> Result<v2::EmbeddingsResult, v2::Error> {
self.observe_context.reparent_tracing_span();

if !self.allowed_models.contains(&model) {
return Err(access_denied_error(&model));
}
Expand Down
6 changes: 5 additions & 1 deletion crates/factor-llm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use async_trait::async_trait;
use spin_factor_observe::ObserveContext;
use spin_factors::{
ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
};
Expand Down Expand Up @@ -76,7 +77,7 @@ impl Factor for LlmFactor {

fn prepare<T: RuntimeFactors>(
&self,
ctx: PrepareContext<T, Self>,
mut ctx: PrepareContext<T, Self>,
) -> anyhow::Result<Self::InstanceBuilder> {
let allowed_models = ctx
.app_state()
Expand All @@ -85,10 +86,12 @@ impl Factor for LlmFactor {
.cloned()
.unwrap_or_default();
let engine = ctx.app_state().engine.clone();
let observe_context = ObserveContext::from_prepare_context(&mut ctx)?;

Ok(InstanceState {
engine,
allowed_models,
observe_context,
})
}
}
Expand All @@ -103,6 +106,7 @@ pub struct AppState {
pub struct InstanceState {
engine: Arc<Mutex<dyn LlmEngine>>,
pub allowed_models: Arc<HashSet<String>>,
observe_context: ObserveContext,
}

/// The runtime configuration for the LLM factor.
Expand Down
23 changes: 23 additions & 0 deletions crates/factor-observe/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "spin-factor-observe"
version = { workspace = true }
authors = { workspace = true }
edition = { workspace = true }

[dependencies]
anyhow = { workspace = true }
indexmap = "2.2.6"
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
spin-core = { path = "../core" }
spin-factors = { path = "../factors" }
spin-resource-table = { path = "../table" }
spin-world = { path = "../world" }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }

[dev-dependencies]
toml = "0.5"

[lints]
workspace = true
Loading

0 comments on commit ce91f58

Please sign in to comment.