-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreaming_fraud_monitor.py
More file actions
132 lines (107 loc) · 3.92 KB
/
streaming_fraud_monitor.py
File metadata and controls
132 lines (107 loc) · 3.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""VynFi Streaming — fraud monitoring with back-pressure.
Stream anomaly/fraud labels from the label JSONL file and throttle downstream
consumers (e.g. a Slack webhook or SIEM) so you never exceed their rate limit.
This is the ideal pattern for:
- Alert fanout (Slack/PagerDuty/SIEM)
- Rate-limited APIs (Salesforce, ServiceNow)
- Queue producers (Kafka/RabbitMQ)
Requires Scale tier. Streams the true NDJSON `labels/anomaly_labels.jsonl`
file (one record per line) for fast throughput.
"""
import os
import time
from collections import Counter
import vynfi
client = vynfi.VynFi(api_key=os.environ["VYNFI_API_KEY"])

# Probe recent completed jobs until we find one whose anomaly-label NDJSON
# file both exists and contains at least one record.
job = None
for cand in client.jobs.list(status="completed", limit=10).data:
    try:
        head = client.jobs.stream_ndjson(cand.id, file="labels/anomaly_labels.jsonl")
        next(head)  # pull one record: proves the file exists and is non-empty
    except (vynfi.NotFoundError, StopIteration):
        continue  # file missing or empty — try the next candidate
    # Release the probe stream if the SDK iterator supports close().
    closer = getattr(head, "close", None)
    if closer is not None:
        closer()
    job = cand
    break
if job is None:
    raise SystemExit("No completed job with a streamable anomaly_labels.jsonl found.")
print(f"Fraud monitor streaming from: {job.id}")
print()
class RateLimiter:
    """Token-bucket rate limiter for downstream consumer fan-out.

    Tokens refill continuously at ``rate_per_sec`` and the bucket is capped
    at ``rate_per_sec`` tokens (at most one second of burst). ``take()``
    consumes one token, sleeping just long enough when the bucket is empty.
    """

    def __init__(self, rate_per_sec: float) -> None:
        self.rate = rate_per_sec
        self.tokens = rate_per_sec      # start with a full bucket
        self.last = time.monotonic()    # timestamp of the last refill

    def take(self) -> None:
        """Block until one token is available, then consume it."""
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at a full bucket.
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens < 1:
            wait = (1 - self.tokens) / self.rate
            time.sleep(wait)
            # BUG FIX: advance `last` past the slept interval. The original
            # left `last` at the pre-sleep timestamp, so the next call
            # re-credited the sleep duration as fresh tokens — double-counting
            # that let callers exceed the configured rate.
            self.last = now + wait
            self.tokens = 0
        else:
            self.tokens -= 1
# Simulate a downstream consumer limited to 2 alerts/sec (e.g. Slack webhook)
downstream = RateLimiter(rate_per_sec=2.0)


def send_alert(label: dict) -> None:
    """Simulated downstream alert — replace with Slack/PagerDuty/SIEM call."""
    # Block here until the downstream consumer's rate limit allows a send.
    downstream.take()
    raw_type = label.get("anomaly_type", {})
    if isinstance(raw_type, dict):
        # Take the first (category, subtype) pair; fall back for empty dicts.
        category, subtype = next(iter(raw_type.items()), ("unknown", ""))
    else:
        subtype = ""
        category = str(raw_type)
    doc_id = str(label.get("document_id", "?"))[:30]
    print(f" [ALERT sent] {category}/{subtype} doc={doc_id} co={label.get('company_code')}")
# ── Stream only fraud/statistical labels — filter early, alert late ──────────
total_seen = 0
fraud_count = 0
alerts_sent = 0
categories: Counter = Counter()
subtypes: Counter = Counter()
start = time.monotonic()

for record in client.jobs.stream_ndjson(
    job.id,
    file="labels/anomaly_labels.jsonl",
    rate=500,  # pull fast from API
    progress_interval=200,
):
    # Skip heartbeat envelopes the stream helper interleaves with data.
    if record.get("type") == "_progress":
        continue

    total_seen += 1
    raw_type = record.get("anomaly_type", {})
    if isinstance(raw_type, dict):
        category = next(iter(raw_type), "unknown")
        subtype = "" if category == "unknown" else raw_type.get(category, "")
    else:
        subtype = ""
        category = str(raw_type)
    categories[category] += 1
    subtypes[f"{category}/{subtype}"] += 1

    # Filter policy: alert on Fraud and Statistical anomalies
    if category not in ("Fraud", "Statistical"):
        continue
    fraud_count += 1
    send_alert(record)
    alerts_sent += 1
    # Demo cap
    if alerts_sent >= 5:
        break
elapsed = time.monotonic() - start
# ── Summary report ────────────────────────────────────────────────────────────
print()
# Fix: plain string literal — the original used an f-string with no
# placeholders (ruff F541), which is misleading noise.
print("Stats:")
print(f" Labels streamed: {total_seen:,}")
print(f" Fraud/Stat matched: {fraud_count}")
# Guard against elapsed == 0 so the rate figure never divides by zero.
print(f" Alerts dispatched: {alerts_sent} in {elapsed:.1f}s "
      f"(~{alerts_sent / max(elapsed, 0.001):.1f}/sec — respects downstream limit)")
print()
print("All anomaly categories in sample:")
for cat, count in categories.most_common():
    print(f" {cat:15s} {count}")
print()
print("Top anomaly patterns:")
for pattern, count in subtypes.most_common(10):
    print(f" {pattern:45s} {count}")