diff --git a/examples/admin_oauthbearer.rs b/examples/admin_oauthbearer.rs
new file mode 100644
index 000000000..731403b9e
--- /dev/null
+++ b/examples/admin_oauthbearer.rs
@@ -0,0 +1,65 @@
+//! Example demonstrating how to use the AdminClient with OAUTHBEARER authentication.
+//!
+//! This example shows how to implement a custom ClientContext that provides
+//! OAuth token generation for the AdminClient.
+
+use std::error::Error;
+
+use rdkafka::admin::AdminClient;
+use rdkafka::client::{ClientContext, OAuthToken};
+use rdkafka::ClientConfig;
+
+/// Custom context that implements OAuth token generation
+struct OAuthClientContext;
+
+impl ClientContext for OAuthClientContext {
+    // Enable OAuth token refresh
+    const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;
+
+    /// Generate an OAuth token.
+    ///
+    /// In a real application, this would:
+    /// - Call your OAuth provider's token endpoint
+    /// - Retrieve a valid access token
+    /// - Return the token with an appropriate expiration time
+    fn generate_oauth_token(
+        &self,
+        _oauthbearer_config: Option<&str>,
+    ) -> Result<OAuthToken, Box<dyn Error>> {
+        // Example token generation (replace with actual OAuth logic)
+        println!("Generating OAuth token...");
+
+        // In a real implementation, you would:
+        // 1. Call your OAuth provider (e.g., Azure AD, Okta, etc.)
+        // 2. Get an access token
+        // 3. Return it with the appropriate lifetime
+
+        Ok(OAuthToken {
+            token: "your-oauth-token-here".to_string(),
+            principal_name: "your-principal-name".to_string(),
+            lifetime_ms: 3600000, // 1 hour in milliseconds
+        })
+    }
+}
+
+fn main() {
+    // Configure the admin client with OAUTHBEARER
+    let _admin_client: AdminClient<OAuthClientContext> = ClientConfig::new()
+        .set("bootstrap.servers", "localhost:9092")
+        .set("security.protocol", "SASL_SSL")
+        .set("sasl.mechanisms", "OAUTHBEARER")
+        // Optional: configure OAuth-specific settings
+        // .set("sasl.oauthbearer.config", "your-config-here")
+        .create_with_context(OAuthClientContext)
+        .expect("Admin client creation failed");
+
+    println!("AdminClient with OAUTHBEARER created successfully!");
+
+    // Example: In a real scenario, you would perform actual admin operations
+    // like creating topics, deleting topics, etc.
+    println!("Admin client is ready to use with OAUTHBEARER authentication");
+
+    // The background polling thread will automatically handle
+    // OAUTHBEARER token refresh when needed
+}
+
diff --git a/src/admin.rs b/src/admin.rs
index 595743ef3..d7d94a6f2 100644
--- a/src/admin.rs
+++ b/src/admin.rs
@@ -22,7 +22,7 @@ use futures_util::ready;
 use rdkafka_sys as rdsys;
 use rdkafka_sys::types::*;
 
-use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
+use crate::client::{Client, ClientContext, DefaultClientContext, EventPollResult, NativeClient, NativeQueue};
 use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
 use crate::error::{IsError, KafkaError, KafkaResult};
 use crate::log::{trace, warn};
@@ -38,7 +38,7 @@ use crate::TopicPartitionList;
 /// `AdminClient` provides programmatic access to managing a Kafka cluster,
 /// notably manipulating topics, partitions, and configuration paramaters.
 pub struct AdminClient<C: ClientContext = DefaultClientContext> {
-    client: Client<C>,
+    client: Arc<Client<C>>,
     queue: Arc<NativeQueue>,
     should_stop: Arc<AtomicBool>,
     handle: Option<JoinHandle<()>>,
@@ -54,6 +54,16 @@ impl<C: ClientContext> fmt::Debug for AdminClient<C> {
 }
 
 impl<C: ClientContext> AdminClient<C> {
+    /// Returns a reference to the native rdkafka-sys client.
+    pub fn native_client(&self) -> &NativeClient {
+        self.client.native_client()
+    }
+
+    /// Returns the `Client` for this admin client.
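+    ///
+    /// A minimal usage sketch (assuming a reachable broker; `fetch_metadata` is
+    /// the existing `Client` API):
+    ///
+    /// ```no_run
+    /// # use std::time::Duration;
+    /// # use rdkafka::admin::AdminClient;
+    /// # use rdkafka::client::DefaultClientContext;
+    /// # fn example(admin: &AdminClient<DefaultClientContext>) {
+    /// let metadata = admin
+    ///     .client()
+    ///     .fetch_metadata(None, Duration::from_secs(5))
+    ///     .expect("failed to fetch cluster metadata");
+    /// println!("{} broker(s) in the cluster", metadata.brokers().len());
+    /// # }
+    /// ```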
+    pub fn client(&self) -> &Client<C> {
+        &*self.client
+    }
+
     /// Creates new topics according to the provided `NewTopic` specifications.
     ///
     /// Note that while the API supports creating multiple topics at once, it
@@ -399,7 +409,7 @@
     }
 }
 
-impl<C: ClientContext> FromClientConfigAndContext<C> for AdminClient<C> {
+impl<C: ClientContext + 'static> FromClientConfigAndContext<C> for AdminClient<C> {
     fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<AdminClient<C>> {
         let native_config = config.create_native_config()?;
         // librdkafka only provides consumer and producer types. We follow the
@@ -407,15 +417,26 @@
         // producer, as producer clients are allegedly more lightweight. [0]
         //
         // [0]: https://github.com/confluentinc/confluent-kafka-python/blob/bfb07dfbca47c256c840aaace83d3fe26c587360/confluent_kafka/src/Admin.c#L1492-L1493
-        let client = Client::new(
+
+        // Register events we want to handle in the polling thread
+        unsafe {
+            rdsys::rd_kafka_conf_set_events(
+                native_config.ptr(),
+                rdsys::RD_KAFKA_EVENT_STATS
+                    | rdsys::RD_KAFKA_EVENT_ERROR
+                    | rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH,
+            )
+        };
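+
+        // The client is wrapped in an Arc so the polling thread can hold its own
+        // handle to it; that thread's poll_event call is what services the
+        // OAUTHBEARER, stats, and error events registered above.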
+        let client = Arc::new(Client::new(
             config,
             native_config,
             RDKafkaType::RD_KAFKA_PRODUCER,
             context,
-        )?;
+        )?);
         let queue = Arc::new(client.new_native_queue());
         let should_stop = Arc::new(AtomicBool::new(false));
-        let handle = start_poll_thread(queue.clone(), should_stop.clone());
+        let handle = start_poll_thread(Arc::clone(&client), queue.clone(), should_stop.clone());
         Ok(AdminClient {
             client,
             queue,
@@ -437,25 +458,39 @@ impl<C: ClientContext> Drop for AdminClient<C> {
     }
 }
 
-fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> JoinHandle<()> {
+fn start_poll_thread<C: ClientContext + 'static>(
+    client: Arc<Client<C>>,
+    queue: Arc<NativeQueue>,
+    should_stop: Arc<AtomicBool>,
+) -> JoinHandle<()> {
     thread::Builder::new()
         .name("admin client polling thread".into())
         .spawn(move || {
             trace!("Admin polling thread loop started");
             loop {
-                let event = queue.poll(Duration::from_millis(100));
-                if event.is_null() {
-                    if should_stop.load(Ordering::Relaxed) {
-                        // We received nothing and the thread should stop, so
-                        // break the loop.
-                        break;
+                // Use poll_event instead of direct queue.poll to handle system events
+                // like OAUTHBEARER token refresh, logging, and statistics
+                let poll_result = client.poll_event(&queue, Duration::from_millis(100));
+                match poll_result {
+                    EventPollResult::Event(event) => {
+                        // Forward admin operation events to their handlers
+                        let tx: Box<oneshot::Sender<NativeEvent>> =
+                            unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
+                        let _ = tx.send(event);
+                    }
+                    EventPollResult::EventConsumed => {
+                        // System events (OAUTHBEARER, stats, logs) were handled by poll_event
+                        continue;
+                    }
+                    EventPollResult::None => {
+                        // No event received
+                        if should_stop.load(Ordering::Relaxed) {
+                            // We received nothing and the thread should stop, so
+                            // break the loop.
+                            break;
+                        }
                     }
-                    continue;
                 }
-                let event = unsafe { NativeEvent::from_ptr(event).unwrap() };
-                let tx: Box<oneshot::Sender<NativeEvent>> =
-                    unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
-                let _ = tx.send(event);
             }
             trace!("Admin polling thread loop terminated");
         })
diff --git a/tests/test_admin.rs b/tests/test_admin.rs
index 87f258a9e..45262462a 100644
--- a/tests/test_admin.rs
+++ b/tests/test_admin.rs
@@ -628,3 +628,74 @@ async fn test_event_errors() {
         Err(KafkaError::AdminOp(RDKafkaErrorCode::OperationTimedOut))
     );
 }
+
+#[tokio::test]
+async fn test_admin_client_with_oauthbearer_context() {
+    use rdkafka::client::{ClientContext, OAuthToken};
+    use std::error::Error;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::sync::Arc;
+
+    // Custom context that tracks OAuth token generation attempts
+    struct OAuthTestContext {
+        token_generation_count: Arc<AtomicUsize>,
+    }
+
+    impl ClientContext for OAuthTestContext {
+        const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;
+
+        fn generate_oauth_token(
+            &self,
+            _oauthbearer_config: Option<&str>,
+        ) -> Result<OAuthToken, Box<dyn Error>> {
+            // Track that token generation was attempted
+            self.token_generation_count.fetch_add(1, Ordering::SeqCst);
+
+            // Return a dummy token
+            Ok(OAuthToken {
+                token: "test-token".to_string(),
+                principal_name: "test-principal".to_string(),
+                lifetime_ms: 3600000,
+            })
+        }
+    }
+
+    let token_count = Arc::new(AtomicUsize::new(0));
+    let context = OAuthTestContext {
+        token_generation_count: Arc::clone(&token_count),
+    };
+
+    // Create an AdminClient with OAUTHBEARER context.
+    // Creation should succeed even though we don't connect to a real broker.
+    let admin_client: AdminClient<OAuthTestContext> = create_config()
+        .create_with_context(context)
+        .expect("Admin client with OAUTHBEARER context creation failed");
+
+    // Verify the client was created successfully
+    assert!(!admin_client.native_client().ptr().is_null());
+
+    // Test a simple operation - fetch metadata
+    // This should work if a broker is running (like in CI), or fail gracefully if not
+    let metadata_result = admin_client.client().fetch_metadata(None, std::time::Duration::from_secs(5));
+
+    // Either outcome is fine:
+    // - Success: broker is available, client works with OAUTHBEARER context
+    // - Error: no broker, but client doesn't crash
+    match metadata_result {
+        Ok(_metadata) => {
+            // Successfully connected - client works with OAUTHBEARER context!
+            println!("Successfully fetched metadata with OAUTHBEARER context");
+        }
+        Err(e) => {
+            // Failed to connect (no broker) - but client is functional
+            println!("Expected error without broker: {:?}", e);
+        }
+    }
+
+    // The client should be functional and not crash when dropped
+    drop(admin_client);
+
+    // Note: In a real OAUTHBEARER scenario with a properly configured broker,
+    // the token_generation_count would increment when the broker requests authentication.
+    // For this test, we verify the client can be created and used without crashing.
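+    // As a final sanity check, read the counter through the Arc we kept. Without a
+    // broker-driven SASL handshake the count may well still be zero, so it is only
+    // logged rather than asserted on.
+    println!(
+        "OAuth token generation attempts observed: {}",
+        token_count.load(Ordering::SeqCst)
+    );
+}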