Continuous micro-interval poller that streams Zabbix history data into Kafka with checkpoint recovery and Kafka queue backpressure handling.
┌─────────────────────────────┐
│ Zabbix API (host groups) │
└──────────────┬──────────────┘
│
host_groups() / host.get()
│
┌──────────────────▼───────────────────┐
│ Streaming Loop (`main.rs`) │
│ • Ticker (micro_interval_ms) │
│ • Per-group time window selection │
│ • CheckpointStore update/persist │
└──────────────┬──────────────▲────────┘
│ │
│ load()/update()
│ │
┌─────────▼─────────┐ │
│ Checkpoint Store │ │
│ (groupid → ts) │ │
└─────────┬─────────┘ │
│ │
│ collect_group()
│ │
┌──────────────▼──────────────┐
│ DataCollector (`collector`)│
│ • Host discovery │
│ • Parallel host polling │
│ • history.get batching │
└──────────────┬──────────────┘
│ batches of MonitoringRecord
│
┌───────▼────────┐
│ Kafka Channel │ (bounded buffer)
└───────┬────────┘
│
┌─────────▼─────────┐
│ KafkaPublisher │
│ • Idempotent send │
│ • QueueFull backoff│
└─────────┬─────────┘
│
▼
Kafka Topic
- Streaming loop drives the micro-interval ticker, walks host groups, and pushes records into the Kafka channel.
- Checkpoint store persists the highest UNIX timestamp seen for each host group to JSON, enabling restarts and start-mode control. Supports three modes: `EARLIEST`, `LATEST`, and `CHECKPOINT`, with intelligent priority handling.
- Data collector fans out host/item history requests with a semaphore-limited concurrency pool and merges records.
- Kafka publisher drains the channel, performs idempotent publishes, and applies exponential backoff when the broker queue returns `QueueFull`.
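The loop's per-group window selection can be sketched with a plain map in place of the real `CheckpointStore` (a minimal illustration; `tick_windows` and keying by group name are assumptions, the actual store keys by groupid and persists to JSON):

```rust
use std::collections::HashMap;

/// One micro-interval tick (illustrative): pick a [from, now) window per host
/// group from the checkpoint map, then advance the checkpoint once the window
/// has been handed to the collector.
fn tick_windows(
    checkpoints: &mut HashMap<String, i64>,
    groups: &[&str],
    now: i64,
) -> Vec<(String, i64, i64)> {
    let mut windows = Vec::new();
    for g in groups {
        // Groups without a checkpoint default to `now` (LATEST-style: new data only).
        let from = *checkpoints.get(*g).unwrap_or(&now);
        if from < now {
            windows.push((g.to_string(), from, now));
            checkpoints.insert(g.to_string(), now); // persistence handled by the store
        }
    }
    windows
}

fn main() {
    let mut cp = HashMap::from([("Linux servers".to_string(), 1_700_000_000)]);
    // Only the group whose checkpoint lags `now` gets a polling window.
    let w = tick_windows(&mut cp, &["Linux servers", "New group"], 1_700_000_010);
    println!("{:?}", w);
}
```

The real loop runs this selection on every ticker fire and hands each window to `collect_group()`.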
`integration.ini` drives runtime behavior. Key sections:

```ini
[ZABBIX]
server = http://localhost:8080
user = Admin
password = zabbix

[KAFKA]
queue_max_messages = 100000   ; optional producer buffer limit
bootstrap_servers = localhost:9092
topic = rust.metrics

[STREAM]
micro_interval_ms = 250       ; ticker cadence
buffer_capacity = 512         ; bounded channel depth
checkpoint_mode = LATEST      ; EARLIEST, LATEST, or CHECKPOINT
checkpoint_path = checkpoint_state.json
host_groups = NONE            ; comma list of group names (or NONE for all)
```

- `checkpoint_mode = CHECKPOINT` loads previously persisted timestamps; `LATEST` clears them so the collector begins from "now".
- Adjust `micro_interval_ms` for faster/slower polling and `buffer_capacity` to tune upstream buffering relative to Kafka throughput.
- Set `queue_max_messages` to bound the librdkafka internal queue; smaller values intensify backpressure handling and surface `QueueFull` warnings during tests.
- `host_groups` accepts a comma-separated list of group names to include (comparison is case-insensitive). Omit the key or set it to `NONE` to stream every discovered group.
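The host-group filter described above reduces to a small predicate, sketched here with illustrative names (`group_selected` is not the crate's actual API):

```rust
/// Decide whether a discovered group should be streamed.
/// `filter` is None when host_groups is omitted or set to NONE.
fn group_selected(filter: Option<&[&str]>, group: &str) -> bool {
    match filter {
        None => true, // stream every discovered group
        Some(names) => names.iter().any(|n| n.eq_ignore_ascii_case(group)),
    }
}

fn main() {
    let filter = Some(["Linux servers", "Zabbix servers"].as_slice());
    assert!(group_selected(filter, "linux SERVERS")); // comparison is case-insensitive
    assert!(group_selected(None, "anything"));        // NONE / omitted key streams all
    println!("filter ok");
}
```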
The streaming collector supports three checkpoint modes with intelligent priority handling:

- `EARLIEST` - Start from timestamp 0 to retrieve all available historical data from Zabbix
- `LATEST` - Start from current time to get new data only (default behavior)
- `CHECKPOINT` - Resume from existing checkpoint file if it exists
Checkpoint files always take priority over configuration settings:
| Configuration | Checkpoint File | Effective Mode | Behavior |
|---|---|---|---|
| `EARLIEST` | ❌ No | `EARLIEST` | Start from timestamp 0 |
| `LATEST` | ❌ No | `LATEST` | Start from current time |
| ANY | ✅ Yes | `CHECKPOINT` | Resume from checkpoint |
This ensures reliable restarts in production environments.
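The priority rule in the table is pure logic and can be sketched in a few lines (a minimal illustration; the enum and function names are assumptions, not the collector's actual API):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum CheckpointMode {
    Earliest,
    Latest,
    Checkpoint,
}

/// An existing checkpoint file always takes priority over the configured mode.
fn effective_mode(configured: CheckpointMode, checkpoint_exists: bool) -> CheckpointMode {
    if checkpoint_exists {
        CheckpointMode::Checkpoint
    } else {
        configured
    }
}

/// Translate the effective mode into the first history.get window start.
fn start_timestamp(mode: CheckpointMode, now: i64, saved: Option<i64>) -> i64 {
    match mode {
        CheckpointMode::Earliest => 0,                      // all available history
        CheckpointMode::Latest => now,                      // new data only
        CheckpointMode::Checkpoint => saved.unwrap_or(now), // resume where we left off
    }
}

fn main() {
    // Configured LATEST but a checkpoint file exists: effective mode is CHECKPOINT.
    let mode = effective_mode(CheckpointMode::Latest, true);
    assert_eq!(mode, CheckpointMode::Checkpoint);
    println!("{:?}", mode);
}
```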
```ini
# Historical data import
[STREAM]
checkpoint_mode = EARLIEST

# Real-time monitoring (default)
[STREAM]
checkpoint_mode = LATEST

# Flexible (will resume from checkpoint if it exists)
[STREAM]
checkpoint_mode = CHECKPOINT
```

All configuration can also be set via environment variables:
```shell
export ZABBIX_KAFKA_CHECKPOINT_MODE=EARLIEST
export ZABBIX_KAFKA_CHECKPOINT_PATH=/custom/path/checkpoint.json
```

Environment variables take precedence over configuration file settings.
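The precedence chain (environment variable > config file > built-in default) can be sketched like this (`resolve` is an illustrative helper, not the crate's actual API):

```rust
use std::env;

/// Illustrative setting resolution: environment variable wins, then the value
/// parsed from integration.ini, then the built-in default.
fn resolve(env_key: &str, file_value: Option<&str>, default: &str) -> String {
    env::var(env_key)
        .ok()
        .or_else(|| file_value.map(str::to_string))
        .unwrap_or_else(|| default.to_string())
}

fn main() {
    // With the variable unset, the config-file value wins over the default.
    let mode = resolve("ZABBIX_KAFKA_CHECKPOINT_MODE_DEMO", Some("LATEST"), "CHECKPOINT");
    println!("{mode}");
}
```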
The collector logs both the configured and effective modes:
```
configured_mode: Latest
effective_mode: Checkpoint
"Resuming from existing checkpoint file"
```
- Ensure the integration stack is up (`docker-compose.integration.yml` provides Zabbix, Kafka, Postgres, etc.).
- Build once for release performance:

  ```powershell
  & "$env:USERPROFILE\.cargo\bin\cargo.exe" build --release
  ```

- Launch the streaming collector:

  ```powershell
  & "$env:USERPROFILE\.cargo\bin\cargo.exe" run --release -- --config integration.ini --workers 20
  ```
The process will log discovered host groups, enter the continuous loop, and stream Zabbix history records to Kafka while persisting checkpoints.
Two PowerShell harnesses under `scripts/` exercise end-to-end recovery flows, including synthetic data injection and Kafka verification:
- `checkpoint_resume_test.ps1`
  - Starts a background job that sends Zabbix metrics for ~140 seconds.
  - Runs the collector for a first interval, then stops it and waits (`DowntimeSeconds`).
  - Restarts the collector to confirm it resumes from the prior checkpoint.
  - Captures Kafka offsets/messages and writes artifacts under `artifacts/checkpoint_resume`.
- `fresh_start_test.ps1`
  - Backs up and deletes the existing checkpoint file.
  - Generates synthetic metrics while a single collector run executes.
  - Confirms new offsets and records appear, storing artifacts under `artifacts/fresh_start`.
Execute them after the stack is running:
```powershell
powershell.exe -ExecutionPolicy Bypass -File .\scripts\checkpoint_resume_test.ps1
powershell.exe -ExecutionPolicy Bypass -File .\scripts\fresh_start_test.ps1
```

Each script emits the Kafka offset delta plus paths to captured logs and message samples, making it easy to diff runs or hand results to reviewers.
| Path | Purpose | Key Checks | Expected Artifacts |
|---|---|---|---|
| Streaming Happy Path | Run collector with existing checkpoints | `info` logs showing groups, Kafka offset growth continues steadily | `checkpoint_state.json` updated, Kafka topic advancing |
| Checkpoint Resume (`checkpoint_resume_test.ps1`) | Validate restart resumes at last per-group timestamp | Offset delta > 0 on second run with no duplicate replay in Kafka | `artifacts/checkpoint_resume/summary.txt`, message sample JSON |
| Fresh Start (`fresh_start_test.ps1`) | Confirm wiping checkpoint restarts from wall-clock | Offset delta reflects only new data since restart | `artifacts/fresh_start/summary.txt`, backup and post-run checkpoint copies |
| Backpressure (`backpressure_test.ps1`) | Simulate slow Kafka broker (pause container, shrink `queue_max_messages`) | `warn` logs: "Kafka queue full, backing off" followed by recovery without crashes | Continuous logs, no panic; offsets eventually catch up; `artifacts/backpressure/` summary/logs |
| Graceful Shutdown | Hit Ctrl+C while streaming | Collector logs "Shutdown signal received, draining", flushes Kafka, persists checkpoints | No lost offsets, Kafka task reports clean exit |
- Checkpoint granularity is per host group; if Zabbix emits late samples with identical second timestamps, consider extending the store to track per-item high-water marks.
- Backpressure strategy relies on `FutureProducer` idempotence plus bounded channels; the polling loop naturally slows when Kafka is congested.
- Graceful shutdown: `Ctrl+C` (or container stop) triggers a drain path that persists outstanding checkpoints, flushes Kafka, and joins the publisher task before exit.
- Extensibility: add new history types by extending the `zabbix::collect_host_records` batching list and adjusting downstream filtering.
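The backoff behavior described above can be sketched generically; here `send` stands in for an enqueue attempt against the real `FutureProducer`, and the string errors and `send_with_backoff` name are illustrative assumptions:

```rust
use std::time::Duration;

/// Retry `send` with exponentially growing delays while it reports QueueFull,
/// up to `max_attempts` tries. Any other error aborts immediately.
fn send_with_backoff<F>(mut send: F, max_attempts: u32, base: Duration) -> Result<(), String>
where
    F: FnMut() -> Result<(), String>,
{
    let mut delay = base;
    for attempt in 1..=max_attempts {
        match send() {
            Ok(()) => return Ok(()),
            // Back off exponentially on QueueFull while attempts remain.
            Err(e) if e == "QueueFull" && attempt < max_attempts => {
                std::thread::sleep(delay);
                delay = delay.saturating_mul(2);
            }
            Err(e) => return Err(e),
        }
    }
    Err("exhausted".to_string())
}

fn main() {
    let mut calls = 0;
    // Fails twice with QueueFull, then succeeds on the third attempt.
    let result = send_with_backoff(
        || {
            calls += 1;
            if calls < 3 { Err("QueueFull".to_string()) } else { Ok(()) }
        },
        5,
        Duration::from_millis(1),
    );
    assert!(result.is_ok());
    println!("attempts: {calls}");
}
```

Because the channel feeding the publisher is bounded, these sleeps propagate upstream: the polling loop blocks on a full channel, which is the backpressure behavior the bullets describe.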
For deeper troubleshooting, examine the artifacts created by the automation scripts or enable `--log_level debug` to view individual batches and timestamps.