WIP: New unified syntax #313

Draft: wants to merge 16 commits into base: sync-streams

Conversation

simolus3

This is still WIP, I'll restructure the PRs a bit to make them easier to review.

This adds support for sync streams being defined as a single SQL statement (instead of the split between parameter and data queries we have today). For details, see the internal "2025-05 Subqueries / New Sync Streams Syntax" document.

Overview

This new syntax is implemented with the same basic primitives also used for sync rules, meaning that assigning rows to buckets and assigning buckets to users is a three-step process:

  1. For a row matched by a data query in sync rules and the outer query in sync streams, we compute all bucket ids that row could possibly belong to (evaluateRow).
  2. For what used to be non-trivial parameter queries (now subqueries in streams), we compute an index of request parameters => data query results and persist it (evaluateParameterRow).
  3. Finally, when a user connects, we look up all bucket ids that user should receive (BucketParameterQuerier, using persisted lookups if subqueries exist).
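
As a rough illustration of the three steps, here is a minimal in-memory sketch for a stream like select * from comments where issue_id in (select id from issues where owner_id = request.user_id()). The function names evaluateRow and evaluateParameterRow mirror the ones mentioned above, but the signatures, the Row type, the bucket naming and the Map-based lookup store are all assumptions made for this example, not the service's actual API.

```typescript
// Hypothetical, simplified types: not the actual sync-rules API.
type Row = Record<string, string | number | boolean | null>;

// Illustrative bucket naming only; the real bucket id format differs.
function bucketFor(issueId: string): string {
  return `comments_by_issue[${JSON.stringify(issueId)}]`;
}

// Step 1: map a row to sync (a comment) to every bucket id it could belong to.
function evaluateRow(comment: Row): string[] {
  return [bucketFor(String(comment.issue_id))];
}

// Step 2: index rows of the subquery table (issues) by the request value
// they are compared against. Here: owner_id => ids of issues owned by that user.
const parameterLookups = new Map<string, Set<string>>();

function evaluateParameterRow(issue: Row): void {
  const key = String(issue.owner_id);
  const ids = parameterLookups.get(key) ?? new Set<string>();
  ids.add(String(issue.id));
  parameterLookups.set(key, ids);
}

// Step 3: when a user connects, resolve the buckets they should receive
// by consulting the persisted lookups.
function bucketsForUser(userId: string): string[] {
  const issueIds = parameterLookups.get(userId) ?? new Set<string>();
  return Array.from(issueIds).map((id) => bucketFor(id));
}
```

A comment is delivered to a user exactly when one of the bucket ids from step 1 also shows up in the result of step 3 for that user.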

Despite that being kept the same, there are some pretty fundamental differences between the rules and streams:

  1. While we still have parameters (as in, a thing that causes rows to be distributed across different buckets), these are no longer explicit. They also don't have a name anymore.
  2. Because parameters are created implicitly instead of with parameter queries, each parameter is referenced exactly once.
  3. A single stream can define multiple disjoint parameter sets by using an OR clause in a WHERE. This differs from sync rules, where multiple independent rules would have to be written.

Examples

Before looking at how this is implemented, it's helpful to consider a few examples:

  1. SELECT * FROM issues WHERE length(description) < 10: This is a query without parameters. You could define the same query with sync rules.
  2. SELECT * FROM issues WHERE owner_id = request.user_id(): An implicit parameter is introduced whenever a value derived from row data is compared with a value derived from the request (like token, user, or stream parameters). Previously, this would correspond to a static parameter query (e.g. SELECT request.user_id() as uid and then select * from issues where owner_id = bucket.uid as a data query).
  3. SELECT * FROM issues WHERE owner_id = request.user_id() AND length(description) < 10: This combination of the two is also possible with sync rules. Note that conditions that only apply to the row to sync don't introduce parameters. They only affect how evaluateRow works, by ignoring rows that don't match the condition.
  4. SELECT * FROM issues WHERE owner_id = request.user_id() OR length(description) < 10 OR token_parameters.is_admin: Here, there are three independent conditions that could cause a row to get synced! The first one depends on an implicit stream parameter, the second one only depends on the row, the third one only depends on the token. This query is impossible to represent with a single sync rule, but with streams it's pretty straightforward.
  5. select * from comments where issue_id in (select id from issues where owner_id = request.user_id()): This is equivalent to a parameter query selecting from issues before.
  6. select * from lists WHERE owner_id = (SELECT id FROM users WHERE id = request.user_id() and users.is_admin). This form is also supported by sync rules. However, there's another way to write that now:
  7. select * from lists WHERE owner_id = request.user_id() AND request.user_id() IN (SELECT id FROM users WHERE is_admin). This creates a parameter lookup for admin users by their id. However, that parameter lookup doesn't create a bucket parameter! Instead, it's used as an additional request filter in the third step.
  8. The same as 7, but with an OR instead of AND: Some users (admins) have access to everything, others only to the rows they own. This is a pretty useful query that was impossible to write (in a single stream) before.
  9. Multiple subqueries: SELECT * FROM users WHERE id IN (SELECT user_a FROM friends WHERE user_b = request.user_id()) OR id IN (SELECT user_b FROM friends WHERE user_a = request.user_id()). These also create two independent buckets to sync, both with a parameter backed by a lookup.
  10. Complex conditions: select * from comments where (issue_id in (select id from issues where owner_id = request.user_id()) OR token_parameters.is_admin) AND NOT comments.is_deleted is a valid stream definition. The compiler applies the distributive law to create two stream variants (one with a parameter and a row condition, and one with only the row condition but only visible to users with admin rights).
  11. select * from comments where not (is_deleted AND issue_id not in (select id from issues where owner_id = request.user_id())). Why one would possibly want to write queries like this is beyond my understanding, but as a side effect of how the compiler handles query 10, this is also allowed.

Implementation

I've tried to document the implementation in detail, but it's still helpful to have a broad overview of how these queries are implemented.

After all intermediate steps, streams are represented by an array of StreamVariants. Each variant is formed by an OR clause, where the left and right subfilters may depend on different internal parameter sets (see e.g. example query 4). Since the parameters are different, we need to encode the variant in the resulting bucket ids. E.g. query 4 would have buckets of the form stream|0["owner_id"] for matching owners, stream|1[] for rows with length(description) < 10 and stream|2[] for all rows (that bucket would only be visible to admin users).
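
To make the bucket id shape concrete, here is a small sketch that builds ids of the form shown above for query 4. The formatBucketId helper and the Issue type are hypothetical, and serializing the parameter values with JSON.stringify is an assumption that merely reproduces the stream|0["owner_id"] pattern; the actual encoding in the PR may differ.

```typescript
// Hypothetical helper: stream name, variant index, then the parameter values.
function formatBucketId(stream: string, variantIndex: number, parameterValues: unknown[]): string {
  return `${stream}|${variantIndex}${JSON.stringify(parameterValues)}`;
}

// Hypothetical row shape for the issues table.
interface Issue {
  owner_id: string;
  description: string;
}

// Buckets a single issue row is distributed to under query 4.
function bucketsForIssue(row: Issue): string[] {
  // Variant 0: owner_id = request.user_id() introduces one implicit parameter.
  const buckets = [formatBucketId('stream', 0, [row.owner_id])];
  // Variant 1: length(description) < 10 is a row filter without parameters.
  if (row.description.length < 10) {
    buckets.push(formatBucketId('stream', 1, []));
  }
  // Variant 2: token_parameters.is_admin only depends on the request,
  // so every row lands in this bucket.
  buckets.push(formatBucketId('stream', 2, []));
  return buckets;
}
```

With this encoding, an issue owned by u1 with a long description would land in stream|0["u1"] and stream|2[], but not in stream|1[].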

Each variant consists of:

  1. Subqueries: Subqueries replace parameter rows, so they are responsible for creating parameter lookup values. Note that the result of lookup values is not necessarily a stream parameter though (see query 7).
  2. Parameters. Unlike with sync rules, where parameters are named and we have parameter match clauses describing how to deal with them, these parameters contain logic for resolving them (reflecting the fact that each parameter is used exactly once). This lookup consists of two parts:
    1. filterRow(): This maps a row to sync into all possible values that would match that row. E.g. for FROM comments WHERE issue_id IN (SELECT ...), filterRow(comments) would return comments.issue_id. For the overlap operator (&&), this would return all values in the array.
    2. A request lookup strategy. This can either be a StaticLookup, extracting the other side of the comparison from request parameters (e.g. in WHERE owner_id = request.user_id()); or an EqualsRowInSubqueryLookup which extracts values from indexed lookups.
  3. Static row filters: Filters like WHERE length(content) < 10 that only depend on the row are included as additionalRowFilters. They only affect the first resolve step, by ignoring some rows in evaluateRow.
  4. Request filters: Filters like WHERE token_parameters.is_admin that only depend on request parameters are included here. They don't rely on parameters, but can exclude some requests. They can either be static, or also rely on subquery results (e.g. WHERE request.user_id() IN (SELECT id FROM users WHERE is_admin)).

Within a variant, conditions form a conjunction: Rows are only included if all filters match.

Different variants form a disjunction: Rows are distributed to all buckets for each matching variant, and users see all variants where a request filter grants access.
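
The conjunction/disjunction behaviour can be sketched with a simplified model of a variant. The interfaces below borrow the names filterRow and additionalRowFilters from the description above, but everything else (the Request shape, the lookup method, the helper functions and the bucket id format) is an assumption for illustration rather than the classes in this PR.

```typescript
// Hypothetical, simplified model of stream variants.
type Row = Record<string, unknown>;
interface Request { userId: string; isAdmin: boolean }

interface VariantParameter {
  // Values from the row that a request-side value must equal.
  filterRow(row: Row): unknown[];
  // Values resolved for a request (statically or via a persisted lookup).
  lookup(request: Request): unknown[];
}

interface Variant {
  parameters: VariantParameter[];
  additionalRowFilters: Array<(row: Row) => boolean>;
  requestFilters: Array<(request: Request) => boolean>;
}

// All combinations that pick one value per parameter.
function combinations(lists: unknown[][]): unknown[][] {
  let result: unknown[][] = [[]];
  for (const list of lists) {
    const next: unknown[][] = [];
    for (const prefix of result) {
      for (const value of list) next.push(prefix.concat([value]));
    }
    result = next;
  }
  return result;
}

function toBucketIds(index: number, instantiations: unknown[][]): string[] {
  return instantiations.map((values) => `stream|${index}${JSON.stringify(values)}`);
}

// Rows go to the buckets of every variant whose row filters all match.
function bucketsForRow(variants: Variant[], row: Row): string[] {
  const ids: string[] = [];
  variants.forEach((variant, index) => {
    if (!variant.additionalRowFilters.every((filter) => filter(row))) return;
    const values = variant.parameters.map((p) => p.filterRow(row));
    ids.push(...toBucketIds(index, combinations(values)));
  });
  return ids;
}

// Requests see the buckets of every variant whose request filters all match.
function bucketsForRequest(variants: Variant[], request: Request): string[] {
  const ids: string[] = [];
  variants.forEach((variant, index) => {
    if (!variant.requestFilters.every((filter) => filter(request))) return;
    const values = variant.parameters.map((p) => p.lookup(request));
    ids.push(...toBucketIds(index, combinations(values)));
  });
  return ids;
}
```

Modelling example query 10 this way gives two variants: one with an issue_id parameter and the NOT is_deleted row filter, and one with no parameters, the same row filter and an is_admin request filter.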

To compile a WHERE clause into variants, we can't use most of the logic currently in sql_filters.ts because our way of dealing with subqueries and bucket parameters is incompatible with sync rules. Instead, we define classes for boolean algebra with the FilterOperator class in streams/filter.ts. This allows composing filters based on:

  1. AND, OR and NOT boolean operators.
  2. "Simple conditions" that either only depend on row data (WHERE length(content) < 10) or only on request data (WHERE token_parameters.is_admin). These are created and composed using the existing SqlTools class.
  3. The CompareRowValueWithStreamParameter class, which creates an implicit parameter based on what is a ParameterMatchClause in sync rules.
  4. The InOperator, which checks whether an expression derived from row data is included in results of a subquery derived from request data. Similarly to the existing query compiler, IN operators working on row/request data on both sides are compiled into a SimpleCondition instead.
  5. The OverlapOperator, which is very similar to InOperator but takes an array on the left-hand side and matches if the subquery results intersect with that array.
  6. The ExistsOperator, which is also similar to InOperator but doesn't compare values, it matches if the subquery returns any row. This is used to compile WHERE request.user_id() IN (SELECT * FROM users WHERE is_admin), by pushing the request parameter into the subquery: WHERE EXISTS (SELECT _ FROM users WHERE is_admin AND id = request.user_id()).
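
The matching semantics of the three subquery operators can be illustrated with plain sets. This is only a sketch of what each operator checks, with the subquery result modelled as a Set; the helper names and the User type are hypothetical and unrelated to how the operators are compiled in streams/filter.ts.

```typescript
// IN: a single row-derived value must appear in the subquery results.
function matchesIn<T>(rowValue: T, subquery: ReadonlySet<T>): boolean {
  return subquery.has(rowValue);
}

// Overlap (&&): any element of the row-derived array must appear in the results.
function matchesOverlap<T>(rowValues: readonly T[], subquery: ReadonlySet<T>): boolean {
  return rowValues.some((value) => subquery.has(value));
}

// EXISTS: no values are compared, the subquery only has to return a row.
function matchesExists<T>(subquery: ReadonlySet<T>): boolean {
  return subquery.size > 0;
}

// Hypothetical users table used to illustrate the IN-to-EXISTS rewrite.
interface User { id: string; is_admin: boolean }

// request.user_id() IN (SELECT id FROM users WHERE is_admin)
function isAdminViaIn(users: User[], userId: string): boolean {
  const adminIds = new Set(users.filter((u) => u.is_admin).map((u) => u.id));
  return matchesIn(userId, adminIds);
}

// EXISTS (... FROM users WHERE is_admin AND id = request.user_id())
function isAdminViaExists(users: User[], userId: string): boolean {
  const matching = new Set(users.filter((u) => u.is_admin && u.id === userId).map((u) => u.id));
  return matchesExists(matching);
}
```

Both isAdminViaIn and isAdminViaExists return the same answer for any input, which is why the request parameter can be pushed into the subquery.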

Compiling a WHERE clause then involves:

  1. Turning the term into a sum-of-products (so OR as the top-level operator and AND within).
  2. Asserting that no NOT operator appears before anything that isn't a SimpleCondition (for those we can push the operator into the condition via composeFunction(OPERATOR_NOT, ...)). So after this step, no NOT operators appear in the query.
  3. Compiling each term in the outermost OR into a variant.
    a. By introducing parameters for CompareRowValueWithStreamParameter, InOperator and OverlapOperator.
    b. Introducing additional row filters for SimpleConditions that are static or depend on row data.
    c. And request filters for ExistsOperator and SimpleConditions that depend on request data.
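
The first two steps can be sketched on a toy filter tree. The Filter type and both functions below are hypothetical and much simpler than the FilterOperator classes; in particular, this sketch pushes NOT down with De Morgan's laws before distributing AND over OR, which may not match the order used by the actual compiler. Leaves marked simple stand in for SimpleConditions, the only nodes that can absorb a negation.

```typescript
// Hypothetical filter tree for illustration only.
type Filter =
  | { kind: 'leaf'; name: string; simple: boolean }
  | { kind: 'not'; inner: Filter }
  | { kind: 'and'; left: Filter; right: Filter }
  | { kind: 'or'; left: Filter; right: Filter };

// Push NOT operators down to the leaves using De Morgan's laws.
// Fails if a negation ends up on a leaf that is not a simple condition.
function pushNot(f: Filter, negate: boolean = false): Filter {
  if (f.kind === 'not') return pushNot(f.inner, !negate);
  if (f.kind === 'leaf') {
    if (!negate) return f;
    if (!f.simple) throw new Error(`Cannot negate ${f.name}`);
    return { kind: 'leaf', name: `NOT ${f.name}`, simple: true };
  }
  const left = pushNot(f.left, negate);
  const right = pushNot(f.right, negate);
  // Negation swaps AND and OR.
  const isAnd = (f.kind === 'and') !== negate;
  return isAnd ? { kind: 'and', left, right } : { kind: 'or', left, right };
}

// Distribute AND over OR. Each inner array is one product (one variant),
// the outer array is the top-level OR.
function toSumOfProducts(f: Filter): string[][] {
  if (f.kind === 'leaf') return [[f.name]];
  if (f.kind === 'not') throw new Error('Call pushNot() first');
  const left = toSumOfProducts(f.left);
  const right = toSumOfProducts(f.right);
  if (f.kind === 'or') return left.concat(right);
  const products: string[][] = [];
  for (const l of left) {
    for (const r of right) products.push(l.concat(r));
  }
  return products;
}
```

Applied to example query 10, this yields two products, one per variant. Applied to example query 11, the double negation around the subquery cancels out, which is why that query is accepted, while a single NOT directly in front of a subquery condition is rejected.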

Remaining todos:

  • Add a debug representation.
  • Implement remaining methods available on SqlBucketDescriptor.
  • Remove remaining usages of SqlBucketDescriptor outside of sync-rules.


changeset-bot bot commented Jul 29, 2025

🦋 Changeset detected

Latest commit: 3dceb77

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 20 packages
Name Type
@powersync/service-module-postgres-storage Patch
@powersync/service-module-mongodb-storage Patch
@powersync/service-core-tests Patch
@powersync/service-module-postgres Patch
@powersync/service-rsocket-router Patch
@powersync/service-errors Patch
@powersync/service-module-mongodb Patch
@powersync/service-core Patch
@powersync/service-module-mysql Patch
@powersync/service-module-core Patch
@powersync/service-sync-rules Patch
@powersync/lib-service-postgres Patch
@powersync/lib-services-framework Patch
@powersync/lib-service-mongodb Patch
@powersync/service-jpgwire Patch
@powersync/service-jsonbig Patch
@powersync/service-schema Patch
@powersync/service-types Patch
@powersync/service-image Patch
test-client Patch



@rkistner rkistner left a comment


I did an initial review on the tests and implementation, and it looks great!

Some requests for more tests:

  1. Error condition - selecting multiple columns in a subquery: WHERE ... IN (SELECT a, b)
  2. IN on parameter data: SELECT * FROM comments WHERE issue_id IN (subscription.parameters() -> 'issue_ids')
  3. IN on parameter data AND table - is this supported? If not, just test the error condition: SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issues WHERE owner_id = request.user_id()) AND label IN (subscription.parameters() -> 'labels')

On the syntax, I think we just need to finalize the names of the request/stream parameters before merging. I'd also recommend removing the subscription_parameters virtual table, and also removing the token_parameters from streams, in favor of only using the parameter functions.

});

test('negated subquery from outer not operator', () => {
const [_, errors] = syncStreamFromSql('s', 'select * from comments where not ()', options);

@rkistner rkistner Aug 5, 2025


What is the expected result here? (looks like an incomplete test)
