From dd5fa6d8f88bf2dd3d4498929ecc06ce28661cc1 Mon Sep 17 00:00:00 2001 From: Night fury Date: Mon, 1 Sep 2025 18:32:33 +0300 Subject: [PATCH 1/3] feat: integrated otlp tracing for FIX messages --- src/server/fix/server.rs | 37 +++++- src/server/fix/server_plugin.rs | 53 ++++++-- src/server/server.rs | 208 +++++++++++++++++++++++++++++- src/solver/index_order_manager.rs | 2 + src/solver/solver.rs | 9 +- 5 files changed, 292 insertions(+), 17 deletions(-) diff --git a/src/server/fix/server.rs b/src/server/fix/server.rs index 812a01c7..580b4566 100644 --- a/src/server/fix/server.rs +++ b/src/server/fix/server.rs @@ -5,8 +5,8 @@ use eyre::Result; use symm_core::core::functional::{IntoObservableManyVTable, NotificationHandler}; use crate::server::{ - fix::server_plugin::ServerPlugin, fix::rate_limit_config::FixRateLimitConfig, + fix::server_plugin::ServerPlugin, server::{Server as ServerInterface, ServerEvent, ServerResponse}, }; @@ -18,7 +18,7 @@ impl Server { pub fn new() -> Self { Self::new_with_rate_limiting(FixRateLimitConfig::default()) } - + pub fn new_with_rate_limiting(rate_limit_config: FixRateLimitConfig) -> Self { Self { inner: AxumFixServer::new(ServerPlugin::new(rate_limit_config)), @@ -43,6 +43,39 @@ impl IntoObservableManyVTable> for Server { impl ServerInterface for Server { fn respond_with(&mut self, response: ServerResponse) { + // Pull identifiers in one place + let (chain_id_opt, address_opt, client_order_id_opt, client_quote_id_opt) = + response.telemetry_ids(); + + // Materialize owned strings so &str borrows are stable during the event! macro + let chain_attr: i64 = chain_id_opt.map(|v| v as i64).unwrap_or(-1); + + let address_str: String = match &address_opt { + Some(a) => format!("{}", a), + None => String::from("none"), + }; + + let client_order_id_str: String = match &client_order_id_opt { + Some(coid) => String::from(coid.as_str()), + None => String::from("none"), + }; + + let client_quote_id_str: String = match &client_quote_id_opt { + Some(cqid) => String::from(cqid.as_str()), + None => String::from("none"), + }; + + // Emit the OTLP-friendly event with stable &str borrows + tracing::event!( + tracing::Level::INFO, + otlp_kind = "fix_response", + chain_id = chain_attr, + address = address_str.as_str(), + client_order_id = client_order_id_str.as_str(), + client_quote_id = client_quote_id_str.as_str() + ); + + // Send the FIX response; warn (don't panic) on failure if let Err(err) = self.inner.send_response(response) { tracing::warn!("Failed to respond with: {:?}", err); } diff --git a/src/server/fix/server_plugin.rs b/src/server/fix/server_plugin.rs index 15600373..4afa2bfc 100644 --- a/src/server/fix/server_plugin.rs +++ b/src/server/fix/server_plugin.rs @@ -65,7 +65,10 @@ impl ServerPlugin { self.session_comp_ids .write() .map_err(|e| eyre!("Failed to access session comp IDs: {:?}", e))? - .insert(session_id.clone(), (String::from(sender), String::from(target))); + .insert( + session_id.clone(), + (String::from(sender), String::from(target)), + ); Ok(()) } @@ -111,7 +114,11 @@ impl ServerPlugin { nak.set_seq_num(self.seq_num_plugin.next_seq_num(session_id)); if let Err(e) = self.apply_swapped_comp_ids(&mut nak.standard_header, session_id) { - return self.process_error(user_id, format!("Failed to access session comp IDs: {:?}", e), session_id); + return self.process_error( + user_id, + format!("Failed to access session comp IDs: {:?}", e), + session_id, + ); } self.serde_plugin.process_outgoing(nak) @@ -141,7 +148,11 @@ impl ServerPlugin { ack.set_seq_num(self.seq_num_plugin.next_seq_num(session_id)); if let Err(e) = self.apply_swapped_comp_ids(&mut ack.standard_header, session_id) { - return self.process_error(user_id, format!("Failed to access session comp IDs: {:?}", e), session_id); + return self.process_error( + user_id, + format!("Failed to access session comp IDs: {:?}", e), + session_id, + ); } self.serde_plugin.process_outgoing(ack) @@ -442,7 +453,9 @@ impl ServerPlugin { ServerResponse::MintInvoice { chain_id, address, + client_order_id: _, mint_invoice, + timestamp: _, } => { let chain_id = *chain_id; let address = address.clone(); @@ -682,8 +695,16 @@ impl AxumFixServerPlugin for ServerPlugin { match self.serde_plugin.process_incoming(message, session_id) { Ok(result) => { let user_id = WithRateLimitPlugin::get_user_id(&result); - if let Err(e) = self.remember_comp_ids(session_id, &result.standard_header.sender_comp_id, &result.standard_header.target_comp_id) { - return self.process_error(&user_id, format!("Failed to access session comp IDs: {:?}", e), session_id); + if let Err(e) = self.remember_comp_ids( + session_id, + &result.standard_header.sender_comp_id, + &result.standard_header.target_comp_id, + ) { + return self.process_error( + &user_id, + format!("Failed to access session comp IDs: {:?}", e), + session_id, + ); } // verify signature before proceed anything match result.verify_signature() { @@ -697,8 +718,14 @@ impl AxumFixServerPlugin for ServerPlugin { err_msg, ); nak.set_seq_num(self.seq_num_plugin.next_seq_num(session_id)); - if let Err(e) = self.apply_swapped_comp_ids(&mut nak.standard_header, session_id) { - return self.process_error(&user_id, format!("Failed to access session comp IDs: {:?}", e), session_id); + if let Err(e) = + self.apply_swapped_comp_ids(&mut nak.standard_header, session_id) + { + return self.process_error( + &user_id, + format!("Failed to access session comp IDs: {:?}", e), + session_id, + ); } return self.serde_plugin.process_outgoing(nak); } @@ -707,8 +734,16 @@ impl AxumFixServerPlugin for ServerPlugin { self.user_plugin.add_add_user_session(&user_id, session_id); - if let Err(e) = self.remember_comp_ids(session_id, &result.standard_header.sender_comp_id, &result.standard_header.target_comp_id) { - return self.process_error(&user_id, format!("Failed to access session comp IDs: {:?}", e), session_id); + if let Err(e) = self.remember_comp_ids( + session_id, + &result.standard_header.sender_comp_id, + &result.standard_header.target_comp_id, + ) { + return self.process_error( + &user_id, + format!("Failed to access session comp IDs: {:?}", e), + session_id, + ); } // message-specific rate limiting diff --git a/src/server/server.rs b/src/server/server.rs index 93369b5e..392c135e 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -144,43 +144,78 @@ pub enum ServerResponseReason { } #[derive(Error, Debug)] +pub enum NewQuoteRequestNakReason { + #[error("Duplicate client quote ID: {detail:?}")] + DuplicateClientQuoteId { detail: String }, + + #[error("Invalid symbol: {detail:?}")] + InvalidSymbol { detail: String }, + + #[error("Other reason: {detail:?}")] + OtherReason { detail: String }, +} + +#[derive(Error, Debug, WithBaggage)] pub enum ServerResponse { + // ──────── Orders ──────── #[error("NewIndexOrder: ACK [{chain_id}:{address}] {client_order_id} {timestamp}")] NewIndexOrderAck { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] client_order_id: ClientOrderId, timestamp: DateTime, }, + #[error("NewIndexOrder: NAK [{chain_id}:{address}] {client_order_id} {timestamp}: {reason:?}")] NewIndexOrderNak { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] client_order_id: ClientOrderId, reason: ServerResponseReason, timestamp: DateTime, }, + #[error("CancelIndexOrder: ACK [{chain_id}:{address}] {client_order_id} {timestamp}")] CancelIndexOrderAck { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] client_order_id: ClientOrderId, timestamp: DateTime, }, + #[error( "CancelIndexOrder: NAK [{chain_id}:{address}] {client_order_id} {timestamp}: {reason:?}" )] CancelIndexOrderNak { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] client_order_id: ClientOrderId, reason: ServerResponseReason, timestamp: DateTime, }, - #[error("IndexOrderFill: [{chain_id}:{address}] {client_order_id} {timestamp}: {filled_quantity} {collateral_spent} {collateral_remaining}")] + + /// Order filled (partial or full) + #[error( + "IndexOrderFill: [{chain_id}:{address}] {client_order_id} {timestamp} status={status}" + )] IndexOrderFill { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] client_order_id: ClientOrderId, filled_quantity: Amount, collateral_spent: Amount, @@ -189,54 +224,217 @@ pub enum ServerResponse { status: String, timestamp: DateTime, }, - #[error("MintInvoice: [{chain_id}:{address}]")] + + /// Mint invoice issued for an order + #[error("MintInvoice: [{chain_id}:{address}] {client_order_id} {timestamp}")] MintInvoice { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] + client_order_id: ClientOrderId, mint_invoice: MintInvoice, + timestamp: DateTime, }, + + // ──────── Quotes ──────── #[error("NewIndexQuote: ACK [{chain_id}:{address}] {client_quote_id} {timestamp}")] NewIndexQuoteAck { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] client_quote_id: ClientQuoteId, timestamp: DateTime, }, + #[error("NewIndexQuote: NAK [{chain_id}:{address}] {client_quote_id} {timestamp}: {reason:?}")] NewIndexQuoteNak { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] client_quote_id: ClientQuoteId, reason: ServerResponseReason, timestamp: DateTime, }, - #[error("IndexOrderResponse: [{chain_id}:{address}] {client_quote_id} {timestamp}: {quantity_possible}")] - IndexQuoteResponse { + + #[error("NewQuoteRequest: ACK [{chain_id}:{address}] {client_quote_id} {timestamp}")] + NewQuoteRequestAck { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] client_quote_id: ClientQuoteId, - quantity_possible: Amount, timestamp: DateTime, }, + + #[error( + "NewQuoteRequest: NAK [{chain_id}:{address}] {client_quote_id} {timestamp}: {reason:?}" + )] + NewQuoteRequestNak { + #[baggage] + chain_id: u32, + #[baggage] + address: Address, + #[baggage] + client_quote_id: ClientQuoteId, + reason: ServerResponseReason, + timestamp: DateTime, + }, + #[error("CancelIndexQuote: ACK [{chain_id}:{address}] {client_quote_id} {timestamp}")] CancelIndexQuoteAck { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] client_quote_id: ClientQuoteId, timestamp: DateTime, }, + #[error( "CancelIndexQuote: NAK [{chain_id}:{address}] {client_quote_id} {timestamp}: {reason:?}" )] CancelIndexQuoteNak { + #[baggage] chain_id: u32, + #[baggage] address: Address, + #[baggage] client_quote_id: ClientQuoteId, reason: ServerResponseReason, timestamp: DateTime, }, + + /// Quote response (with max executable size) + #[error("IndexQuoteResponse: [{chain_id}:{address}] {client_quote_id} {timestamp}")] + IndexQuoteResponse { + #[baggage] + chain_id: u32, + #[baggage] + address: Address, + #[baggage] + client_quote_id: ClientQuoteId, + quantity_possible: Amount, + timestamp: DateTime, + }, } +impl ServerResponse { + /// Returns (chain_id, address, client_order_id, client_quote_id). + /// Clone non-Copy IDs to avoid moving out of &self. + pub fn telemetry_ids( + &self, + ) -> (Option, Option
, Option, Option) { + match self { + // ── Order-side variants (carry client_order_id) + ServerResponse::NewIndexOrderAck { + chain_id, + address, + client_order_id, + .. + } + | ServerResponse::NewIndexOrderNak { + chain_id, + address, + client_order_id, + .. + } + | ServerResponse::CancelIndexOrderAck { + chain_id, + address, + client_order_id, + .. + } + | ServerResponse::CancelIndexOrderNak { + chain_id, + address, + client_order_id, + .. + } + | ServerResponse::IndexOrderFill { + chain_id, + address, + client_order_id, + .. + } => ( + Some(*chain_id), + Some(address.clone()), + Some(client_order_id.clone()), + None, + ), + + // MintInvoice carries the order id inside the payload + ServerResponse::MintInvoice { + chain_id, + address, + mint_invoice, + .. + } => ( + Some(*chain_id), + Some(address.clone()), + Some(mint_invoice.client_order_id.clone()), + None, + ), + + // ── Quote-side variants (carry client_quote_id) + // Support both naming schemes: NewIndexQuote* and NewQuoteRequest* + ServerResponse::NewIndexQuoteAck { + chain_id, + address, + client_quote_id, + .. + } + | ServerResponse::NewIndexQuoteNak { + chain_id, + address, + client_quote_id, + .. + } + | ServerResponse::NewQuoteRequestAck { + chain_id, + address, + client_quote_id, + .. + } + | ServerResponse::NewQuoteRequestNak { + chain_id, + address, + client_quote_id, + .. + } + | ServerResponse::CancelIndexQuoteAck { + chain_id, + address, + client_quote_id, + .. + } + | ServerResponse::CancelIndexQuoteNak { + chain_id, + address, + client_quote_id, + .. + } + | ServerResponse::IndexQuoteResponse { + chain_id, + address, + client_quote_id, + .. + } => ( + Some(*chain_id), + Some(address.clone()), + None, + Some(client_quote_id.clone()), + ), + } + } +} pub trait Server: IntoObservableManyVTable> + Send + Sync { /// Provide methods for sending FIX responses fn respond_with(&mut self, response: ServerResponse); diff --git a/src/solver/index_order_manager.rs b/src/solver/index_order_manager.rs index fa1ef8b1..c75b01eb 100644 --- a/src/solver/index_order_manager.rs +++ b/src/solver/index_order_manager.rs @@ -724,7 +724,9 @@ impl IndexOrderManager { ServerResponse::MintInvoice { chain_id, address: *address, + client_order_id: client_order_id.clone(), mint_invoice, + timestamp: timestamp, } }); diff --git a/src/solver/solver.rs b/src/solver/solver.rs index f1db20f0..46def9a2 100644 --- a/src/solver/solver.rs +++ b/src/solver/solver.rs @@ -709,7 +709,12 @@ impl Solver { reason, timestamp, } => { - tracing::info!("(solver) Chain {} disconnected at {}: {}", chain_id, reason, timestamp); + tracing::info!( + "(solver) Chain {} disconnected at {}: {}", + chain_id, + reason, + timestamp + ); Ok(()) } } @@ -2123,7 +2128,9 @@ mod test { ServerResponse::MintInvoice { chain_id, address, + client_order_id: _, mint_invoice, + timestamp: _, } => { tracing::info!( "(mock) FIX Mint Invoice: [{}:{}] {} {} {:0.5} {:0.5} {:0.5} {:0.5}", From c6ce242e61640803e9664be678641f6e1f2677eb Mon Sep 17 00:00:00 2001 From: Sadhbh Code Date: Wed, 8 Oct 2025 16:27:00 +0100 Subject: [PATCH 2/3] Use TraceableEvent for ServerResponse sending --- src/server/fix/server.rs | 52 ++++---------- src/server/server.rs | 109 ------------------------------ src/solver/index_order_manager.rs | 2 + 3 files changed, 17 insertions(+), 146 deletions(-) diff --git a/src/server/fix/server.rs b/src/server/fix/server.rs index 8da27d25..cc8a2893 100644 --- a/src/server/fix/server.rs +++ b/src/server/fix/server.rs @@ -2,7 +2,10 @@ use std::sync::Arc; use axum_fix_server::server::Server as AxumFixServer; use eyre::Result; -use symm_core::core::functional::{IntoObservableManyVTable, NotificationHandler}; +use symm_core::core::{ + functional::{IntoObservableManyVTable, NotificationHandler}, + telemetry::{TraceableEvent, TracingData, WithBaggage, WithTracingContext}, +}; use crate::server::{ fix::rate_limit_config::FixRateLimitConfig, @@ -43,42 +46,17 @@ impl IntoObservableManyVTable> for Server { impl ServerInterface for Server { fn respond_with(&mut self, response: ServerResponse) { - // Pull identifiers in one place - let (chain_id_opt, address_opt, client_order_id_opt, client_quote_id_opt) = - response.telemetry_ids(); - - // Materialize owned strings so &str borrows are stable during the event! macro - let chain_attr: i64 = chain_id_opt.map(|v| v as i64).unwrap_or(-1); - - let address_str: String = match &address_opt { - Some(a) => format!("{}", a), - None => String::from("none"), - }; - - let client_order_id_str: String = match &client_order_id_opt { - Some(coid) => String::from(coid.as_str()), - None => String::from("none"), - }; - - let client_quote_id_str: String = match &client_quote_id_opt { - Some(cqid) => String::from(cqid.as_str()), - None => String::from("none"), - }; - - // Emit the OTLP-friendly event with stable &str borrows - tracing::event!( - tracing::Level::INFO, - otlp_kind = "fix_response", - chain_id = chain_attr, - address = address_str.as_str(), - client_order_id = client_order_id_str.as_str(), - client_quote_id = client_quote_id_str.as_str() - ); - - // Send the FIX response; warn (don't panic) on failure - if let Err(err) = self.inner.send_response(response) { - tracing::warn!("Failed to respond with: {:?}", err); - } + // Inject OTLP context and baggage + let mut traceable_response = TraceableEvent::new(response); + traceable_response.inject_baggage(); + traceable_response.inject_current_context(); + + traceable_response.with_tracing(|response| { + // Send the FIX response; warn (don't panic) on failure + if let Err(err) = self.inner.send_response(response) { + tracing::warn!("Failed to respond with: {:?}", err); + } + }); } fn initialize_shutdown(&mut self) { diff --git a/src/server/server.rs b/src/server/server.rs index 6aca6df0..4595a979 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -326,115 +326,6 @@ pub enum ServerResponse { }, } -impl ServerResponse { - /// Returns (chain_id, address, client_order_id, client_quote_id). - /// Clone non-Copy IDs to avoid moving out of &self. - pub fn telemetry_ids( - &self, - ) -> (Option, Option
, Option, Option) { - match self { - // ── Order-side variants (carry client_order_id) - ServerResponse::NewIndexOrderAck { - chain_id, - address, - client_order_id, - .. - } - | ServerResponse::NewIndexOrderNak { - chain_id, - address, - client_order_id, - .. - } - | ServerResponse::CancelIndexOrderAck { - chain_id, - address, - client_order_id, - .. - } - | ServerResponse::CancelIndexOrderNak { - chain_id, - address, - client_order_id, - .. - } - | ServerResponse::IndexOrderFill { - chain_id, - address, - client_order_id, - .. - } => ( - Some(*chain_id), - Some(address.clone()), - Some(client_order_id.clone()), - None, - ), - - // MintInvoice carries the order id inside the payload - ServerResponse::MintInvoice { - chain_id, - address, - mint_invoice, - .. - } => ( - Some(*chain_id), - Some(address.clone()), - Some(mint_invoice.client_order_id.clone()), - None, - ), - - // ── Quote-side variants (carry client_quote_id) - // Support both naming schemes: NewIndexQuote* and NewQuoteRequest* - ServerResponse::NewIndexQuoteAck { - chain_id, - address, - client_quote_id, - .. - } - | ServerResponse::NewIndexQuoteNak { - chain_id, - address, - client_quote_id, - .. - } - | ServerResponse::NewQuoteRequestAck { - chain_id, - address, - client_quote_id, - .. - } - | ServerResponse::NewQuoteRequestNak { - chain_id, - address, - client_quote_id, - .. - } - | ServerResponse::CancelIndexQuoteAck { - chain_id, - address, - client_quote_id, - .. - } - | ServerResponse::CancelIndexQuoteNak { - chain_id, - address, - client_quote_id, - .. - } - | ServerResponse::IndexQuoteResponse { - chain_id, - address, - client_quote_id, - .. - } => ( - Some(*chain_id), - Some(address.clone()), - None, - Some(client_quote_id.clone()), - ), - } - } -} pub trait Server: IntoObservableManyVTable> + Send + Sync { /// Provide methods for sending FIX responses fn respond_with(&mut self, response: ServerResponse); diff --git a/src/solver/index_order_manager.rs b/src/solver/index_order_manager.rs index 5b3a4e32..aac0a20a 100644 --- a/src/solver/index_order_manager.rs +++ b/src/solver/index_order_manager.rs @@ -801,7 +801,9 @@ impl IndexOrderManager { ServerResponse::MintInvoice { chain_id, address: *address, + client_order_id: client_order_id.clone(), mint_invoice, + timestamp, } }); From 6391626f363cd99643d232ca7d13021d147e3a96 Mon Sep 17 00:00:00 2001 From: Sadhbh Code Date: Wed, 8 Oct 2025 17:53:38 +0100 Subject: [PATCH 3/3] Refactor: Removed locks from Server --- examples/fix_server/example_plugin.rs | 2 +- .../src/plugins/user_plugin.rs | 2 +- libs/axum-fix-server/src/server.rs | 31 ++--- libs/axum-fix-server/src/server_state.rs | 37 +++--- libs/axum-fix-server/src/session.rs | 14 +-- src/server/fix/responses.rs | 14 +-- src/server/fix/server.rs | 16 +-- src/server/fix/server_plugin.rs | 115 +++++++++--------- 8 files changed, 104 insertions(+), 127 deletions(-) diff --git a/examples/fix_server/example_plugin.rs b/examples/fix_server/example_plugin.rs index 4843a1c9..c8e62bab 100644 --- a/examples/fix_server/example_plugin.rs +++ b/examples/fix_server/example_plugin.rs @@ -77,7 +77,7 @@ where match self.serde_plugin.process_incoming(message, session_id) { Ok(result) => { let user_id = &result.get_user_id(); - self.user_plugin.add_add_user_session(&user_id, session_id); + self.user_plugin.add_user_session(&user_id, session_id); let seq_num = result.get_seq_num(); // Ensure R has this method or adjust accordingly if self.seq_num_plugin.valid_seq_num(seq_num, session_id) { diff --git a/libs/axum-fix-server/src/plugins/user_plugin.rs b/libs/axum-fix-server/src/plugins/user_plugin.rs index 4c2f66d8..e888012f 100644 --- a/libs/axum-fix-server/src/plugins/user_plugin.rs +++ b/libs/axum-fix-server/src/plugins/user_plugin.rs @@ -19,7 +19,7 @@ impl UserPlugin { } } - pub fn add_add_user_session(&self, user_id: &(u32, Address), session_id: &SessionId) { + pub fn add_user_session(&self, user_id: &(u32, Address), session_id: &SessionId) { let mut write_lock = self.users_sessions.write().unwrap(); write_lock .entry(*user_id) diff --git a/libs/axum-fix-server/src/server.rs b/libs/axum-fix-server/src/server.rs index d4a07cfc..f423e47b 100644 --- a/libs/axum-fix-server/src/server.rs +++ b/libs/axum-fix-server/src/server.rs @@ -14,7 +14,6 @@ use axum::{ use eyre::{eyre, Result}; use futures_util::future::join_all; use itertools::Itertools; -use parking_lot::RwLock; use std::net::SocketAddr; /// Server @@ -25,7 +24,7 @@ use std::net::SocketAddr; /// The `Plugin` consumes the message and is responsible for deserialization, /// message validation (fields, seqnum, signatures), publishing to application, etc. pub struct Server { - server_state: Arc>>, + server_state: Arc>, } impl Server @@ -39,18 +38,12 @@ where /// Initializes the session ID counter to start from 1. pub fn new(plugin: Plugin) -> Self { Self { - server_state: Arc::new(RwLock::new(ServerState::new(plugin))), + server_state: Arc::new(ServerState::new(plugin)), } } pub fn with_plugin(&self, cb: impl FnOnce(&Plugin) -> Ret) -> Ret { - let state = self.server_state.read(); - cb(state.get_plugin()) - } - - pub fn with_plugin_mut(&self, cb: impl FnOnce(&mut Plugin) -> Ret) -> Ret { - let mut state = self.server_state.write(); - cb(state.get_plugin_mut()) + cb(self.server_state.get_plugin()) } /// start_server @@ -82,14 +75,14 @@ where /// /// Closes server for new connections pub fn close_server(&self) { - self.server_state.write().close(); + self.server_state.close(); } /// close_server /// /// Closes all sessions pub async fn stop_server(&self) -> Result<()> { - let sessions = self.server_state.write().close_all_sessions()?; + let sessions = self.server_state.close_all_sessions()?; let stop_futures = sessions.iter().map(|s| s.wait_stopped()).collect_vec(); let (_, failures): (Vec<_>, Vec<_>) = @@ -110,7 +103,7 @@ where /// Sends a response to the appropriate client session based on the session ID in the response. /// Returns a `Result` indicating success or failure if the session is not found. pub fn send_response(&self, response: Response) -> Result<()> { - self.server_state.write().process_outgoing(response) + self.server_state.process_outgoing(response) } } @@ -122,7 +115,7 @@ where /// session is closed by the client, the loop is broken and the session destroyed. async fn ws_handler( ws: WebSocketUpgrade, - State(server_state): State>>>, + State(server_state): State>>, ) -> impl IntoResponse where Response: Send + Sync + 'static, @@ -131,11 +124,11 @@ where let timeout = std::time::Duration::from_secs(10); ws.on_upgrade(async move |mut ws: WebSocket| { - if !server_state.read().is_accepting_connections() { + if !server_state.is_accepting_connections() { return; } - let (mut rx, session) = match server_state.write().create_session() { + let (mut rx, session) = match server_state.create_session() { Err(err) => { tracing::warn!("Failed to create session: {:?}", err); return; @@ -165,9 +158,7 @@ where } }; - let maybe_message = server_state - .read() - .process_incoming(incoming_message, &session_id); + let maybe_message = server_state.process_incoming(incoming_message, &session_id); match maybe_message { Ok(result) => { @@ -195,7 +186,7 @@ where tracing::info!(%session_id, "Closing session"); - if let Err(err) = server_state.write().close_session(session_id) { + if let Err(err) = server_state.close_session(session_id) { tracing::warn!("Failed to close session: {:?}", err); } }) diff --git a/libs/axum-fix-server/src/server_state.rs b/libs/axum-fix-server/src/server_state.rs index fdb019a0..82ef8117 100644 --- a/libs/axum-fix-server/src/server_state.rs +++ b/libs/axum-fix-server/src/server_state.rs @@ -1,11 +1,15 @@ use std::{ collections::{hash_map::Entry, HashMap}, marker::PhantomData, - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, }; use eyre::{eyre, OptionExt, Result}; use itertools::Itertools; +use parking_lot::RwLock; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use crate::{messages::SessionId, server_plugin::ServerPlugin, session::Session}; @@ -31,9 +35,9 @@ impl Sessions { pub struct ServerState { _phantom_response: PhantomData, - sessions: Sessions, + sessions: RwLock, plugin: Plugin, - accept_connections: bool, + accept_connections: AtomicBool, } impl ServerState @@ -45,18 +49,18 @@ where Self { _phantom_response: PhantomData::default(), - sessions: Sessions::new(), + sessions: RwLock::new(Sessions::new()), plugin, - accept_connections: true, + accept_connections: AtomicBool::new(true), } } pub fn is_accepting_connections(&self) -> bool { - self.accept_connections + self.accept_connections.load(Ordering::Relaxed) } - pub fn close(&mut self) { - self.accept_connections = false; + pub fn close(&self) { + self.accept_connections.store(false, Ordering::Relaxed); } pub fn get_plugin(&self) -> &Plugin { @@ -68,12 +72,13 @@ where } pub fn get_session_ids(&self) -> Vec { - self.sessions.sessions.keys().cloned().collect_vec() + self.sessions.read().sessions.keys().cloned().collect_vec() } - pub fn create_session(&mut self) -> Result<(UnboundedReceiver, Arc)> { - let session_id = self.sessions.gen_next_session_id(); + pub fn create_session(&self) -> Result<(UnboundedReceiver, Arc)> { + let mut sessions_write = self.sessions.write(); + let session_id = sessions_write.gen_next_session_id(); - match self.sessions.sessions.entry(session_id.clone()) { + match sessions_write.sessions.entry(session_id.clone()) { Entry::Occupied(_) => Err(eyre!("Session {} already exists", session_id)), Entry::Vacant(vacant_entry) => { let (tx, rx) = unbounded_channel(); @@ -86,10 +91,12 @@ where } } - pub fn close_session(&mut self, session_id: SessionId) -> Result> { + pub fn close_session(&self, session_id: SessionId) -> Result> { self.plugin.destroy_session(&session_id)?; + let session = self .sessions + .write() .sessions .remove(&session_id) .ok_or_eyre("Session no longer exists")?; @@ -98,7 +105,7 @@ where Ok(session) } - pub fn close_all_sessions(&mut self) -> Result>> { + pub fn close_all_sessions(&self) -> Result>> { let (good, bad): (Vec<_>, Vec<_>) = self .get_session_ids() .into_iter() @@ -128,8 +135,10 @@ where .map(|(sid, resp)| { let session = self .sessions + .read() .sessions .get(&sid) + .cloned() .ok_or_else(|| eyre!("Session {} not found", sid))?; session.send_response(resp) diff --git a/libs/axum-fix-server/src/session.rs b/libs/axum-fix-server/src/session.rs index cbcaa895..5565b399 100644 --- a/libs/axum-fix-server/src/session.rs +++ b/libs/axum-fix-server/src/session.rs @@ -1,18 +1,8 @@ -use std::sync::Arc; - -use axum::extract::ws::{Message, WebSocket}; -use chrono::Utc; use eyre::{eyre, Result}; -use futures_util::TryFutureExt; -use itertools::Itertools; -use tokio::{ - select, - sync::mpsc::{UnboundedReceiver, UnboundedSender}, - time::sleep, -}; +use tokio::sync::mpsc::UnboundedSender; use tokio_util::sync::CancellationToken; -use crate::{messages::SessionId, server_plugin::ServerPlugin, server_state::ServerState}; +use crate::messages::SessionId; /// Session /// diff --git a/src/server/fix/responses.rs b/src/server/fix/responses.rs index 2a64a259..e0fd50e9 100644 --- a/src/server/fix/responses.rs +++ b/src/server/fix/responses.rs @@ -159,12 +159,7 @@ mod tests { // Test ACK response let ack_response = FixResponse::create_ack(&user_id, &session_id, 123); - assert_eq!( - axum_fix_server::plugins::rate_limit_plugin::WithRateLimitPlugin::get_user_id( - &ack_response - ), - user_id - ); + assert_eq!(WithRateLimitPlugin::get_user_id(&ack_response), user_id); assert_eq!( ack_response.get_rate_limit_key(), RateLimitKey::User(user_id.0, user_id.1) @@ -180,12 +175,7 @@ mod tests { "Rate limit exceeded".to_string(), ); - assert_eq!( - axum_fix_server::plugins::rate_limit_plugin::WithRateLimitPlugin::get_user_id( - &nak_response - ), - user_id - ); + assert_eq!(WithRateLimitPlugin::get_user_id(&nak_response), user_id); assert_eq!( nak_response.get_rate_limit_key(), RateLimitKey::User(user_id.0, user_id.1) diff --git a/src/server/fix/server.rs b/src/server/fix/server.rs index cc8a2893..c80b5959 100644 --- a/src/server/fix/server.rs +++ b/src/server/fix/server.rs @@ -4,7 +4,7 @@ use axum_fix_server::server::Server as AxumFixServer; use eyre::Result; use symm_core::core::{ functional::{IntoObservableManyVTable, NotificationHandler}, - telemetry::{TraceableEvent, TracingData, WithBaggage, WithTracingContext}, + telemetry::{TraceableEvent, WithTracingContext}, }; use crate::server::{ @@ -14,7 +14,7 @@ use crate::server::{ }; pub struct Server { - inner: AxumFixServer, + inner: AxumFixServer, ServerPlugin>, } impl Server { @@ -40,23 +40,19 @@ impl Server { impl IntoObservableManyVTable> for Server { fn add_observer(&mut self, observer: Box>>) { self.inner - .with_plugin_mut(|plugin| plugin.add_observer(observer)) + .with_plugin(|plugin| plugin.add_observer(observer)) } } impl ServerInterface for Server { fn respond_with(&mut self, response: ServerResponse) { - // Inject OTLP context and baggage let mut traceable_response = TraceableEvent::new(response); traceable_response.inject_baggage(); traceable_response.inject_current_context(); - traceable_response.with_tracing(|response| { - // Send the FIX response; warn (don't panic) on failure - if let Err(err) = self.inner.send_response(response) { - tracing::warn!("Failed to respond with: {:?}", err); - } - }); + if let Err(err) = self.inner.send_response(traceable_response) { + tracing::warn!("Failed to respond with: {:?}", err); + } } fn initialize_shutdown(&mut self) { diff --git a/src/server/fix/server_plugin.rs b/src/server/fix/server_plugin.rs index 7fc8e0e2..500d02e6 100644 --- a/src/server/fix/server_plugin.rs +++ b/src/server/fix/server_plugin.rs @@ -1,6 +1,6 @@ use std::{ collections::{HashMap, HashSet}, - sync::{Arc, RwLock}, + sync::Arc, }; use crate::server::{ @@ -26,22 +26,23 @@ use axum_fix_server::{ }; use chrono::Utc; use eyre::{eyre, Context, Result}; -use itertools::Itertools; use k256::elliptic_curve::generic_array::GenericArray; use k256::{ ecdsa::{signature::DigestSigner, Signature, SigningKey}, FieldBytes, }; +use parking_lot::RwLock; use sha2::{Digest, Sha256}; use symm_core::core::{ bits::{Address, Amount, ClientOrderId, ClientQuoteId, Side, Symbol}, functional::{IntoObservableManyVTable, NotificationHandler}, + telemetry::TraceableEvent, }; use tracing::warn; // A composite plugin that can wrap other plugins and delegate functionality. pub struct ServerPlugin { - observer_plugin: ObserverPlugin>, + observer_plugin: RwLock>>, serde_plugin: SerdePlugin, seq_num_plugin: SeqNumPlugin, user_plugin: UserPlugin, @@ -53,7 +54,7 @@ pub struct ServerPlugin { impl ServerPlugin { pub fn new(rate_limit_config: FixRateLimitConfig) -> Self { Self { - observer_plugin: ObserverPlugin::new(), + observer_plugin: RwLock::new(ObserverPlugin::new()), serde_plugin: SerdePlugin::new(), seq_num_plugin: SeqNumPlugin::new(), user_plugin: UserPlugin::new(), @@ -63,23 +64,15 @@ impl ServerPlugin { } fn remember_comp_ids(&self, session_id: &SessionId, sender: &str, target: &str) -> Result<()> { - self.session_comp_ids - .write() - .map_err(|e| eyre!("Failed to access session comp IDs: {:?}", e))? - .insert( - session_id.clone(), - (String::from(sender), String::from(target)), - ); + self.session_comp_ids.write().insert( + session_id.clone(), + (String::from(sender), String::from(target)), + ); Ok(()) } fn apply_swapped_comp_ids(&self, header: &mut FixHeader, session_id: &SessionId) -> Result<()> { - let guard = self - .session_comp_ids - .read() - .map_err(|e| eyre!("Failed to access session comp IDs: {:?}", e))?; - - if let Some((sender, target)) = guard.get(session_id).cloned() { + if let Some((sender, target)) = self.session_comp_ids.read().get(session_id).cloned() { header.add_sender(sender); header.add_target(target); } else { @@ -93,11 +86,7 @@ impl ServerPlugin { self.rate_limit_plugin.destroy_session(session_id)?; self.user_plugin.remove_session(session_id); self.seq_num_plugin.destroy_session(session_id)?; - if let Ok(mut guard) = self.session_comp_ids.write() { - guard.remove(session_id); - } else { - warn!("Failed to access session comp IDs : {:?}", session_id); - } + self.session_comp_ids.write().remove(session_id); Ok(()) } @@ -633,15 +622,13 @@ impl ServerPlugin { standard_trailer: trailer, }) } -} -impl IntoObservableManyVTable> for ServerPlugin { - fn add_observer(&mut self, observer: Box>>) { - self.observer_plugin.add_observer(observer); + pub fn add_observer(&self, observer: Box>>) { + self.observer_plugin.write().add_observer(observer); } } -impl AxumFixServerPlugin for ServerPlugin { +impl AxumFixServerPlugin> for ServerPlugin { fn process_incoming(&self, message: String, session_id: &SessionId) -> Result { // pre-process rate limit check if let Err(rate_error) = self.rate_limit_plugin.check_rate_limits( @@ -691,7 +678,7 @@ impl AxumFixServerPlugin for ServerPlugin { } // end verification part - self.user_plugin.add_add_user_session(&user_id, session_id); + self.user_plugin.add_user_session(&user_id, session_id); if let Err(e) = self.remember_comp_ids( session_id, @@ -722,9 +709,17 @@ impl AxumFixServerPlugin for ServerPlugin { error_msg.unwrap_or_else(|_| "Failed to process error".to_string()) ) })?; - self.observer_plugin.publish_request(&Arc::new(event)); - self.process_ack(&user_id, session_id) - .map_err(|e| eyre::eyre!("Process ACK failed: {}", e)) + // We immediatelly produce ACK + let ack = self.process_ack(&user_id, session_id) + .map_err(|e| eyre::eyre!("Process ACK failed: {}", e)); + + // And we defer application processing for later time + self.observer_plugin + .read() + .publish_request(&Arc::new(event)); + + // We return ACK in all cases, and observer can Cancel if error happens + ack } else { let error_msg = format!( "Invalid sequence number: {}; Last valid: {}", @@ -743,38 +738,44 @@ impl AxumFixServerPlugin for ServerPlugin { } } - fn process_outgoing(&self, response: ServerResponse) -> Result> { - let mut result: HashSet<(SessionId, String)>; - result = HashSet::new(); - - let fix_response = - self.server_response_to_fix_response(&response, &SessionId::from("S-proto"))?; - let user_id = WithUserPlugin::get_user_id(&fix_response); - - if let Ok(sessions) = self.user_plugin.get_user_sessions(&user_id) { - for session in sessions { - let mut fix_response = self.server_response_to_fix_response(&response, &session)?; - fix_response.set_seq_num(self.seq_num_plugin.next_seq_num(&session)); + fn process_outgoing( + &self, + response: TraceableEvent, + ) -> Result> { + response.with_tracing(|response| { + let mut result: HashSet<(SessionId, String)>; + result = HashSet::new(); + + let fix_response = + self.server_response_to_fix_response(&response, &SessionId::from("S-proto"))?; + let user_id = WithUserPlugin::get_user_id(&fix_response); + + if let Ok(sessions) = self.user_plugin.get_user_sessions(&user_id) { + for session in sessions { + let mut fix_response = + self.server_response_to_fix_response(&response, &session)?; + fix_response.set_seq_num(self.seq_num_plugin.next_seq_num(&session)); + + if let Ok(message) = self.serde_plugin.process_outgoing(fix_response) { + result.insert((session, message)); + } else { + return Err(eyre::eyre!("Cannot serialize response.")); + } + } + } else { + let session_id = fix_response.get_session_id(); + let mut fix_response = + self.server_response_to_fix_response(&response, &session_id.clone())?; + fix_response.set_seq_num(self.seq_num_plugin.next_seq_num(&session_id.clone())); if let Ok(message) = self.serde_plugin.process_outgoing(fix_response) { - result.insert((session, message)); + result.insert((session_id.clone(), message)); } else { return Err(eyre::eyre!("Cannot serialize response.")); } } - } else { - let session_id = fix_response.get_session_id(); - let mut fix_response = - self.server_response_to_fix_response(&response, &session_id.clone())?; - fix_response.set_seq_num(self.seq_num_plugin.next_seq_num(&session_id.clone())); - - if let Ok(message) = self.serde_plugin.process_outgoing(fix_response) { - result.insert((session_id.clone(), message)); - } else { - return Err(eyre::eyre!("Cannot serialize response.")); - } - } - return Ok(result); + return Ok(result); + }) } fn create_session(&self, session_id: &SessionId) -> Result<()> {