Skip to content

Commit 7c682b1

Browse files
committed
[BUG] schema: build default with config ef & knn_index, remove #document population in defaults
1 parent 833a982 commit 7c682b1

File tree

11 files changed

+899
-257
lines changed

11 files changed

+899
-257
lines changed

go/pkg/sysdb/coordinator/model/collection_configuration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ type ValueTypes struct {
249249
SparseVector *SparseVectorValueType `json:"sparse_vector,omitempty"`
250250
Int *IntValueType `json:"int,omitempty"`
251251
Float *FloatValueType `json:"float,omitempty"`
252-
Boolean *BoolValueType `json:"boolean,omitempty"`
252+
Boolean *BoolValueType `json:"bool,omitempty"`
253253
}
254254

255255
type Schema struct {

go/pkg/sysdb/coordinator/model/collection_configuration_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func TestUpdateSchemaFromConfig_HnswSuccess(t *testing.T) {
147147
"config": {}
148148
}
149149
},
150-
"boolean": {
150+
"bool": {
151151
"bool_inverted_index": {
152152
"enabled": true,
153153
"config": {}
@@ -312,7 +312,7 @@ func TestUpdateSchemaFromConfig_SpannSuccess(t *testing.T) {
312312
"config": {}
313313
}
314314
},
315-
"boolean": {
315+
"bool": {
316316
"bool_inverted_index": {
317317
"enabled": true,
318318
"config": {}
@@ -481,7 +481,7 @@ func TestUpdateSchemaFromConfig_EmbeddingFunction(t *testing.T) {
481481
"config": {}
482482
}
483483
},
484-
"boolean": {
484+
"bool": {
485485
"bool_inverted_index": {
486486
"enabled": true,
487487
"config": {}

go/pkg/sysdb/coordinator/table_catalog_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1555,7 +1555,7 @@ func TestUpdateCollection_WithSchema(t *testing.T) {
15551555
"config": {}
15561556
}
15571557
},
1558-
"boolean": {
1558+
"bool": {
15591559
"bool_inverted_index": {
15601560
"enabled": true,
15611561
"config": {}

rust/cli/src/commands/vacuum.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use chroma_segment::local_segment_manager::LocalSegmentManager;
1111
use chroma_sqlite::db::SqliteDb;
1212
use chroma_sysdb::SysDb;
1313
use chroma_system::System;
14-
use chroma_types::{CollectionUuid, KnnIndex, ListCollectionsRequest};
14+
use chroma_types::{CollectionUuid, ListCollectionsRequest, Schema};
1515
use clap::Parser;
1616
use colored::Colorize;
1717
use dialoguer::Confirm;
@@ -101,17 +101,18 @@ async fn trigger_vector_segments_max_seq_id_migration(
101101
sqlite: &SqliteDb,
102102
sysdb: &mut SysDb,
103103
segment_manager: &LocalSegmentManager,
104-
default_knn_index: KnnIndex,
105104
) -> Result<(), Box<dyn Error>> {
106105
let collection_ids = get_collection_ids_to_migrate(sqlite).await?;
107106

108107
for collection_id in collection_ids {
109108
let mut collection = sysdb.get_collection_with_segments(collection_id).await?;
110109

111-
collection
112-
.collection
113-
.reconcile_schema_with_config(default_knn_index)
114-
.map_err(|e| Box::new(e) as Box<dyn Error>)?;
110+
if collection.collection.schema.is_none() {
111+
collection.collection.schema = Some(
112+
Schema::try_from(&collection.collection.config)
113+
.map_err(|e| Box::new(e) as Box<dyn Error>)?,
114+
);
115+
}
115116

116117
// If collection is uninitialized, that means nothing has been written yet.
117118
let dim = match collection.collection.dimension {
@@ -153,13 +154,7 @@ pub async fn vacuum_chroma(config: FrontendConfig) -> Result<(), Box<dyn Error>>
153154

154155
println!("Purging the log...\n");
155156

156-
trigger_vector_segments_max_seq_id_migration(
157-
&sqlite,
158-
&mut sysdb,
159-
&segment_manager,
160-
config.default_knn_index,
161-
)
162-
.await?;
157+
trigger_vector_segments_max_seq_id_migration(&sqlite, &mut sysdb, &segment_manager).await?;
163158

164159
let tenant = String::from("default_tenant");
165160
let database = String::from("default_database");

rust/frontend/src/get_collection_with_segments_provider.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ use chroma_config::Configurable;
44
use chroma_error::{ChromaError, ErrorCodes};
55
use chroma_sysdb::SysDb;
66
use chroma_types::{
7-
CollectionAndSegments, CollectionUuid, GetCollectionWithSegmentsError, KnnIndex, Schema,
8-
SchemaError,
7+
CollectionAndSegments, CollectionUuid, GetCollectionWithSegmentsError, Schema, SchemaError,
98
};
109
use serde::{Deserialize, Serialize};
1110
use std::{
@@ -143,7 +142,6 @@ impl CollectionsWithSegmentsProvider {
143142
pub(crate) async fn get_collection_with_segments(
144143
&mut self,
145144
collection_id: CollectionUuid,
146-
knn_index: KnnIndex,
147145
) -> Result<CollectionAndSegments, CollectionsWithSegmentsProviderError> {
148146
if let Some(collection_and_segments_with_ttl) = self
149147
.collections_with_segments_cache
@@ -185,14 +183,13 @@ impl CollectionsWithSegmentsProvider {
185183
.await?
186184
};
187185

188-
// reconcile schema and config
189-
let reconciled_schema = Schema::reconcile_schema_and_config(
190-
collection_and_segments_sysdb.collection.schema.as_ref(),
191-
Some(&collection_and_segments_sysdb.collection.config),
192-
knn_index,
193-
)
194-
.map_err(CollectionsWithSegmentsProviderError::InvalidSchema)?;
195-
collection_and_segments_sysdb.collection.schema = Some(reconciled_schema);
186+
if collection_and_segments_sysdb.collection.schema.is_none() {
187+
collection_and_segments_sysdb.collection.schema = Some(
188+
Schema::try_from(&collection_and_segments_sysdb.collection.config)
189+
.map_err(CollectionsWithSegmentsProviderError::InvalidSchema)?,
190+
);
191+
}
192+
196193
self.set_collection_with_segments(collection_and_segments_sysdb.clone())
197194
.await;
198195
Ok(collection_and_segments_sysdb)

rust/frontend/src/impls/service_based_frontend.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ impl ServiceBasedFrontend {
176176
) -> Result<Collection, GetCollectionError> {
177177
Ok(self
178178
.collections_with_segments_provider
179-
.get_collection_with_segments(collection_id, self.default_knn_index)
179+
.get_collection_with_segments(collection_id)
180180
.await
181181
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?
182182
.collection)
@@ -188,7 +188,7 @@ impl ServiceBasedFrontend {
188188
) -> Result<Option<u32>, GetCollectionError> {
189189
Ok(self
190190
.collections_with_segments_provider
191-
.get_collection_with_segments(collection_id, self.default_knn_index)
191+
.get_collection_with_segments(collection_id)
192192
.await
193193
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?
194194
.collection
@@ -381,7 +381,7 @@ impl ServiceBasedFrontend {
381381
if self.enable_schema {
382382
for collection in collections.iter_mut() {
383383
collection
384-
.reconcile_schema_with_config(self.default_knn_index)
384+
.reconcile_schema_for_read()
385385
.map_err(GetCollectionsError::InvalidSchema)?;
386386
}
387387
}
@@ -425,7 +425,7 @@ impl ServiceBasedFrontend {
425425
if self.enable_schema {
426426
for collection in &mut collections {
427427
collection
428-
.reconcile_schema_with_config(self.default_knn_index)
428+
.reconcile_schema_for_read()
429429
.map_err(GetCollectionError::InvalidSchema)?;
430430
}
431431
}
@@ -450,7 +450,7 @@ impl ServiceBasedFrontend {
450450

451451
if self.enable_schema {
452452
collection
453-
.reconcile_schema_with_config(self.default_knn_index)
453+
.reconcile_schema_for_read()
454454
.map_err(GetCollectionByCrnError::InvalidSchema)?;
455455
}
456456
Ok(collection)
@@ -630,9 +630,10 @@ impl ServiceBasedFrontend {
630630
// that was retrieved from sysdb, rather than the one that was passed in
631631
if self.enable_schema {
632632
collection
633-
.reconcile_schema_with_config(self.default_knn_index)
633+
.reconcile_schema_for_read()
634634
.map_err(CreateCollectionError::InvalidSchema)?;
635635
}
636+
636637
Ok(collection)
637638
}
638639

@@ -735,7 +736,7 @@ impl ServiceBasedFrontend {
735736
.await?;
736737
collection_and_segments
737738
.collection
738-
.reconcile_schema_with_config(self.default_knn_index)
739+
.reconcile_schema_for_read()
739740
.map_err(ForkCollectionError::InvalidSchema)?;
740741
let collection = collection_and_segments.collection.clone();
741742
let latest_collection_logical_size_bytes = collection_and_segments
@@ -1099,7 +1100,7 @@ impl ServiceBasedFrontend {
10991100
let read_event = if let Some(where_clause) = r#where {
11001101
let collection_and_segments = self
11011102
.collections_with_segments_provider
1102-
.get_collection_with_segments(collection_id, self.default_knn_index)
1103+
.get_collection_with_segments(collection_id)
11031104
.await
11041105
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?;
11051106
if self.enable_schema {
@@ -1309,7 +1310,7 @@ impl ServiceBasedFrontend {
13091310
) -> Result<CountResponse, QueryError> {
13101311
let collection_and_segments = self
13111312
.collections_with_segments_provider
1312-
.get_collection_with_segments(collection_id, self.default_knn_index)
1313+
.get_collection_with_segments(collection_id)
13131314
.await
13141315
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?;
13151316
let latest_collection_logical_size_bytes = collection_and_segments
@@ -1424,7 +1425,7 @@ impl ServiceBasedFrontend {
14241425
) -> Result<GetResponse, QueryError> {
14251426
let collection_and_segments = self
14261427
.collections_with_segments_provider
1427-
.get_collection_with_segments(collection_id, self.default_knn_index)
1428+
.get_collection_with_segments(collection_id)
14281429
.await
14291430
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?;
14301431
if self.enable_schema {
@@ -1569,7 +1570,7 @@ impl ServiceBasedFrontend {
15691570
) -> Result<QueryResponse, QueryError> {
15701571
let collection_and_segments = self
15711572
.collections_with_segments_provider
1572-
.get_collection_with_segments(collection_id, self.default_knn_index)
1573+
.get_collection_with_segments(collection_id)
15731574
.await
15741575
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?;
15751576
if self.enable_schema {
@@ -1726,7 +1727,7 @@ impl ServiceBasedFrontend {
17261727
// Get collection and segments once for all queries
17271728
let collection_and_segments = self
17281729
.collections_with_segments_provider
1729-
.get_collection_with_segments(request.collection_id, self.default_knn_index)
1730+
.get_collection_with_segments(request.collection_id)
17301731
.await
17311732
.map_err(|err| QueryError::Other(Box::new(err) as Box<dyn ChromaError>))?;
17321733
if self.enable_schema {

rust/log/src/local_compaction_manager.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use chroma_sysdb::SysDb;
1515
use chroma_system::Handler;
1616
use chroma_system::{Component, ComponentContext};
1717
use chroma_types::{
18-
Chunk, CollectionUuid, GetCollectionWithSegmentsError, KnnIndex, LogRecord, SchemaError,
18+
Chunk, CollectionUuid, GetCollectionWithSegmentsError, LogRecord, Schema, SchemaError,
1919
};
2020
use serde::{Deserialize, Serialize};
2121
use thiserror::Error;
@@ -141,9 +141,12 @@ impl Handler<BackfillMessage> for LocalCompactionManager {
141141
.get_collection_with_segments(message.collection_id)
142142
.await?;
143143
let schema_previously_persisted = collection_and_segments.collection.schema.is_some();
144-
collection_and_segments
145-
.collection
146-
.reconcile_schema_with_config(KnnIndex::Hnsw)?;
144+
if !schema_previously_persisted {
145+
collection_and_segments.collection.schema = Some(
146+
Schema::try_from(&collection_and_segments.collection.config)
147+
.map_err(CompactionManagerError::SchemaReconcileError)?,
148+
);
149+
}
147150
// If collection is uninitialized, that means nothing has been written yet.
148151
let dim = match collection_and_segments.collection.dimension {
149152
Some(dim) => dim,
@@ -267,7 +270,12 @@ impl Handler<PurgeLogsMessage> for LocalCompactionManager {
267270
.get_collection_with_segments(message.collection_id)
268271
.await?;
269272
let mut collection = collection_segments.collection.clone();
270-
collection.reconcile_schema_with_config(KnnIndex::Hnsw)?;
273+
if collection.schema.is_none() {
274+
collection.schema = Some(
275+
Schema::try_from(&collection.config)
276+
.map_err(CompactionManagerError::SchemaReconcileError)?,
277+
);
278+
}
271279
// If dimension is None, that means nothing has been written yet.
272280
let dim = match collection.dimension {
273281
Some(dim) => dim,

rust/segment/src/distributed_hnsw.rs

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use chroma_index::hnsw_provider::{
88
HnswIndexProviderOpenError, HnswIndexRef,
99
};
1010
use chroma_index::{Index, IndexUuid};
11-
use chroma_types::{Collection, HnswParametersFromSegmentError, SegmentUuid};
11+
use chroma_types::{Collection, HnswParametersFromSegmentError, Schema, SchemaError, SegmentUuid};
1212
use chroma_types::{MaterializedLogOperation, Segment};
1313
use std::collections::HashMap;
1414
use std::fmt::Debug;
@@ -55,6 +55,8 @@ pub enum DistributedHNSWSegmentFromSegmentError {
5555
MissingHnswConfiguration,
5656
#[error("Could not parse HNSW configuration: {0}")]
5757
InvalidHnswConfiguration(#[from] HnswParametersFromSegmentError),
58+
#[error("Invalid schema: {0}")]
59+
InvalidSchema(#[source] SchemaError),
5860
}
5961

6062
impl ChromaError for DistributedHNSWSegmentFromSegmentError {
@@ -72,6 +74,7 @@ impl ChromaError for DistributedHNSWSegmentFromSegmentError {
7274
DistributedHNSWSegmentFromSegmentError::InvalidHnswConfiguration(_) => {
7375
ErrorCodes::Internal
7476
}
77+
DistributedHNSWSegmentFromSegmentError::InvalidSchema(e) => e.code(),
7578
}
7679
}
7780
}
@@ -96,9 +99,14 @@ impl DistributedHNSWSegmentWriter {
9699
hnsw_index_provider: HnswIndexProvider,
97100
) -> Result<Box<DistributedHNSWSegmentWriter>, Box<DistributedHNSWSegmentFromSegmentError>>
98101
{
99-
let hnsw_configuration = collection
100-
.config
101-
.get_hnsw_config_with_legacy_fallback(segment)
102+
let schema = if let Some(schema) = &collection.schema {
103+
schema
104+
} else {
105+
&Schema::try_from(&collection.config)
106+
.map_err(DistributedHNSWSegmentFromSegmentError::InvalidSchema)?
107+
};
108+
let hnsw_configuration = schema
109+
.get_internal_hnsw_config_with_legacy_fallback(segment)
102110
.map_err(DistributedHNSWSegmentFromSegmentError::InvalidHnswConfiguration)?
103111
.ok_or(DistributedHNSWSegmentFromSegmentError::MissingHnswConfiguration)?;
104112

@@ -313,9 +321,13 @@ impl DistributedHNSWSegmentReader {
313321
hnsw_index_provider: HnswIndexProvider,
314322
) -> Result<Box<DistributedHNSWSegmentReader>, Box<DistributedHNSWSegmentFromSegmentError>>
315323
{
316-
let hnsw_configuration = collection
317-
.config
318-
.get_hnsw_config_with_legacy_fallback(segment)
324+
let schema = collection.schema.as_ref().ok_or_else(|| {
325+
DistributedHNSWSegmentFromSegmentError::InvalidSchema(SchemaError::InvalidSchema {
326+
reason: "Schema is None".to_string(),
327+
})
328+
})?;
329+
let hnsw_configuration = schema
330+
.get_internal_hnsw_config_with_legacy_fallback(segment)
319331
.map_err(DistributedHNSWSegmentFromSegmentError::InvalidHnswConfiguration)?
320332
.ok_or(DistributedHNSWSegmentFromSegmentError::MissingHnswConfiguration)?;
321333

@@ -394,7 +406,7 @@ pub mod test {
394406
use chroma_index::{HnswIndexConfig, DEFAULT_MAX_ELEMENTS};
395407
use chroma_types::{
396408
Collection, CollectionUuid, InternalCollectionConfiguration, InternalHnswConfiguration,
397-
Segment, SegmentUuid,
409+
Schema, Segment, SegmentUuid,
398410
};
399411
use tempfile::tempdir;
400412
use uuid::Uuid;
@@ -423,18 +435,18 @@ pub mod test {
423435
config.persist_path,
424436
Some(persist_path.to_str().unwrap().to_string())
425437
);
438+
let config = InternalCollectionConfiguration {
439+
vector_index: chroma_types::VectorIndexConfiguration::Hnsw(InternalHnswConfiguration {
440+
max_neighbors: 10,
441+
..Default::default()
442+
}),
443+
embedding_function: None,
444+
};
426445

427446
// Try partial override
428447
let collection = Collection {
429-
config: InternalCollectionConfiguration {
430-
vector_index: chroma_types::VectorIndexConfiguration::Hnsw(
431-
InternalHnswConfiguration {
432-
max_neighbors: 10,
433-
..Default::default()
434-
},
435-
),
436-
embedding_function: None,
437-
},
448+
config: config.clone(),
449+
schema: Some(Schema::try_from(&config).unwrap()),
438450
..Default::default()
439451
};
440452

@@ -448,9 +460,12 @@ pub mod test {
448460
};
449461

450462
let hnsw_params = collection
451-
.config
452-
.get_hnsw_config_with_legacy_fallback(&segment)
463+
.schema
464+
.as_ref()
465+
.map(|schema| schema.get_internal_hnsw_config_with_legacy_fallback(&segment))
466+
.transpose()
453467
.unwrap()
468+
.flatten()
454469
.unwrap();
455470
let config = HnswIndexConfig::new_persistent(
456471
hnsw_params.max_neighbors,

0 commit comments

Comments
 (0)