Skip to content

Commit 1f27791

Browse files
committed
refact to support mariadb types
1 parent 3598de7 commit 1f27791

File tree

7 files changed

+288
-95
lines changed

7 files changed

+288
-95
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

Lines changed: 80 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,24 @@
1515
*/
1616
package com.github.shyiko.mysql.binlog;
1717

18-
import com.github.shyiko.mysql.binlog.event.*;
19-
import com.github.shyiko.mysql.binlog.event.deserialization.*;
18+
import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData;
19+
import com.github.shyiko.mysql.binlog.event.Event;
20+
import com.github.shyiko.mysql.binlog.event.EventHeader;
21+
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
22+
import com.github.shyiko.mysql.binlog.event.EventType;
23+
import com.github.shyiko.mysql.binlog.event.GtidEventData;
24+
import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData;
25+
import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData;
26+
import com.github.shyiko.mysql.binlog.event.QueryEventData;
27+
import com.github.shyiko.mysql.binlog.event.RotateEventData;
28+
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
29+
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
30+
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
31+
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
2032
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.EventDataWrapper;
33+
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
34+
import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
35+
import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
2136
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
2237
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
2338
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
@@ -34,7 +49,12 @@
3449
import com.github.shyiko.mysql.binlog.network.protocol.Packet;
3550
import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
3651
import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
37-
import com.github.shyiko.mysql.binlog.network.protocol.command.*;
52+
import com.github.shyiko.mysql.binlog.network.protocol.command.Command;
53+
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
54+
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand;
55+
import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
56+
import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;
57+
import com.github.shyiko.mysql.binlog.network.protocol.command.SSLRequestCommand;
3858

3959
import javax.net.ssl.SSLContext;
4060
import javax.net.ssl.TrustManager;
@@ -115,14 +135,12 @@ public X509Certificate[] getAcceptedIssuers() {
115135
private volatile long connectionId;
116136
private SSLMode sslMode = SSLMode.DISABLED;
117137

118-
private GtidSet gtidSet;
119-
private final Object gtidSetAccessLock = new Object();
138+
protected GtidSet gtidSet;
139+
protected final Object gtidSetAccessLock = new Object();
120140
private boolean gtidSetFallbackToPurged;
121141
private boolean useBinlogFilenamePositionInGtidMode;
122142
private String gtid;
123143
private boolean tx;
124-
private boolean isMariadb = false;
125-
private boolean mariadbSendAnnotateRowsEvent = false;
126144

127145
private EventDeserializer eventDeserializer = new EventDeserializer();
128146

@@ -132,7 +150,7 @@ public X509Certificate[] getAcceptedIssuers() {
132150
private SocketFactory socketFactory;
133151
private SSLSocketFactory sslSocketFactory;
134152

135-
private volatile PacketChannel channel;
153+
protected volatile PacketChannel channel;
136154
private volatile boolean connected;
137155
private volatile long masterServerId = -1;
138156

@@ -317,15 +335,14 @@ public void setGtidSet(String gtidSet) {
317335
this.binlogFilename = "";
318336
}
319337
synchronized (gtidSetAccessLock) {
320-
// mariadb GtidSet format will be domainId-serverId-sequence
321-
if (gtidSet != null && !gtidSet.contains(":")) {
322-
this.gtidSet = new MariadbGtidSet(gtidSet);
323-
} else {
324-
this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null;
325-
}
338+
this.gtidSet = gtidSet != null ? buildGtidSet(gtidSet) : null;
326339
}
327340
}
328341

342+
protected GtidSet buildGtidSet(String gtidSet) {
343+
return new GtidSet(gtidSet);
344+
}
345+
329346
/**
330347
* @see #setGtidSetFallbackToPurged(boolean)
331348
* @return whether gtid_purged is used as a fallback
@@ -488,19 +505,6 @@ public void setThreadFactory(ThreadFactory threadFactory) {
488505
this.threadFactory = threadFactory;
489506
}
490507

491-
public boolean isMariadbSendAnnotateRowsEvent() {
492-
return mariadbSendAnnotateRowsEvent;
493-
}
494-
495-
/**
496-
* Only in Mariadb, if set true, the Slave server connects with the BINLOG_SEND_ANNOTATE_ROWS_EVENT flag (value is 2)
497-
* in the COM_BINLOG_DUMP Slave Registration phase
498-
* @param mariadbSendAnnotateRowsEvent
499-
*/
500-
public void setMariadbSendAnnotateRowsEvent(boolean mariadbSendAnnotateRowsEvent) {
501-
this.mariadbSendAnnotateRowsEvent = mariadbSendAnnotateRowsEvent;
502-
}
503-
504508
/**
505509
* Connect to the replication stream. Note that this method blocks until disconnected.
506510
* @throws AuthenticationException if authentication fails
@@ -538,17 +542,8 @@ public void connect() throws IOException, IllegalStateException {
538542
channel.authenticationComplete();
539543

540544
connectionId = greetingPacket.getThreadId();
541-
isMariadb = greetingPacket.getServerVersion().toLowerCase().contains("mariadb");
542545
if ("".equals(binlogFilename)) {
543-
synchronized (gtidSetAccessLock) {
544-
if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {
545-
if (isMariadb) {
546-
gtidSet = new MariadbGtidSet(fetchGtidPurged());
547-
} else {
548-
gtidSet = new GtidSet(fetchGtidPurged());
549-
}
550-
}
551-
}
546+
setupGtidSet();
552547
}
553548
if (binlogFilename == null) {
554549
fetchBinlogFilenameAndPosition();
@@ -597,13 +592,7 @@ public void connect() throws IOException, IllegalStateException {
597592
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
598593
synchronized (gtidSetAccessLock) {
599594
if (gtidSet != null) {
600-
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
601-
ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
602-
if (isMariadb) {
603-
ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class);
604-
ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class);
605-
ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class);
606-
}
595+
ensureGtidEventDataDeserializer();
607596
}
608597
}
609598
listenForEventPackets();
@@ -676,7 +665,7 @@ public Object call() throws Exception {
676665
};
677666
}
678667

679-
private void checkError(byte[] packet) throws IOException {
668+
protected void checkError(byte[] packet) throws IOException {
680669
if (packet[0] == (byte) 0xFF /* error */) {
681670
byte[] bytes = Arrays.copyOfRange(packet, 1, packet.length);
682671
ErrorPacket errorPacket = new ErrorPacket(bytes);
@@ -720,7 +709,6 @@ private boolean tryUpgradeToSSL(GreetingPacket greetingPacket) throws IOExceptio
720709
return false;
721710
}
722711

723-
724712
private void enableHeartbeat() throws IOException {
725713
channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000));
726714
byte[] statementResult = channel.read();
@@ -735,35 +723,23 @@ private void setMasterServerId() throws IOException {
735723
}
736724
}
737725

738-
private void requestBinaryLogStream() throws IOException {
726+
protected void requestBinaryLogStream() throws IOException {
739727
long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178
740728
Command dumpBinaryLogCommand;
741729
synchronized (gtidSetAccessLock) {
742730
if (gtidSet != null) {
743-
if (isMariadb) {
744-
channel.write(new QueryCommand("SET @mariadb_slave_capability=4"));
745-
checkError(channel.read());
746-
channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'"));
747-
checkError(channel.read());
748-
channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0"));
749-
checkError(channel.read());
750-
channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0"));
751-
checkError(channel.read());
752-
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isMariadbSendAnnotateRowsEvent());
753-
} else {
754-
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
755-
useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
756-
useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
757-
gtidSet);
758-
}
731+
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
732+
useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
733+
useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
734+
gtidSet);
759735
} else {
760736
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);
761737
}
762738
}
763739
channel.write(dumpBinaryLogCommand);
764740
}
765741

766-
private void ensureEventDataDeserializer(EventType eventType,
742+
protected void ensureEventDataDeserializer(EventType eventType,
767743
Class<? extends EventDataDeserializer> eventDataDeserializerClass) {
768744
EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType);
769745
if (eventDataDeserializer.getClass() != eventDataDeserializerClass &&
@@ -780,6 +756,10 @@ private void ensureEventDataDeserializer(EventType eventType,
780756
}
781757
}
782758

759+
protected void ensureGtidEventDataDeserializer() {
760+
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
761+
ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
762+
}
783763

784764
private void spawnKeepAliveThread() {
785765
final ExecutorService threadExecutor =
@@ -924,6 +904,14 @@ private String fetchGtidPurged() throws IOException {
924904
return "";
925905
}
926906

907+
protected void setupGtidSet() throws IOException{
908+
synchronized (gtidSetAccessLock) {
909+
if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {
910+
gtidSet = new GtidSet(fetchGtidPurged());
911+
}
912+
}
913+
}
914+
927915
private void fetchBinlogFilenameAndPosition() throws IOException {
928916
ResultSetRowPacket[] resultSet;
929917
channel.write(new QueryCommand("show master status"));
@@ -1025,7 +1013,7 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac
10251013
return result;
10261014
}
10271015

1028-
private void updateClientBinlogFilenameAndPosition(Event event) {
1016+
protected void updateClientBinlogFilenameAndPosition(Event event) {
10291017
EventHeader eventHeader = event.getHeader();
10301018
EventType eventType = eventHeader.getEventType();
10311019
if (eventType == EventType.ROTATE) {
@@ -1044,7 +1032,7 @@ private void updateClientBinlogFilenameAndPosition(Event event) {
10441032
}
10451033
}
10461034

1047-
private void updateGtidSet(Event event) {
1035+
protected void updateGtidSet(Event event) {
10481036
synchronized (gtidSetAccessLock) {
10491037
if (gtidSet == null) {
10501038
return;
@@ -1070,34 +1058,39 @@ private void updateGtidSet(Event event) {
10701058
tx = false;
10711059
break;
10721060
case QUERY:
1073-
case ANNOTATE_ROWS:
1074-
String sql;
1075-
if (eventHeader.getEventType() == EventType.QUERY) {
1076-
QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData());
1077-
sql = queryEventData.getSql();
1078-
} else {
1079-
AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDataWrapper.internal(event.getData());
1080-
sql = annotateRowsEventData.getRowsQuery();
1081-
}
1082-
1061+
QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData());
1062+
String sql = queryEventData.getSql();
10831063
if (sql == null) {
10841064
break;
10851065
}
1086-
if ("BEGIN".equals(sql)) {
1087-
tx = true;
1088-
} else
1089-
if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) {
1090-
commitGtid();
1091-
tx = false;
1092-
} else
1093-
if (!tx) {
1094-
// auto-commit query, likely DDL
1095-
commitGtid();
1066+
commitGtid(sql);
1067+
break;
1068+
case ANNOTATE_ROWS:
1069+
AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDeserializer.EventDataWrapper.internal(event.getData());
1070+
sql = annotateRowsEventData.getRowsQuery();
1071+
if (sql == null) {
1072+
break;
10961073
}
1074+
commitGtid(sql);
1075+
break;
10971076
default:
10981077
}
10991078
}
11001079

1080+
protected void commitGtid(String sql) {
1081+
if ("BEGIN".equals(sql)) {
1082+
tx = true;
1083+
} else
1084+
if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) {
1085+
commitGtid();
1086+
tx = false;
1087+
} else
1088+
if (!tx) {
1089+
// auto-commit query, likely DDL
1090+
commitGtid();
1091+
}
1092+
}
1093+
11011094
private void commitGtid() {
11021095
if (gtid != null) {
11031096
synchronized (gtidSetAccessLock) {
@@ -1308,7 +1301,7 @@ public interface LifecycleListener {
13081301
/**
13091302
* Default (no-op) implementation of {@link LifecycleListener}.
13101303
*/
1311-
public static abstract class AbstractLifecycleListener implements LifecycleListener {
1304+
public static abstract class AbstractLifecycleListener implements LifecycleListener {
13121305

13131306
public void onConnect(BinaryLogClient client) { }
13141307

0 commit comments

Comments
 (0)