Skip to content

Commit 744217d

Browse files
[dataflow] LIR mfp planning (#7093)
* allow join planning to determine their mfp use * rework planning to make mfps non-optional
1 parent f1f0d8e commit 744217d

File tree

4 files changed

+74
-80
lines changed

4 files changed

+74
-80
lines changed

src/dataflow/src/render/join/delta_join.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl DeltaJoinPlan {
8888
equivalences: &[Vec<MirScalarExpr>],
8989
join_orders: &[Vec<(usize, Vec<MirScalarExpr>)>],
9090
input_mapper: JoinInputMapper,
91-
map_filter_project: MapFilterProject,
91+
map_filter_project: &mut MapFilterProject,
9292
) -> Self {
9393
let number_of_inputs = join_orders.len();
9494

@@ -97,6 +97,8 @@ impl DeltaJoinPlan {
9797
path_plans: Vec::with_capacity(number_of_inputs),
9898
};
9999

100+
let temporal_mfp = map_filter_project.extract_temporal();
101+
100102
// Each source relation will contribute a path to the join plan.
101103
for source_relation in 0..number_of_inputs {
102104
// Construct initial join build state.
@@ -180,6 +182,10 @@ impl DeltaJoinPlan {
180182
});
181183
}
182184

185+
// Now that `map_filter_project` has been capture in the state builder,
186+
// assign the remaining temporal predicates to it, for the caller's use.
187+
*map_filter_project = temporal_mfp;
188+
183189
join_plan
184190
}
185191
}

src/dataflow/src/render/join/linear_join.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,10 @@ impl LinearJoinPlan {
7171
equivalences: &[Vec<MirScalarExpr>],
7272
join_order: &[(usize, Vec<MirScalarExpr>)],
7373
input_mapper: expr::JoinInputMapper,
74-
map_filter_project: MapFilterProject,
74+
map_filter_project: &mut MapFilterProject,
7575
) -> Self {
76+
let temporal_mfp = map_filter_project.extract_temporal();
77+
7678
// Construct initial join build state.
7779
// This state will evolves as we build the join dataflow.
7880
let mut join_build_state = JoinBuildState::new(
@@ -142,6 +144,10 @@ impl LinearJoinPlan {
142144
Some(final_closure)
143145
};
144146

147+
// Now that `map_filter_project` has been capture in the state builder,
148+
// assign the remaining temporal predicates to it, for the caller's use.
149+
*map_filter_project = temporal_mfp;
150+
145151
// Form and return the complete join plan.
146152
LinearJoinPlan {
147153
source_relation,

src/dataflow/src/render/mod.rs

Lines changed: 53 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,7 @@ pub mod plan {
777777
let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
778778
// We attempt to plan what we have remaining, in the context of `mfp`.
779779
// We may not be able to do this, and must wrap some operators with a `Mfp` stage.
780-
let (mfp, plan) = match expr {
780+
let plan = match expr {
781781
// These operators should have been extracted from the expression.
782782
MirRelationExpr::Map { .. } => {
783783
panic!("This operator should have been extracted");
@@ -798,30 +798,27 @@ pub mod plan {
798798
.collect()
799799
}),
800800
};
801-
(Some(mfp), plan)
801+
plan
802802
}
803-
MirRelationExpr::Get { id, typ: _ } => (
803+
MirRelationExpr::Get { id, typ: _ } => {
804804
// This stage can absorb arbitrary MFP operators.
805-
None,
805+
let mfp = mfp.take();
806806
Plan::Get {
807807
id: id.clone(),
808808
mfp,
809-
},
810-
),
809+
}
810+
}
811811
MirRelationExpr::Let { id, value, body } => {
812812
// It would be unfortunate to have a non-trivial `mfp` here, as we hope
813813
// that they would be pushed down. I am not sure if we should take the
814814
// initiative to push down the `mfp` ourselves.
815815
let value = Box::new(Plan::from_mir(value)?);
816816
let body = Box::new(Plan::from_mir(body)?);
817-
(
818-
Some(mfp),
819-
Plan::Let {
820-
id: id.clone(),
821-
value,
822-
body,
823-
},
824-
)
817+
Plan::Let {
818+
id: id.clone(),
819+
value,
820+
body,
821+
}
825822
}
826823
MirRelationExpr::FlatMap {
827824
input,
@@ -835,16 +832,14 @@ pub mod plan {
835832
prepend_mfp_demand(&mut mfp, expr, demand);
836833
}
837834
let input = Box::new(Plan::from_mir(input)?);
838-
(
839-
// This stage can absorb arbitrary MFP instances.
840-
None,
841-
Plan::FlatMap {
842-
input,
843-
func: func.clone(),
844-
exprs: exprs.clone(),
845-
mfp,
846-
},
847-
)
835+
// This stage can absorb arbitrary MFP instances.
836+
let mfp = mfp.take();
837+
Plan::FlatMap {
838+
input,
839+
func: func.clone(),
840+
exprs: exprs.clone(),
841+
mfp,
842+
}
848843
}
849844
MirRelationExpr::Join {
850845
inputs,
@@ -866,37 +861,31 @@ pub mod plan {
866861
plans.push(Plan::from_mir(input)?);
867862
}
868863
// Extract temporal predicates as joins cannot currently absorb them.
869-
let temporal_mfp = mfp.extract_temporal();
870864
let plan = match implementation {
871865
expr::JoinImplementation::Differential((start, _start_arr), order) => {
872866
JoinPlan::Linear(LinearJoinPlan::create_from(
873867
*start,
874868
equivalences,
875869
order,
876870
input_mapper,
877-
mfp,
871+
&mut mfp,
878872
))
879873
}
880874
expr::JoinImplementation::DeltaQuery(orders) => {
881875
JoinPlan::Delta(DeltaJoinPlan::create_from(
882876
equivalences,
883877
&orders[..],
884878
input_mapper,
885-
mfp,
879+
&mut mfp,
886880
))
887881
}
888882
// Other plans are errors, and should be reported as such.
889883
_ => return Err(()),
890884
};
891-
892-
(
893-
// This stage can absorb only non-temporal MFP instances.
894-
Some(temporal_mfp),
895-
Plan::Join {
896-
inputs: plans,
897-
plan,
898-
},
899-
)
885+
Plan::Join {
886+
inputs: plans,
887+
plan,
888+
}
900889
}
901890
MirRelationExpr::Reduce {
902891
input,
@@ -913,14 +902,11 @@ pub mod plan {
913902
*monotonic,
914903
*expected_group_size,
915904
);
916-
(
917-
Some(mfp),
918-
Plan::Reduce {
919-
input,
920-
key_val_plan,
921-
plan: reduce_plan,
922-
},
923-
)
905+
Plan::Reduce {
906+
input,
907+
key_val_plan,
908+
plan: reduce_plan,
909+
}
924910
}
925911
MirRelationExpr::TopK {
926912
input,
@@ -932,60 +918,49 @@ pub mod plan {
932918
} => {
933919
let arity = input.arity();
934920
let input = Box::new(Self::from_mir(input)?);
935-
(
936-
Some(mfp),
937-
Plan::TopK {
938-
input,
939-
group_key: group_key.clone(),
940-
order_key: order_key.clone(),
941-
limit: *limit,
942-
offset: *offset,
943-
monotonic: *monotonic,
944-
arity,
945-
},
946-
)
921+
Plan::TopK {
922+
input,
923+
group_key: group_key.clone(),
924+
order_key: order_key.clone(),
925+
limit: *limit,
926+
offset: *offset,
927+
monotonic: *monotonic,
928+
arity,
929+
}
947930
}
948931
MirRelationExpr::Negate { input } => {
949932
let input = Box::new(Self::from_mir(input)?);
950-
(Some(mfp), Plan::Negate { input })
933+
Plan::Negate { input }
951934
}
952935
MirRelationExpr::Threshold { input } => {
953936
let arity = input.arity();
954937
let input = Box::new(Self::from_mir(input)?);
955-
(Some(mfp), Plan::Threshold { input, arity })
938+
Plan::Threshold { input, arity }
956939
}
957940
MirRelationExpr::Union { base, inputs } => {
958941
let mut plans = Vec::with_capacity(1 + inputs.len());
959942
plans.push(Self::from_mir(base)?);
960943
for input in inputs.iter() {
961944
plans.push(Self::from_mir(input)?)
962945
}
963-
(Some(mfp), Plan::Union { inputs: plans })
946+
Plan::Union { inputs: plans }
964947
}
965948
MirRelationExpr::ArrangeBy { input, keys } => {
966949
let input = Box::new(Self::from_mir(input)?);
967-
(
968-
Some(mfp),
969-
Plan::ArrangeBy {
970-
input,
971-
keys: keys.clone(),
972-
},
973-
)
974-
}
975-
MirRelationExpr::DeclareKeys { input, keys: _ } => {
976-
(Some(mfp), Self::from_mir(input)?)
950+
Plan::ArrangeBy {
951+
input,
952+
keys: keys.clone(),
953+
}
977954
}
955+
MirRelationExpr::DeclareKeys { input, keys: _ } => Self::from_mir(input)?,
978956
};
979957

980-
if let Some(mfp) = mfp {
981-
if !mfp.is_identity() {
982-
Ok(Plan::Mfp {
983-
input: Box::new(plan),
984-
mfp,
985-
})
986-
} else {
987-
Ok(plan)
988-
}
958+
// If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
959+
if !mfp.is_identity() {
960+
Ok(Plan::Mfp {
961+
input: Box::new(plan),
962+
mfp,
963+
})
989964
} else {
990965
Ok(plan)
991966
}

src/expr/src/linear.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,13 @@ impl MapFilterProject {
290290
.project(0..old_projection_len)
291291
}
292292

293+
/// Returns `self`, and leaves behind an identity operator that acts on its output.
294+
pub fn take(&mut self) -> Self {
295+
let mut identity = Self::new(self.projection.len());
296+
std::mem::swap(self, &mut identity);
297+
identity
298+
}
299+
293300
/// Convert the `MapFilterProject` into a staged evaluation plan.
294301
///
295302
/// The main behavior is extract temporal predicates, which cannot be evaluated

0 commit comments

Comments
 (0)