From 94c2bb654a8b34aa545455ef65b482d3e46d6a3f Mon Sep 17 00:00:00 2001 From: Trevin Chow Date: Mon, 30 Mar 2026 14:26:03 -0700 Subject: [PATCH] fix(connectors): check consume() return value in sink runtime The connector runtime called consume() on each sink plugin but discarded the i32 return value. Non-zero returns (HTTP timeout, database failure, serialization error) were silently ignored with no logging or error propagation. Capture the return value, log at error level on non-zero, and return Err(RuntimeError::SinkConsumeFailed) so the caller can decide retry semantics. Fixes #2927 Co-Authored-By: Claude Opus 4.6 (1M context) --- core/connectors/runtime/src/error.rs | 3 +++ core/connectors/runtime/src/sink.rs | 7 ++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/core/connectors/runtime/src/error.rs b/core/connectors/runtime/src/error.rs index 0f6705faff..02e099c1e3 100644 --- a/core/connectors/runtime/src/error.rs +++ b/core/connectors/runtime/src/error.rs @@ -63,6 +63,8 @@ pub enum RuntimeError { TokenFileReadError(String, String), #[error("Token file is empty: {0}")] TokenFileEmpty(String), + #[error("Sink consume failed for plugin {0} with return code: {1}")] + SinkConsumeFailed(u32, i32), } impl RuntimeError { @@ -78,6 +80,7 @@ impl RuntimeError { RuntimeError::TokenFileNotFound(_) => "invalid_configuration", RuntimeError::TokenFileReadError(_, _) => "invalid_configuration", RuntimeError::TokenFileEmpty(_) => "invalid_configuration", + RuntimeError::SinkConsumeFailed(_, _) => "sink_consume_failed", _ => "error", } } diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 48961aca6e..d6567bc797 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -582,7 +582,7 @@ async fn process_messages( RuntimeError::FailedToSerializeRawMessages })?; - (consume)( + let result = (consume)( plugin_id, topic_meta.as_ptr(), topic_meta.len(), @@ -592,5 +592,10 @@ async fn process_messages( messages.len(), ); + if result != 0 { + error!("Sink consume failed for plugin {plugin_id} with return code: {result}"); + return Err(RuntimeError::SinkConsumeFailed(plugin_id, result)); + } + Ok(processed_count) }