@@ -255,14 +255,71 @@ fn try_merge_collector_schemas(
255255 schema1 : & CollectorSchema ,
256256 schema2 : & CollectorSchema ,
257257) -> Result < CollectorSchema > {
258- let fields = try_merge_fields_schemas ( & schema1. fields , & schema2. fields ) ?;
258+ // Union all fields from both schemas
259+ let mut field_map: HashMap < FieldName , EnrichedValueType > = HashMap :: new ( ) ;
260+
261+ // Add fields from schema1
262+ for field in & schema1. fields {
263+ field_map. insert ( field. name . clone ( ) , field. value_type . clone ( ) ) ;
264+ }
265+
266+ // Merge fields from schema2
267+ for field in & schema2. fields {
268+ if let Some ( existing_type) = field_map. get ( & field. name ) {
269+ // Try to merge types if they differ
270+ let merged_type = try_make_common_value_type ( existing_type, & field. value_type ) ?;
271+ field_map. insert ( field. name . clone ( ) , merged_type) ;
272+ } else {
273+ field_map. insert ( field. name . clone ( ) , field. value_type . clone ( ) ) ;
274+ }
275+ }
276+
277+ // Sort fields by name for consistent ordering, but prioritize UUID fields
278+ let mut fields: Vec < FieldSchema > = field_map
279+ . into_iter ( )
280+ . map ( |( name, value_type) | FieldSchema {
281+ name,
282+ value_type,
283+ description : None ,
284+ } )
285+ . collect ( ) ;
286+
287+ // Prioritize UUID fields by placing them at the beginning for efficiency
288+ fields. sort_by ( |a, b| {
289+ let a_is_uuid = matches ! ( a. value_type. typ, ValueType :: Basic ( BasicValueType :: Uuid ) ) ;
290+ let b_is_uuid = matches ! ( b. value_type. typ, ValueType :: Basic ( BasicValueType :: Uuid ) ) ;
291+
292+ match ( a_is_uuid, b_is_uuid) {
293+ ( true , false ) => std:: cmp:: Ordering :: Less , // UUID fields first
294+ ( false , true ) => std:: cmp:: Ordering :: Greater , // UUID fields first
295+ _ => a. name . cmp ( & b. name ) , // Then alphabetical
296+ }
297+ } ) ;
298+
299+ // Handle auto_uuid_field_idx (UUID fields are now at position 0 for efficiency)
300+ let auto_uuid_field_idx = match ( schema1. auto_uuid_field_idx , schema2. auto_uuid_field_idx ) {
301+ ( Some ( idx1) , Some ( idx2) ) => {
302+ let name1 = & schema1. fields [ idx1] . name ;
303+ let name2 = & schema2. fields [ idx2] . name ;
304+ if name1 == name2 {
305+ // UUID fields are prioritized to position 0, so check if first field is UUID
306+ if fields. first ( ) . map_or ( false , |f| {
307+ matches ! ( f. value_type. typ, ValueType :: Basic ( BasicValueType :: Uuid ) )
308+ } ) {
309+ Some ( 0 )
310+ } else {
311+ fields. iter ( ) . position ( |f| & f. name == name1)
312+ }
313+ } else {
314+ None // Different auto_uuid fields, disable
315+ }
316+ }
317+ _ => None , // If either doesn't have it, or both don't, disable
318+ } ;
319+
259320 Ok ( CollectorSchema {
260321 fields,
261- auto_uuid_field_idx : if schema1. auto_uuid_field_idx == schema2. auto_uuid_field_idx {
262- schema1. auto_uuid_field_idx
263- } else {
264- None
265- } ,
322+ auto_uuid_field_idx,
266323 } )
267324}
268325
@@ -803,16 +860,42 @@ impl AnalyzerContext {
803860 let ( struct_mapping, fields_schema) = analyze_struct_mapping ( & op. input , op_scope) ?;
804861 let has_auto_uuid_field = op. auto_uuid_field . is_some ( ) ;
805862 let fingerprinter = Fingerprinter :: default ( ) . with ( & fields_schema) ?;
863+ let input_field_names: Vec < FieldName > =
864+ fields_schema. iter ( ) . map ( |f| f. name . clone ( ) ) . collect ( ) ;
865+ let collector_ref = add_collector (
866+ & op. scope_name ,
867+ op. collector_name . clone ( ) ,
868+ CollectorSchema :: from_fields ( fields_schema, op. auto_uuid_field . clone ( ) ) ,
869+ op_scope,
870+ ) ?;
871+ // Get the merged collector schema after adding
872+ let collector_schema: Arc < CollectorSchema > = {
873+ let scope = find_scope ( & op. scope_name , op_scope) ?. 1 ;
874+ let states = scope. states . lock ( ) . unwrap ( ) ;
875+ let collector = states. collectors . get ( & op. collector_name ) . unwrap ( ) ;
876+ collector. schema . clone ( )
877+ } ;
878+
879+ // Pre-compute field index mappings for efficient evaluation
880+ let field_index_mapping: Vec < usize > = input_field_names
881+ . iter ( )
882+ . map ( |field_name| {
883+ collector_schema
884+ . fields
885+ . iter ( )
886+ . position ( |f| & f. name == field_name)
887+ . unwrap_or ( usize:: MAX )
888+ } )
889+ . collect ( ) ;
890+
806891 let collect_op = AnalyzedReactiveOp :: Collect ( AnalyzedCollectOp {
807892 name : reactive_op. name . clone ( ) ,
808893 has_auto_uuid_field,
809894 input : struct_mapping,
810- collector_ref : add_collector (
811- & op. scope_name ,
812- op. collector_name . clone ( ) ,
813- CollectorSchema :: from_fields ( fields_schema, op. auto_uuid_field . clone ( ) ) ,
814- op_scope,
815- ) ?,
895+ input_field_names,
896+ collector_schema,
897+ collector_ref,
898+ field_index_mapping,
816899 fingerprinter,
817900 } ) ;
818901 async move { Ok ( collect_op) } . boxed ( )
0 commit comments