Skip to content

Commit 2e14ad1

Browse files
committed
feat: add analytics bucket sharding
1 parent 6ba23e4 commit 2e14ad1

File tree

28 files changed

+1313
-269
lines changed

28 files changed

+1313
-269
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
-- Adds namespace metadata and shard-tracking columns to the Iceberg catalog tables,
-- registers the configured shards, and assigns every existing table to the default shard.
--
-- Inputs (session settings, read with missing_ok = true so an unset value yields NULL):
--   storage.iceberg_shards        - Postgres array literal of shard keys, e.g. {"a","b"}
--   storage.iceberg_default_shard - shard key that receives the pre-existing tables
DO $$
DECLARE
    -- An unset or blank setting must fall back to '{}', the empty-array literal.
    -- The previous fallback string '[]::text[]' is not a valid array literal and
    -- made the cast to text[] fail whenever the setting was missing.
    iceberg_shards text[] := COALESCE(NULLIF(current_setting('storage.iceberg_shards', true), ''), '{}')::text[];
    iceberg_default_shard text := COALESCE(current_setting('storage.iceberg_default_shard', true), '');
    i_shard_key text;
BEGIN

    ALTER TABLE iceberg_namespaces ADD COLUMN IF NOT EXISTS metadata JSONB NOT NULL DEFAULT '{}';

    ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS remote_table_id TEXT NULL;
    ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS shard_key TEXT NULL;
    ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS shard_id bigint NULL;

    -- Only allow deleting namespaces if empty: recreate the foreign key with ON DELETE RESTRICT.
    ALTER TABLE iceberg_tables DROP CONSTRAINT IF EXISTS iceberg_tables_namespace_id_fkey;

    ALTER TABLE iceberg_tables
        ADD CONSTRAINT iceberg_tables_namespace_id_fkey
            FOREIGN KEY (namespace_id)
                REFERENCES iceberg_namespaces(id) ON DELETE RESTRICT;

    -- Register every configured shard; shards that already exist are left untouched.
    FOREACH i_shard_key IN ARRAY iceberg_shards
        LOOP
            INSERT INTO shard (kind, shard_key, capacity) VALUES ('iceberg-table', i_shard_key, 1000)
            ON CONFLICT (kind, shard_key) DO NOTHING;
        END LOOP;

    -- Nothing to backfill when no shards are configured.
    -- cardinality() returns 0 for an empty array, whereas array_length(arr, 1) returns NULL,
    -- so the former "= 0" comparison never matched and the backfill ran without a default shard.
    IF cardinality(iceberg_shards) = 0 THEN
        RETURN;
    END IF;

    -- Point every table that has no shard yet at the default shard.
    UPDATE iceberg_tables
    SET shard_id = (
        SELECT id FROM shard WHERE kind = 'iceberg-table' AND shard_key = iceberg_default_shard LIMIT 1
    ), shard_key = iceberg_default_shard
    WHERE shard_id IS NULL;

    -- Give each table a confirmed reservation and a slot (slot_no = seq_num - 1),
    -- then advance the default shard's next_slot past the highest slot just inserted.
    -- NOTE(review): seq_num is numbered across all rows of iceberg_tables, not per shard;
    -- this presumes every table sits in the default shard at this point - confirm.
    WITH all_iceberg_tables AS (
        SELECT t.id, t.tenant_id, t.name, t.shard_key, t.shard_id, t.namespace_id, row_number() OVER () AS seq_num
        FROM iceberg_tables t
    ),
    set_shard_reservation AS (
        INSERT INTO shard_reservation (tenant_id, shard_id, kind, resource_id, lease_expires_at, slot_no, status)
            SELECT it.tenant_id, s.id, 'iceberg-table', (it.namespace_id || '/' || it.name), now() + interval '5 minutes', it.seq_num - 1, 'confirmed'
            FROM all_iceberg_tables it
                     JOIN shard s ON s.kind = 'iceberg-table' AND s.shard_key = it.shard_key
    ),
    shard_slot AS (
        INSERT INTO shard_slots (tenant_id, shard_id, resource_id, slot_no)
            SELECT it.tenant_id, it.shard_id, it.id, it.seq_num - 1
            FROM all_iceberg_tables it
            RETURNING slot_no
    )
    UPDATE shard
    -- Keeps the current next_slot when no slot rows were inserted (MAX over zero rows is NULL).
    SET next_slot = (SELECT COALESCE((SELECT MAX(slot_no) + 1 FROM shard_slot), next_slot))
    WHERE shard.kind = 'iceberg-table'
      AND shard.shard_key = iceberg_default_shard;

END
$$;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
-- Adds the namespace metadata column and the remote-table / shard columns to the
-- Iceberg catalog tables. The whole body is skipped when the session setting
-- storage.multitenant is true.
DO $$
DECLARE
    multitenant_mode boolean := COALESCE(current_setting('storage.multitenant', true), 'false')::boolean;
BEGIN
    -- Only run the schema changes outside multitenant mode.
    IF multitenant_mode IS NOT TRUE THEN
        ALTER TABLE iceberg_namespaces
            ADD COLUMN IF NOT EXISTS metadata JSONB NOT NULL DEFAULT '{}';

        ALTER TABLE iceberg_tables
            ADD COLUMN IF NOT EXISTS remote_table_id TEXT NULL,
            ADD COLUMN IF NOT EXISTS shard_key TEXT NULL,
            ADD COLUMN IF NOT EXISTS shard_id TEXT NULL;
    END IF;
END
$$;

src/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ type StorageConfigType = {
178178

179179
icebergEnabled: boolean
180180
icebergWarehouse: string
181+
icebergShards: string[]
181182
icebergCatalogUrl: string
182183
icebergCatalogAuthType: IcebergCatalogAuthType
183184
icebergCatalogToken?: string
@@ -515,6 +516,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
515516

516517
icebergEnabled: getOptionalConfigFromEnv('ICEBERG_ENABLED') === 'true',
517518
icebergWarehouse: getOptionalConfigFromEnv('ICEBERG_WAREHOUSE') || '',
519+
// Comma-separated shard keys. Each entry is trimmed and blanks are dropped, so an
// empty value or a stray comma (e.g. '' or 'a, b,') never produces an empty shard key.
icebergShards: (getOptionalConfigFromEnv('ICEBERG_SHARDS') ?? '')
  .split(',')
  .map((shard) => shard.trim())
  .filter((shard) => shard.length > 0),
518520
icebergCatalogUrl:
519521
getOptionalConfigFromEnv('ICEBERG_CATALOG_URL') ||
520522
`https://s3tables.ap-southeast-1.amazonaws.com/iceberg/v1`,

src/http/plugins/iceberg.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { getTenantConfig, multitenantKnex } from '@internal/database'
55
import { getCatalogAuthStrategy, TenantAwareRestCatalog } from '@storage/protocols/iceberg/catalog'
66
import { getConfig } from '../../config'
77
import { ICEBERG_BUCKET_RESERVED_SUFFIX } from '@storage/limits'
8+
import { KnexShardStoreFactory, ShardCatalog, SingleShard } from '@internal/sharding'
89

910
declare module 'fastify' {
1011
interface FastifyRequest {
@@ -48,8 +49,13 @@ export const icebergRestCatalog = fastifyPlugin(async function (fastify: Fastify
4849
tenantId: req.tenantId,
4950
limits: limits,
5051
restCatalogUrl: icebergCatalogUrl,
51-
warehouse: icebergWarehouse,
5252
auth: catalogAuthType,
53+
sharding: isMultitenant
54+
? new ShardCatalog(new KnexShardStoreFactory(multitenantKnex))
55+
: new SingleShard({
56+
shardKey: icebergWarehouse,
57+
capacity: 10000,
58+
}),
5359
metastore: new KnexMetastore(isMultitenant ? multitenantKnex : req.db.pool.acquire(), {
5460
multiTenant: isMultitenant,
5561
schema: isMultitenant ? 'public' : 'storage',

src/http/routes/iceberg/namespace.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const createNamespaceSchema = {
1010
type: 'object',
1111
properties: {
1212
namespace: { type: 'string', examples: ['namespace'] },
13+
properties: { type: 'object', additionalProperties: { type: 'string' } },
1314
},
1415
required: ['namespace'],
1516
},
@@ -104,6 +105,7 @@ export default async function routes(fastify: FastifyInstance) {
104105
const result = await request.icebergCatalog.createNamespace({
105106
namespace: [request.body.namespace],
106107
warehouse: request.params.prefix,
108+
properties: request.body.properties,
107109
})
108110

109111
return response.send(result)
@@ -140,7 +142,7 @@ export default async function routes(fastify: FastifyInstance) {
140142
config: {
141143
operation: { type: ROUTE_OPERATIONS.ICEBERG_NAMESPACE_EXISTS },
142144
},
143-
schema: { ...listNamespaceSchema, tags: ['iceberg'] },
145+
schema: { ...loadNamespaceSchema, tags: ['iceberg'] },
144146
},
145147
async (request, response) => {
146148
if (!request.icebergCatalog) {

src/http/routes/s3/router.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ export class Router<Context = unknown, S extends Schema = Schema> {
283283
const headerValue = headerParts[1]
284284

285285
const matchHeaderName = received[headerName] !== undefined
286-
const matchHeaderValue = headerValue ? received[headerName].startsWith(headerValue) : true
286+
const matchHeaderValue = headerValue ? received[headerName]?.startsWith(headerValue) : true
287287

288288
return matchHeaderName && matchHeaderValue
289289
})

src/internal/database/migrations/migrate.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const {
3030
dbInstallRoles,
3131
dbRefreshMigrationHashesOnMismatch,
3232
dbMigrationFreezeAt,
33+
icebergShards,
3334
} = getConfig()
3435

3536
/**
@@ -640,14 +641,19 @@ function runMigrations({
640641
const migrationsToRun = filterMigrations(intendedMigrations, appliedMigrations)
641642
const completedMigrations = []
642643

644+
// Serialise the shard keys as a Postgres array literal. Every element is wrapped in
// double quotes, so backslashes and double quotes inside a key are backslash-escaped
// to keep the literal well-formed.
const icebergShardVar = `{${icebergShards
  .map((s) => `"${s.replace(/\\/g, '\\\\').replace(/"/g, '\\"')}"`)
  .join(',')}}`
645+
const icebergDefaultShard = icebergShards.length > 0 ? icebergShards[0] : ''
646+
643647
if (migrationsToRun.length > 0) {
644648
await client.query(SQL`SELECT
645649
set_config('storage.install_roles', ${dbInstallRoles}, false),
646650
set_config('storage.multitenant', ${isMultitenant ? 'true' : 'false'}, false),
647651
set_config('storage.anon_role', ${dbAnonRole}, false),
648652
set_config('storage.authenticated_role', ${dbAuthenticatedRole}, false),
649653
set_config('storage.service_role', ${dbServiceRole}, false),
650-
set_config('storage.super_user', ${dbSuperUser}, false)
654+
set_config('storage.super_user', ${dbSuperUser}, false),
655+
set_config('storage.iceberg_default_shard', ${icebergDefaultShard}, false),
656+
set_config('storage.iceberg_shards', ${icebergShardVar}, false);
651657
`)
652658
}
653659

src/internal/database/migrations/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,5 @@ export const DBMigration = {
4444
'fix-object-level': 43,
4545
'vector-bucket-type': 44,
4646
'vector-buckets': 45,
47+
'iceberg-table-metadata': 46,
4748
}

src/internal/errors/codes.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,16 @@ export enum ErrorCode {
4343
NotSupported = 'NotSupported',
4444
IcebergError = 'IcebergError',
4545
IcebergMaximumResourceLimit = 'IcebergMaximumResourceLimit',
46+
IcebergResourceNotEmpty = 'IcebergResourceNotEmpty',
4647
NoSuchCatalog = 'NoSuchCatalog',
4748

4849
S3VectorConflictException = 'ConflictException',
4950
S3VectorNotFoundException = 'NotFoundException',
5051
S3VectorBucketNotEmpty = 'VectorBucketNotEmpty',
5152
S3VectorMaxBucketsExceeded = 'S3VectorMaxBucketsExceeded',
5253
S3VectorMaxIndexesExceeded = 'S3VectorMaxIndexesExceeded',
53-
S3VectorNoAvailableShard = 'S3VectorNoAvailableShard',
54+
NoAvailableShard = 'NoAvailableShard',
55+
ShardNotFound = 'ShardNotFound',
5456
}
5557

5658
export const ERRORS = {
@@ -69,6 +71,13 @@ export const ERRORS = {
6971
message: `The maximum number of this resource ${limit} is reached`,
7072
originalError: e,
7173
}),
74+
/**
 * Builds the HTTP 400 error reported when an Iceberg resource is not empty.
 *
 * @param resource - label of the resource kind, interpolated into the message
 * @param name - identifier of the resource, interpolated into the message
 * @param e - optional underlying error, forwarded as `originalError`
 */
IcebergResourceNotEmpty: (resource: string, name: string, e?: Error) => {
  const message = `The resource ${resource}: ${name} is not empty`
  return new StorageBackendError({
    code: ErrorCode.IcebergResourceNotEmpty,
    httpStatusCode: 400,
    message,
    originalError: e,
  })
},
7281
FeatureNotEnabled: (resource: string, feature: string, e?: Error) =>
7382
new StorageBackendError({
7483
code: ErrorCode.InvalidRequest,
@@ -472,11 +481,18 @@ export const ERRORS = {
472481
message: `Maximum number of indexes exceeded. Max allowed is ${maxIndexes}. Contact support to increase your limit.`,
473482
})
474483
},
475-
S3VectorNoAvailableShard() {
484+
NoAvailableShard() {
476485
return new StorageBackendError({
477-
code: ErrorCode.S3VectorNoAvailableShard,
486+
code: ErrorCode.NoAvailableShard,
478487
httpStatusCode: 500,
479-
message: `No available shards are available to host the vector index. Please try again later.`,
488+
message: `No available shards are available to host the resource. Please try again later.`,
489+
})
490+
},
491+
/**
 * Builds the HTTP 404 error for a shard that could not be found.
 *
 * @param shardId - identifier echoed in the error message
 */
ShardNotFound(shardId: string) {
  const message = `Shard not found: ${shardId}`
  return new StorageBackendError({
    code: ErrorCode.ShardNotFound,
    httpStatusCode: 404,
    message,
  })
},
482498
}

src/internal/sharding/knex.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,18 @@ import {
1010
} from './store'
1111
import { hashStringToInt } from '@internal/hashing'
1212

13-
export class KnexShardStoreFactory implements ShardStoreFactory {
13+
export class KnexShardStoreFactory implements ShardStoreFactory<Knex.Transaction> {
1414
constructor(private knex: Knex) {}
15+
16+
/**
 * Creates a factory that wraps the supplied transaction instead of this
 * factory's own Knex handle.
 *
 * @param tnx - transaction handed to the new factory
 * @returns a new `KnexShardStoreFactory` constructed with `tnx`
 */
withExistingTransaction(tnx: Knex.Transaction): ShardStoreFactory {
  const boundFactory = new KnexShardStoreFactory(tnx)
  return boundFactory
}
1519
async withTransaction<T>(fn: (store: ShardStore) => Promise<T>): Promise<T> {
20+
if (this.knex.isTransaction) {
21+
// Already in a transaction, use current connection
22+
return fn(new KnexShardStore(this.knex))
23+
}
24+
1625
try {
1726
return await this.knex.transaction(async (trx) => {
1827
return fn(new KnexShardStore(trx))
@@ -33,6 +42,11 @@ class KnexShardStore implements ShardStore {
3342
return this.db.raw<T>(sql, params as any)
3443
}
3544

45+
/**
 * Fetches one row from the `shard` table whose `id` column equals `shardId`.
 *
 * @param shardId - value matched against the `id` column
 * @returns the first matching row, or `null` when the query yields none
 */
async findShardById(shardId: number): Promise<ShardRow | null> {
  const query = this.db<ShardRow>('shard').select('*').where({ id: shardId })
  const row = await query.first()
  if (row == null) {
    return null
  }
  return row
}
49+
3650
async advisoryLockByString(key: string): Promise<void> {
3751
const id = hashStringToInt(key)
3852
await this.q(`SELECT pg_advisory_xact_lock(?::bigint)`, [id])

0 commit comments

Comments
 (0)