Skip to content

Commit d150498

Browse files
committed
feat: collector automatically merge and align multiple collect() called with different schema
1 parent cd96429 commit d150498

File tree

3 files changed

+150
-13
lines changed

3 files changed

+150
-13
lines changed

src/builder/analyzer.rs

Lines changed: 108 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,83 @@ fn try_merge_collector_schemas(
255255
schema1: &CollectorSchema,
256256
schema2: &CollectorSchema,
257257
) -> Result<CollectorSchema> {
258-
let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
258+
let schema1_fields = &schema1.fields;
259+
let schema2_fields = &schema2.fields;
260+
261+
// Create a map from field name to index in schema1
262+
let field_map: HashMap<FieldName, usize> = schema1_fields
263+
.iter()
264+
.enumerate()
265+
.map(|(i, f)| (f.name.clone(), i))
266+
.collect();
267+
268+
let mut output_fields = Vec::new();
269+
let mut next_field_id_1 = 0;
270+
let mut next_field_id_2 = 0;
271+
272+
for (idx, field) in schema2_fields.iter().enumerate() {
273+
if let Some(&idx1) = field_map.get(&field.name) {
274+
if idx1 < next_field_id_1 {
275+
api_bail!(
276+
"Order mismatch for field '{}': appears at position {} in first schema but expected after {}",
277+
field.name,
278+
idx1,
279+
next_field_id_1
280+
);
281+
}
282+
// Add intervening fields from schema1
283+
for i in next_field_id_1..idx1 {
284+
output_fields.push(schema1_fields[i].clone());
285+
}
286+
// Add intervening fields from schema2
287+
for i in next_field_id_2..idx {
288+
output_fields.push(schema2_fields[i].clone());
289+
}
290+
// Merge the field
291+
let merged_type = try_make_common_value_type(&schema1_fields[idx1].value_type, &field.value_type)?;
292+
output_fields.push(FieldSchema {
293+
name: field.name.clone(),
294+
value_type: merged_type,
295+
description: None,
296+
});
297+
next_field_id_1 = idx1 + 1;
298+
next_field_id_2 = idx + 1;
299+
} else if matches!(field.value_type.typ, ValueType::Basic(BasicValueType::Uuid)) {
300+
// For UUID, emit it immediately to ensure it appears first
301+
output_fields.push(field.clone());
302+
next_field_id_2 = idx + 1;
303+
}
304+
// Fields not in schema1 and not UUID are added at the end
305+
}
306+
307+
// Add remaining fields from schema1
308+
for i in next_field_id_1..schema1_fields.len() {
309+
output_fields.push(schema1_fields[i].clone());
310+
}
311+
312+
// Add remaining fields from schema2
313+
for i in next_field_id_2..schema2_fields.len() {
314+
output_fields.push(schema2_fields[i].clone());
315+
}
316+
317+
// Handle auto_uuid_field_idx
318+
let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) {
319+
(Some(idx1), Some(idx2)) => {
320+
let name1 = &schema1_fields[idx1].name;
321+
let name2 = &schema2_fields[idx2].name;
322+
if name1 == name2 {
323+
// Find the position of the auto_uuid field in the merged output
324+
output_fields.iter().position(|f| &f.name == name1)
325+
} else {
326+
None // Different auto_uuid fields, disable
327+
}
328+
}
329+
_ => None, // If either doesn't have it, or both don't, disable
330+
};
331+
259332
Ok(CollectorSchema {
260-
fields,
261-
auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx {
262-
schema1.auto_uuid_field_idx
263-
} else {
264-
None
265-
},
333+
fields: output_fields,
334+
auto_uuid_field_idx,
266335
})
267336
}
268337

@@ -803,16 +872,42 @@ impl AnalyzerContext {
803872
let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, op_scope)?;
804873
let has_auto_uuid_field = op.auto_uuid_field.is_some();
805874
let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
875+
let input_field_names: Vec<FieldName> =
876+
fields_schema.iter().map(|f| f.name.clone()).collect();
877+
let collector_ref = add_collector(
878+
&op.scope_name,
879+
op.collector_name.clone(),
880+
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
881+
op_scope,
882+
)?;
883+
// Get the merged collector schema after adding
884+
let collector_schema: Arc<CollectorSchema> = {
885+
let scope = find_scope(&op.scope_name, op_scope)?.1;
886+
let states = scope.states.lock().unwrap();
887+
let collector = states.collectors.get(&op.collector_name).unwrap();
888+
collector.schema.clone()
889+
};
890+
891+
// Pre-compute field index mappings for efficient evaluation
892+
let field_index_mapping: Vec<usize> = input_field_names
893+
.iter()
894+
.map(|field_name| {
895+
collector_schema
896+
.fields
897+
.iter()
898+
.position(|f| &f.name == field_name)
899+
.unwrap_or(usize::MAX)
900+
})
901+
.collect();
902+
806903
let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
807904
name: reactive_op.name.clone(),
808905
has_auto_uuid_field,
809906
input: struct_mapping,
810-
collector_ref: add_collector(
811-
&op.scope_name,
812-
op.collector_name.clone(),
813-
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
814-
op_scope,
815-
)?,
907+
input_field_names,
908+
collector_schema,
909+
collector_ref,
910+
field_index_mapping,
816911
fingerprinter,
817912
});
818913
async move { Ok(collect_op) }.boxed()

src/builder/plan.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::base::schema::FieldSchema;
2+
use crate::base::spec::FieldName;
23
use crate::prelude::*;
34

45
use crate::ops::interface::*;
@@ -90,7 +91,12 @@ pub struct AnalyzedCollectOp {
9091
pub name: String,
9192
pub has_auto_uuid_field: bool,
9293
pub input: AnalyzedStructMapping,
94+
pub input_field_names: Vec<FieldName>,
95+
pub collector_schema: Arc<schema::CollectorSchema>,
9396
pub collector_ref: AnalyzedCollectorReference,
97+
/// Pre-computed mapping from input field index to collector field index.
98+
/// For missing fields, the value is usize::MAX.
99+
pub field_index_mapping: Vec<usize>,
94100
/// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs.
95101
pub fingerprinter: Fingerprinter,
96102
}

src/execution/evaluator.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,42 @@ async fn evaluate_op_scope(
515515
let collector_entry = scoped_entries
516516
.headn(op.collector_ref.scope_up_level as usize)
517517
.ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?;
518+
519+
// Assemble input values
520+
let input_values: Vec<value::Value> =
521+
assemble_input_values(&op.input.fields, scoped_entries).collect::<Result<Vec<_>>>()?;
522+
523+
// Create field_values vector for all fields in the merged schema
524+
let mut field_values: Vec<value::Value> =
525+
vec![value::Value::Null; op.collector_schema.fields.len()];
526+
527+
// Use pre-computed field index mappings for O(1) field placement
528+
for (i, &collector_field_idx) in op.field_index_mapping.iter().enumerate() {
529+
if collector_field_idx != usize::MAX {
530+
field_values[collector_field_idx] = input_values[i].clone();
531+
}
532+
}
533+
534+
// Handle auto_uuid_field (assumed to be at position 0 for efficiency)
535+
if op.has_auto_uuid_field {
536+
if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx {
537+
let uuid = memory.next_uuid(
538+
op.fingerprinter
539+
.clone()
540+
.with(
541+
&field_values
542+
.iter()
543+
.enumerate()
544+
.filter(|(i, _)| *i != uuid_idx)
545+
.map(|(_, v)| v)
546+
.collect::<Vec<_>>(),
547+
)?
548+
.into_fingerprint(),
549+
)?;
550+
field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
551+
}
552+
}
553+
518554
{
519555
let mut collected_records = collector_entry.collected_values
520556
[op.collector_ref.local.collector_idx as usize]

0 commit comments

Comments
 (0)