Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

feat: accept other JSON types as distinct_id, stringify them #65

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ pub async fn event(
let payload = base64::engine::general_purpose::STANDARD
.decode(input.data)
.unwrap();
RawEvent::from_bytes(&meta, payload.into())
RawEvent::from_bytes(payload.into())
}
ct => {
tracing::Span::current().record("content_type", ct);

RawEvent::from_bytes(&meta, body)
RawEvent::from_bytes(body)
}
}?;

Expand Down Expand Up @@ -165,19 +165,6 @@ pub fn process_single_event(
event: &RawEvent,
context: &ProcessingContext,
) -> Result<ProcessedEvent, CaptureError> {
let distinct_id = match &event.distinct_id {
Some(id) => id,
None => match event.properties.get("distinct_id").map(|v| v.as_str()) {
Some(Some(id)) => id,
_ => return Err(CaptureError::MissingDistinctId),
},
};
// Limit the size of distinct_id to 200 chars
let distinct_id: String = match distinct_id.len() {
0..=200 => distinct_id.to_owned(),
_ => distinct_id.chars().take(200).collect(),
};

if event.event.is_empty() {
return Err(CaptureError::MissingEventName);
}
Expand All @@ -189,7 +176,7 @@ pub fn process_single_event(

Ok(ProcessedEvent {
uuid: event.uuid.unwrap_or_else(uuid_v7),
distinct_id,
distinct_id: event.extract_distinct_id()?,
ip: context.client_ip.clone(),
data,
now: context.now.clone(),
Expand Down Expand Up @@ -252,7 +239,7 @@ mod tests {
let events = vec![
RawEvent {
token: Some(String::from("hello")),
distinct_id: Some("testing".to_string()),
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::new(),
Expand All @@ -263,7 +250,7 @@ mod tests {
},
RawEvent {
token: None,
distinct_id: Some("testing".to_string()),
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::from([(String::from("token"), json!("hello"))]),
Expand All @@ -283,7 +270,7 @@ mod tests {
let events = vec![
RawEvent {
token: Some(String::from("hello")),
distinct_id: Some("testing".to_string()),
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::new(),
Expand All @@ -294,7 +281,7 @@ mod tests {
},
RawEvent {
token: None,
distinct_id: Some("testing".to_string()),
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::from([(String::from("token"), json!("goodbye"))]),
Expand Down
159 changes: 139 additions & 20 deletions capture/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub struct RawEvent {
skip_serializing_if = "Option::is_none"
)]
pub token: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub distinct_id: Option<String>,
#[serde(alias = "$distinct_id", skip_serializing_if = "Option::is_none")]
pub distinct_id: Option<Value>, // posthog-js accepts arbitrary values as distinct_id
pub uuid: Option<Uuid>,
pub event: String,
#[serde(default)]
Expand Down Expand Up @@ -87,7 +87,7 @@ impl RawEvent {
/// Instead of trusting the parameter, we peek at the payload's first three bytes to
/// detect gzip, fallback to uncompressed utf8 otherwise.
#[instrument(skip_all)]
pub fn from_bytes(_query: &EventQuery, bytes: Bytes) -> Result<Vec<RawEvent>, CaptureError> {
pub fn from_bytes(bytes: Bytes) -> Result<Vec<RawEvent>, CaptureError> {
tracing::debug!(len = bytes.len(), "decoding new event");

let payload = if bytes.starts_with(&GZIP_MAGIC_NUMBERS) {
Expand Down Expand Up @@ -119,6 +119,30 @@ impl RawEvent {
.map(String::from),
}
}

/// Extracts, stringifies and trims the distinct_id to a 200 chars String.
/// SDKs send the distinct_id either in the root field or as a property,
/// and can send string, number, array, or map values. We try to best-effort
/// stringify complex values, and make sure it's not longer than 200 chars.
pub fn extract_distinct_id(&self) -> Result<String, CaptureError> {
// Breaking change compared to capture-py: None / Null is not allowed.
let value = match &self.distinct_id {
None | Some(Value::Null) => match self.properties.get("distinct_id") {
None | Some(Value::Null) => return Err(CaptureError::MissingDistinctId),
Some(id) => id,
},
Some(id) => id,
};

let distinct_id = value
.as_str()
.map(|s| s.to_owned())
.unwrap_or_else(|| value.to_string());
Ok(match distinct_id.len() {
0..=200 => distinct_id,
_ => distinct_id.chars().take(200).collect(),
})
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -150,29 +174,124 @@ impl ProcessedEvent {

#[cfg(test)]
mod tests {
use super::Compression;
use base64::Engine as _;
use bytes::Bytes;
use rand::distributions::Alphanumeric;
use rand::Rng;
use serde_json::json;

use super::{EventQuery, RawEvent};
use super::CaptureError;
use super::RawEvent;

#[test]
fn decode_bytes() {
let horrible_blob = "H4sIAAAAAAAAA31T207cMBD9lSrikSy+5bIrVX2g4oWWUlEqBEKRY08Sg4mD4+xCEf/e8XLZBSGeEp+ZOWOfmXPxkMAS+pAskp1BtmBBLiHZTQbvBvDBwJgsHpIdh5/kp1Rffp18OcMwAtUS/GhcjwFKZjSbkYjX3q1G8AgeGA+Nu4ughqVRUIX7ATDwHcbr4IYYUJP32LyavMVAF8Kw2NuzTknbuTEsSkIIHlvTf+vhLnzdizUxgslvs2JgkKHr5U1s8VS0dZ/NZSnlW7CVfTvhs7EG+vT0JJaMygP0VQem7bDTvBAbcGV06JAkIwTBpYHV4Hx4zS1FJH+FX7IFj7A1NbZZQR2b4GFbwFlWzFjETY/XCpXRiN538yt/S9mdnm7bSa+lDCY+kOalKDJGs/msZMVuos0YTK+e62hZciHqes7LnDcpoVmTg+TAaqnKMhWUaaa4TllBoCDpJn2uYK3k87xeyFjZFHWdzxmdq5Q0IstBzRXlDMiHbM/5kgnerKfs+tFZqHAolQflvDZ9W0Evawu6wveiENVoND4s+Ami2jBGZbayn/42g3xblizX4skp4FYMYfJQoSQf8DfSjrGBVMEsoWpArpMbK1vc8ItLDG1j1SDvrZM6muBxN/Eg7U1cVFw70KmyRl13bhqjYeBGGrtuFqWTSzzF/q8tRyvV9SfxHXQLoBuidXY0ekeF+KQnNCqgHXaIy7KJBncNERk6VUFhhB33j8zv5uhQ/rCTvbq9/9seH5Pj3Bf/TsuzYf9g2j+3h9N6yZ8Vfpmx4KSguSY5S0lOqc5LmgmhidoMmOaixoFvktFKOo9kK9Nrt3rPxViWk5RwIhtJykZzXohP2DjmZ08+bnH/4B1fkUnGSp2SMmNlIYTguS5ga//eERZZTSVeD8cWPTMGeTMgHSOMpyRLGftDyUKwBV9b6Dx5vPwPzQHjFwsFAAA=";
let decoded_horrible_blob = base64::engine::general_purpose::STANDARD
.decode(horrible_blob)
.unwrap();

let bytes = Bytes::from(decoded_horrible_blob);
let events = RawEvent::from_bytes(
&EventQuery {
compression: Some(Compression::Gzip),
lib_version: None,
sent_at: None,
},
bytes,
fn decode_uncompressed_raw_event() {
let base64_payload = "ewogICAgImRpc3RpbmN0X2lkIjogIm15X2lkMSIsCiAgICAiZXZlbnQiOiAibXlfZXZlbnQxIiwKICAgICJwcm9wZXJ0aWVzIjogewogICAgICAgICIkZGV2aWNlX3R5cGUiOiAiRGVza3RvcCIKICAgIH0sCiAgICAiYXBpX2tleSI6ICJteV90b2tlbjEiCn0K";
let compressed_bytes = Bytes::from(
base64::engine::general_purpose::STANDARD
.decode(base64_payload)
.expect("payload is not base64"),
);

let events = RawEvent::from_bytes(compressed_bytes).expect("failed to parse");
assert_eq!(1, events.len());
assert_eq!(Some("my_token1".to_string()), events[0].extract_token());
assert_eq!("my_event1".to_string(), events[0].event);
assert_eq!(
"my_id1".to_string(),
events[0]
.extract_distinct_id()
.expect("cannot find distinct_id")
);
}
#[test]
fn decode_gzipped_raw_event() {
let base64_payload = "H4sIADQSbmUCAz2MsQqAMAxE936FBEcnR2f/o4i9IRTb0AahiP9urcVMx3t3ucxQjxxn5bCrZUfLQEepYabpkzgRtOOWfyMpCpIyctVXY42PDifvsFoE73BF9hqFWuPu403YepT+WKNHmMnc5gENoFu2kwAAAA==";
let compressed_bytes = Bytes::from(
base64::engine::general_purpose::STANDARD
.decode(base64_payload)
.expect("payload is not base64"),
);

assert!(events.is_ok());
let events = RawEvent::from_bytes(compressed_bytes).expect("failed to parse");
assert_eq!(1, events.len());
assert_eq!(Some("my_token2".to_string()), events[0].extract_token());
assert_eq!("my_event2".to_string(), events[0].event);
assert_eq!(
"my_id2".to_string(),
events[0]
.extract_distinct_id()
.expect("cannot find distinct_id")
);
}

#[test]
fn extract_distinct_id() {
let parse_and_extract = |input: &'static str| -> Result<String, CaptureError> {
let parsed = RawEvent::from_bytes(input.into()).expect("failed to parse");
parsed[0].extract_distinct_id()
};
// Return MissingDistinctId if not found
assert!(matches!(
parse_and_extract(r#"{"event": "e"}"#),
Err(CaptureError::MissingDistinctId)
));
// Return MissingDistinctId if null, breaking compat with capture-py
assert!(matches!(
parse_and_extract(r#"{"event": "e", "distinct_id": null}"#),
Err(CaptureError::MissingDistinctId)
));

let assert_extracted_id = |input: &'static str, expected: &str| {
let id = parse_and_extract(input).expect("failed to extract");
assert_eq!(id, expected);
};
// Happy path: toplevel field present
assert_extracted_id(r#"{"event": "e", "distinct_id": "myid"}"#, "myid");
assert_extracted_id(r#"{"event": "e", "$distinct_id": "23"}"#, "23");

// Sourced from properties if not present in toplevel field, but toplevel wins if both present
assert_extracted_id(
r#"{"event": "e", "properties":{"distinct_id": "myid"}}"#,
"myid",
);
assert_extracted_id(
r#"{"event": "e", "distinct_id": 23, "properties":{"distinct_id": "myid"}}"#,
"23",
);

// Numbers are stringified
assert_extracted_id(r#"{"event": "e", "distinct_id": 23}"#, "23");
assert_extracted_id(r#"{"event": "e", "distinct_id": 23.4}"#, "23.4");

// Containers are stringified
assert_extracted_id(
r#"{"event": "e", "distinct_id": ["a", "b"]}"#,
r#"["a","b"]"#,
);
assert_extracted_id(
r#"{"event": "e", "distinct_id": {"string": "a", "number": 3}}"#,
r#"{"number":3,"string":"a"}"#,
);
}

#[test]
fn extract_distinct_id_trims_to_200_chars() {
let distinct_id: String = rand::thread_rng()
.sample_iter(Alphanumeric)
.take(222)
.map(char::from)
.collect();
let (expected_distinct_id, _) = distinct_id.split_at(200); // works because ascii chars only
let input = json!([{
"token": "mytoken",
"event": "myevent",
"distinct_id": distinct_id
}]);

let parsed = RawEvent::from_bytes(input.to_string().into()).expect("failed to parse");
assert_eq!(
parsed[0].extract_distinct_id().expect("failed to extract"),
expected_distinct_id
);
}
}