-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_aggregate.py
More file actions
353 lines (304 loc) · 13.7 KB
/
Copy pathtest_aggregate.py
File metadata and controls
353 lines (304 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
"""
Unit tests for the aggregate_by_input_id reduce step in main.py.
The aggregation step is the most behavior-critical change from the
recent session — it collapses multi-EGID and multi-polygon sub-rows into
one output row per input_id, with numeric columns becoming ;-joined
arrays where sub-rows disagree. These tests pin down that contract.
"""
import math
import numpy as np
import pandas as pd
import pytest
from main import (
_AGGREGATE_PASS_THROUGH_COLS,
_demote_int_float,
_format_sub_value,
_reduce_group,
aggregate_by_input_id,
)
# ── _format_sub_value ──────────────────────────────────────────────────────
@pytest.mark.parametrize("v,expected", [
(None, ""),
(float("nan"), ""),
(1234, "1234"),
(1234.0, "1234"), # integer-valued float demotes
(1234.5, "1234.5"), # real float keeps decimals
("hello", "hello"),
(0, "0"),
])
def test_format_sub_value(v, expected):
assert _format_sub_value(v) == expected
# ── _demote_int_float ──────────────────────────────────────────────────────
@pytest.mark.parametrize("v,expected", [
(1234.0, 1234),
(1234.5, 1234.5),
("string", "string"),
(None, None),
(1234, 1234),
])
def test_demote_int_float(v, expected):
result = _demote_int_float(v)
assert result == expected
if expected == 1234 and v == 1234.0:
assert isinstance(result, int)
def test_demote_int_float_nan_passes_through():
result = _demote_int_float(float("nan"))
assert isinstance(result, float) and math.isnan(result)
# ── aggregate_by_input_id: single-row groups (no aggregation) ──────────────
def _build_df(rows):
"""
Helper: build a DataFrame from a list of dicts, preserving order.
"""
return pd.DataFrame(rows)
def test_aggregate_no_input_id_passes_through():
"""If there's no input_id column, the function is a no-op."""
df = _build_df([{"av_egid": 1234, "volume_above_ground_m3": 100.0}])
out = aggregate_by_input_id(df)
assert len(out) == 1
assert out.iloc[0]["av_egid"] == 1234
def test_aggregate_empty_df_passes_through():
df = _build_df([])
out = aggregate_by_input_id(df)
assert len(out) == 0
def test_aggregate_single_row_groups_unchanged():
"""Every input_id appears once → output identical to input."""
df = _build_df([
{"input_id": "A", "av_egid": 1, "fid": "f1",
"area_footprint_m2": 100.0, "volume_above_ground_m3": 1000.0,
"warnings": "", "status_step3": "success"},
{"input_id": "B", "av_egid": 2, "fid": "f2",
"area_footprint_m2": 200.0, "volume_above_ground_m3": 2000.0,
"warnings": "", "status_step3": "success"},
])
out = aggregate_by_input_id(df)
assert len(out) == 2
assert out.iloc[0]["av_egid"] == 1
assert out.iloc[1]["av_egid"] == 2
# Numeric columns stay numeric (not strings) for unaggregated rows
assert out.iloc[0]["area_footprint_m2"] == 100.0
# ── aggregate_by_input_id: multi-row groups (aggregation) ──────────────────
def test_aggregate_multi_polygon_one_egid_arrays():
"""
Same EGID, two AV polygons → one output row, fid becomes a ;-joined
array, footprint becomes a ;-joined array (different per polygon),
av_egid stays scalar (same), gkat stays scalar (same).
"""
df = _build_df([
{"input_id": "A", "av_egid": 1234, "fid": "f1",
"area_footprint_m2": 100.0, "volume_above_ground_m3": 1000.0,
"gkat": 1110, "gklas": 1110,
"warnings": "EGID 1234 matched 2 AV polygons",
"status_step3": "success"},
{"input_id": "A", "av_egid": 1234, "fid": "f2",
"area_footprint_m2": 150.0, "volume_above_ground_m3": 1200.0,
"gkat": 1110, "gklas": 1110,
"warnings": "EGID 1234 matched 2 AV polygons",
"status_step3": "success"},
])
out = aggregate_by_input_id(df)
assert len(out) == 1
row = out.iloc[0]
# av_egid stays scalar — both sub-rows agree
assert row["av_egid"] == 1234
# gkat/gklas stay scalar — both sub-rows agree
assert row["gkat"] == 1110
# fid becomes ;-joined — sub-rows differ
assert row["fid"] == "f1; f2"
# numeric metrics become ;-joined — sub-rows differ
assert row["area_footprint_m2"] == "100; 150"
assert row["volume_above_ground_m3"] == "1000; 1200"
# Aggregation note appended to warnings
assert "AV polygons for one EGID" in row["warnings"]
assert "EGID 1234 matched 2 AV polygons" in row["warnings"]
def test_aggregate_multi_egid_input_arrays():
"""
Multi-EGID input cell → multiple distinct EGIDs → arrays everywhere.
"""
df = _build_df([
{"input_id": "A", "av_egid": 1234, "fid": "f1",
"area_footprint_m2": 100.0, "volume_above_ground_m3": 1000.0,
"gkat": 1110, "gklas": 1110,
"warnings": "Input cell contained 2 EGIDs: 1234, 5678",
"status_step3": "success"},
{"input_id": "A", "av_egid": 5678, "fid": "f2",
"area_footprint_m2": 200.0, "volume_above_ground_m3": 2500.0,
"gkat": 1110, "gklas": 1220, # different gklas → array
"warnings": "Input cell contained 2 EGIDs: 1234, 5678",
"status_step3": "success"},
])
out = aggregate_by_input_id(df)
assert len(out) == 1
row = out.iloc[0]
# av_egid becomes ;-joined (different)
assert row["av_egid"] == "1234; 5678"
# gkat stays scalar (both 1110)
assert row["gkat"] == 1110
# gklas becomes ;-joined (1110 vs 1220)
assert row["gklas"] == "1110; 1220"
# Aggregation note matches the multi-EGID branch (not the multi-polygon one)
assert "distinct EGIDs" in row["warnings"]
assert "fix the input CSV" in row["warnings"]
def test_aggregate_multi_egid_partial_match_empty_slots():
"""
Multi-EGID input where some sub-EGIDs failed to find an AV polygon
→ array contains empty strings at the failed positions.
"""
df = _build_df([
{"input_id": "A", "av_egid": 1234, "fid": "f1",
"area_footprint_m2": 100.0, "volume_above_ground_m3": 1000.0,
"warnings": "", "status_step3": "success"},
{"input_id": "A", "av_egid": None, "fid": None,
"area_footprint_m2": None, "volume_above_ground_m3": None,
"warnings": "", "status_step3": "skipped:no_footprint"},
{"input_id": "A", "av_egid": 5678, "fid": "f3",
"area_footprint_m2": 200.0, "volume_above_ground_m3": 2000.0,
"warnings": "", "status_step3": "success"},
])
out = aggregate_by_input_id(df)
assert len(out) == 1
row = out.iloc[0]
# The failed position (index 1) shows as empty between two ;
assert row["av_egid"] == "1234; ; 5678"
assert row["fid"] == "f1; ; f3"
assert row["area_footprint_m2"] == "100; ; 200"
# Status rolls up to success because at least one sub-row succeeded
assert row["status_step3"] == "success"
def test_aggregate_status_rollup_all_failed():
"""
All sub-rows failed → aggregate keeps the first sub-row's failure
status (so downstream filtering works).
"""
df = _build_df([
{"input_id": "A", "av_egid": 1234, "fid": None, "area_footprint_m2": None,
"warnings": "", "status_step3": "skipped:no_footprint"},
{"input_id": "A", "av_egid": 5678, "fid": None, "area_footprint_m2": None,
"warnings": "", "status_step3": "skipped:no_footprint"},
])
out = aggregate_by_input_id(df)
assert out.iloc[0]["status_step3"] == "skipped:no_footprint"
def test_aggregate_warning_text_all_failed_no_egids():
"""
Regression for B5: when n_unique_egids == 0 (every sub-row had
av_egid = NaN), the aggregation note must NOT say "for one EGID" —
there's no EGID. It should explicitly say "none matched".
"""
df = _build_df([
{"input_id": "A", "av_egid": None, "fid": None, "area_footprint_m2": None,
"warnings": "", "status_step3": "skipped:no_footprint"},
{"input_id": "A", "av_egid": None, "fid": None, "area_footprint_m2": None,
"warnings": "", "status_step3": "skipped:no_footprint"},
])
out = aggregate_by_input_id(df)
note = out.iloc[0]["warnings"]
assert "none matched" in note
assert "for one EGID" not in note # the misleading text from before
assert "distinct EGIDs" not in note
def test_aggregate_warning_text_one_egid_multi_polygons():
"""One EGID, two polygons → aggregation note mentions 'one EGID'."""
df = _build_df([
{"input_id": "A", "av_egid": 1234, "fid": "f1",
"area_footprint_m2": 100.0, "warnings": "", "status_step3": "success"},
{"input_id": "A", "av_egid": 1234, "fid": "f2",
"area_footprint_m2": 150.0, "warnings": "", "status_step3": "success"},
])
out = aggregate_by_input_id(df)
note = out.iloc[0]["warnings"]
assert "AV polygons for one EGID" in note
assert "distinct EGIDs" not in note
def test_aggregate_warning_text_multi_egid():
"""Two distinct EGIDs → aggregation note mentions 'distinct EGIDs'."""
df = _build_df([
{"input_id": "A", "av_egid": 1234, "fid": "f1",
"area_footprint_m2": 100.0, "warnings": "", "status_step3": "success"},
{"input_id": "A", "av_egid": 5678, "fid": "f2",
"area_footprint_m2": 200.0, "warnings": "", "status_step3": "success"},
])
out = aggregate_by_input_id(df)
note = out.iloc[0]["warnings"]
assert "distinct EGIDs" in note
assert "for one EGID" not in note
def test_aggregate_warnings_deduplicated():
"""
The same warning text repeated across sub-rows must NOT appear twice
in the aggregated warnings — it's deduplicated.
"""
df = _build_df([
{"input_id": "A", "av_egid": 1234, "fid": "f1",
"area_footprint_m2": 100.0, "warnings": "duplicate warning",
"status_step3": "success"},
{"input_id": "A", "av_egid": 1234, "fid": "f2",
"area_footprint_m2": 150.0, "warnings": "duplicate warning",
"status_step3": "success"},
])
out = aggregate_by_input_id(df)
parts = out.iloc[0]["warnings"].split("; ")
# "duplicate warning" should appear at most once + the aggregation note
duplicate_count = sum(1 for p in parts if p == "duplicate warning")
assert duplicate_count == 1
def test_aggregate_demotes_integer_floats_in_object_cols():
"""
Cosmetic fix: when aggregation forces a column to object dtype,
integer-valued floats from non-aggregated rows must be written as
"1234567" not "1234567.0".
"""
df = _build_df([
# First group: aggregated → av_egid becomes string
{"input_id": "A", "av_egid": 1234.0, "fid": "f1",
"area_footprint_m2": 100.0, "warnings": "", "status_step3": "success"},
{"input_id": "A", "av_egid": 5678.0, "fid": "f2",
"area_footprint_m2": 150.0, "warnings": "", "status_step3": "success"},
# Second group: not aggregated → av_egid stays as a value
{"input_id": "B", "av_egid": 9999.0, "fid": "f3",
"area_footprint_m2": 200.0, "warnings": "", "status_step3": "success"},
])
out = aggregate_by_input_id(df)
# The column is object dtype now (mixed string + float)
assert out["av_egid"].dtype == object
# The non-aggregated row should be int 9999, not float 9999.0
row_b = out[out["input_id"] == "B"].iloc[0]
assert row_b["av_egid"] == 9999
assert isinstance(row_b["av_egid"], int)
def test_aggregate_pass_through_cols_not_arrayed():
"""
Columns in _AGGREGATE_PASS_THROUGH_COLS (input_id, status_*, warnings,
geometry) must not be ;-joined even if they differ — they have
special handling elsewhere in _reduce_group or are the group key.
"""
assert "input_id" in _AGGREGATE_PASS_THROUGH_COLS
assert "status_step3" in _AGGREGATE_PASS_THROUGH_COLS
assert "warnings" in _AGGREGATE_PASS_THROUGH_COLS
assert "geometry" in _AGGREGATE_PASS_THROUGH_COLS # B6 defensive guard
def test_aggregate_geometry_column_not_string_joined():
"""
Regression for B6: today's pipeline drops the geometry column before
aggregation, but the aggregation step must defensively NOT string-join
geometries if a future change leaves it in. With the geometry column
in _AGGREGATE_PASS_THROUGH_COLS, a multi-row group keeps the first
sub-row's geometry as a real Shapely object (not a WKT string).
"""
from shapely.geometry import Polygon
poly_a = Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])
poly_b = Polygon([(20, 20), (30, 20), (30, 30), (20, 30)])
df = _build_df([
{"input_id": "X", "av_egid": 1234, "fid": "f1",
"area_footprint_m2": 100.0, "geometry": poly_a,
"warnings": "", "status_step3": "success"},
{"input_id": "X", "av_egid": 1234, "fid": "f2",
"area_footprint_m2": 150.0, "geometry": poly_b,
"warnings": "", "status_step3": "success"},
])
out = aggregate_by_input_id(df)
assert len(out) == 1
geom = out.iloc[0]["geometry"]
# Must still be a real Shapely Polygon, not a string of joined WKTs
assert isinstance(geom, Polygon), f"geometry got string-joined: {type(geom).__name__}"
# The first sub-row's polygon is preserved
assert geom.equals(poly_a)
def test_aggregate_preserves_column_order():
"""The output schema must match the input schema (same columns, same order)."""
df = _build_df([
{"input_id": "A", "av_egid": 1, "fid": "f1", "area_footprint_m2": 100.0,
"warnings": "", "status_step3": "success"},
])
out = aggregate_by_input_id(df)
assert list(out.columns) == list(df.columns)