Commit ab3ceea

committed
Support partition stitching in MaterializedViewRewrite
1 parent 6a353e0 commit ab3ceea

File tree

19 files changed

+7446
-297
lines changed

presto-docs/src/main/sphinx/admin/materialized-views.rst

Lines changed: 239 additions & 0 deletions
@@ -72,6 +72,245 @@ The following permissions are required for materialized view operations when
7272
* For DEFINER mode: User needs ``SELECT`` permission on the view itself
7373
* For INVOKER mode: User needs ``SELECT`` permission on all underlying base tables
7474

75+
Data Consistency Modes
76+
----------------------
77+
78+
Materialized views support three data consistency modes that control how queries are optimized
79+
when the view's data may be stale:
80+
81+
**USE_STITCHING** (default)
82+
Reads fresh partitions from storage, recomputes stale partitions from base tables,
83+
and combines results via UNION.
84+
85+
**USE_DATA_TABLE**
86+
Reads directly from storage, ignoring staleness. Fastest but may return stale data.
87+
88+
**USE_VIEW_QUERY**
89+
Executes the view query against base tables. Always fresh but highest cost.
90+
91+
Set via session property::
92+
93+
SET SESSION materialized_view_skip_storage = 'USE_STITCHING';
94+
95+
Partition Stitching (USE_STITCHING Mode)
96+
----------------------------------------
97+
98+
Overview
99+
^^^^^^^^
100+
101+
Partition stitching recomputes only stale partitions rather than the entire view. When base
102+
tables change, Presto identifies which partitions are affected and generates a UNION query
103+
that combines:
104+
105+
* **Storage scan**: Reads unchanged (fresh) partitions from the materialized view's storage
106+
* **Recompute branch**: Recomputes changed (stale) partitions from base tables using the view's
107+
defining query
108+
109+
This avoids full recomputation when only a subset of partitions is stale, though there is
110+
overhead from the UNION operation and partition-level filtering.
111+
112+
How It Works
113+
^^^^^^^^^^^^
114+
115+
**Staleness Detection**
116+
117+
For each base table referenced in the materialized view, a connector may track which partitions
118+
have changed since the last refresh and return predicates identifying the stale data. The
119+
specific mechanism depends on the connector:
120+
121+
1. At refresh time, partition-level metadata is recorded (implementation varies by connector)
122+
2. When the view is queried, the current partition state is compared with the recorded state
123+
3. Partition constraints are built that identify exactly which data is stale
124+
125+
See the connector-specific documentation for details on how staleness is tracked.
126+
For Iceberg tables, see :doc:`/connector/iceberg` (Materialized Views section).
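
The comparison in steps 2 and 3 can be sketched as follows. This is a minimal illustration, not the connector's implementation: the function name ``stale_partitions`` and the idea of an opaque per-partition version marker are assumptions made for the example, and a real connector may record different metadata.

```python
def stale_partitions(recorded, current):
    """Return partition values whose state differs from the state recorded at refresh.

    Both arguments map a partition value to an opaque version marker.
    The marker is hypothetical; connectors decide what they actually record.
    """
    stale = set()
    for partition, version in current.items():
        # A partition is stale if it is new or its marker changed since refresh.
        if recorded.get(partition) != version:
            stale.add(partition)
    return stale


# State captured at refresh time versus state seen when the view is queried.
recorded = {"2024-01-14": 1, "2024-01-15": 1, "2024-01-16": 1}
current = {"2024-01-14": 1, "2024-01-15": 2, "2024-01-16": 3}
print(sorted(stale_partitions(recorded, current)))  # ['2024-01-15', '2024-01-16']
```

The resulting set is what the partition constraints in step 3 would be built from.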
127+
128+
**Query Rewriting**
129+
130+
When a query uses a materialized view with stale partitions, the optimizer rewrites the query
131+
to use UNION::
132+
133+
-- Original query
134+
SELECT * FROM my_materialized_view WHERE order_date >= '2024-01-01'
135+
136+
-- Rewritten with partition stitching
137+
SELECT * FROM (
138+
-- Fresh partitions from storage
139+
SELECT * FROM my_materialized_view_storage
140+
WHERE order_date >= '2024-01-01'
141+
AND order_date NOT IN ('2024-01-15', '2024-01-16') -- Exclude stale
142+
UNION ALL
143+
-- Stale partitions recomputed
144+
SELECT o.order_id, c.customer_name, o.order_date
145+
FROM orders o
146+
JOIN customers c ON o.customer_id = c.customer_id
147+
WHERE o.order_date IN ('2024-01-15', '2024-01-16') -- Stale partition filter
148+
AND c.reg_date IN ('2024-01-15', '2024-01-16') -- Propagated via equivalence
149+
AND o.order_date >= '2024-01-01' -- Original filter preserved
150+
)
151+
152+
The partition predicate is propagated to equivalent columns in joined tables (in this case,
153+
``c.reg_date``), allowing partition pruning on the ``customers`` table as well.
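
The two branch filters in the rewritten query are complements of each other over the stale partition set, which is what keeps a row from being returned by both branches. A minimal sketch of how such predicates could be assembled, assuming a single string-typed partition column; ``branch_predicates`` and ``sql_string_list`` are hypothetical helpers and not part of Presto:

```python
def sql_string_list(values):
    """Render values as a comma-separated list of SQL string literals."""
    return ", ".join("'" + v.replace("'", "''") + "'" for v in values)


def branch_predicates(partition_column, stale_values, user_filter):
    """Return (storage_predicate, recompute_predicate) for the two UNION ALL branches."""
    if not stale_values:
        raise ValueError("stitching only applies when at least one partition is stale")
    stale_list = sql_string_list(sorted(stale_values))
    # Storage branch: keep the user's filter and exclude stale partitions.
    storage = f"{user_filter} AND {partition_column} NOT IN ({stale_list})"
    # Recompute branch: restrict to stale partitions and keep the user's filter.
    recompute = f"{partition_column} IN ({stale_list}) AND {user_filter}"
    return storage, recompute


storage, recompute = branch_predicates(
    "order_date", {"2024-01-15", "2024-01-16"}, "order_date >= '2024-01-01'"
)
print(storage)
print(recompute)
```

The sketch omits the propagation to ``c.reg_date`` and the table aliases shown in the rewritten query above.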
154+
155+
Requirements
156+
^^^^^^^^^^^^
157+
158+
For partition stitching to work effectively, the following requirements must be met:
159+
160+
**Partitioning Requirement**
161+
162+
Both base tables and the materialized view must be partitioned, and partition columns must be
163+
preserved through the view's query:
164+
165+
* Base table partition columns must appear in the SELECT list or be equivalent to columns that do
166+
* The materialized view should be partitioned on the same or equivalent columns
167+
* Partition columns must use compatible data types
168+
169+
**Unsupported Query Patterns**
170+
171+
Partition stitching does not work with:
172+
173+
* **Outer joins**: LEFT, RIGHT, and FULL OUTER joins
174+
* **Non-deterministic functions**: ``RANDOM()``, ``NOW()``, ``UUID()``, etc.
175+
176+
**Security Constraints**
177+
178+
For SECURITY INVOKER materialized views, partition stitching requires that:
179+
180+
* No column masks are defined on base tables (otherwise the view is treated as fully stale)
180+
* No row filters are defined on base tables (otherwise the view is treated as fully stale)
182+
183+
This is because column masks and row filters can vary by user, making it impossible to
184+
determine staleness in a user-independent way.
185+
186+
Column Equivalences and Passthrough Partitions
187+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
188+
189+
Partition stitching supports **passthrough partitions** through **column equivalences**,
190+
which allows tracking partition staleness even when partition columns from base tables
191+
are not directly in the materialized view's output.
192+
193+
**Column Equivalence**
194+
195+
When tables are joined with equality predicates on partition columns, those columns become
196+
equivalent for partition tracking purposes::
197+
198+
CREATE TABLE orders (order_id BIGINT, customer_id BIGINT, order_date VARCHAR)
199+
WITH (partitioning = ARRAY['order_date']);
200+
201+
CREATE TABLE customers (customer_id BIGINT, name VARCHAR, reg_date VARCHAR)
202+
WITH (partitioning = ARRAY['reg_date']);
203+
204+
-- MV with equivalence: order_date = reg_date
205+
CREATE MATERIALIZED VIEW order_summary
206+
WITH (partitioning = ARRAY['order_date'])
207+
AS
208+
SELECT o.order_id, c.name, o.order_date
209+
FROM orders o
210+
JOIN customers c ON o.customer_id = c.customer_id
211+
AND o.order_date = c.reg_date; -- Creates equivalence
212+
213+
In this example:
214+
215+
* ``orders.order_date`` and ``customers.reg_date`` are equivalent due to the equality join condition
216+
* Even though ``reg_date`` is not in the MV's SELECT list, staleness can be tracked through the equivalence to ``order_date``
217+
* When the ``customers`` table changes in partition ``reg_date='2024-01-15'``, this maps to ``order_date='2024-01-15'`` for recomputation
218+
219+
**How Passthrough Mapping Works**
220+
221+
1. **Equivalence Extraction**: During MV creation, Presto analyzes JOIN conditions to identify
222+
partition column equivalences
223+
224+
2. **Staleness Detection**: When a base table changes:
225+
226+
* Presto detects which partitions changed in the base table
227+
* For passthrough columns, constraints are mapped through equivalences
228+
* Example: ``customers.reg_date='2024-01-15'`` → ``orders.order_date='2024-01-15'``
229+
230+
3. **Constraint Application**: The mapped constraints are used in:
231+
232+
* Storage scan: Exclude partitions where equivalent columns match stale values
233+
* Recompute branch: Filter the stale table using its partition column
234+
* Joined tables: Propagate the partition predicate to equivalent columns in joined
235+
tables, enabling partition pruning on those tables as well
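
The mapping in step 2 can be pictured as a lookup within one equivalence class. The sketch below is illustrative only: ``map_to_view_column`` is a hypothetical name, and columns are represented as plain ``table.column`` strings for simplicity.

```python
def map_to_view_column(stale_column, equivalence_class, view_output_columns):
    """Pick the MV output column that stands in for a stale base-table partition column.

    Returns None when no member of the equivalence class appears in the
    view's output, in which case the constraint cannot be mapped.
    """
    if stale_column not in equivalence_class:
        return None
    # Sort so the choice is deterministic when several members qualify.
    for candidate in sorted(equivalence_class):
        if candidate in view_output_columns:
            return candidate
    return None


equivalence = {"orders.order_date", "customers.reg_date"}
outputs = {"orders.order_id", "customers.name", "orders.order_date"}
print(map_to_view_column("customers.reg_date", equivalence, outputs))  # orders.order_date
```

A ``None`` result corresponds to an equivalence class with no column in the MV's output.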
236+
237+
**Requirements for Passthrough Partitions**
238+
239+
* Join must be an INNER JOIN (not LEFT, RIGHT, or FULL OUTER)
240+
* Equality must be direct (``col1 = col2``), not through expressions like ``col1 = col2 + 1``
241+
* Both columns must be partition columns in their respective tables
242+
* At least one column in the equivalence class must be in the MV's output
243+
* Data types must be compatible
244+
245+
**Transitive Equivalences**
246+
247+
Multiple equivalences can be chained together. If ``A.x = B.y`` and ``B.y = C.z``, then
248+
``A.x``, ``B.y``, and ``C.z`` are all equivalent for partition tracking.
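
Chaining equalities into classes is a standard union-find problem. The following sketch shows one way to group columns from a list of join equalities; it is not taken from the Presto source, and ``equivalence_classes`` is a name chosen for this example.

```python
def equivalence_classes(equalities):
    """Group columns into equivalence classes from (left, right) equality pairs."""
    parent = {}

    def find(col):
        # Register unseen columns as their own root, then walk up to the root.
        parent.setdefault(col, col)
        while parent[col] != col:
            parent[col] = parent[parent[col]]  # path halving keeps chains short
            col = parent[col]
        return col

    for left, right in equalities:
        # Merge the two classes by pointing one root at the other.
        parent[find(left)] = find(right)

    classes = {}
    for col in list(parent):
        classes.setdefault(find(col), set()).add(col)
    return list(classes.values())


# A.x = B.y and B.y = C.z place all three columns in one class.
print(equivalence_classes([("A.x", "B.y"), ("B.y", "C.z")]))
```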
249+
250+
Unsupported Patterns
251+
^^^^^^^^^^^^^^^^^^^^
252+
253+
Partition stitching is **not** applied in the following cases:
254+
255+
* **Non-partitioned tables**: If base tables or the materialized view lack partitioning
256+
* **Partition columns not preserved**: If partition columns are transformed or not in the output
257+
* **Outer joins with passthrough**: LEFT, RIGHT, and FULL OUTER joins invalidate passthrough equivalences due to null handling
258+
* **Expression-based equivalences**: ``CAST(col1 AS DATE) = col2`` or ``col1 = col2 + 1``
259+
260+
When partition stitching cannot be applied, the query falls back as follows:
261+
262+
* If ``USE_STITCHING`` is set but stitching is not possible, the query falls back to full
263+
recompute (equivalent to ``USE_VIEW_QUERY``)
264+
* A warning may be logged indicating why stitching was not possible
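
Putting the three consistency modes and the fallback rule together, the plan choice might look like the sketch below. This is a simplified model under stated assumptions: ``choose_plan`` and the returned labels are hypothetical, and the sketch assumes that a view with no stale data is always read from storage, since the modes are described as applying when the view's data may be stale.

```python
def choose_plan(mode, is_stale, stitching_possible):
    """Return 'storage', 'stitched', or 'view_query' for a materialized view read."""
    if mode not in ("USE_STITCHING", "USE_DATA_TABLE", "USE_VIEW_QUERY"):
        raise ValueError(f"unknown consistency mode: {mode}")
    if not is_stale:
        return "storage"  # assumption: a fresh view is read from storage in every mode
    if mode == "USE_DATA_TABLE":
        return "storage"  # ignores staleness
    if mode == "USE_VIEW_QUERY":
        return "view_query"  # always recompute from base tables
    # USE_STITCHING: stitch when possible, otherwise fall back to full recompute.
    return "stitched" if stitching_possible else "view_query"


print(choose_plan("USE_STITCHING", is_stale=True, stitching_possible=False))  # view_query
```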
265+
266+
Performance Considerations
267+
^^^^^^^^^^^^^^^^^^^^^^^^^^^
268+
269+
**When Stitching is Most Effective**
270+
271+
* **Large materialized views**: More benefit from avoiding full recomputation
272+
* **Localized changes**: When only a small fraction of partitions are stale
273+
* **Frequently refreshed**: When most partitions remain fresh between queries
274+
* **Well-partitioned data**: When partition scheme aligns with data modification patterns
275+
276+
**Cost Trade-offs**
277+
278+
Partition stitching introduces a UNION operation, which has overhead:
279+
280+
* **Storage scan overhead**: Reading from storage + filtering fresh partitions
281+
* **Recompute overhead**: Querying base tables + filtering stale partitions
282+
* **Union overhead**: Combining results from both branches
283+
284+
However, this is typically preferable to the alternatives:
285+
286+
* **Full recompute**: Reading all base table data
287+
* **Stale data**: Returning incorrect results
288+
289+
**Optimization Tips**
290+
291+
1. **Partition granularity**: Choose partition columns that align with data modification patterns
292+
293+
* Too coarse (e.g., partitioning by year): Recomputes too much data
294+
* Too fine (e.g., partitioning by second): Too many partitions to manage
295+
296+
2. **Refresh frequency**: Balance freshness needs with refresh costs
297+
298+
* More frequent refreshes: Less recomputation per query, but higher refresh costs
299+
* Less frequent refreshes: More recomputation per query, but lower refresh costs
300+
301+
3. **Query filters**: Include partition column filters in queries when possible::
302+
303+
-- Good: Limits scan to relevant partitions
304+
SELECT * FROM mv WHERE order_date >= '2024-01-01'
305+
306+
-- Less optimal: Scans all partitions
307+
SELECT * FROM mv WHERE customer_id = 12345
308+
309+
4. **Monitor metrics**: Track the ratio of storage scan vs recompute:
310+
311+
* High recompute ratio: Consider more frequent refreshes or better partitioning
312+
* High storage scan ratio: Stitching is working efficiently
313+
75314
See Also
76315
--------
77316

presto-docs/src/main/sphinx/connector/iceberg.rst

Lines changed: 14 additions & 5 deletions
@@ -2306,18 +2306,27 @@ Property Name Description
23062306

23072307
The storage table inherits standard Iceberg table properties for partitioning, sorting, and file format.
23082308

2309+
Staleness Tracking
2310+
^^^^^^^^^^^^^^^^^^
2311+
2312+
The Iceberg connector tracks materialized view staleness at the partition level, enabling
2313+
partition stitching to recompute only affected partitions rather than the entire view.
2314+
2315+
.. note::
2316+
Partition-level staleness detection only works for append-only changes (INSERT).
2317+
DELETE or UPDATE operations on base tables cause the entire view to be treated
2318+
as stale, requiring full recomputation.
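
The rule in the note can be sketched as a classification over the base-table changes made since the last refresh. This is an illustration only: ``staleness_scope`` is a hypothetical function, and representing each change as an operation string such as ``"append"`` or ``"delete"`` is an assumption about how the changes are summarized, not a description of the connector's code.

```python
def staleness_scope(operations_since_refresh):
    """Classify staleness from the operations applied to a base table since refresh.

    Returns 'fresh' when nothing changed, 'partition' when only appends
    occurred, and 'full' when any other kind of change is present.
    """
    operations = list(operations_since_refresh)
    if not operations:
        return "fresh"
    if all(op == "append" for op in operations):
        return "partition"  # stale partitions can be identified individually
    return "full"  # DELETE or UPDATE style changes: treat the whole view as stale


print(staleness_scope(["append", "append"]))  # partition
print(staleness_scope(["append", "delete"]))  # full
```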
2319+
23092320
Freshness and Refresh
23102321
^^^^^^^^^^^^^^^^^^^^^^
23112322

2312-
Materialized views track the snapshot IDs of their base tables to determine staleness. When base tables are modified, the materialized view becomes stale and returns results by querying the base tables directly. After running ``REFRESH MATERIALIZED VIEW``, queries read from the pre-computed storage table.
2313-
2314-
The refresh operation uses a full refresh strategy, replacing all data in the storage table with the current query results.
2323+
After running ``REFRESH MATERIALIZED VIEW``, queries read from the pre-computed storage table. The refresh operation uses a full refresh strategy, replacing all data in the storage table with the current query results and recording the new snapshot IDs for all base tables.
23152324

23162325
Limitations
23172326
^^^^^^^^^^^
23182327

2319-
- All refreshes recompute the entire result set
2320-
- REFRESH does not provide snapshot isolation across multiple base tables
2328+
- All refreshes recompute the entire result set (incremental refresh not yet supported)
2329+
- REFRESH does not provide snapshot isolation across multiple base tables (each base table's current snapshot is used independently)
23212330
- Querying materialized views at specific snapshots or timestamps is not supported
23222331

23232332
Example
