-
Notifications
You must be signed in to change notification settings - Fork 0
feat(flags): Basic flags service #31
Changes from 6 commits
aad0f2c
b5591f5
b3308d3
27e69e9
68fe2bb
877e4df
b5c4ebf
777788f
15a43e2
1c3481f
3f3241e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
[package] | ||
name = "feature-flags" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
anyhow = { workspace = true } | ||
async-trait = { workspace = true } | ||
axum = { workspace = true } | ||
axum-client-ip = { workspace = true } | ||
envconfig = { workspace = true } | ||
tokio = { workspace = true } | ||
tracing = { workspace = true } | ||
tracing-subscriber = { workspace = true, features = ["env-filter"] } | ||
bytes = { workspace = true } | ||
rand = { workspace = true } | ||
redis = { version = "0.23.3", features = [ | ||
"tokio-comp", | ||
"cluster", | ||
"cluster-async", | ||
] } | ||
serde = { workspace = true } | ||
serde_json = { workspace = true } | ||
thiserror = { workspace = true } | ||
|
||
[lints] | ||
workspace = true | ||
|
||
[dev-dependencies] | ||
assert-json-diff = { workspace = true } | ||
once_cell = "1.18.0" | ||
reqwest = { workspace = true } | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
use std::collections::HashMap; | ||
|
||
use axum::http::StatusCode; | ||
use axum::response::{IntoResponse, Response}; | ||
use serde::{Deserialize, Serialize}; | ||
use thiserror::Error; | ||
|
||
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)] | ||
pub enum FlagsResponseCode { | ||
Ok = 1, | ||
} | ||
|
||
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)] | ||
#[serde(rename_all = "camelCase")] | ||
pub struct FlagsResponse { | ||
pub error_while_computing_flags: bool, | ||
// TODO: better typing here, support bool responses | ||
pub feature_flags: HashMap<String, String>, | ||
} | ||
|
||
#[derive(Error, Debug)] | ||
pub enum FlagError { | ||
#[error("failed to decode request: {0}")] | ||
RequestDecodingError(String), | ||
#[error("failed to parse request: {0}")] | ||
RequestParsingError(#[from] serde_json::Error), | ||
|
||
#[error("Empty distinct_id in request")] | ||
EmptyDistinctId, | ||
#[error("No distinct_id in request")] | ||
MissingDistinctId, | ||
|
||
#[error("No api_key in request")] | ||
NoTokenError, | ||
#[error("API key is not valid")] | ||
TokenValidationError, | ||
|
||
#[error("rate limited")] | ||
RateLimited, | ||
} | ||
|
||
impl IntoResponse for FlagError { | ||
fn into_response(self) -> Response { | ||
match self { | ||
FlagError::RequestDecodingError(_) | ||
| FlagError::RequestParsingError(_) | ||
| FlagError::EmptyDistinctId | ||
| FlagError::MissingDistinctId => (StatusCode::BAD_REQUEST, self.to_string()), | ||
|
||
FlagError::NoTokenError | FlagError::TokenValidationError => { | ||
(StatusCode::UNAUTHORIZED, self.to_string()) | ||
} | ||
|
||
FlagError::RateLimited => (StatusCode::TOO_MANY_REQUESTS, self.to_string()), | ||
} | ||
.into_response() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
use std::net::SocketAddr; | ||
|
||
use envconfig::Envconfig; | ||
|
||
#[derive(Envconfig, Clone)] | ||
pub struct Config { | ||
#[envconfig(default = "127.0.0.1:0")] | ||
pub address: SocketAddr, | ||
|
||
#[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")] | ||
pub write_database_url: String, | ||
|
||
#[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")] | ||
pub read_database_url: String, | ||
|
||
#[envconfig(default = "1024")] | ||
pub max_concurrent_jobs: usize, | ||
|
||
#[envconfig(default = "100")] | ||
pub max_pg_connections: u32, | ||
|
||
#[envconfig(default = "redis://localhost:6379/")] | ||
pub redis_url: String, | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
pub mod api; | ||
pub mod config; | ||
pub mod redis; | ||
pub mod router; | ||
pub mod server; | ||
pub mod v0_endpoint; | ||
pub mod v0_request; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
use envconfig::Envconfig; | ||
use tokio::signal; | ||
use tracing_subscriber::layer::SubscriberExt; | ||
use tracing_subscriber::util::SubscriberInitExt; | ||
use tracing_subscriber::{EnvFilter, Layer}; | ||
|
||
use feature_flags::config::Config; | ||
use feature_flags::server::serve; | ||
|
||
async fn shutdown() { | ||
let mut term = signal::unix::signal(signal::unix::SignalKind::terminate()) | ||
.expect("failed to register SIGTERM handler"); | ||
|
||
let mut interrupt = signal::unix::signal(signal::unix::SignalKind::interrupt()) | ||
.expect("failed to register SIGINT handler"); | ||
|
||
tokio::select! { | ||
_ = term.recv() => {}, | ||
_ = interrupt.recv() => {}, | ||
}; | ||
|
||
tracing::info!("Shutting down gracefully..."); | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
let config = Config::init_from_env().expect("Invalid configuration:"); | ||
|
||
// Basic logging for now: | ||
// - stdout with a level configured by the RUST_LOG envvar (default=ERROR) | ||
let log_layer = tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env()); | ||
tracing_subscriber::registry().with(log_layer).init(); | ||
|
||
// Open the TCP port and start the server | ||
let listener = tokio::net::TcpListener::bind(config.address) | ||
.await | ||
.expect("could not bind port"); | ||
serve(config, listener, shutdown()).await | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
use std::time::Duration; | ||
|
||
use anyhow::Result; | ||
use async_trait::async_trait; | ||
use redis::AsyncCommands; | ||
use tokio::time::timeout; | ||
|
||
// average for all commands is <10ms, check grafana | ||
const REDIS_TIMEOUT_MILLISECS: u64 = 10; | ||
|
||
/// A simple redis wrapper | ||
/// Copied from capture/src/redis.rs. | ||
/// TODO: Modify this to support hincrby, get, and set commands. | ||
|
||
#[async_trait] | ||
pub trait Client { | ||
// A very simplified wrapper, but works for our usage | ||
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>>; | ||
} | ||
|
||
pub struct RedisClient { | ||
client: redis::Client, | ||
} | ||
|
||
impl RedisClient { | ||
pub fn new(addr: String) -> Result<RedisClient> { | ||
let client = redis::Client::open(addr)?; | ||
|
||
Ok(RedisClient { client }) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Client for RedisClient { | ||
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>> { | ||
let mut conn = self.client.get_async_connection().await?; | ||
|
||
let results = conn.zrangebyscore(k, min, max); | ||
let fut = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; | ||
|
||
Ok(fut?) | ||
} | ||
} | ||
|
||
// TODO: Find if there's a better way around this. | ||
// mockall got really annoying with async and results so I'm just gonna do my own | ||
#[derive(Clone)] | ||
pub struct MockRedisClient { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By default, all cargo tests in a module run concurrently, so you indeed want isolation. Mocking in rust is hard, so I recommend using the real data stores from the compose stack:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I don't even need to go that far for redis, since all redis keys I care about are scoped by team tokens anyway, so generating a new team ought to be sufficient for both redis & pg. Will dive into this more once I actually connect - but I do like your idea of not using these mocks - just copied this for now to get a feel for how capture does it, but indeed not too happy with these mocks right now. |
||
zrangebyscore_ret: Vec<String>, | ||
} | ||
|
||
impl MockRedisClient { | ||
pub fn new() -> MockRedisClient { | ||
MockRedisClient { | ||
zrangebyscore_ret: Vec::new(), | ||
} | ||
} | ||
|
||
pub fn zrangebyscore_ret(&mut self, ret: Vec<String>) -> Self { | ||
self.zrangebyscore_ret = ret; | ||
|
||
self.clone() | ||
} | ||
} | ||
|
||
impl Default for MockRedisClient { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Client for MockRedisClient { | ||
// A very simplified wrapper, but works for our usage | ||
async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result<Vec<String>> { | ||
Ok(self.zrangebyscore_ret.clone()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should consider making this a string and renaming the field to just
error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.. to give client more info about the error?
This would usually only be a db error, so internal, not very useful to expose to clients.
For now I want the same shape as the existing endpoint, so will keep this as is