Description
Referencing this chapter in the docs: Customizing SQLMesh

Using SQLMesh version 0.176.0, I tried this out with the example project scaffolded by `sqlmesh init duckdb`. The config part of `config.py` looks like this:
```python
config = Config(
    project="sqlmesh_example",
    default_gateway="duckdb",
    gateways={
        "duckdb": GatewayConfig(
            connection=DuckDBConnectionConfig(database="db.db", concurrent_tasks=1),
        )
    },
    model_defaults=ModelDefaultsConfig(dialect="duckdb", start="2025-03-13"),
    loader=CustomLoader,
)
```
I couldn't get `VACUUM @this_model` to work with DuckDB, even when adding it as a post-statement directly to `sqlmesh_example.incremental_model`. So instead I went with a different post-statement, using the following `@print_locals()` macro, placed in the `macros/` directory of the project:
```python
from sqlmesh import macro


@macro()
def print_locals(evaluator, test=None):
    print(f"Test: {test}")
    print(f"Locals: {evaluator.locals}")
    return None
```
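For illustration, the macro function can be exercised outside SQLMesh with a stand-in evaluator. The `fake_evaluator` below is a hypothetical object for this sketch only; the real macro evaluator is supplied by SQLMesh and exposes a `locals` dict of macro variables.

```python
from types import SimpleNamespace


# The macro function from macros/print_locals.py, repeated so the snippet
# is self-contained (the @macro() decorator is omitted on purpose).
def print_locals(evaluator, test=None):
    print(f"Test: {test}")
    print(f"Locals: {evaluator.locals}")
    return None


# Hypothetical stand-in for SQLMesh's macro evaluator, illustration only.
fake_evaluator = SimpleNamespace(locals={"start_date": "2020-01-01"})
print_locals(fake_evaluator, test="hello")
# prints:
# Test: hello
# Locals: {'start_date': '2020-01-01'}
```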
Using the macro in a post-statement

First I tested it out by modifying `sqlmesh_example.incremental_model`:
```sql
MODEL (
  name sqlmesh_example.incremental_model,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column event_date
  ),
  start '2020-01-01',
  cron '@daily',
  grain (id, event_date)
);

SELECT
  id,
  item_id,
  event_date
FROM sqlmesh_example.seed_model
WHERE
  event_date BETWEEN @start_date AND @end_date;

@print_locals();
```
and used the standard loader (via the scaffolded `config.yaml`) to execute `sqlmesh plan`.

This works fine. As a sneak peek at what I think will turn out to be relevant with respect to the custom loader, here is the model's `python_env` (from the state store's `_snapshots` table):
```json
"python_env": {
  "print_locals": {
    "payload": "def print_locals(evaluator, test=None):\n    print(f'Test: {test}')\n    print(f'Locals: {evaluator.locals}')\n    return None",
    "kind": "definition",
    "name": "print_locals",
    "path": "macros/print_locals.py"
  }
}
```
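The structure above can be inspected with plain stdlib JSON handling. The snippet below uses a hypothetical, hand-written fragment of a `_snapshots` row (the real row contains many more fields, and the exact state-table schema may vary across SQLMesh versions):

```python
import json

# Hypothetical fragment of one `_snapshots` row, mirroring the structure above.
snapshot_json = """
{
  "python_env": {
    "print_locals": {
      "payload": "def print_locals(evaluator, test=None): ...",
      "kind": "definition",
      "name": "print_locals",
      "path": "macros/print_locals.py"
    }
  }
}
"""

python_env = json.loads(snapshot_json)["python_env"]
for name, entry in python_env.items():
    print(f"{name}: kind={entry['kind']}, path={entry['path']}")
# prints: print_locals: kind=definition, path=macros/print_locals.py
```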
NB! Adding the `@print_locals();` post-statement to `sqlmesh_example.seed_model` results in the following error:

```
Error: Failed to render query at '/home/myuser/projects/sqlmesh_example/models/seed_model.sql':

@print_locals()
```
Adding the post-statement dynamically using a custom loader

On a side note: for this I removed the database and the SQLMesh cache folder, since switching from `config.yaml` to `config.py` seemed to have some side effects that I couldn't resolve with `sqlmesh clean`. This might be a separate issue?

As a next step I removed the post-statement from `sqlmesh_example.incremental_model` and modified the `config.py` from Customizing SQLMesh to use `parse_one("@print_locals()")`, while exempting `sqlmesh_example.seed_model` from being modified with a post-statement. `config.py`:
```python
from sqlmesh.core.loader import SqlMeshLoader
from sqlmesh.utils import UniqueKeyDict
from sqlmesh.core.dialect import parse_one
from sqlmesh.core.config import (
    Config,
    ModelDefaultsConfig,
    GatewayConfig,
    DuckDBConnectionConfig,
)


# New `CustomLoader` class subclasses `SqlMeshLoader`
class CustomLoader(SqlMeshLoader):
    # Override SqlMeshLoader's `_load_models` method to access every model
    def _load_models(
        self,
        macros: "MacroRegistry",
        jinja_macros: "JinjaMacroRegistry",
        gateway: str | None,
        audits: UniqueKeyDict[str, "ModelAudit"],
        signals: UniqueKeyDict[str, "signal"],
    ) -> UniqueKeyDict[str, "Model"]:
        # Call SqlMeshLoader's normal `_load_models` method to ingest models
        # from file and parse the model SQL
        models = super()._load_models(macros, jinja_macros, gateway, audits, signals)
        new_models = {}

        # Loop through the existing model names/objects
        for model_name, model in models.items():
            # Inspect the python_env
            print(f"Python env for {model_name}: {model.python_env}")

            # Modify the post-statements for all models except the seed model
            if model_name != '"db"."sqlmesh_example"."seed_model"':
                # Create a list of existing and new post-statements
                new_post_statements = [
                    # Existing post-statements from the model object
                    *model.post_statements,
                    # The new post-statement is raw SQL, so we parse it with
                    # SQLGlot's `parse_one` function. Make sure to specify the
                    # SQL dialect if different from the project default.
                    parse_one("@print_locals()"),
                ]
            else:  # The seed model
                new_post_statements = [*model.post_statements]

            # Create a copy of the model with the `post_statements_` field updated
            new_models[model_name] = model.copy(update={"post_statements_": new_post_statements})

            # Inspect the python_env of the modified model
            print(f"Python env for modified {model_name}: {new_models[model_name].python_env}")
            # Inspect the post-statements
            print(f"Post statements after modifying: {new_models[model_name].post_statements_}")

        return new_models


# Pass the CustomLoader class to the SQLMesh configuration object
config = Config(
    project="sqlmesh_example",
    default_gateway="duckdb",
    gateways={
        "duckdb": GatewayConfig(
            connection=DuckDBConnectionConfig(database="db.db", concurrent_tasks=1),
        )
    },
    model_defaults=ModelDefaultsConfig(dialect="duckdb", start="2025-03-13"),
    loader=CustomLoader,
)
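The exemption check above compares against the fully qualified, quoted model name, which is easy to get wrong. A small helper (plain string handling, hypothetical names, not a SQLMesh API) that makes the comparison less brittle by stripping the quoting:

```python
def unquoted_parts(fqn: str) -> tuple:
    """'"db"."sqlmesh_example"."seed_model"' -> ('db', 'sqlmesh_example', 'seed_model')"""
    return tuple(part.strip('"') for part in fqn.split("."))


def is_seed_model(fqn: str) -> bool:
    # Match on schema and model name, ignoring catalog and quoting.
    return unquoted_parts(fqn)[-2:] == ("sqlmesh_example", "seed_model")


print(is_seed_model('"db"."sqlmesh_example"."seed_model"'))         # True
print(is_seed_model('"db"."sqlmesh_example"."incremental_model"'))  # False
```

Note that this simple split breaks down if an identifier itself contains a dot; for the scaffolded example project that cannot happen.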
Running `sqlmesh plan` results in this error:

```
Error: Failed to resolve macros for

@print_locals()

Macro 'print_locals' does not exist. at '/home/myuser/projects/sqlmesh_example/models/incremental_model.sql'
```
Inspecting the `python_env` of `sqlmesh_example.incremental_model` from the printed messages, before modifying:

```
Python env for "db"."sqlmesh_example"."incremental_model": {}
```

After modifying:

```
Python env for modified "db"."sqlmesh_example"."incremental_model": {}
```
From my current understanding of SQLMesh, the definition of the `@print_locals()` macro needs to be present in `python_env` for the plan to succeed.
A trick for adding the macro definition to python_env

As a little trick, I added the post-statement `@print_locals();` to `sqlmesh_example.incremental_model` and `sqlmesh_example.full_model`. This adds the definition of `@print_locals()` to the python env. It also means these models end up with TWO post-statements: one from the `.sql` files and one from the custom loader.

Now `sqlmesh plan` succeeds.
Inspecting the `python_env` of `sqlmesh_example.incremental_model` from the printed messages, before modifying:

```
Python env for "db"."sqlmesh_example"."incremental_model": {'print_locals': Executable<payload: def print_locals(evaluator, test=None):
    print(f'Test: {test}')
    print(f'Locals: {evaluator.locals}')
    return None, name: print_locals, path: macros/print_locals.py>}
```

The macro definition is present because the macro is called in the post-statement in `models/incremental_model.sql` and added by `super()._load_models()`.

After modifying:

```
Python env for modified "db"."sqlmesh_example"."full_model": {'print_locals': Executable<payload: def print_locals(evaluator, test=None):
    print(f'Test: {test}')
    print(f'Locals: {evaluator.locals}')
    return None, name: print_locals, path: macros/print_locals.py>}
```
Also, there are two post-statements after the model has been modified:

```
Post statements after modifying: [MacroFunc(
  this=Anonymous(
    this=print_locals)), MacroFunc(
  this=Anonymous(
    this=print_locals))]
```
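One way the loader could avoid the doubled post-statement is to append only when an equivalent statement is not already present. A minimal sketch with plain strings standing in for the parsed expressions (in the real loader one would compare `model.post_statements` entries, e.g. via their rendered SQL):

```python
def append_unique(statements, new_statement, render=str):
    """Append new_statement only if an equivalent one is not already present."""
    existing = {render(s) for s in statements}
    if render(new_statement) in existing:
        return list(statements)
    return [*statements, new_statement]


stmts = ["@print_locals()"]          # already present, e.g. from the .sql file
stmts = append_unique(stmts, "@print_locals()")
print(stmts)  # ['@print_locals()'] -- the duplicate is not appended
```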
If the post-statement in `config.py` is modified slightly to `parse_one("@print_locals('Hello World')")`, the plan is also successfully applied. But this all hinges on the definition of `@print_locals()` being present in the model's `python_env` beforehand.
Replacing the post-statement in `config.py` with `parse_one("@print_locals(@gateway)")` results in a failure to execute the plan:

```
Error: Failed to resolve macros for

@print_locals(@gateway)

Macro variable 'gateway' is undefined. at '/home/myuser/projects/sqlmesh_example/models/incremental_model.sql'
```

And this makes sense, since the `@gateway` macro isn't used anywhere else.
Once again we can do a small trick, by including `@gateway as dummy` in the SELECT statements of `sqlmesh_example.incremental_model` and `sqlmesh_example.full_model`.

After doing this, the plan succeeds, in this case because `python_env` now contains the item `{'__sqlmesh__vars__': Executable<payload: {'gateway': 'duckdb'}, kind: value>}`.
Hypothesis

I arrive at the following hypothesis: after modifying the `post_statements_` attribute of a model, it is necessary to load the modified model again in order to update `model.python_env` (and possibly other things as well). I think this would require calling `load_sql_based_model()` with the right choice of arguments (and for a complete solution we would also need to load Python models).

The above would also apply to any other model changes we might make in the custom loader. Without reloading the modified models, we may easily end up with something that just doesn't work.
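The hypothesis can be illustrated with a toy model, using only stdlib dataclasses and no SQLMesh imports: a derived attribute computed at load time goes stale when the object is merely copied with new statements, and only a fresh load recomputes it. All names here (`ToyModel`, `build_python_env`, `load`) are invented for this sketch.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class ToyModel:
    statements: tuple
    python_env: dict = field(default_factory=dict)


# Stand-in macro registry: maps macro names to their source definitions.
MACROS = {"print_locals": "def print_locals(evaluator, test=None): ..."}


def build_python_env(statements):
    # Collect the definition of every registered macro referenced by the statements.
    return {name: MACROS[name] for name in MACROS if any(name in s for s in statements)}


def load(statements):
    # Like the loader: python_env is derived from the statements at load time.
    return ToyModel(statements=tuple(statements), python_env=build_python_env(statements))


model = load(["SELECT 1"])
# A bare copy with updated statements keeps the stale (empty) python_env,
# analogous to model.copy(update={"post_statements_": ...}):
stale = replace(model, statements=(*model.statements, "@print_locals()"))
# Re-loading from the modified statements recomputes python_env:
fresh = load(stale.statements)
print(stale.python_env)  # {}
print(fresh.python_env)  # {'print_locals': 'def print_locals(evaluator, test=None): ...'}
```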