39 changes: 39 additions & 0 deletions migrations/multitenant/0022-iceberg-catalog-sharding.sql
@@ -0,0 +1,39 @@
DO $$
DECLARE
iceberg_shards text[] = COALESCE(current_setting('storage.iceberg_shards', true), '{}')::text[];
iceberg_default_shard text = COALESCE(current_setting('storage.iceberg_default_shard', true), '');
i_shard_key text;
BEGIN

ALTER TABLE iceberg_namespaces ADD COLUMN IF NOT EXISTS metadata JSONB NOT NULL DEFAULT '{}';

ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS remote_table_id TEXT NULL;
ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS shard_key TEXT NULL;
ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS shard_id bigint NULL;

-- Only allow deleting a namespace once it has no tables (ON DELETE RESTRICT below)
ALTER TABLE iceberg_tables DROP CONSTRAINT IF EXISTS iceberg_tables_namespace_id_fkey;

ALTER TABLE iceberg_tables
ADD CONSTRAINT iceberg_tables_namespace_id_fkey
FOREIGN KEY (namespace_id)
REFERENCES iceberg_namespaces(id) ON DELETE RESTRICT;

IF iceberg_shards IS NULL OR cardinality(iceberg_shards) = 0 THEN
RETURN;
END IF;

FOREACH i_shard_key IN ARRAY iceberg_shards
LOOP
INSERT INTO shard (kind, shard_key, capacity) VALUES ('iceberg-table', i_shard_key, 10000)
ON CONFLICT (kind, shard_key) DO NOTHING;
END LOOP;

UPDATE iceberg_tables
SET shard_id = (
SELECT id FROM shard WHERE kind = 'iceberg-table' AND shard_key = iceberg_default_shard LIMIT 1
), shard_key = iceberg_default_shard
WHERE shard_id IS NULL;
END
$$;
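For reference, the shard list and default shard above are read from Postgres settings; a minimal sketch of supplying them before the migration runs (the ALTER DATABASE form and the database name are assumptions — only the setting names come from the code):

-- Hypothetical setup; storage.iceberg_shards uses Postgres text[] literal syntax
ALTER DATABASE storage SET storage.iceberg_shards = '{shard-a,shard-b}';
ALTER DATABASE storage SET storage.iceberg_default_shard = 'shard-a';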
57 changes: 57 additions & 0 deletions migrations/multitenant/0023-iceberg-catalog-id.sql
@@ -0,0 +1,57 @@
-- postgres-migrations disable-transaction
DO $$
BEGIN
DROP INDEX IF EXISTS idx_iceberg_namespaces_bucket_id;
DROP INDEX IF EXISTS idx_iceberg_tables_tenant_namespace_id;
DROP INDEX IF EXISTS idx_iceberg_tables_tenant_location;
DROP INDEX IF EXISTS idx_iceberg_tables_location;

-- remove primary key on iceberg_catalogs id
ALTER TABLE iceberg_catalogs DROP CONSTRAINT IF EXISTS iceberg_catalogs_pkey;

IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'iceberg_catalogs' AND column_name = 'name') THEN
ALTER TABLE iceberg_catalogs RENAME COLUMN id TO name;
END IF;

ALTER TABLE iceberg_catalogs ADD COLUMN IF NOT EXISTS id uuid NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY;
ALTER TABLE iceberg_catalogs ADD COLUMN IF NOT EXISTS deleted_at timestamptz NULL;

CREATE INDEX IF NOT EXISTS iceberg_catalogs_unique_name_idx
ON iceberg_catalogs (tenant_id, name) WHERE deleted_at IS NULL;

IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'iceberg_namespaces' AND column_name = 'bucket_name') THEN
ALTER TABLE iceberg_namespaces RENAME COLUMN bucket_id to bucket_name;
END IF;

IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'iceberg_tables' AND column_name = 'bucket_name') THEN
ALTER TABLE iceberg_tables RENAME COLUMN bucket_id to bucket_name;
END IF;

ALTER TABLE iceberg_namespaces ADD COLUMN IF NOT EXISTS catalog_id uuid NULL REFERENCES iceberg_catalogs(id) ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS catalog_id uuid NULL REFERENCES iceberg_catalogs(id) ON DELETE CASCADE ON UPDATE CASCADE;

CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_namespaces_bucket_id ON iceberg_namespaces (tenant_id, catalog_id, name);
CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_tables_tenant_namespace_id ON iceberg_tables (tenant_id, namespace_id, catalog_id, name);
CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_tables_tenant_location ON iceberg_tables (tenant_id, location);
CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_tables_location ON iceberg_tables (location);

-- partial unique index: at most one active (non-deleted) catalog per tenant and name
CREATE UNIQUE INDEX IF NOT EXISTS iceberg_catalogs_name_deleted_at_idx
ON iceberg_catalogs (tenant_id, name)
WHERE deleted_at IS NULL;

-- Backfill catalog_id for existing namespaces and tables
UPDATE iceberg_tables it
SET catalog_id = c.id
FROM iceberg_catalogs c
WHERE c.name = it.bucket_name;

UPDATE iceberg_namespaces iname
SET catalog_id = c.id
FROM iceberg_catalogs c
WHERE c.name = iname.bucket_name;

ALTER TABLE iceberg_namespaces ALTER COLUMN catalog_id SET NOT NULL;
ALTER TABLE iceberg_tables ALTER COLUMN catalog_id SET NOT NULL;
END
$$;
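A sanity check worth running ahead of the final SET NOT NULL statements, to confirm the backfill matched every row (illustrative only):

-- Any rows returned here have bucket names with no matching catalog and would make SET NOT NULL fail
SELECT tenant_id, bucket_name FROM iceberg_tables WHERE catalog_id IS NULL;
SELECT tenant_id, bucket_name FROM iceberg_namespaces WHERE catalog_id IS NULL;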
121 changes: 121 additions & 0 deletions migrations/multitenant/0024-fixed-exactly-once-queue-index.sql
@@ -0,0 +1,121 @@
DO $$
DECLARE
partition_queue_ids text[];
i_partition_id text;
BEGIN

-- check if a schema with name pgboss_v10 exists
IF NOT EXISTS (SELECT 1 FROM pg_namespace WHERE nspname = 'pgboss_v10') THEN
RETURN;
END IF;

-- Create or replace function to archive exactly_once jobs
CREATE OR REPLACE FUNCTION pgboss_v10.archive_exactly_once_job()
RETURNS TRIGGER AS
$trigger$
BEGIN
IF NEW.policy = 'exactly_once' AND NEW.state IN ('completed', 'failed', 'cancelled') THEN
INSERT INTO pgboss_v10.archive (
id, name, priority, data, state, retry_limit, retry_count, retry_delay, retry_backoff,
start_after, started_on, singleton_key, singleton_on, expire_in, created_on, completed_on,
keep_until, output, dead_letter, policy
)
VALUES (
NEW.id, NEW.name, NEW.priority, NEW.data, NEW.state, NEW.retry_limit, NEW.retry_count,
NEW.retry_delay, NEW.retry_backoff, NEW.start_after, NEW.started_on, NEW.singleton_key,
NEW.singleton_on, NEW.expire_in, NEW.created_on, NEW.completed_on, NEW.keep_until + INTERVAL '30 days',
NEW.output, NEW.dead_letter, NEW.policy
)
ON CONFLICT DO NOTHING;

DELETE FROM pgboss_v10.job WHERE id = NEW.id;
END IF;
RETURN NEW;
END;
$trigger$
LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION pgboss_v10.create_queue(queue_name text, options json)
RETURNS VOID AS
$f$
DECLARE
table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex');
queue_created_on timestamptz;
BEGIN
WITH q as (
INSERT INTO pgboss_v10.queue (
name,
policy,
retry_limit,
retry_delay,
retry_backoff,
expire_seconds,
retention_minutes,
dead_letter,
partition_name
)
VALUES (
queue_name,
options->>'policy',
(options->>'retryLimit')::int,
(options->>'retryDelay')::int,
(options->>'retryBackoff')::bool,
(options->>'expireInSeconds')::int,
(options->>'retentionMinutes')::int,
options->>'deadLetter',
table_name
)
ON CONFLICT DO NOTHING
RETURNING created_on
)
SELECT created_on into queue_created_on from q;

IF queue_created_on IS NULL THEN
RETURN;
END IF;

EXECUTE format('CREATE TABLE pgboss_v10.%I (LIKE pgboss_v10.job INCLUDING DEFAULTS)', table_name);

EXECUTE format('ALTER TABLE pgboss_v10.%1$I ADD PRIMARY KEY (name, id)', table_name);
EXECUTE format('ALTER TABLE pgboss_v10.%1$I ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES pgboss_v10.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name);
EXECUTE format('ALTER TABLE pgboss_v10.%1$I ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES pgboss_v10.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i1 ON pgboss_v10.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''created'' AND policy = ''short''', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i2 ON pgboss_v10.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''active'' AND policy = ''singleton''', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i3 ON pgboss_v10.%1$I (name, state, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''stately''', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i4 ON pgboss_v10.%1$I (name, singleton_on, COALESCE(singleton_key, '''')) WHERE state <> ''cancelled'' AND singleton_on IS NOT NULL', table_name);
EXECUTE format('CREATE INDEX %1$s_i5 ON pgboss_v10.%1$I (name, start_after) INCLUDE (priority, created_on, id) WHERE state < ''active''', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i6 ON pgboss_v10.%1$I (name, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''exactly_once''', table_name);

EXECUTE format('ALTER TABLE pgboss_v10.%I ADD CONSTRAINT cjc CHECK (name=%L)', table_name, queue_name);
EXECUTE format('ALTER TABLE pgboss_v10.job ATTACH PARTITION pgboss_v10.%I FOR VALUES IN (%L)', table_name, queue_name);

-- create triggers to archive a job whose policy is exactly_once once its state is completed, failed, or cancelled

EXECUTE format('CREATE TRIGGER archive_exactly_once_trigger_insert AFTER INSERT ON pgboss_v10.%I FOR EACH ROW EXECUTE FUNCTION pgboss_v10.archive_exactly_once_job()', table_name);
EXECUTE format('CREATE TRIGGER archive_exactly_once_trigger_update AFTER UPDATE ON pgboss_v10.%I FOR EACH ROW EXECUTE FUNCTION pgboss_v10.archive_exactly_once_job()', table_name);
END;
$f$
LANGUAGE plpgsql;



-- Recreate the i6 index with its corrected definition on existing exactly_once queue partitions
SELECT array_agg(partition_name)
INTO partition_queue_ids
FROM pgboss_v10.queue
WHERE policy = 'exactly_once';

IF partition_queue_ids IS NULL OR cardinality(partition_queue_ids) = 0 THEN
RETURN;
END IF;

FOR i_partition_id IN SELECT unnest(partition_queue_ids)
LOOP
EXECUTE format('DROP INDEX IF EXISTS pgboss_v10.%1$s_i6', i_partition_id);
EXECUTE format('CREATE UNIQUE INDEX IF NOT EXISTS %1$s_i6 ON pgboss_v10.%1$I (name, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''exactly_once''', i_partition_id);
IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'archive_exactly_once_trigger_insert' AND tgrelid = ('pgboss_v10.' || i_partition_id)::regclass) THEN
EXECUTE format('CREATE TRIGGER archive_exactly_once_trigger_insert AFTER INSERT ON pgboss_v10.%I FOR EACH ROW EXECUTE FUNCTION pgboss_v10.archive_exactly_once_job()', i_partition_id);
EXECUTE format('CREATE TRIGGER archive_exactly_once_trigger_update AFTER UPDATE ON pgboss_v10.%I FOR EACH ROW EXECUTE FUNCTION pgboss_v10.archive_exactly_once_job()', i_partition_id);
END IF;
END LOOP;
END;
$$;
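For context, a minimal walkthrough of the archive path (the queue name and options are made up; create_queue and the trigger function come from the code above):

-- Hypothetical: create an exactly_once queue, which also installs the archive triggers
SELECT pgboss_v10.create_queue('webhooks', '{"policy": "exactly_once"}'::json);
-- When a worker later marks a job in this queue completed, failed, or cancelled,
-- the AFTER UPDATE trigger copies the row into pgboss_v10.archive (keep_until
-- extended by 30 days) and deletes it from the live partition.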
6 changes: 6 additions & 0 deletions migrations/multitenant/0025-upgrade-from-event.sql
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS event_upgrades (
id SERIAL PRIMARY KEY,
event_id text NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(event_id)
);
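Illustrative usage of the new table, with a made-up event id; UNIQUE(event_id) makes replays a no-op:

-- Hypothetical: record an upgraded event exactly once
INSERT INTO event_upgrades (event_id)
VALUES ('webhooks:evt_123')
ON CONFLICT (event_id) DO NOTHING;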
@@ -0,0 +1,9 @@
ALTER TABLE shard_reservation
DROP CONSTRAINT IF EXISTS shard_reservation_kind_resource_id_key CASCADE;

DROP INDEX IF EXISTS shard_reservation_active_slot_idx;

-- Create a unique index on reservations:
-- at most one reservation per (tenant, kind, resource)
CREATE UNIQUE INDEX IF NOT EXISTS shard_reservation_kind_resource_confirmed_idx
ON shard_reservation (tenant_id, kind, resource_id);
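A sketch of the upsert pattern this unique index enables (shard_reservation's full column set is an assumption; only the indexed columns are shown):

-- Hypothetical: claim a reservation; a second attempt for the same resource is a no-op
INSERT INTO shard_reservation (tenant_id, kind, resource_id)
VALUES ('tenant-1', 'iceberg-table', 'ns1.table_a')
ON CONFLICT (tenant_id, kind, resource_id) DO NOTHING;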
16 changes: 16 additions & 0 deletions migrations/tenant/0047-iceberg-table-metadata.sql
@@ -0,0 +1,16 @@
DO $$
DECLARE
is_multitenant bool = COALESCE(current_setting('storage.multitenant', true), 'false')::boolean;
BEGIN

IF is_multitenant THEN
RETURN;
END IF;

ALTER TABLE iceberg_namespaces ADD COLUMN IF NOT EXISTS metadata JSONB NOT NULL DEFAULT '{}';
ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS remote_table_id TEXT NULL;

ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS shard_key TEXT NULL;
ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS shard_id TEXT NULL;
END
$$;
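The guard above relies on a storage.multitenant setting that single-tenant deployments leave unset, so COALESCE defaults it to false. A sketch of how a multitenant runner might flag the session (assumed mechanism; only the setting name comes from the code):

-- Hypothetical: mark the session as multitenant so the statements above are skipped
SET storage.multitenant = 'true';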
73 changes: 73 additions & 0 deletions migrations/tenant/0048-iceberg-catalog-ids.sql
@@ -0,0 +1,73 @@
DO $$
DECLARE
is_multitenant bool = COALESCE(current_setting('storage.multitenant', true), 'false')::boolean;
drop_constraint_sql text;
BEGIN

IF is_multitenant = false THEN
ALTER TABLE storage.iceberg_namespaces DROP CONSTRAINT IF EXISTS iceberg_namespaces_bucket_id_fkey;
ALTER TABLE storage.iceberg_tables DROP CONSTRAINT IF EXISTS iceberg_tables_bucket_id_fkey;
END IF;

-- remove the existing primary key on buckets_analytics (looked up by name, since it may vary)
SELECT concat('ALTER TABLE storage.buckets_analytics DROP CONSTRAINT ', constraint_name)
INTO drop_constraint_sql
FROM information_schema.table_constraints
WHERE table_schema = 'storage'
AND table_name = 'buckets_analytics'
AND constraint_type = 'PRIMARY KEY';

-- guard against a NULL statement when no primary key exists (e.g. on re-run)
IF drop_constraint_sql IS NOT NULL THEN
EXECUTE drop_constraint_sql;
END IF;

IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema = 'storage' AND table_name = 'buckets_analytics' AND column_name = 'name') THEN
ALTER TABLE storage.buckets_analytics RENAME COLUMN id TO name;
END IF;

ALTER TABLE storage.buckets_analytics ADD COLUMN IF NOT EXISTS id uuid NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY;
ALTER TABLE storage.buckets_analytics ADD COLUMN IF NOT EXISTS deleted_at timestamptz NULL;

CREATE UNIQUE INDEX IF NOT EXISTS buckets_analytics_unique_name_idx
ON storage.buckets_analytics (name) WHERE deleted_at IS NULL;

IF is_multitenant THEN
RETURN;
END IF;

DROP INDEX IF EXISTS storage.idx_iceberg_namespaces_bucket_id;
DROP INDEX IF EXISTS storage.idx_iceberg_tables_namespace_id;


IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema = 'storage' AND table_name = 'iceberg_namespaces' AND column_name = 'bucket_name') THEN
ALTER TABLE storage.iceberg_namespaces RENAME COLUMN bucket_id to bucket_name;
END IF;

IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema = 'storage' AND table_name = 'iceberg_tables' AND column_name = 'bucket_name') THEN
ALTER TABLE storage.iceberg_tables RENAME COLUMN bucket_id to bucket_name;
END IF;

ALTER TABLE storage.iceberg_namespaces ADD COLUMN IF NOT EXISTS catalog_id uuid NULL REFERENCES storage.buckets_analytics(id) ON DELETE CASCADE;
ALTER TABLE storage.iceberg_tables ADD COLUMN IF NOT EXISTS catalog_id uuid NULL REFERENCES storage.buckets_analytics(id) ON DELETE CASCADE;

CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_namespaces_bucket_id ON storage.iceberg_namespaces (catalog_id, name);
CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_tables_namespace_id ON storage.iceberg_tables (catalog_id, namespace_id, name);
CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_tables_location ON storage.iceberg_tables (location);

-- Backfill catalog_id for existing namespaces and tables
UPDATE storage.iceberg_tables it
SET catalog_id = c.id
FROM storage.buckets_analytics c
WHERE c.name = it.bucket_name;

UPDATE storage.iceberg_namespaces iname
SET catalog_id = c.id
FROM storage.buckets_analytics c
WHERE c.name = iname.bucket_name;

ALTER TABLE storage.iceberg_namespaces ALTER COLUMN catalog_id SET NOT NULL;
ALTER TABLE storage.iceberg_tables ALTER COLUMN catalog_id SET NOT NULL;
END
$$;
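A post-migration sanity check (illustrative) that the partial unique index leaves at most one active bucket per name:

-- Expect zero rows: duplicates here would violate buckets_analytics_unique_name_idx
SELECT name, count(*) FROM storage.buckets_analytics
WHERE deleted_at IS NULL
GROUP BY name
HAVING count(*) > 1;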
2 changes: 2 additions & 0 deletions src/config.ts
@@ -178,6 +178,7 @@ type StorageConfigType = {

icebergEnabled: boolean
icebergWarehouse: string
icebergShards: string[]
icebergCatalogUrl: string
icebergCatalogAuthType: IcebergCatalogAuthType
icebergCatalogToken?: string
@@ -515,6 +516,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {

icebergEnabled: getOptionalConfigFromEnv('ICEBERG_ENABLED') === 'true',
icebergWarehouse: getOptionalConfigFromEnv('ICEBERG_WAREHOUSE') || '',
icebergShards: getOptionalConfigFromEnv('ICEBERG_SHARDS')?.split(',').map((s) => s.trim()).filter(Boolean) || [], // e.g. ICEBERG_SHARDS=shard-a,shard-b; trim entries and drop empties so a blank value yields []
icebergCatalogUrl:
getOptionalConfigFromEnv('ICEBERG_CATALOG_URL') ||
`https://s3tables.ap-southeast-1.amazonaws.com/iceberg/v1`,
8 changes: 7 additions & 1 deletion src/http/plugins/iceberg.ts
@@ -5,6 +5,7 @@ import { getTenantConfig, multitenantKnex } from '@internal/database'
import { getCatalogAuthStrategy, TenantAwareRestCatalog } from '@storage/protocols/iceberg/catalog'
import { getConfig } from '../../config'
import { ICEBERG_BUCKET_RESERVED_SUFFIX } from '@storage/limits'
import { KnexShardStoreFactory, ShardCatalog, SingleShard } from '@internal/sharding'

declare module 'fastify' {
interface FastifyRequest {
@@ -48,8 +49,13 @@ export const icebergRestCatalog = fastifyPlugin(async function (fastify: Fastify
tenantId: req.tenantId,
limits: limits,
restCatalogUrl: icebergCatalogUrl,
warehouse: icebergWarehouse,
auth: catalogAuthType,
sharding: isMultitenant
? // multitenant: shard assignment is resolved from the shared shard catalog
new ShardCatalog(new KnexShardStoreFactory(multitenantKnex))
: // single-tenant: everything maps to one fixed shard keyed by the warehouse
new SingleShard({
shardKey: icebergWarehouse,
capacity: 10000,
}),
metastore: new KnexMetastore(isMultitenant ? multitenantKnex : req.db.pool.acquire(), {
multiTenant: isMultitenant,
schema: isMultitenant ? 'public' : 'storage',