-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream.py
More file actions
37 lines (31 loc) · 1.28 KB
/
stream.py
File metadata and controls
37 lines (31 loc) · 1.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
"""Real-time stream event writer — persists SSE events to a named Modal Dict."""
import time
import uuid
from typing import Any
import modal
# Name prefix of the per-scan Modal Dict; write_stream_event appends the
# scan_id to it to get the dict name.
_STREAM_PREFIX = "osint-stream-"
def write_stream_event(scan_id: str, event_type: str, payload: dict[str, Any]) -> None:
    """Write one SSE event to the persistent stream dict for ``scan_id``.

    The event is stored in the Modal Dict named ``_STREAM_PREFIX + scan_id``
    (created if missing) under a fresh ``evt_<uuid4 hex>`` key, so concurrent
    callers do not collide on event keys.

    ``next_seq`` is a best-effort counter: it is read and then written back
    without any locking, so concurrent callers may observe duplicate or
    skipped sequence numbers.

    Args:
        scan_id: Identifier of the scan whose stream receives the event.
            A falsy value (``""`` or ``None``) makes the call a no-op.
        event_type: Stored under the event's ``"type"`` key.
        payload: Stored under the event's ``"payload"`` key.

    Returns:
        None. Events are best-effort: any ``Exception`` raised while writing
        is logged at DEBUG level and otherwise dropped, never propagated.
    """
    if not scan_id:
        return

    try:
        stream = modal.Dict.from_name(
            f"{_STREAM_PREFIX}{scan_id}", create_if_missing=True
        )

        # Benign read-then-write race: two callers may read the same value
        # and both store seq + 1. Acceptable because event keys are
        # UUID-based, so no event is lost -- only the counter may repeat
        # or skip.
        seq: int = stream.get("next_seq", 0)
        stream["next_seq"] = seq + 1

        stream[f"evt_{uuid.uuid4().hex}"] = {
            "seq": seq,
            "type": event_type,
            "payload": payload,
            "ts": time.time(),
        }
    except Exception:
        # Stream events are best-effort, so the failure must not reach the
        # caller. Record it at DEBUG (hidden under default logging config)
        # so a broken stream can still be diagnosed when needed.
        import logging

        logging.getLogger(__name__).debug(
            "Dropped stream event %r for scan %r",
            event_type,
            scan_id,
            exc_info=True,
        )