@@ -29,8 +29,8 @@

@dataclass(frozen=True)
class TestOutcome:
-    failing: bool
-    errored: bool
+    failure_runs: int  # count of failed test runs
+    success_runs: int  # count of successful test runs
    started_at: datetime
    job_id: int

@@ -107,9 +107,11 @@ def _dedup_signal_events(self, signals: List[Signal]) -> List[Signal]:
new_commits: List[SignalCommit] = []
for c in s.commits:
    filtered: List[SignalEvent] = []
-    prev_key: Optional[Tuple[datetime, int]] = None
+    # Include status in the key so we can retain both a FAILURE and
+    # a SUCCESS emitted at the same (started_at, wf_run_id)
+    prev_key: Optional[Tuple[datetime, int, SignalStatus]] = None
    for e in c.events:  # already sorted by (started_at, wf_run_id)
-        key = (e.started_at, e.wf_run_id)
+        key = (e.started_at, e.wf_run_id, e.status)
Contributor:
Do we want to dedup by status? For flakiness analysis it is relevant whether the job failed 3x before succeeding once, or failed 1x and then succeeded once.

Contributor (Author):

Adding status to the key prevents deduplicating events with different statuses.

There are two constraints:

  1. GitHub tends to reuse the same jobs (assigning them different ids) when "retry failed" is invoked for the workflow. This is what dedup is trying to solve.
  2. Test reruns come from the same job, so they share the same timestamp and workflow run_id.

An alternative way to extract test retries while avoiding deduplication would be to artificially bump the synthetic event time by one second.

However, you're right to question how that change would affect signal processing. I think tests are retried only when the first run fails, so in theory this change only acts as a filter. Maybe we can simplify the whole thing and simply not consider signals when we detect mixed retries at the test level (pre-filter them before they are processed).

        if key == prev_key:
            continue
        filtered.append(e)
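
For illustration, a minimal, self-contained sketch of the dedup rule discussed above (plain tuples stand in for SignalEvent, and the names here are illustrative, not the PR's actual API): adjacent events sharing (started_at, wf_run_id, status) collapse into one, while a FAILURE and a SUCCESS at the same (started_at, wf_run_id) both survive.

from datetime import datetime
from typing import List, Optional, Tuple

Event = Tuple[datetime, int, str]  # (started_at, wf_run_id, status)

def dedup_adjacent(events: List[Event]) -> List[Event]:
    # Drop adjacent events whose full key, including status, repeats.
    filtered: List[Event] = []
    prev_key: Optional[Event] = None
    for e in events:  # assumed pre-sorted by (started_at, wf_run_id)
        if e == prev_key:
            continue
        filtered.append(e)
        prev_key = e
    return filtered

t = datetime(2025, 8, 20, 12, 0, 0)
events = [
    (t, 100, "FAILURE"),  # first failed run of a reused job
    (t, 100, "FAILURE"),  # duplicate from "retry failed" -> dropped
    (t, 100, "SUCCESS"),  # same time and run id, new status -> kept
]
assert dedup_adjacent(events) == [(t, 100, "FAILURE"), (t, 100, "SUCCESS")]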
@@ -259,7 +261,8 @@ def _build_test_signals(
    which base(s) (by normalized job name) a test appears in. For each commit and (workflow, base),
    we compute attempt metadata (pending/completed, start time). Then, for tests that failed at least once in
    that base, we emit events per commit/attempt:
-      - If test_run_s3 rows exist → FAILURE if any failing/errored else SUCCESS
+      - If test_run_s3 rows exist → emit at most one FAILURE event if any failed runs exist,
+        and at most one SUCCESS event if any successful runs exist (both may be present).
      - Else if group pending → PENDING
      - Else → no event (missing)

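The docstring's rules reduce to a small pure function per (commit, attempt). A hedged sketch under assumed names (Status and emitted_statuses are illustrative, not the PR's code; None counts model "no test_run_s3 rows"):

from enum import Enum
from typing import List, Optional

class Status(Enum):
    FAILURE = "failure"
    SUCCESS = "success"
    PENDING = "pending"

def emitted_statuses(
    failure_runs: Optional[int],  # None: no test_run_s3 rows for this attempt
    success_runs: Optional[int],
    group_pending: bool,
) -> List[Status]:
    if failure_runs is not None and success_runs is not None:
        out: List[Status] = []
        if failure_runs > 0:
            out.append(Status.FAILURE)  # at most one FAILURE event
        if success_runs > 0:
            out.append(Status.SUCCESS)  # at most one SUCCESS event
        return out
    if group_pending:
        return [Status.PENDING]
    return []  # missing: no event emitted

# A flaky attempt (one failed run, then a successful rerun) emits both.
assert emitted_statuses(1, 1, group_pending=False) == [Status.FAILURE, Status.SUCCESS]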
@@ -290,7 +293,8 @@ def _build_test_signals(
        value_fn=lambda j: (j.wf_run_id, j.run_attempt),
    )

-    # Index test_run_s3 rows per (commit, job_base, wf_run, attempt, test_id) and collect base-scoped failing tests
+    # Index test_run_s3 rows per (commit, job_base, wf_run, attempt, test_id)
+    # Store aggregated failure/success counts
    tests_by_group_attempt: Dict[
        Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt, TestId],
        TestOutcome,
@@ -309,24 +313,14 @@
    tr.workflow_run_attempt,
    tr.test_id,
)
-prev = tests_by_group_attempt.get(key)
-started_at = min(
-    (prev.started_at if prev else job.started_at), job.started_at
-)
-# Use job_id from the first failing test, or current job_id
-use_job_id = (
-    prev.job_id
-    if prev and (prev.failing or prev.errored)
-    else int(tr.job_id)
-)
outcome = TestOutcome(
-    failing=(prev.failing if prev else False) or bool(tr.failing),
-    errored=(prev.errored if prev else False) or bool(tr.errored),
-    started_at=started_at,
-    job_id=use_job_id,
+    failure_runs=tr.failure_runs,
+    success_runs=tr.success_runs,
+    started_at=job.started_at,
+    job_id=int(tr.job_id),
)
tests_by_group_attempt[key] = outcome
-if outcome.failing or outcome.errored:
+if outcome.failure_runs > 0:
    failing_tests_by_job_base_name.add(
        (job.workflow_name, job_base_name, tr.test_id)
    )
@@ -354,7 +348,7 @@ def _build_test_signals(
if meta.is_cancelled:
    # canceled attempts are treated as missing
    continue
-verdict = tests_by_group_attempt.get(
+outcome = tests_by_group_attempt.get(
    (
        commit_sha,
        wf_name,
@@ -378,17 +372,26 @@
"run_attempt": int(run_attempt),
}

if verdict:
events.append(
SignalEvent(
status=SignalStatus.FAILURE
if (verdict.failing or verdict.errored)
else SignalStatus.SUCCESS,
started_at=verdict.started_at,
job_id=verdict.job_id,
**event_common,
if outcome:
# Emit at most one FAILURE and one SUCCESS per attempt
if outcome.failure_runs > 0:
events.append(
SignalEvent(
status=SignalStatus.FAILURE,
started_at=outcome.started_at,
job_id=outcome.job_id,
**event_common,
)
)
if outcome.success_runs > 0:
events.append(
SignalEvent(
status=SignalStatus.SUCCESS,
started_at=outcome.started_at,
job_id=outcome.job_id,
**event_common,
)
)
)
elif meta.is_pending:
events.append(
SignalEvent(
@@ -215,6 +215,7 @@ def fetch_tests_for_job_ids(
)
# One query with a CTE that enumerates failed test ids from failed_job_ids,
# then filters the main selection by those ids for the current chunk.
+# Note: success_runs explicitly excludes skipped rows via skipped_count = 0.
query = """
    WITH failed_test_names AS (
        SELECT DISTINCT concat(file, '|', classname, '|', name) AS test_id
[...]
            AND (failure_count > 0 OR error_count > 0)
    )
    SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
-        max(failure_count > 0) AS failing,
-        max(error_count > 0) AS errored
+        countIf(failure_count > 0 OR error_count > 0) AS failure_runs,
+        countIf(failure_count = 0 AND error_count = 0 AND skipped_count = 0) AS success_runs
    FROM default.test_run_s3
    WHERE job_id IN {job_ids:Array(Int64)}
        AND concat(file, '|', classname, '|', name) IN failed_test_names
[...]
        file=str(r[3] or ""),
        classname=str(r[4] or ""),
        name=str(r[5] or ""),
-        failing=int(r[6] or 0),
-        errored=int(r[7] or 0),
+        failure_runs=int(r[6] or 0),
+        success_runs=int(r[7] or 0),
    )
)
dt = time.perf_counter() - t0
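
As a sanity check on the new aggregation, a hedged Python model of what the two countIf expressions compute per (job, test) group; Row and aggregate are toy stand-ins, and the real rows come from ClickHouse's default.test_run_s3 table:

from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class Row:  # one test_run_s3 rerun row (toy stand-in)
    failure_count: int
    error_count: int
    skipped_count: int

def aggregate(rows: List[Row]) -> Tuple[int, int]:
    # countIf(failure_count > 0 OR error_count > 0) AS failure_runs
    failure_runs = sum(1 for r in rows if r.failure_count > 0 or r.error_count > 0)
    # countIf(failure_count = 0 AND error_count = 0 AND skipped_count = 0) AS success_runs
    success_runs = sum(
        1
        for r in rows
        if r.failure_count == 0 and r.error_count == 0 and r.skipped_count == 0
    )
    return failure_runs, success_runs

# One failed run, one skipped run, one clean rerun:
assert aggregate([Row(1, 0, 0), Row(0, 0, 1), Row(0, 0, 0)]) == (1, 1)
# The skipped row counts toward neither bucket.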
@@ -130,8 +130,8 @@ class TestRow:
    file: str
    classname: str
    name: str
-    failing: int  # 0/1
-    errored: int  # 0/1
+    failure_runs: int  # count of failed test runs
+    success_runs: int  # count of successful test runs

    @property
    def test_id(self) -> TestId:
@@ -13,8 +13,9 @@ class TestSignalDedup(unittest.TestCase):
    def setUp(self) -> None:
        self.t0 = datetime(2025, 8, 20, 12, 0, 0)

-    def test_dedup_removes_adjacent_duplicates(self):
-        # Two events with identical (started_at, wf_run_id) within a single commit
+    def test_dedup_keeps_both_statuses(self):
+        # Two events with identical (started_at, wf_run_id) but different statuses
+        # should both be retained after dedup (we dedup by (started_at, wf_run_id, status)).
        e1 = SignalEvent(
            name="job-a",
            status=SignalStatus.FAILURE,
@@ -33,10 +34,10 @@ def test_dedup_removes_adjacent_duplicates(self):
        ex = SignalExtractor(workflows=["wf"], lookback_hours=24)
        out = ex._dedup_signal_events([s])
        self.assertEqual(len(out), 1)
-        self.assertEqual(len(out[0].commits[0].events), 1)
-        # keeps the first encountered event for that pair
-        self.assertEqual(out[0].commits[0].events[0].name, "job-a")
-        self.assertEqual(out[0].commits[0].events[0].status, SignalStatus.FAILURE)
+        # Both events survive because status differs
+        self.assertEqual(len(out[0].commits[0].events), 2)
+        statuses = {e.status for e in out[0].commits[0].events}
+        self.assertEqual(statuses, {SignalStatus.FAILURE, SignalStatus.SUCCESS})

    def test_dedup_keeps_non_duplicates(self):
        e1 = SignalEvent(

@@ -67,7 +68,7 @@ def test_dedup_keeps_non_duplicates(self):
        self.assertEqual(len(out[0].commits[0].events), 3)

    def test_dedup_applies_per_commit(self):
-        # Duplicates in different commits are not cross-deduped
+        # Dedup applies per commit: each commit retains at most one event per status
        e1 = SignalEvent(
            name="job-a",
            status=SignalStatus.FAILURE,
@@ -86,8 +87,8 @@

        ex = SignalExtractor(workflows=["wf"], lookback_hours=24)
        out = ex._dedup_signal_events([s])
-        # Both commits should each have one event after dedup
-        self.assertEqual([len(c.events) for c in out[0].commits], [1, 1])
+        # Both commits should each have two events (one per status) after dedup
+        self.assertEqual([len(c.events) for c in out[0].commits], [2, 2])


if __name__ == "__main__":
@@ -93,8 +93,8 @@ def T(
    attempt: int,
    file: str,
    name: str,
-    failing: int,
-    errored: int = 0,
+    failure_runs: int,
+    success_runs: int = 0,
):
    return TestRow(
        job_id=JobId(job),
@@ -103,8 +103,8 @@ def T(
        file=file,
        classname="",
        name=name,
-        failing=failing,
-        errored=errored,
+        failure_runs=failure_runs,
+        success_runs=success_runs,
    )


@@ -197,7 +197,17 @@ def test_keep_going_failure_test_track_failure_and_no_job_signal(self):
                rule="pytest failure",
            )
        ]
-        tests = [T(job=20, run=400, attempt=1, file="f.py", name="test_a", failing=1)]
+        tests = [
+            T(
+                job=20,
+                run=400,
+                attempt=1,
+                file="f.py",
+                name="test_a",
+                failure_runs=1,
+                success_runs=0,
+            )
+        ]
        signals = self._extract(jobs, tests)
        # test signal present with FAILURE
        test_sig = self._find_test_signal(signals, "trunk", "f.py::test_a")
@@ -259,8 +269,24 @@ def test_non_test_inclusion_gate(self):
            ),
        ]
        tests_a = [
-            T(job=40, run=600, attempt=1, file="f.py", name="test_x", failing=1),
-            T(job=41, run=610, attempt=1, file="f.py", name="test_x", failing=1),
+            T(
+                job=40,
+                run=600,
+                attempt=1,
+                file="f.py",
+                name="test_x",
+                failure_runs=1,
+                success_runs=0,
+            ),
+            T(
+                job=41,
+                run=610,
+                attempt=1,
+                file="f.py",
+                name="test_x",
+                failure_runs=1,
+                success_runs=0,
+            ),
        ]
        signals_a = self._extract(jobs_a, tests_a)
        self.assertIsNone(
@@ -374,8 +400,24 @@ def test_test_track_mapping_failure_then_success(self):
            ),
        ]
        tests = [
-            T(job=60, run=800, attempt=1, file="g.py", name="test_y", failing=1),
-            T(job=61, run=810, attempt=1, file="g.py", name="test_y", failing=0),
+            T(
+                job=60,
+                run=800,
+                attempt=1,
+                file="g.py",
+                name="test_y",
+                failure_runs=1,
+                success_runs=0,
+            ),
+            T(
+                job=61,
+                run=810,
+                attempt=1,
+                file="g.py",
+                name="test_y",
+                failure_runs=0,
+                success_runs=1,
+            ),
        ]
        signals = self._extract(jobs, tests)
        test_sig = self._find_test_signal(signals, "trunk", "g.py::test_y")
@@ -424,7 +466,8 @@ def test_inject_pending_workflow_event_when_missing_in_signal(self):
                attempt=1,
                file="m.py",
                name="test_synthetic_pending",
-                failing=1,
+                failure_runs=1,
+                success_runs=0,
            )
        ]
