-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathndjson_streaming.py
More file actions
83 lines (71 loc) · 3.02 KB
/
ndjson_streaming.py
File metadata and controls
83 lines (71 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""VynFi NDJSON Streaming — rate-controlled, TB-scale data streaming (DataSynth 2.3+).
Stream generated data line-by-line from the API instead of downloading the full
archive. Useful for:
- Real-time pipelines (Kafka, message queues)
- Memory-constrained environments
- Progress tracking on long-running jobs
- Backpressure-aware consumers
Requires Scale tier or above. Pass ``file=...`` to select which archive file
to stream (defaults to ``stream.ndjson`` if present).
"""
import os
import time
import vynfi
# Authenticate against the VynFi API; the key comes from the environment.
client = vynfi.VynFi(api_key=os.environ["VYNFI_API_KEY"])

# Pick the most recent completed job as the streaming source.
recent = client.jobs.list(status="completed", limit=5)
if not recent.data:
    print("No completed jobs to stream from.")
    raise SystemExit(1)

job = recent.data[0]
print(f"Streaming from job: {job.id}")
print()
# ── Stream NDJSON with rate control ──────────────────────────────────────────
#
# Query parameters:
#   file              -- archive file to stream (required for most jobs)
#   rate              -- lines per second (1-10000, default 1000)
#   burst             -- burst capacity (max 1000, default 100)
#   progress_interval -- emit progress envelope every N lines (default 1000)
#
# The SDK handles both true NDJSON (.jsonl files) and pretty-printed JSON
# (.json files) transparently.
start = time.monotonic()
data_count = 0            # data records consumed so far
progress_count = 0        # progress envelopes seen so far
last_progress = 0         # lines_emitted reported by the previous progress envelope
last_progress_at = start  # monotonic timestamp of the previous progress envelope
try:
    for envelope in client.jobs.stream_ndjson(
        job.id,
        file="journal_entries.json",  # stream the JE file
        rate=500,                     # 500 lines/sec — gentle on local consumer
        burst=50,                     # allow short bursts
        progress_interval=1000,       # progress envelope every 1k lines
    ):
        if envelope.get("type") == "_progress":
            progress_count += 1
            now = time.monotonic()
            emitted = envelope["lines_emitted"]
            # BUG FIX: the interval line count must be divided by the time
            # elapsed since the *previous* progress envelope, not by the total
            # time since stream start — the old formula made every printed
            # rate after the first one shrink toward zero.
            rate = (emitted - last_progress) / max(now - last_progress_at, 0.001)
            last_progress = emitted
            last_progress_at = now
            print(f" [progress] {emitted:,} lines emitted ({rate:.0f}/sec)")
        else:
            data_count += 1
            # Process the data record here — e.g., send to Kafka, write to DB, etc.
            # For demo, just show the first few records.
            if data_count <= 3:
                header = envelope.get("header", envelope)
                doc_id = header.get("document_id", "?")
                doc_type = header.get("document_type", "?")
                print(f" [data #{data_count}] {doc_type} {str(doc_id)[:30]}")
        # Stop after a sample
        if data_count >= 100 or progress_count >= 3:
            break
except vynfi.VynFiError as e:
    print(f" Stream error: {e}")
except KeyboardInterrupt:
    print(" Stream interrupted by user")
# Final summary: totals and overall throughput for the whole run.
elapsed = time.monotonic() - start
throughput = data_count / max(elapsed, 0.001)
print()
print(f"Streamed {data_count:,} data records + {progress_count} progress envelopes")
print(f"in {elapsed:.1f}s ({throughput:.0f} records/sec)")