Skip to content

Commit c5187b3

Browse files
authored
feat: add support for sorting in list v2 endpoint (#749)
* feat: add support for sorting in list v2 endpoint * add test cases, fix time sorting in flat file listings * move order into sortby in database adapter * remove unused variable in query
1 parent 635177e commit c5187b3

File tree

7 files changed

+812
-21
lines changed

7 files changed

+812
-21
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
CREATE OR REPLACE FUNCTION storage.search_v2 (
2+
prefix text,
3+
bucket_name text,
4+
limits int DEFAULT 100,
5+
levels int DEFAULT 1,
6+
start_after text DEFAULT '',
7+
sort_order text DEFAULT 'asc',
8+
sort_column text DEFAULT 'name',
9+
sort_column_after text DEFAULT ''
10+
) RETURNS TABLE (
11+
key text,
12+
name text,
13+
id uuid,
14+
updated_at timestamptz,
15+
created_at timestamptz,
16+
last_accessed_at timestamptz,
17+
metadata jsonb
18+
)
19+
SECURITY INVOKER
20+
AS $func$
21+
DECLARE
22+
sort_col text;
23+
sort_ord text;
24+
cursor_op text;
25+
cursor_expr text;
26+
sort_expr text;
27+
BEGIN
28+
-- Validate sort_order
29+
sort_ord := lower(sort_order);
30+
IF sort_ord NOT IN ('asc', 'desc') THEN
31+
sort_ord := 'asc';
32+
END IF;
33+
34+
-- Determine cursor comparison operator
35+
IF sort_ord = 'asc' THEN
36+
cursor_op := '>';
37+
ELSE
38+
cursor_op := '<';
39+
END IF;
40+
41+
sort_col := lower(sort_column);
42+
-- Validate sort column
43+
IF sort_col IN ('updated_at', 'created_at') THEN
44+
cursor_expr := format(
45+
'($5 = '''' OR ROW(date_trunc(''milliseconds'', %I), name COLLATE "C") %s ROW(COALESCE(NULLIF($6, '''')::timestamptz, ''epoch''::timestamptz), $5))',
46+
sort_col, cursor_op
47+
);
48+
sort_expr := format(
49+
'COALESCE(date_trunc(''milliseconds'', %I), ''epoch''::timestamptz) %s, name COLLATE "C" %s',
50+
sort_col, sort_ord, sort_ord
51+
);
52+
ELSE
53+
cursor_expr := format('($5 = '''' OR name COLLATE "C" %s $5)', cursor_op);
54+
sort_expr := format('name COLLATE "C" %s', sort_ord);
55+
END IF;
56+
57+
RETURN QUERY EXECUTE format(
58+
$sql$
59+
SELECT * FROM (
60+
(
61+
SELECT
62+
split_part(name, '/', $4) AS key,
63+
name,
64+
NULL::uuid AS id,
65+
updated_at,
66+
created_at,
67+
NULL::timestamptz AS last_accessed_at,
68+
NULL::jsonb AS metadata
69+
FROM storage.prefixes
70+
WHERE name COLLATE "C" LIKE $1 || '%%'
71+
AND bucket_id = $2
72+
AND level = $4
73+
AND %s
74+
ORDER BY %s
75+
LIMIT $3
76+
)
77+
UNION ALL
78+
(
79+
SELECT
80+
split_part(name, '/', $4) AS key,
81+
name,
82+
id,
83+
updated_at,
84+
created_at,
85+
last_accessed_at,
86+
metadata
87+
FROM storage.objects
88+
WHERE name COLLATE "C" LIKE $1 || '%%'
89+
AND bucket_id = $2
90+
AND level = $4
91+
AND %s
92+
ORDER BY %s
93+
LIMIT $3
94+
)
95+
) obj
96+
ORDER BY %s
97+
LIMIT $3
98+
$sql$,
99+
cursor_expr, -- prefixes WHERE
100+
sort_expr, -- prefixes ORDER BY
101+
cursor_expr, -- objects WHERE
102+
sort_expr, -- objects ORDER BY
103+
sort_expr -- final ORDER BY
104+
)
105+
USING prefix, bucket_name, limits, levels, start_after, sort_column_after;
106+
END;
107+
$func$ LANGUAGE plpgsql STABLE;

src/http/routes/object/listObjectsV2.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ const searchRequestBodySchema = {
2323
limit: { type: 'integer', minimum: 1, examples: [10] },
2424
cursor: { type: 'string' },
2525
with_delimiter: { type: 'boolean' },
26+
sortBy: {
27+
type: 'object',
28+
properties: {
29+
column: { type: 'string', enum: ['name', 'updated_at', 'created_at'] },
30+
order: { type: 'string', enum: ['asc', 'desc'] },
31+
},
32+
required: ['column'],
33+
},
2634
},
2735
} as const
2836
interface searchRequestInterface extends AuthenticatedRequest {
@@ -57,13 +65,14 @@ export default async function routes(fastify: FastifyInstance) {
5765
}
5866

5967
const { bucketName } = request.params
60-
const { limit, with_delimiter, cursor, prefix } = request.body
68+
const { limit, with_delimiter, cursor, prefix, sortBy } = request.body
6169

6270
const results = await request.storage.from(bucketName).listObjectsV2({
6371
prefix,
6472
delimiter: with_delimiter ? '/' : undefined,
6573
maxKeys: limit,
6674
cursor,
75+
sortBy,
6776
})
6877

6978
return response.status(200).send(results)

src/internal/database/migrations/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,5 @@ export const DBMigration = {
3737
'optimise-existing-functions': 36,
3838
'add-bucket-name-length-trigger': 37,
3939
'iceberg-catalog-flag-on-buckets': 38,
40+
'add-search-v2-sort-support': 39,
4041
}

src/storage/database/adapter.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ export interface Database {
105105
nextToken?: string
106106
maxKeys?: number
107107
startAfter?: string
108+
sortBy?: {
109+
order?: string
110+
column?: string
111+
after?: string
112+
}
108113
}
109114
): Promise<Obj[]>
110115

src/storage/database/knex.ts

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -269,27 +269,57 @@ export class StorageKnexDB implements Database {
269269
nextToken?: string
270270
maxKeys?: number
271271
startAfter?: string
272+
sortBy?: {
273+
order?: string
274+
column?: string
275+
after?: string
276+
}
272277
}
273278
) {
274279
return this.runQuery('ListObjectsV2', async (knex) => {
275280
if (!options?.delimiter) {
276281
const query = knex
277282
.table('objects')
278283
.where('bucket_id', bucketId)
279-
.select(['id', 'name', 'metadata', 'updated_at'])
284+
.select(['id', 'name', 'metadata', 'updated_at', 'created_at', 'last_accessed_at'])
280285
.limit(options?.maxKeys || 100)
281286

287+
// only allow these values for sort columns, "name" is excluded intentionally as it is the default and used as tie breaker when sorting by other columns
288+
const allowedSortColumns = new Set(['updated_at', 'created_at'])
289+
const allowedSortOrders = new Set(['asc', 'desc'])
290+
const sortColumn =
291+
options?.sortBy?.column && allowedSortColumns.has(options.sortBy.column)
292+
? options.sortBy.column
293+
: undefined
294+
const sortOrder =
295+
options?.sortBy?.order && allowedSortOrders.has(options.sortBy.order)
296+
? options.sortBy.order
297+
: 'asc'
298+
299+
if (sortColumn) {
300+
query.orderBy(sortColumn, sortOrder)
301+
}
282302
// knex typing is wrong, it doesn't accept a knex.raw on orderBy, even though is totally legit
283303
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
284304
// @ts-ignore
285-
query.orderBy(knex.raw('name COLLATE "C"'))
305+
query.orderBy(knex.raw(`name COLLATE "C"`), sortOrder)
286306

287307
if (options?.prefix) {
288308
query.where('name', 'like', `${options.prefix}%`)
289309
}
290310

291311
if (options?.nextToken) {
292-
query.andWhere(knex.raw('name COLLATE "C" > ?', [options?.nextToken]))
312+
const pageOperator = sortOrder === 'asc' ? '>' : '<'
313+
if (sortColumn && options.sortBy?.after) {
314+
query.andWhere(
315+
knex.raw(
316+
`ROW(date_trunc('milliseconds', ${sortColumn}), name COLLATE "C") ${pageOperator} ROW(COALESCE(NULLIF(?, '')::timestamptz, 'epoch'::timestamptz), ?)`,
317+
[options.sortBy.after, options.nextToken]
318+
)
319+
)
320+
} else {
321+
query.andWhere(knex.raw(`name COLLATE "C" ${pageOperator} ?`, [options.nextToken]))
322+
}
293323
}
294324

295325
return query
@@ -302,14 +332,30 @@ export class StorageKnexDB implements Database {
302332
}
303333

304334
if (useNewSearchVersion2 && options?.delimiter === '/') {
335+
let paramPlaceholders = '?,?,?,?,?'
336+
const sortParams: (string | null)[] = []
337+
// this migration adds 3 more parameters to search v2 support sorting
338+
if (await tenantHasMigrations(this.tenantId, 'add-search-v2-sort-support')) {
339+
paramPlaceholders += ',?,?,?'
340+
sortParams.push(
341+
options?.sortBy?.order || 'asc',
342+
options?.sortBy?.column || 'name',
343+
options?.sortBy?.after || null
344+
)
345+
}
305346
const levels = !options?.prefix ? 1 : options.prefix.split('/').length
306-
const query = await knex.raw('select * from storage.search_v2(?,?,?,?,?)', [
347+
const searchParams = [
307348
options?.prefix || '',
308349
bucketId,
309350
options?.maxKeys || 1000,
310351
levels,
311352
options?.startAfter || '',
312-
])
353+
...sortParams,
354+
]
355+
const query = await knex.raw(
356+
`select * from storage.search_v2(${paramPlaceholders})`,
357+
searchParams
358+
)
313359

314360
return query.rows
315361
}

src/storage/object.ts

Lines changed: 72 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ interface CopyObjectParams {
4141
ifUnmodifiedSince?: Date
4242
}
4343
}
44+
export interface ListObjectsV2Result {
45+
folders: Obj[]
46+
objects: Obj[]
47+
hasNext: boolean
48+
nextCursor?: string
49+
}
4450

4551
/**
4652
* ObjectStorage
@@ -586,18 +592,27 @@ export class ObjectStorage {
586592
startAfter?: string
587593
maxKeys?: number
588594
encodingType?: 'url'
589-
}) {
595+
sortBy?: {
596+
column: 'name' | 'created_at' | 'updated_at'
597+
order?: string
598+
}
599+
}): Promise<ListObjectsV2Result> {
590600
const limit = Math.min(options?.maxKeys || 1000, 1000)
591601
const prefix = options?.prefix || ''
592602
const delimiter = options?.delimiter
593603

594-
const cursor = options?.cursor ? decodeContinuationToken(options?.cursor) : undefined
604+
const cursor = options?.cursor ? decodeContinuationToken(options.cursor) : undefined
595605
let searchResult = await this.db.listObjectsV2(this.bucketId, {
596606
prefix: options?.prefix,
597607
delimiter: options?.delimiter,
598608
maxKeys: limit + 1,
599-
nextToken: cursor,
600-
startAfter: cursor || options?.startAfter,
609+
nextToken: cursor?.startAfter,
610+
startAfter: cursor?.startAfter || options?.startAfter,
611+
sortBy: {
612+
order: cursor?.sortOrder || options?.sortBy?.order,
613+
column: cursor?.sortColumn || options?.sortBy?.column,
614+
after: cursor?.sortColumnAfter,
615+
},
601616
})
602617

603618
let prevPrefix = ''
@@ -638,15 +653,31 @@ export class ObjectStorage {
638653
const objects: Obj[] = []
639654
searchResult.forEach((obj) => {
640655
const target = obj.id === null ? folders : objects
656+
const name = obj.id === null && !obj.name.endsWith('/') ? obj.name + '/' : obj.name
641657
target.push({
642658
...obj,
643-
name: options?.encodingType === 'url' ? encodeURIComponent(obj.name) : obj.name,
659+
name: options?.encodingType === 'url' ? encodeURIComponent(name) : name,
644660
})
645661
})
646662

647-
const nextContinuationToken = isTruncated
648-
? encodeContinuationToken(searchResult[searchResult.length - 1].name)
649-
: undefined
663+
let nextContinuationToken: string | undefined
664+
if (isTruncated) {
665+
const sortColumn = (cursor?.sortColumn || options?.sortBy?.column) as
666+
| 'name'
667+
| 'created_at'
668+
| 'updated_at'
669+
| undefined
670+
671+
nextContinuationToken = encodeContinuationToken({
672+
startAfter: searchResult[searchResult.length - 1].name,
673+
sortOrder: cursor?.sortOrder || options?.sortBy?.order,
674+
sortColumn,
675+
sortColumnAfter:
676+
sortColumn && sortColumn !== 'name' && searchResult[searchResult.length - 1][sortColumn]
677+
? new Date(searchResult[searchResult.length - 1][sortColumn] || '').toISOString()
678+
: undefined,
679+
})
680+
}
650681

651682
return {
652683
hasNext: isTruncated,
@@ -806,16 +837,42 @@ export class ObjectStorage {
806837
}
807838
}
808839

809-
function encodeContinuationToken(name: string) {
810-
return Buffer.from(`l:${name}`).toString('base64')
840+
interface ContinuationToken {
841+
startAfter: string
842+
sortOrder?: string // 'asc' | 'desc'
843+
sortColumn?: string
844+
sortColumnAfter?: string
811845
}
812846

813-
function decodeContinuationToken(token: string) {
814-
const decoded = Buffer.from(token, 'base64').toString().split(':')
847+
const CONTINUATION_TOKEN_PART_MAP: Record<string, keyof ContinuationToken> = {
848+
l: 'startAfter',
849+
o: 'sortOrder',
850+
c: 'sortColumn',
851+
a: 'sortColumnAfter',
852+
}
815853

816-
if (decoded.length === 0) {
817-
throw new Error('Invalid continuation token')
854+
function encodeContinuationToken(tokenInfo: ContinuationToken) {
855+
let result = ''
856+
for (const [k, v] of Object.entries(CONTINUATION_TOKEN_PART_MAP)) {
857+
if (tokenInfo[v]) {
858+
result += `${k}:${tokenInfo[v]}\n`
859+
}
818860
}
861+
return Buffer.from(result.slice(0, -1)).toString('base64')
862+
}
819863

820-
return decoded[1]
864+
function decodeContinuationToken(token: string): ContinuationToken {
865+
const decodedParts = Buffer.from(token, 'base64').toString().split('\n')
866+
const result: ContinuationToken = {
867+
startAfter: '',
868+
sortOrder: 'asc',
869+
}
870+
for (const part of decodedParts) {
871+
const partMatch = part.match(/^(\S):(.*)/)
872+
if (!partMatch || partMatch.length !== 3 || !(partMatch[1] in CONTINUATION_TOKEN_PART_MAP)) {
873+
throw new Error('Invalid continuation token')
874+
}
875+
result[CONTINUATION_TOKEN_PART_MAP[partMatch[1]]] = partMatch[2]
876+
}
877+
return result
821878
}

0 commit comments

Comments
 (0)