Skip to content
This repository was archived by the owner on Nov 8, 2023. It is now read-only.

Commit 3db7930

Browse files
committed
feat: Consumer that does nothing except poll
Does nothing except poll and commit offsets every 100k messages. This consumer does not use any processing strategy or stream processor. The goal of this is to benchmark it's performance against the noop consumer.
1 parent 9fa01ec commit 3db7930

File tree

2 files changed

+161
-0
lines changed

2 files changed

+161
-0
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,7 @@ path = "src/bin/consumer/passthrough.rs"
3232
[[bin]]
3333
name = "noop-consumer"
3434
path = "src/bin/consumer/noop.rs"
35+
36+
[[bin]]
37+
name = "consumer-only"
38+
path = "src/bin/consumer/consumer_only.rs"

src/bin/consumer/consumer_only.rs

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
extern crate rust_arroyo;
2+
3+
use clap::{App, Arg};
4+
use rust_arroyo::backends::kafka::config::KafkaConfig;
5+
use rust_arroyo::backends::kafka::types::KafkaPayload;
6+
use rust_arroyo::backends::kafka::KafkaConsumer;
7+
use rust_arroyo::backends::AssignmentCallbacks;
8+
use rust_arroyo::backends::Consumer;
9+
use rust_arroyo::processing::strategies::ProcessingStrategy;
10+
use rust_arroyo::processing::strategies::{noop, ProcessingStrategyFactory};
11+
use rust_arroyo::types::{Partition, Position, Topic};
12+
use std::collections::HashMap;
13+
use std::time::Duration;
14+
15+
struct EmptyCallbacks {}
16+
impl AssignmentCallbacks for EmptyCallbacks {
17+
fn on_assign(&mut self, _: HashMap<Partition, u64>) {}
18+
fn on_revoke(&mut self, _: Vec<Partition>) {}
19+
}
20+
21+
struct StrategyFactory {
22+
batch_time: u64,
23+
}
24+
impl ProcessingStrategyFactory<KafkaPayload> for StrategyFactory {
25+
fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
26+
Box::new(noop::new(Duration::from_millis(self.batch_time)))
27+
}
28+
}
29+
30+
fn main() {
31+
let matches = App::new("noop consumer")
32+
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
33+
.about("Simple noop consumer")
34+
.arg(
35+
Arg::with_name("brokers")
36+
.short("b")
37+
.long("brokers")
38+
.help("Broker list in kafka format")
39+
.takes_value(true)
40+
.default_value("localhost:9092"),
41+
)
42+
.arg(
43+
Arg::with_name("group-id")
44+
.short("g")
45+
.long("group-id")
46+
.help("Consumer group id")
47+
.takes_value(true)
48+
.default_value("example_consumer_group_id"),
49+
)
50+
.arg(
51+
Arg::with_name("log-conf")
52+
.long("log-conf")
53+
.help("Configure the logging format (example: 'rdkafka=trace')")
54+
.takes_value(true),
55+
)
56+
.arg(
57+
Arg::with_name("source-topic")
58+
.long("source-topic")
59+
.help("source topic name")
60+
.default_value("test_source")
61+
.takes_value(true),
62+
)
63+
.arg(
64+
Arg::with_name("batch-time")
65+
.long("batch-time")
66+
.help("time of the batch for flushing")
67+
.default_value("100")
68+
.takes_value(true),
69+
)
70+
.arg(
71+
Arg::with_name("offset-reset")
72+
.long("offset-reset")
73+
.help("kafka auto.offset.reset param")
74+
.default_value("earliest")
75+
.takes_value(true),
76+
)
77+
.get_matches();
78+
79+
env_logger::init();
80+
let source_topic = matches.value_of("source-topic").unwrap();
81+
let offset_reset = matches.value_of("offset-reset").unwrap();
82+
let brokers = matches.value_of("brokers").unwrap();
83+
let group_id = matches.value_of("group-id").unwrap();
84+
85+
let config = KafkaConfig::new_consumer_config(
86+
vec![brokers.to_string()],
87+
group_id.to_string(),
88+
offset_reset.to_string(),
89+
false,
90+
None,
91+
);
92+
let mut consumer = KafkaConsumer::new(config);
93+
let topic = Topic {
94+
name: source_topic.to_string(),
95+
};
96+
97+
struct EmptyCallbacks {}
98+
impl AssignmentCallbacks for EmptyCallbacks {
99+
fn on_assign(&mut self, _: HashMap<Partition, u64>) {}
100+
fn on_revoke(&mut self, _: Vec<Partition>) {}
101+
}
102+
103+
consumer
104+
.subscribe(&[topic.clone()], Box::new(EmptyCallbacks {}))
105+
.unwrap();
106+
107+
let mut last_offset: u64 = 0;
108+
109+
loop {
110+
let res = consumer.poll(None);
111+
match res {
112+
Err(e) => println!("{}", e),
113+
Ok(val) => match val {
114+
Some(message) => {
115+
// Do nothing
116+
last_offset = message.offset;
117+
118+
// Commit every 100k messages
119+
if last_offset % 100_000 == 0 {
120+
let mut positions = HashMap::new();
121+
positions.insert(
122+
Partition {
123+
topic: topic.clone(),
124+
index: 0, // One partition hardcoded
125+
},
126+
Position {
127+
offset: last_offset + 1,
128+
timestamp: chrono::Utc::now(),
129+
},
130+
);
131+
}
132+
}
133+
None => {
134+
if last_offset == 0 {
135+
continue;
136+
}
137+
println!("Timed out waiting for message, committing offsets");
138+
// If we got here we should have burned the backlog
139+
let mut positions = HashMap::new();
140+
positions.insert(
141+
Partition {
142+
topic: topic.clone(),
143+
index: 0, // One partition hardcoded
144+
},
145+
Position {
146+
offset: last_offset + 1,
147+
timestamp: chrono::Utc::now(),
148+
},
149+
);
150+
consumer.stage_positions(positions).unwrap();
151+
152+
consumer.commit_positions().unwrap();
153+
}
154+
},
155+
}
156+
}
157+
}

0 commit comments

Comments
 (0)