Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,19 +379,36 @@ impl<C: ClientContext> Client<C> {
.generate_oauth_token(oauthbearer_config.as_deref())?;
let token = CString::new(token_info.token)?;
let principal_name = CString::new(token_info.principal_name)?;
Ok((token, principal_name, token_info.lifetime_ms))
let extension_strings = token_info
.extensions
.iter()
.flat_map(|(key, value)| [CString::new(key.as_str()), CString::new(value.as_str())])
.collect::<Result<Vec<_>, _>>()?;
Ok((
token,
principal_name,
token_info.lifetime_ms,
extension_strings,
))
})();
match res {
Ok((token, principal_name, lifetime_ms)) => {
Ok((token, principal_name, lifetime_ms, extension_strings)) => {
let mut err_buf = ErrBuf::new();
let mut extension_ptrs: Vec<*const c_char> =
extension_strings.iter().map(|s| s.as_ptr()).collect();
let (extensions_ptr, extensions_count) = if extension_ptrs.is_empty() {
(ptr::null_mut(), 0)
} else {
(extension_ptrs.as_mut_ptr(), extension_ptrs.len())
};
let code = unsafe {
rdkafka_sys::rd_kafka_oauthbearer_set_token(
self.native_ptr(),
token.as_ptr(),
lifetime_ms,
principal_name.as_ptr(),
ptr::null_mut(),
0,
extensions_ptr,
extensions_count,
err_buf.as_mut_ptr(),
err_buf.capacity(),
)
Expand Down Expand Up @@ -624,15 +641,15 @@ impl NativeQueue {
/// When using the `OAUTHBEARER` SASL authentication method, this type is
/// returned from [`ClientContext::generate_oauth_token`]. The token and
/// principal name must not contain embedded null characters.
///
/// Specifying SASL extensions is not currently supported.
pub struct OAuthToken {
/// The token value to set.
pub token: String,
/// The Kafka principal name associated with the token.
pub principal_name: String,
/// When the token expires, in number of milliseconds since the Unix epoch.
pub lifetime_ms: i64,
/// Optional SASL extensions as key-value pairs.
pub extensions: Vec<(String, String)>,
}

#[cfg(test)]
Expand Down