Skip to content

Commit 59d9502

Browse files
committed
feat: collector automatically merges and aligns multiple collect() calls with different schemas
1 parent 7de2f5c commit 59d9502

File tree

3 files changed

+160
-13
lines changed

3 files changed

+160
-13
lines changed

src/builder/analyzer.rs

Lines changed: 117 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,87 @@ fn try_merge_collector_schemas(
255255
schema1: &CollectorSchema,
256256
schema2: &CollectorSchema,
257257
) -> Result<CollectorSchema> {
258-
let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
258+
let schema1_fields = &schema1.fields;
259+
let schema2_fields = &schema2.fields;
260+
261+
// Create a map from field name to index in schema1
262+
let field_map: HashMap<FieldName, usize> = schema1_fields
263+
.iter()
264+
.enumerate()
265+
.map(|(i, f)| (f.name.clone(), i))
266+
.collect();
267+
268+
let mut output_fields = Vec::new();
269+
let mut next_field_id_1 = 0;
270+
let mut next_field_id_2 = 0;
271+
272+
for (idx, field) in schema2_fields.iter().enumerate() {
273+
if let Some(&idx1) = field_map.get(&field.name) {
274+
if idx1 < next_field_id_1 {
275+
api_bail!(
276+
"Order mismatch for field '{}': appears at position {} in first schema but expected after '{}'",
277+
field.name,
278+
idx1,
279+
schema1_fields[next_field_id_1 - 1].name
280+
);
281+
}
282+
// Add intervening fields from schema1
283+
for i in next_field_id_1..idx1 {
284+
output_fields.push(schema1_fields[i].clone());
285+
}
286+
// Add intervening fields from schema2
287+
for i in next_field_id_2..idx {
288+
output_fields.push(schema2_fields[i].clone());
289+
}
290+
// Merge the field
291+
let merged_type =
292+
try_make_common_value_type(&schema1_fields[idx1].value_type, &field.value_type)?;
293+
output_fields.push(FieldSchema {
294+
name: field.name.clone(),
295+
value_type: merged_type,
296+
description: None,
297+
});
298+
next_field_id_1 = idx1 + 1;
299+
next_field_id_2 = idx + 1;
300+
// Fields that exist only in schema2 are skipped here; they are emitted before the next shared field or by the trailing loop below
301+
}
302+
}
303+
304+
// Add remaining fields from schema1
305+
for i in next_field_id_1..schema1_fields.len() {
306+
output_fields.push(schema1_fields[i].clone());
307+
}
308+
309+
// Add remaining fields from schema2
310+
for i in next_field_id_2..schema2_fields.len() {
311+
output_fields.push(schema2_fields[i].clone());
312+
}
313+
314+
// Handle auto_uuid_field_idx
315+
let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) {
316+
(Some(idx1), Some(idx2)) => {
317+
let name1 = &schema1_fields[idx1].name;
318+
let name2 = &schema2_fields[idx2].name;
319+
if name1 == name2 {
320+
// Find the position of the auto_uuid field in the merged output
321+
output_fields.iter().position(|f| &f.name == name1)
322+
} else {
323+
return api_bail!(
324+
"Different auto_uuid fields are not allowed: '{}' vs '{}'",
325+
name1,
326+
name2
327+
);
328+
}
329+
}
330+
(Some(_), None) | (None, Some(_)) => {
331+
return api_bail!("Inconsistent auto_uuid field presence is not allowed");
332+
}
333+
(None, None) => None,
334+
};
335+
259336
Ok(CollectorSchema {
260-
fields,
261-
auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx {
262-
schema1.auto_uuid_field_idx
263-
} else {
264-
None
265-
},
337+
fields: output_fields,
338+
auto_uuid_field_idx,
266339
})
267340
}
268341

@@ -803,16 +876,47 @@ impl AnalyzerContext {
803876
let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, op_scope)?;
804877
let has_auto_uuid_field = op.auto_uuid_field.is_some();
805878
let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
879+
let input_field_names: Vec<FieldName> =
880+
fields_schema.iter().map(|f| f.name.clone()).collect();
881+
let collector_ref = add_collector(
882+
&op.scope_name,
883+
op.collector_name.clone(),
884+
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
885+
op_scope,
886+
)?;
887+
// Get the merged collector schema after adding
888+
let collector_schema: Arc<CollectorSchema> = {
889+
let scope = find_scope(&op.scope_name, op_scope)?.1;
890+
let states = scope.states.lock().unwrap();
891+
let collector = states.collectors.get(&op.collector_name).unwrap();
892+
collector.schema.clone()
893+
};
894+
895+
// Pre-compute field index mappings for efficient evaluation
896+
let field_name_to_index: HashMap<&FieldName, usize> = collector_schema
897+
.fields
898+
.iter()
899+
.enumerate()
900+
.map(|(i, f)| (&f.name, i))
901+
.collect();
902+
let field_index_mapping: Vec<usize> = input_field_names
903+
.iter()
904+
.map(|field_name| {
905+
field_name_to_index
906+
.get(field_name)
907+
.copied()
908+
.unwrap_or(usize::MAX)
909+
})
910+
.collect();
911+
806912
let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
807913
name: reactive_op.name.clone(),
808914
has_auto_uuid_field,
809915
input: struct_mapping,
810-
collector_ref: add_collector(
811-
&op.scope_name,
812-
op.collector_name.clone(),
813-
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
814-
op_scope,
815-
)?,
916+
input_field_names,
917+
collector_schema,
918+
collector_ref,
919+
field_index_mapping,
816920
fingerprinter,
817921
});
818922
async move { Ok(collect_op) }.boxed()

src/builder/plan.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::base::schema::FieldSchema;
2+
use crate::base::spec::FieldName;
23
use crate::prelude::*;
34

45
use crate::ops::interface::*;
@@ -90,7 +91,12 @@ pub struct AnalyzedCollectOp {
9091
pub name: String,
9192
pub has_auto_uuid_field: bool,
9293
pub input: AnalyzedStructMapping,
94+
pub input_field_names: Vec<FieldName>,
95+
pub collector_schema: Arc<schema::CollectorSchema>,
9396
pub collector_ref: AnalyzedCollectorReference,
97+
/// Pre-computed mapping from input field index to collector field index.
98+
/// For missing fields, the value is usize::MAX.
99+
pub field_index_mapping: Vec<usize>,
94100
/// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs.
95101
pub fingerprinter: Fingerprinter,
96102
}

src/execution/evaluator.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,43 @@ async fn evaluate_op_scope(
515515
let collector_entry = scoped_entries
516516
.headn(op.collector_ref.scope_up_level as usize)
517517
.ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?;
518+
519+
// Assemble input values
520+
let input_values: Vec<value::Value> =
521+
assemble_input_values(&op.input.fields, scoped_entries)
522+
.collect::<Result<Vec<_>>>()?;
523+
524+
// Create field_values vector for all fields in the merged schema
525+
let mut field_values: Vec<value::Value> =
526+
vec![value::Value::Null; op.collector_schema.fields.len()];
527+
528+
// Use pre-computed field index mappings for O(1) field placement
529+
for (i, &collector_field_idx) in op.field_index_mapping.iter().enumerate() {
530+
if collector_field_idx != usize::MAX {
531+
field_values[collector_field_idx] = input_values[i].clone();
532+
}
533+
}
534+
535+
// Handle auto_uuid_field; its slot is taken from the merged schema's auto_uuid_field_idx rather than assumed to be position 0
536+
if op.has_auto_uuid_field {
537+
if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx {
538+
let uuid = memory.next_uuid(
539+
op.fingerprinter
540+
.clone()
541+
.with(
542+
&field_values
543+
.iter()
544+
.enumerate()
545+
.filter(|(i, _)| *i != uuid_idx)
546+
.map(|(_, v)| v)
547+
.collect::<Vec<_>>(),
548+
)?
549+
.into_fingerprint(),
550+
)?;
551+
field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
552+
}
553+
}
554+
518555
{
519556
let mut collected_records = collector_entry.collected_values
520557
[op.collector_ref.local.collector_idx as usize]

0 commit comments

Comments
 (0)