From fc7ca2b253021b637cf6e5bad7dbdb3e26a9f499 Mon Sep 17 00:00:00 2001 From: Louis Tarvin Date: Thu, 25 Sep 2025 10:57:39 +0100 Subject: [PATCH] Add support for SASL extensions This commit builds off of the work done in #442, adding the missing support for SASL extensions. This allows extensions to be returned from generate_oauth_token as key-value pairs, which then get passed to rd_kafka_oauthbearer_set_token after being converted into the correct format. --- src/client.rs | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/client.rs b/src/client.rs index fd3fded5f..40a59b6e4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -379,19 +379,36 @@ impl Client { .generate_oauth_token(oauthbearer_config.as_deref())?; let token = CString::new(token_info.token)?; let principal_name = CString::new(token_info.principal_name)?; - Ok((token, principal_name, token_info.lifetime_ms)) + let extension_strings = token_info + .extensions + .iter() + .flat_map(|(key, value)| [CString::new(key.as_str()), CString::new(value.as_str())]) + .collect::, _>>()?; + Ok(( + token, + principal_name, + token_info.lifetime_ms, + extension_strings, + )) })(); match res { - Ok((token, principal_name, lifetime_ms)) => { + Ok((token, principal_name, lifetime_ms, extension_strings)) => { let mut err_buf = ErrBuf::new(); + let mut extension_ptrs: Vec<*const c_char> = + extension_strings.iter().map(|s| s.as_ptr()).collect(); + let (extensions_ptr, extensions_count) = if extension_ptrs.is_empty() { + (ptr::null_mut(), 0) + } else { + (extension_ptrs.as_mut_ptr(), extension_ptrs.len()) + }; let code = unsafe { rdkafka_sys::rd_kafka_oauthbearer_set_token( self.native_ptr(), token.as_ptr(), lifetime_ms, principal_name.as_ptr(), - ptr::null_mut(), - 0, + extensions_ptr, + extensions_count, err_buf.as_mut_ptr(), err_buf.capacity(), ) @@ -624,8 +641,6 @@ impl NativeQueue { /// When using the `OAUTHBEARER` SASL authentication method, this type is /// returned from [`ClientContext::generate_oauth_token`]. The token and /// principal name must not contain embedded null characters. -/// -/// Specifying SASL extensions is not currently supported. pub struct OAuthToken { /// The token value to set. pub token: String, @@ -633,6 +648,8 @@ pub struct OAuthToken { pub principal_name: String, /// When the token expires, in number of milliseconds since the Unix epoch. pub lifetime_ms: i64, + /// Optional SASL extensions as key-value pairs. + pub extensions: Vec<(String, String)>, } #[cfg(test)]