Skip to content

Commit d82950f

Browse files
LordofAvernusffffwh
authored and
ffffwh
committed
lock/unlock tables for schema snapshot
1 parent 2a3aa7c commit d82950f

File tree

4 files changed

+105
-24
lines changed

4 files changed

+105
-24
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ bin/
44
dist
55
.vagrant
66
.scannerwork
7-
.vscode/launch.json
7+
.vscode/

driver/oracle/config/db_config.go

+49-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type OracleDB struct {
2222
_db *sql.DB
2323
LogMinerConn *sql.Conn
2424
MetaDataConn *sql.Conn
25+
SCN int64
2526
}
2627

2728
func (m *OracleConfig) ConnectString() string {
@@ -110,13 +111,18 @@ func (o *OracleDB) NLS_DATE_FORMAT() error {
110111
return nil
111112
}
112113
func (o *OracleDB) GetTables(schema string) ([]string, error) {
114+
asOfSCN := ""
115+
// if o.SCN != 0 {
116+
// asOfSCN = fmt.Sprintf("AS OF SCN %d", o.SCN)
117+
// }
113118
query := fmt.Sprintf(`
114119
SELECT
115120
table_name
116121
FROM
117122
all_tables
123+
%s
118124
WHERE
119-
owner = '%s'`, schema)
125+
owner = '%s'`, asOfSCN, schema)
120126

121127
rows, err := o.MetaDataConn.QueryContext(context.TODO(), query)
122128
if err != nil {
@@ -137,12 +143,17 @@ WHERE
137143
}
138144

139145
func (o *OracleDB) GetSchemas() ([]string, error) {
140-
query := `SELECT
146+
asOfSCN := ""
147+
// if o.SCN != 0 {
148+
// asOfSCN = fmt.Sprintf("AS OF SCN %d", o.SCN)
149+
// }
150+
query := fmt.Sprintf(`SELECT
141151
USERNAME
142152
FROM
143153
DBA_USERS
154+
%s
144155
WHERE
145-
USERNAME NOT IN ( 'SYS', 'SYSTEM', 'ANONYMOUS', 'APEX_PUBLIC_USER', 'APEX_040000', 'OUTLN', 'XS$NULL', 'FLOWS_FILES', 'MDSYS', 'CTXSYS', 'XDB', 'HR' )`
156+
USERNAME NOT IN ( 'SYS', 'SYSTEM', 'ANONYMOUS', 'APEX_PUBLIC_USER', 'APEX_040000', 'OUTLN', 'XS$NULL', 'FLOWS_FILES', 'MDSYS', 'CTXSYS', 'XDB', 'HR' )`, asOfSCN)
146157

147158
rows, err := o.MetaDataConn.QueryContext(context.TODO(), query)
148159
if err != nil {
@@ -163,11 +174,16 @@ func (o *OracleDB) GetSchemas() ([]string, error) {
163174
}
164175

165176
func (o *OracleDB) GetColumns(schema, table string) ([]string, error) {
177+
asOfSCN := ""
178+
// if o.SCN != 0 {
179+
// asOfSCN = fmt.Sprintf("AS OF SCN %d", o.SCN)
180+
// }
166181
query := fmt.Sprintf(`SELECT column_name
167182
FROM all_tab_cols
183+
%s
168184
WHERE table_name = '%s'
169185
AND owner = '%s'
170-
ORDER BY COLUMN_ID`, table, schema)
186+
ORDER BY COLUMN_ID`, asOfSCN, table, schema)
171187

172188
rows, err := o.MetaDataConn.QueryContext(context.TODO(), query)
173189
if err != nil {
@@ -201,3 +217,32 @@ SELECT dbms_metadata.get_ddl('TABLE','%s','%s') FROM dual`, table, schema))
201217
}
202218
return query, nil
203219
}
220+
221+
func (o *OracleDB) NewTx(ctx context.Context) (*sql.Tx, error) {
222+
tx, err := o._db.BeginTx(ctx, &sql.TxOptions{})
223+
if err != nil {
224+
return nil, err
225+
}
226+
return tx, nil
227+
}
228+
229+
func (o *OracleDB) GetCurrentSnapshotSCN() (int64, error) {
230+
var globalSCN int64
231+
// 获取当前 SCN 号
232+
err := o.MetaDataConn.QueryRowContext(context.TODO(), "SELECT CURRENT_SCN FROM V$DATABASE").Scan(&globalSCN)
233+
if err != nil {
234+
return 0, err
235+
}
236+
return globalSCN, nil
237+
}
238+
239+
func (o *OracleDB) InitSCN(scn int64) (err error) {
240+
if scn == 0 {
241+
scn, err = o.GetCurrentSnapshotSCN()
242+
if err != nil {
243+
return err
244+
}
245+
}
246+
o.SCN = scn
247+
return nil
248+
}

driver/oracle/extractor/extractor_oracle.go

+53-7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
package extractor
88

99
import (
10+
"context"
11+
"database/sql"
1012
gosql "database/sql"
1113
"encoding/binary"
1214
"fmt"
@@ -408,6 +410,10 @@ func (e *ExtractorOracle) Finish1() (err error) {
408410
}
409411

410412
func (e *ExtractorOracle) getSchemaTablesAndMeta() error {
413+
err := e.oracleDB.InitSCN(e.mysqlContext.OracleConfig.Scn)
414+
if err != nil {
415+
return err
416+
}
411417
if err := e.inspectTables(); err != nil {
412418
return err
413419
}
@@ -864,22 +870,35 @@ func (e *ExtractorOracle) CheckAndApplyLowerCaseTableNames() {
864870
}
865871

866872
func (e *ExtractorOracle) oracleDump() error {
867-
// step 1 : todo lock row
868-
// query : lock table schema.table in row share mode;
869873
if err := e.getSchemaTablesAndMeta(); err != nil {
870874
e.onError(common.TaskStateDead, err)
871875
return err
872876
}
873877

874-
// step 2 : get current scn for d.Dump()
875-
currentSCN, err := (&LogMinerStream{oracleDB: e.oracleDB}).GetCurrentSnapshotSCN()
878+
// step 1 : lock table schema.table in row share mode;
879+
tx, err := e.oracleDB.NewTx(context.TODO())
876880
if err != nil {
881+
e.onError(common.TaskStateDead, err)
877882
return err
878883
}
879884

880-
// step 3 : defer unlock row
885+
err = e.LockTablesForSchema(tx, context.TODO())
886+
if err != nil {
887+
e.onError(common.TaskStateDead, err)
888+
return err
889+
}
890+
defer func() {
891+
if tx != nil {
892+
tx.Rollback()
893+
}
894+
}()
895+
// step 2 : get current scn for d.Dump()
896+
// currentSCN, err := e.oracleDB.GetCurrentSnapshotSCN()
897+
// if err != nil {
898+
// return err
899+
// }
881900

882-
// step 4 : create table ddl
901+
// step 3 : create table ddl
883902
if !e.mysqlContext.SkipCreateDbTable {
884903
e.logger.Info("generating DROP and CREATE statements to reflect current database schemas",
885904
"replicateDoDb", e.replicateDoDb)
@@ -931,11 +950,17 @@ func (e *ExtractorOracle) oracleDump() error {
931950
}
932951
}
933952
}
953+
954+
// step 4: rollback for unlock table
955+
if tx != nil {
956+
tx.Rollback()
957+
}
958+
934959
// step 5: Dump all of the tables and generate source records ...
935960
// todo need merged dumper with mysql dumper
936961
for _, db := range e.replicateDoDb {
937962
for _, t := range db.Tables {
938-
d := NewDumper(e.oracleDB, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1, currentSCN)
963+
d := NewDumper(e.oracleDB, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1, e.oracleDB.SCN)
939964
if err := d.Dump(); err != nil {
940965
e.onError(common.TaskStateDead, err)
941966
}
@@ -971,3 +996,24 @@ func (e *ExtractorOracle) encodeAndSendDumpEntry(entry *common.DumpEntry) error
971996
}
972997
return nil
973998
}
999+
1000+
func (e *ExtractorOracle) LockTablesForSchema(tx *sql.Tx, ctx context.Context) error {
1001+
for _, db := range e.replicateDoDb {
1002+
for _, table := range db.Tables {
1003+
query := fmt.Sprintf("LOCK TABLE %s.%s IN ROW SHARE MODE", db.TableSchema, table.TableName)
1004+
_, err := tx.Exec(query)
1005+
if err != nil {
1006+
return err
1007+
}
1008+
}
1009+
}
1010+
return nil
1011+
}
1012+
1013+
func releaseSchemaSnapshotLock(tx *sql.Tx) {
1014+
// roll back connect
1015+
for err := tx.Rollback(); err != nil; {
1016+
// log roll back failed
1017+
continue
1018+
}
1019+
}

driver/oracle/extractor/log_miner.go

+2-12
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,6 @@ import (
2828
"github.com/thinkeridea/go-extend/exbytes"
2929
)
3030

31-
func (l *LogMinerStream) GetCurrentSnapshotSCN() (int64, error) {
32-
var globalSCN int64
33-
// 获取当前 SCN 号
34-
err := l.oracleDB.LogMinerConn.QueryRowContext(context.TODO(), "SELECT CURRENT_SCN FROM V$DATABASE").Scan(&globalSCN)
35-
if err != nil {
36-
return 0, err
37-
}
38-
return globalSCN, nil
39-
}
40-
4131
type LogFile struct {
4232
Name string
4333
FirstChange int64
@@ -532,7 +522,7 @@ func (e *ExtractorOracle) DataStreamEvents(entriesChannel chan<- *common.EntryCo
532522
e.logger.Debug("start oracle. DataStreamEvents")
533523

534524
if e.LogMinerStream.startScn == 0 {
535-
scn, err := e.LogMinerStream.GetCurrentSnapshotSCN()
525+
scn, err := e.LogMinerStream.oracleDB.GetCurrentSnapshotSCN()
536526
if err != nil {
537527
e.logger.Error("GetCurrentSnapshotSCN", "err", err)
538528
return err
@@ -726,7 +716,7 @@ func (l *LogMinerStream) stopLogMiner() error {
726716
//}
727717

728718
func (l *LogMinerStream) getEndScn() (int64, error) {
729-
latestScn, err := l.GetCurrentSnapshotSCN()
719+
latestScn, err := l.oracleDB.GetCurrentSnapshotSCN()
730720
if err != nil {
731721
l.logger.Error("GetCurrentSnapshotSCN", "err", err)
732722
return 0, err

0 commit comments

Comments
 (0)