Skip to content

Commit 4f4c15e

Browse files
committed
feat: support replicate_subscription_state option for consumer
1 parent 918c25a commit 4f4c15e

File tree

2 files changed

+10
-0
lines changed

2 files changed

+10
-0
lines changed

src/connection.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1639,6 +1639,7 @@ pub(crate) mod messages {
16391639
.collect(),
16401640
read_compacted: Some(options.read_compacted.unwrap_or(false)),
16411641
initial_position: Some(options.initial_position.into()),
1642+
replicate_subscription_state: options.replicate_subscription_state,
16421643
schema: options.schema,
16431644
start_message_id: options.start_message_id,
16441645
..Default::default()

src/consumer/options.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ pub struct ConsumerOptions {
3131
/// }
3232
/// ```
3333
pub initial_position: InitialPosition,
34+
/// Mark the subscription as "replicated". Pulsar will make sure
35+
/// to periodically sync the state of replicated subscriptions
36+
/// across different clusters (when using geo-replication).
37+
pub replicate_subscription_state: Option<bool>,
3438
}
3539

3640
impl ConsumerOptions {
@@ -76,4 +80,9 @@ impl ConsumerOptions {
7680
self.initial_position = initial_position;
7781
self
7882
}
83+
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
84+
pub fn with_replicate_subscription_state(mut self, replicate_subscription_state: bool) -> Self {
85+
self.replicate_subscription_state = Some(replicate_subscription_state);
86+
self
87+
}
7988
}

0 commit comments

Comments
 (0)