diff --git a/README.md b/README.md index 407e27a..d17ce52 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,14 @@ kick off from a specific filename or position, use `client.setBinlogFilename(fil > `client.connect()` is blocking (meaning that client will listen for events in the current thread). `client.connect(timeout)`, on the other hand, spawns a separate thread. +> Note difference between MariaDB and MySQL +``` +BinaryLogClient client = new MariadbBinaryLogClient("hostname", 3306, "username", "password"); +// ... as same as BinaryLogClient +``` +> `client.setGtidSet(gtid)` meaning that client kick off from a specific gtid, MariaDB also support. +> `client.setUseSendAnnotateRowsEvent(true)` meaning that client will send annotate rows events(describe the query which caused the row event), and 'false' by default + #### Controlling event deserialization > You might need it for several reasons: diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index d8b8299..b7b3b23 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -15,11 +15,14 @@ */ package com.github.shyiko.mysql.binlog; +import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventHeader; import com.github.shyiko.mysql.binlog.event.EventHeaderV4; import com.github.shyiko.mysql.binlog.event.EventType; import com.github.shyiko.mysql.binlog.event.GtidEventData; +import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; +import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData; import com.github.shyiko.mysql.binlog.event.QueryEventData; import com.github.shyiko.mysql.binlog.event.RotateEventData; import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; @@ -46,9 +49,6 @@ import com.github.shyiko.mysql.binlog.network.protocol.Packet; import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel; import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket; -import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateNativePasswordCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSHA2Command; -import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSecurityPasswordCommand; import com.github.shyiko.mysql.binlog.network.protocol.command.Command; import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand; @@ -135,8 +135,8 @@ public X509Certificate[] getAcceptedIssuers() { private volatile long connectionId; private SSLMode sslMode = SSLMode.DISABLED; - private GtidSet gtidSet; - private final Object gtidSetAccessLock = new Object(); + protected GtidSet gtidSet; + protected final Object gtidSetAccessLock = new Object(); private boolean gtidSetFallbackToPurged; private boolean useBinlogFilenamePositionInGtidMode; private String gtid; @@ -150,7 +150,7 @@ public X509Certificate[] getAcceptedIssuers() { private SocketFactory socketFactory; private SSLSocketFactory sslSocketFactory; - private volatile PacketChannel channel; + protected volatile PacketChannel channel; private volatile boolean connected; private volatile long masterServerId = -1; @@ -335,10 +335,14 @@ public void setGtidSet(String gtidSet) { this.binlogFilename = ""; } synchronized (gtidSetAccessLock) { - this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null; + this.gtidSet = gtidSet != null ? buildGtidSet(gtidSet) : null; } } + protected GtidSet buildGtidSet(String gtidSet) { + return new GtidSet(gtidSet); + } + /** * @see #setGtidSetFallbackToPurged(boolean) * @return whether gtid_purged is used as a fallback @@ -539,11 +543,7 @@ public void connect() throws IOException, IllegalStateException { connectionId = greetingPacket.getThreadId(); if ("".equals(binlogFilename)) { - synchronized (gtidSetAccessLock) { - if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { - gtidSet = new GtidSet(fetchGtidPurged()); - } - } + setupGtidSet(); } if (binlogFilename == null) { fetchBinlogFilenameAndPosition(); @@ -592,8 +592,7 @@ public void connect() throws IOException, IllegalStateException { ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class); synchronized (gtidSetAccessLock) { if (gtidSet != null) { - ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); - ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class); + ensureGtidEventDataDeserializer(); } } listenForEventPackets(); @@ -666,7 +665,7 @@ public Object call() throws Exception { }; } - private void checkError(byte[] packet) throws IOException { + protected void checkError(byte[] packet) throws IOException { if (packet[0] == (byte) 0xFF /* error */) { byte[] bytes = Arrays.copyOfRange(packet, 1, packet.length); ErrorPacket errorPacket = new ErrorPacket(bytes); @@ -710,7 +709,6 @@ private boolean tryUpgradeToSSL(GreetingPacket greetingPacket) throws IOExceptio return false; } - private void enableHeartbeat() throws IOException { channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000)); byte[] statementResult = channel.read(); @@ -725,7 +723,7 @@ private void setMasterServerId() throws IOException { } } - private void requestBinaryLogStream() throws IOException { + protected void requestBinaryLogStream() throws IOException { long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178 Command dumpBinaryLogCommand; synchronized (gtidSetAccessLock) { @@ -741,7 +739,7 @@ private void requestBinaryLogStream() throws IOException { channel.write(dumpBinaryLogCommand); } - private void ensureEventDataDeserializer(EventType eventType, + protected void ensureEventDataDeserializer(EventType eventType, Class eventDataDeserializerClass) { EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType); if (eventDataDeserializer.getClass() != eventDataDeserializerClass && @@ -758,6 +756,10 @@ private void ensureEventDataDeserializer(EventType eventType, } } + protected void ensureGtidEventDataDeserializer() { + ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class); + } private void spawnKeepAliveThread() { final ExecutorService threadExecutor = @@ -902,6 +904,14 @@ private String fetchGtidPurged() throws IOException { return ""; } + protected void setupGtidSet() throws IOException{ + synchronized (gtidSetAccessLock) { + if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { + gtidSet = new GtidSet(fetchGtidPurged()); + } + } + } + private void fetchBinlogFilenameAndPosition() throws IOException { ResultSetRowPacket[] resultSet; channel.write(new QueryCommand("show master status")); @@ -1003,7 +1013,7 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac return result; } - private void updateClientBinlogFilenameAndPosition(Event event) { + protected void updateClientBinlogFilenameAndPosition(Event event) { EventHeader eventHeader = event.getHeader(); EventType eventType = eventHeader.getEventType(); if (eventType == EventType.ROTATE) { @@ -1022,7 +1032,7 @@ private void updateClientBinlogFilenameAndPosition(Event event) { } } - private void updateGtidSet(Event event) { + protected void updateGtidSet(Event event) { synchronized (gtidSetAccessLock) { if (gtidSet == null) { return; @@ -1034,6 +1044,15 @@ private void updateGtidSet(Event event) { GtidEventData gtidEventData = (GtidEventData) EventDataWrapper.internal(event.getData()); gtid = gtidEventData.getGtid(); break; + case MARIADB_GTID: + MariadbGtidEventData mariadbGtidEventData = (MariadbGtidEventData) EventDataWrapper.internal(event.getData()); + mariadbGtidEventData.setServerId(eventHeader.getServerId()); + gtid = mariadbGtidEventData.toString(); + break; + case MARIADB_GTID_LIST: + MariadbGtidListEventData mariadbGtidListEventData = (MariadbGtidListEventData) EventDataWrapper.internal(event.getData()); + gtid = mariadbGtidListEventData.getMariaGTIDSet().toString(); + break; case XID: commitGtid(); tx = false; @@ -1044,21 +1063,34 @@ private void updateGtidSet(Event event) { if (sql == null) { break; } - if ("BEGIN".equals(sql)) { - tx = true; - } else - if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) { - commitGtid(); - tx = false; - } else - if (!tx) { - // auto-commit query, likely DDL - commitGtid(); + commitGtid(sql); + break; + case ANNOTATE_ROWS: + AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDeserializer.EventDataWrapper.internal(event.getData()); + sql = annotateRowsEventData.getRowsQuery(); + if (sql == null) { + break; } + commitGtid(sql); + break; default: } } + protected void commitGtid(String sql) { + if ("BEGIN".equals(sql)) { + tx = true; + } else + if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) { + commitGtid(); + tx = false; + } else + if (!tx) { + // auto-commit query, likely DDL + commitGtid(); + } + } + private void commitGtid() { if (gtid != null) { synchronized (gtidSetAccessLock) { diff --git a/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java new file mode 100644 index 0000000..0c6740d --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java @@ -0,0 +1,85 @@ +package com.github.shyiko.mysql.binlog; + +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.deserialization.AnnotateRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidListEventDataDeserializer; +import com.github.shyiko.mysql.binlog.network.protocol.command.Command; +import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; + +import java.io.IOException; + +/** + * Mariadb replication stream client. + * + * @author Winger + */ +public class MariadbBinaryLogClient extends BinaryLogClient { + + private boolean useSendAnnotateRowsEvent; + + public MariadbBinaryLogClient(String username, String password) { + super(username, password); + } + + public MariadbBinaryLogClient(String schema, String username, String password) { + super(schema, username, password); + } + + public MariadbBinaryLogClient(String hostname, int port, String username, String password) { + super(hostname, port, username, password); + } + + public MariadbBinaryLogClient(String hostname, int port, String schema, String username, String password) { + super(hostname, port, schema, username, password); + } + + @Override + protected GtidSet buildGtidSet(String gtidSet) { + return new MariadbGtidSet(gtidSet); + } + + @Override + protected void setupGtidSet() throws IOException { + //Mariadb ignore + } + + @Override + protected void requestBinaryLogStream() throws IOException { + long serverId = isBlocking() ? this.getServerId() : 0; // http://bugs.mysql.com/bug.php?id=71178 + Command dumpBinaryLogCommand; + synchronized (gtidSetAccessLock) { + if (gtidSet != null) { + channel.write(new QueryCommand("SET @mariadb_slave_capability=4")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0")); + checkError(channel.read()); + dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isUseSendAnnotateRowsEvent()); + + } else { + dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, getBinlogFilename(), getBinlogPosition()); + } + } + channel.write(dumpBinaryLogCommand); + } + + @Override + protected void ensureGtidEventDataDeserializer() { + ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class); + } + + public boolean isUseSendAnnotateRowsEvent() { + return useSendAnnotateRowsEvent; + } + + public void setUseSendAnnotateRowsEvent(boolean useSendAnnotateRowsEvent) { + this.useSendAnnotateRowsEvent = useSendAnnotateRowsEvent; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java b/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java new file mode 100644 index 0000000..c86ee54 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java @@ -0,0 +1,158 @@ +package com.github.shyiko.mysql.binlog; + +import java.util.*; + +/** + * Mariadb Global Transaction ID + * + * @author Winger + * @see GTID for the original doc + */ +public class MariadbGtidSet extends GtidSet { + + private Map map = new HashMap<>(); + + public MariadbGtidSet() { + super(null); // + } + + public MariadbGtidSet(String gtidSet) { + super(null); + if (gtidSet != null && gtidSet.length() > 0) { + String[] gtids = gtidSet.replaceAll("\n", "").split(","); + for (String gtid : gtids) { + MariaGtid mariaGtid = MariaGtid.parse(gtid); + map.put(mariaGtid.getDomainId(), mariaGtid); + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (MariaGtid gtid : map.values()) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append(gtid.toString()); + } + return sb.toString(); + } + + @Override + public Collection getUUIDSets() { + throw new UnsupportedOperationException("Mariadb gtid not support this method"); + } + + @Override + public UUIDSet getUUIDSet(String uuid) { + throw new UnsupportedOperationException("Mariadb gtid not support this method"); + } + + @Override + public UUIDSet putUUIDSet(UUIDSet uuidSet) { + throw new UnsupportedOperationException("Mariadb gtid not support this method"); + } + + @Override + public boolean add(String gtid) { + MariaGtid mariaGtid = MariaGtid.parse(gtid); + map.put(mariaGtid.getDomainId(), mariaGtid); + return true; + } + + public void add(MariaGtid gtid) { + map.put(gtid.getDomainId(), gtid); + } + + @Override + public boolean isContainedWithin(GtidSet other) { + throw new UnsupportedOperationException("Mariadb gtid not support this method"); + } + + @Override + public int hashCode() { + return map.keySet().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof MariadbGtidSet) { + MariadbGtidSet that = (MariadbGtidSet) obj; + return this.map.equals(that.map); + } + return false; + } + + public static class MariaGtid { + + // {domainId}-{serverId}-{sequence} + private long domainId; + private long serverId; + private long sequence; + + public MariaGtid(long domainId, long serverId, long sequence) { + this.domainId = domainId; + this.serverId = serverId; + this.sequence = sequence; + } + + public MariaGtid(String gtid) { + String[] gtidArr = gtid.split("-"); + this.domainId = Long.parseLong(gtidArr[0]); + this.serverId = Long.parseLong(gtidArr[1]); + this.sequence = Long.parseLong(gtidArr[2]); + } + + public static MariaGtid parse(String gtid) { + return new MariaGtid(gtid); + } + + public long getDomainId() { + return domainId; + } + + public void setDomainId(long domainId) { + this.domainId = domainId; + } + + public long getServerId() { + return serverId; + } + + public void setServerId(long serverId) { + this.serverId = serverId; + } + + public long getSequence() { + return sequence; + } + + public void setSequence(long sequence) { + this.sequence = sequence; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MariaGtid mariaGtid = (MariaGtid) o; + return domainId == mariaGtid.domainId && + serverId == mariaGtid.serverId && + sequence == mariaGtid.sequence; + } + + @Override + public String toString() { + return String.format("%s-%s-%s", domainId, serverId, sequence); + } + } +} + diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/AnnotateRowsEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/AnnotateRowsEventData.java new file mode 100644 index 0000000..85fa79e --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/AnnotateRowsEventData.java @@ -0,0 +1,31 @@ +package com.github.shyiko.mysql.binlog.event; + +/** + * Mariadb ANNOTATE_ROWS_EVENT events accompany row events and describe the query which caused the row event + * Enable this with --binlog-annotate-row-events (default on from MariaDB 10.2.4). + * In the binary log, each Annotate_rows event precedes the corresponding Table map event. + * Note the master server sends ANNOTATE_ROWS_EVENT events only if the Slave server connects + * with the BINLOG_SEND_ANNOTATE_ROWS_EVENT flag (value is 2) in the COM_BINLOG_DUMP Slave Registration phase. + * + * @author Winger + * @see ANNOTATE_ROWS_EVENT for the original doc + */ +public class AnnotateRowsEventData implements EventData { + + private String rowsQuery; + + public String getRowsQuery() { + return rowsQuery; + } + + public void setRowsQuery(String rowsQuery) { + this.rowsQuery = rowsQuery; + } + + @Override + public String toString() { + return "AnnotateRowsEventData{" + + "rowsQuery='" + rowsQuery + '\'' + + '}'; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java b/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java index 7f94ccf..fb1fcaa 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java @@ -16,99 +16,99 @@ package com.github.shyiko.mysql.binlog.event; /** + * @author Stanley Shyiko * @see Event Meanings for the original * documentation. - * @author Stanley Shyiko */ public enum EventType { /** * Events of this event type should never occur. Not written to a binary log. */ - UNKNOWN, + UNKNOWN(0), /** * A descriptor event that is written to the beginning of the each binary log file. (In MySQL 4.0 and 4.1, * this event is written only to the first binary log file that the server creates after startup.) This event is * used in MySQL 3.23 through 4.1 and superseded in MySQL 5.0 by {@link #FORMAT_DESCRIPTION}. */ - START_V3, + START_V3(1), /** * Written when an updating statement is done. */ - QUERY, + QUERY(2), /** * Written when mysqld stops. */ - STOP, + STOP(3), /** * Written when mysqld switches to a new binary log file. This occurs when someone issues a FLUSH LOGS statement or * the current binary log file becomes larger than max_binlog_size. */ - ROTATE, + ROTATE(4), /** * Written every time a statement uses an AUTO_INCREMENT column or the LAST_INSERT_ID() function; precedes other * events for the statement. This is written only before a {@link #QUERY} and is not used in case of RBR. */ - INTVAR, + INTVAR(5), /** * Used for LOAD DATA INFILE statements in MySQL 3.23. */ - LOAD, + LOAD(6), /** * Not used. */ - SLAVE, + SLAVE(7), /** * Used for LOAD DATA INFILE statements in MySQL 4.0 and 4.1. */ - CREATE_FILE, + CREATE_FILE(8), /** * Used for LOAD DATA INFILE statements as of MySQL 4.0. */ - APPEND_BLOCK, + APPEND_BLOCK(9), /** * Used for LOAD DATA INFILE statements in 4.0 and 4.1. */ - EXEC_LOAD, + EXEC_LOAD(10), /** * Used for LOAD DATA INFILE statements as of MySQL 4.0. */ - DELETE_FILE, + DELETE_FILE(11), /** * Used for LOAD DATA INFILE statements in MySQL 4.0 and 4.1. */ - NEW_LOAD, + NEW_LOAD(12), /** * Written every time a statement uses the RAND() function; precedes other events for the statement. Indicates the * seed values to use for generating a random number with RAND() in the next statement. This is written only * before a {@link #QUERY} and is not used in case of RBR. */ - RAND, + RAND(13), /** * Written every time a statement uses a user variable; precedes other events for the statement. Indicates the * value to use for the user variable in the next statement. This is written only before a {@link #QUERY} and * is not used in case of RBR. */ - USER_VAR, + USER_VAR(14), /** * A descriptor event that is written to the beginning of the each binary log file. * This event is used as of MySQL 5.0; it supersedes {@link #START_V3}. */ - FORMAT_DESCRIPTION, + FORMAT_DESCRIPTION(15), /** * Generated for a commit of a transaction that modifies one or more tables of an XA-capable storage engine. * Normal transactions are implemented by sending a {@link #QUERY} containing a BEGIN statement and a {@link #QUERY} * containing a COMMIT statement (or a ROLLBACK statement if the transaction is rolled back). */ - XID, + XID(16), /** * Used for LOAD DATA INFILE statements as of MySQL 5.0. */ - BEGIN_LOAD_QUERY, + BEGIN_LOAD_QUERY(17), /** * Used for LOAD DATA INFILE statements as of MySQL 5.0. */ - EXECUTE_LOAD_QUERY, + EXECUTE_LOAD_QUERY(18), /** * This event precedes each row operation event. It maps a table definition to a number, where the table definition * consists of database and table names and column definitions. The purpose of this event is to enable replication @@ -117,105 +117,128 @@ public enum EventType { * of TABLE_MAP events: one per table used by events in the sequence. * Used in case of RBR. */ - TABLE_MAP, + TABLE_MAP(19), /** * Describes inserted rows (within a single table). * Used in case of RBR (5.1.0 - 5.1.15). */ - PRE_GA_WRITE_ROWS, + PRE_GA_WRITE_ROWS(20), /** * Describes updated rows (within a single table). * Used in case of RBR (5.1.0 - 5.1.15). */ - PRE_GA_UPDATE_ROWS, + PRE_GA_UPDATE_ROWS(21), /** * Describes deleted rows (within a single table). * Used in case of RBR (5.1.0 - 5.1.15). */ - PRE_GA_DELETE_ROWS, + PRE_GA_DELETE_ROWS(22), /** * Describes inserted rows (within a single table). * Used in case of RBR (5.1.16 - mysql-trunk). */ - WRITE_ROWS, + WRITE_ROWS(23), /** * Describes updated rows (within a single table). * Used in case of RBR (5.1.16 - mysql-trunk). */ - UPDATE_ROWS, + UPDATE_ROWS(24), /** * Describes deleted rows (within a single table). * Used in case of RBR (5.1.16 - mysql-trunk). */ - DELETE_ROWS, + DELETE_ROWS(25), /** * Used to log an out of the ordinary event that occurred on the master. It notifies the slave that something * happened on the master that might cause data to be in an inconsistent state. */ - INCIDENT, + INCIDENT(26), /** * Sent by a master to a slave to let the slave know that the master is still alive. Not written to a binary log. */ - HEARTBEAT, + HEARTBEAT(27), /** * In some situations, it is necessary to send over ignorable data to the slave: data that a slave can handle in * case there is code for handling it, but which can be ignored if it is not recognized. */ - IGNORABLE, + IGNORABLE(28), /** * Introduced to record the original query for rows events in RBR. */ - ROWS_QUERY, + ROWS_QUERY(29), /** * Describes inserted rows (within a single table). * Used in case of RBR (5.1.18+). */ - EXT_WRITE_ROWS, + EXT_WRITE_ROWS(30), /** * Describes updated rows (within a single table). * Used in case of RBR (5.1.18+). */ - EXT_UPDATE_ROWS, + EXT_UPDATE_ROWS(31), /** * Describes deleted rows (within a single table). * Used in case of RBR (5.1.18+). */ - EXT_DELETE_ROWS, + EXT_DELETE_ROWS(32), /** * Global Transaction Identifier. */ - GTID, - ANONYMOUS_GTID, - PREVIOUS_GTIDS, - TRANSACTION_CONTEXT, - VIEW_CHANGE, + GTID(33), + ANONYMOUS_GTID(34), + PREVIOUS_GTIDS(35), + TRANSACTION_CONTEXT(36), + VIEW_CHANGE(37), /** * Prepared XA transaction terminal event similar to XID except that it is specific to XA transaction. */ - XA_PREPARE; + XA_PREPARE(38), + + /** + * MariaDB Support Events + * + * @see Replication Protocol for the original doc. + */ + ANNOTATE_ROWS(160), // + MARIADB_GTID(162), + MARIADB_GTID_LIST(163); + + private final int eventNumber; + + EventType(int eventNumber) { + this.eventNumber = eventNumber; + } public static boolean isRowMutation(EventType eventType) { return EventType.isWrite(eventType) || - EventType.isUpdate(eventType) || - EventType.isDelete(eventType); + EventType.isUpdate(eventType) || + EventType.isDelete(eventType); } public static boolean isWrite(EventType eventType) { return eventType == PRE_GA_WRITE_ROWS || - eventType == WRITE_ROWS || - eventType == EXT_WRITE_ROWS; + eventType == WRITE_ROWS || + eventType == EXT_WRITE_ROWS; } public static boolean isUpdate(EventType eventType) { return eventType == PRE_GA_UPDATE_ROWS || - eventType == UPDATE_ROWS || - eventType == EXT_UPDATE_ROWS; + eventType == UPDATE_ROWS || + eventType == EXT_UPDATE_ROWS; } public static boolean isDelete(EventType eventType) { return eventType == PRE_GA_DELETE_ROWS || - eventType == DELETE_ROWS || - eventType == EXT_DELETE_ROWS; + eventType == DELETE_ROWS || + eventType == EXT_DELETE_ROWS; } + public static EventType byEventNumber(int num) { + for (EventType type : EventType.values()) { + if (type.eventNumber == num) { + return type; + } + } + return null; + } } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java new file mode 100644 index 0000000..54838d0 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java @@ -0,0 +1,43 @@ +package com.github.shyiko.mysql.binlog.event; + +/** + * MariaDB and MySQL have different GTID implementations, and that these are not compatible with each other. + * + * @author Winger + * @see GTID_EVENT for the original doc + */ +public class MariadbGtidEventData implements EventData { + + private long sequence; + private long domainId; + private long serverId; + + public long getSequence() { + return sequence; + } + + public void setSequence(long sequence) { + this.sequence = sequence; + } + + public long getDomainId() { + return domainId; + } + + public void setDomainId(long domainId) { + this.domainId = domainId; + } + + public long getServerId() { + return serverId; + } + + public void setServerId(long serverId) { + this.serverId = serverId; + } + + @Override + public String toString() { + return domainId + "-" + serverId + "-" + sequence; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidListEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidListEventData.java new file mode 100644 index 0000000..ae6e91f --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidListEventData.java @@ -0,0 +1,29 @@ +package com.github.shyiko.mysql.binlog.event; + +import com.github.shyiko.mysql.binlog.MariadbGtidSet; + +/** + * Logged in every binlog to record the current replication state + * + * @author Winger + * @see GTID_LIST_EVENT for the original doc + */ +public class MariadbGtidListEventData implements EventData { + + private MariadbGtidSet mariaGTIDSet; + + public MariadbGtidSet getMariaGTIDSet() { + return mariaGTIDSet; + } + + public void setMariaGTIDSet(MariadbGtidSet mariaGTIDSet) { + this.mariaGTIDSet = mariaGTIDSet; + } + + @Override + public String toString() { + return "MariadbGtidListEventData{" + + "mariaGTIDSet=" + mariaGTIDSet + + '}'; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java new file mode 100644 index 0000000..688a9b2 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java @@ -0,0 +1,24 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +import java.io.IOException; + +/** + * Mariadb ANNOTATE_ROWS_EVENT Fields + *
+ *  string The SQL statement (not null-terminated)
+ * 
+ * + * @author Winger + */ +public class AnnotateRowsEventDataDeserializer implements EventDataDeserializer { + + @Override + public AnnotateRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + AnnotateRowsEventData event = new AnnotateRowsEventData(); + event.setRowsQuery(inputStream.readString(inputStream.available())); + return event; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java index cf93685..ffb3788 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java @@ -120,6 +120,12 @@ private void registerDefaultEventDataDeserializers() { new PreviousGtidSetDeserializer()); eventDataDeserializers.put(EventType.XA_PREPARE, new XAPrepareEventDataDeserializer()); + eventDataDeserializers.put(EventType.ANNOTATE_ROWS, + new AnnotateRowsEventDataDeserializer()); + eventDataDeserializers.put(EventType.MARIADB_GTID, + new MariadbGtidEventDataDeserializer()); + eventDataDeserializers.put(EventType.MARIADB_GTID_LIST, + new MariadbGtidListEventDataDeserializer()); } public void setEventDataDeserializer(EventType eventType, EventDataDeserializer eventDataDeserializer) { diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java index c0569f0..e953362 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java @@ -26,8 +26,6 @@ */ public class EventHeaderV4Deserializer implements EventHeaderDeserializer { - private static final EventType[] EVENT_TYPES = EventType.values(); - @Override public EventHeaderV4 deserialize(ByteArrayInputStream inputStream) throws IOException { EventHeaderV4 header = new EventHeaderV4(); @@ -41,10 +39,8 @@ public EventHeaderV4 deserialize(ByteArrayInputStream inputStream) throws IOExce } private static EventType getEventType(int ordinal) { - if (ordinal >= EVENT_TYPES.length) { - return EventType.UNKNOWN; - } - return EVENT_TYPES[ordinal]; + EventType eventType = EventType.byEventNumber(ordinal); + return eventType == null ? EventType.UNKNOWN : eventType; } } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java new file mode 100644 index 0000000..c57e255 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java @@ -0,0 +1,33 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +import java.io.IOException; + +/** + * Mariadb GTID_EVENT Fields + *
+ *     uint<8> GTID sequence
+ *     uint<4> Replication Domain ID
+ *     uint<1> Flags
+ *
+ * 	if flag & FL_GROUP_COMMIT_ID
+ * 	    uint<8> commit_id
+ * 	else
+ * 	    uint<6> 0
+ * 
+ * + * @author Winger + * @see GTID_EVENT for the original doc + */ +public class MariadbGtidEventDataDeserializer implements EventDataDeserializer { + @Override + public MariadbGtidEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + MariadbGtidEventData event = new MariadbGtidEventData(); + event.setSequence(inputStream.readLong(8)); + event.setDomainId(inputStream.readInteger(4)); + // Flags ignore + return event; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java new file mode 100644 index 0000000..8a42953 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java @@ -0,0 +1,39 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + + +import com.github.shyiko.mysql.binlog.MariadbGtidSet; +import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +import java.io.IOException; + +/** + * Mariadb GTID_LIST_EVENT Fields + *
+ *  uint<4> Number of GTIDs
+ *  GTID[0]
+ *      uint<4> Replication Domain ID
+ *      uint<4> Server_ID
+ *      uint<8> GTID sequence ...
+ * GTID[n]
+ * 
+ * + * @author Winger + * @see GTID_EVENT for the original doc + */ +public class MariadbGtidListEventDataDeserializer implements EventDataDeserializer { + @Override + public MariadbGtidListEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + MariadbGtidListEventData eventData = new MariadbGtidListEventData(); + long gtidLength = inputStream.readInteger(4); + MariadbGtidSet mariaGTIDSet = new MariadbGtidSet(); + for (int i = 0; i < gtidLength; i++) { + long domainId = inputStream.readInteger(4); + long serverId = inputStream.readInteger(4); + long sequence = inputStream.readLong(8); + mariaGTIDSet.add(new MariadbGtidSet.MariaGtid(domainId, serverId, sequence)); + } + eventData.setMariaGTIDSet(mariaGTIDSet); + return eventData; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java index 36216c7..b7436aa 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java @@ -24,9 +24,11 @@ */ public class DumpBinaryLogCommand implements Command { + public static final int BINLOG_SEND_ANNOTATE_ROWS_EVENT = 2; private long serverId; private String binlogFilename; private long binlogPosition; + private boolean sendAnnotateRowsEvent; public DumpBinaryLogCommand(long serverId, String binlogFilename, long binlogPosition) { this.serverId = serverId; @@ -34,12 +36,21 @@ public DumpBinaryLogCommand(long serverId, String binlogFilename, long binlogPos this.binlogPosition = binlogPosition; } + public DumpBinaryLogCommand(long serverId, String binlogFilename, long binlogPosition, boolean sendAnnotateRowsEvent) { + this(serverId, binlogFilename, binlogPosition); + this.sendAnnotateRowsEvent = sendAnnotateRowsEvent; + } + @Override public byte[] toByteArray() throws IOException { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); buffer.writeInteger(CommandType.BINLOG_DUMP.ordinal(), 1); buffer.writeLong(this.binlogPosition, 4); - buffer.writeInteger(0, 2); // flag + int binlogFlags = 0; + if (sendAnnotateRowsEvent) { + binlogFlags |= BINLOG_SEND_ANNOTATE_ROWS_EVENT; + } + buffer.writeInteger(binlogFlags, 2); // flag buffer.writeLong(this.serverId, 4); buffer.writeString(this.binlogFilename); return buffer.toByteArray(); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java new file mode 100644 index 0000000..fac8b57 --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java @@ -0,0 +1,84 @@ +package com.github.shyiko.mysql.binlog; + +import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; +import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import org.testng.annotations.Test; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertNotEquals; +import static org.testng.AssertJUnit.assertNotNull; + +/** + * @author Winger + */ +public class MariadbBinaryLogClientIntegrationTest { + + protected BinaryLogClientIntegrationTest.MySQLConnection master; + + @Test + public void testMariadbUseGTIDAndAnnotateRowsEvent() throws Exception { + master = new BinaryLogClientIntegrationTest.MySQLConnection("127.0.0.1", 3306, "root", ""); + master.execute(new BinaryLogClientIntegrationTest.Callback() { + @Override + public void execute(Statement statement) throws SQLException { + statement.execute("drop database if exists mbcj_test"); + statement.execute("create database mbcj_test"); + statement.execute("use mbcj_test"); + statement.execute("CREATE TABLE if not exists foo (i int)"); + statement.execute("CREATE TABLE if not exists bar (i int)"); + } + }); + // get current gtid + final String[] currentGtidPos = new String[1]; + master.query("show global variables like 'gtid_current_pos%'", new BinaryLogClientIntegrationTest.Callback() { + + @Override + public void execute(ResultSet rs) throws SQLException { + rs.next(); + currentGtidPos[0] = rs.getString(2); + } + }); + + CountDownEventListener eventListener; + MariadbBinaryLogClient client = new MariadbBinaryLogClient("127.0.0.1", 3306, "root", "123456"); + client.setGtidSet(currentGtidPos[0]); + client.setUseSendAnnotateRowsEvent(true); + + EventDeserializer eventDeserializer = new EventDeserializer(); + eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY, + EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG); + client.setEventDeserializer(eventDeserializer); + client.registerEventListener(new TraceEventListener()); + client.registerLifecycleListener(new TraceLifecycleListener()); + client.registerEventListener(eventListener = new CountDownEventListener()); + + master.execute(new BinaryLogClientIntegrationTest.Callback() { + @Override + public void execute(Statement statement) throws SQLException { + statement.execute("INSERT INTO foo set i = 2"); + statement.execute("DROP TABLE IF EXISTS bar"); + } + }); + + try { + eventListener.reset(); + client.connect(); + + eventListener.waitFor(MariadbGtidEventData.class, 1, TimeUnit.SECONDS.toMillis(4)); + String gtidSet = client.getGtidSet(); + assertNotNull(gtidSet); + + eventListener.reset(); + eventListener.waitFor(AnnotateRowsEventData.class, 1, TimeUnit.SECONDS.toMillis(4)); + gtidSet = client.getGtidSet(); + assertNotEquals(currentGtidPos[0], gtidSet); + } finally { + client.disconnect(); + } + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MariadbGtidSetTest.java b/src/test/java/com/github/shyiko/mysql/binlog/MariadbGtidSetTest.java new file mode 100644 index 0000000..71d5753 --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/MariadbGtidSetTest.java @@ -0,0 +1,36 @@ +package com.github.shyiko.mysql.binlog; + +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; + +/** + * @author Winger + */ +public class MariadbGtidSetTest { + + @Test + public void testAdd() { + MariadbGtidSet gtidSet = new MariadbGtidSet("0-102-7255"); + gtidSet.add("0-102-7256"); + gtidSet.add("0-102-7257"); + gtidSet.add("0-102-7259"); + gtidSet.add("1-102-7300"); + assertNotEquals(gtidSet.toString(), "1-102-7300"); + assertNotEquals(gtidSet.toString(), "0-102-7259"); + assertEquals(gtidSet.toString(), "0-102-7259,1-102-7300"); + } + + @Test + public void testEmptySet() { + assertEquals(new MariadbGtidSet("").toString(), ""); + } + + @Test + public void testEquals() { + assertEquals(new MariadbGtidSet(""), new MariadbGtidSet(null)); + assertEquals(new MariadbGtidSet(""), new MariadbGtidSet("")); + assertEquals(new MariadbGtidSet("0-0-7404"), new MariadbGtidSet("0-0-7404")); + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializerTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializerTest.java new file mode 100644 index 0000000..e117d32 --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializerTest.java @@ -0,0 +1,27 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import org.junit.Test; + +import java.io.IOException; + +import static junit.framework.Assert.assertEquals; + +/** + * @author Winger + */ +public class AnnotateRowsEventDataDeserializerTest { + + private static final byte[] DATA = {73, 78, 83, 69, 82, 84, 32, 73, 78, 84, 79, 32, 102, 111, 111, 32, 115, 101, 116, 32, 105, 32, 61, 32, 50}; + + private static final String sql = "INSERT INTO foo set i = 2"; + + @Test + public void deserialize() throws IOException { + AnnotateRowsEventDataDeserializer deserializer = new AnnotateRowsEventDataDeserializer(); + AnnotateRowsEventData eventData = deserializer.deserialize(new ByteArrayInputStream(DATA)); + + assertEquals(sql, eventData.getRowsQuery()); + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializerTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializerTest.java new file mode 100644 index 0000000..f6e703b --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializerTest.java @@ -0,0 +1,26 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import org.junit.Test; + +import java.io.IOException; + +import static junit.framework.Assert.assertEquals; + +/** + * @author Winger + */ +public class MariadbGtidEventDataDeserializerTest { + + private static final byte[] DATA = {-20, 28, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 121}; + + private static final String GTID_SET = "0-0-7404"; + + @Test + public void deserialize() throws IOException { + MariadbGtidEventDataDeserializer deserializer = new MariadbGtidEventDataDeserializer(); + MariadbGtidEventData eventData = deserializer.deserialize(new ByteArrayInputStream(DATA)); + assertEquals(GTID_SET, eventData.toString()); + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializerTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializerTest.java new file mode 100644 index 0000000..3a6f33f --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializerTest.java @@ -0,0 +1,26 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import org.junit.Test; + +import java.io.IOException; + +import static junit.framework.Assert.assertEquals; + +/** + * @author Winger + */ +public class MariadbGtidListEventDataDeserializerTest { + + private static final byte[] DATA = {1, 0, 0, 0, 0, 0, 0, 0, 102, 0, 0, 0, 87, 28, 0, 0, 0, 0, 0, 0, 77}; + + private static final String GTID_SET_LIST = "MariadbGtidListEventData{mariaGTIDSet=0-102-7255}"; + + @Test + public void deserialize() throws IOException { + MariadbGtidListEventDataDeserializer deserializer = new MariadbGtidListEventDataDeserializer(); + MariadbGtidListEventData eventData = deserializer.deserialize(new ByteArrayInputStream(DATA)); + assertEquals(GTID_SET_LIST, eventData.toString()); + } +}