Skip to content

Commit 16c9146

Browse files
sichengSicheng-Pan
authored andcommitted
[ENH] Batch get info from sysdb for compaction scheduler
1 parent ad7dd6b commit 16c9146

File tree

1 file changed

+108
-90
lines changed

1 file changed

+108
-90
lines changed

rust/worker/src/compactor/scheduler.rs

Lines changed: 108 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::HashSet;
1+
use std::collections::{HashMap, HashSet};
22
use std::str::FromStr;
33

44
use chroma_config::assignment::assignment_policy::AssignmentPolicy;
@@ -96,94 +96,94 @@ impl Scheduler {
9696
&mut self,
9797
collections: Vec<CollectionInfo>,
9898
) -> Vec<CollectionRecord> {
99-
let mut collection_records = Vec::new();
100-
for collection_info in collections {
101-
if self
102-
.disabled_collections
103-
.contains(&collection_info.collection_id)
104-
{
105-
tracing::info!(
106-
"Ignoring collection: {:?} because it disabled for compaction",
107-
collection_info.collection_id
108-
);
109-
continue;
110-
}
111-
// TODO: add a cache to avoid fetching the same collection multiple times
112-
let result = self
113-
.sysdb
114-
.get_collections(GetCollectionsOptions {
115-
collection_id: Some(collection_info.collection_id),
116-
..Default::default()
117-
})
118-
.await;
99+
let mut disabled_collections = Vec::new();
100+
let collection_id_to_log_info = collections
101+
.into_iter()
102+
.filter_map(|collection| {
103+
if self
104+
.disabled_collections
105+
.contains(&collection.collection_id)
106+
{
107+
disabled_collections.push(collection);
108+
None
109+
} else {
110+
Some((collection.collection_id, collection))
111+
}
112+
})
113+
.collect::<HashMap<_, _>>();
119114

120-
match result {
121-
Ok(collection) => {
122-
if collection.is_empty() {
123-
self.deleted_collections
124-
.insert(collection_info.collection_id);
125-
continue;
126-
}
115+
if !disabled_collections.is_empty() {
116+
tracing::info!(
117+
"Disabled collections are excluded from compaction: {disabled_collections:?}"
118+
);
119+
}
127120

128-
// TODO: make querying the last compaction time in batch
129-
let log_position_in_collection = collection[0].log_position;
130-
let tenant_ids = vec![collection[0].tenant.clone()];
131-
let tenant = self.sysdb.get_last_compaction_time(tenant_ids).await;
132-
133-
let last_compaction_time = match tenant {
134-
Ok(tenant) => {
135-
if tenant.is_empty() {
136-
tracing::info!(
137-
"Ignoring collection: {:?}",
138-
collection_info.collection_id
139-
);
140-
continue;
141-
}
142-
tenant[0].last_compaction_time
143-
}
144-
Err(e) => {
145-
tracing::error!("Error: {:?}", e);
146-
// Ignore this collection id for this compaction iteration
147-
tracing::info!(
148-
"Ignoring collection: {:?}",
149-
collection_info.collection_id
150-
);
151-
continue;
152-
}
153-
};
154-
155-
let mut offset = collection_info.first_log_offset;
156-
// offset in log is the first offset in the log that has not been compacted. Note that
157-
// since the offset is the first offset of log we get from the log service, we should
158-
// use this offset to pull data from the log service.
159-
if log_position_in_collection + 1 < offset {
160-
panic!(
161-
"offset in sysdb ({}) is less than offset in log ({}) for {}",
162-
log_position_in_collection + 1,
163-
offset,
164-
collection[0].collection_id,
165-
)
166-
} else {
167-
// The offset in sysdb is the last offset that has been compacted.
168-
// We need to start from the next offset.
169-
offset = log_position_in_collection + 1;
170-
}
121+
let collection_id_to_sysdb_info = match self
122+
.sysdb
123+
.get_collections(GetCollectionsOptions {
124+
collection_ids: Some(collection_id_to_log_info.keys().cloned().collect()),
125+
..Default::default()
126+
})
127+
.await
128+
{
129+
Ok(collections) => collections
130+
.into_iter()
131+
.map(|collection| (collection.collection_id, collection))
132+
.collect::<HashMap<_, _>>(),
133+
Err(err) => {
134+
tracing::error!("Unable to fetch collection information from sysdb: {err}");
135+
return Vec::new();
136+
}
137+
};
171138

172-
collection_records.push(CollectionRecord {
173-
collection_id: collection[0].collection_id,
174-
tenant_id: collection[0].tenant.clone(),
175-
last_compaction_time,
176-
first_record_time: collection_info.first_log_ts,
177-
offset,
178-
collection_version: collection[0].version,
179-
collection_logical_size_bytes: collection[0].size_bytes_post_compaction,
180-
});
181-
}
182-
Err(e) => {
183-
tracing::error!("Error: {:?}", e);
139+
let sysdb_tenants = collection_id_to_sysdb_info
140+
.values()
141+
.map(|collection| collection.tenant.clone())
142+
.collect();
143+
144+
let tenant_to_last_compaction_time =
145+
match self.sysdb.get_last_compaction_time(sysdb_tenants).await {
146+
Ok(tenants) => tenants
147+
.into_iter()
148+
.map(|tenant| (tenant.id, tenant.last_compaction_time))
149+
.collect::<HashMap<_, _>>(),
150+
Err(err) => {
151+
tracing::error!(
152+
"Unable to fetch tenant last compaction time from sysdb: {err}"
153+
);
154+
return Vec::new();
184155
}
156+
};
157+
158+
let mut collection_records = Vec::new();
159+
for (collection_id, log_info) in collection_id_to_log_info {
160+
let Some(sysdb_info) = collection_id_to_sysdb_info.get(&collection_id) else {
161+
self.deleted_collections.insert(collection_id);
162+
continue;
163+
};
164+
let Some(last_compaction_time) = tenant_to_last_compaction_time
165+
.get(&sysdb_info.tenant)
166+
.cloned()
167+
else {
168+
tracing::error!("Unable to find last compaction info for tenant [{}]. Skipping compaction for collection [{collection_id}]", &sysdb_info.tenant);
169+
continue;
170+
};
171+
let next_log_to_compact = sysdb_info.log_position.saturating_add(1);
172+
if next_log_to_compact < log_info.first_log_offset {
173+
tracing::error!("Next log to compact for collection [{collection_id}] at [{next_log_to_compact}] is already purged in log service: the first available log is at [{}]", log_info.first_log_offset);
174+
continue;
185175
}
176+
collection_records.push(CollectionRecord {
177+
collection_id,
178+
tenant_id: sysdb_info.tenant.clone(),
179+
last_compaction_time,
180+
first_record_time: log_info.first_log_ts,
181+
offset: next_log_to_compact,
182+
collection_version: sysdb_info.version,
183+
collection_logical_size_bytes: sysdb_info.size_bytes_post_compaction,
184+
});
186185
}
186+
187187
collection_records
188188
}
189189

@@ -505,8 +505,7 @@ mod tests {
505505
}
506506

507507
#[tokio::test]
508-
#[should_panic(expected = "is less than offset")]
509-
async fn test_scheduler_panic() {
508+
async fn test_scheduler_invalid_offsets() {
510509
let mut log = Log::InMemory(InMemoryLog::new());
511510
let in_memory_log = match log {
512511
Log::InMemory(ref mut in_memory_log) => in_memory_log,
@@ -600,17 +599,13 @@ mod tests {
600599
},
601600
},
602601
);
603-
let _ = log
604-
.update_collection_log_offset(&tenant_1, collection_uuid_1, 2)
605-
.await;
606-
607602
let mut sysdb = SysDb::Test(TestSysDb::new());
608603

609604
match sysdb {
610605
SysDb::Test(ref mut sysdb) => {
611606
sysdb.add_collection(collection_1);
612607
let last_compaction_time_1 = 2;
613-
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);
608+
sysdb.add_tenant_last_compaction_time(tenant_1.clone(), last_compaction_time_1);
614609
}
615610
_ => panic!("Invalid sysdb type"),
616611
}
@@ -626,6 +621,27 @@ mod tests {
626621
let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::default());
627622
assignment_policy.set_members(vec![my_member.member_id.clone()]);
628623

624+
let mut scheduler = Scheduler::new(
625+
my_member.member_id.clone(),
626+
log.clone(),
627+
sysdb.clone(),
628+
scheduler_policy.clone(),
629+
max_concurrent_jobs,
630+
1,
631+
assignment_policy.clone(),
632+
HashSet::new(),
633+
);
634+
635+
scheduler.set_memberlist(vec![my_member.clone()]);
636+
scheduler.schedule().await;
637+
let jobs = scheduler.get_jobs();
638+
assert_eq!(jobs.count(), 1);
639+
640+
// Update compaction offset in log so that it's ahead of the value in sysdb
641+
let _ = log
642+
.update_collection_log_offset(&tenant_1, collection_uuid_1, 2)
643+
.await;
644+
629645
let mut scheduler = Scheduler::new(
630646
my_member.member_id.clone(),
631647
log,
@@ -639,5 +655,7 @@ mod tests {
639655

640656
scheduler.set_memberlist(vec![my_member.clone()]);
641657
scheduler.schedule().await;
658+
let jobs = scheduler.get_jobs();
659+
assert_eq!(jobs.count(), 0);
642660
}
643661
}

0 commit comments

Comments
 (0)