Commit 1c5ce7c

feat: Automatically check cache-validity of DataFrames tasks marked as lazy (#272)
Co-authored-by: windiana42 <[email protected]>
1 parent 2ed05e3 commit 1c5ce7c

File tree: 20 files changed (+309, −244 lines)

docs/source/changelog.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.12.6 (2025-XX-XX)
+- Feat: Automatically check cache-validity of polars and pandas DataFrame tasks marked as lazy
+
 ## 0.12.5 (2025-11-26)
 - Workaround snowflake sqlalchemy dialect to enable ExternalTableReference to other database
 - Flag in CreateTable and DropTable DDL statements allows not quoting schema (needed for multi-part schema)
```

docs/source/examples/realistic_pipeline.md

Lines changed: 11 additions & 6 deletions
```diff
@@ -323,14 +323,19 @@ are `sqlalchemy.Table`, `pandas.DataFrame`, `polars.DataFrame`, or `polars.LazyF
 
 ### Controlling automatic cache invalidation
 
-For input_type `sa.Table`, and `pdt.SqlAlchemy`, in general, it is best to set lazy=True. This means the task is always
+For input_type `sa.Table`, and `pdt.SqlAlchemy`, in general, it is best to set `lazy=True`. This means the task is always
 executed because producing a query is fast, but the query is only executed when it is actually needed. For
 `pl.LazyFrame`, `version=AUTO_VERSION` is a good choice, because then the task is executed once with empty input
-dataframes and only if resulting LazyFrame expressions change, the task is executed again with full input data. For
-`pd.DataFrame` and `pl.DataFrame`, we don't try to guess which changes of the code are actually meaningful. Thus the
-user needs to help manually bumpig a version number like `version="1.0.0"`. For development, `version=None` simply
-deactivates caching until the code is more stable. It is recommended to always develop with small pipeline instances
-anyways to achieve high iteration speed (see [multi_instance_pipeline.md](multi_instance_pipeline.md)).
+dataframes and only if resulting LazyFrame expressions change, the task is executed again with full input data.
+
+For `pd.DataFrame` and `pl.DataFrame`, we don't try to guess which changes of the code are actually meaningful. Thus,
+to avoid re-running the task, the user needs to help by manually bumping a version number like `version="1.0.0"`.
+For development, `version=None` simply deactivates caching until the code is more stable. It is recommended to always
+develop with small pipeline instances anyway to achieve high iteration speed (see [multi_instance_pipeline.md](multi_instance_pipeline.md)).
+Setting `lazy=True` for tasks returning `pd.DataFrame` or `pl.DataFrame` objects always executes the task, but hashes the result to
+determine the cache-validity of the task output and hence the cache invalidation of downstream tasks.
+This is a good choice for tasks returning small dataframes which are quick to compute and where bumping the version number adds unwanted
+complexity to the development process. It is allowed to produce both dataframe and SQL output in one `@materialize(lazy=True, ...)` task.
 
 ### Integration with pydiverse colspec (same as dataframely but with pydiverse transform based SQL support)
 
```

docs/source/quickstart.md

Lines changed: 4 additions & 1 deletion
```diff
@@ -187,9 +187,12 @@ In this case, the task must produce a SQLAlchemy expression for
 all tabular outputs without executing them. Pipedag can render the query and will only produce a table based on this
 query expression if the query changed or one of the inputs to the task changed.
 
+For tasks returning a Polars or Pandas DataFrame, the hash of the resulting DataFrame is used to determine whether to
+cache-invalidate downstream tasks.
+
 ### Manual cache invalidation with `version` parameter
 
-For non-SQL tasks, the `version` parameter of the {py:func}`@materialize <pydiverse.pipedag.materialize>` decorator must
+For non-lazy tasks, the `version` parameter of the {py:func}`@materialize <pydiverse.pipedag.materialize>` decorator must
 be used for manual cache invalidation. As long as the version stays the same, it is assumed that the code of the task
 did not materially change and will produce the same outputs given the same inputs. We refrained from automatically
 inspecting any python code changes since this would break at shared code changes where it is very hard to distinguish
```

pixi.lock

Lines changed: 130 additions & 131 deletions
Some generated files are not rendered by default.

pixi.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -7,7 +7,7 @@ platforms = ["linux-64", "osx-64", "osx-arm64", "win-64"] # "linux-aarch64"
 #pip = "*"
 hatchling = "*"
 python = ">=3.11.14,<3.15.0a0"
-pydiverse-common = ">=0.4.1,<0.5"
+pydiverse-common = ">=0.4.3,<0.5"
 typing-extensions = ">=4.15.0,<5"
 networkx = ">=3.4,<4"
 attrs = ">=25.4.0,<26"
```

pyproject.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -29,7 +29,7 @@ classifiers = [
 ]
 
 dependencies = [
-    "pydiverse-common >=0.4.1,<0.5",
+    "pydiverse-common >=0.4.3,<0.5",
     "typing-extensions >=4.15.0,<5",
     "networkx >=3.4,<4",
     "attrs >=25.4.0,<26",
```

src/pydiverse/pipedag/backend/table/cache/parquet.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -15,7 +15,7 @@
 from pydiverse.pipedag import ConfigContext, Stage, Table
 from pydiverse.pipedag.materialize.materializing_task import MaterializingTask
 from pydiverse.pipedag.materialize.store import BaseTableCache
-from pydiverse.pipedag.materialize.table_hook_base import CanMatResult, CanRetResult, TableHook
+from pydiverse.pipedag.materialize.table_hook_base import CanMatResult, CanRetResult, DataFrameTableHook, TableHook
 from pydiverse.pipedag.optional_dependency.transform import pdt, pdt_new, pdt_old
 from pydiverse.pipedag.util import normalize_name
 from pydiverse.pipedag.util.path import is_file_uri
@@ -98,7 +98,7 @@ def get_table_path(self, table: Table, file_extension: str) -> UPath:
 
 
 @ParquetTableCache.register_table(pd)
-class PandasTableHook(TableHook[ParquetTableCache]):
+class PandasTableHook(DataFrameTableHook, TableHook[ParquetTableCache]):
     pd_version = Version(pd.__version__)
 
     @classmethod
```

src/pydiverse/pipedag/backend/table/dict.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -9,7 +9,7 @@
 from pydiverse.pipedag.materialize.materializing_task import MaterializingTask
 from pydiverse.pipedag.materialize.metadata import LazyTableMetadata, TaskMetadata
 from pydiverse.pipedag.materialize.store import BaseTableStore
-from pydiverse.pipedag.materialize.table_hook_base import CanMatResult, CanRetResult, TableHook
+from pydiverse.pipedag.materialize.table_hook_base import CanMatResult, CanRetResult, DataFrameTableHook, TableHook
 from pydiverse.pipedag.optional_dependency.transform import C, pdt
 
 
@@ -126,7 +126,7 @@ def get_table_objects_in_stage(self, stage: Stage, include_views=True) -> list[s
 
 
 @DictTableStore.register_table(pd)
-class PandasTableHook(TableHook[DictTableStore]):
+class PandasTableHook(DataFrameTableHook, TableHook[DictTableStore]):
    @classmethod
    def can_materialize(cls, tbl: Table) -> CanMatResult:
        type_ = type(tbl.obj)
```
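The changes above mix a new `DataFrameTableHook` base class into the per-store pandas hooks, so that each hook gains the shared result-hashing capability without duplicating it. The registration-plus-mixin pattern can be sketched in plain Python (all names below, `Store`, `DataFrameHashMixin`, `PandasHook`, are invented for illustration and are not pipedag internals):

```python
import hashlib


class DataFrameHashMixin:
    """Shared behavior mixed into several otherwise unrelated hook classes."""

    @classmethod
    def hash_result(cls, data: bytes) -> str:
        return hashlib.sha256(data).hexdigest()


class Store:
    hooks: list[type] = []

    @classmethod
    def register_table(cls):
        # Decorator factory: registering a hook class with the store,
        # mirroring the @DictTableStore.register_table(pd) pattern above.
        def decorator(hook_cls: type) -> type:
            cls.hooks.append(hook_cls)
            return hook_cls

        return decorator


@Store.register_table()
class PandasHook(DataFrameHashMixin):
    pass


# The registered hook now carries the mixed-in hashing capability.
print(Store.hooks[0].hash_result(b"rows")[:8])
```

Putting the capability in a mixin keeps the per-store hooks free to differ in materialization logic while sharing one hashing implementation.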

src/pydiverse/pipedag/backend/table/sql/dialects/duckdb.py

Lines changed: 4 additions & 8 deletions
```diff
@@ -9,13 +9,9 @@
 import sqlalchemy as sa
 from packaging.version import Version
 
+import pydiverse.pipedag.backend.table.sql.hooks as sql_hooks
 from pydiverse.common import Dtype
 from pydiverse.pipedag import Table
-from pydiverse.pipedag.backend.table.sql.hooks import (
-    IbisTableHook,
-    PandasTableHook,
-    PolarsTableHook,
-)
 from pydiverse.pipedag.backend.table.sql.sql import SQLTableStore
 from pydiverse.pipedag.container import Schema
 from pydiverse.pipedag.materialize.details import resolve_materialization_details_label
@@ -65,7 +61,7 @@ def dialect_requests_empty_creation(self, table: Table, is_sql: bool) -> bool:
 
 
 @DuckDBTableStore.register_table(pd)
-class PandasTableHook(PandasTableHook):
+class PandasTableHook(sql_hooks.PandasTableHook):
     @classmethod
     def _execute_materialize(
         cls,
@@ -102,7 +98,7 @@ def _execute_materialize(
 
 
 @DuckDBTableStore.register_table(pl, duckdb)
-class PolarsTableHook(PolarsTableHook):
+class PolarsTableHook(sql_hooks.PolarsTableHook):
     @classmethod
     def dialect_supports_connectorx(cls):
         # ConnectorX (used by Polars read_database_uri) does not support DuckDB.
@@ -133,7 +129,7 @@ def download_table(
 
 
 @DuckDBTableStore.register_table(ibis.api.Table)
-class IbisTableHook(IbisTableHook):
+class IbisTableHook(sql_hooks.IbisTableHook):
     @classmethod
     def _conn(cls, store: DuckDBTableStore):
         return ibis.duckdb.from_connection(store.engine.raw_connection())
```
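The switch to `import ... as sql_hooks` in this diff lets each dialect subclass keep the same public class name as its base: `class PandasTableHook(PandasTableHook)` defines a class in terms of a name it is about to shadow, whereas `sql_hooks.PandasTableHook` stays unambiguous. A minimal sketch of the pattern, using invented stand-in classes rather than the real modules:

```python
class PandasTableHookBase:  # stands in for the hook in the sql hooks module
    dialect = "generic"


class _SqlHooksModule:  # stands in for the module imported as `sql_hooks`
    PandasTableHook = PandasTableHookBase


sql_hooks = _SqlHooksModule


# The subclass reuses the public name without clobbering the base reference.
class PandasTableHook(sql_hooks.PandasTableHook):
    dialect = "duckdb"


print(PandasTableHook.dialect)  # duckdb
print(issubclass(PandasTableHook, PandasTableHookBase))  # True
```

The module alias also keeps all base hooks reachable through one name, which is why the diff can drop the four-line `from ... import (...)` block.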

src/pydiverse/pipedag/backend/table/sql/dialects/duckdb_parquet.py

Lines changed: 1 addition & 26 deletions
```diff
@@ -27,12 +27,6 @@
 from pydiverse.pipedag.container import SortOrder, View
 from pydiverse.pipedag.context import RunContext
 from pydiverse.pipedag.materialize.store import BaseTableStore
-from pydiverse.pipedag.materialize.table_hook_base import (
-    AutoVersionSupport,
-    CanMatResult,
-    CanRetResult,
-    TableHook,
-)
 from pydiverse.pipedag.optional_dependency.sqlalchemy import Select, SqlText, TextClause
 from pydiverse.pipedag.util.path import is_file_uri
 
@@ -903,18 +897,7 @@ def drop_subquery_table(
 
 
 @ParquetTableStore.register_table(pd)
-class PandasTableHook(TableHook[ParquetTableStore]):
-    auto_version_support = AutoVersionSupport.TRACE
-
-    @classmethod
-    def can_materialize(cls, tbl: Table) -> CanMatResult:
-        type_ = type(tbl.obj)
-        return CanMatResult.new(issubclass(type_, pd.DataFrame))
-
-    @classmethod
-    def can_retrieve(cls, type_) -> CanRetResult:
-        return CanRetResult.new(issubclass(type_, pd.DataFrame))
-
+class PandasTableHook(sql_hooks.PandasTableHook):
     @classmethod
     def materialize(
         cls,
@@ -1060,14 +1043,6 @@ def get_pyarrow_path(path: UPath, store: ParquetTableStore) -> tuple[str, fsspec
         pyarrow_fs = None
         return pyarrow_path, pyarrow_fs
 
-    @classmethod
-    def auto_table(cls, obj: pd.DataFrame):
-        return sql_hooks.PandasTableHook.auto_table(obj)
-
-    @classmethod
-    def get_computation_tracer(cls):
-        return sql_hooks.PandasTableHook.ComputationTracer()
-
 
 @ParquetTableStore.register_table(pl, duckdb)
 class PolarsTableHook(sql_hooks.PolarsTableHook):
```

0 commit comments