Skip to content

Latest commit

 

History

History
437 lines (335 loc) · 11.1 KB

File metadata and controls

437 lines (335 loc) · 11.1 KB

Operations Database Schema

Operational tables: jobs, config, caching, events.

Architecture: 01_ARCHITECTURE.md
Deployment: 04_DEPLOYMENT.md


Table Overview

Table/System Purpose Technology
river_job Background job queue PostgreSQL (River)
river_leader Job queue leader election PostgreSQL
fx_rates Historical FX rates PostgreSQL
recipients Beneficiary accounts PostgreSQL
quotes FX rate locks Redis (primary) + PostgreSQL (backup)
webhook_deliveries Webhook attempt log PostgreSQL
schema_versions Migration tracking PostgreSQL
Redis keys Cache, locks, sessions Redis
Kafka topics Event streaming Kafka

River Job Queue

Job Table

River uses PostgreSQL as its backing store for reliable job queuing.

-- River job table (managed by River, don't modify)
CREATE TABLE river_job (
    id BIGSERIAL PRIMARY KEY,
    args JSONB NOT NULL DEFAULT '{}',
    attempt SMALLINT NOT NULL DEFAULT 0,
    attempted_at TIMESTAMPTZ,
    attempted_by TEXT[],
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    errors JSONB[],
    finalized_at TIMESTAMPTZ,
    kind TEXT NOT NULL,
    max_attempts SMALLINT NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    priority SMALLINT NOT NULL DEFAULT 1,
    queue TEXT NOT NULL DEFAULT 'default',
    scheduled_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    state river_job_state NOT NULL DEFAULT 'available',
    tags VARCHAR(255)[],
    unique_key BYTEA,
    CONSTRAINT river_job_args_check CHECK (args IS NOT NULL),
    CONSTRAINT river_job_metadata_check CHECK (metadata IS NOT NULL)
);

-- Indexes for River
CREATE INDEX idx_river_job_queue_state ON river_job(queue, state, scheduled_at, priority DESC, id);
CREATE INDEX idx_river_job_unique_key ON river_job(unique_key) WHERE unique_key IS NOT NULL;

Job Types

// Transfer processing
JobKindTransferProcess = "transfer_process"

// Webhook delivery
JobKindWebhookDeliver = "webhook_deliver"

// FX rate fetch
JobKindFXRateFetch = "fx_rate_fetch"

// Compliance sync
JobKindComplianceSync = "compliance_sync"

// Reconciliation
JobKindReconciliation = "reconciliation"

// Netting execution
JobKindNettingExecute = "netting_execute"

Job Queues

Queue Priority Workers Purpose
critical 1-10 4 Transfer processing, compliance
default 100 2 Webhooks, notifications
batch 1000 1 Scheduled jobs, reconciliation

FX Rates

Historical FX rate storage.

CREATE TABLE fx_rates (
    id                      UUID PRIMARY KEY DEFAULT uuidv7(),
    from_currency           CHAR(3) NOT NULL,
    to_currency             CHAR(3) NOT NULL,
    rate                    NUMERIC(20,8) NOT NULL,
    source                  VARCHAR(50) NOT NULL,  -- xe, fixer, bi_jisdor, internal
    provider_timestamp      TIMESTAMPTZ,
    created_at              TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    
    CONSTRAINT unique_rate_snapshot UNIQUE (from_currency, to_currency, created_at)
);

-- Indexes
CREATE INDEX idx_fx_rates_pair ON fx_rates(from_currency, to_currency);
CREATE INDEX idx_fx_rates_created ON fx_rates(created_at);
CREATE INDEX idx_fx_rates_source ON fx_rates(source);

-- Latest rate view
CREATE VIEW latest_fx_rates AS
SELECT DISTINCT ON (from_currency, to_currency)
    from_currency,
    to_currency,
    rate,
    source,
    created_at
FROM fx_rates
ORDER BY from_currency, to_currency, created_at DESC;

Recipients

Beneficiary bank accounts (validated and stored).

CREATE TYPE recipient_type_enum AS ENUM ('individual', 'business');
CREATE TYPE recipient_status_enum AS ENUM ('pending', 'verified', 'rejected');

CREATE TABLE recipients (
    id                      UUID PRIMARY KEY DEFAULT uuidv7(),
    tenant_id               UUID NOT NULL,
    
    -- Type
    recipient_type          recipient_type_enum NOT NULL,
    
    -- Name
    name                    VARCHAR(200) NOT NULL,
    
    -- Bank Details (vary by country)
    country                 CHAR(2) NOT NULL,
    currency                CHAR(3) NOT NULL,
    
    -- EU/UK
    iban                    VARCHAR(34),
    bic_swift               VARCHAR(11),
    
    -- Indonesia
    bank_code               VARCHAR(10),  -- BI-FAST bank code
    account_number          VARCHAR(50),
    
    -- UK specific
    sort_code               VARCHAR(10),
    
    -- Address
    address_line1           VARCHAR(200),
    address_line2           VARCHAR(200),
    city                    VARCHAR(100),
    postal_code             VARCHAR(20),
    
    -- Validation
    status                  recipient_status_enum NOT NULL DEFAULT 'pending',
    verified_at             TIMESTAMPTZ,
    verified_by             VARCHAR(100),
    validation_errors       JSONB,
    
    -- Metadata
    metadata                JSONB NOT NULL DEFAULT '{}',
    created_at              TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at              TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    
    -- Soft delete
    deleted_at              TIMESTAMPTZ,
    
    CONSTRAINT unique_recipient_per_tenant UNIQUE (tenant_id, iban, account_number)
);

-- Indexes
CREATE INDEX idx_recipients_tenant ON recipients(tenant_id);
CREATE INDEX idx_recipients_status ON recipients(status);
CREATE INDEX idx_recipients_country ON recipients(country);
CREATE INDEX idx_recipients_iban ON recipients(iban);

-- Exclude deleted from unique constraint
CREATE UNIQUE INDEX idx_recipients_active ON recipients(tenant_id, iban) 
    WHERE deleted_at IS NULL AND iban IS NOT NULL;

Webhook Deliveries

Track webhook delivery attempts.

CREATE TYPE webhook_status_enum AS ENUM (
    'pending', 'delivering', 'delivered', 'failed'
);

CREATE TABLE webhook_deliveries (
    id                      UUID PRIMARY KEY DEFAULT uuidv7(),
    tenant_id               UUID NOT NULL,
    
    -- Event
    event_type              VARCHAR(100) NOT NULL,
    resource_type           VARCHAR(50) NOT NULL,
    resource_id             UUID NOT NULL,
    payload                 JSONB NOT NULL,
    
    -- Delivery
    url                     TEXT NOT NULL,
    status                  webhook_status_enum NOT NULL DEFAULT 'pending',
    
    -- Attempts
    attempt_count           INTEGER NOT NULL DEFAULT 0,
    last_attempt_at         TIMESTAMPTZ,
    last_error              TEXT,
    last_status_code        INTEGER,
    
    -- Success
    delivered_at            TIMESTAMPTZ,
    
    -- Scheduling
    next_attempt_at         TIMESTAMPTZ,
    
    created_at              TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at              TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes
CREATE INDEX idx_webhook_deliveries_tenant ON webhook_deliveries(tenant_id);
CREATE INDEX idx_webhook_deliveries_status ON webhook_deliveries(status) 
    WHERE status IN ('pending', 'delivering');
CREATE INDEX idx_webhook_deliveries_next_attempt ON webhook_deliveries(next_attempt_at) 
    WHERE status IN ('pending', 'delivering');

Schema Versions

Track database migrations.

CREATE TABLE schema_versions (
    version                 INTEGER PRIMARY KEY,
    applied_at              TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    description             TEXT NOT NULL,
    author                  TEXT NOT NULL,
    checksum                VARCHAR(64),  -- SHA-256 of migration file
    execution_time_ms       INTEGER
);

-- View current version
CREATE VIEW current_schema_version AS
SELECT MAX(version) as version FROM schema_versions;

Redis Key Patterns

FX Rate Cache

Key: fx:rate:{from}:{to}
Value: {rate, timestamp, source}
TTL: 60 seconds

Example: fx:rate:EUR:IDR → {"rate": 17250.50, "ts": 1705315200, "src": "xe"}

Quote Locks

Key: quote:{quote_id}
Value: {rate, expiry, tenant_id}
TTL: 600 seconds (10 minutes)

Example: quote:qt_abc123 → {"rate": 17250, "exp": 1705315800, "tenant": "tn_xyz"}

Rate Limiting

Key: rate_limit:{tenant_id}:{window_start}
Value: request_count
TTL: 60 seconds (sliding window)

Example: rate_limit:tn_xyz:1705315200 → 42

Idempotency

Key: idempotency:{tenant_id}:{key_hash}
Value: {transfer_id, created_at}
TTL: 86400 seconds (24 hours)

Example: idempotency:tn_xyz:a1b2c3 → {"id": "tf_def456", "ts": 1705315200}

Session Cache

Key: session:{session_id}
Value: {tenant_id, permissions, expiry}
TTL: 3600 seconds (1 hour)

Netting Windows

Key: netting:{tenant_id}:{corridor}
Value: {window_id, start_time, amount}
TTL: 300 seconds (5 minutes)

Example: netting:tn_xyz:EUR_IDR → {"id": "ng_789", "start": 1705315200, "amt": 10000}

Kafka Topics

Topic Configuration

Topic Partitions Replication Retention Purpose
transfer.events 12 3 7 days Transfer state changes
compliance.events 6 3 30 days Screening results
audit.trail 12 3 365 days Immutable audit log
webhook.dlq 3 3 7 days Failed webhooks
reconciliation.alerts 3 3 30 days FBO/Nostro issues
netting.executed 6 3 7 days Netting completions

Message Format

{
  "event_type": "transfer.state_changed",
  "version": "1.0",
  "timestamp": "2025-01-15T10:30:00.000Z",
  "payload": {
    "transfer_id": "tf_abc123",
    "tenant_id": "tn_xyz",
    "from_state": "processing",
    "to_state": "completed"
  },
  "metadata": {
    "request_id": "req_789",
    "source": "transfer-service"
  }
}

Maintenance Queries

Job Queue Health

-- Stuck jobs (no attempt in 1 hour)
SELECT kind, COUNT(*) 
FROM river_job 
WHERE state = 'available' 
  AND scheduled_at < NOW() - INTERVAL '1 hour'
GROUP BY kind;

-- Failed jobs
SELECT kind, args, errors 
FROM river_job 
WHERE state = 'discarded'
ORDER BY finalized_at DESC
LIMIT 10;

Webhook Backlog

-- Pending webhooks older than 5 minutes
SELECT tenant_id, COUNT(*), MIN(created_at)
FROM webhook_deliveries
WHERE status IN ('pending', 'delivering')
  AND created_at < NOW() - INTERVAL '5 minutes'
GROUP BY tenant_id;

Redis Memory

# Monitor key count by prefix
redis-cli --bigkeys

# Memory usage by pattern
redis-cli EVAL "
  local keys = redis.call('keys', ARGV[1])
  local total = 0
  for _,key in ipairs(keys) do
    total = total + redis.call('memory', 'usage', key)
  end
  return total
" 0 'fx:rate:*'

Kafka Lag

# Check consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group transfer-service

Data Retention

Data Type PostgreSQL Redis Kafka
Jobs (completed) 7 days - -
Jobs (discarded) 30 days - -
FX rates 2 years 60s -
Webhook deliveries 90 days - -
Events - - 7-365 days

Next: README.md for database overview
Prev: LEDGER.md for TigerBeetle schema
Deploy: 04_DEPLOYMENT.md


Last Updated: 2026-02-15