diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index b6691cb..4b68145 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -1,12 +1,13 @@ name: Publish Docker Images on: - release: - types: [published] + push: + tags: + - "v*" workflow_dispatch: inputs: tag: - description: "Release tag (e.g. v0.0.1)" + description: "Release tag (e.g. v0.0.1) — used for backfill/manual publish" required: true env: @@ -21,7 +22,6 @@ jobs: publish: strategy: fail-fast: false - max-parallel: 1 matrix: pg: [18, 17, 16, 15, 14] name: PG ${{ matrix.pg }} @@ -41,25 +41,58 @@ jobs: username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - - name: Extract version from release tag + - name: Extract version from tag or input id: version + env: + # Untrusted tag text reaches the script only as an environment variable, never by inline expression expansion. + INPUT_TAG: ${{ inputs.tag }} run: | - VERSION="${{ github.event.release.tag_name }}" + # workflow_dispatch input takes precedence; tag-push uses GITHUB_REF_NAME + if [ -n "${INPUT_TAG}" ]; then + VERSION="${INPUT_TAG}" + else + VERSION="${GITHUB_REF_NAME}" + fi VERSION="${VERSION#v}" + if [ -z "$VERSION" ]; then + echo "::error::Could not determine version from inputs.tag or GITHUB_REF_NAME" + exit 1 + fi echo "version=${VERSION}" >> "$GITHUB_OUTPUT" echo "Building version: ${VERSION}" + - name: Check out ref at requested tag + if: ${{ inputs.tag != '' }} + env: + INPUT_TAG: ${{ inputs.tag }} + run: | + # For workflow_dispatch backfill: checkout the requested tag so the + # Dockerfile build context matches that release. + git fetch --tags --depth=1 origin "refs/tags/${INPUT_TAG}:refs/tags/${INPUT_TAG}" + git checkout "${INPUT_TAG}" + + - name: Compute short version (major.minor) + id: shortver + env: + FULL_VERSION: ${{ steps.version.outputs.version }} + run: | + # 0.1.2 → 0.1 ; non-semver inputs fall back to full version + FULL="${FULL_VERSION}" + SHORT="$(echo "$FULL" | awk -F. 
'{ if (NF >= 2) print $1"."$2; else print $0 }')" + echo "value=${SHORT}" >> "$GITHUB_OUTPUT" + - name: Docker metadata id: meta uses: docker/metadata-action@v5 with: images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + # Use raw tags so workflow_dispatch (no github.ref tag) also gets + # version-suffixed images — type=semver only fires on tag pushes. tags: | - # v1.2.3 tag → 1.2.3-pg18, pg18, latest (only for PG 18) - type=semver,pattern={{version}},suffix=-pg${{ matrix.pg }} - type=semver,pattern={{major}}.{{minor}},suffix=-pg${{ matrix.pg }} - type=raw,value=pg${{ matrix.pg }} - type=raw,value=latest,enable=${{ matrix.pg == 18 }} + type=raw,value=${{ steps.version.outputs.version }}-pg${{ matrix.pg }} + type=raw,value=${{ steps.shortver.outputs.value }}-pg${{ matrix.pg }} + type=raw,value=pg${{ matrix.pg }},enable=${{ github.event_name == 'push' }} + type=raw,value=latest,enable=${{ matrix.pg == 18 && github.event_name == 'push' }} labels: | org.opencontainers.image.title=ulak org.opencontainers.image.description=PostgreSQL ${{ matrix.pg }} with ulak extension @@ -82,13 +115,31 @@ jobs: needs: publish runs-on: ubuntu-latest strategy: + fail-fast: false matrix: pg: [18, 17, 16, 15, 14] name: Verify PG ${{ matrix.pg }} steps: + - name: Determine image tag to verify + id: tag + env: + INPUT_TAG: ${{ inputs.tag }} + run: | + # Tag-push: pg is updated atomically, prefer that. + # workflow_dispatch backfill: pull the version-suffixed tag instead. 
+ if [ -n "${INPUT_TAG}" ]; then + VERSION="${INPUT_TAG}" + VERSION="${VERSION#v}" + echo "value=${VERSION}-pg${{ matrix.pg }}" >> "$GITHUB_OUTPUT" + else + echo "value=pg${{ matrix.pg }}" >> "$GITHUB_OUTPUT" + fi + - name: Pull and test image + env: + IMAGE_TAG: ${{ steps.tag.outputs.value }} run: | - IMAGE="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:pg${{ matrix.pg }}" + IMAGE="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${IMAGE_TAG}" docker pull "$IMAGE" # Start container diff --git a/.gitignore b/.gitignore index 76bc1ff..a73c6a5 100644 --- a/.gitignore +++ b/.gitignore @@ -76,3 +76,10 @@ tmp_check_iso/ # Static analysis output /tmp/clang-tidy-output.txt +TODO.md + +# Claude Code — per-user overrides (project-level settings.json is committed) +.claude +CLAUDE.md +scripts/validate-claude-setup.sh +docs/ diff --git a/Makefile b/Makefile index 7c09429..12d2efe 100644 --- a/Makefile +++ b/Makefile @@ -213,7 +213,8 @@ REGRESS = 00_setup 01_schema 02_endpoints_crud 03_endpoints_validation \ REGRESS_OPTS = --inputdir=tests/regress --outputdir=tests/regress --dbname=ulak_test # Isolation tests for concurrency scenarios (FOR UPDATE SKIP LOCKED, circuit breaker, ordering) -ISOLATION = skip_locked modulo_partition ordering_key circuit_breaker batch_mark_processing \ +ISOLATION = skip_locked modulo_partition ordering_key ordering_key_partition \ + circuit_breaker batch_mark_processing \ circuit_breaker_threshold circuit_breaker_recovery ordering_key_completion \ retry_visibility priority_contention idempotency_conflict dlq_concurrent_redrive ISOLATION_OPTS = --inputdir=tests/isolation --outputdir=tests/isolation --dbname=ulak_test diff --git a/sql/ulak--0.0.2--0.0.3.sql b/sql/ulak--0.0.2--0.0.3.sql new file mode 100644 index 0000000..e3a4493 --- /dev/null +++ b/sql/ulak--0.0.2--0.0.3.sql @@ -0,0 +1,7 @@ +-- ulak 0.0.2 -> 0.0.3 upgrade migration +-- No schema changes. 
The worker partition algorithm changed in C code: +-- ordering_key messages are now hash-partitioned by hashtext(ordering_key) +-- instead of id-based modulo, restoring per-key FIFO guarantee in +-- multi-worker deployments. Existing pending messages continue to be +-- fetched correctly under the new algorithm. +SELECT 1; diff --git a/src/worker/batch_processor.c b/src/worker/batch_processor.c index fcb5cdb..934d882 100644 --- a/src/worker/batch_processor.c +++ b/src/worker/batch_processor.c @@ -460,7 +460,17 @@ int64 batch_processor_run(Oid worker_dboid, int worker_id, int total_workers) { " WHERE q2.ordering_key = q.ordering_key " " AND q2.status = 'pending' " " AND q2.id < q.id))) " - " AND (q.id %% %d) = %d " /* Modulo partitioning */ + /* + * Hash-partition by ordering_key when set so all + * messages sharing a key route to the same worker; + * id-based fallback preserves balance for unkeyed + * messages. Required for per-key FIFO under N>1. + * The hash is widened to bigint before abs(): + * hashtext() is int4 and abs(INT_MIN) overflows. + */ + " AND ((CASE WHEN q.ordering_key IS NULL THEN q.id " + " ELSE abs(hashtext(q.ordering_key)::bigint) " + " END) %% %d) = %d " "ORDER BY q.priority DESC, q.endpoint_id, q.created_at ASC " "LIMIT %d FOR UPDATE OF q SKIP LOCKED", STATUS_PENDING, total_workers, worker_id, ulak_batch_size); diff --git a/tests/isolation/expected/ordering_key_partition.out b/tests/isolation/expected/ordering_key_partition.out new file mode 100644 index 0000000..5606ce1 --- /dev/null +++ b/tests/isolation/expected/ordering_key_partition.out @@ -0,0 +1,40 @@ +Parsed test spec with 2 sessions + +starting permutation: w0_fetch w1_fetch w0_commit w1_commit +step w0_fetch: + -- Lock then count via CTE (FOR UPDATE not allowed with aggregates). 
+ WITH locked AS ( + SELECT q.id + FROM ulak.queue q + WHERE q.status = 'pending' + AND ((CASE WHEN q.ordering_key IS NULL THEN q.id + ELSE abs(hashtext(q.ordering_key)::bigint) + END) % 2) = 0 + FOR UPDATE OF q SKIP LOCKED + ) + SELECT count(*) AS fetched FROM locked; + +fetched +------- + 5 +(1 row) + +step w1_fetch: + WITH locked AS ( + SELECT q.id + FROM ulak.queue q + WHERE q.status = 'pending' + AND ((CASE WHEN q.ordering_key IS NULL THEN q.id + ELSE abs(hashtext(q.ordering_key)::bigint) + END) % 2) = 1 + FOR UPDATE OF q SKIP LOCKED + ) + SELECT count(*) AS fetched FROM locked; + +fetched +------- + 0 +(1 row) + +step w0_commit: COMMIT; +step w1_commit: COMMIT; diff --git a/tests/isolation/specs/ordering_key_partition.spec b/tests/isolation/specs/ordering_key_partition.spec new file mode 100644 index 0000000..01345eb --- /dev/null +++ b/tests/isolation/specs/ordering_key_partition.spec @@ -0,0 +1,70 @@ +# Test: ordering_key hash-partitioning across multiple workers +# +# All messages with the same ordering_key must route to the same worker. +# When two workers (total_workers=2) run the partitioned fetch query +# concurrently, exactly one of them sees the rows; the other is empty. +# This guards against the multi-worker FIFO race in v0.0.2 where id-based +# modulo partitioning routed siblings of the same key to different workers. 
+ +setup +{ + CREATE EXTENSION IF NOT EXISTS ulak; + DELETE FROM ulak.queue; + DELETE FROM ulak.endpoints; + + INSERT INTO ulak.endpoints (name, protocol, config, enabled) + VALUES ('iso_partition_ep', 'http', + jsonb_build_object('url', 'http://localhost:9999/webhook'), false); + + -- 5 messages all sharing ordering_key 'partition-X' + INSERT INTO ulak.queue (endpoint_id, payload, ordering_key) + SELECT e.id, jsonb_build_object('seq', g), 'partition-X' + FROM ulak.endpoints e, generate_series(1, 5) g + WHERE e.name = 'iso_partition_ep'; +} + +teardown +{ + DELETE FROM ulak.queue; + DELETE FROM ulak.endpoints WHERE name = 'iso_partition_ep'; +} + +# Worker 0: total_workers=2, worker_id=0 +session "w0" +setup { BEGIN; } +step "w0_fetch" +{ + -- Lock then count via CTE (FOR UPDATE not allowed with aggregates). + WITH locked AS ( + SELECT q.id + FROM ulak.queue q + WHERE q.status = 'pending' + AND ((CASE WHEN q.ordering_key IS NULL THEN q.id + ELSE abs(hashtext(q.ordering_key)::bigint) + END) % 2) = 0 + FOR UPDATE OF q SKIP LOCKED + ) + SELECT count(*) AS fetched FROM locked; +} +step "w0_commit" { COMMIT; } + +# Worker 1: total_workers=2, worker_id=1 +session "w1" +setup { BEGIN; } +step "w1_fetch" +{ + WITH locked AS ( + SELECT q.id + FROM ulak.queue q + WHERE q.status = 'pending' + AND ((CASE WHEN q.ordering_key IS NULL THEN q.id + ELSE abs(hashtext(q.ordering_key)::bigint) + END) % 2) = 1 + FOR UPDATE OF q SKIP LOCKED + ) + SELECT count(*) AS fetched FROM locked; +} +step "w1_commit" { COMMIT; } + +# Exactly one worker must claim all 5 rows; the other must see 0. +permutation "w0_fetch" "w1_fetch" "w0_commit" "w1_commit"