Skip to content

Commit

Permalink
Move kv and llm instrumentation up a level of abstraction
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Schoepp <caleb.schoepp@fermyon.com>
  • Loading branch information
calebschoepp committed Sep 5, 2024
1 parent 3e62d2e commit b3cb3bc
Show file tree
Hide file tree
Showing 14 changed files with 43 additions and 58 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use spin_locked_app::MetadataKey;
use spin_world::v2::key_value;
use std::{collections::HashSet, sync::Arc};
use table::Table;
use tracing::{instrument, Level};

pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");

Expand Down Expand Up @@ -79,6 +80,7 @@ impl key_value::Host for KeyValueDispatch {}

#[async_trait]
impl key_value::HostStore for KeyValueDispatch {
#[instrument(name = "spin_key_value.open", skip(self), err(level = Level::INFO), fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
Ok(async {
if self.allowed_stores.contains(&name) {
Expand All @@ -94,6 +96,7 @@ impl key_value::HostStore for KeyValueDispatch {
.await)
}

#[instrument(name = "spin_key_value.get", skip(self, store), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn get(
&mut self,
store: Resource<key_value::Store>,
Expand All @@ -103,6 +106,7 @@ impl key_value::HostStore for KeyValueDispatch {
Ok(store.get(&key).await)
}

#[instrument(name = "spin_key_value.set", skip(self, store, value), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn set(
&mut self,
store: Resource<key_value::Store>,
Expand All @@ -113,6 +117,7 @@ impl key_value::HostStore for KeyValueDispatch {
Ok(store.set(&key, &value).await)
}

#[instrument(name = "spin_key_value.delete", skip(self, store), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn delete(
&mut self,
store: Resource<key_value::Store>,
Expand All @@ -122,6 +127,7 @@ impl key_value::HostStore for KeyValueDispatch {
Ok(store.delete(&key).await)
}

#[instrument(name = "spin_key_value.exists", skip(self, store), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn exists(
&mut self,
store: Resource<key_value::Store>,
Expand All @@ -131,6 +137,7 @@ impl key_value::HostStore for KeyValueDispatch {
Ok(store.exists(&key).await)
}

#[instrument(name = "spin_key_value.get_keys", skip(self, store), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn get_keys(
&mut self,
store: Resource<key_value::Store>,
Expand Down
20 changes: 13 additions & 7 deletions crates/factor-llm/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use async_trait::async_trait;
use spin_world::v1::llm::{self as v1};
use spin_world::v2::llm::{self as v2};
use tracing::field::Empty;
use tracing::{instrument, Level};

use crate::InstanceState;

#[async_trait]
impl v2::Host for InstanceState {
#[instrument(name = "spin_llm.infer", skip(self, prompt), err(level = Level::INFO), fields(otel.kind = "client", llm.backend = Empty))]
async fn infer(
&mut self,
model: v2::InferencingModel,
Expand All @@ -15,9 +18,9 @@ impl v2::Host for InstanceState {
if !self.allowed_models.contains(&model) {
return Err(access_denied_error(&model));
}
self.engine
.lock()
.await
let mut engine = self.engine.lock().await;
tracing::Span::current().record("llm.backend", engine.summary());
engine
.infer(
model,
prompt,
Expand All @@ -33,15 +36,18 @@ impl v2::Host for InstanceState {
.await
}

#[instrument(name = "spin_llm.generate_embeddings", skip(self, data), err(level = Level::INFO), fields(otel.kind = "client", llm.backend = Empty))]
async fn generate_embeddings(
&mut self,
m: v1::EmbeddingModel,
model: v1::EmbeddingModel,
data: Vec<String>,
) -> Result<v2::EmbeddingsResult, v2::Error> {
if !self.allowed_models.contains(&m) {
return Err(access_denied_error(&m));
if !self.allowed_models.contains(&model) {
return Err(access_denied_error(&model));
}
self.engine.lock().await.generate_embeddings(m, data).await
let mut engine = self.engine.lock().await;
tracing::Span::current().record("llm.backend", engine.summary());
engine.generate_embeddings(model, data).await
}

fn convert_error(&mut self, error: v2::Error) -> anyhow::Result<v2::Error> {
Expand Down
7 changes: 7 additions & 0 deletions crates/factor-llm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ pub trait LlmEngine: Send + Sync {
model: v2::EmbeddingModel,
data: Vec<String>,
) -> Result<v2::EmbeddingsResult, v2::Error>;

/// A human-readable summary of the given engine's configuration.
///
/// The default implementation returns `None`; an engine that can describe
/// itself overrides this method and returns `Some(description)`.
///
/// Example: "local model"
fn summary(&self) -> Option<String> {
None
}
}

/// A creator for an LLM engine.
Expand Down
12 changes: 12 additions & 0 deletions crates/factor-llm/src/spin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ mod local {
) -> Result<v2::EmbeddingsResult, v2::Error> {
self.generate_embeddings(model, data).await
}

/// Describes this engine as the fixed label "local model".
fn summary(&self) -> Option<String> {
    let label = String::from("local model");
    Some(label)
}
}
}

Expand Down Expand Up @@ -78,6 +82,10 @@ impl LlmEngine for RemoteHttpLlmEngine {
) -> Result<v2::EmbeddingsResult, v2::Error> {
self.generate_embeddings(model, data).await
}

/// Describes this engine as "model at <url>", using the value of `self.url()`.
fn summary(&self) -> Option<String> {
    let endpoint = self.url();
    let description = format!("model at {}", endpoint);
    Some(description)
}
}

pub fn runtime_config_from_toml(
Expand Down Expand Up @@ -161,5 +169,9 @@ mod noop {
"Local LLM operations are not supported in this version of Spin.".into(),
))
}

/// Describes this engine as the fixed label "noop model".
fn summary(&self) -> Option<String> {
    let label = String::from("noop model");
    Some(label)
}
}
}
1 change: 0 additions & 1 deletion crates/key-value-azure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ serde = { version = "1.0", features = ["derive", "rc"] }
spin-core = { path = "../core" }
spin-factor-key-value = { path = "../factor-key-value" }
tokio = "1"
tracing = { workspace = true }
url = "2"

[lints]
Expand Down
22 changes: 0 additions & 22 deletions crates/key-value-azure/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use spin_core::async_trait;
use spin_factor_key_value::{log_error, Error, Store, StoreManager};
use tracing::{instrument, Level};

pub struct KeyValueAzureCosmos {
client: CollectionClient,
Expand Down Expand Up @@ -119,20 +118,11 @@ struct AzureCosmosStore {

#[async_trait]
impl Store for AzureCosmosStore {
#[instrument(name = "spin_key_value_azure.get", skip(self), err(level = Level::INFO), fields(
otel.kind = "client"
))]
/// Fetches the pair stored under `key` via `get_pair` and returns its
/// `value`, or `None` when no pair was found. Errors from `get_pair` are
/// propagated with `?`.
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
    Ok(self.get_pair(key).await?.map(|found| found.value))
}

#[instrument(
name = "spin_key_value_azure.set",
skip(self, value),
err(level = Level::INFO),
fields(otel.kind = "client")
)]
async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
let pair = Pair {
id: key.to_string(),
Expand All @@ -146,9 +136,6 @@ impl Store for AzureCosmosStore {
Ok(())
}

#[instrument(name = "spin_key_value_azure.delete", skip(self), err(level = Level::INFO), fields(
otel.kind = "client"
))]
async fn delete(&self, key: &str) -> Result<(), Error> {
if self.exists(key).await? {
let document_client = self.client.document_client(key, &key).map_err(log_error)?;
Expand All @@ -157,19 +144,10 @@ impl Store for AzureCosmosStore {
Ok(())
}

#[instrument(name = "spin_key_value_azure.exists", skip(self), err(level = Level::INFO), fields(
otel.kind = "client"
))]
/// Reports whether `get_pair` finds a pair for `key`. Errors from
/// `get_pair` are propagated with `?`.
async fn exists(&self, key: &str) -> Result<bool, Error> {
    let lookup = self.get_pair(key).await?;
    Ok(lookup.is_some())
}

#[instrument(
name = "spin_key_value_azure.get_keys",
skip(self),
err(level = Level::INFO),
fields(otel.kind = "client")
)]
/// Returns the result of `self.get_keys()` unchanged.
// NOTE(review): presumably `self.get_keys()` resolves to an inherent
// `get_keys` on `AzureCosmosStore` defined outside this view, rather than
// recursing into this trait method — confirm.
async fn get_keys(&self) -> Result<Vec<String>, Error> {
self.get_keys().await
}
Expand Down
1 change: 0 additions & 1 deletion crates/key-value-redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ spin-core = { path = "../core" }
spin-factor-key-value = { path = "../factor-key-value" }
spin-world = { path = "../world" }
tokio = "1"
tracing = { workspace = true }
url = "2"

[lints]
Expand Down
7 changes: 0 additions & 7 deletions crates/key-value-redis/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use spin_core::async_trait;
use spin_factor_key_value::{log_error, Error, Store, StoreManager};
use std::sync::Arc;
use tokio::sync::{Mutex, OnceCell};
use tracing::{instrument, Level};
use url::Url;

pub struct KeyValueRedis {
Expand All @@ -25,7 +24,6 @@ impl KeyValueRedis {

#[async_trait]
impl StoreManager for KeyValueRedis {
#[instrument(name = "spin_key_value_redis.get_store", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn get(&self, _name: &str) -> Result<Arc<dyn Store>, Error> {
let connection = self
.connection
Expand Down Expand Up @@ -60,13 +58,11 @@ struct RedisStore {

#[async_trait]
impl Store for RedisStore {
#[instrument(name = "spin_key_value_redis.get", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))]
/// Locks the shared connection, issues `get` for `key`, and routes any
/// failure through `log_error`.
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
    let mut guard = self.connection.lock().await;
    let outcome = guard.get(key).await;
    outcome.map_err(log_error)
}

#[instrument(name = "spin_key_value_redis.set", skip(self, value), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
self.connection
.lock()
Expand All @@ -76,7 +72,6 @@ impl Store for RedisStore {
.map_err(log_error)
}

#[instrument(name = "spin_key_value_redis.delete", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn delete(&self, key: &str) -> Result<(), Error> {
self.connection
.lock()
Expand All @@ -86,7 +81,6 @@ impl Store for RedisStore {
.map_err(log_error)
}

#[instrument(name = "spin_key_value_redis.exists", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn exists(&self, key: &str) -> Result<bool, Error> {
self.connection
.lock()
Expand All @@ -96,7 +90,6 @@ impl Store for RedisStore {
.map_err(log_error)
}

#[instrument(name = "spin_key_value_redis.get_keys", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn get_keys(&self) -> Result<Vec<String>, Error> {
self.connection
.lock()
Expand Down
1 change: 0 additions & 1 deletion crates/key-value-spin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ spin-core = { path = "../core" }
spin-factor-key-value = { path = "../factor-key-value" }
spin-world = { path = "../world" }
tokio = { version = "1", features = ["rt-multi-thread"] }
tracing = { workspace = true }

[lints]
workspace = true
7 changes: 0 additions & 7 deletions crates/key-value-spin/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{
sync::{Arc, Mutex},
};
use tokio::task;
use tracing::{instrument, Level};

#[derive(Clone, Debug)]
pub enum DatabaseLocation {
Expand Down Expand Up @@ -37,7 +36,6 @@ impl KeyValueSqlite {

#[async_trait]
impl StoreManager for KeyValueSqlite {
#[instrument(name = "spin_key_value_sqlite.get_store", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error> {
let connection = task::block_in_place(|| {
self.connection.get_or_try_init(|| {
Expand Down Expand Up @@ -89,7 +87,6 @@ struct SqliteStore {

#[async_trait]
impl Store for SqliteStore {
#[instrument(name = "spin_key_value_sqlite.get", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
task::block_in_place(|| {
self.connection
Expand All @@ -105,7 +102,6 @@ impl Store for SqliteStore {
})
}

#[instrument(name = "spin_key_value_sqlite.set", skip(self, value), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
task::block_in_place(|| {
self.connection
Expand All @@ -122,7 +118,6 @@ impl Store for SqliteStore {
})
}

#[instrument(name = "spin_key_value_sqlite.delete", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn delete(&self, key: &str) -> Result<(), Error> {
task::block_in_place(|| {
self.connection
Expand All @@ -136,12 +131,10 @@ impl Store for SqliteStore {
})
}

#[instrument(name = "spin_key_value_sqlite.exists", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))]
/// Reports whether `self.get(key)` yields a value. Errors from `get` are
/// propagated with `?`.
async fn exists(&self, key: &str) -> Result<bool, Error> {
    let stored = self.get(key).await?;
    Ok(stored.is_some())
}

#[instrument(name = "spin_key_value_sqlite.get_keys", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn get_keys(&self) -> Result<Vec<String>, Error> {
task::block_in_place(|| {
self.connection
Expand Down
3 changes: 0 additions & 3 deletions crates/llm-local/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::{
sync::{Arc, Mutex},
};
use tokenizers::PaddingParams;
use tracing::{instrument, Level};

const MODEL_ALL_MINILM_L6_V2: &str = "all-minilm-l6-v2";

Expand All @@ -32,7 +31,6 @@ pub struct LocalLlmEngine {
}

impl LocalLlmEngine {
#[instrument(name = "spin_llm_local.infer", skip(self, prompt), err(level = Level::INFO))]
pub async fn infer(
&mut self,
model: wasi_llm::InferencingModel,
Expand Down Expand Up @@ -92,7 +90,6 @@ impl LocalLlmEngine {
Ok(response)
}

#[instrument(name = "spin_llm_local.generate_embeddings", skip(self, data), err(level = Level::INFO))]
pub async fn generate_embeddings(
&mut self,
model: wasi_llm::EmbeddingModel,
Expand Down
Loading

0 comments on commit b3cb3bc

Please sign in to comment.