Skip to content

Commit c89d417

Browse files
authored
Merge pull request #223 from YilunAllenChen/list-offsets
list offsets
2 parents d8f001f + 4a73098 commit c89d417

File tree

4 files changed

+286
-1
lines changed

4 files changed

+286
-1
lines changed

src/client/mod.rs

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use std::time::{Duration, Instant};
1515

1616
// pub re-export
1717
pub use crate::compression::Compression;
18+
use crate::protocol::list_offset::ListOffsetVersion;
1819
pub use crate::utils::PartitionOffset;
20+
use crate::utils::TimestampedPartitionOffset;
1921

2022
#[cfg(feature = "security")]
2123
pub use self::network::SecurityConfig;
@@ -930,7 +932,111 @@ impl KafkaClient {
930932
Ok(res)
931933
}
932934

933-
/// Takes ownership back from the given `HashMap` Entry.
935+
/// Fetch offsets for a list of topics
936+
///
937+
/// # Examples
938+
///
939+
/// ```no_run
940+
/// use kafka::client::KafkaClient;
941+
///
942+
/// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
943+
/// client.load_metadata_all().unwrap();
944+
/// let topics = vec!["test-topic".to_string()];
945+
/// let offsets = client.list_offsets(&s, FetchOffset::ByTime(1698425676797));
946+
/// ```
947+
///
948+
/// Returns a mapping of topic name to `TimestampedPartitionOffset`s.
949+
/// Each entry in the vector represents the timestamp, and the corresponding offset,
950+
/// that Kafka finds to be the first message with timestamp *later* than the passed in
951+
/// FetchOffset parameter.
952+
/// example: Ok({"test-topic": [TimestampedPartitionOffset { offset: 20, partition: 0, time: 1698425676798 } ]
953+
/// Note that the message might not be exactly at the given timestamp.
954+
pub fn list_offsets<T: AsRef<str>>(
955+
&mut self,
956+
topics: &[T],
957+
offset: FetchOffset,
958+
) -> Result<HashMap<String, Vec<TimestampedPartitionOffset>>> {
959+
let api_ver = ListOffsetVersion::V1;
960+
let time = offset.to_kafka_value();
961+
let n_topics = topics.len();
962+
963+
let state = &mut self.state;
964+
let correlation = state.next_correlation_id();
965+
966+
// Map topic and partition to the corresponding broker
967+
let config = &self.config;
968+
let mut reqs: HashMap<&str, protocol::ListOffsetsRequest<'_>> =
969+
HashMap::with_capacity(n_topics);
970+
for topic in topics {
971+
let topic = topic.as_ref();
972+
if let Some(ps) = state.partitions_for(topic) {
973+
for (id, host) in ps
974+
.iter()
975+
.filter_map(|(id, p)| p.broker(state).map(|b| (id, b.host())))
976+
{
977+
let entry = reqs.entry(host).or_insert_with(|| {
978+
protocol::ListOffsetsRequest::new(correlation, api_ver, &config.client_id)
979+
});
980+
entry.add(topic, id, time);
981+
}
982+
}
983+
}
984+
985+
// Call each broker with the request formed earlier
986+
let now = Instant::now();
987+
let mut res: HashMap<String, Vec<TimestampedPartitionOffset>> =
988+
HashMap::with_capacity(n_topics);
989+
for (host, req) in reqs {
990+
let resp = __send_receive::<_, protocol::ListOffsetsResponse>(
991+
&mut self.conn_pool,
992+
host,
993+
now,
994+
req,
995+
)?;
996+
for tp in resp.topics {
997+
let mut entry = res.entry(tp.topic);
998+
let mut new_resp_offsets = None;
999+
let mut err = None;
1000+
// Use an explicit scope here to allow insertion into a vacant entry
1001+
// below
1002+
{
1003+
let resp_offsets = match entry {
1004+
hash_map::Entry::Occupied(ref mut e) => e.get_mut(),
1005+
hash_map::Entry::Vacant(_) => {
1006+
new_resp_offsets = Some(Vec::new());
1007+
new_resp_offsets.as_mut().unwrap()
1008+
}
1009+
};
1010+
for p in tp.partitions {
1011+
let pto: TimestampedPartitionOffset = match p.to_offset() {
1012+
Ok(po) => po,
1013+
Err(code) => {
1014+
err = Some((p.partition, code));
1015+
break;
1016+
}
1017+
};
1018+
resp_offsets.push(pto);
1019+
}
1020+
}
1021+
if let Some((partition, code)) = err {
1022+
let topic = KafkaClient::get_key_from_entry(entry);
1023+
return Err(Error::TopicPartitionError {
1024+
topic_name: topic,
1025+
partition_id: partition,
1026+
error_code: code,
1027+
});
1028+
}
1029+
if let hash_map::Entry::Vacant(e) = entry {
1030+
// unwrap is ok because if it is Vacant, it would have
1031+
// been made into a Some above
1032+
e.insert(new_resp_offsets.unwrap());
1033+
}
1034+
}
1035+
}
1036+
Ok(res)
1037+
}
1038+
1039+
/// Takes ownership back from the given HashMap Entry.
9341040
fn get_key_from_entry<'a, K: 'a, V: 'a>(entry: hash_map::Entry<'a, K, V>) -> K {
9351041
match entry {
9361042
hash_map::Entry::Occupied(e) => e.remove_entry().0,

src/protocol/list_offset.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
use std::io::{Read, Write};
2+
3+
use crate::codecs::{FromByte, ToByte};
4+
use crate::error::{KafkaCode, Result};
5+
use crate::utils::TimestampedPartitionOffset;
6+
7+
use super::{HeaderRequest, HeaderResponse, API_KEY_OFFSET};
8+
9+
/// API versions of the ListOffsets request that this crate can emit.
///
/// The discriminant of each variant is the numeric version; it is cast to
/// `i16` when the request header is built.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ListOffsetVersion {
    /// Version 1 -- the only version supported at the moment.
    V1 = 1,
}
14+
15+
16+
/// https://kafka.apache.org/protocol.html#The_Messages_ListOffsets
17+
#[derive(Debug)]
18+
pub struct ListOffsetsRequest<'a> {
19+
pub header: HeaderRequest<'a>,
20+
pub replica: i32,
21+
pub topics: Vec<TopicListOffsetsRequest<'a>>,
22+
}
23+
24+
#[derive(Debug)]
25+
pub struct TopicListOffsetsRequest<'a> {
26+
pub topic: &'a str,
27+
pub partitions: Vec<PartitionListOffsetsRequest>,
28+
}
29+
30+
/// The per-partition part of a `ListOffsetsRequest`.
#[derive(Default, Debug)]
pub struct PartitionListOffsetsRequest {
    /// Id of the partition to query.
    pub partition: i32,
    /// The timestamp value to look an offset up for, exactly as it is
    /// written to the wire.
    pub time: i64,
}
35+
36+
impl<'a> ListOffsetsRequest<'a> {
37+
pub fn new(
38+
correlation_id: i32,
39+
version: ListOffsetVersion,
40+
client_id: &'a str,
41+
) -> ListOffsetsRequest<'a> {
42+
ListOffsetsRequest {
43+
header: HeaderRequest::new(API_KEY_OFFSET, version as i16, correlation_id, client_id),
44+
replica: -1,
45+
topics: vec![],
46+
}
47+
}
48+
49+
pub fn add(&mut self, topic: &'a str, partition: i32, time: i64) {
50+
for tp in &mut self.topics {
51+
if tp.topic == topic {
52+
tp.add(partition, time);
53+
return;
54+
}
55+
}
56+
let mut tp = TopicListOffsetsRequest::new(topic);
57+
tp.add(partition, time);
58+
self.topics.push(tp);
59+
}
60+
}
61+
62+
impl<'a> TopicListOffsetsRequest<'a> {
63+
fn new(topic: &'a str) -> TopicListOffsetsRequest<'a> {
64+
TopicListOffsetsRequest {
65+
topic,
66+
partitions: vec![],
67+
}
68+
}
69+
fn add(&mut self, partition: i32, time: i64) {
70+
self.partitions
71+
.push(PartitionListOffsetsRequest::new(partition, time));
72+
}
73+
}
74+
75+
impl PartitionListOffsetsRequest {
    /// Bundles a partition id with the timestamp value to query it for.
    fn new(partition: i32, time: i64) -> PartitionListOffsetsRequest {
        Self { partition, time }
    }
}
80+
81+
impl<'a> ToByte for ListOffsetsRequest<'a> {
82+
fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
83+
try_multi!(
84+
self.header.encode(buffer),
85+
self.replica.encode(buffer),
86+
self.topics.encode(buffer)
87+
)
88+
}
89+
}
90+
91+
impl<'a> ToByte for TopicListOffsetsRequest<'a> {
92+
fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
93+
try_multi!(self.topic.encode(buffer), self.partitions.encode(buffer))
94+
}
95+
}
96+
97+
impl ToByte for PartitionListOffsetsRequest {
98+
fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
99+
try_multi!(self.partition.encode(buffer), self.time.encode(buffer))
100+
}
101+
}
102+
103+
// -------------------------------------
104+
105+
#[derive(Default, Debug)]
106+
pub struct ListOffsetsResponse {
107+
pub header: HeaderResponse,
108+
pub topics: Vec<TopicListOffsetsResponse>,
109+
}
110+
111+
#[derive(Default, Debug)]
112+
pub struct TopicListOffsetsResponse {
113+
pub topic: String,
114+
pub partitions: Vec<TimestampedPartitionOffsetListOffsetsResponse>,
115+
}
116+
117+
/// The per-partition part of a `ListOffsetsResponse`.
///
/// Fields are listed in the order in which they are decoded from the
/// wire.
#[derive(Default, Debug)]
pub struct TimestampedPartitionOffsetListOffsetsResponse {
    /// Id of the partition this result is for.
    pub partition: i32,
    /// Raw protocol error code reported for the partition.
    pub error_code: i16,
    /// Timestamp reported alongside `offset`.
    pub timestamp: i64,
    /// The offset the broker resolved.
    pub offset: i64,
}
124+
125+
impl TimestampedPartitionOffsetListOffsetsResponse {
126+
pub fn to_offset(&self) -> std::result::Result<TimestampedPartitionOffset, KafkaCode> {
127+
match KafkaCode::from_protocol(self.error_code) {
128+
Some(code) => Err(code),
129+
None => Ok(TimestampedPartitionOffset {
130+
partition: self.partition,
131+
offset: self.offset,
132+
time: self.timestamp,
133+
}),
134+
}
135+
}
136+
}
137+
138+
impl FromByte for ListOffsetsResponse {
139+
type R = ListOffsetsResponse;
140+
141+
#[allow(unused_must_use)]
142+
fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
143+
try_multi!(self.header.decode(buffer), self.topics.decode(buffer))
144+
}
145+
}
146+
147+
impl FromByte for TopicListOffsetsResponse {
148+
type R = TopicListOffsetsResponse;
149+
150+
#[allow(unused_must_use)]
151+
fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
152+
try_multi!(self.topic.decode(buffer), self.partitions.decode(buffer))
153+
}
154+
}
155+
156+
impl FromByte for TimestampedPartitionOffsetListOffsetsResponse {
157+
type R = TimestampedPartitionOffsetListOffsetsResponse;
158+
159+
#[allow(unused_must_use)]
160+
fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
161+
try_multi!(
162+
self.partition.decode(buffer),
163+
self.error_code.decode(buffer),
164+
self.timestamp.decode(buffer),
165+
self.offset.decode(buffer)
166+
)
167+
}
168+
}

src/protocol/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub mod consumer;
1818
pub mod metadata;
1919
pub mod offset;
2020
pub mod produce;
21+
pub mod list_offset;
2122

2223
pub mod fetch;
2324
mod zreader;
@@ -32,6 +33,7 @@ pub use self::fetch::FetchRequest;
3233
pub use self::metadata::{MetadataRequest, MetadataResponse};
3334
pub use self::offset::{OffsetRequest, OffsetResponse};
3435
pub use self::produce::{ProduceRequest, ProduceResponse};
36+
pub use self::list_offset::{ListOffsetsRequest, ListOffsetsResponse};
3537

3638
// --------------------------------------------------------------------
3739

src/utils.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,12 @@ pub struct PartitionOffset {
99
pub offset: i64,
1010
pub partition: i32,
1111
}
12+
13+
/// An offset retrieved for one partition of an already known topic,
/// together with the timestamp the offset was resolved for.
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct TimestampedPartitionOffset {
    /// The resolved offset within the partition.
    pub offset: i64,
    /// Id of the partition the offset belongs to.
    pub partition: i32,
    /// Timestamp reported by the broker for `offset` (presumably
    /// milliseconds since the Unix epoch -- confirm against the Kafka
    /// protocol guide).
    pub time: i64,
}

0 commit comments

Comments
 (0)