Skip to content

Commit 460eb75

Browse files
authored
Allow user to set alerts_provider; don't override existing provider (#133)
* Provide alerts provider
* Adapt tests
1 parent c493407 commit 460eb75

File tree

4 files changed

+41
-13
lines changed

4 files changed

+41
-13
lines changed

f/connectors/alerts/alerts_gcs.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
def main(
3434
gcp_service_acct: gcp_service_account,
3535
alerts_bucket: str,
36+
alerts_provider: str,
3637
territory_id: int,
3738
db: postgresql,
3839
db_table_name: str,
@@ -49,6 +50,7 @@ def main(
4950
return _main(
5051
storage_client,
5152
alerts_bucket,
53+
alerts_provider,
5254
territory_id,
5355
db,
5456
db_table_name,
@@ -59,6 +61,7 @@ def main(
5961
def _main(
6062
storage_client: gcs.Client,
6163
alerts_bucket: str,
64+
alerts_provider: str,
6265
territory_id: int,
6366
db: postgresql,
6467
db_table_name: str,
@@ -71,6 +74,8 @@ def _main(
7174
storage_client : google.cloud.storage.Client
7275
alerts_bucket : str
7376
The name of the GCS bucket containing alerts.
77+
alerts_provider : str
78+
The name of the alerts provider.
7479
territory_id : int
7580
The ID of the territory for which alerts are being processed.
7681
db : postgresql
@@ -97,10 +102,12 @@ def _main(
97102
convert_tiffs_to_jpg(tiff_files)
98103

99104
prepared_alerts_metadata, alerts_statistics = prepare_alerts_metadata(
100-
alerts_metadata, territory_id
105+
alerts_metadata, territory_id, alerts_provider
101106
)
102107

103-
prepared_alerts_data = prepare_alerts_data(destination_path, geojson_files)
108+
prepared_alerts_data = prepare_alerts_data(
109+
destination_path, geojson_files, alerts_provider
110+
)
104111

105112
logger.info(f"Writing alerts to the database table [{db_table_name}].")
106113
alerts_writer = StructuredDBWriter(
@@ -317,7 +324,7 @@ def convert_tiffs_to_jpg(tiff_files):
317324
logger.info("Successfully converted TIFF files to JPEG.")
318325

319326

320-
def prepare_alerts_metadata(alerts_metadata, territory_id):
327+
def prepare_alerts_metadata(alerts_metadata, territory_id, alerts_provider):
321328
"""
322329
Prepare alerts metadata by filtering and processing CSV data.
323330
@@ -335,6 +342,7 @@ def prepare_alerts_metadata(alerts_metadata, territory_id):
335342
CSV data as a string containing alerts metadata.
336343
territory_id : int
337344
The identifier for the territory used to filter the metadata.
345+
alerts_provider : str
338346
339347
Returns
340348
-------
@@ -377,9 +385,7 @@ def prepare_alerts_metadata(alerts_metadata, territory_id):
377385
index=False,
378386
)
379387

380-
# TODO: Currently, this script is only used for Terras alerts. Let's discuss a more sustainable approach with the alerts provider(s).
381-
# Also, if this changes for future alerts, we will need to ensure that existing records are not overwritten.
382-
filtered_df["data_source"] = "terras"
388+
filtered_df["data_source"] = alerts_provider
383389

384390
# Replace all NaN values with None
385391
filtered_df.replace({float("nan"): None}, inplace=True)
@@ -419,7 +425,7 @@ def prepare_alerts_metadata(alerts_metadata, territory_id):
419425
return prepared_alerts_metadata, alerts_statistics
420426

421427

422-
def prepare_alerts_data(local_directory, geojson_files):
428+
def prepare_alerts_data(local_directory, geojson_files, alerts_provider):
423429
"""
424430
Prepare alerts data by reading GeoJSON files from a local directory.
425431
@@ -432,7 +438,8 @@ def prepare_alerts_data(local_directory, geojson_files):
432438
The local directory where GeoJSON files are stored.
433439
geojson_files : list of str
434440
A list of GeoJSON file names to be processed.
435-
441+
alerts_provider : str
442+
The name of the alerts provider.
436443
Returns
437444
-------
438445
list of dict
@@ -485,7 +492,7 @@ def prepare_alerts_data(local_directory, geojson_files):
485492
"g__type": geom.get("type"),
486493
"g__coordinates": json.dumps(geom.get("coordinates")),
487494
# Metadata
488-
"data_source": "terras",
495+
"data_source": alerts_provider,
489496
"source_file_name": file_path,
490497
}
491498
)

f/connectors/alerts/alerts_gcs.script.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ schema:
99
order:
1010
- gcp_service_acct
1111
- alerts_bucket
12+
- alerts_provider
1213
- territory_id
1314
- db
1415
- db_table_name
@@ -19,6 +20,11 @@ schema:
1920
description: The name of the Google Cloud Storage bucket where the alerts are stored.
2021
default: null
2122
originalType: string
23+
alerts_provider:
24+
type: string
25+
description: The name of the alerts provider.
26+
default: null
27+
originalType: string
2228
db:
2329
type: object
2430
description: The database connection parameters for storing tabular data.
@@ -56,6 +62,7 @@ schema:
5662
required:
5763
- gcp_service_acct
5864
- alerts_bucket
65+
- alerts_provider
5966
- territory_id
6067
- db
6168
- db_table_name

f/connectors/alerts/tests/alerts_gcs_test.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def test_prepare_alerts_metadata():
2626
alerts_metadata = pd.read_csv(alerts_history_csv).to_csv(index=False)
2727

2828
prepared_alerts_metadata, alert_statistics = prepare_alerts_metadata(
29-
alerts_metadata, 100
29+
alerts_metadata, 100, "test_provider"
3030
)
3131

3232
# Check that alerts statistics is the latest month and year in the CSV
@@ -39,8 +39,8 @@ def test_metadata_id_stability():
3939
alerts_history_csv = Path(assets_directory, "alerts_history.csv")
4040
alerts_metadata = pd.read_csv(alerts_history_csv).to_csv(index=False)
4141

42-
first, _ = prepare_alerts_metadata(alerts_metadata, 100)
43-
second, _ = prepare_alerts_metadata(alerts_metadata, 100)
42+
first, _ = prepare_alerts_metadata(alerts_metadata, 100, "test_provider")
43+
second, _ = prepare_alerts_metadata(alerts_metadata, 100, "test_provider")
4444

4545
# Order shouldn't matter but just to be safe, sort by _id
4646
first_ids = sorted(r["_id"] for r in first)
@@ -53,7 +53,7 @@ def test_metadata_id_stability():
5353
def test_alert_id_generation(tmp_path):
5454
file_path = Path(assets_directory, "alert_202309900112345671.geojson")
5555
geojson_files = [str(file_path)]
56-
prepared = prepare_alerts_data(tmp_path, geojson_files)
56+
prepared = prepare_alerts_data(tmp_path, geojson_files, "test_provider")
5757

5858
assert len(prepared) > 0
5959
for row in prepared:
@@ -99,6 +99,7 @@ def test_script_e2e(pg_database, mock_alerts_storage_client, tmp_path):
9999
alerts_metadata = _main(
100100
mock_alerts_storage_client,
101101
MOCK_BUCKET_NAME,
102+
"test_provider",
102103
100,
103104
pg_database,
104105
"fake_alerts",
@@ -248,6 +249,7 @@ def test_script_e2e(pg_database, mock_alerts_storage_client, tmp_path):
248249
alerts_metadata = _main(
249250
mock_alerts_storage_client,
250251
MOCK_BUCKET_NAME,
252+
"test_provider",
251253
100,
252254
pg_database,
253255
"fake_alerts",
@@ -264,6 +266,7 @@ def test_file_update_logic(pg_database, mock_alerts_storage_client, tmp_path):
264266
_main(
265267
mock_alerts_storage_client,
266268
MOCK_BUCKET_NAME,
269+
"test_provider",
267270
100,
268271
pg_database,
269272
"fake_alerts",
@@ -278,6 +281,7 @@ def test_file_update_logic(pg_database, mock_alerts_storage_client, tmp_path):
278281
_main(
279282
mock_alerts_storage_client,
280283
MOCK_BUCKET_NAME,
284+
"test_provider",
281285
100,
282286
pg_database,
283287
"fake_alerts",
@@ -304,6 +308,7 @@ def test_file_update_logic(pg_database, mock_alerts_storage_client, tmp_path):
304308
_main(
305309
mock_alerts_storage_client,
306310
MOCK_BUCKET_NAME,
311+
"test_provider",
307312
100,
308313
pg_database,
309314
"fake_alerts",

f/connectors/alerts_download_post_notify.flow/flow.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ value:
1111
alerts_bucket:
1212
type: javascript
1313
expr: flow_input.alerts_bucket
14+
alerts_provider:
15+
type: javascript
16+
expr: flow_input.alerts_provider
1417
db:
1518
type: javascript
1619
expr: flow_input.db
@@ -87,6 +90,11 @@ schema:
8790
description: The name of the Google Cloud Storage bucket where the alerts are stored.
8891
default: ''
8992
originalType: string
93+
alerts_provider:
94+
type: string
95+
description: The name of the alerts provider.
96+
default: null
97+
originalType: string
9098
comapeo:
9199
type: object
92100
description: >
@@ -151,6 +159,7 @@ schema:
151159
required:
152160
- gcp_service_acct
153161
- alerts_bucket
162+
- alerts_provider
154163
- territory_id
155164
- db
156165
- db_table_name

0 commit comments

Comments
 (0)