Skip to content

Commit 81d2ddd

Browse files
committed
Deduplicate buckets
1 parent 826545c commit 81d2ddd

File tree

5 files changed

+99
-26
lines changed

5 files changed

+99
-26
lines changed

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ export class BucketChecksumState {
112112

113113
/** Set of all buckets in this checkpoint. */
114114
const bucketDescriptionMap = new Map(
115-
allBuckets.map((b) => [b.bucket, this.parameterState.overrideBucketDescription(b)])
115+
allBuckets.map((b) => [b.bucket, this.parameterState.translateResolvedBucket(b)])
116116
);
117117

118118
if (bucketDescriptionMap.size > this.context.maxBuckets) {
@@ -345,7 +345,7 @@ export interface CheckpointUpdate {
345345
/**
346346
* All buckets forming part of the checkpoint.
347347
*/
348-
buckets: BucketDescription[];
348+
buckets: ResolvedBucket[];
349349

350350
/**
351351
* If present, a set of buckets that have been updated since the last checkpoint.
@@ -367,7 +367,7 @@ export class BucketParameterState {
367367
private readonly explicitStreamSubscriptions: Record<string, util.RequestedStreamSubscription>;
368368
private readonly subscribedStreamNames: Set<string>;
369369
private readonly logger: Logger;
370-
private cachedDynamicBuckets: BucketDescription[] | null = null;
370+
private cachedDynamicBuckets: ResolvedBucket[] | null = null;
371371
private cachedDynamicBucketSet: Set<string> | null = null;
372372

373373
private readonly lookups: Set<string>;
@@ -412,7 +412,10 @@ export class BucketParameterState {
412412
hasDefaultStreams: this.includeDefaultStreams,
413413
streams: streamsByName
414414
});
415-
this.staticBuckets = new Map<string, BucketDescription>(this.querier.staticBuckets.map((b) => [b.bucket, b]));
415+
416+
this.staticBuckets = new Map<string, BucketDescription>(
417+
mergeBuckets(this.querier.staticBuckets).map((b) => [b.bucket, b])
418+
);
416419
this.lookups = new Set<string>(this.querier.parameterQueryLookups.map((l) => JSONBig.stringify(l.values)));
417420
this.subscribedStreamNames = new Set(Object.keys(streamsByName));
418421
}
@@ -422,7 +425,8 @@ export class BucketParameterState {
422425
* {@link util.ClientBucketDescription}.
423426
*/
424427
translateResolvedBucket(description: ResolvedBucket): util.ClientBucketDescription {
425-
// Assign
428+
// If the client is overriding the priority of any stream that yields this bucket, sync the bucket with that
429+
// priority.
426430
let priorityOverride: BucketPriority | null = null;
427431
for (const reason of description.inclusion_reasons) {
428432
if (reason != 'default') {
@@ -531,7 +535,7 @@ export class BucketParameterState {
531535
}
532536
}
533537

534-
let dynamicBuckets: BucketDescription[];
538+
let dynamicBuckets: ResolvedBucket[];
535539
if (hasParameterChange || this.cachedDynamicBuckets == null || this.cachedDynamicBucketSet == null) {
536540
dynamicBuckets = await querier.queryDynamicBucketDescriptions({
537541
getParameterSets(lookups) {
@@ -612,3 +616,32 @@ function limitedBuckets(buckets: string[] | { bucket: string }[], limit: number)
612616
const limited = buckets.slice(0, limit);
613617
return `${JSON.stringify(limited)}...`;
614618
}
619+
620+
/**
621+
* Resolves duplicate buckets in the given array, merging the inclusion reasons for duplicate.
622+
*
623+
* It's possible for duplicates to occur when a stream has multiple subscriptions, consider e.g.
624+
*
625+
* ```
626+
* sync_streams:
627+
* assets_by_category:
628+
* query: select * from assets where category in (request.parameters() -> 'categories')
629+
* ```
630+
*
631+
* Here, a client might subscribe once with `{"categories": [1]}` and once with `{"categories": [1, 2]}`. Since each
632+
* subscription is evaluated independently, this would lead to three buckets, with a duplicate `assets_by_category[1]`
633+
* bucket.
634+
*/
635+
function mergeBuckets(buckets: ResolvedBucket[]): ResolvedBucket[] {
636+
const byDefinition: Record<string, ResolvedBucket> = {};
637+
638+
for (const bucket of buckets) {
639+
if (Object.hasOwn(byDefinition, bucket.definition)) {
640+
byDefinition[bucket.definition].inclusion_reasons.push(...bucket.inclusion_reasons);
641+
} else {
642+
byDefinition[bucket.definition] = bucket;
643+
}
644+
}
645+
646+
return Object.values(byDefinition);
647+
}

packages/sync-rules/src/BucketParameterQuerier.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { BucketDescription } from './BucketDescription.js';
1+
import { BucketDescription, ResolvedBucket } from './BucketDescription.js';
22
import { RequestParameters, SqliteJsonRow, SqliteJsonValue } from './types.js';
33
import { normalizeParameterValue } from './utils.js';
44

@@ -14,7 +14,7 @@ export interface BucketParameterQuerier {
1414
* select request.user_id() as user_id()
1515
* select value as project_id from json_each(request.jwt() -> 'project_ids')
1616
*/
17-
readonly staticBuckets: BucketDescription[];
17+
readonly staticBuckets: ResolvedBucket[];
1818

1919
/**
2020
* True if there are dynamic buckets, meaning queryDynamicBucketDescriptions() should be used.
@@ -36,7 +36,7 @@ export interface BucketParameterQuerier {
3636
*
3737
* select id as user_id from users where users.id = request.user_id()
3838
*/
39-
queryDynamicBucketDescriptions(source: ParameterLookupSource): Promise<BucketDescription[]>;
39+
queryDynamicBucketDescriptions(source: ParameterLookupSource): Promise<ResolvedBucket[]>;
4040
}
4141

4242
export interface ParameterLookupSource {
@@ -54,7 +54,7 @@ export function mergeBucketParameterQueriers(queriers: BucketParameterQuerier[])
5454
hasDynamicBuckets: parameterQueryLookups.length > 0,
5555
parameterQueryLookups: parameterQueryLookups,
5656
async queryDynamicBucketDescriptions(source: ParameterLookupSource) {
57-
let results: BucketDescription[] = [];
57+
let results: ResolvedBucket[] = [];
5858
for (let q of queriers) {
5959
if (q.hasDynamicBuckets) {
6060
results.push(...(await q.queryDynamicBucketDescriptions(source)));

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import { BucketDescription } from './BucketDescription.js';
1+
import { BucketDescription, BucketInclusionReason, ResolvedBucket } from './BucketDescription.js';
22
import { BucketParameterQuerier, mergeBucketParameterQueriers } from './BucketParameterQuerier.js';
33
import { IdSequence } from './IdSequence.js';
44
import { SourceTableInterface } from './SourceTableInterface.js';
55
import { SqlDataQuery } from './SqlDataQuery.js';
66
import { SqlParameterQuery } from './SqlParameterQuery.js';
7-
import { SyncRulesOptions } from './SqlSyncRules.js';
7+
import { GetQuerierOptions, SyncRulesOptions } from './SqlSyncRules.js';
88
import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js';
99
import { StreamQuery } from './StreamQuery.js';
1010
import { TablePattern } from './TablePattern.js';
@@ -134,27 +134,48 @@ export class SqlBucketDescriptor {
134134
return results;
135135
}
136136

137-
getBucketParameterQuerier(parameters: RequestParameters): BucketParameterQuerier {
138-
const staticBuckets = this.getStaticBucketDescriptions(parameters);
137+
/**
138+
* @deprecated Use `pushBucketParameterQueriers` instead and merge at the top-level.
139+
*/
140+
getBucketParameterQuerier(options: GetQuerierOptions, parameters: RequestParameters): BucketParameterQuerier {
141+
const queriers: BucketParameterQuerier[] = [];
142+
this.pushBucketParameterQueriers(queriers, options, parameters);
143+
144+
return mergeBucketParameterQueriers(queriers);
145+
}
146+
147+
pushBucketParameterQueriers(
148+
result: BucketParameterQuerier[],
149+
options: GetQuerierOptions,
150+
parameters: RequestParameters
151+
) {
152+
const reasons = [this.bucketInclusionReason(options)];
153+
const staticBuckets = this.getStaticBucketDescriptions(parameters, reasons);
139154
const staticQuerier = {
140155
staticBuckets,
141156
hasDynamicBuckets: false,
142157
parameterQueryLookups: [],
143158
queryDynamicBucketDescriptions: async () => []
144159
} satisfies BucketParameterQuerier;
160+
result.push(staticQuerier);
145161

146162
if (this.parameterQueries.length == 0) {
147-
return staticQuerier;
163+
return;
148164
}
149165

150-
const dynamicQueriers = this.parameterQueries.map((query) => query.getBucketParameterQuerier(parameters));
151-
return mergeBucketParameterQueriers([staticQuerier, ...dynamicQueriers]);
166+
const dynamicQueriers = this.parameterQueries.map((query) => query.getBucketParameterQuerier(parameters, reasons));
167+
result.push(...dynamicQueriers);
152168
}
153169

154-
getStaticBucketDescriptions(parameters: RequestParameters): BucketDescription[] {
155-
let results: BucketDescription[] = [];
170+
getStaticBucketDescriptions(parameters: RequestParameters, reasons: BucketInclusionReason[]): ResolvedBucket[] {
171+
let results: ResolvedBucket[] = [];
156172
for (let query of this.globalParameterQueries) {
157-
results.push(...query.getStaticBucketDescriptions(parameters));
173+
for (const desc of query.getStaticBucketDescriptions(parameters)) {
174+
results.push({
175+
...desc,
176+
inclusion_reasons: reasons
177+
});
178+
}
158179
}
159180
return results;
160181
}
@@ -177,6 +198,14 @@ export class SqlBucketDescriptor {
177198
return result;
178199
}
179200

201+
private bucketInclusionReason(parameters: GetQuerierOptions): BucketInclusionReason {
202+
if (this.type == SqlBucketDescriptorType.STREAM && !this.subscribedToByDefault) {
203+
return { subscription: this.name };
204+
} else {
205+
return 'default';
206+
}
207+
}
208+
180209
tableSyncsData(table: SourceTableInterface): boolean {
181210
for (let query of this.dataQueries) {
182211
if (query.applies(table)) {

packages/sync-rules/src/SqlParameterQuery.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import { parse, SelectedColumn } from 'pgsql-ast-parser';
2-
import { BucketDescription, BucketPriority, DEFAULT_BUCKET_PRIORITY } from './BucketDescription.js';
2+
import {
3+
BucketDescription,
4+
BucketInclusionReason,
5+
BucketPriority,
6+
DEFAULT_BUCKET_PRIORITY
7+
} from './BucketDescription.js';
38
import { BucketParameterQuerier, ParameterLookup, ParameterLookupSource } from './BucketParameterQuerier.js';
49
import { SqlRuleError } from './errors.js';
510
import { SourceTableInterface } from './SourceTableInterface.js';
@@ -451,7 +456,10 @@ export class SqlParameterQuery {
451456
}
452457
}
453458

454-
getBucketParameterQuerier(requestParameters: RequestParameters): BucketParameterQuerier {
459+
getBucketParameterQuerier(
460+
requestParameters: RequestParameters,
461+
reasons: BucketInclusionReason[]
462+
): BucketParameterQuerier {
455463
const lookups = this.getLookups(requestParameters);
456464
if (lookups.length == 0) {
457465
// This typically happens when the query is pre-filtered using a where clause
@@ -470,7 +478,10 @@ export class SqlParameterQuery {
470478
parameterQueryLookups: lookups,
471479
queryDynamicBucketDescriptions: async (source: ParameterLookupSource) => {
472480
const bucketParameters = await source.getParameterSets(lookups);
473-
return this.resolveBucketDescriptions(bucketParameters, requestParameters);
481+
return this.resolveBucketDescriptions(bucketParameters, requestParameters).map((bucket) => ({
482+
...bucket,
483+
inclusion_reasons: reasons
484+
}));
474485
}
475486
};
476487
}

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -411,16 +411,16 @@ export class SqlSyncRules implements SyncRules {
411411
hasExplicitDefaultSubscription = true;
412412
}
413413

414-
queriers.push(descriptor.getBucketParameterQuerier(subscriptionParams));
414+
descriptor.pushBucketParameterQueriers(queriers, options, subscriptionParams);
415415
}
416416

417417
// If the stream is subscribed to by default and there is no explicit subscription that would match the default
418418
// subscription, also include the default querier.
419419
if (descriptor.subscribedToByDefault && !hasExplicitDefaultSubscription) {
420-
queriers.push(descriptor.getBucketParameterQuerier(params));
420+
descriptor.pushBucketParameterQueriers(queriers, options, params);
421421
}
422422
} else {
423-
queriers.push(descriptor.getBucketParameterQuerier(params));
423+
descriptor.pushBucketParameterQueriers(queriers, options, params);
424424
}
425425
}
426426

0 commit comments

Comments
 (0)