Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
534d766
adapter: Peek sequencing in Adapter Frontend
ggevay Aug 24, 2025
88c08cd
Tweak transient id generation and update tests
ggevay Sep 25, 2025
cc5a49f
More test tweaks
ggevay Sep 25, 2025
daa00a5
Address some comments
ggevay Sep 27, 2025
938c136
Change TODOs to TODO(peek-seq) and minor things
ggevay Oct 6, 2025
8680621
Move the call logic into instance::Client and delete instance::Client…
ggevay Oct 6, 2025
459ecfd
Address various minor comments
ggevay Oct 6, 2025
8a25df9
Always pass read holds in to Instance::peek
ggevay Oct 8, 2025
2670d21
Fix read holds for constant indexes
ggevay Oct 8, 2025
6647062
Temporarily opt out of the frontend peek sequencing in tests that rel…
ggevay Oct 9, 2025
84e1f13
Opt out of the frontend peek sequencing in test_expected_spans
ggevay Oct 9, 2025
e6ea059
Opt out of the frontend peek sequencing in test_peek_on_dropped_index…
ggevay Oct 10, 2025
965c140
Unsafe session invariant fix
ggevay Oct 13, 2025
0410171
Avoid unsafe by moving try_frontend_peek_inner from SessionClient to …
ggevay Oct 13, 2025
cc7e9d2
Avoid involving GuardedReceiver
ggevay Oct 13, 2025
bdc25a2
minor
ggevay Oct 13, 2025
ce9c8b0
Encapsulate catalog_snapshot
ggevay Oct 13, 2025
99761e4
cargo fmt
ggevay Oct 13, 2025
162a3a2
Make recursion_limit.slt easier
ggevay Oct 13, 2025
8439295
Simplify diff checking in implement_fast_path_peek_plan
ggevay Oct 13, 2025
67218ba
Refactor acquire_read_holds_and_collection_write_frontiers
ggevay Oct 13, 2025
a4848d6
minor
ggevay Oct 13, 2025
1a63da0
Move enable_frontend_peek_sequencing from PeekClient to SessionClient
ggevay Oct 13, 2025
6e53e90
Remove the SessionReinserter stuff
ggevay Oct 13, 2025
5ea723d
Add support for PeekResponse::Stashed
ggevay Oct 14, 2025
a5bd4c0
Add support for FastPathPlan::PeekPersist
ggevay Oct 14, 2025
7e0c829
Reduce duplication in implement_fast_path_peek_plan
ggevay Oct 14, 2025
4c53c7d
Opt out of the frontend peek sequencing in test_github_25388
ggevay Oct 15, 2025
d81d4c2
Address some minor comments
ggevay Oct 16, 2025
dbe764c
Handle Plan::SideEffectingFunc gracefeully in the new peek sequencing
ggevay Oct 17, 2025
38d9a21
Error handling 1
ggevay Oct 17, 2025
187dbaa
Error handling 2
ggevay Oct 18, 2025
136ce84
Error handling 3
ggevay Oct 18, 2025
0f60b1e
Error handling 4: Move CollectionMissing from the compute controller …
ggevay Oct 18, 2025
20f9dd1
Error handling 5: Delete ReadHoldError
ggevay Oct 18, 2025
7158325
Error handling 6: Make StorageCollections::collections_frontiers retu…
ggevay Oct 18, 2025
ef1e4fc
Error handling 7: Make StorageCollections::collection_metadata also r…
ggevay Oct 18, 2025
69a8298
Error handling 8
ggevay Oct 18, 2025
cabe87d
Error handling 9: Make conversions to ConcurrentDependencyDrop explicit
ggevay Oct 18, 2025
a6dff21
cargo fmt and clippy
ggevay Oct 18, 2025
ffb308a
minor
ggevay Oct 18, 2025
c872748
Ignore AdapterError::ConcurrentDependencyDrop in Parallel Workload DDL
ggevay Oct 18, 2025
8daaf4f
Version guards for enable_frontend_peek_sequencing alters
ggevay Oct 18, 2025
38f9287
Disable frontend peek sequencing in balancerd tests because of mz_ses…
ggevay Oct 18, 2025
a3270a2
enable_frontend_peek_sequencing adjustments
ggevay Oct 19, 2025
e414085
Tweak error msg
ggevay Oct 22, 2025
5a605fb
Lighten some catalog references
ggevay Oct 25, 2025
ebfe43b
Move catalog_snapshot outside try_frontend_peek_inner
ggevay Oct 25, 2025
d3de565
Cache catalog snapshots in SessionClient
ggevay Oct 25, 2025
6699042
Plan caching (without any invalidation)
ggevay Oct 25, 2025
5492a2a
Omit Command::Commit for single-peek implicit transactions
ggevay Oct 26, 2025
592c2be
Omit PeekNotification for frontend peeks
ggevay Oct 26, 2025
044d037
Comment out irrelevant benchmarks
ggevay Oct 26, 2025
e0d9fb3
Uncomment SelectLimitWorkload
ggevay Oct 26, 2025
f8cc3fb
Turn on the flag
ggevay Oct 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ class StatementLogging(Check):
def initialize(self) -> Testdrive:
return Testdrive(
dedent(
# TODO(peek-seq): enable_frontend_peek_sequencing when it supports statement logging.
"""
$ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET statement_logging_max_sample_rate TO 1.0

$[version>=16200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_frontend_peek_sequencing = false;
"""
)
)
Expand Down
10 changes: 9 additions & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ def get_minimal_system_parameters(
version: MzVersion,
) -> dict[str, str]:
"""Settings we need in order to have tests run at all, but otherwise stay
with the defaults: not changing performance or increasing coverage."""
with the defaults: not changing performance or increasing coverage.
Note: This is not used unless we explicitly select "System Parameters: Minimal" in trigger-ci.
"""

return {
# -----
Expand Down Expand Up @@ -130,6 +132,7 @@ class VariableSystemParameter:


# TODO: The linter should check this too
# Note: Only the default is tested unless we explicitly select "System Parameters: Random" in trigger-ci.
def get_variable_system_parameters(
version: MzVersion,
force_source_table_syntax: bool,
Expand Down Expand Up @@ -186,6 +189,11 @@ def get_variable_system_parameters(
"true",
["true", "false"],
),
VariableSystemParameter(
"enable_frontend_peek_sequencing",
"true",
["true", "false"],
),
VariableSystemParameter(
"kafka_default_metadata_fetch_interval",
"1s",
Expand Down
5 changes: 5 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def errors_to_ignore(self, exe: Executor) -> list[str]:
"real-time source dropped before ingesting the upstream system's visible frontier", # Expected, see https://buildkite.com/materialize/nightly/builds/9399#0191be17-1f4c-4321-9b51-edc4b08b71c5
"object state changed while transaction was in progress", # Old error msg, can remove this ignore later
"another session modified the catalog while this DDL transaction was open",
"was dropped while executing a statement",
]
)
if exe.db.scenario == Scenario.Cancel:
Expand Down Expand Up @@ -1322,6 +1323,10 @@ def __init__(
"314572800", # 300 MiB, the production value
]
self.flags_with_values["cluster"] = ["quickstart", "dont_exist"]
self.flags_with_values["enable_frontend_peek_sequencing"] = [
"true",
"false",
]

# If you are adding a new config flag in Materialize, consider using it
# here instead of just marking it as uninteresting to silence the
Expand Down
6 changes: 3 additions & 3 deletions misc/python/materialize/scalability/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def init_sqls(self) -> list[str]:
[
f"CREATE TABLE t{t} (f1 INTEGER DEFAULT 1);",
f"INSERT INTO t{t} DEFAULT VALUES;",
f"CREATE OR REPLACE MATERIALIZED VIEW mv{t} AS SELECT count(*) AS count FROM t{t};",
# f"CREATE OR REPLACE MATERIALIZED VIEW mv{t} AS SELECT count(*) AS count FROM t{t};",
]
)

Expand All @@ -62,9 +62,9 @@ def init_sqls(self) -> list[str]:
# index to be ready.
if self.create_index:
init_sqls.append(f"CREATE INDEX i{t} ON t{t} (f1);")
init_sqls.append(f"CREATE INDEX mv_i{t} ON mv{t} (count);")
# init_sqls.append(f"CREATE INDEX mv_i{t} ON mv{t} (count);")
init_sqls.append(f"SELECT f1 from t{t};")
init_sqls.append(f"SELECT count from mv{t};")
# init_sqls.append(f"SELECT count from mv{t};")

return init_sqls

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,27 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from materialize.scalability.operation.operations.operations import (
InsertDefaultValues,
SelectCount,
SelectCountInMv,
#InsertDefaultValues,
#SelectCount,
#SelectCountInMv,
SelectLimit,
SelectOne,
#SelectOne,
SelectStar,
SelectUnionAll,
Update,
#SelectUnionAll,
#Update,
)
from materialize.scalability.operation.scalability_operation import Operation
from materialize.scalability.workload.workload_markers import DmlDqlWorkload


class InsertWorkload(DmlDqlWorkload):
def operations(self) -> list["Operation"]:
return [InsertDefaultValues()]
# class InsertWorkload(DmlDqlWorkload):
# def operations(self) -> list["Operation"]:
# return [InsertDefaultValues()]


class SelectOneWorkload(DmlDqlWorkload):
def operations(self) -> list["Operation"]:
return [SelectOne()]
# class SelectOneWorkload(DmlDqlWorkload):
# def operations(self) -> list["Operation"]:
# return [SelectOne()]


class SelectStarWorkload(DmlDqlWorkload):
Expand All @@ -40,26 +40,26 @@ def operations(self) -> list["Operation"]:
return [SelectLimit()]


class SelectCountWorkload(DmlDqlWorkload):
def operations(self) -> list["Operation"]:
return [SelectCount()]
# class SelectCountWorkload(DmlDqlWorkload):
# def operations(self) -> list["Operation"]:
# return [SelectCount()]


class SelectUnionAllWorkload(DmlDqlWorkload):
def operations(self) -> list["Operation"]:
return [SelectUnionAll()]
# class SelectUnionAllWorkload(DmlDqlWorkload):
# def operations(self) -> list["Operation"]:
# return [SelectUnionAll()]


class InsertAndSelectCountInMvWorkload(DmlDqlWorkload):
def operations(self) -> list["Operation"]:
return [InsertDefaultValues(), SelectCountInMv()]
# class InsertAndSelectCountInMvWorkload(DmlDqlWorkload):
# def operations(self) -> list["Operation"]:
# return [InsertDefaultValues(), SelectCountInMv()]


class InsertAndSelectLimitWorkload(DmlDqlWorkload):
def operations(self) -> list["Operation"]:
return [InsertDefaultValues(), SelectLimit()]
# class InsertAndSelectLimitWorkload(DmlDqlWorkload):
# def operations(self) -> list["Operation"]:
# return [InsertDefaultValues(), SelectLimit()]


class UpdateWorkload(DmlDqlWorkload):
def operations(self) -> list["Operation"]:
return [Update()]
# class UpdateWorkload(DmlDqlWorkload):
# def operations(self) -> list["Operation"]:
# return [Update()]
8 changes: 7 additions & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::convert;
use std::sync::Arc;

use std::sync::atomic::{AtomicU64, Ordering};
use futures::future::BoxFuture;
use futures::{Future, FutureExt};
use itertools::Itertools;
Expand Down Expand Up @@ -140,6 +140,7 @@ pub struct Catalog {
expr_cache_handle: Option<ExpressionCacheHandle>,
storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
transient_revision: u64,
latest_transient_revision: Arc<AtomicU64>,
}

// Implement our own Clone because derive can't unless S is Clone, which it's
Expand All @@ -152,6 +153,7 @@ impl Clone for Catalog {
expr_cache_handle: self.expr_cache_handle.clone(),
storage: Arc::clone(&self.storage),
transient_revision: self.transient_revision,
latest_transient_revision: Arc::clone(&self.latest_transient_revision),
}
}
}
Expand Down Expand Up @@ -490,6 +492,10 @@ impl Catalog {
self.transient_revision
}

pub fn latest_transient_revision(&self) -> u64 {
self.latest_transient_revision.load(Ordering::SeqCst) ////////// Is the ordering ok?
}

/// Creates a debug catalog from the current
/// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
/// like in tests.
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod builtin_item_migration;

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::{Duration, Instant};

use futures::future::{BoxFuture, FutureExt};
Expand Down Expand Up @@ -551,6 +552,7 @@ impl Catalog {
plans: CatalogPlans::default(),
expr_cache_handle,
transient_revision: 1,
latest_transient_revision: Arc::new(AtomicU64::new(1)),
storage: Arc::new(tokio::sync::Mutex::new(storage)),
};

Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/catalog/transact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;

use itertools::Itertools;
Expand Down Expand Up @@ -454,6 +455,7 @@ impl Catalog {
drop(storage);
if let Some(new_state) = new_state {
self.transient_revision += 1;
self.latest_transient_revision.fetch_add(1, Ordering::SeqCst); ////////// todo: Is the ordering ok?
self.state = new_state;
}

Expand Down
Loading