Skip to content
This repository was archived by the owner on Nov 8, 2023. It is now read-only.
Draft
57 changes: 41 additions & 16 deletions src/backends/kafka/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Consumer as ArroyoConsumer;
use super::{AssignmentCallbacks, ConsumeError, ConsumerClosed, PauseError, PollError};
use super::{AssignmentCallbacks, ConsumeError, ConsumerClosed, PauseError, Payload, PollError};
use crate::types::Message as ArroyoMessage;
use crate::types::{Partition, Position, Topic};
use chrono::{DateTime, NaiveDateTime, Utc};
Expand All @@ -8,22 +8,51 @@ use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::base_consumer::BaseConsumer;
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
use rdkafka::error::KafkaResult;
use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Message, OwnedHeaders};
use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Message, OwnedHeaders, OwnedMessage};
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
use std::collections::HashMap;
use std::collections::HashSet;
use std::mem;
use std::sync::Mutex;
use std::time::Duration;

#[derive(Clone)]
pub struct KafkaPayload {
pub key: Option<Vec<u8>>,
pub headers: Option<OwnedHeaders>,
pub payload: Option<Vec<u8>>,
pub enum KafkaPayload<'a> {
Borrowed(BorrowedMessage<'a>),
Owned(OwnedMessage),
}

fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage<KafkaPayload> {
impl<'a> KafkaPayload<'a> {
pub fn new(msg: BorrowedMessage<'a>) -> Self {
Self::Borrowed(msg)
}
pub fn key(&self) -> Option<&[u8]> {
match self {
Self::Borrowed(ref msg) => msg.key(),
Self::Owned(ref msg) => msg.key(),
}
}
pub fn headers(&self) -> Option<OwnedHeaders> {
match self {
Self::Borrowed(ref msg) => msg.headers().map(BorrowedHeaders::detach),
Self::Owned(ref msg) => msg.headers().map(|x| x.as_borrowed().detach()),
}
}
pub fn payload(&self) -> Option<&[u8]> {
match self {
Self::Borrowed(ref msg) => msg.payload(),
Self::Owned(ref msg) => msg.payload(),
}
}

pub fn to_owned(&self) -> KafkaPayload<'static> {
match self {
Self::Borrowed(ref msg) => KafkaPayload::Owned(msg.detach()),
Self::Owned(ref msg) => KafkaPayload::Owned(msg.clone()),
}
}
}

fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage<Payload> {
let topic = Topic {
name: msg.topic().to_string(),
};
Expand All @@ -36,11 +65,7 @@ fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage<KafkaPayload> {
ArroyoMessage::new(
partition,
msg.offset() as u64,
KafkaPayload {
key: msg.key().map(|k| k.to_vec()),
headers: msg.headers().map(BorrowedHeaders::detach),
payload: msg.payload().map(|p| p.to_vec()),
},
Payload::Kafka(KafkaPayload::new(msg)),
DateTime::from_utc(NaiveDateTime::from_timestamp(time_millis, 0), Utc),
)
}
Expand Down Expand Up @@ -124,7 +149,7 @@ impl KafkaConsumer {
}
}

impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {
impl ArroyoConsumer for KafkaConsumer {
fn subscribe(
&mut self,
topics: &[Topic],
Expand Down Expand Up @@ -157,10 +182,10 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {
fn poll(
&mut self,
timeout: Option<Duration>,
) -> Result<Option<ArroyoMessage<KafkaPayload>>, PollError> {
) -> Result<Option<ArroyoMessage<Payload<'_>>>, PollError> {
let duration = timeout.unwrap_or(Duration::from_millis(100));

match self.consumer.as_mut() {
match self.consumer.as_ref() {
None => Err(PollError::ConsumerClosed),
Some(consumer) => {
let res = consumer.poll(duration);
Expand Down
13 changes: 7 additions & 6 deletions src/backends/local/broker.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
use super::SubscriptionError;
use crate::backends::storages::{ConsumeError, MessageStorage, TopicDoesNotExist, TopicExists};
use crate::backends::Payload;
use crate::types::{Message, Partition, Topic};
use crate::utils::clock::Clock;
use chrono::DateTime;
use std::collections::{HashMap, HashSet};
use uuid::Uuid;

pub struct LocalBroker<TPayload: Clone> {
storage: Box<dyn MessageStorage<TPayload>>,
pub struct LocalBroker {
storage: Box<dyn MessageStorage>,
clock: Box<dyn Clock>,
offsets: HashMap<String, HashMap<Partition, u64>>,
subscriptions: HashMap<String, HashMap<Uuid, Vec<Topic>>>,
}

impl<TPayload: Clone> LocalBroker<TPayload> {
pub fn new(storage: Box<dyn MessageStorage<TPayload>>, clock: Box<dyn Clock>) -> Self {
impl LocalBroker {
pub fn new(storage: Box<dyn MessageStorage>, clock: Box<dyn Clock>) -> Self {
Self {
storage,
clock,
Expand All @@ -34,7 +35,7 @@ impl<TPayload: Clone> LocalBroker<TPayload> {
pub fn produce(
&mut self,
partition: &Partition,
payload: TPayload,
payload: Payload,
) -> Result<u64, ConsumeError> {
let time = self.clock.time();
self.storage
Expand Down Expand Up @@ -126,7 +127,7 @@ impl<TPayload: Clone> LocalBroker<TPayload> {
&self,
partition: &Partition,
offset: u64,
) -> Result<Option<Message<TPayload>>, ConsumeError> {
) -> Result<Option<Message<Payload>>, ConsumeError> {
self.storage.consume(partition, offset)
}

Expand Down
42 changes: 19 additions & 23 deletions src/backends/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod broker;

use super::storages::{PartitionDoesNotExist, TopicDoesNotExist};
use super::{AssignmentCallbacks, ConsumeError, Consumer, ConsumerClosed, PauseError, PollError};
use crate::backends::Payload;
use crate::types::{Message, Partition, Position, Topic};
use broker::LocalBroker;
use std::collections::HashSet;
Expand Down Expand Up @@ -44,10 +45,10 @@ struct SubscriptionState {
last_eof_at: HashMap<Partition, u64>,
}

pub struct LocalConsumer<'a, TPayload: Clone> {
pub struct LocalConsumer {
id: Uuid,
group: String,
broker: &'a mut LocalBroker<TPayload>,
broker: LocalBroker,
pending_callback: VecDeque<Callback>,
paused: HashSet<Partition>,
// The offset that a the last ``EndOfPartition`` exception that was
Expand All @@ -60,10 +61,10 @@ pub struct LocalConsumer<'a, TPayload: Clone> {
closed: bool,
}

impl<'a, TPayload: Clone> LocalConsumer<'a, TPayload> {
impl LocalConsumer {
pub fn new(
id: Uuid,
broker: &'a mut LocalBroker<TPayload>,
broker: LocalBroker,
group: String,
enable_end_of_partition: bool,
) -> Self {
Expand All @@ -88,7 +89,7 @@ impl<'a, TPayload: Clone> LocalConsumer<'a, TPayload> {
}
}

impl<'a, TPayload: Clone> Consumer<'a, TPayload> for LocalConsumer<'a, TPayload> {
impl Consumer for LocalConsumer {
fn subscribe(
&mut self,
topics: &[Topic],
Expand Down Expand Up @@ -129,7 +130,7 @@ impl<'a, TPayload: Clone> Consumer<'a, TPayload> for LocalConsumer<'a, TPayload>
Ok(())
}

fn poll(&mut self, _timeout: Option<Duration>) -> Result<Option<Message<TPayload>>, PollError> {
fn poll(&mut self, _timeout: Option<Duration>) -> Result<Option<Message<Payload>>, PollError> {
if self.closed {
return Err(PollError::ConsumerClosed);
}
Expand Down Expand Up @@ -160,7 +161,7 @@ impl<'a, TPayload: Clone> Consumer<'a, TPayload> for LocalConsumer<'a, TPayload>

let keys = self.subscription_state.offsets.keys();
let mut new_offset: Option<(Partition, u64)> = None;
let mut ret_message: Option<Message<TPayload>> = None;
let mut ret_message: Option<Message<Payload>> = None;
for partition in keys.collect::<Vec<_>>() {
if self.paused.contains(partition) {
continue;
Expand Down Expand Up @@ -329,8 +330,8 @@ mod tests {
fn on_revoke(&mut self, _: Vec<Partition>) {}
}

fn build_broker() -> LocalBroker<String> {
let storage: MemoryMessageStorage<String> = Default::default();
fn build_broker() -> LocalBroker {
let storage: MemoryMessageStorage = Default::default();
let clock = SystemClock {};
let mut broker = LocalBroker::new(Box::new(storage), Box::new(clock));

Expand All @@ -348,7 +349,7 @@ mod tests {

#[test]
fn test_consumer_subscription() {
let mut broker = build_broker();
let broker = build_broker();

let topic1 = Topic {
name: "test1".to_string(),
Expand All @@ -358,8 +359,7 @@ mod tests {
};

let my_callbacks: Box<dyn AssignmentCallbacks> = Box::new(EmptyCallbacks {});
let mut consumer =
LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true);
let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true);
assert!(consumer.subscription_state.topics.is_empty());

let res = consumer.subscribe(&[topic1.clone(), topic2.clone()], my_callbacks);
Expand Down Expand Up @@ -402,7 +402,7 @@ mod tests {

#[test]
fn test_subscription_callback() {
let mut broker = build_broker();
let broker = build_broker();

let topic1 = Topic {
name: "test1".to_string(),
Expand Down Expand Up @@ -476,8 +476,7 @@ mod tests {

let my_callbacks: Box<dyn AssignmentCallbacks> = Box::new(TheseCallbacks {});

let mut consumer =
LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true);
let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true);

let _ = consumer.subscribe(&[topic1, topic2], my_callbacks);
let _ = consumer.poll(Some(Duration::from_millis(100)));
Expand Down Expand Up @@ -521,8 +520,7 @@ mod tests {
}

let my_callbacks: Box<dyn AssignmentCallbacks> = Box::new(TheseCallbacks {});
let mut consumer =
LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true);
let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true);

let _ = consumer.subscribe(&[topic2], my_callbacks);

Expand All @@ -546,7 +544,7 @@ mod tests {

#[test]
fn test_paused() {
let mut broker = build_broker();
let broker = build_broker();
let topic2 = Topic {
name: "test2".to_string(),
};
Expand All @@ -555,8 +553,7 @@ mod tests {
index: 0,
};
let my_callbacks: Box<dyn AssignmentCallbacks> = Box::new(EmptyCallbacks {});
let mut consumer =
LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), false);
let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false);
let _ = consumer.subscribe(&[topic2], my_callbacks);

assert_eq!(consumer.poll(None).unwrap(), None);
Expand All @@ -572,10 +569,9 @@ mod tests {

#[test]
fn test_commit() {
let mut broker = build_broker();
let broker = build_broker();
let my_callbacks: Box<dyn AssignmentCallbacks> = Box::new(EmptyCallbacks {});
let mut consumer =
LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), false);
let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false);
let topic2 = Topic {
name: "test2".to_string(),
};
Expand Down
24 changes: 20 additions & 4 deletions src/backends/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ pub enum ConsumeError {
ConsumerError,
}

pub enum Payload<'a> {
Kafka(kafka::KafkaPayload<'a>),
Local(Vec<u8>),
}

impl<'a> Payload<'a> {
pub fn to_owned(&self) -> Payload<'static> {
match self {
Payload::Kafka(payload) => Payload::Kafka(payload.to_owned()),
Payload::Local(payload) => Payload::Local(payload.clone()),
}
}
}

/// This is basically an observer pattern to receive the callbacks from
/// the consumer when partitions are assigned/revoked.
pub trait AssignmentCallbacks: Send + Sync {
Expand Down Expand Up @@ -77,7 +91,9 @@ pub trait AssignmentCallbacks: Send + Sync {
/// occurs even if the consumer retains ownership of the partition across
/// assignments.) For this reason, it is generally good practice to ensure
/// offsets are committed as part of the revocation callback.
pub trait Consumer<'a, TPayload: Clone> {
///
///
pub trait Consumer {
fn subscribe(
&mut self,
topic: &[Topic],
Expand All @@ -93,7 +109,7 @@ pub trait Consumer<'a, TPayload: Clone> {
/// consumer attempts to read from an invalid location in one of it's
/// assigned partitions. (Additional details can be found in the
/// docstring for ``Consumer.seek``.)
fn poll(&mut self, timeout: Option<Duration>) -> Result<Option<Message<TPayload>>, PollError>;
fn poll(&mut self, timeout: Option<Duration>) -> Result<Option<Message<Payload>>, PollError>;

/// Pause consuming from the provided partitions.
///
Expand Down Expand Up @@ -156,13 +172,13 @@ pub trait Consumer<'a, TPayload: Clone> {
fn closed(&self) -> bool;
}

pub trait Producer<TPayload> {
pub trait Producer<Payload> {
/// Produce to a topic or partition.
fn produce(
&self,
destination_topic: Option<Topic>,
destination_partition: Option<Partition>,
payload: TPayload,
payload: Payload,
);

fn close(&self);
Expand Down
Loading