
Commit ae4cd03

Fix(tests): Address some test flakiness (#5209)

1 parent 4dfe8ae commit ae4cd03

File tree

11 files changed: +174 -82 lines changed

.circleci/manage-test-db.sh

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ databricks_init() {
 
     # Note: the cluster doesnt need to be running to create / drop catalogs, but it does need to be running to run the integration tests
     echo "Ensuring cluster is running"
-    databricks clusters start $CLUSTER_ID || true
+    databricks clusters start $CLUSTER_ID
 }
 
 databricks_up() {

Makefile

Lines changed: 1 addition & 1 deletion
@@ -174,7 +174,7 @@ athena-test: guard-AWS_ACCESS_KEY_ID guard-AWS_SECRET_ACCESS_KEY guard-ATHENA_S3
 	pytest -n auto -m "athena" --retries 3 --junitxml=test-results/junit-athena.xml
 
 fabric-test: guard-FABRIC_HOST guard-FABRIC_CLIENT_ID guard-FABRIC_CLIENT_SECRET guard-FABRIC_DATABASE engine-fabric-install
-	pytest -n auto -m "fabric" --retries 3 --junitxml=test-results/junit-fabric.xml
+	pytest -n auto -m "fabric" --retries 3 --junitxml=test-results/junit-fabric.xml
 
 gcp-postgres-test: guard-GCP_POSTGRES_INSTANCE_CONNECTION_STRING guard-GCP_POSTGRES_USER guard-GCP_POSTGRES_PASSWORD guard-GCP_POSTGRES_KEYFILE_JSON engine-gcppostgres-install
 	pytest -n auto -m "gcp_postgres" --retries 3 --junitxml=test-results/junit-gcp-postgres.xml

pyproject.toml

Lines changed: 2 additions & 1 deletion
@@ -259,6 +259,7 @@ markers = [
     "mssql: test for MSSQL",
     "mysql: test for MySQL",
     "postgres: test for Postgres",
+    "gcp_postgres: test for Postgres on GCP",
     "redshift: test for Redshift",
     "snowflake: test for Snowflake",
     "spark: test for Spark",
@@ -267,7 +268,7 @@ markers = [
 ]
 addopts = "-n 0 --dist=loadgroup"
 asyncio_default_fixture_loop_scope = "session"
-log_cli = false # Set this to true to enable logging during tests
+log_cli = true # Set this to true to enable logging during tests
 log_cli_format = "%(asctime)s.%(msecs)03d %(filename)s:%(lineno)d %(levelname)s %(message)s"
 log_cli_level = "INFO"
 filterwarnings = [
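
For context, a test opts into the newly registered marker like this; the test name and body below are illustrative, not part of this commit:

import pytest

@pytest.mark.gcp_postgres
def test_gcp_postgres_round_trip():
    # collected by `pytest -n auto -m "gcp_postgres"` (the new Makefile target)
    # and excluded by any other -m selection
    assert True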

sqlmesh/core/config/loader.py

Lines changed: 6 additions & 3 deletions
@@ -83,6 +83,7 @@ def load_config_from_paths(
     personal_paths: t.Optional[t.List[Path]] = None,
     config_name: str = "config",
     load_from_env: bool = True,
+    variables: t.Optional[t.Dict[str, t.Any]] = None,
     **kwargs: t.Any,
 ) -> C:
     project_paths = project_paths or []
@@ -116,7 +117,7 @@ def load_config_from_paths(
                 "YAML configs do not support multiple configs. Use Python instead.",
             )
         yaml_config_path = path.resolve()
-        non_python_configs.append(load_config_from_yaml(path))
+        non_python_configs.append(load_config_from_yaml(path, variables))
     elif extension == "py":
         try:
             python_config = load_config_from_python_module(
@@ -194,8 +195,10 @@ def load_config_from_paths(
     return non_python_config
 
 
-def load_config_from_yaml(path: Path) -> t.Dict[str, t.Any]:
-    content = yaml_load(path)
+def load_config_from_yaml(
+    path: Path, variables: t.Optional[t.Dict[str, t.Any]] = None
+) -> t.Dict[str, t.Any]:
+    content = yaml_load(path, variables=variables)
     if not isinstance(content, dict):
         raise ConfigError(
             f"Invalid YAML configuration: expected a dictionary but got {type(content).__name__}. "

sqlmesh/core/engine_adapter/fabric.py

Lines changed: 46 additions & 4 deletions
@@ -3,6 +3,7 @@
 import typing as t
 import logging
 import requests
+import time
 from functools import cached_property
 from sqlglot import exp
 from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result
@@ -15,6 +16,7 @@
 from sqlmesh.utils.errors import SQLMeshError
 from sqlmesh.utils.connection_pool import ConnectionPool
 
+
 if t.TYPE_CHECKING:
     from sqlmesh.core._typing import TableName
 
@@ -172,8 +174,17 @@ def __init__(self, tenant_id: str, workspace_id: str, client_id: str, client_sec
         self.client_secret = client_secret
         self.workspace_id = workspace_id
 
-    def create_warehouse(self, warehouse_name: str) -> None:
+    def create_warehouse(
+        self, warehouse_name: str, if_not_exists: bool = True, attempt: int = 0
+    ) -> None:
         """Create a catalog (warehouse) in Microsoft Fabric via REST API."""
+
+        # attempt count is arbitrary, it essentially equates to 5 minutes of 30 second waits
+        if attempt > 10:
+            raise SQLMeshError(
+                f"Gave up waiting for Fabric warehouse {warehouse_name} to become available"
+            )
+
         logger.info(f"Creating Fabric warehouse: {warehouse_name}")
 
         request_data = {
@@ -182,7 +193,34 @@ def create_warehouse(self, warehouse_name: str) -> None:
         }
 
         response = self.session.post(self._endpoint_url("warehouses"), json=request_data)
-        response.raise_for_status()
+
+        if (
+            if_not_exists
+            and response.status_code == 400
+            and (errorCode := response.json().get("errorCode", None))
+        ):
+            if errorCode == "ItemDisplayNameAlreadyInUse":
+                logger.warning(f"Fabric warehouse {warehouse_name} already exists")
+                return
+            if errorCode == "ItemDisplayNameNotAvailableYet":
+                logger.warning(f"Fabric warehouse {warehouse_name} is still spinning up; waiting")
+                # Fabric error message is something like:
+                # - "Requested 'circleci_51d7087e__dev' is not available yet and is expected to become available in the upcoming minutes."
+                # This seems to happen if a catalog is dropped and then a new one with the same name is immediately created.
+                # There appears to be some delayed async process on the Fabric side that actually drops the warehouses and frees up the names to be used again
+                time.sleep(30)
+                return self.create_warehouse(
+                    warehouse_name=warehouse_name, if_not_exists=if_not_exists, attempt=attempt + 1
+                )
+
+        try:
+            response.raise_for_status()
+        except:
+            # the important information to actually debug anything is in the response body which Requests never prints
+            logger.exception(
+                f"Failed to create warehouse {warehouse_name}. status: {response.status_code}, body: {response.text}"
+            )
+            raise
 
         # Handle direct success (201) or async creation (202)
         if response.status_code == 201:
@@ -197,11 +235,12 @@ def create_warehouse(self, warehouse_name: str) -> None:
         logger.error(f"Unexpected response from Fabric API: {response}\n{response.text}")
         raise SQLMeshError(f"Unable to create warehouse: {response}")
 
-    def delete_warehouse(self, warehouse_name: str) -> None:
+    def delete_warehouse(self, warehouse_name: str, if_exists: bool = True) -> None:
         """Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
         logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
 
         # Get the warehouse ID by listing warehouses
+        # TODO: handle continuationUri for pagination, ref: https://learn.microsoft.com/en-us/rest/api/fabric/warehouse/items/list-warehouses?tabs=HTTP#warehouses
         response = self.session.get(self._endpoint_url("warehouses"))
         response.raise_for_status()
 
@@ -213,9 +252,12 @@ def delete_warehouse(self, warehouse_name: str) -> None:
         warehouse_id = warehouse_name_to_id.get(warehouse_name, None)
 
         if not warehouse_id:
-            logger.error(
+            logger.warning(
                 f"Fabric warehouse does not exist: {warehouse_name}\n(available warehouses: {', '.join(warehouse_name_to_id)})"
            )
+            if if_exists:
+                return
+
             raise SQLMeshError(
                 f"Unable to delete Fabric warehouse {warehouse_name} as it doesnt exist"
             )
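
The create_warehouse change amounts to a bounded retry: on the transient ItemDisplayNameNotAvailableYet error it sleeps 30 seconds and recurses, giving up after roughly 5 minutes. The same shape isolated as a standalone sketch (the helper and its names are hypothetical, not part of the commit):

import time

MAX_ATTEMPTS = 10  # ~5 minutes of 30-second waits, mirroring the commit's comment
WAIT_SECONDS = 30

def create_until_available(do_create, attempt: int = 0) -> None:
    # do_create() returns an error-code string on failure, or None on success
    error_code = do_create()
    if error_code == "ItemDisplayNameNotAvailableYet":
        if attempt >= MAX_ATTEMPTS:
            raise RuntimeError("Gave up waiting for the warehouse name to free up")
        time.sleep(WAIT_SECONDS)
        return create_until_available(do_create, attempt + 1)
    if error_code is not None:
        raise RuntimeError(f"Warehouse creation failed: {error_code}")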

tests/conftest.py

Lines changed: 33 additions & 0 deletions
@@ -212,6 +212,39 @@ def pytest_collection_modifyitems(items, *args, **kwargs):
             item.add_marker("fast")
 
 
+@pytest.hookimpl(hookwrapper=True, tryfirst=True)
+def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo):
+    # The tmp_path fixture frequently throws errors like:
+    # - KeyError: <_pytest.stash.StashKey object at 0x79ba385fe1a0>
+    # in its teardown. This causes pytest to mark the test as failed even though we have zero control over this behaviour.
+    # So we log/swallow that particular error here rather than raising it
+
+    # note: the hook always has to yield
+    outcome = yield
+
+    # we only care about tests that used the tmp_path fixture
+    if "tmp_path" not in getattr(item, "fixturenames", []):
+        return
+
+    result: pytest.TestReport = outcome.get_result()
+
+    if result.when != "teardown":
+        return
+
+    # If we specifically failed with a StashKey error in teardown, mark the test as passed
+    if result.failed:
+        exception = call.excinfo
+        if (
+            exception
+            and isinstance(exception.value, KeyError)
+            and "_pytest.stash.StashKey" in repr(exception)
+        ):
+            result.outcome = "passed"
+            item.add_report_section(
+                "teardown", "stderr", f"Ignored tmp_path teardown error: {exception}"
+            )
+
+
 # Ignore all local config files
 @pytest.fixture(scope="session", autouse=True)
 def ignore_local_config_files():
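
For readers unfamiliar with the mechanism: hookwrapper=True turns the hook into a generator that runs around all regular implementations, and outcome.get_result() hands back the TestReport pytest built, which the wrapper may mutate before it is recorded. A stripped-down sketch of that shape (hypothetical, not part of the commit):

import pytest

@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
    outcome = yield                # regular hook implementations run here
    report = outcome.get_result()  # the TestReport pytest produced
    if report.when == "teardown" and report.failed:
        # a wrapper can inspect, annotate, or even flip the outcome
        report.sections.append(("teardown note", f"wrapped: {item.nodeid}"))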

tests/core/engine_adapter/integration/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -193,6 +193,7 @@ def __init__(
         engine_adapter: EngineAdapter,
         mark: str,
         gateway: str,
+        tmp_path: pathlib.Path,
         is_remote: bool = False,
         columns_to_types: t.Optional[t.Dict[str, t.Union[str, exp.DataType]]] = None,
     ):
@@ -210,6 +211,7 @@ def __init__(
         self._catalogs: t.List[
             str
         ] = []  # keep track of any catalogs created via self.create_catalog() so we can drop them at the end
+        self.tmp_path = tmp_path
 
     @property
     def test_type(self) -> str:
@@ -655,6 +657,7 @@ def create_context(
                 private_sqlmesh_dir / "config.yml",
                 private_sqlmesh_dir / "config.yaml",
             ],
+            variables={"tmp_path": str(path or self.tmp_path)},
         )
         if config_mutator:
             config_mutator(self.gateway, config)

tests/core/engine_adapter/integration/config.yaml

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ gateways:
     type: duckdb
     catalogs:
       memory: ':memory:'
-      testing: 'testing.duckdb'
+      testing: "{{ var('tmp_path') }}/testing.duckdb"
 
   # Databases with docker images available
   inttest_trino_hive:
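
The quoted Jinja expression is rendered when the config is loaded with the variables shown earlier, so each test writes its DuckDB file under its own tmp_path rather than a shared testing.duckdb in the working directory. Roughly how that substitution behaves, sketched with Jinja2 directly rather than SQLMesh's yaml_load:

from jinja2 import Environment

def render(raw: str, variables: dict) -> str:
    env = Environment()
    # expose var() lookups, mirroring the config syntax
    env.globals["var"] = lambda name, default=None: variables.get(name, default)
    return env.from_string(raw).render()

print(render("testing: \"{{ var('tmp_path') }}/testing.duckdb\"", {"tmp_path": "/tmp/t0"}))
# testing: "/tmp/t0/testing.duckdb"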

tests/core/engine_adapter/integration/conftest.py

Lines changed: 7 additions & 4 deletions
@@ -27,15 +27,15 @@
 logger = logging.getLogger(__name__)
 
 
-@pytest.fixture(scope="session")
-def config() -> Config:
+@pytest.fixture
+def config(tmp_path: pathlib.Path) -> Config:
     return load_config_from_paths(
         Config,
         project_paths=[
-            pathlib.Path("examples/wursthall/config.yaml"),
             pathlib.Path(os.path.join(os.path.dirname(__file__), "config.yaml")),
         ],
         personal_paths=[pathlib.Path("~/.sqlmesh/config.yaml").expanduser()],
+        variables={"tmp_path": str(tmp_path)},
     )
 
 
@@ -89,7 +89,9 @@ def _create(engine_name: str, gateway: str) -> EngineAdapter:
 
 @pytest.fixture
 def create_test_context(
-    request: FixtureRequest, create_engine_adapter: t.Callable[[str, str], EngineAdapter]
+    request: FixtureRequest,
+    create_engine_adapter: t.Callable[[str, str], EngineAdapter],
+    tmp_path: pathlib.Path,
 ) -> t.Callable[[IntegrationTestEngine, str, str, str], t.Iterable[TestContext]]:
     def _create(
         engine: IntegrationTestEngine, gateway: str, test_type: str, table_format: str
@@ -103,6 +105,7 @@ def _create(
             engine_adapter,
             f"{engine.engine}_{table_format}",
             gateway,
+            tmp_path=tmp_path,
             is_remote=is_remote,
         )
 