Skip to content

Commit fa29eac

Browse files
committed
feat: collector automatically merge and align multiple collect() called with different schema
1 parent 4577f0c commit fa29eac

File tree

3 files changed

+107
-29
lines changed

3 files changed

+107
-29
lines changed

src/builder/analyzer.rs

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,54 @@ fn try_merge_collector_schemas(
255255
schema1: &CollectorSchema,
256256
schema2: &CollectorSchema,
257257
) -> Result<CollectorSchema> {
258-
let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
258+
// Union all fields from both schemas
259+
let mut field_map: HashMap<FieldName, EnrichedValueType> = HashMap::new();
260+
261+
// Add fields from schema1
262+
for field in &schema1.fields {
263+
field_map.insert(field.name.clone(), field.value_type.clone());
264+
}
265+
266+
// Merge fields from schema2
267+
for field in &schema2.fields {
268+
if let Some(existing_type) = field_map.get(&field.name) {
269+
// Try to merge types if they differ
270+
let merged_type = try_make_common_value_type(existing_type, &field.value_type)?;
271+
field_map.insert(field.name.clone(), merged_type);
272+
} else {
273+
field_map.insert(field.name.clone(), field.value_type.clone());
274+
}
275+
}
276+
277+
// Sort fields by name for consistent ordering
278+
let mut fields: Vec<FieldSchema> = field_map
279+
.into_iter()
280+
.map(|(name, value_type)| FieldSchema {
281+
name,
282+
value_type,
283+
description: None,
284+
})
285+
.collect();
286+
fields.sort_by(|a, b| a.name.cmp(&b.name));
287+
288+
// Handle auto_uuid_field_idx
289+
let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) {
290+
(Some(idx1), Some(idx2)) => {
291+
let name1 = &schema1.fields[idx1].name;
292+
let name2 = &schema2.fields[idx2].name;
293+
if name1 == name2 {
294+
// Find the new index of the auto_uuid field
295+
fields.iter().position(|f| &f.name == name1)
296+
} else {
297+
None // Different auto_uuid fields, disable
298+
}
299+
}
300+
_ => None, // If either doesn't have it, or both don't, disable
301+
};
302+
259303
Ok(CollectorSchema {
260304
fields,
261-
auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx {
262-
schema1.auto_uuid_field_idx
263-
} else {
264-
None
265-
},
305+
auto_uuid_field_idx,
266306
})
267307
}
268308

@@ -803,16 +843,27 @@ impl AnalyzerContext {
803843
let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, op_scope)?;
804844
let has_auto_uuid_field = op.auto_uuid_field.is_some();
805845
let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
846+
let input_field_names = fields_schema.iter().map(|f| f.name.clone()).collect();
847+
let collector_ref = add_collector(
848+
&op.scope_name,
849+
op.collector_name.clone(),
850+
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
851+
op_scope,
852+
)?;
853+
// Get the merged collector schema after adding
854+
let collector_schema: Arc<CollectorSchema> = {
855+
let scope = find_scope(&op.scope_name, op_scope)?.1;
856+
let states = scope.states.lock().unwrap();
857+
let collector = states.collectors.get(&op.collector_name).unwrap();
858+
collector.schema.clone()
859+
};
806860
let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
807861
name: reactive_op.name.clone(),
808862
has_auto_uuid_field,
809863
input: struct_mapping,
810-
collector_ref: add_collector(
811-
&op.scope_name,
812-
op.collector_name.clone(),
813-
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
814-
op_scope,
815-
)?,
864+
input_field_names,
865+
collector_schema,
866+
collector_ref,
816867
fingerprinter,
817868
});
818869
async move { Ok(collect_op) }.boxed()

src/builder/plan.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::base::schema::FieldSchema;
2+
use crate::base::spec::FieldName;
23
use crate::prelude::*;
34

45
use crate::ops::interface::*;
@@ -90,6 +91,8 @@ pub struct AnalyzedCollectOp {
9091
pub name: String,
9192
pub has_auto_uuid_field: bool,
9293
pub input: AnalyzedStructMapping,
94+
pub input_field_names: Vec<FieldName>,
95+
pub collector_schema: Arc<schema::CollectorSchema>,
9396
pub collector_ref: AnalyzedCollectorReference,
9497
/// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs.
9598
pub fingerprinter: Fingerprinter,

src/execution/evaluator.rs

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -483,26 +483,50 @@ async fn evaluate_op_scope(
483483
}
484484

485485
AnalyzedReactiveOp::Collect(op) => {
486-
let mut field_values = Vec::with_capacity(
487-
op.input.fields.len() + if op.has_auto_uuid_field { 1 } else { 0 },
488-
);
489-
let field_values_iter = assemble_input_values(&op.input.fields, scoped_entries);
490-
if op.has_auto_uuid_field {
491-
field_values.push(value::Value::Null);
492-
field_values.extend(field_values_iter);
493-
let uuid = memory.next_uuid(
494-
op.fingerprinter
495-
.clone()
496-
.with(&field_values[1..])?
497-
.into_fingerprint(),
498-
)?;
499-
field_values[0] = value::Value::Basic(value::BasicValue::Uuid(uuid));
500-
} else {
501-
field_values.extend(field_values_iter);
502-
};
503486
let collector_entry = scoped_entries
504487
.headn(op.collector_ref.scope_up_level as usize)
505488
.ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?;
489+
490+
// Assemble input values
491+
let input_values: Vec<value::Value> =
492+
assemble_input_values(&op.input.fields, scoped_entries).collect();
493+
494+
// Create field_values vector for all fields in the merged schema
495+
let mut field_values: Vec<value::Value> =
496+
vec![value::Value::Null; op.collector_schema.fields.len()];
497+
498+
// Map input fields to their positions in the merged schema
499+
for (i, field_name) in op.input_field_names.iter().enumerate() {
500+
if let Some(pos) = op
501+
.collector_schema
502+
.fields
503+
.iter()
504+
.position(|f| &f.name == field_name)
505+
{
506+
field_values[pos] = input_values[i].clone();
507+
}
508+
}
509+
510+
// Handle auto_uuid_field
511+
if op.has_auto_uuid_field {
512+
if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx {
513+
let uuid = memory.next_uuid(
514+
op.fingerprinter
515+
.clone()
516+
.with(
517+
&field_values
518+
.iter()
519+
.enumerate()
520+
.filter(|(i, _)| *i != uuid_idx)
521+
.map(|(_, v)| v)
522+
.collect::<Vec<_>>(),
523+
)?
524+
.into_fingerprint(),
525+
)?;
526+
field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
527+
}
528+
}
529+
506530
{
507531
let mut collected_records = collector_entry.collected_values
508532
[op.collector_ref.local.collector_idx as usize]

0 commit comments

Comments
 (0)