diff --git a/Cargo.toml b/Cargo.toml index 8a0c512..eb6b5fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,5 +22,7 @@ parse_unknown_fields = [] [dev-dependencies] insta = { version = "1.30.0", features = ["yaml"] } +tokio = { version = "1.38.0", features = ["full"] } +tokio-macros = { version = "0.2.0-alpha.6" } hex = "0.4.3" serde_json = "1.0.100" diff --git a/README.md b/README.md index 1376d3f..295df44 100644 --- a/README.md +++ b/README.md @@ -94,3 +94,7 @@ To run: or ```cargo run --example netflow_udp_listener_single_threaded``` + +or + +```cargo run --example netflow_udp_listener_tokio``` diff --git a/RELEASES.md b/RELEASES.md index 00a5908..c661536 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -1,5 +1,6 @@ # 0.3.6 * Added V9 Post NAT fields 225-228. + * Added Tokio Async Example # 0.3.5 * 3 Byte Data Numbers now correctly converts back to be_bytes. diff --git a/examples/netflow_udp_listener_tokio.rs b/examples/netflow_udp_listener_tokio.rs new file mode 100644 index 0000000..95d9ea1 --- /dev/null +++ b/examples/netflow_udp_listener_tokio.rs @@ -0,0 +1,32 @@ +use std::collections::HashMap; +use std::io; +use tokio::net::UdpSocket; + +use netflow_parser::NetflowParser; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut parsers: HashMap = HashMap::new(); + + let sock = UdpSocket::bind("0.0.0.0:9995").await?; + + let mut buf = [0; 65535]; + + loop { + let (len, addr) = sock.recv_from(&mut buf).await?; + + let data = buf[..len].to_vec(); + let data = data.as_slice(); + + let result = match parsers.get_mut(&addr.to_string()) { + Some(parser) => parser.parse_bytes(data), + None => { + let mut new_parser = NetflowParser::default(); + let result = new_parser.parse_bytes(data); + parsers.insert(addr.to_string(), new_parser); + result + } + }; + println!("{:?}", result); + } +} diff --git a/src/lib.rs b/src/lib.rs index 60854ab..ce2eb2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,6 +92,10 @@ //! or //! //! ```cargo run --example netflow_udp_listener_single_threaded``` +//! +//! or +//! +//! ```cargo run --example netflow_udp_listener_tokio``` mod parser; pub mod protocol; diff --git a/src/variable_versions/ipfix.rs b/src/variable_versions/ipfix.rs index 380911b..cd27f4e 100644 --- a/src/variable_versions/ipfix.rs +++ b/src/variable_versions/ipfix.rs @@ -121,14 +121,14 @@ pub struct FlowSetBody { pub options_template: Option, // Data #[nom( - Cond = "id > SET_MIN_RANGE && parser.templates.get(&id).is_some()", + Cond = "id > SET_MIN_RANGE && parser.templates.contains_key(&id)", Parse = "{ |i| Data::parse(i, parser, id) }" )] #[serde(skip_serializing_if = "Option::is_none")] pub data: Option, // OptionsData #[nom( - Cond = "id > SET_MIN_RANGE && parser.options_templates.get(&id).is_some()", + Cond = "id > SET_MIN_RANGE && parser.options_templates.contains_key(&id)", Parse = "{ |i| OptionsData::parse(i, parser, id) }" )] #[serde(skip_serializing_if = "Option::is_none")] diff --git a/src/variable_versions/v9.rs b/src/variable_versions/v9.rs index 6b5e19b..3f366f8 100644 --- a/src/variable_versions/v9.rs +++ b/src/variable_versions/v9.rs @@ -121,14 +121,14 @@ pub struct FlowSetBody { pub options_templates: Option>, // Options Data #[nom( - Cond = "flow_set_id > FLOW_SET_MIN_RANGE && parser.options_templates.get(&flow_set_id).is_some()", + Cond = "flow_set_id > FLOW_SET_MIN_RANGE && parser.options_templates.contains_key(&flow_set_id)", Parse = "{ |i| OptionsData::parse(i, parser, flow_set_id) }" )] #[serde(skip_serializing_if = "Option::is_none")] pub options_data: Option, // Data #[nom( - Cond = "flow_set_id > FLOW_SET_MIN_RANGE && parser.templates.get(&flow_set_id).is_some()", + Cond = "flow_set_id > FLOW_SET_MIN_RANGE && parser.templates.contains_key(&flow_set_id)", Parse = "{ |i| Data::parse(i, parser, flow_set_id) }" )] #[serde(skip_serializing_if = "Option::is_none")]