From 2076a21133f52a20f86953c7f181bf9fb82dd246 Mon Sep 17 00:00:00 2001 From: Lyn Date: Fri, 10 Jun 2022 10:39:47 -0700 Subject: [PATCH 01/13] ref: Remove lifetime annotations from ArroyoConsumer This is an attempt to simplify the ArroyoConsumer lifetimes by changing the LocalConsumer to have ownership of the LocalBroker. Since this consumer/broker is only used in place of the KafkaConsumer for testing, I think this change is fairly harmless as it's not required for the broker to live outside the consumer. --- src/backends/kafka/mod.rs | 2 +- src/backends/local/mod.rs | 25 ++++++++++--------------- src/backends/mod.rs | 2 +- src/processing/mod.rs | 8 ++++---- 4 files changed, 16 insertions(+), 21 deletions(-) diff --git a/src/backends/kafka/mod.rs b/src/backends/kafka/mod.rs index 573d39e..2341cb7 100644 --- a/src/backends/kafka/mod.rs +++ b/src/backends/kafka/mod.rs @@ -124,7 +124,7 @@ impl KafkaConsumer { } } -impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer { +impl ArroyoConsumer for KafkaConsumer { fn subscribe( &mut self, topics: &[Topic], diff --git a/src/backends/local/mod.rs b/src/backends/local/mod.rs index 9603e5c..9637ae4 100644 --- a/src/backends/local/mod.rs +++ b/src/backends/local/mod.rs @@ -44,10 +44,10 @@ struct SubscriptionState { last_eof_at: HashMap, } -pub struct LocalConsumer<'a, TPayload: Clone> { +pub struct LocalConsumer { id: Uuid, group: String, - broker: &'a mut LocalBroker, + broker: LocalBroker, pending_callback: VecDeque, paused: HashSet, // The offset that a the last ``EndOfPartition`` exception that was @@ -60,10 +60,10 @@ pub struct LocalConsumer<'a, TPayload: Clone> { closed: bool, } -impl<'a, TPayload: Clone> LocalConsumer<'a, TPayload> { +impl LocalConsumer { pub fn new( id: Uuid, - broker: &'a mut LocalBroker, + broker: LocalBroker, group: String, enable_end_of_partition: bool, ) -> Self { @@ -88,7 +88,7 @@ impl<'a, TPayload: Clone> LocalConsumer<'a, TPayload> { } } -impl<'a, TPayload: Clone> Consumer<'a, TPayload> for 
LocalConsumer<'a, TPayload> { +impl Consumer for LocalConsumer { fn subscribe( &mut self, topics: &[Topic], @@ -358,8 +358,7 @@ mod tests { }; let my_callbacks: Box = Box::new(EmptyCallbacks {}); - let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true); + let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true); assert!(consumer.subscription_state.topics.is_empty()); let res = consumer.subscribe(&[topic1.clone(), topic2.clone()], my_callbacks); @@ -476,8 +475,7 @@ mod tests { let my_callbacks: Box = Box::new(TheseCallbacks {}); - let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true); + let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true); let _ = consumer.subscribe(&[topic1, topic2], my_callbacks); let _ = consumer.poll(Some(Duration::from_millis(100))); @@ -521,8 +519,7 @@ mod tests { } let my_callbacks: Box = Box::new(TheseCallbacks {}); - let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true); + let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true); let _ = consumer.subscribe(&[topic2], my_callbacks); @@ -555,8 +552,7 @@ mod tests { index: 0, }; let my_callbacks: Box = Box::new(EmptyCallbacks {}); - let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), false); + let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false); let _ = consumer.subscribe(&[topic2], my_callbacks); assert_eq!(consumer.poll(None).unwrap(), None); @@ -574,8 +570,7 @@ mod tests { fn test_commit() { let mut broker = build_broker(); let my_callbacks: Box = Box::new(EmptyCallbacks {}); - let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), false); + let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false); let topic2 
= Topic { name: "test2".to_string(), }; diff --git a/src/backends/mod.rs b/src/backends/mod.rs index c14e60d..ac2dcff 100755 --- a/src/backends/mod.rs +++ b/src/backends/mod.rs @@ -77,7 +77,7 @@ pub trait AssignmentCallbacks: Send + Sync { /// occurs even if the consumer retains ownership of the partition across /// assignments.) For this reason, it is generally good practice to ensure /// offsets are committed as part of the revocation callback. -pub trait Consumer<'a, TPayload: Clone> { +pub trait Consumer { fn subscribe( &mut self, topic: &[Topic], diff --git a/src/processing/mod.rs b/src/processing/mod.rs index 27aa3ef..7377661 100644 --- a/src/processing/mod.rs +++ b/src/processing/mod.rs @@ -68,7 +68,7 @@ impl Callbacks { /// strategies are instantiated on partition assignment and closed on /// partition revocation. pub struct StreamProcessor<'a, TPayload: Clone> { - consumer: Box + 'a>, + consumer: Box + 'a>, strategies: Arc>>, message: Option>, shutdown_requested: bool, @@ -76,7 +76,7 @@ pub struct StreamProcessor<'a, TPayload: Clone> { impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { pub fn new( - consumer: Box + 'a>, + consumer: Box + 'a>, processing_factory: Box>, ) -> Self { let strategies = Arc::new(Mutex::new(Strategies { @@ -278,7 +278,7 @@ mod tests { let mut broker = build_broker(); let consumer = Box::new(LocalConsumer::new( Uuid::nil(), - &mut broker, + broker, "test_group".to_string(), false, )); @@ -306,7 +306,7 @@ mod tests { let consumer = Box::new(LocalConsumer::new( Uuid::nil(), - &mut broker, + broker, "test_group".to_string(), false, )); From d980ae201bbd14612d9be48eae5476b4fac53440 Mon Sep 17 00:00:00 2001 From: Lyn Date: Fri, 10 Jun 2022 11:40:02 -0700 Subject: [PATCH 02/13] lint --- src/backends/local/mod.rs | 8 ++++---- src/processing/mod.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/backends/local/mod.rs b/src/backends/local/mod.rs index 9637ae4..b92cfeb 100644 --- 
a/src/backends/local/mod.rs +++ b/src/backends/local/mod.rs @@ -348,7 +348,7 @@ mod tests { #[test] fn test_consumer_subscription() { - let mut broker = build_broker(); + let broker = build_broker(); let topic1 = Topic { name: "test1".to_string(), @@ -401,7 +401,7 @@ mod tests { #[test] fn test_subscription_callback() { - let mut broker = build_broker(); + let broker = build_broker(); let topic1 = Topic { name: "test1".to_string(), @@ -543,7 +543,7 @@ mod tests { #[test] fn test_paused() { - let mut broker = build_broker(); + let broker = build_broker(); let topic2 = Topic { name: "test2".to_string(), }; @@ -568,7 +568,7 @@ mod tests { #[test] fn test_commit() { - let mut broker = build_broker(); + let broker = build_broker(); let my_callbacks: Box = Box::new(EmptyCallbacks {}); let mut consumer = LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false); let topic2 = Topic { diff --git a/src/processing/mod.rs b/src/processing/mod.rs index 7377661..274762d 100644 --- a/src/processing/mod.rs +++ b/src/processing/mod.rs @@ -275,7 +275,7 @@ mod tests { #[test] fn test_processor() { - let mut broker = build_broker(); + let broker = build_broker(); let consumer = Box::new(LocalConsumer::new( Uuid::nil(), broker, From 60e11a725609b7407a87a429a005d9ff6d142598 Mon Sep 17 00:00:00 2001 From: Lyn Date: Tue, 14 Jun 2022 12:55:11 -0700 Subject: [PATCH 03/13] feat: Attempt to make KafkaPayload zero copy --- src/backends/kafka/mod.rs | 75 ++++++++++++++++++++++++++++++--------- src/backends/local/mod.rs | 8 ++++- src/backends/mod.rs | 7 +++- 3 files changed, 72 insertions(+), 18 deletions(-) diff --git a/src/backends/kafka/mod.rs b/src/backends/kafka/mod.rs index 2341cb7..e452458 100644 --- a/src/backends/kafka/mod.rs +++ b/src/backends/kafka/mod.rs @@ -8,7 +8,7 @@ use rdkafka::config::{ClientConfig, RDKafkaLogLevel}; use rdkafka::consumer::base_consumer::BaseConsumer; use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance}; use 
rdkafka::error::KafkaResult; -use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Message, OwnedHeaders}; +use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Message, OwnedHeaders, OwnedMessage}; use rdkafka::topic_partition_list::{Offset, TopicPartitionList}; use std::collections::HashMap; use std::collections::HashSet; @@ -16,11 +16,54 @@ use std::mem; use std::sync::Mutex; use std::time::Duration; -#[derive(Clone)] -pub struct KafkaPayload { - pub key: Option>, - pub headers: Option, - pub payload: Option>, +pub struct KafkaPayload<'a> { + borrowed_message: Option>, + owned_message: Option, +} + +impl<'a> KafkaPayload<'a> { + pub fn from_borrowed_message(msg: BorrowedMessage<'a>) -> Self { + Self { + borrowed_message: Some(msg), + owned_message: None, + } + } + + pub fn key(&self) -> Option<&[u8]> { + if self.borrowed_message.is_some() { + return self.borrowed_message?.key(); + } else { + return self.owned_message?.key(); + } + } + pub fn headers(&self) -> Option<&OwnedHeaders> { + if self.borrowed_message.is_some() { + return self + .borrowed_message? 
+ .headers() + .map(BorrowedHeaders::detach) + .as_ref(); + } else { + return self.owned_message?.headers(); + } + } + pub fn payload(&self) -> Option<&[u8]> { + if self.borrowed_message.is_some() { + return self.borrowed_message?.payload(); + } else { + return self.owned_message?.payload(); + } + } +} + +impl<'a> Clone for KafkaPayload<'a> { + fn clone(&self) -> KafkaPayload<'a> { + let owned_message = self.borrowed_message.unwrap().detach(); + KafkaPayload { + borrowed_message: None, + owned_message: Some(owned_message), + } + } } fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage { @@ -36,11 +79,7 @@ fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage { ArroyoMessage::new( partition, msg.offset() as u64, - KafkaPayload { - key: msg.key().map(|k| k.to_vec()), - headers: msg.headers().map(BorrowedHeaders::detach), - payload: msg.payload().map(|p| p.to_vec()), - }, + KafkaPayload::from_borrowed_message(msg), DateTime::from_utc(NaiveDateTime::from_timestamp(time_millis, 0), Utc), ) } @@ -124,7 +163,7 @@ impl KafkaConsumer { } } -impl ArroyoConsumer for KafkaConsumer { +impl<'a> ArroyoConsumer> for KafkaConsumer { fn subscribe( &mut self, topics: &[Topic], @@ -154,13 +193,17 @@ impl ArroyoConsumer for KafkaConsumer { Ok(()) } - fn poll( - &mut self, + fn poll<'b>( + &'b self, timeout: Option, - ) -> Result>, PollError> { + ) -> Result>>, PollError> + where + KafkaPayload<'a>: 'b, + 'a: 'b, + { let duration = timeout.unwrap_or(Duration::from_millis(100)); - match self.consumer.as_mut() { + match self.consumer.as_ref() { None => Err(PollError::ConsumerClosed), Some(consumer) => { let res = consumer.poll(duration); diff --git a/src/backends/local/mod.rs b/src/backends/local/mod.rs index b92cfeb..1a228cf 100644 --- a/src/backends/local/mod.rs +++ b/src/backends/local/mod.rs @@ -129,7 +129,13 @@ impl Consumer for LocalConsumer { Ok(()) } - fn poll(&mut self, _timeout: Option) -> Result>, PollError> { + fn poll<'b>( + &'b mut self, + _timeout: 
Option, + ) -> Result>, PollError> + where + TPayload: 'b, + { if self.closed { return Err(PollError::ConsumerClosed); } diff --git a/src/backends/mod.rs b/src/backends/mod.rs index ac2dcff..5175bdc 100755 --- a/src/backends/mod.rs +++ b/src/backends/mod.rs @@ -93,7 +93,12 @@ pub trait Consumer { /// consumer attempts to read from an invalid location in one of it's /// assigned partitions. (Additional details can be found in the /// docstring for ``Consumer.seek``.) - fn poll(&mut self, timeout: Option) -> Result>, PollError>; + fn poll<'b>( + &'b self, + timeout: Option, + ) -> Result>, PollError> + where + TPayload: 'b; /// Pause consuming from the provided partitions. /// From 3fec743b3bfdd0ee36d8d5b9267729860847c0a2 Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 15 Jun 2022 12:14:22 -0700 Subject: [PATCH 04/13] try enum --- src/backends/kafka/mod.rs | 51 +++++++++++++++------------------------ src/backends/local/mod.rs | 2 ++ src/backends/mod.rs | 19 +++++++++------ 3 files changed, 32 insertions(+), 40 deletions(-) diff --git a/src/backends/kafka/mod.rs b/src/backends/kafka/mod.rs index e452458..baca9d0 100644 --- a/src/backends/kafka/mod.rs +++ b/src/backends/kafka/mod.rs @@ -16,52 +16,39 @@ use std::mem; use std::sync::Mutex; use std::time::Duration; -pub struct KafkaPayload<'a> { - borrowed_message: Option>, - owned_message: Option, +pub enum KafkaPayload<'a> { + Borrowed(BorrowedMessage<'a>), + Owned(OwnedMessage), } impl<'a> KafkaPayload<'a> { - pub fn from_borrowed_message(msg: BorrowedMessage<'a>) -> Self { - Self { - borrowed_message: Some(msg), - owned_message: None, - } + pub fn new(msg: BorrowedMessage<'a>) -> Self { + Self::Borrowed(msg) } - pub fn key(&self) -> Option<&[u8]> { - if self.borrowed_message.is_some() { - return self.borrowed_message?.key(); - } else { - return self.owned_message?.key(); + match self { + Self::Borrowed(ref msg) => msg.key(), + Self::Owned(ref msg) => msg.key(), } } - pub fn headers(&self) -> Option<&OwnedHeaders> { - if 
self.borrowed_message.is_some() { - return self - .borrowed_message? - .headers() - .map(BorrowedHeaders::detach) - .as_ref(); - } else { - return self.owned_message?.headers(); + pub fn headers(&self) -> Option { + match self { + Self::Borrowed(ref msg) => msg.headers().map(BorrowedHeaders::detach), + Self::Owned(ref msg) => msg.headers().map(|x| x.as_borrowed().detach()), } } pub fn payload(&self) -> Option<&[u8]> { - if self.borrowed_message.is_some() { - return self.borrowed_message?.payload(); - } else { - return self.owned_message?.payload(); + match self { + Self::Borrowed(ref msg) => msg.payload(), + Self::Owned(ref msg) => msg.payload(), } } } - impl<'a> Clone for KafkaPayload<'a> { fn clone(&self) -> KafkaPayload<'a> { - let owned_message = self.borrowed_message.unwrap().detach(); - KafkaPayload { - borrowed_message: None, - owned_message: Some(owned_message), + match self { + Self::Borrowed(ref msg) => Self::Owned(msg.detach()), + Self::Owned(ref msg) => Self::Owned(msg.clone()), } } } @@ -79,7 +66,7 @@ fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage { ArroyoMessage::new( partition, msg.offset() as u64, - KafkaPayload::from_borrowed_message(msg), + KafkaPayload::new(msg), DateTime::from_utc(NaiveDateTime::from_timestamp(time_millis, 0), Utc), ) } diff --git a/src/backends/local/mod.rs b/src/backends/local/mod.rs index 1a228cf..5f6a799 100644 --- a/src/backends/local/mod.rs +++ b/src/backends/local/mod.rs @@ -60,6 +60,8 @@ pub struct LocalConsumer { closed: bool, } +type TPayload = Vec; + impl LocalConsumer { pub fn new( id: Uuid, diff --git a/src/backends/mod.rs b/src/backends/mod.rs index 5175bdc..f9d666f 100755 --- a/src/backends/mod.rs +++ b/src/backends/mod.rs @@ -36,6 +36,12 @@ pub enum ConsumeError { ConsumerError, } +#[derive(Clone)] +pub enum Payload<'a> { + Kafka(kafka::KafkaPayload<'a>), + // Local(Vec), +} + /// This is basically an observer pattern to receive the callbacks from /// the consumer when partitions are 
assigned/revoked. pub trait AssignmentCallbacks: Send + Sync { @@ -77,7 +83,7 @@ pub trait AssignmentCallbacks: Send + Sync { /// occurs even if the consumer retains ownership of the partition across /// assignments.) For this reason, it is generally good practice to ensure /// offsets are committed as part of the revocation callback. -pub trait Consumer { +pub trait Consumer { fn subscribe( &mut self, topic: &[Topic], @@ -93,12 +99,9 @@ pub trait Consumer { /// consumer attempts to read from an invalid location in one of it's /// assigned partitions. (Additional details can be found in the /// docstring for ``Consumer.seek``.) - fn poll<'b>( - &'b self, - timeout: Option, - ) -> Result>, PollError> + fn poll<'b>(&'b self, timeout: Option) -> Result>, PollError> where - TPayload: 'b; + Payload: 'b; /// Pause consuming from the provided partitions. /// @@ -161,13 +164,13 @@ pub trait Consumer { fn closed(&self) -> bool; } -pub trait Producer { +pub trait Producer { /// Produce to a topic or partition. 
fn produce( &self, destination_topic: Option, destination_partition: Option, - payload: TPayload, + payload: Payload, ); fn close(&self); From 852eb566256d7bb36220a92d50b94078ae3ababb Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 15 Jun 2022 15:05:08 -0700 Subject: [PATCH 05/13] wip --- src/backends/kafka/mod.rs | 10 +++------- src/backends/mod.rs | 10 +++++----- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/backends/kafka/mod.rs b/src/backends/kafka/mod.rs index baca9d0..eee8352 100644 --- a/src/backends/kafka/mod.rs +++ b/src/backends/kafka/mod.rs @@ -180,14 +180,10 @@ impl<'a> ArroyoConsumer> for KafkaConsumer { Ok(()) } - fn poll<'b>( - &'b self, + fn poll( + &self, timeout: Option, - ) -> Result>>, PollError> - where - KafkaPayload<'a>: 'b, - 'a: 'b, - { + ) -> Result>>, PollError> { let duration = timeout.unwrap_or(Duration::from_millis(100)); match self.consumer.as_ref() { diff --git a/src/backends/mod.rs b/src/backends/mod.rs index f9d666f..93ba5ff 100755 --- a/src/backends/mod.rs +++ b/src/backends/mod.rs @@ -39,7 +39,7 @@ pub enum ConsumeError { #[derive(Clone)] pub enum Payload<'a> { Kafka(kafka::KafkaPayload<'a>), - // Local(Vec), + Local(Vec), } /// This is basically an observer pattern to receive the callbacks from @@ -83,7 +83,9 @@ pub trait AssignmentCallbacks: Send + Sync { /// occurs even if the consumer retains ownership of the partition across /// assignments.) For this reason, it is generally good practice to ensure /// offsets are committed as part of the revocation callback. -pub trait Consumer { +/// +/// +pub trait Consumer<'a, Payload: Clone> { fn subscribe( &mut self, topic: &[Topic], @@ -99,9 +101,7 @@ pub trait Consumer { /// consumer attempts to read from an invalid location in one of it's /// assigned partitions. (Additional details can be found in the /// docstring for ``Consumer.seek``.) 
- fn poll<'b>(&'b self, timeout: Option) -> Result>, PollError> - where - Payload: 'b; + fn poll(&self, timeout: Option) -> Result>>, PollError>; /// Pause consuming from the provided partitions. /// From 69f6c07e0f9e69a5e4ee8df5e27670ee54f548d2 Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 15 Jun 2022 16:30:43 -0700 Subject: [PATCH 06/13] idk --- src/backends/kafka/mod.rs | 2 +- src/backends/local/mod.rs | 9 +++------ src/backends/mod.rs | 5 +++-- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/backends/kafka/mod.rs b/src/backends/kafka/mod.rs index eee8352..169dbc5 100644 --- a/src/backends/kafka/mod.rs +++ b/src/backends/kafka/mod.rs @@ -183,7 +183,7 @@ impl<'a> ArroyoConsumer> for KafkaConsumer { fn poll( &self, timeout: Option, - ) -> Result>>, PollError> { + ) -> Result>>, PollError> { let duration = timeout.unwrap_or(Duration::from_millis(100)); match self.consumer.as_ref() { diff --git a/src/backends/local/mod.rs b/src/backends/local/mod.rs index 5f6a799..ebaf4f6 100644 --- a/src/backends/local/mod.rs +++ b/src/backends/local/mod.rs @@ -131,13 +131,10 @@ impl Consumer for LocalConsumer { Ok(()) } - fn poll<'b>( - &'b mut self, + fn poll<'a>( + &'a self, _timeout: Option, - ) -> Result>, PollError> - where - TPayload: 'b, - { + ) -> Result>, PollError> { if self.closed { return Err(PollError::ConsumerClosed); } diff --git a/src/backends/mod.rs b/src/backends/mod.rs index 93ba5ff..0917648 100755 --- a/src/backends/mod.rs +++ b/src/backends/mod.rs @@ -85,7 +85,7 @@ pub trait AssignmentCallbacks: Send + Sync { /// offsets are committed as part of the revocation callback. /// /// -pub trait Consumer<'a, Payload: Clone> { +pub trait Consumer { fn subscribe( &mut self, topic: &[Topic], @@ -101,7 +101,8 @@ pub trait Consumer<'a, Payload: Clone> { /// consumer attempts to read from an invalid location in one of it's /// assigned partitions. (Additional details can be found in the /// docstring for ``Consumer.seek``.) 
- fn poll(&self, timeout: Option) -> Result>>, PollError>; + fn poll<'a>(&'a self, timeout: Option) + -> Result>, PollError>; /// Pause consuming from the provided partitions. /// From 48061fa7eecc524def3a69847d2fe42d22247c38 Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 15 Jun 2022 20:03:31 -0700 Subject: [PATCH 07/13] consumer is not generic --- src/backends/kafka/mod.rs | 4 ++-- src/backends/mod.rs | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/backends/kafka/mod.rs b/src/backends/kafka/mod.rs index 169dbc5..dbef627 100644 --- a/src/backends/kafka/mod.rs +++ b/src/backends/kafka/mod.rs @@ -150,7 +150,7 @@ impl KafkaConsumer { } } -impl<'a> ArroyoConsumer> for KafkaConsumer { +impl ArroyoConsumer for KafkaConsumer { fn subscribe( &mut self, topics: &[Topic], @@ -183,7 +183,7 @@ impl<'a> ArroyoConsumer> for KafkaConsumer { fn poll( &self, timeout: Option, - ) -> Result>>, PollError> { + ) -> Result>>, PollError> { let duration = timeout.unwrap_or(Duration::from_millis(100)); match self.consumer.as_ref() { diff --git a/src/backends/mod.rs b/src/backends/mod.rs index 0917648..d01f397 100755 --- a/src/backends/mod.rs +++ b/src/backends/mod.rs @@ -85,7 +85,7 @@ pub trait AssignmentCallbacks: Send + Sync { /// offsets are committed as part of the revocation callback. /// /// -pub trait Consumer { +pub trait Consumer { fn subscribe( &mut self, topic: &[Topic], @@ -101,8 +101,7 @@ pub trait Consumer { /// consumer attempts to read from an invalid location in one of it's /// assigned partitions. (Additional details can be found in the /// docstring for ``Consumer.seek``.) - fn poll<'a>(&'a self, timeout: Option) - -> Result>, PollError>; + fn poll(&self, timeout: Option) -> Result>, PollError>; /// Pause consuming from the provided partitions. 
/// From 35c345964d62a1ea71829a98f9c6b0496b4e1d91 Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 15 Jun 2022 20:04:30 -0700 Subject: [PATCH 08/13] update local consumer --- src/backends/local/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backends/local/mod.rs b/src/backends/local/mod.rs index ebaf4f6..bf7589c 100644 --- a/src/backends/local/mod.rs +++ b/src/backends/local/mod.rs @@ -90,7 +90,7 @@ impl LocalConsumer { } } -impl Consumer for LocalConsumer { +impl Consumer for LocalConsumer { fn subscribe( &mut self, topics: &[Topic], From 47bb6b0c9580aa034d0983c815d3cbd8fffcf2e1 Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 15 Jun 2022 20:13:04 -0700 Subject: [PATCH 09/13] remove some TPayload --- src/processing/mod.rs | 32 ++++++++++++++++---------------- src/processing/strategies/mod.rs | 9 +++++---- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/processing/mod.rs b/src/processing/mod.rs index 274762d..eff0117 100644 --- a/src/processing/mod.rs +++ b/src/processing/mod.rs @@ -1,6 +1,6 @@ pub mod strategies; -use crate::backends::{AssignmentCallbacks, Consumer}; +use crate::backends::{AssignmentCallbacks, Consumer, Payload}; use crate::types::{Message, Partition, Topic}; use std::collections::HashMap; use std::mem::replace; @@ -24,16 +24,16 @@ pub enum RunError { PauseError, } -struct Strategies { - processing_factory: Box>, - strategy: Option>>, +struct Strategies { + processing_factory: Box, + strategy: Option>, } -struct Callbacks { - strategies: Arc>>, +struct Callbacks { + strategies: Arc>, } -impl AssignmentCallbacks for Callbacks { +impl AssignmentCallbacks for Callbacks { // TODO: Having the initialization of the strategy here // means that ProcessingStrategy and ProcessingStrategyFactory // have to be Send and Sync, which is really limiting and unnecessary. 
@@ -57,8 +57,8 @@ impl AssignmentCallbacks for Callbacks { } } -impl Callbacks { - pub fn new(strategies: Arc>>) -> Self { +impl Callbacks { + pub fn new(strategies: Arc>) -> Self { Self { strategies } } } @@ -67,17 +67,17 @@ impl Callbacks { /// instance and a ``ProcessingStrategy``, ensuring that processing /// strategies are instantiated on partition assignment and closed on /// partition revocation. -pub struct StreamProcessor<'a, TPayload: Clone> { - consumer: Box + 'a>, - strategies: Arc>>, - message: Option>, +pub struct StreamProcessor<'a> { + consumer: Box, + strategies: Arc>, + message: Option>>, shutdown_requested: bool, } -impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { +impl<'a> StreamProcessor<'a> { pub fn new( - consumer: Box + 'a>, - processing_factory: Box>, + consumer: Box, + processing_factory: Box, ) -> Self { let strategies = Arc::new(Mutex::new(Strategies { processing_factory, diff --git a/src/processing/strategies/mod.rs b/src/processing/strategies/mod.rs index 950099f..80cc284 100644 --- a/src/processing/strategies/mod.rs +++ b/src/processing/strategies/mod.rs @@ -1,3 +1,4 @@ +use crate::backends::Payload; use crate::types::{Message, Partition, Position}; use std::collections::HashMap; @@ -17,7 +18,7 @@ pub struct CommitRequest { /// /// This interface is intentionally not prescriptive, and affords a /// significant degree of flexibility for the various implementations. -pub trait ProcessingStrategy: Send + Sync { +pub trait ProcessingStrategy: Send + Sync { /// Poll the processor to check on the status of asynchronous tasks or /// perform other scheduled work. /// @@ -40,7 +41,7 @@ pub trait ProcessingStrategy: Send + Sync { /// If the processing strategy is unable to accept a message (due to it /// being at or over capacity, for example), this method will raise a /// ``MessageRejected`` exception. 
- fn submit(&mut self, message: Message) -> Result<(), MessageRejected>; + fn submit(&mut self, message: Message) -> Result<(), MessageRejected>; /// Close this instance. No more messages should be accepted by the /// instance after this method has been called. @@ -69,10 +70,10 @@ pub trait ProcessingStrategy: Send + Sync { fn join(&mut self, timeout: Option) -> Option; } -pub trait ProcessingStrategyFactory: Send + Sync { +pub trait ProcessingStrategyFactory: Send + Sync { /// Instantiate and return a ``ProcessingStrategy`` instance. /// /// :param commit: A function that accepts a mapping of ``Partition`` /// instances to offset values that should be committed. - fn create(&self) -> Box>; + fn create(&self) -> Box; } From c088997861f8dd0dc2e68233e70ca2bde9dda539 Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 15 Jun 2022 20:21:17 -0700 Subject: [PATCH 10/13] update the local consumer --- src/backends/local/mod.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/backends/local/mod.rs b/src/backends/local/mod.rs index bf7589c..d65b976 100644 --- a/src/backends/local/mod.rs +++ b/src/backends/local/mod.rs @@ -2,6 +2,7 @@ pub mod broker; use super::storages::{PartitionDoesNotExist, TopicDoesNotExist}; use super::{AssignmentCallbacks, ConsumeError, Consumer, ConsumerClosed, PauseError, PollError}; +use crate::backends::Payload; use crate::types::{Message, Partition, Position, Topic}; use broker::LocalBroker; use std::collections::HashSet; @@ -44,10 +45,10 @@ struct SubscriptionState { last_eof_at: HashMap, } -pub struct LocalConsumer { +pub struct LocalConsumer<'a> { id: Uuid, group: String, - broker: LocalBroker, + broker: LocalBroker>, pending_callback: VecDeque, paused: HashSet, // The offset that a the last ``EndOfPartition`` exception that was @@ -60,12 +61,10 @@ pub struct LocalConsumer { closed: bool, } -type TPayload = Vec; - -impl LocalConsumer { +impl<'a> LocalConsumer<'a> { pub fn new( id: Uuid, - broker: 
LocalBroker, + broker: LocalBroker>, group: String, enable_end_of_partition: bool, ) -> Self { @@ -90,7 +89,7 @@ impl LocalConsumer { } } -impl Consumer for LocalConsumer { +impl<'a> Consumer for LocalConsumer<'a> { fn subscribe( &mut self, topics: &[Topic], @@ -131,10 +130,7 @@ impl Consumer for LocalConsumer { Ok(()) } - fn poll<'a>( - &'a self, - _timeout: Option, - ) -> Result>, PollError> { + fn poll(&self, _timeout: Option) -> Result>, PollError> { if self.closed { return Err(PollError::ConsumerClosed); } @@ -165,7 +161,7 @@ impl Consumer for LocalConsumer { let keys = self.subscription_state.offsets.keys(); let mut new_offset: Option<(Partition, u64)> = None; - let mut ret_message: Option> = None; + let mut ret_message: Option> = None; for partition in keys.collect::>() { if self.paused.contains(partition) { continue; From 9f70ed2c7ac16bb60950ce2d9346aaa5ca0c285c Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 16 Jun 2022 10:12:29 -0700 Subject: [PATCH 11/13] markus fix for kafka payload --- src/backends/kafka/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/backends/kafka/mod.rs b/src/backends/kafka/mod.rs index dbef627..b10b1b8 100644 --- a/src/backends/kafka/mod.rs +++ b/src/backends/kafka/mod.rs @@ -1,5 +1,5 @@ use super::Consumer as ArroyoConsumer; -use super::{AssignmentCallbacks, ConsumeError, ConsumerClosed, PauseError, PollError}; +use super::{AssignmentCallbacks, ConsumeError, ConsumerClosed, PauseError, Payload, PollError}; use crate::types::Message as ArroyoMessage; use crate::types::{Partition, Position, Topic}; use chrono::{DateTime, NaiveDateTime, Utc}; @@ -53,7 +53,7 @@ impl<'a> Clone for KafkaPayload<'a> { } } -fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage { +fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage { let topic = Topic { name: msg.topic().to_string(), }; @@ -66,7 +66,7 @@ fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage { ArroyoMessage::new( partition, 
msg.offset() as u64, - KafkaPayload::new(msg), + Payload::Kafka(KafkaPayload::new(msg)), DateTime::from_utc(NaiveDateTime::from_timestamp(time_millis, 0), Utc), ) } @@ -183,7 +183,7 @@ impl ArroyoConsumer for KafkaConsumer { fn poll( &self, timeout: Option, - ) -> Result>>, PollError> { + ) -> Result>>, PollError> { let duration = timeout.unwrap_or(Duration::from_millis(100)); match self.consumer.as_ref() { From ecbbc1ddef0bb89eb1fe0e26c6e8fc785af5f116 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 24 Jun 2022 16:31:46 +0200 Subject: [PATCH 12/13] wip --- src/backends/kafka/mod.rs | 9 ++++----- src/backends/local/broker.rs | 13 +++++++------ src/backends/local/mod.rs | 14 +++++++------- src/backends/mod.rs | 10 +++++++++- src/backends/storages/memory.rs | 27 ++++++++++++++------------- src/backends/storages/mod.rs | 7 ++++--- src/processing/mod.rs | 12 ++++++------ src/types/mod.rs | 18 +++++++++++++++--- 8 files changed, 66 insertions(+), 44 deletions(-) diff --git a/src/backends/kafka/mod.rs b/src/backends/kafka/mod.rs index b10b1b8..0ef54d5 100644 --- a/src/backends/kafka/mod.rs +++ b/src/backends/kafka/mod.rs @@ -43,12 +43,11 @@ impl<'a> KafkaPayload<'a> { Self::Owned(ref msg) => msg.payload(), } } -} -impl<'a> Clone for KafkaPayload<'a> { - fn clone(&self) -> KafkaPayload<'a> { + + pub fn to_owned(&self) -> KafkaPayload<'static> { match self { - Self::Borrowed(ref msg) => Self::Owned(msg.detach()), - Self::Owned(ref msg) => Self::Owned(msg.clone()), + Self::Borrowed(ref msg) => KafkaPayload::Owned(msg.detach()), + Self::Owned(ref msg) => KafkaPayload::Owned(msg.clone()), } } } diff --git a/src/backends/local/broker.rs b/src/backends/local/broker.rs index 6f29b59..7a4a5f2 100644 --- a/src/backends/local/broker.rs +++ b/src/backends/local/broker.rs @@ -1,20 +1,21 @@ use super::SubscriptionError; use crate::backends::storages::{ConsumeError, MessageStorage, TopicDoesNotExist, TopicExists}; +use crate::backends::Payload; use crate::types::{Message, 
Partition, Topic}; use crate::utils::clock::Clock; use chrono::DateTime; use std::collections::{HashMap, HashSet}; use uuid::Uuid; -pub struct LocalBroker { - storage: Box>, +pub struct LocalBroker { + storage: Box, clock: Box, offsets: HashMap>, subscriptions: HashMap>>, } -impl LocalBroker { - pub fn new(storage: Box>, clock: Box) -> Self { +impl LocalBroker { + pub fn new(storage: Box, clock: Box) -> Self { Self { storage, clock, @@ -34,7 +35,7 @@ impl LocalBroker { pub fn produce( &mut self, partition: &Partition, - payload: TPayload, + payload: Payload, ) -> Result { let time = self.clock.time(); self.storage @@ -126,7 +127,7 @@ impl LocalBroker { &self, partition: &Partition, offset: u64, - ) -> Result>, ConsumeError> { + ) -> Result>, ConsumeError> { self.storage.consume(partition, offset) } diff --git a/src/backends/local/mod.rs b/src/backends/local/mod.rs index d65b976..6bd0303 100644 --- a/src/backends/local/mod.rs +++ b/src/backends/local/mod.rs @@ -45,10 +45,10 @@ struct SubscriptionState { last_eof_at: HashMap, } -pub struct LocalConsumer<'a> { +pub struct LocalConsumer { id: Uuid, group: String, - broker: LocalBroker>, + broker: LocalBroker, pending_callback: VecDeque, paused: HashSet, // The offset that a the last ``EndOfPartition`` exception that was @@ -61,10 +61,10 @@ pub struct LocalConsumer<'a> { closed: bool, } -impl<'a> LocalConsumer<'a> { +impl LocalConsumer { pub fn new( id: Uuid, - broker: LocalBroker>, + broker: LocalBroker, group: String, enable_end_of_partition: bool, ) -> Self { @@ -89,7 +89,7 @@ impl<'a> LocalConsumer<'a> { } } -impl<'a> Consumer for LocalConsumer<'a> { +impl Consumer for LocalConsumer { fn subscribe( &mut self, topics: &[Topic], @@ -330,8 +330,8 @@ mod tests { fn on_revoke(&mut self, _: Vec) {} } - fn build_broker() -> LocalBroker { - let storage: MemoryMessageStorage = Default::default(); + fn build_broker() -> LocalBroker { + let storage: MemoryMessageStorage = Default::default(); let clock = SystemClock {}; let mut 
broker = LocalBroker::new(Box::new(storage), Box::new(clock)); diff --git a/src/backends/mod.rs b/src/backends/mod.rs index d01f397..26ce124 100755 --- a/src/backends/mod.rs +++ b/src/backends/mod.rs @@ -36,12 +36,20 @@ pub enum ConsumeError { ConsumerError, } -#[derive(Clone)] pub enum Payload<'a> { Kafka(kafka::KafkaPayload<'a>), Local(Vec), } +impl<'a> Payload<'a> { + pub fn to_owned(&self) -> Payload<'static> { + match self { + Payload::Kafka(payload) => Payload::Kafka(payload.to_owned()), + Payload::Local(payload) => Payload::Local(payload.clone()), + } + } +} + /// This is basically an observer pattern to receive the callbacks from /// the consumer when partitions are assigned/revoked. pub trait AssignmentCallbacks: Send + Sync { diff --git a/src/backends/storages/memory.rs b/src/backends/storages/memory.rs index 22aa06c..10922fc 100755 --- a/src/backends/storages/memory.rs +++ b/src/backends/storages/memory.rs @@ -1,16 +1,17 @@ use super::{ConsumeError, MessageStorage, TopicDoesNotExist, TopicExists}; use crate::types::{Message, Partition, Topic}; +use crate::backends::Payload; use chrono::{DateTime, Utc}; use std::cmp::Ordering; use std::collections::HashMap; use std::convert::TryFrom; -struct TopicContent { +struct TopicContent { partition_meta: Vec, - partitions: HashMap>>, + partitions: HashMap>>>, } -impl TopicContent { +impl TopicContent { pub fn new(topic: &Topic, partitions: u16) -> Self { let mut queues = HashMap::new(); let mut part_meta = Vec::new(); @@ -28,19 +29,19 @@ impl TopicContent { } } - fn get_messages(&self, partition: &Partition) -> Result<&Vec>, ConsumeError> { + fn get_messages(&self, partition: &Partition) -> Result<&Vec>, ConsumeError> { if !self.partition_meta.contains(partition) { return Err(ConsumeError::PartitionDoesNotExist); } Ok(&self.partitions[partition]) } - fn add_message(&mut self, message: Message) -> Result<(), ConsumeError> { + fn add_message(&mut self, message: Message) -> Result<(), ConsumeError> { if 
!self.partition_meta.contains(&message.partition) { return Err(ConsumeError::PartitionDoesNotExist); } let stream = self.partitions.get_mut(&message.partition).unwrap(); - stream.push(message); + stream.push(message.to_owned()); Ok(()) } @@ -53,11 +54,11 @@ impl TopicContent { } } -pub struct MemoryMessageStorage { - topics: HashMap>, +pub struct MemoryMessageStorage { + topics: HashMap, } -impl Default for MemoryMessageStorage { +impl Default for MemoryMessageStorage { fn default() -> Self { MemoryMessageStorage { topics: HashMap::new(), @@ -65,7 +66,7 @@ impl Default for MemoryMessageStorage { } } -impl MessageStorage for MemoryMessageStorage { +impl MessageStorage for MemoryMessageStorage { fn create_topic(&mut self, topic: Topic, partitions: u16) -> Result<(), TopicExists> { if self.topics.contains_key(&topic) { return Err(TopicExists); @@ -117,11 +118,11 @@ impl MessageStorage for MemoryMessageStorage Result>, ConsumeError> { + ) -> Result>, ConsumeError> { let n_offset = usize::try_from(offset).unwrap(); let messages = self.topics[&partition.topic].get_messages(partition)?; match messages.len().cmp(&n_offset) { - Ordering::Greater => Ok(Some(messages[n_offset].clone())), + Ordering::Greater => Ok(Some(messages[n_offset].to_owned())), Ordering::Less => Err(ConsumeError::OffsetOutOfRange), Ordering::Equal => Ok(None), } @@ -130,7 +131,7 @@ impl MessageStorage for MemoryMessageStorage, ) -> Result { let messages = self diff --git a/src/backends/storages/mod.rs b/src/backends/storages/mod.rs index ec10a37..0581c12 100755 --- a/src/backends/storages/mod.rs +++ b/src/backends/storages/mod.rs @@ -1,5 +1,6 @@ pub mod memory; use super::super::types::{Message, Partition, Topic}; +use crate::backends::Payload; use chrono::{DateTime, Utc}; #[derive(Debug, Clone)] @@ -21,7 +22,7 @@ pub enum ConsumeError { OffsetOutOfRange, } -pub trait MessageStorage { +pub trait MessageStorage { // Create a topic with the given number of partitions. 
// // If the topic already exists, a ``TopicExists`` exception will be @@ -60,7 +61,7 @@ pub trait MessageStorage { &self, partition: &Partition, offset: u64, - ) -> Result>, ConsumeError>; + ) -> Result>, ConsumeError>; // Produce a single message to the provided partition. // @@ -70,7 +71,7 @@ pub trait MessageStorage { fn produce( &mut self, partition: &Partition, - payload: TPayload, + payload: Payload, timestamp: DateTime, ) -> Result; } diff --git a/src/processing/mod.rs b/src/processing/mod.rs index eff0117..374b1a6 100644 --- a/src/processing/mod.rs +++ b/src/processing/mod.rs @@ -67,16 +67,16 @@ impl Callbacks { /// instance and a ``ProcessingStrategy``, ensuring that processing /// strategies are instantiated on partition assignment and closed on /// partition revocation. -pub struct StreamProcessor<'a> { - consumer: Box, +pub struct StreamProcessor { + consumer: Box, strategies: Arc>, - message: Option>>, + message: Option>>, shutdown_requested: bool, } -impl<'a> StreamProcessor<'a> { +impl StreamProcessor { pub fn new( - consumer: Box, + consumer: Box, processing_factory: Box, ) -> Self { let strategies = Arc::new(Mutex::new(Strategies { @@ -115,7 +115,7 @@ impl<'a> StreamProcessor<'a> { let msg = self.consumer.poll(Some(Duration::from_secs(1))); //TODO: Support errors properly match msg { - Ok(m) => self.message = m, + Ok(m) => self.message = m.map(|x| x.to_owned()), Err(_) => return Err(RunError::PollError), } } diff --git a/src/types/mod.rs b/src/types/mod.rs index 3833335..a34c7ac 100755 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,4 +1,5 @@ use chrono::{DateTime, Utc}; +use crate::backends::Payload; use std::any::type_name; use std::cmp::Eq; use std::fmt; @@ -35,14 +36,14 @@ pub struct Position { } #[derive(Clone, Debug, PartialEq)] -pub struct Message { +pub struct Message { pub partition: Partition, pub offset: u64, pub payload: T, pub timestamp: DateTime, } -impl Message { +impl Message { pub fn new(partition: Partition, offset: u64, 
payload: T, timestamp: DateTime) -> Self { Self { partition, @@ -56,7 +57,7 @@ impl Message { } } -impl fmt::Display for Message { +impl fmt::Display for Message { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, @@ -68,6 +69,17 @@ impl fmt::Display for Message { } } +impl<'a> Message> { + pub fn to_owned(&self) -> Message> { + Message { + partition: self.partition.clone(), + offset: self.offset.clone(), + payload: self.payload.to_owned(), + timestamp: self.timestamp.clone(), + } + } +} + #[cfg(test)] mod tests { use super::{Message, Partition, Topic}; From 3c959d78d66622b1a599c48e1fac6f79e480a1b1 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 24 Jun 2022 16:32:17 +0200 Subject: [PATCH 13/13] wip --- src/backends/kafka/mod.rs | 2 +- src/backends/local/mod.rs | 2 +- src/backends/mod.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backends/kafka/mod.rs b/src/backends/kafka/mod.rs index 0ef54d5..d54dc8a 100644 --- a/src/backends/kafka/mod.rs +++ b/src/backends/kafka/mod.rs @@ -180,7 +180,7 @@ impl ArroyoConsumer for KafkaConsumer { } fn poll( - &self, + &mut self, timeout: Option, ) -> Result>>, PollError> { let duration = timeout.unwrap_or(Duration::from_millis(100)); diff --git a/src/backends/local/mod.rs b/src/backends/local/mod.rs index 6bd0303..84f09d3 100644 --- a/src/backends/local/mod.rs +++ b/src/backends/local/mod.rs @@ -130,7 +130,7 @@ impl Consumer for LocalConsumer { Ok(()) } - fn poll(&self, _timeout: Option) -> Result>, PollError> { + fn poll(&mut self, _timeout: Option) -> Result>, PollError> { if self.closed { return Err(PollError::ConsumerClosed); } diff --git a/src/backends/mod.rs b/src/backends/mod.rs index 26ce124..7a2892e 100755 --- a/src/backends/mod.rs +++ b/src/backends/mod.rs @@ -109,7 +109,7 @@ pub trait Consumer { /// consumer attempts to read from an invalid location in one of it's /// assigned partitions. 
(Additional details can be found in the /// docstring for ``Consumer.seek``.) - fn poll(&self, timeout: Option) -> Result>, PollError>; + fn poll(&mut self, timeout: Option) -> Result>, PollError>; /// Pause consuming from the provided partitions. ///