Temporal bucket: pass through data close to frontier #33018

Draft
wants to merge 3 commits into
base: main
from

Conversation

antiguru
Member

Adds a threshold parameter that determines which data enters the bucket chain and which data is passed through verbatim. The threshold is a path summary that we apply to the input frontier; we retain only data whose timestamp is not less than or equal to the advanced frontier.
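
The split described above can be illustrated with a minimal sketch. It is not the PR's implementation: it assumes totally ordered `u64` millisecond timestamps, models the frontier as an `Option<u64>` (`None` standing in for an empty frontier) and the path summary as a plain addition, and `split_by_threshold` and `Update` are hypothetical names introduced only for this example.

```rust
/// One update: (data, timestamp in milliseconds, diff).
/// Hypothetical alias used only in this sketch.
type Update = (&'static str, u64, i64);

/// Splits `updates` into (passed through, retained for the bucket chain).
///
/// Assumption: the threshold acts like `frontier + threshold_ms`. If the
/// frontier is empty, or the addition overflows, no advanced frontier
/// element exists and everything passes through.
fn split_by_threshold(
    updates: Vec<Update>,
    frontier: Option<u64>,
    threshold_ms: u64,
) -> (Vec<Update>, Vec<Update>) {
    // Advance the input frontier by the threshold.
    let advanced = frontier.and_then(|f| f.checked_add(threshold_ms));
    updates.into_iter().partition(|(_, t, _)| match advanced {
        // Pass through data at or before the advanced frontier.
        Some(a) => *t <= a,
        // No advanced frontier element: nothing is retained.
        None => true,
    })
}
```

Under these assumptions, with the frontier at 1000 ms and a threshold of 2000 ms, updates at 1000 ms and 3000 ms pass through, while an update at 3001 ms is retained for the bucket chain.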

@antiguru antiguru requested a review from a team as a code owner July 14, 2025 08:55
Contributor

@petrosagg petrosagg left a comment

The regressions of this PR were quite big, some up to 76%. With this PR we will be passing through all data that is within 2 seconds of the frontier. The pressing question then is: how does MZ behave when there is data beyond 2 seconds of the frontier with and without temporal buckets?

My fear is that this PR will fix the current benchmarks just because we don't exercise this case. How do we know we don't have a general regression?

@@ -462,7 +462,7 @@ pub fn build_compute_dataflow<A: Allocate>(
     for (id, (oks, errs)) in imported_sources.into_iter() {
         let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
             oks.inner
-                .bucket::<CapacityContainerBuilder<_>>()
+                .bucket::<CapacityContainerBuilder<_>>(2000.into())
Contributor

Should this be a dyncfg?

    .flat_map(|f| threshold.results_in(f)).collect();
while let Some((time, data)) = input.next() {
    // Skip data that is about to be revealed.
    let extracted = data.extract_if(.., |(_, t, _)| !threshold.less_than(t));
Contributor

By using less_than we make it impossible to express the previous behavior. I think this should be less_equal, so that when supplying the boundary value T::Summary::default() we don't extract anything and send everything into the chain.
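
The difference between the two predicates can be sketched as follows. This is an illustration rather than the PR's code: it assumes totally ordered `u64` timestamps, models the summary as an addition with `0` standing in for `T::Summary::default()`, and assumes every input timestamp is at or beyond the input frontier. The function names are hypothetical.

```rust
/// Pass-through test mirroring `!threshold.less_than(t)` from the diff:
/// data passes when the advanced frontier is not strictly less than `t`.
fn passes_with_less_than(advanced: u64, t: u64) -> bool {
    !(advanced < t)
}

/// Pass-through test mirroring the suggested `!threshold.less_equal(t)`:
/// data passes only when `t` is strictly before the advanced frontier.
fn passes_with_less_equal(advanced: u64, t: u64) -> bool {
    !(advanced <= t)
}

/// Counts how many of `times` would bypass the bucket chain.
/// Assumption: the summary advances the frontier by plain addition.
fn count_passed(
    frontier: u64,
    summary: u64,
    times: &[u64],
    passes: fn(u64, u64) -> bool,
) -> usize {
    let advanced = frontier.saturating_add(summary);
    times.iter().filter(|&&t| passes(advanced, t)).count()
}
```

In this model, with the frontier at 1000 and a summary of 0, `less_than` still passes through an update at exactly 1000, whereas `less_equal` passes through nothing, so all data enters the chain as it did before the threshold was introduced.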

antiguru added 3 commits July 15, 2025 17:09
Adds a threshold parameter that determines which data enters the bucket
chain and which data is passed through verbatim. The threshold is a path
summary that we apply to the input frontier; we retain only data whose
timestamp is not less than or equal to the advanced frontier.

Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru antiguru force-pushed the temporal_bucket_threshold branch from e23926f to f841020 Compare July 15, 2025 15:09
@antiguru antiguru marked this pull request as draft July 15, 2025 15:17
2 participants