Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <heinz@licenser.net>
  • Loading branch information
Licenser committed Oct 10, 2023
1 parent 61ca961 commit b27e0c5
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 4 deletions.
47 changes: 45 additions & 2 deletions src/preprocessor/kafka_schema_registry_prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,61 @@ impl Preprocessor for SchemaRegistryPrefix {
) -> Result<Vec<(Vec<u8>, Value<'static>)>> {
use std::io::Cursor;
if let Some(d) = data.get(8..) {
let magic = Cursor::new(data).read_u32::<BigEndian>()?;
let mut c = Cursor::new(data);
let magic = c.read_u32::<BigEndian>()?;
if magic != 0 {
return Err(format!(
"Invalid magic bytes (0x00000000) for kafka wire format: {magic}"
)
.into());
}
let schema = Cursor::new(data).read_u32::<BigEndian>()?;
let schema = c.read_u32::<BigEndian>()?;
meta.insert("schema_id", schema)?;
Ok(vec![(d.to_vec(), meta)])
} else {
Err("Kafka schema registry Preprocessor: < 8 byte".into())
}
}
}

#[cfg(test)]
mod test {
use super::*;
use value_trait::ValueAccess;

/// Tests if the preprocessor errors on data that's less then 8 bytes
#[test]
fn test_preprocessor_less_then_8_bytes() {
let mut pp = SchemaRegistryPrefix::default();
let mut ingest_ns = 0;
let data = vec![0, 0, 0, 0, 0, 0, 0];
let meta = Value::object();
let res = pp.process(&mut ingest_ns, &data, meta);
assert!(res.is_err());
}

/// Tests if `schema_id` is added to the meta data properly
#[test]
fn test_preprocessor_schema_id() -> Result<()> {
let mut pp = SchemaRegistryPrefix::default();
let mut ingest_ns = 0;
let data = vec![0, 0, 0, 0, 0, 0, 0, 1, 42];
let meta = Value::object();
let mut res = pp.process(&mut ingest_ns, &data, meta)?;
let (rest, meta) = res.pop().expect("no result");
assert_eq!(meta.get_u8("schema_id"), Some(1));
assert_eq!(rest, vec![42]);
Ok(())
}

/// Tests if the preprocessor errors on invalid magic bytes
#[test]
fn test_preprocessor_invalid_magic_bytes() {
let mut pp = SchemaRegistryPrefix::default();
let mut ingest_ns = 0;
let data = vec![0, 0, 0, 1, 0, 0, 0, 1];
let meta = Value::object();
let res = pp.process(&mut ingest_ns, &data, meta);
assert!(res.is_err());
}
}
31 changes: 31 additions & 0 deletions tremor-codec/src/codec/binflux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,35 @@ mod test {
"Invalid BInflux Line Protocol data: Unknown type as influx line value: Array"
);
}

#[tokio::test(flavor = "multi_thread")]
async fn encode_decode() {
let mut c = BInflux::default();

let value = literal!({
"measurement": "m",
"tags": {
"t1": "v1",
"t2": "v2"
},
"fields": {
"f1": 42,
"f2": 42.0,
"f3": true,
"f4": false,
"f5": "snot"
},
"timestamp": 42
});
let mut e = c
.encode(&value, &Value::const_null())
.await
.expect("encode");
let (d, _meta) = c
.decode(e.as_mut_slice(), 42, Value::const_null())
.await
.expect("decode")
.expect("decode");
assert_eq!(value, d);
}
}
6 changes: 4 additions & 2 deletions tremor-script/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ impl<'script> StringLit<'script> {
} else if let Some(_f) = r.as_f64() {
"42".to_string()
} else {
crate::utils::sorted_serialize(&r)?
String::from_utf8(tremor_value::utils::sorted_serialize(&r)?)?
}
}
};
Expand Down Expand Up @@ -888,7 +888,9 @@ impl<'script> StringLit<'script> {
} else if let Some(_f) = r.as_f64() {
res.push_str("42");
} else {
res.push_str(&crate::utils::sorted_serialize(&r)?);
res.push_str(&String::from_utf8(
tremor_value::utils::sorted_serialize(&r)?,
)?);
}
}
};
Expand Down

0 comments on commit b27e0c5

Please sign in to comment.