Skip to content

Commit 3353524

Browse files
hellaishynnotLDiazN
authored
Add richer metadata to the analysis aggregation query (#922)
* Add args to make test command * Add E2E tests to list analysis endpoint * Add E2E tests to list observations endpoint * Add E2E tests to aggregate analysis endpoint * Add richer metadata to the analysis aggregation query * Move SQL query to other module * Return Loni values inside of the analysis aggregate * Adjust the calculations of anomaly,confirmed and failure * There is no need to shorten the count key * Cast blocked values to 0 instead of NULL The greatest function will not ignore NULL values and instead return NULL as output * Remove redundant fields * Add extra columns to return value * Updated broken test This test used to assert that anomaly count should be in every entry in the results of the aggregation analysis, but we droped that in favor of Loni values --------- Co-authored-by: hynnot <[email protected]> Co-authored-by: Luis Díaz <[email protected]>
1 parent e39fc99 commit 3353524

File tree

5 files changed

+362
-139
lines changed

5 files changed

+362
-139
lines changed

ooniapi/services/oonimeasurements/src/oonimeasurements/routers/data/aggregate_analysis.py

Lines changed: 80 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pydantic import BaseModel
77

88
from .utils import get_measurement_start_day_agg, TimeGrains, parse_probe_asn_to_int
9+
from ...sql import format_aggregate_query
910
from ...dependencies import (
1011
get_clickhouse_session,
1112
)
@@ -41,16 +42,37 @@ class DBStats(BaseModel):
4142
total_row_count: int
4243

4344

45+
class Loni(BaseModel):
46+
dns_isp_blocked: float
47+
dns_isp_down: float
48+
dns_isp_ok: float
49+
dns_other_blocked: float
50+
dns_other_down: float
51+
dns_other_ok: float
52+
tls_blocked: float
53+
tls_down: float
54+
tls_ok: float
55+
tcp_blocked: float
56+
tcp_down: float
57+
tcp_ok: float
58+
59+
dns_isp_outcome: str
60+
dns_other_outcome: str
61+
tcp_outcome: str
62+
tls_outcome: str
63+
64+
4465
class AggregationEntry(BaseModel):
45-
anomaly_count: float
46-
confirmed_count: float
47-
failure_count: float
48-
ok_count: float
49-
measurement_count: float
66+
count: float
5067

5168
measurement_start_day: Optional[datetime] = None
69+
5270
outcome_label: str
53-
outcome_value: float
71+
outcome_ok: float
72+
outcome_blocked: float
73+
outcome_down: float
74+
75+
loni: Loni
5476

5577
domain: Optional[str] = None
5678
probe_cc: Optional[str] = None
@@ -148,146 +170,72 @@ async def get_aggregation_analysis(
148170
where += " WHERE "
149171
where += " AND ".join(and_clauses)
150172

151-
q = f"""
152-
WITH
153-
mapFilter((k, v) -> v != 0, dns_nok_outcomes) as dns_outcomes,
154-
mapFilter((k, v) -> v != 0, tcp_nok_outcomes) as tcp_outcomes,
155-
mapFilter((k, v) -> v != 0, tls_nok_outcomes) as tls_outcomes,
156-
157-
arrayZip(mapKeys(dns_outcomes), mapValues(dns_outcomes)) as dns_outcome_list,
158-
arraySum((v) -> v.2, dns_outcome_list) as dns_nok_sum,
159-
arraySort((v) -> -v.2, arrayMap((v) -> (v.1, v.2/dns_nok_sum), dns_outcome_list)) as dns_outcomes_norm,
160-
161-
arrayZip(mapKeys(tcp_outcomes), mapValues(tcp_outcomes)) as tcp_outcome_list,
162-
arraySum((v) -> v.2, tcp_outcome_list) as tcp_nok_sum,
163-
arraySort((v) -> -v.2, arrayMap((v) -> (v.1, v.2/tcp_nok_sum), tcp_outcome_list)) as tcp_outcomes_norm,
164-
165-
arrayZip(mapKeys(tls_outcomes), mapValues(tls_outcomes)) as tls_outcome_list,
166-
arraySum((v) -> v.2, tls_outcome_list) as tls_nok_sum,
167-
arraySort((v) -> -v.2, arrayMap((v) -> (v.1, v.2/tls_nok_sum), tls_outcome_list)) as tls_outcomes_norm,
168-
169-
arraySort(
170-
(v) -> -v.2,
171-
[
172-
(dns_outcome_nok_label, dns_outcome_nok_value),
173-
(tcp_outcome_nok_label, tcp_outcome_nok_value),
174-
(tls_outcome_nok_label, tls_outcome_nok_value),
175-
IF(
176-
tls_ok_sum = 0 AND tls_outcome_nok_value = 0,
177-
-- Special case for when the tested target was not supporting HTTPS and hence the TLS outcome is not so relevant
178-
('ok', arrayMin([dns_outcome_ok_value, tcp_outcome_ok_value])),
179-
('ok', arrayMin([dns_outcome_ok_value, tcp_outcome_ok_value, tls_outcome_ok_value]))
180-
)
181-
]
182-
) as all_outcomes_sorted,
183-
184-
arrayConcat(dns_outcomes_norm, tcp_outcomes_norm, tls_outcomes_norm) as all_nok_outcomes,
185-
186-
dns_outcomes_norm[1].1 as dns_outcome_nok_label,
187-
dns_outcomes_norm[1].2 as dns_outcome_nok_value,
188-
189-
tcp_outcomes_norm[1].1 as tcp_outcome_nok_label,
190-
tcp_outcomes_norm[1].2 as tcp_outcome_nok_value,
191-
192-
tls_outcomes_norm[1].1 as tls_outcome_nok_label,
193-
tls_outcomes_norm[1].2 as tls_outcome_nok_value,
194-
195-
IF(dns_ok_sum > 0, 1 - dns_outcome_nok_value, 0) as dns_outcome_ok_value,
196-
IF(tcp_ok_sum > 0, 1 - tcp_outcome_nok_value, 0) as tcp_outcome_ok_value,
197-
IF(tls_ok_sum > 0, 1 - tls_outcome_nok_value, 0) as tls_outcome_ok_value,
198-
199-
all_outcomes_sorted[1].1 as final_outcome_label,
200-
IF(final_outcome_label = 'ok', all_outcomes_sorted[1].2, all_outcomes_sorted[1].2) as final_outcome_value
201-
202-
SELECT
203-
204-
{",".join(extra_cols.keys())},
205-
probe_analysis,
206-
all_nok_outcomes as all_outcomes,
207-
final_outcome_label as outcome_label,
208-
final_outcome_value as outcome_value
209-
210-
FROM (
211-
WITH
212-
IF(resolver_asn = probe_asn, 1, 0) as is_isp_resolver,
213-
multiIf(
214-
top_dns_failure IN ('android_dns_cache_no_data', 'dns_nxdomain_error'),
215-
'nxdomain',
216-
coalesce(top_dns_failure, 'got_answer')
217-
) as dns_failure
218-
SELECT
219-
{",".join(extra_cols.values())},
220-
221-
anyHeavy(top_probe_analysis) as probe_analysis,
222-
223-
sumMap(
224-
map(
225-
CONCAT(IF(is_isp_resolver, 'dns_isp.blocked.', 'dns_other.blocked.'), dns_failure), dns_blocked,
226-
CONCAT(IF(is_isp_resolver, 'dns_isp.down.', 'dns_other.down.'), dns_failure), dns_down
227-
)
228-
) as dns_nok_outcomes,
229-
sum(dns_ok) as dns_ok_sum,
230-
231-
sumMap(
232-
map(
233-
CONCAT('tcp.blocked.', coalesce(top_tcp_failure, '')), tcp_blocked,
234-
CONCAT('tcp.down.', coalesce(top_tcp_failure, '')), tcp_down
235-
)
236-
) as tcp_nok_outcomes,
237-
sum(tcp_ok) as tcp_ok_sum,
238-
239-
sumMap(
240-
map(
241-
CONCAT('tls.blocked.', coalesce(top_tls_failure, '')), tls_blocked,
242-
CONCAT('tls.down.', coalesce(top_tls_failure, '')), tls_down
243-
)
244-
) as tls_nok_outcomes,
245-
sum(tls_ok) as tls_ok_sum
246-
247-
FROM analysis_web_measurement
248-
{where}
249-
GROUP BY {", ".join(extra_cols.keys())}
250-
ORDER BY {", ".join(extra_cols.keys())}
251-
)
252-
"""
173+
q = format_aggregate_query(extra_cols, where)
253174

254175
t = time.perf_counter()
255176
log.info(f"running query {q} with {q_args}")
256177
rows = db.execute(q, q_args)
257178

258-
fixed_cols = ["probe_analysis", "all_outcomes", "outcome_label", "outcome_value"]
179+
fixed_cols = [
180+
"probe_analysis",
181+
"count",
182+
"dns_isp_blocked",
183+
"dns_isp_down",
184+
"dns_isp_ok",
185+
"dns_other_blocked",
186+
"dns_other_down",
187+
"dns_other_ok",
188+
"tls_blocked",
189+
"tls_down",
190+
"tls_ok",
191+
"tcp_blocked",
192+
"tcp_down",
193+
"tcp_ok",
194+
"dns_isp_outcome",
195+
"dns_other_outcome",
196+
"tcp_outcome",
197+
"tls_outcome",
198+
"most_likely_ok",
199+
"most_likely_down",
200+
"most_likely_blocked",
201+
"most_likely_label",
202+
]
259203

260204
results: List[AggregationEntry] = []
261205
if rows and isinstance(rows, list):
262206
for row in rows:
263207
d = dict(zip(list(extra_cols.keys()) + fixed_cols, row))
264-
outcome_value = d["outcome_value"]
265-
outcome_label = d["outcome_label"]
266-
anomaly_count = 0
267-
confirmed_count = 0
268-
failure_count = 0
269-
ok_count = 0
270-
if outcome_label == "ok":
271-
ok_count = outcome_value
272-
elif "blocked." in outcome_label:
273-
if outcome_value >= anomaly_sensitivity:
274-
confirmed_count = outcome_value
275-
else:
276-
anomaly_count = outcome_value
277-
278-
# Map "down" to failures
279-
else:
280-
failure_count = outcome_value
208+
loni = Loni(
209+
dns_isp_blocked=d.get("dns_isp_blocked", 0.0),
210+
dns_isp_down=d.get("dns_isp_down", 0.0),
211+
dns_isp_ok=d.get("dns_isp_ok", 0.0),
212+
dns_other_blocked=d.get("dns_other_blocked", 0.0),
213+
dns_other_down=d.get("dns_other_down", 0.0),
214+
dns_other_ok=d.get("dns_other_ok", 0.0),
215+
tls_blocked=d.get("tls_blocked", 0.0),
216+
tls_down=d.get("tls_down", 0.0),
217+
tls_ok=d.get("tls_ok", 0.0),
218+
tcp_blocked=d.get("tcp_blocked", 0.0),
219+
tcp_down=d.get("tcp_down", 0.0),
220+
tcp_ok=d.get("tcp_ok", 0.0),
221+
dns_isp_outcome=d.get("dns_isp_outcome", ""),
222+
dns_other_outcome=d.get("dns_other_outcome", ""),
223+
tcp_outcome=d.get("tcp_outcome", ""),
224+
tls_outcome=d.get("tls_outcome", ""),
225+
)
226+
outcome_label = d["most_likely_label"]
227+
outcome_blocked = d["most_likely_blocked"]
228+
outcome_down = d["most_likely_down"]
229+
outcome_ok = d["most_likely_ok"]
281230

282231
entry = AggregationEntry(
283-
anomaly_count=anomaly_count,
284-
confirmed_count=confirmed_count,
285-
failure_count=failure_count,
286-
ok_count=ok_count,
287-
measurement_count=1.0,
232+
count=d["count"],
288233
measurement_start_day=d.get("measurement_start_day"),
234+
loni=loni,
289235
outcome_label=outcome_label,
290-
outcome_value=outcome_value,
236+
outcome_blocked=outcome_blocked,
237+
outcome_down=outcome_down,
238+
outcome_ok=outcome_ok,
291239
domain=d.get("domain"),
292240
probe_cc=d.get("probe_cc"),
293241
probe_asn=d.get("probe_asn"),

0 commit comments

Comments
 (0)