64 changes: 52 additions & 12 deletions .github/workflows/publish.yml
@@ -1,12 +1,13 @@
name: Publish Docker Images

on:
release:
types: [published]
push:
tags:
- "v*"
workflow_dispatch:
inputs:
tag:
description: "Release tag (e.g. v0.0.1)"
description: "Release tag (e.g. v0.0.1) — used for backfill/manual publish"
required: true

env:
@@ -21,7 +22,6 @@ jobs:
publish:
strategy:
fail-fast: false
max-parallel: 1
matrix:
pg: [18, 17, 16, 15, 14]
name: PG ${{ matrix.pg }}
@@ -41,25 +41,51 @@
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Extract version from release tag
- name: Extract version from tag or input
id: version
run: |
VERSION="${{ github.event.release.tag_name }}"
# workflow_dispatch input takes precedence; tag-push uses GITHUB_REF_NAME
if [ -n "${{ inputs.tag }}" ]; then
VERSION="${{ inputs.tag }}"
else
VERSION="${GITHUB_REF_NAME}"
fi
VERSION="${VERSION#v}"
if [ -z "$VERSION" ]; then
echo "::error::Could not determine version from inputs.tag or GITHUB_REF_NAME"
exit 1
fi
echo "version=${VERSION}" >> "$GITHUB_OUTPUT"
echo "Building version: ${VERSION}"

- name: Check out ref at requested tag
if: ${{ inputs.tag != '' }}
run: |
# For workflow_dispatch backfill: checkout the requested tag so the
# Dockerfile build context matches that release.
git fetch --tags --depth=1 origin "refs/tags/${{ inputs.tag }}:refs/tags/${{ inputs.tag }}"
git checkout "${{ inputs.tag }}"

- name: Compute short version (major.minor)
id: shortver
run: |
# 0.1.2 → 0.1; non-semver inputs fall back to the full version
FULL="${{ steps.version.outputs.version }}"
SHORT="$(echo "$FULL" | awk -F. '{ if (NF >= 2) print $1"."$2; else print $0 }')"
echo "value=${SHORT}" >> "$GITHUB_OUTPUT"

- name: Docker metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
# Use raw tags so workflow_dispatch (no github.ref tag) also gets
# version-suffixed images — type=semver only fires on tag pushes.
tags: |
# v1.2.3 tag → 1.2.3-pg18, pg18, latest (only for PG 18)
type=semver,pattern={{version}},suffix=-pg${{ matrix.pg }}
type=semver,pattern={{major}}.{{minor}},suffix=-pg${{ matrix.pg }}
type=raw,value=pg${{ matrix.pg }}
type=raw,value=latest,enable=${{ matrix.pg == 18 }}
type=raw,value=${{ steps.version.outputs.version }}-pg${{ matrix.pg }}
type=raw,value=${{ steps.shortver.outputs.value }}-pg${{ matrix.pg }}
type=raw,value=pg${{ matrix.pg }},enable=${{ github.event_name == 'push' }}
type=raw,value=latest,enable=${{ matrix.pg == 18 && github.event_name == 'push' }}
labels: |
org.opencontainers.image.title=ulak
org.opencontainers.image.description=PostgreSQL ${{ matrix.pg }} with ulak extension
@@ -82,13 +108,27 @@ jobs:
needs: publish
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
pg: [18, 17, 16, 15, 14]
name: Verify PG ${{ matrix.pg }}
steps:
- name: Determine image tag to verify
id: tag
run: |
# Tag-push: pg<N> is updated atomically, so prefer that.
# workflow_dispatch backfill: pull the version-suffixed tag instead.
if [ -n "${{ inputs.tag }}" ]; then
VERSION="${{ inputs.tag }}"
VERSION="${VERSION#v}"
echo "value=${VERSION}-pg${{ matrix.pg }}" >> "$GITHUB_OUTPUT"
else
echo "value=pg${{ matrix.pg }}" >> "$GITHUB_OUTPUT"
fi

- name: Pull and test image
run: |
IMAGE="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:pg${{ matrix.pg }}"
IMAGE="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.tag.outputs.value }}"
docker pull "$IMAGE"

# Start container
7 changes: 7 additions & 0 deletions .gitignore
@@ -76,3 +76,10 @@ tmp_check_iso/
# Static analysis output
/tmp/clang-tidy-output.txt

TODO.md

# Claude Code — per-user overrides (project-level settings.json is committed)
.claude
CLAUDE.md
scripts/validate-claude-setup.sh
docs/
3 changes: 2 additions & 1 deletion Makefile
@@ -213,7 +213,8 @@ REGRESS = 00_setup 01_schema 02_endpoints_crud 03_endpoints_validation \
REGRESS_OPTS = --inputdir=tests/regress --outputdir=tests/regress --dbname=ulak_test

# Isolation tests for concurrency scenarios (FOR UPDATE SKIP LOCKED, circuit breaker, ordering)
ISOLATION = skip_locked modulo_partition ordering_key circuit_breaker batch_mark_processing \
ISOLATION = skip_locked modulo_partition ordering_key ordering_key_partition \
circuit_breaker batch_mark_processing \
circuit_breaker_threshold circuit_breaker_recovery ordering_key_completion \
retry_visibility priority_contention idempotency_conflict dlq_concurrent_redrive
ISOLATION_OPTS = --inputdir=tests/isolation --outputdir=tests/isolation --dbname=ulak_test
7 changes: 7 additions & 0 deletions sql/ulak--0.0.2--0.0.3.sql
@@ -0,0 +1,7 @@
-- ulak 0.0.2 -> 0.0.3 upgrade migration
-- No schema changes. The worker partition algorithm changed in C code:
-- ordering_key messages are now hash-partitioned by hashtext(ordering_key)
-- instead of id-based modulo, restoring the per-key FIFO guarantee in
-- multi-worker deployments. Existing pending messages continue to be
-- fetched correctly under the new algorithm.
SELECT 1;
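
Since the upgrade script is a no-op, the routing change is easiest to see directly in SQL. Below is a minimal sketch of the new assignment rule, using the ulak.queue table from this PR; total_workers=2 is an assumed value for illustration, not something the migration sets:

    -- Sketch only: worker-slot assignment under the 0.0.3 algorithm.
    -- Rows sharing an ordering_key always map to the same slot, so
    -- per-key FIFO holds with multiple workers; NULL-key rows keep
    -- the id-based spread.
    SELECT id,
           ordering_key,
           (CASE WHEN ordering_key IS NULL THEN id
                 ELSE abs(hashtext(ordering_key))::bigint
            END) % 2 AS worker_slot  -- 2 = assumed total_workers
    FROM ulak.queue
    WHERE status = 'pending'
    ORDER BY id;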
10 changes: 9 additions & 1 deletion src/worker/batch_processor.c
@@ -460,7 +460,15 @@ int64 batch_processor_run(Oid worker_dboid, int worker_id, int total_workers) {
" WHERE q2.ordering_key = q.ordering_key "
" AND q2.status = 'pending' "
" AND q2.id < q.id))) "
" AND (q.id %% %d) = %d " /* Modulo partitioning */
/*
* Hash-partition by ordering_key when set so all
* messages sharing a key route to the same worker;
* id-based fallback preserves balance for unkeyed
* messages. Required for per-key FIFO under N>1.
*/
" AND ((CASE WHEN q.ordering_key IS NULL THEN q.id "
" ELSE abs(hashtext(q.ordering_key))::bigint "
" END) %% %d) = %d "
"ORDER BY q.priority DESC, q.endpoint_id, q.created_at ASC "
"LIMIT %d FOR UPDATE OF q SKIP LOCKED",
STATUS_PENDING, total_workers, worker_id, ulak_batch_size);
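One way to sanity-check the predicate that sprintf builds is a hypothetical ad-hoc query (not part of the patch; total_workers=2 assumed) showing how pending messages split across slots:

    -- Hypothetical check: distribution of pending rows across 2 slots.
    -- Every ordering_key contributes to exactly one slot.
    SELECT (CASE WHEN ordering_key IS NULL THEN id
                 ELSE abs(hashtext(ordering_key))::bigint
            END) % 2 AS worker_slot,
           count(*) AS pending
    FROM ulak.queue
    WHERE status = 'pending'
    GROUP BY worker_slot
    ORDER BY worker_slot;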
40 changes: 40 additions & 0 deletions tests/isolation/expected/ordering_key_partition.out
@@ -0,0 +1,40 @@
Parsed test spec with 2 sessions

starting permutation: w0_fetch w1_fetch w0_commit w1_commit
step w0_fetch:
-- Lock then count via CTE (FOR UPDATE not allowed with aggregates).
WITH locked AS (
SELECT q.id
FROM ulak.queue q
WHERE q.status = 'pending'
AND ((CASE WHEN q.ordering_key IS NULL THEN q.id
ELSE abs(hashtext(q.ordering_key))::bigint
END) % 2) = 0
FOR UPDATE OF q SKIP LOCKED
)
SELECT count(*) AS fetched FROM locked;

fetched
-------
5
(1 row)

step w1_fetch:
WITH locked AS (
SELECT q.id
FROM ulak.queue q
WHERE q.status = 'pending'
AND ((CASE WHEN q.ordering_key IS NULL THEN q.id
ELSE abs(hashtext(q.ordering_key))::bigint
END) % 2) = 1
FOR UPDATE OF q SKIP LOCKED
)
SELECT count(*) AS fetched FROM locked;

fetched
-------
0
(1 row)

step w0_commit: COMMIT;
step w1_commit: COMMIT;
70 changes: 70 additions & 0 deletions tests/isolation/specs/ordering_key_partition.spec
@@ -0,0 +1,70 @@
# Test: ordering_key hash-partitioning across multiple workers
#
# All messages with the same ordering_key must route to the same worker.
# When two workers (total_workers=2) run the partitioned fetch query
# concurrently, exactly one of them sees the rows; the other sees none.
# This guards against the multi-worker FIFO race in v0.0.2 where id-based
# modulo partitioning routed siblings of the same key to different workers.

setup
{
CREATE EXTENSION IF NOT EXISTS ulak;
DELETE FROM ulak.queue;
DELETE FROM ulak.endpoints;

INSERT INTO ulak.endpoints (name, protocol, config, enabled)
VALUES ('iso_partition_ep', 'http',
jsonb_build_object('url', 'http://localhost:9999/webhook'), false);

-- 5 messages all sharing ordering_key 'partition-X'
INSERT INTO ulak.queue (endpoint_id, payload, ordering_key)
SELECT e.id, jsonb_build_object('seq', g), 'partition-X'
FROM ulak.endpoints e, generate_series(1, 5) g
WHERE e.name = 'iso_partition_ep';
}

teardown
{
DELETE FROM ulak.queue;
DELETE FROM ulak.endpoints WHERE name = 'iso_partition_ep';
}

# Worker 0: total_workers=2, worker_id=0
session "w0"
setup { BEGIN; }
step "w0_fetch"
{
-- Lock then count via CTE (FOR UPDATE not allowed with aggregates).
WITH locked AS (
SELECT q.id
FROM ulak.queue q
WHERE q.status = 'pending'
AND ((CASE WHEN q.ordering_key IS NULL THEN q.id
ELSE abs(hashtext(q.ordering_key))::bigint
END) % 2) = 0
FOR UPDATE OF q SKIP LOCKED
)
SELECT count(*) AS fetched FROM locked;
}
step "w0_commit" { COMMIT; }

# Worker 1: total_workers=2, worker_id=1
session "w1"
setup { BEGIN; }
step "w1_fetch"
{
WITH locked AS (
SELECT q.id
FROM ulak.queue q
WHERE q.status = 'pending'
AND ((CASE WHEN q.ordering_key IS NULL THEN q.id
ELSE abs(hashtext(q.ordering_key))::bigint
END) % 2) = 1
FOR UPDATE OF q SKIP LOCKED
)
SELECT count(*) AS fetched FROM locked;
}
step "w1_commit" { COMMIT; }

# Exactly one worker must claim all 5 rows; the other must see 0.
permutation "w0_fetch" "w1_fetch" "w0_commit" "w1_commit"
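
The expected output above pins all five rows to w0, which implies abs(hashtext('partition-X')) is even. The spec also relies on hashtext returning the same value on every supported PostgreSQL version, an assumption worth re-verifying if the expected file is ever regenerated. A one-liner confirms the slot:

    -- Should return 0, matching the expected .out file above.
    SELECT abs(hashtext('partition-X'))::bigint % 2 AS worker_slot;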