Skip to content

[MySQL] Extend replication keep alive mechanism. #314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 72 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
e5db119
Removed zongji type mappings which are now provided by the Zongji pac…
Rentacookie May 21, 2025
d03360e
Moved most of the binlog event handling logic to a separate BinlogLis…
Rentacookie May 21, 2025
924ecd8
Updated the BinLogStream to use the new BinLogListener
Rentacookie May 21, 2025
404dcde
Renamed BinlogListener to BinLogListener
Rentacookie May 21, 2025
b1b8c30
Added changeset
Rentacookie May 21, 2025
16c1235
Merge branch 'main' into mysql-binlog-backpressure-handling
Rentacookie May 21, 2025
126f9b3
Simplified BinLogListener stopping mechanism
Rentacookie May 22, 2025
a03260f
Corrected BinLogListener name.
Rentacookie May 26, 2025
e147318
Supply port for binlog listener connections.
Rentacookie May 27, 2025
999a8dc
Only set up binlog heartbeat once the listener is fully started up.
Rentacookie May 27, 2025
782b43c
Merge branch 'main' into mysql-binlog-backpressure-handling
Rentacookie May 27, 2025
07201e8
Updated changeset
Rentacookie May 27, 2025
079a2f5
Changed binlog backpressure mechanism to be based on processing queue…
Rentacookie May 29, 2025
59afb33
Merge branch 'main' into mysql-binlog-backpressure-handling
Rentacookie May 29, 2025
286ba16
Changed binlog backpressure mechanism to be based on processing queue…
Rentacookie May 29, 2025
9a00b8b
Added optional columns field to SourceEntityDescriptor
Rentacookie Jun 4, 2025
3aebffd
Cleanup unused imports
Rentacookie Jun 4, 2025
bf481c8
Ensure column values are preserved when available Report 0 storage me…
Rentacookie Jun 5, 2025
b673609
Added basic schema change handling for MySQL
Rentacookie Jun 5, 2025
f707e2b
Revert columns field addition to SourceEntityDescriptor
Rentacookie Jun 18, 2025
74adb22
Added schema change handling for the MySQL binlog replication.
Rentacookie Jun 18, 2025
1270939
Include powersync core version in metrics metadata
Rentacookie Jun 18, 2025
7500bed
Code analysis cleanup
Rentacookie Jun 18, 2025
add2590
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jun 18, 2025
54e6a9d
Merge conflicts
Rentacookie Jun 18, 2025
a5582b1
Fixed parser import
Rentacookie Jun 18, 2025
d34f8fa
Fixed mysql->sqlite rows parsing that would filter out columns with n…
Rentacookie Jun 25, 2025
fa327ae
Cleaned up SchemaChange handling in BinLogListener
Rentacookie Jun 25, 2025
1d1e945
Added schema change tests
Rentacookie Jun 25, 2025
dd0119a
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jun 25, 2025
ce8cb9c
Change binlog event receive log message to debug
Rentacookie Jun 25, 2025
2411f21
Revert and fix mysql->sqlite row conversion for null value columns
Rentacookie Jun 25, 2025
cd8ef3e
Added conditional skip of mysql schema test for syntax that does not …
Rentacookie Jun 25, 2025
18e0865
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jun 26, 2025
3adea04
Fixed version checking for mysql 5.7 incompatible test
Rentacookie Jun 26, 2025
79bd14e
Fix skip test on mysql 5.7 schema change
Rentacookie Jun 26, 2025
81b437f
Reverted mysql dev docker compose
Rentacookie Jun 26, 2025
b8e631b
Moved schema change handling to processing queue
Rentacookie Jun 30, 2025
ac96801
Fixed bug where multiple zongji listeners could be started if multipl…
Rentacookie Jul 1, 2025
301345c
Extended node-sql-parser type definitions
Rentacookie Jul 9, 2025
3898db7
- Simplified schema change types
Rentacookie Jul 9, 2025
a339ec8
Removed unused constant
Rentacookie Jul 9, 2025
f472dd6
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jul 9, 2025
5df001b
Skip unsupported schema test for MySQL 5.7
Rentacookie Jul 9, 2025
57fcfec
Added error handling for zongji emitted schema errors
Rentacookie Jul 10, 2025
e624081
Added changeset
Rentacookie Jul 11, 2025
462e08d
Typo fixes from pr feedback
Rentacookie Jul 11, 2025
e9a6569
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jul 11, 2025
1518358
Removed filters from mysql dev docker config
Rentacookie Jul 23, 2025
22c42a2
Added safeguard for gtid splitting when no transactions have been run…
Rentacookie Jul 23, 2025
6565b97
BinLog listener now correctly takes schema into account for replication.
Rentacookie Jul 23, 2025
113ae83
BinLog stream now correctly honors multiple schemas in the sync rules.
Rentacookie Jul 23, 2025
805b609
Added tests for multi schema support
Rentacookie Jul 23, 2025
01c078d
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jul 23, 2025
6fb5be3
MySQL util fix post merge
Rentacookie Jul 23, 2025
541235b
Removed accidentally commited keepalive code in BinLogStream.
Rentacookie Jul 23, 2025
9c1d34b
Cleaned up Binlog docs and comments a bit
Rentacookie Jul 24, 2025
0f94500
Added keepalive support for MySQL replication.
Rentacookie Jul 29, 2025
78bf259
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jul 29, 2025
4ee4ed6
Removed potentially spammy log entry.
Rentacookie Jul 29, 2025
9e7cc27
Merge branch 'feat/mysql-schema-change-handling' into mysql-replicati…
Rentacookie Jul 29, 2025
0f70543
Increased MySQL keepalive table timestamp granularity
Rentacookie Jul 29, 2025
94094bf
Merge branch 'main' into mysql-replication-keepalive-mechanism
Rentacookie Jul 29, 2025
db650da
Added check to skip MySQL keepalive table creation if present and cor…
Rentacookie Jul 30, 2025
79a168d
Added success check for mysql keepalive
Rentacookie Jul 30, 2025
75c4ca6
Reworked MySQL keepalive mechanism by hooking into heartbeat events o…
Rentacookie Aug 5, 2025
4f0315b
Merge branch 'main' into mysql-replication-keepalive-mechanism
Rentacookie Aug 5, 2025
76262ed
Bucket Keepalives now bump the syncrules keepalive timestamp even if …
Rentacookie Aug 6, 2025
ed871c2
Switched keepalive logs to debug
Rentacookie Aug 6, 2025
894a748
Added changeset
Rentacookie Aug 6, 2025
da7fd0e
Small cleanup of reverted code
Rentacookie Aug 6, 2025
62090fd
Merge branch 'main' into mysql-replication-keepalive-mechanism
Rentacookie Aug 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/wise-elephants-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@powersync/service-module-mysql': minor
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core': patch
---

- Hooked up the MySQL binlog heartbeat events with the bucket batch keepalive mechanism.
Heartbeat events will now update the latest keepalive timestamp in the sync rules.
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ export class MongoBucketBatch
}

async keepalive(lsn: string): Promise<boolean> {
if (this.last_checkpoint_lsn != null && lsn <= this.last_checkpoint_lsn) {
if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
// No-op
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mysql/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"@powersync/service-sync-rules": "workspace:*",
"@powersync/service-types": "workspace:*",
"@powersync/service-jsonbig": "workspace:*",
"@powersync/mysql-zongji": "^0.4.0",
"@powersync/mysql-zongji": "^0.5.0",
"async": "^3.2.4",
"mysql2": "^3.11.0",
"node-sql-parser": "^5.3.9",
Expand Down
5 changes: 4 additions & 1 deletion modules/module-mysql/src/replication/BinLogReplicationJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
return this.options.storage.slot_name;
}

async keepAlive() {}
async keepAlive() {
// Keepalives are handled by the binlog heartbeat mechanism
}

async replicate() {
try {
Expand Down Expand Up @@ -56,6 +58,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
const connectionManager = this.connectionFactory.create({
// Pool connections are only used intermittently.
idleTimeout: 30_000,
connectionLimit: 2,

connectAttributes: {
// https://dev.mysql.com/doc/refman/8.0/en/performance-schema-connection-attribute-tables.html
Expand Down
9 changes: 7 additions & 2 deletions modules/module-mysql/src/replication/BinLogStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ export class BinLogStream {
const fromGTID = checkpoint_lsn
? common.ReplicatedGTID.fromSerialized(checkpoint_lsn)
: await common.readExecutedGtid(connection);
const binLogPositionState = fromGTID.position;
connection.release();

if (!this.stopped) {
Expand All @@ -409,7 +408,7 @@ export class BinLogStream {
const binlogListener = new BinLogListener({
logger: this.logger,
sourceTables: this.syncRules.getSourceTables(),
startPosition: binLogPositionState,
startGTID: fromGTID,
connectionManager: this.connections,
serverId: serverId,
eventHandler: binlogEventHandler
Expand Down Expand Up @@ -455,6 +454,12 @@ export class BinLogStream {
tableEntry: tableMap
});
},
onKeepAlive: async (lsn: string) => {
const didCommit = await batch.keepalive(lsn);
if (didCommit) {
this.oldestUncommittedChange = null;
}
},
onCommit: async (lsn: string) => {
this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1);
const didCommit = await batch.commit(lsn, { oldestUncommittedChange: this.oldestUncommittedChange });
Expand Down
100 changes: 72 additions & 28 deletions modules/module-mysql/src/replication/zongji/BinLogListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import { TablePattern } from '@powersync/service-sync-rules';

const { Parser } = pkg;

/**
* Seconds of inactivity after which a keepalive event is sent by the MySQL server.
*/
export const KEEPALIVE_INACTIVITY_THRESHOLD = 30;
export type Row = Record<string, any>;

/**
Expand Down Expand Up @@ -65,15 +69,17 @@ export interface BinLogEventHandler {
onDelete: (rows: Row[], tableMap: TableMapEntry) => Promise<void>;
onCommit: (lsn: string) => Promise<void>;
onSchemaChange: (change: SchemaChange) => Promise<void>;
onKeepAlive: (lsn: string) => Promise<void>;
}

export interface BinLogListenerOptions {
connectionManager: MySQLConnectionManager;
eventHandler: BinLogEventHandler;
sourceTables: TablePattern[];
serverId: number;
startPosition: common.BinLogPosition;
startGTID: common.ReplicatedGTID;
logger?: Logger;
keepAliveInactivitySeconds?: number;
}

/**
Expand All @@ -85,16 +91,19 @@ export class BinLogListener {
private connectionManager: MySQLConnectionManager;
private eventHandler: BinLogEventHandler;
private binLogPosition: common.BinLogPosition;
private currentGTID: common.ReplicatedGTID | null;
private currentGTID: common.ReplicatedGTID;
private logger: Logger;
private listenerError: Error | null;
private databaseFilter: { [schema: string]: (table: string) => boolean };

private isStopped: boolean = false;
private isStopping: boolean = false;

// Flag to indicate if are currently in a transaction that involves multiple row mutation events.
private isTransactionOpen = false;
zongji: ZongJi;
processingQueue: async.QueueObject<BinLogEvent>;

isStopped: boolean = false;
isStopping: boolean = false;
/**
* The combined size in bytes of all the binlog events currently in the processing queue.
*/
Expand All @@ -104,8 +113,8 @@ export class BinLogListener {
this.logger = options.logger ?? defaultLogger;
this.connectionManager = options.connectionManager;
this.eventHandler = options.eventHandler;
this.binLogPosition = options.startPosition;
this.currentGTID = null;
this.binLogPosition = options.startGTID.position;
this.currentGTID = options.startGTID;
this.sqlParser = new Parser();
this.processingQueue = this.createProcessingQueue();
this.zongji = this.createZongjiListener();
Expand All @@ -130,14 +139,13 @@ export class BinLogListener {
`${isRestart ? 'Restarting' : 'Starting'} BinLog Listener with replica client id:${this.options.serverId}...`
);

// Set a heartbeat interval for the Zongji replication connection
// Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown
// The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket.
// Set a heartbeat interval for the Zongji replication connection, these events are enough to keep the connection
// alive for setTimeout to work on the socket.
// The heartbeat needs to be set before starting the listener, since the replication connection is locked once replicating
await new Promise((resolve, reject) => {
this.zongji.connection.query(
// In nanoseconds, 10^9 = 1s
'set @master_heartbeat_period=28*1000000000',
`set @master_heartbeat_period=${this.options.keepAliveInactivitySeconds ?? KEEPALIVE_INACTIVITY_THRESHOLD}*1000000000`,
(error: any, results: any, _fields: any) => {
if (error) {
reject(error);
Expand All @@ -158,9 +166,19 @@ export class BinLogListener {
});

this.zongji.start({
// We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive
// tablemap events always need to be included for the other row events to work
includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog', 'query'],
// Tablemap events always need to be included for the other row events to work
includeEvents: [
'tablemap',
'writerows',
'updaterows',
'deleterows',
'xid',
'rotate',
'gtidlog',
'query',
'heartbeat',
'heartbeat_v2'
],
includeSchema: this.databaseFilter,
filename: this.binLogPosition.filename,
position: this.binLogPosition.offset,
Expand Down Expand Up @@ -289,19 +307,24 @@ export class BinLogListener {
this.logger.info(`Processed GTID event: ${this.currentGTID.comparable}`);
break;
case zongji_utils.eventIsRotation(evt):
const newFile = this.binLogPosition.filename !== evt.binlogName;
// The first event when starting replication is a synthetic Rotate event
// It describes the last binlog file and position that the replica client processed
this.binLogPosition.filename = evt.binlogName;
this.binLogPosition.offset = evt.nextPosition !== 0 ? evt.nextPosition : evt.position;
await this.eventHandler.onRotate();

const newFile = this.binLogPosition.filename !== evt.binlogName;
if (newFile) {
this.logger.info(
`Processed Rotate event. New BinLog file is: ${this.binLogPosition.filename}:${this.binLogPosition.offset}`
);
}

break;
case zongji_utils.eventIsWriteMutation(evt):
const tableMap = evt.tableMap[evt.tableId];
await this.eventHandler.onWrite(evt.rows, tableMap);
this.binLogPosition.offset = evt.nextPosition;
this.logger.info(
`Processed Write event for table [${tableMap.parentSchema}.${tableMap.tableName}]. ${evt.rows.length} row(s) inserted.`
);
Expand All @@ -312,20 +335,33 @@ export class BinLogListener {
evt.rows.map((row) => row.before),
evt.tableMap[evt.tableId]
);
this.binLogPosition.offset = evt.nextPosition;
this.logger.info(
`Processed Update event for table [${evt.tableMap[evt.tableId].tableName}]. ${evt.rows.length} row(s) updated.`
);
break;
case zongji_utils.eventIsDeleteMutation(evt):
await this.eventHandler.onDelete(evt.rows, evt.tableMap[evt.tableId]);
this.binLogPosition.offset = evt.nextPosition;
this.logger.info(
`Processed Delete event for table [${evt.tableMap[evt.tableId].tableName}]. ${evt.rows.length} row(s) deleted.`
);
break;
case zongji_utils.eventIsHeartbeat(evt):
case zongji_utils.eventIsHeartbeat_v2(evt):
// Heartbeats are sent by the master to keep the connection alive after a period of inactivity. They are synthetic
// so are not written to the binlog. Consequently, they have no effect on the binlog position.
// We forward these along with the current GTID to the event handler, but don't want to do this if a transaction is in progress.
if (!this.isTransactionOpen) {
await this.eventHandler.onKeepAlive(this.currentGTID.comparable);
}
this.logger.debug(`Processed Heartbeat event. Current GTID is: ${this.currentGTID.comparable}`);
break;
case zongji_utils.eventIsXid(evt):
this.isTransactionOpen = false;
this.binLogPosition.offset = evt.nextPosition;
const LSN = new common.ReplicatedGTID({
raw_gtid: this.currentGTID!.raw,
raw_gtid: this.currentGTID.raw,
position: this.binLogPosition
}).comparable;
await this.eventHandler.onCommit(LSN);
Expand All @@ -336,43 +372,44 @@ export class BinLogListener {
break;
}

// Update the binlog position after processing the event
this.binLogPosition.offset = evt.nextPosition;
this.queueMemoryUsage -= evt.size;
};
}

private async processQueryEvent(event: BinLogQueryEvent): Promise<void> {
const { query, nextPosition } = event;

// BEGIN query events mark the start of a transaction before any row events. They are not relevant for schema changes
// BEGIN query events mark the start of a transaction before any row events. They are not schema changes so no further parsing is necessary.
if (query === 'BEGIN') {
this.isTransactionOpen = true;
return;
}

const schemaChanges = this.toSchemaChanges(query, event.schema);
if (schemaChanges.length > 0) {
// Since handling the schema changes can take a long time, we need to stop the Zongji listener instead of pausing it.
// Handling schema changes can take a long time, so we stop the Zongji listener whilst handling them to prevent the listener from timing out.
await this.stopZongji();

for (const change of schemaChanges) {
this.logger.info(`Processing schema change ${change.type} for table [${change.schema}.${change.table}]`);
await this.eventHandler.onSchemaChange(change);
}

// DDL queries are auto commited, but do not come with a corresponding Xid event.
// This is problematic for DDL queries which result in row events because the checkpoint is not moved on,
// so we manually commit here.
this.binLogPosition.offset = nextPosition;
const LSN = new common.ReplicatedGTID({
raw_gtid: this.currentGTID!.raw,
position: this.binLogPosition
}).comparable;
await this.eventHandler.onCommit(LSN);
// DDL queries are auto commited, but do not come with a corresponding Xid event, in those cases we trigger a manual commit if we are not already in a transaction.
// Some DDL queries include row events, and in those cases will include a Xid event.
if (!this.isTransactionOpen) {
this.binLogPosition.offset = nextPosition;
const LSN = new common.ReplicatedGTID({
raw_gtid: this.currentGTID.raw,
position: this.binLogPosition
}).comparable;
await this.eventHandler.onCommit(LSN);
}

this.logger.info(`Successfully processed ${schemaChanges.length} schema change(s).`);

// If there are still events in the processing queue, we need to process those before restarting Zongji
// This avoids potentially processing the same events again after a restart.
if (!this.processingQueue.idle()) {
this.logger.info(`Processing [${this.processingQueue.length()}] events(s) before resuming...`);
this.processingQueue.drain(async () => {
Expand All @@ -381,6 +418,13 @@ export class BinLogListener {
} else {
await this.restartZongji();
}
} else if (!this.isTransactionOpen) {
this.binLogPosition.offset = nextPosition;
const LSN = new common.ReplicatedGTID({
raw_gtid: this.currentGTID.raw,
position: this.binLogPosition
}).comparable;
await this.eventHandler.onCommit(LSN);
}
}

Expand Down
12 changes: 11 additions & 1 deletion modules/module-mysql/src/replication/zongji/zongji-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import {
BinLogTableMapEvent,
BinLogRowUpdateEvent,
BinLogXidEvent,
BinLogQueryEvent
BinLogQueryEvent,
BinLogHeartbeatEvent,
BinLogHeartbeatEvent_V2
} from '@powersync/mysql-zongji';

export function eventIsGTIDLog(event: BinLogEvent): event is BinLogGTIDLogEvent {
Expand All @@ -21,6 +23,14 @@ export function eventIsXid(event: BinLogEvent): event is BinLogXidEvent {
return event.getEventName() == 'xid';
}

export function eventIsHeartbeat(event: BinLogEvent): event is BinLogHeartbeatEvent {
return event.getEventName() == 'heartbeat';
}

export function eventIsHeartbeat_v2(event: BinLogEvent): event is BinLogHeartbeatEvent_V2 {
return event.getEventName() == 'heartbeat_v2';
}

export function eventIsRotation(event: BinLogEvent): event is BinLogRotationEvent {
return event.getEventName() == 'rotate';
}
Expand Down
Loading