Commit 413dcb2

Merge branch 'main' into dependabot/pip/protobuf-4.25.8
2 parents: 74ee920 + 0475354

91 files changed: +889, -1675 lines


.circleci/workflows.yml

Lines changed: 5 additions & 4 deletions
@@ -425,11 +425,11 @@ jobs:
           name: Install telemetry-airflow dependencies
           command: |
             cd ~/telemetry-airflow
-            virtualenv .venv
+            python3 -m venv .venv
             source .venv/bin/activate
-            pip install -r requirements.txt
-            pip install -r requirements-dev.txt
-            pip install -r requirements-override.txt
+            pip install --no-deps -r requirements.txt
+            pip install --no-deps -r requirements-dev.txt
+            pip install --no-deps -r requirements-override.txt
       - save_cache:
           paths:
             - ~/telemetry-airflow/.venv
@@ -1246,6 +1246,7 @@ workflows:
             - validate-views
             - validate-metadata
             - dry-run-sql
+            - test-routines
       - test-routines:
           requires:
             - deploy-changes-to-stage

bigquery_etl/cli/query.py

Lines changed: 3 additions & 0 deletions
@@ -2300,6 +2300,9 @@ def _update_query_schema(
     query_schema = Schema.from_query_file(
         query_file_path,
         content=sql_content,
+        project=project_name,
+        dataset=dataset_name,
+        table=table_name,
         use_cloud_function=use_cloud_function,
         respect_skip=respect_dryrun_skip,
         sql_dir=sql_dir,
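
Note: the three new keyword arguments pass the destination table coordinates through to the dry run, which uses them (among other things) to key its result cache; see bigquery_etl/dryrun.py below. For reference, a minimal sketch of how these names are conventionally derived from a query path (the path here is hypothetical, mirroring the logic added to validate_schema below):

    from pathlib import Path

    # bigquery-etl queries live under <sql_dir>/<project>/<dataset>/<table>/query.sql
    query_file_path = Path(
        "sql/moz-fx-data-shared-prod/telemetry_derived/foo_v1/query.sql"
    )
    table_name = query_file_path.parent.name                  # "foo_v1"
    dataset_name = query_file_path.parent.parent.name         # "telemetry_derived"
    project_name = query_file_path.parent.parent.parent.name  # "moz-fx-data-shared-prod"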

bigquery_etl/dryrun.py

Lines changed: 149 additions & 20 deletions
@@ -12,10 +12,15 @@
 """

 import glob
+import hashlib
 import json
+import os
+import pickle
 import random
 import re
+import shutil
 import sys
+import tempfile
 import time
 from enum import Enum
 from os.path import basename, dirname, exists
@@ -106,10 +111,12 @@ def __init__(
         dataset=None,
         table=None,
         billing_project=None,
+        use_cache=True,
     ):
         """Instantiate DryRun class."""
         self.sqlfile = sqlfile
         self.content = content
+        self.use_cache = use_cache
         self.query_parameters = query_parameters
         self.strip_dml = strip_dml
         self.use_cloud_function = use_cloud_function
@@ -192,6 +199,17 @@ def skipped_files(sql_dir=ConfigLoader.get("default", "sql_dir")) -> Set[str]:

         return skip_files

+    @staticmethod
+    def clear_cache():
+        """Clear dry run cache directory."""
+        cache_dir = Path(tempfile.gettempdir()) / "bigquery_etl_dryrun_cache"
+        if cache_dir.exists():
+            try:
+                shutil.rmtree(cache_dir)
+                print(f"Cleared dry run cache at {cache_dir}")
+            except OSError as e:
+                print(f"Warning: Failed to clear dry run cache: {e}")
+
     def skip(self):
         """Determine if dry run should be skipped."""
         return self.respect_skip and self.sqlfile in self.skipped_files(
@@ -225,6 +243,108 @@ def get_sql(self):

         return sql

+    def _get_cache_key(self, sql):
+        """Generate cache key based on SQL content and other parameters."""
+        cache_input = f"{sql}|{self.project}|{self.dataset}|{self.table}"
+        return hashlib.sha256(cache_input.encode()).hexdigest()
+
+    @staticmethod
+    def _get_cache_dir():
+        """Get the cache directory path."""
+        cache_dir = Path(tempfile.gettempdir()) / "bigquery_etl_dryrun_cache"
+        cache_dir.mkdir(parents=True, exist_ok=True)
+        return cache_dir
+
+    def _read_cache_file(self, cache_file, ttl_seconds):
+        """Read and return cached data from a pickle file with TTL check."""
+        try:
+            if not cache_file.exists():
+                return None
+
+            # check if cache is expired
+            file_age = time.time() - cache_file.stat().st_mtime
+            if file_age > ttl_seconds:
+                try:
+                    cache_file.unlink()
+                except OSError:
+                    pass
+                return None
+
+            cached_data = pickle.loads(cache_file.read_bytes())
+            return cached_data
+        except (pickle.PickleError, EOFError, OSError, FileNotFoundError) as e:
+            print(f"[CACHE] Failed to load {cache_file}: {e}")
+            try:
+                if cache_file.exists():
+                    cache_file.unlink()
+            except OSError:
+                pass
+            return None
+
+    @staticmethod
+    def _write_cache_file(cache_file, data):
+        """Write data to a cache file using atomic write."""
+        try:
+            # write to temporary file first, then atomically rename
+            # this prevents race conditions where readers get partial files
+            # include random bytes to handle thread pool scenarios where threads share same PID
+            temp_file = Path(
+                str(cache_file) + f".tmp.{os.getpid()}.{os.urandom(4).hex()}"
+            )
+            with open(temp_file, "wb") as f:
+                pickle.dump(data, f)
+                f.flush()
+                os.fsync(f.fileno())  # Ensure data is written to disk
+
+            temp_file.replace(cache_file)
+        except (pickle.PickleError, OSError) as e:
+            print(f"[CACHE] Failed to save {cache_file}: {e}")
+            try:
+                if "temp_file" in locals() and temp_file.exists():
+                    temp_file.unlink()
+            except (OSError, NameError):
+                pass
+
+    def _get_cached_result(self, cache_key, ttl_seconds=None):
+        """Load cached dry run result from disk."""
+        if ttl_seconds is None:
+            ttl_seconds = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)
+
+        cache_file = self._get_cache_dir() / f"dryrun_{cache_key}.pkl"
+        return self._read_cache_file(cache_file, ttl_seconds)
+
+    def _save_cached_result(self, cache_key, result):
+        """Save dry run result to disk cache using atomic write."""
+        cache_file = self._get_cache_dir() / f"dryrun_{cache_key}.pkl"
+        self._write_cache_file(cache_file, result)
+
+        # save table metadata separately if present
+        if (
+            result
+            and "tableMetadata" in result
+            and self.project
+            and self.dataset
+            and self.table
+        ):
+            table_identifier = f"{self.project}.{self.dataset}.{self.table}"
+            self._save_cached_table_metadata(table_identifier, result["tableMetadata"])
+
+    def _get_cached_table_metadata(self, table_identifier, ttl_seconds=None):
+        """Load cached table metadata from disk based on table identifier."""
+        if ttl_seconds is None:
+            ttl_seconds = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)
+
+        # table identifier as cache key
+        table_cache_key = hashlib.sha256(table_identifier.encode()).hexdigest()
+        cache_file = self._get_cache_dir() / f"table_metadata_{table_cache_key}.pkl"
+        return self._read_cache_file(cache_file, ttl_seconds)
+
+    def _save_cached_table_metadata(self, table_identifier, metadata):
+        """Save table metadata to disk cache using atomic write."""
+        table_cache_key = hashlib.sha256(table_identifier.encode()).hexdigest()
+        cache_file = self._get_cache_dir() / f"table_metadata_{table_cache_key}.pkl"
+        self._write_cache_file(cache_file, metadata)
+
     @cached_property
     def dry_run_result(self):
         """Dry run the provided SQL file."""
@@ -233,6 +353,14 @@ def dry_run_result(self):
         else:
             sql = self.get_sql()

+        # check cache first (if caching is enabled)
+        if sql is not None and self.use_cache:
+            cache_key = self._get_cache_key(sql)
+            cached_result = self._get_cached_result(cache_key)
+            if cached_result is not None:
+                self.dry_run_duration = 0  # Cached result, no actual dry run
+                return cached_result
+
         query_parameters = []
         if self.query_parameters:
             for parameter_name, parameter_type in self.query_parameters.items():
@@ -351,6 +479,12 @@ def dry_run_result(self):
             }

             self.dry_run_duration = time.time() - start_time
+
+            # Save to cache (if caching is enabled and result is valid)
+            # Don't cache errors to allow retries
+            if self.use_cache and result.get("valid"):
+                self._save_cached_result(cache_key, result)
+
             return result

         except Exception as e:
@@ -476,6 +610,13 @@ def get_table_schema(self):
         ):
             return self.dry_run_result["tableMetadata"]["schema"]

+        # Check if table metadata is cached (if caching is enabled)
+        if self.use_cache and self.project and self.dataset and self.table:
+            table_identifier = f"{self.project}.{self.dataset}.{self.table}"
+            cached_metadata = self._get_cached_table_metadata(table_identifier)
+            if cached_metadata:
+                return cached_metadata["schema"]
+
         return []

     def get_dataset_labels(self):
@@ -565,6 +706,13 @@ def validate_schema(self):
             return True

         query_file_path = Path(self.sqlfile)
+        table_name = query_file_path.parent.name
+        dataset_name = query_file_path.parent.parent.name
+        project_name = query_file_path.parent.parent.parent.name
+        self.project = project_name
+        self.dataset = dataset_name
+        self.table = table_name
+
         query_schema = Schema.from_json(self.get_schema())
         if self.errors():
             # ignore file when there are errors that self.get_schema() did not raise
@@ -576,26 +724,7 @@
             click.echo(f"No schema file defined for {query_file_path}", err=True)
             return True

-        table_name = query_file_path.parent.name
-        dataset_name = query_file_path.parent.parent.name
-        project_name = query_file_path.parent.parent.parent.name
-
-        partitioned_by = None
-        if (
-            self.metadata
-            and self.metadata.bigquery
-            and self.metadata.bigquery.time_partitioning
-        ):
-            partitioned_by = self.metadata.bigquery.time_partitioning.field
-
-        table_schema = Schema.for_table(
-            project_name,
-            dataset_name,
-            table_name,
-            client=self.client,
-            id_token=self.id_token,
-            partitioned_by=partitioned_by,
-        )
+        table_schema = Schema.from_json(self.get_table_schema())

         # This check relies on the new schema being deployed to prod
         if not query_schema.compatible(table_schema):
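
Note: a minimal usage sketch of the caching behaviour added above (the query path and SQL are hypothetical). Valid dry run results are pickled in a temp-dir cache keyed on the SQL plus project/dataset/table and reused until the TTL expires; errors are never cached:

    from bigquery_etl.dryrun import DryRun

    dry_run = DryRun(
        "sql/moz-fx-data-shared-prod/telemetry_derived/foo_v1/query.sql",
        content="SELECT 1",
    )
    result = dry_run.dry_run_result  # may be served from the on-disk cache

    # Opt out of caching for a single dry run ...
    uncached = DryRun(
        "sql/moz-fx-data-shared-prod/telemetry_derived/foo_v1/query.sql",
        content="SELECT 1",
        use_cache=False,
    )

    # ... or drop the cache directory entirely, as get_stable_table_schemas() now does.
    DryRun.clear_cache()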

bigquery_etl/schema/__init__.py

Lines changed: 14 additions & 3 deletions
@@ -13,6 +13,7 @@
 from google.cloud.bigquery import SchemaField

 from .. import dryrun
+from ..config import ConfigLoader

 SCHEMA_FILE = "schema.yaml"

@@ -58,24 +59,34 @@ def from_json(cls, json_schema):
         return cls(json_schema)

     @classmethod
-    def for_table(cls, project, dataset, table, partitioned_by=None, *args, **kwargs):
+    def for_table(
+        cls,
+        project,
+        dataset,
+        table,
+        partitioned_by=None,
+        filename="query.sql",
+        *args,
+        **kwargs,
+    ):
         """Get the schema for a BigQuery table."""
         query = f"SELECT * FROM `{project}.{dataset}.{table}`"

         if partitioned_by:
             query += f" WHERE DATE(`{partitioned_by}`) = DATE('2020-01-01')"

         try:
+            sql_dir = ConfigLoader.get("default", "sql_dir")
             return cls(
                 dryrun.DryRun(
-                    os.path.join(project, dataset, table, "query.sql"),
+                    os.path.join(sql_dir, project, dataset, table, filename),
                     query,
                     project=project,
                     dataset=dataset,
                     table=table,
                     *args,
                     **kwargs,
-                ).get_schema()
+                ).get_table_schema()
             )
         except Exception as e:
             print(f"Cannot get schema for {project}.{dataset}.{table}: {e}")

bigquery_etl/schema/stable_table_schema.py

Lines changed: 8 additions & 1 deletion
@@ -59,7 +59,9 @@ def prod_schemas_uri():
     with the most recent production schemas deploy.
     """
     dryrun = DryRun(
-        "moz-fx-data-shared-prod/telemetry_derived/foo/query.sql", content="SELECT 1"
+        "moz-fx-data-shared-prod/telemetry_derived/foo/query.sql",
+        content="SELECT 1",
+        use_cache=False,
     )
     build_id = dryrun.get_dataset_labels()["schemas_build_id"]
     commit_hash = build_id.split("_")[-1]
@@ -88,6 +90,11 @@ def get_stable_table_schemas() -> List[SchemaFile]:
             print(f"Failed to load cached schemas: {e}, re-downloading...")

     print(f"Downloading schemas from {schemas_uri}")
+
+    # Clear dry run cache when downloading new schemas
+    # Schema changes could affect dry run results
+    DryRun.clear_cache()
+
     with urllib.request.urlopen(schemas_uri) as f:
         tarbytes = BytesIO(f.read())

bqetl_project.yaml

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ dry_run:
   function_accounts:
     - bigquery-etl-dryrun@moz-fx-data-shared-prod.iam.gserviceaccount.com
     - bigquery-etl-dryrun@moz-fx-data-shar-nonprod-efed.iam.gserviceaccount.com
+  cache_ttl_seconds: 900 # Cache dry run results for 15 minutes (900 seconds)
   skip:
     ## skip all data-observability-dev queries due to CI lacking permissions in that project.
     # TODO: once data observability platform assessment concludes this should be removed.
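
Note: the new cache_ttl_seconds setting is read in bigquery_etl/dryrun.py via ConfigLoader, falling back to 900 seconds (15 minutes) when the key is absent:

    from bigquery_etl.config import ConfigLoader

    ttl_seconds = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)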

dags.yaml

Lines changed: 0 additions & 14 deletions
@@ -2459,20 +2459,6 @@ bqetl_firefox_enterprise:
   tags:
   - impact/tier_3

-bqetl_braze_win10_sync:
-  schedule_interval: 0 5 * * *
-  description: |
-    Daily run to pull inactive Win10 users and sync them to Braze.
-  default_args:
-
-    email:
-
-    start_date: '2025-10-06'
-    retries: 1
-    retry_delay: 30m
-  tags:
-  - impact/tier_3
-

 bqetl_baseline_clients_city_seen:
   schedule_interval: 0 4 * * *
   default_args:

script/glam/generate_glean_sql

Lines changed: 5 additions & 3 deletions
@@ -62,9 +62,11 @@ function write_clients_daily_aggregates {
   local tables;
   # Pings to exclude
   local excluded_tables=(
-    "use_counters.*" # Use counters bring too many rows to process and GLAM doesn't support them.
-    "^migration$" # Migration tables are not supported by GLAM and don't have `is_bot_generated` flag.
-    "^client_deduplication$" # Client deduplication tables are not supported by GLAM and don't have `is_bot_generated` flag.
+    "use_counters.*" # Use counters bring too many rows to process and GLAM doesn't support them.
+    "^migration$" # Migration tables are not supported by GLAM and don't have `is_bot_generated` flag.
+    "^client_deduplication$" # Client deduplication tables are not supported by GLAM and don't have `is_bot_generated` flag.
+    "^events$" # Not supported by GLAM.
+    "^nimbus-targeting-context$" # Not supported by GLAM.
   )
   local exclude_pattern=$(IFS='|'; echo "${excluded_tables[*]}")
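
Note: the script collapses these regexes into a single alternation with IFS='|', presumably to filter ping table names. An illustrative Python equivalent of that shell idiom (not part of the change; the sample table list is hypothetical):

    import re

    excluded_tables = [
        r"use_counters.*",
        r"^migration$",
        r"^client_deduplication$",
        r"^events$",
        r"^nimbus-targeting-context$",
    ]
    # Same idea as: exclude_pattern=$(IFS='|'; echo "${excluded_tables[*]}")
    exclude_pattern = re.compile("|".join(excluded_tables))

    tables = ["metrics", "events", "baseline", "migration"]
    kept = [t for t in tables if not exclude_pattern.search(t)]  # ["metrics", "baseline"]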
