Skip to content
This repository was archived by the owner on Feb 8, 2024. It is now read-only.

Commit f6a3d3c

Browse files
committed
allow concurrency between tests
1 parent 7cbe6b9 commit f6a3d3c

File tree

10 files changed

+481
-70
lines changed

10 files changed

+481
-70
lines changed

Cargo.lock

Lines changed: 205 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ members = [
55
]
66

77
[workspace.dependencies]
8+
assert-json-diff = "2.0.2"
89
axum = "0.6.15"
910
axum-client-ip = "0.4.1"
1011
tokio = { version = "1.0", features = ["full"] }

capture-server/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,11 @@ time = { workspace = true }
1313
envconfig = "0.10.0"
1414
anyhow = { workspace = true, features = [] }
1515
once_cell = "1.18.0"
16+
17+
[dev-dependencies]
18+
assert-json-diff = { workspace = true }
19+
futures = "0.3.29"
20+
rand = { workspace = true }
21+
rdkafka = { workspace = true }
22+
reqwest = "0.11.22"
23+
serde_json = { workspace = true }

capture-server/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pub struct Config {
88
#[envconfig(default = "127.0.0.1:3000")]
99
pub address: SocketAddr,
1010
pub redis_url: String,
11+
#[envconfig(default = "true")]
12+
pub export_prometheus: bool,
1113

1214
pub kafka_hosts: String,
1315
pub kafka_topic: String,

capture-server/src/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ where
2323
sink::PrintSink {},
2424
redis_client,
2525
billing,
26-
true,
26+
config.export_prometheus,
2727
)
2828
} else {
2929
let sink =
@@ -34,7 +34,7 @@ where
3434
sink,
3535
redis_client,
3636
billing,
37-
true,
37+
config.export_prometheus,
3838
)
3939
};
4040

capture-server/tests/common.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
#![allow(dead_code)]
2+
3+
use std::default::Default;
4+
use std::net::{SocketAddr, TcpListener};
5+
use std::str::FromStr;
6+
use std::string::ToString;
7+
use std::sync::{Arc, Once};
8+
use std::time::Duration;
9+
10+
use anyhow::bail;
11+
use once_cell::sync::Lazy;
12+
use rand::distributions::Alphanumeric;
13+
use rand::Rng;
14+
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
15+
use rdkafka::config::{ClientConfig, FromClientConfig};
16+
use rdkafka::consumer::{BaseConsumer, Consumer};
17+
use rdkafka::util::Timeout;
18+
use rdkafka::{Message, TopicPartitionList};
19+
use tokio::sync::Notify;
20+
use tracing::debug;
21+
22+
use capture_server::config::Config;
23+
use capture_server::server::serve;
24+
25+
pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
26+
print_sink: false,
27+
address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
28+
export_prometheus: false,
29+
redis_url: "redis://localhost:6379/".to_string(),
30+
kafka_hosts: "kafka:9092".to_string(),
31+
kafka_topic: "events_plugin_ingestion".to_string(),
32+
kafka_tls: false,
33+
});
34+
35+
static TRACING_INIT: Once = Once::new();
36+
pub fn setup_tracing() {
37+
TRACING_INIT.call_once(|| {
38+
tracing_subscriber::fmt()
39+
.with_writer(tracing_subscriber::fmt::TestWriter::new())
40+
.init()
41+
});
42+
}
43+
pub struct ServerHandle {
44+
pub addr: SocketAddr,
45+
shutdown: Arc<Notify>,
46+
}
47+
48+
impl ServerHandle {
49+
pub fn for_topic(topic: &EphemeralTopic) -> Self {
50+
let mut config = DEFAULT_CONFIG.clone();
51+
config.kafka_topic = topic.topic_name().to_string();
52+
Self::for_config(config)
53+
}
54+
pub fn for_config(config: Config) -> Self {
55+
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
56+
let addr = listener.local_addr().unwrap();
57+
let notify = Arc::new(Notify::new());
58+
let shutdown = notify.clone();
59+
60+
tokio::spawn(
61+
async move { serve(config, listener, async { notify.notified().await }).await },
62+
);
63+
Self { addr, shutdown }
64+
}
65+
66+
pub async fn capture_events<T: Into<reqwest::Body>>(&self, body: T) -> reqwest::Response {
67+
let client = reqwest::Client::new();
68+
client
69+
.post(format!("http://{:?}/i/v0/e", self.addr))
70+
.body(body)
71+
.send()
72+
.await
73+
.expect("failed to send request")
74+
}
75+
}
76+
77+
impl Drop for ServerHandle {
78+
fn drop(&mut self) {
79+
self.shutdown.notify_one()
80+
}
81+
}
82+
83+
pub struct EphemeralTopic {
84+
consumer: BaseConsumer,
85+
read_timeout: Timeout,
86+
topic_name: String,
87+
}
88+
89+
impl EphemeralTopic {
90+
pub async fn new() -> Self {
91+
let mut config = ClientConfig::new();
92+
config.set("group.id", "capture_integration_tests");
93+
config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone());
94+
config.set("debug", "all");
95+
96+
// TODO: check for name collision?
97+
let topic_name = random_string("events_", 16);
98+
let admin = AdminClient::from_config(&config).expect("failed to create admin client");
99+
admin
100+
.create_topics(
101+
&[NewTopic {
102+
name: &topic_name,
103+
num_partitions: 1,
104+
replication: TopicReplication::Fixed(1),
105+
config: vec![],
106+
}],
107+
&AdminOptions::default(),
108+
)
109+
.await
110+
.expect("failed to create topic");
111+
112+
let consumer: BaseConsumer = config.create().expect("failed to create consumer");
113+
let mut assignment = TopicPartitionList::new();
114+
assignment.add_partition(&topic_name, 0);
115+
consumer
116+
.assign(&assignment)
117+
.expect("failed to assign topic");
118+
119+
Self {
120+
consumer,
121+
read_timeout: Timeout::After(Duration::from_secs(5)),
122+
topic_name,
123+
}
124+
}
125+
126+
pub fn next_event(&self) -> anyhow::Result<serde_json::Value> {
127+
match self.consumer.poll(self.read_timeout) {
128+
Some(Ok(message)) => {
129+
let body = message.payload().expect("empty kafka message");
130+
let event = serde_json::from_slice(body)?;
131+
Ok(event)
132+
}
133+
Some(Err(err)) => bail!("kafka read error: {}", err),
134+
None => bail!("kafka read timeout"),
135+
}
136+
}
137+
138+
pub fn topic_name(&self) -> &str {
139+
&self.topic_name
140+
}
141+
}
142+
143+
impl Drop for EphemeralTopic {
144+
fn drop(&mut self) {
145+
debug!("dropping EphemeralTopic {}...", self.topic_name);
146+
_ = self.consumer.unassign();
147+
futures::executor::block_on(delete_topic(self.topic_name.clone()));
148+
debug!("dropped topic");
149+
}
150+
}
151+
152+
async fn delete_topic(topic: String) {
153+
let mut config = ClientConfig::new();
154+
config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone());
155+
let admin = AdminClient::from_config(&config).expect("failed to create admin client");
156+
admin
157+
.delete_topics(&[&topic], &AdminOptions::default())
158+
.await
159+
.expect("failed to delete topic");
160+
}
161+
162+
pub fn random_string(prefix: &str, length: usize) -> String {
163+
let suffix: String = rand::thread_rng()
164+
.sample_iter(Alphanumeric)
165+
.take(length)
166+
.map(char::from)
167+
.collect();
168+
format!("{}_{}", prefix, suffix)
169+
}

capture-server/tests/events.rs

Lines changed: 68 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,75 @@
1-
use std::net::{SocketAddr, TcpListener};
2-
use std::str::FromStr;
3-
use std::string::ToString;
4-
51
use anyhow::Result;
6-
use once_cell::sync::Lazy;
7-
use tokio::sync::oneshot;
8-
use tokio::task::JoinHandle;
9-
10-
use capture_server::config::Config;
11-
use capture_server::server::serve;
12-
13-
static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
14-
print_sink: false,
15-
address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
16-
redis_url: "redis://localhost:6379/".to_string(),
17-
kafka_hosts: "kafka:9092".to_string(),
18-
kafka_topic: "events_plugin_ingestion".to_string(),
19-
kafka_tls: false,
20-
});
21-
22-
struct ServerHandle {
23-
addr: SocketAddr,
24-
shutdown: oneshot::Sender<()>,
25-
join: JoinHandle<()>,
26-
}
2+
use assert_json_diff::assert_json_include;
3+
use reqwest::StatusCode;
4+
use serde_json::json;
275

28-
impl ServerHandle {
29-
fn new(config: Config) -> Self {
30-
let (shutdown, rx) = oneshot::channel::<()>();
31-
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
32-
let addr = listener.local_addr().unwrap();
33-
let join =
34-
tokio::spawn(async move { serve(config, listener, async { rx.await.unwrap() }).await });
35-
Self {
36-
addr,
37-
shutdown,
38-
join,
39-
}
40-
}
41-
42-
async fn stop(self) -> Result<()> {
43-
self.shutdown.send(()).unwrap();
44-
self.join.await?;
45-
Ok(())
46-
}
47-
}
6+
use crate::common::*;
7+
mod common;
488

499
#[tokio::test]
5010
async fn it_captures_one_event() -> Result<()> {
51-
let server = ServerHandle::new(DEFAULT_CONFIG.clone());
11+
setup_tracing();
12+
let token = random_string("token", 16);
13+
let distinct_id = random_string("id", 16);
14+
let topic = EphemeralTopic::new().await;
15+
let server = ServerHandle::for_topic(&topic);
16+
17+
let event = json!({
18+
"token": token,
19+
"event": "testing",
20+
"distinct_id": distinct_id
21+
});
22+
let res = server.capture_events(event.to_string()).await;
23+
assert_eq!(StatusCode::OK, res.status());
24+
25+
let event = topic.next_event()?;
26+
assert_json_include!(
27+
actual: event,
28+
expected: json!({
29+
"token": token,
30+
"distinct_id": distinct_id
31+
})
32+
);
33+
34+
Ok(())
35+
}
36+
37+
#[tokio::test]
38+
async fn it_captures_a_batch() -> Result<()> {
39+
setup_tracing();
40+
let token = random_string("token", 16);
41+
let distinct_id1 = random_string("id", 16);
42+
let distinct_id2 = random_string("id", 16);
43+
44+
let topic = EphemeralTopic::new().await;
45+
let server = ServerHandle::for_topic(&topic);
46+
47+
let event = json!([{
48+
"token": token,
49+
"event": "event1",
50+
"distinct_id": distinct_id1
51+
},{
52+
"token": token,
53+
"event": "event2",
54+
"distinct_id": distinct_id2
55+
}]);
56+
let res = server.capture_events(event.to_string()).await;
57+
assert_eq!(StatusCode::OK, res.status());
58+
59+
assert_json_include!(
60+
actual: topic.next_event()?,
61+
expected: json!({
62+
"token": token,
63+
"distinct_id": distinct_id1
64+
})
65+
);
66+
assert_json_include!(
67+
actual: topic.next_event()?,
68+
expected: json!({
69+
"token": token,
70+
"distinct_id": distinct_id2
71+
})
72+
);
5273

53-
server.stop().await
74+
Ok(())
5475
}

capture/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ thiserror = { workspace = true }
3232
redis = { version="0.23.3", features=["tokio-comp", "cluster", "cluster-async"] }
3333

3434
[dev-dependencies]
35-
assert-json-diff = "2.0.2"
35+
assert-json-diff = { workspace = true }
3636
axum-test-helper = "0.2.0"
3737
mockall = "0.11.2"
3838
redis-test = "0.2.3"

capture/src/router.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ pub fn router<
6666
// does not work well.
6767
if metrics {
6868
let recorder_handle = setup_metrics_recorder();
69-
7069
router.route("/metrics", get(move || ready(recorder_handle.render())))
7170
} else {
7271
router

docker-compose.yml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
version: "3"
2+
13
services:
24
zookeeper:
35
image: zookeeper:3.7.0
@@ -17,10 +19,33 @@ services:
1719
ALLOW_PLAINTEXT_LISTENER: 'true'
1820
ports:
1921
- '9092:9092'
22+
healthcheck:
23+
test: kafka-cluster.sh cluster-id --bootstrap-server localhost:9092 || exit 1
24+
interval: 3s
25+
timeout: 10s
26+
retries: 10
2027

2128
redis:
2229
image: redis:6.2.7-alpine
2330
restart: on-failure
2431
command: redis-server --maxmemory-policy allkeys-lru --maxmemory 200mb
2532
ports:
2633
- '6379:6379'
34+
healthcheck:
35+
test: ["CMD", "redis-cli", "ping"]
36+
interval: 3s
37+
timeout: 10s
38+
retries: 10
39+
40+
kafka-ui:
41+
image: provectuslabs/kafka-ui:latest
42+
profiles: ["ui"]
43+
ports:
44+
- '8080:8080'
45+
depends_on:
46+
- zookeeper
47+
- kafka
48+
environment:
49+
KAFKA_CLUSTERS_0_NAME: local
50+
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
51+
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

0 commit comments

Comments (0)