4
4
use MySQLReplication \BinaryDataReader \BinaryDataReader ;
5
5
use MySQLReplication \BinLog \Exception \BinLogException ;
6
6
use MySQLReplication \Config \Config ;
7
- use MySQLReplication \Repository \MySQLRepository ;
8
7
use MySQLReplication \Definitions \ConstCapabilityFlags ;
9
8
use MySQLReplication \Definitions \ConstCommand ;
10
- use MySQLReplication \Gtid \GtidCollection ;
9
+ use MySQLReplication \Gtid \GtidService ;
10
+ use MySQLReplication \Repository \MySQLRepository ;
11
11
12
12
/**
13
13
* Class BinLogConnect
@@ -35,10 +35,6 @@ class BinLogConnect
35
35
* @var BinLogAuth
36
36
*/
37
37
private $ packAuth ;
38
- /**
39
- * @var GtidCollection
40
- */
41
- private $ gtidCollection ;
42
38
/**
43
39
* http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
44
40
* @var array
@@ -49,24 +45,21 @@ class BinLogConnect
49
45
* @param Config $config
50
46
* @param MySQLRepository $mySQLRepository
51
47
* @param BinLogAuth $packAuth
52
- * @param GtidCollection $gtidCollection
48
+ * @param GtidService $gtidService
53
49
*/
54
50
public function __construct (
55
51
Config $ config ,
56
52
MySQLRepository $ mySQLRepository ,
57
53
BinLogAuth $ packAuth ,
58
- GtidCollection $ gtidCollection
54
+ GtidService $ gtidService
59
55
) {
60
56
$ this ->mySQLRepository = $ mySQLRepository ;
61
57
$ this ->config = $ config ;
62
58
$ this ->packAuth = $ packAuth ;
63
- $ this ->gtidCollection = $ gtidCollection ;
59
+ $ this ->gtidService = $ gtidService ;
64
60
}
65
61
66
- /**
67
- *
68
- */
69
- public function __destruct ()
62
+ public function disconnect ()
70
63
{
71
64
if (true === $ this ->isConnected ())
72
65
{
@@ -91,7 +84,6 @@ public function getCheckSum()
91
84
return $ this ->checkSum ;
92
85
}
93
86
94
-
95
87
/**
96
88
* @throws BinLogException
97
89
*/
@@ -141,9 +133,32 @@ public function getPacket($checkForOkByte = true)
141
133
{
142
134
$ this ->isWriteSuccessful ($ result );
143
135
}
136
+
144
137
return $ result ;
145
138
}
146
139
140
+ /**
141
+ * @param $length
142
+ * @return mixed
143
+ * @throws BinLogException
144
+ */
145
+ private function readFromSocket ($ length )
146
+ {
147
+ $ received = socket_recv ($ this ->socket , $ buf , $ length , MSG_WAITALL );
148
+ if ($ length === $ received )
149
+ {
150
+ return $ buf ;
151
+ }
152
+
153
+ // http://php.net/manual/pl/function.socket-recv.php#47182
154
+ if (0 === $ received )
155
+ {
156
+ throw new BinLogException ('Disconnected by remote side ' );
157
+ }
158
+
159
+ throw new BinLogException (socket_strerror (socket_last_error ()), socket_last_error ());
160
+ }
161
+
147
162
/**
148
163
* @param $packet
149
164
* @return array
@@ -170,28 +185,6 @@ public function isWriteSuccessful($packet)
170
185
}
171
186
}
172
187
173
- /**
174
- * @param $length
175
- * @return mixed
176
- * @throws BinLogException
177
- */
178
- private function readFromSocket ($ length )
179
- {
180
- $ received = socket_recv ($ this ->socket , $ buf , $ length , MSG_WAITALL );
181
- if ($ length === $ received )
182
- {
183
- return $ buf ;
184
- }
185
-
186
- // http://php.net/manual/pl/function.socket-recv.php#47182
187
- if (0 === $ received )
188
- {
189
- throw new BinLogException ('Disconnected by remote side ' );
190
- }
191
-
192
- throw new BinLogException (socket_strerror (socket_last_error ()), socket_last_error ());
193
- }
194
-
195
188
/**
196
189
* @throws BinLogException
197
190
*/
@@ -231,49 +224,16 @@ private function getBinlogStream()
231
224
$ this ->execute ('SET @master_binlog_checksum=@@global.binlog_checksum ' );
232
225
}
233
226
234
- if (0 === $ this ->gtidCollection ->count ())
235
- {
236
- $ binFilePos = $ this ->config ->getBinLogPosition ();
237
- $ binFileName = $ this ->config ->getBinLogFileName ();
227
+ $ this ->registerSlave ();
238
228
239
- if ('' !== $ this ->config ->getMariaDbGtid ())
240
- {
241
- $ this ->execute ("SET @mariadb_slave_capability = 4 " );
242
- $ this ->execute ("SET @slave_connect_state = ' " . $ this ->config ->getMariaDbGtid () . "' " );
243
- $ this ->execute ("SET @slave_gtid_strict_mode = 0 " );
244
- $ this ->execute ("SET @slave_gtid_ignore_duplicates = 0 " );
245
- }
246
-
247
- if ('' === $ binFilePos || '' === $ binFileName )
248
- {
249
- $ master = $ this ->mySQLRepository ->getMasterStatus ();
250
- $ binFilePos = $ master ['Position ' ];
251
- $ binFileName = $ master ['File ' ];
252
- }
253
-
254
- $ prelude = pack ('i ' , strlen ($ binFileName ) + 11 ) . chr (ConstCommand::COM_BINLOG_DUMP );
255
- $ prelude .= pack ('I ' , $ binFilePos );
256
- $ prelude .= pack ('v ' , 0 );
257
- $ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
258
- $ prelude .= $ binFileName ;
229
+ if ('' !== $ this ->config ->getGtid ())
230
+ {
231
+ $ this ->setBinLogDumpGtid ();
259
232
}
260
233
else
261
234
{
262
- $ prelude = pack ('l ' , 26 + $ this ->gtidCollection ->getEncodedLength ()) . chr (ConstCommand::COM_BINLOG_DUMP_GTID );
263
- $ prelude .= pack ('S ' , 0 );
264
- $ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
265
- $ prelude .= pack ('I ' , 3 );
266
- $ prelude .= chr (0 );
267
- $ prelude .= chr (0 );
268
- $ prelude .= chr (0 );
269
- $ prelude .= BinaryDataReader::pack64bit (4 );
270
-
271
- $ prelude .= pack ('I ' , $ this ->gtidCollection ->getEncodedLength ());
272
- $ prelude .= $ this ->gtidCollection ->getEncoded ();
235
+ $ this ->setBinLogDump ();
273
236
}
274
-
275
- $ this ->writeToSocket ($ prelude );
276
- $ this ->getPacket ();
277
237
}
278
238
279
239
/**
@@ -288,4 +248,81 @@ private function execute($sql)
288
248
$ this ->writeToSocket ($ prelude . $ sql );
289
249
$ this ->getPacket ();
290
250
}
251
+
252
+ /**
253
+ * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
254
+ * @throws BinLogException
255
+ */
256
+ private function registerSlave ()
257
+ {
258
+ $ prelude = pack ('l ' , 18 ) . chr (ConstCommand::COM_REGISTER_SLAVE );
259
+ $ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
260
+ $ prelude .= chr (0 );
261
+ $ prelude .= chr (0 );
262
+ $ prelude .= chr (0 );
263
+ $ prelude .= pack ('s ' , '' );
264
+ $ prelude .= pack ('I ' , 0 );
265
+ $ prelude .= pack ('I ' , 0 );
266
+
267
+ $ this ->writeToSocket ($ prelude );
268
+ $ this ->getPacket ();
269
+ }
270
+
271
+ /**
272
+ * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
273
+ * @throws BinLogException
274
+ */
275
+ private function setBinLogDumpGtid ()
276
+ {
277
+ $ collection = $ this ->gtidService ->makeCollectionFromString ($ this ->config ->getGtid ());
278
+
279
+ $ prelude = pack ('l ' , 26 + $ collection ->getEncodedLength ()) . chr (ConstCommand::COM_BINLOG_DUMP_GTID );
280
+ $ prelude .= pack ('S ' , 0 );
281
+ $ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
282
+ $ prelude .= pack ('I ' , 3 );
283
+ $ prelude .= chr (0 );
284
+ $ prelude .= chr (0 );
285
+ $ prelude .= chr (0 );
286
+ $ prelude .= BinaryDataReader::pack64bit (4 );
287
+
288
+ $ prelude .= pack ('I ' , $ collection ->getEncodedLength ());
289
+ $ prelude .= $ collection ->getEncoded ();
290
+
291
+ $ this ->writeToSocket ($ prelude );
292
+ $ this ->getPacket ();
293
+ }
294
+
295
+ /**
296
+ * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
297
+ * @throws BinLogException
298
+ */
299
+ private function setBinLogDump ()
300
+ {
301
+ $ binFilePos = $ this ->config ->getBinLogPosition ();
302
+ $ binFileName = $ this ->config ->getBinLogFileName ();
303
+
304
+ if ('' !== $ this ->config ->getMariaDbGtid ())
305
+ {
306
+ $ this ->execute ('SET @mariadb_slave_capability = 4 ' );
307
+ $ this ->execute ('SET @slave_connect_state = \'' . $ this ->config ->getMariaDbGtid () . '\'' );
308
+ $ this ->execute ('SET @slave_gtid_strict_mode = 0 ' );
309
+ $ this ->execute ('SET @slave_gtid_ignore_duplicates = 0 ' );
310
+ }
311
+
312
+ if ('' === $ binFilePos || '' === $ binFileName )
313
+ {
314
+ $ master = $ this ->mySQLRepository ->getMasterStatus ();
315
+ $ binFilePos = $ master ['Position ' ];
316
+ $ binFileName = $ master ['File ' ];
317
+ }
318
+
319
+ $ prelude = pack ('i ' , strlen ($ binFileName ) + 11 ) . chr (ConstCommand::COM_BINLOG_DUMP );
320
+ $ prelude .= pack ('I ' , $ binFilePos );
321
+ $ prelude .= pack ('v ' , 0 );
322
+ $ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
323
+ $ prelude .= $ binFileName ;
324
+
325
+ $ this ->writeToSocket ($ prelude );
326
+ $ this ->getPacket ();
327
+ }
291
328
}
0 commit comments