From bcc45497e2bbd2f1f9c2deba4602e40a9c8619d9 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 6 Nov 2023 14:21:29 +0100 Subject: [PATCH] add end2end testing harness to capture-server crate (#45) --- .github/workflows/rust.yml | 5 + Cargo.lock | 227 ++++++++++++++++++++++++++++++--- Cargo.toml | 2 + capture-server/Cargo.toml | 18 ++- capture-server/src/main.rs | 62 +-------- capture-server/tests/common.rs | 169 ++++++++++++++++++++++++ capture-server/tests/events.rs | 75 +++++++++++ capture/Cargo.toml | 3 +- capture/src/config.rs | 19 +++ capture/src/lib.rs | 2 + capture/src/router.rs | 1 - capture/src/server.rs | 52 ++++++++ docker-compose.yml | 51 ++++++++ 13 files changed, 605 insertions(+), 81 deletions(-) create mode 100644 capture-server/tests/common.rs create mode 100644 capture-server/tests/events.rs create mode 100644 capture/src/config.rs create mode 100644 capture/src/server.rs create mode 100644 docker-compose.yml diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 478017a..10636f1 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -38,6 +38,11 @@ jobs: steps: - uses: actions/checkout@v3 + - name: Setup end2end dependencies + run: | + docker compose up -d --wait + echo "127.0.0.1 kafka" | sudo tee -a /etc/hosts + - name: Install rust uses: dtolnay/rust-toolchain@master with: diff --git a/Cargo.lock b/Cargo.lock index 3fca31c..685687f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -205,6 +205,7 @@ dependencies = [ "axum-test-helper", "base64", "bytes", + "envconfig", "flate2", "governor", "metrics", @@ -231,9 +232,17 @@ dependencies = [ name = "capture-server" version = "0.1.0" dependencies = [ + "anyhow", + "assert-json-diff", "axum", "capture", "envconfig", + "futures", + "once_cell", + "rand", + "rdkafka", + "reqwest", + "serde_json", "time", "tokio", "tracing", @@ -406,6 +415,22 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + [[package]] name = "flate2" version = "1.0.27" @@ -431,6 +456,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.0" @@ -458,9 +498,9 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" [[package]] name = "futures" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" dependencies = [ "futures-channel", "futures-core", @@ -473,9 +513,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", "futures-sink", @@ -483,15 +523,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" [[package]] name = "futures-executor" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" dependencies = [ "futures-core", "futures-task", @@ -500,15 +540,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", @@ -517,15 +557,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" [[package]] name = "futures-timer" @@ -535,9 +575,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-channel", "futures-core", @@ -696,6 +736,19 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "idna" version = "0.4.0" @@ -780,6 +833,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" + [[package]] name = "lock_api" version = "0.4.10" @@ -953,6 +1012,24 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "no-std-compat" version = "0.4.1" @@ -1042,6 +1119,38 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "openssl" +version = "0.10.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" +dependencies = [ + "bitflags 2.4.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "openssl-sys" version = "0.9.93" @@ -1078,7 +1187,7 @@ checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.3.5", "smallvec", "windows-targets", ] @@ -1343,6 +1452,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "regex" version = "1.10.0" @@ -1387,11 +1505,13 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-tls", "ipnet", "js-sys", "log", "mime", "mime_guess", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -1400,6 +1520,7 @@ dependencies = [ "serde_urlencoded", "system-configuration", "tokio", + "tokio-native-tls", "tokio-util", "tower-service", "url", @@ -1416,6 +1537,19 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustix" +version = "0.38.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3" +dependencies = [ + "bitflags 2.4.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -1428,12 +1562,44 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +[[package]] +name = "schannel" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" +dependencies = [ + "windows-sys", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.188" @@ -1601,6 +1767,19 @@ dependencies = [ "libc", ] +[[package]] +name = "tempfile" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" +dependencies = [ + "cfg-if", + "fastrand", + "redox_syscall 0.4.1", + "rustix", + "windows-sys", +] + [[package]] name = "termtree" version = "0.4.1" @@ -1710,6 +1889,16 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.9" diff --git a/Cargo.toml b/Cargo.toml index 318b180..402243e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ ] [workspace.dependencies] +assert-json-diff = "2.0.2" axum = "0.6.15" axum-client-ip = "0.4.1" tokio = { version = "1.0", features = ["full"] } @@ -28,3 +29,4 @@ rdkafka = { version = "0.34", features = ["cmake-build", "ssl"] } metrics = "0.21.1" metrics-exporter-prometheus = "0.12.1" thiserror = "1.0.48" +envconfig = "0.10.0" diff --git a/capture-server/Cargo.toml b/capture-server/Cargo.toml index fa7151e..1920004 100644 --- a/capture-server/Cargo.toml +++ b/capture-server/Cargo.toml @@ -4,10 +4,20 @@ version = "0.1.0" edition = "2021" [dependencies] -capture = { path = "../capture" } axum = { workspace = true } +capture = { path = "../capture" } +envconfig = { workspace = true } +time = { workspace = true } tokio = { workspace = true } -tracing-subscriber = { workspace = true } tracing = { workspace = true } -time = { workspace = true } -envconfig = "0.10.0" +tracing-subscriber = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true, features = [] } +assert-json-diff = { workspace = true } +futures = "0.3.29" +once_cell = "1.18.0" +rand = { workspace = true } +rdkafka = { workspace = true } +reqwest = "0.11.22" +serde_json = { workspace = true } diff --git a/capture-server/src/main.rs b/capture-server/src/main.rs index c2d342d..4874a43 100644 --- a/capture-server/src/main.rs +++ b/capture-server/src/main.rs @@ -1,24 +1,10 @@ -use envconfig::Envconfig; -use std::net::SocketAddr; -use std::sync::Arc; +use std::net::TcpListener; -use capture::{billing_limits::BillingLimiter, redis::RedisClient, router, sink}; -use time::Duration; +use envconfig::Envconfig; use tokio::signal; -#[derive(Envconfig)] -struct Config { - #[envconfig(default = "false")] - print_sink: bool, - #[envconfig(default = "127.0.0.1:3000")] - address: SocketAddr, - redis_url: String, - - kafka_hosts: String, - kafka_topic: String, - #[envconfig(default = "false")] - kafka_tls: bool, -} +use capture::config::Config; +use capture::server::serve; async fn shutdown() { let mut term = signal::unix::signal(signal::unix::SignalKind::terminate()) @@ -41,42 +27,6 @@ async fn main() { tracing_subscriber::fmt::init(); let config = Config::init_from_env().expect("Invalid configuration:"); - - let redis_client = - Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client")); - - let billing = BillingLimiter::new(Duration::seconds(5), redis_client.clone()) - .expect("failed to create billing limiter"); - - let app = if config.print_sink { - router::router( - capture::time::SystemTime {}, - sink::PrintSink {}, - redis_client, - billing, - true, - ) - } else { - let sink = - sink::KafkaSink::new(config.kafka_topic, config.kafka_hosts, config.kafka_tls).unwrap(); - - router::router( - capture::time::SystemTime {}, - sink, - redis_client, - billing, - true, - ) - }; - - // run our app with hyper - // `axum::Server` is a re-export of `hyper::Server` - - tracing::info!("listening on {}", config.address); - - axum::Server::bind(&config.address) - .serve(app.into_make_service_with_connect_info::()) - .with_graceful_shutdown(shutdown()) - .await - .unwrap(); + let listener = TcpListener::bind(config.address).unwrap(); + serve(config, listener, shutdown()).await } diff --git a/capture-server/tests/common.rs b/capture-server/tests/common.rs new file mode 100644 index 0000000..40836ca --- /dev/null +++ b/capture-server/tests/common.rs @@ -0,0 +1,169 @@ +#![allow(dead_code)] + +use std::default::Default; +use std::net::{SocketAddr, TcpListener}; +use std::str::FromStr; +use std::string::ToString; +use std::sync::{Arc, Once}; +use std::time::Duration; + +use anyhow::bail; +use once_cell::sync::Lazy; +use rand::distributions::Alphanumeric; +use rand::Rng; +use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; +use rdkafka::config::{ClientConfig, FromClientConfig}; +use rdkafka::consumer::{BaseConsumer, Consumer}; +use rdkafka::util::Timeout; +use rdkafka::{Message, TopicPartitionList}; +use tokio::sync::Notify; +use tracing::debug; + +use capture::config::Config; +use capture::server::serve; + +pub static DEFAULT_CONFIG: Lazy = Lazy::new(|| Config { + print_sink: false, + address: SocketAddr::from_str("127.0.0.1:0").unwrap(), + export_prometheus: false, + redis_url: "redis://localhost:6379/".to_string(), + kafka_hosts: "kafka:9092".to_string(), + kafka_topic: "events_plugin_ingestion".to_string(), + kafka_tls: false, +}); + +static TRACING_INIT: Once = Once::new(); +pub fn setup_tracing() { + TRACING_INIT.call_once(|| { + tracing_subscriber::fmt() + .with_writer(tracing_subscriber::fmt::TestWriter::new()) + .init() + }); +} +pub struct ServerHandle { + pub addr: SocketAddr, + shutdown: Arc, +} + +impl ServerHandle { + pub fn for_topic(topic: &EphemeralTopic) -> Self { + let mut config = DEFAULT_CONFIG.clone(); + config.kafka_topic = topic.topic_name().to_string(); + Self::for_config(config) + } + pub fn for_config(config: Config) -> Self { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let notify = Arc::new(Notify::new()); + let shutdown = notify.clone(); + + tokio::spawn( + async move { serve(config, listener, async { notify.notified().await }).await }, + ); + Self { addr, shutdown } + } + + pub async fn capture_events>(&self, body: T) -> reqwest::Response { + let client = reqwest::Client::new(); + client + .post(format!("http://{:?}/i/v0/e", self.addr)) + .body(body) + .send() + .await + .expect("failed to send request") + } +} + +impl Drop for ServerHandle { + fn drop(&mut self) { + self.shutdown.notify_one() + } +} + +pub struct EphemeralTopic { + consumer: BaseConsumer, + read_timeout: Timeout, + topic_name: String, +} + +impl EphemeralTopic { + pub async fn new() -> Self { + let mut config = ClientConfig::new(); + config.set("group.id", "capture_integration_tests"); + config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone()); + config.set("debug", "all"); + + // TODO: check for name collision? + let topic_name = random_string("events_", 16); + let admin = AdminClient::from_config(&config).expect("failed to create admin client"); + admin + .create_topics( + &[NewTopic { + name: &topic_name, + num_partitions: 1, + replication: TopicReplication::Fixed(1), + config: vec![], + }], + &AdminOptions::default(), + ) + .await + .expect("failed to create topic"); + + let consumer: BaseConsumer = config.create().expect("failed to create consumer"); + let mut assignment = TopicPartitionList::new(); + assignment.add_partition(&topic_name, 0); + consumer + .assign(&assignment) + .expect("failed to assign topic"); + + Self { + consumer, + read_timeout: Timeout::After(Duration::from_secs(5)), + topic_name, + } + } + + pub fn next_event(&self) -> anyhow::Result { + match self.consumer.poll(self.read_timeout) { + Some(Ok(message)) => { + let body = message.payload().expect("empty kafka message"); + let event = serde_json::from_slice(body)?; + Ok(event) + } + Some(Err(err)) => bail!("kafka read error: {}", err), + None => bail!("kafka read timeout"), + } + } + + pub fn topic_name(&self) -> &str { + &self.topic_name + } +} + +impl Drop for EphemeralTopic { + fn drop(&mut self) { + debug!("dropping EphemeralTopic {}...", self.topic_name); + _ = self.consumer.unassign(); + futures::executor::block_on(delete_topic(self.topic_name.clone())); + debug!("dropped topic"); + } +} + +async fn delete_topic(topic: String) { + let mut config = ClientConfig::new(); + config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone()); + let admin = AdminClient::from_config(&config).expect("failed to create admin client"); + admin + .delete_topics(&[&topic], &AdminOptions::default()) + .await + .expect("failed to delete topic"); +} + +pub fn random_string(prefix: &str, length: usize) -> String { + let suffix: String = rand::thread_rng() + .sample_iter(Alphanumeric) + .take(length) + .map(char::from) + .collect(); + format!("{}_{}", prefix, suffix) +} diff --git a/capture-server/tests/events.rs b/capture-server/tests/events.rs new file mode 100644 index 0000000..42facd8 --- /dev/null +++ b/capture-server/tests/events.rs @@ -0,0 +1,75 @@ +use anyhow::Result; +use assert_json_diff::assert_json_include; +use reqwest::StatusCode; +use serde_json::json; + +use crate::common::*; +mod common; + +#[tokio::test] +async fn it_captures_one_event() -> Result<()> { + setup_tracing(); + let token = random_string("token", 16); + let distinct_id = random_string("id", 16); + let topic = EphemeralTopic::new().await; + let server = ServerHandle::for_topic(&topic); + + let event = json!({ + "token": token, + "event": "testing", + "distinct_id": distinct_id + }); + let res = server.capture_events(event.to_string()).await; + assert_eq!(StatusCode::OK, res.status()); + + let event = topic.next_event()?; + assert_json_include!( + actual: event, + expected: json!({ + "token": token, + "distinct_id": distinct_id + }) + ); + + Ok(()) +} + +#[tokio::test] +async fn it_captures_a_batch() -> Result<()> { + setup_tracing(); + let token = random_string("token", 16); + let distinct_id1 = random_string("id", 16); + let distinct_id2 = random_string("id", 16); + + let topic = EphemeralTopic::new().await; + let server = ServerHandle::for_topic(&topic); + + let event = json!([{ + "token": token, + "event": "event1", + "distinct_id": distinct_id1 + },{ + "token": token, + "event": "event2", + "distinct_id": distinct_id2 + }]); + let res = server.capture_events(event.to_string()).await; + assert_eq!(StatusCode::OK, res.status()); + + assert_json_include!( + actual: topic.next_event()?, + expected: json!({ + "token": token, + "distinct_id": distinct_id1 + }) + ); + assert_json_include!( + actual: topic.next_event()?, + expected: json!({ + "token": token, + "distinct_id": distinct_id2 + }) + ); + + Ok(()) +} diff --git a/capture/Cargo.toml b/capture/Cargo.toml index 7c84c6c..6bdfc39 100644 --- a/capture/Cargo.toml +++ b/capture/Cargo.toml @@ -30,9 +30,10 @@ metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } thiserror = { workspace = true } redis = { version="0.23.3", features=["tokio-comp", "cluster", "cluster-async"] } +envconfig = { workspace = true } [dev-dependencies] -assert-json-diff = "2.0.2" +assert-json-diff = { workspace = true } axum-test-helper = "0.2.0" mockall = "0.11.2" redis-test = "0.2.3" diff --git a/capture/src/config.rs b/capture/src/config.rs new file mode 100644 index 0000000..6edf438 --- /dev/null +++ b/capture/src/config.rs @@ -0,0 +1,19 @@ +use std::net::SocketAddr; + +use envconfig::Envconfig; + +#[derive(Envconfig, Clone)] +pub struct Config { + #[envconfig(default = "false")] + pub print_sink: bool, + #[envconfig(default = "127.0.0.1:3000")] + pub address: SocketAddr, + pub redis_url: String, + #[envconfig(default = "true")] + pub export_prometheus: bool, + + pub kafka_hosts: String, + pub kafka_topic: String, + #[envconfig(default = "false")] + pub kafka_tls: bool, +} diff --git a/capture/src/lib.rs b/capture/src/lib.rs index fcd802b..70f7548 100644 --- a/capture/src/lib.rs +++ b/capture/src/lib.rs @@ -1,10 +1,12 @@ pub mod api; pub mod billing_limits; pub mod capture; +pub mod config; pub mod event; pub mod prometheus; pub mod redis; pub mod router; +pub mod server; pub mod sink; pub mod time; pub mod token; diff --git a/capture/src/router.rs b/capture/src/router.rs index 58fef22..9acc7f4 100644 --- a/capture/src/router.rs +++ b/capture/src/router.rs @@ -66,7 +66,6 @@ pub fn router< // does not work well. if metrics { let recorder_handle = setup_metrics_recorder(); - router.route("/metrics", get(move || ready(recorder_handle.render()))) } else { router diff --git a/capture/src/server.rs b/capture/src/server.rs new file mode 100644 index 0000000..bee579e --- /dev/null +++ b/capture/src/server.rs @@ -0,0 +1,52 @@ +use std::future::Future; +use std::net::{SocketAddr, TcpListener}; +use std::sync::Arc; + +use time::Duration; + +use crate::billing_limits::BillingLimiter; +use crate::config::Config; +use crate::redis::RedisClient; +use crate::{router, sink}; + +pub async fn serve(config: Config, listener: TcpListener, shutdown: F) +where + F: Future, +{ + let redis_client = + Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client")); + + let billing = BillingLimiter::new(Duration::seconds(5), redis_client.clone()) + .expect("failed to create billing limiter"); + + let app = if config.print_sink { + router::router( + crate::time::SystemTime {}, + sink::PrintSink {}, + redis_client, + billing, + config.export_prometheus, + ) + } else { + let sink = + sink::KafkaSink::new(config.kafka_topic, config.kafka_hosts, config.kafka_tls).unwrap(); + + router::router( + crate::time::SystemTime {}, + sink, + redis_client, + billing, + config.export_prometheus, + ) + }; + + // run our app with hyper + // `axum::Server` is a re-export of `hyper::Server` + tracing::info!("listening on {:?}", listener.local_addr().unwrap()); + axum::Server::from_tcp(listener) + .unwrap() + .serve(app.into_make_service_with_connect_info::()) + .with_graceful_shutdown(shutdown) + .await + .unwrap() +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..804ae78 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,51 @@ +version: "3" + +services: + zookeeper: + image: zookeeper:3.7.0 + restart: on-failure + + kafka: + image: ghcr.io/posthog/kafka-container:v2.8.2 + restart: on-failure + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1001 + KAFKA_CFG_RESERVED_BROKER_MAX_ID: 1001 + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 + ALLOW_PLAINTEXT_LISTENER: 'true' + ports: + - '9092:9092' + healthcheck: + test: kafka-cluster.sh cluster-id --bootstrap-server localhost:9092 || exit 1 + interval: 3s + timeout: 10s + retries: 10 + + redis: + image: redis:6.2.7-alpine + restart: on-failure + command: redis-server --maxmemory-policy allkeys-lru --maxmemory 200mb + ports: + - '6379:6379' + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 3s + timeout: 10s + retries: 10 + + kafka-ui: + image: provectuslabs/kafka-ui:latest + profiles: ["ui"] + ports: + - '8080:8080' + depends_on: + - zookeeper + - kafka + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181