Real-time PostgreSQL Change Data Capture system built in Go that streams database changes to external systems.
- Real-time Change Streaming: Captures INSERT, UPDATE, DELETE, and TRUNCATE operations as they happen
- Hybrid Checkpoint Management: Redis primary with file backup for reliable LSN tracking
- Event Publishing: Publishes structured events to Redis with retry logic and dead letter queues
- Production Ready: Comprehensive error handling, graceful shutdown, and observability
- Resumable Processing: Automatically resumes from last processed position after failures
- Monitoring & Metrics: Prometheus-ready metrics for operational visibility
- PostgreSQL
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;
-- Restart PostgreSQL so the settings above take effect, then run:
CREATE PUBLICATION cdc_pub FOR ALL TABLES;
CREATE USER cdc_user REPLICATION LOGIN PASSWORD 'password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
- Environment
export DB_HOST=localhost
export DB_PORT=5432
export DB_NAME=your_database
export DB_USER=cdc_user
export DB_PASSWORD=password
export REDIS_ADDR=localhost:6379
export SLOT_NAME=mini_cdc_slot
export PUBLICATION_NAME=cdc_pub
- Run
go mod tidy
go run cmd/main.go
Monitor events:
redis-cli MONITOR
redis-cli PSUBSCRIBE "cdc:stream:*"
cat ./data/checkpoint.lsn
Consumer example:
consumer := eventbus.NewConsumer("cdc:stream:*", transport, metrics, func(event eventbus.Event) {
	switch event.Action {
	case "INSERT":
		// Handle insert
	case "UPDATE":
		// Handle update
	case "DELETE":
		// Handle delete
	}
})
consumer.Start()
Each change event contains complete information about the database modification:
{
  "ID": "550e8400-e29b-41d4-a716-446655440000",
  "Schema": "public",
  "Table": "users",
  "Action": "UPDATE",
  "Data": {
    "id": 123,
    "name": "John Doe",
    "email": "[email protected]",
    "updated_at": "2024-01-15T10:30:00Z"
  },
  "Old": {
    "id": 123,
    "name": "John Doe",
    "email": "[email protected]",
    "updated_at": "2024-01-15T09:15:00Z"
  },
  "Timestamp": "2024-01-15T10:30:00.123456Z",
  "LSN": "16/B374D848"
}
func NewDatabaseReplicator(targetDB *sql.DB) {
	consumer := eventbus.NewConsumer("cdc:stream:*", transport, metrics, func(event eventbus.Event) {
		switch event.Action {
		case "INSERT":
			insertIntoTarget(targetDB, event.Table, event.Data)
		case "UPDATE":
			updateInTarget(targetDB, event.Table, event.Data, event.Old)
		case "DELETE":
			deleteFromTarget(targetDB, event.Table, event.Old)
		case "TRUNCATE":
			truncateTarget(targetDB, event.Table)
		}
	})
	consumer.Start()
}
The system exports comprehensive metrics compatible with Prometheus:
# Event processing metrics
cdc_events_published_total{table="users"} 1250
cdc_events_consumed_total{table="users"} 1250
cdc_events_publish_errors_total{table="users"} 2
cdc_events_dlq_total{table="users"} 1
# Performance metrics
cdc_event_processing_duration_seconds{table="users"} 0.001234
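To make the sample output above more concrete, here is a small standard-library sketch of a per-table counter rendered in the Prometheus text exposition format. It is only an illustration: `counterVec` and its methods are hypothetical names, and the project itself may well use the official Go client (`github.com/prometheus/client_golang`) instead.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// counterVec is a tiny, hypothetical stand-in for a labelled counter.
type counterVec struct {
	mu     sync.Mutex
	name   string
	values map[string]float64 // keyed by the "table" label value
}

func newCounterVec(name string) *counterVec {
	return &counterVec{name: name, values: make(map[string]float64)}
}

// Inc adds one to the counter for the given table.
func (c *counterVec) Inc(table string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.values[table]++
}

// Render returns the counter in text exposition format, one line per table,
// sorted by table name so the output is deterministic.
func (c *counterVec) Render() string {
	c.mu.Lock()
	defer c.mu.Unlock()

	tables := make([]string, 0, len(c.values))
	for t := range c.values {
		tables = append(tables, t)
	}
	sort.Strings(tables)

	var b strings.Builder
	fmt.Fprintf(&b, "# TYPE %s counter\n", c.name)
	for _, t := range tables {
		// %q quotes the label value; adequate for plain table names.
		fmt.Fprintf(&b, "%s{table=%q} %g\n", c.name, t, c.values[t])
	}
	return b.String()
}

func main() {
	published := newCounterVec("cdc_events_published_total")
	published.Inc("users")
	published.Inc("users")
	published.Inc("orders")
	fmt.Print(published.Render())
}
```

The rendered string could be served from an HTTP handler on a path such as `/metrics` for a Prometheus server to scrape.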
MIT License