65 changes: 65 additions & 0 deletions examples/admin_oauthbearer.rs
@@ -0,0 +1,65 @@
//! Example demonstrating how to use the AdminClient with OAUTHBEARER authentication.
//!
//! This example shows how to implement a custom ClientContext that provides
//! OAuth token generation for the AdminClient.

use std::error::Error;
use std::time::{SystemTime, UNIX_EPOCH};

use rdkafka::admin::AdminClient;
use rdkafka::client::{ClientContext, OAuthToken};
use rdkafka::ClientConfig;

/// Custom context that implements OAuth token generation.
struct OAuthClientContext;

impl ClientContext for OAuthClientContext {
    // Enable OAuth token refresh.
    const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;

    /// Generates an OAuth token.
    ///
    /// In a real application, this would:
    /// - Call your OAuth provider's token endpoint
    /// - Retrieve a valid access token
    /// - Return the token with the appropriate expiration time
    fn generate_oauth_token(
        &self,
        _oauthbearer_config: Option<&str>,
    ) -> Result<OAuthToken, Box<dyn Error>> {
        // Example token generation (replace with actual OAuth logic).
        println!("Generating OAuth token...");

        // In a real implementation, you would:
        // 1. Call your OAuth provider (e.g., Azure AD, Okta, etc.)
        // 2. Get an access token
        // 3. Return it with the appropriate expiry

        // Note: `lifetime_ms` is an absolute expiry time in milliseconds
        // since the Unix epoch, not a duration.
        let expires_at_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)?
            .as_millis() as i64
            + 3_600_000; // one hour from now

        Ok(OAuthToken {
            token: "your-oauth-token-here".to_string(),
            principal_name: "your-principal-name".to_string(),
            lifetime_ms: expires_at_ms,
        })
    }
}

fn main() {
    // Configure the admin client with OAUTHBEARER.
    let _admin_client: AdminClient<OAuthClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanisms", "OAUTHBEARER")
        // Optional: configure OAuth-specific settings.
        // .set("sasl.oauthbearer.config", "your-config-here")
        .create_with_context(OAuthClientContext)
        .expect("Admin client creation failed");

    println!("AdminClient with OAUTHBEARER created successfully!");

    // In a real scenario, you would now perform actual admin operations,
    // such as creating or deleting topics.
    println!("Admin client is ready to use with OAUTHBEARER authentication");

    // The background polling thread will automatically handle
    // OAUTHBEARER token refresh when needed.
}
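Because the refresh hook lives on `ClientContext` rather than on the admin client itself, the same context type can back other client kinds. A minimal sketch of reusing `OAuthClientContext` from the example above with a producer — assuming the default `tokio`-backed `FutureProducer` and the same placeholder broker address:

```rust
use rdkafka::producer::FutureProducer;
use rdkafka::ClientConfig;

// Reuse OAuthClientContext from the example above: the OAUTHBEARER refresh
// hook is part of ClientContext, so producers pick it up the same way.
fn build_producer() -> FutureProducer<OAuthClientContext> {
    ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanisms", "OAUTHBEARER")
        .create_with_context(OAuthClientContext)
        .expect("producer creation failed")
}
```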

71 changes: 53 additions & 18 deletions src/admin.rs
@@ -22,7 +22,7 @@ use futures_util::ready;
 use rdkafka_sys as rdsys;
 use rdkafka_sys::types::*;
 
-use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
+use crate::client::{Client, ClientContext, DefaultClientContext, EventPollResult, NativeClient, NativeQueue};
 use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
 use crate::error::{IsError, KafkaError, KafkaResult};
 use crate::log::{trace, warn};
@@ -38,7 +38,7 @@ use crate::TopicPartitionList;
 /// `AdminClient` provides programmatic access to managing a Kafka cluster,
 /// notably manipulating topics, partitions, and configuration parameters.
 pub struct AdminClient<C: ClientContext> {
-    client: Client<C>,
+    client: Arc<Client<C>>,
     queue: Arc<NativeQueue>,
     should_stop: Arc<AtomicBool>,
     handle: Option<JoinHandle<()>>,
@@ -54,6 +54,16 @@ impl<C: ClientContext> fmt::Debug for AdminClient<C> {
 }
 
 impl<C: ClientContext> AdminClient<C> {
+    /// Returns a reference to the native rdkafka-sys client.
+    pub fn native_client(&self) -> &NativeClient {
+        self.client.native_client()
+    }
+
+    /// Returns the `Client` for this admin client.
+    pub fn client(&self) -> &Client<C> {
+        &*self.client
+    }
+
     /// Creates new topics according to the provided `NewTopic` specifications.
     ///
     /// Note that while the API supports creating multiple topics at once, it
@@ -399,23 +409,34 @@ impl FromClientConfig for AdminClient<DefaultClientContext> {
     }
 }
 
-impl<C: ClientContext> FromClientConfigAndContext<C> for AdminClient<C> {
+impl<C: ClientContext + 'static> FromClientConfigAndContext<C> for AdminClient<C> {
     fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<AdminClient<C>> {
         let native_config = config.create_native_config()?;
         // librdkafka only provides consumer and producer types. We follow the
         // example of the Python bindings in choosing to pretend to be a
         // producer, as producer clients are allegedly more lightweight. [0]
         //
         // [0]: https://github.com/confluentinc/confluent-kafka-python/blob/bfb07dfbca47c256c840aaace83d3fe26c587360/confluent_kafka/src/Admin.c#L1492-L1493
-        let client = Client::new(
+
+        // Register events we want to handle in the polling thread.
+        unsafe {
+            rdsys::rd_kafka_conf_set_events(
+                native_config.ptr(),
+                rdsys::RD_KAFKA_EVENT_STATS
+                    | rdsys::RD_KAFKA_EVENT_ERROR
+                    | rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH,
+            )
+        };
+
+        let client = Arc::new(Client::new(
             config,
             native_config,
             RDKafkaType::RD_KAFKA_PRODUCER,
             context,
-        )?;
+        )?);
         let queue = Arc::new(client.new_native_queue());
         let should_stop = Arc::new(AtomicBool::new(false));
-        let handle = start_poll_thread(queue.clone(), should_stop.clone());
+        let handle = start_poll_thread(Arc::clone(&client), queue.clone(), should_stop.clone());
         Ok(AdminClient {
             client,
             queue,
@@ -437,25 +458,39 @@ impl<C: ClientContext> Drop for AdminClient<C> {
     }
 }
 
-fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> JoinHandle<()> {
+fn start_poll_thread<C: ClientContext + 'static>(
+    client: Arc<Client<C>>,
+    queue: Arc<NativeQueue>,
+    should_stop: Arc<AtomicBool>,
+) -> JoinHandle<()> {
     thread::Builder::new()
         .name("admin client polling thread".into())
         .spawn(move || {
             trace!("Admin polling thread loop started");
             loop {
-                let event = queue.poll(Duration::from_millis(100));
-                if event.is_null() {
-                    if should_stop.load(Ordering::Relaxed) {
-                        // We received nothing and the thread should stop, so
-                        // break the loop.
-                        break;
-                    }
-                    continue;
-                }
-                let event = unsafe { NativeEvent::from_ptr(event).unwrap() };
-                let tx: Box<oneshot::Sender<NativeEvent>> =
-                    unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
-                let _ = tx.send(event);
+                // Use poll_event instead of direct queue.poll to handle system
+                // events like OAUTHBEARER token refresh, logging, and statistics.
+                let poll_result = client.poll_event(&queue, Duration::from_millis(100));
+                match poll_result {
+                    EventPollResult::Event(event) => {
+                        // Forward admin operation events to their handlers.
+                        let tx: Box<oneshot::Sender<NativeEvent>> =
+                            unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
+                        let _ = tx.send(event);
+                    }
+                    EventPollResult::EventConsumed => {
+                        // System events (OAUTHBEARER, stats, logs) were handled
+                        // inside poll_event.
+                        continue;
+                    }
+                    EventPollResult::None => {
+                        // No event received.
+                        if should_stop.load(Ordering::Relaxed) {
+                            // We received nothing and the thread should stop, so
+                            // break the loop.
+                            break;
+                        }
+                    }
+                }
             }
             trace!("Admin polling thread loop terminated");
        })
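Routing the polling thread through `poll_event` means the admin client now services the same context hooks as producers and consumers. A hedged sketch of a context that observes the newly registered stats and error events — note that `statistics.interval.ms` must be set for stats events to be emitted at all, and the broker address and interval below are placeholders:

```rust
use rdkafka::admin::AdminClient;
use rdkafka::client::ClientContext;
use rdkafka::error::KafkaError;
use rdkafka::{ClientConfig, Statistics};

struct ObservingContext;

impl ClientContext for ObservingContext {
    // Invoked when the polling thread consumes an RD_KAFKA_EVENT_STATS event.
    fn stats(&self, statistics: Statistics) {
        println!("stats: {} broker(s) known", statistics.brokers.len());
    }

    // Invoked for RD_KAFKA_EVENT_ERROR events.
    fn error(&self, error: KafkaError, reason: &str) {
        eprintln!("client error: {error}: {reason}");
    }
}

fn build_observed_admin() -> AdminClient<ObservingContext> {
    ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("statistics.interval.ms", "5000") // required for stats events
        .create_with_context(ObservingContext)
        .expect("admin client creation failed")
}
```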
71 changes: 71 additions & 0 deletions tests/test_admin.rs
@@ -628,3 +628,74 @@ async fn test_event_errors() {
        Err(KafkaError::AdminOp(RDKafkaErrorCode::OperationTimedOut))
    );
}

#[tokio::test]
async fn test_admin_client_with_oauthbearer_context() {
    use rdkafka::client::{ClientContext, OAuthToken};
    use std::error::Error;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Custom context that tracks OAuth token generation attempts.
    struct OAuthTestContext {
        token_generation_count: Arc<AtomicUsize>,
    }

    impl ClientContext for OAuthTestContext {
        const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;

        fn generate_oauth_token(
            &self,
            _oauthbearer_config: Option<&str>,
        ) -> Result<OAuthToken, Box<dyn Error>> {
            // Track that token generation was attempted.
            self.token_generation_count.fetch_add(1, Ordering::SeqCst);

            // Return a dummy token. Note that `lifetime_ms` is an absolute
            // expiry time in milliseconds since the Unix epoch, not a duration.
            Ok(OAuthToken {
                token: "test-token".to_string(),
                principal_name: "test-principal".to_string(),
                lifetime_ms: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)?
                    .as_millis() as i64
                    + 3_600_000, // one hour from now
            })
        }
    }

    let token_count = Arc::new(AtomicUsize::new(0));
    let context = OAuthTestContext {
        token_generation_count: Arc::clone(&token_count),
    };

    // Create an AdminClient with an OAUTHBEARER context. Creation should
    // succeed even without a reachable broker.
    let admin_client: AdminClient<OAuthTestContext> = create_config()
        .create_with_context(context)
        .expect("Admin client with OAUTHBEARER context creation failed");

    // Verify the client was created successfully.
    assert!(!admin_client.native_client().ptr().is_null());

    // Test a simple operation: fetch metadata. This should succeed if a
    // broker is running (as in CI) and fail gracefully otherwise.
    let metadata_result = admin_client
        .client()
        .fetch_metadata(None, std::time::Duration::from_secs(5));

    // Either outcome is fine:
    // - Ok: a broker is available and the client works with the OAUTHBEARER context.
    // - Err: no broker is reachable, but the client does not crash.
    match metadata_result {
        Ok(_metadata) => {
            println!("Successfully fetched metadata with OAUTHBEARER context");
        }
        Err(e) => {
            println!("Expected error without broker: {:?}", e);
        }
    }

    // The client should also shut down cleanly when dropped.
    drop(admin_client);

    // Note: in a real OAUTHBEARER scenario with a properly configured broker,
    // `token_generation_count` would increment when the broker requests
    // authentication. This test only verifies that the client can be created
    // and used without crashing.
}
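For the "real OAUTHBEARER scenario" mentioned in the closing note, an integration test might look like the sketch below. It is an assumption-heavy outline: the `OAUTH_TEST_BROKER` environment variable is hypothetical, the two-second wait is a guess at how quickly the polling thread services the initial token-refresh event, and the test is `#[ignore]`d because it needs an OAUTHBEARER-enabled listener.

```rust
#[tokio::test]
#[ignore] // requires an OAUTHBEARER-enabled broker (assumption)
async fn test_oauth_token_generation_fires() {
    use rdkafka::admin::AdminClient;
    use rdkafka::client::{ClientContext, OAuthToken};
    use rdkafka::ClientConfig;
    use std::error::Error;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    struct CountingContext(Arc<AtomicUsize>);

    impl ClientContext for CountingContext {
        const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;

        fn generate_oauth_token(
            &self,
            _oauthbearer_config: Option<&str>,
        ) -> Result<OAuthToken, Box<dyn Error>> {
            self.0.fetch_add(1, Ordering::SeqCst);
            Ok(OAuthToken {
                token: "test-token".to_string(),
                principal_name: "test-principal".to_string(),
                // Absolute expiry: one hour from now, in ms since the epoch.
                lifetime_ms: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)?
                    .as_millis() as i64
                    + 3_600_000,
            })
        }
    }

    let count = Arc::new(AtomicUsize::new(0));
    let _admin: AdminClient<CountingContext> = ClientConfig::new()
        .set(
            "bootstrap.servers",
            std::env::var("OAUTH_TEST_BROKER").expect("OAUTH_TEST_BROKER not set"),
        )
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanisms", "OAUTHBEARER")
        .create_with_context(CountingContext(Arc::clone(&count)))
        .expect("admin client creation failed");

    // Give the polling thread time to service the initial token refresh
    // event; the two-second figure is a guess, not a guarantee.
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    assert!(count.load(Ordering::SeqCst) >= 1);
}
```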