-
Notifications
You must be signed in to change notification settings - Fork 104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft: add mysql validations #2594
base: main
Are you sure you want to change the base?
Conversation
Ubuntu seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
123fb2e
to
4a1fe95
Compare
flow/connectors/mysql/validate.go
Outdated
) | ||
|
||
type MySQLConnector struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MySqlConnector already exists in mysql.go
flow/connectors/mysql/validate.go
Outdated
|
||
// Check binlog_expire_logs_seconds | ||
var expireSeconds int | ||
err := c.conn.QueryRowContext(ctx, "SELECT @@binlog_expire_logs_seconds").Scan(&expireSeconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Settings are flavor specific. Fine to return nil when flavor is not MySQL for now
flow/connectors/mysql/validate.go
Outdated
import ( | ||
"context" | ||
"database/sql" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have been avoiding database/sql
, you can use existing Execute method on MySqlConnector
|
||
func (c *MySQLConnector) CheckSourceTables(ctx context.Context, tableNames []*utils.SchemaTable) error { | ||
if c.conn == nil { | ||
return errors.New("check tables: conn is nil") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MySql connector doesn't maintain a single long lived connection, let Execute manage this
flow/connectors/mysql/validate.go
Outdated
|
||
"github.com/go-mysql-org/go-mysql/mysql" | ||
_ "github.com/go-sql-driver/mysql" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're using go-mysql-org/go-mysql, no need to mix in go-sql-driver/mysql
flow/connectors/mysql/validate.go
Outdated
query := fmt.Sprintf("SELECT 1 FROM `%s`.`%s` LIMIT 1", parsedTable.Schema, parsedTable.Table) | ||
_, err := c.conn.QueryContext(ctx, query) | ||
if err != nil { | ||
return fmt.Errorf("error checking table %s.%s: %v", parsedTable.Schema, parsedTable.Table, err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
query := fmt.Sprintf("SELECT 1 FROM `%s`.`%s` LIMIT 1", parsedTable.Schema, parsedTable.Table) | |
_, err := c.conn.QueryContext(ctx, query) | |
if err != nil { | |
return fmt.Errorf("error checking table %s.%s: %v", parsedTable.Schema, parsedTable.Table, err) | |
} | |
if _, err := c.conn.QueryContext(ctx, fmt.Sprintf("SELECT 1 FROM %s LIMIT 1", parsedTable.MySQL())); err != nil { | |
return fmt.Errorf("error checking table %s: %w", parsedTable.MySQL(), err) | |
} |
flow/connectors/mysql/validate.go
Outdated
var masterLogFile string | ||
var masterLogPos int | ||
|
||
err := c.conn.QueryRowContext(ctx, "SHOW MASTER STATUS").Scan(&masterLogFile, &masterLogPos) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this has flavor/versioning considerations, see GetMasterPos
& GetMasterGTIDSet
. Can probably use those methods instead of this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to check binlog is enabled should only need to select @@log_bin
flow/connectors/mysql/validate.go
Outdated
return fmt.Errorf("failed to retrieve binlog_row_metadata: %v", err) | ||
} | ||
if binlogRowMetadata != "FULL" { | ||
return errors.New("binlog_row_metadata must be set to 'FULL' for column exclusion support") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should only be warning, not error. might be best to defer until mirror validation
flow/connectors/mysql/validate.go
Outdated
err = c.conn.QueryRowContext(ctx, "SELECT @@binlog_format").Scan(&binlogFormat) | ||
if err != nil { | ||
return fmt.Errorf("failed to retrieve binlog_format: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err = c.conn.QueryRowContext(ctx, "SELECT @@binlog_format").Scan(&binlogFormat) | |
if err != nil { | |
return fmt.Errorf("failed to retrieve binlog_format: %v", err) | |
if err := c.conn.QueryRowContext(ctx, "SELECT @@binlog_format").Scan(&binlogFormat); err != nil { | |
return fmt.Errorf("failed to retrieve binlog_format: %w", err) |
been trying to have consistent error scoping, this decision has avoided a couple issues
%w over %v in fmt.Errorf
allows errors.As
/ errors.Is
to pick up the underlying error, which is important for us to get mysql error codes
flow/connectors/mysql/validate.go
Outdated
for _, tableMapping := range cfg.TableMappings { | ||
parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier) | ||
if parseErr != nil { | ||
return fmt.Errorf("invalid source table identifier: %s", parseErr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return fmt.Errorf("invalid source table identifier: %s", parseErr) | |
return fmt.Errorf("invalid source table identifier: %w", parseErr) |
flow/connectors/mysql/validate.go
Outdated
@@ -1,14 +1,172 @@ | |||
package connmysql | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flow/connectors/mysql/validate.go
Outdated
return fmt.Errorf("failed to retrieve binlog_row_image: %v", err) | ||
} | ||
if binlogRowImage != "FULL" { | ||
return errors.New("binlog_row_image must be set to 'FULL' (equivalent to PostgreSQL's REPLICA IDENTITY FULL)") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would prefer to avoid REPLICA IDENTITY FULL
comparisons
flow/connectors/mysql/validate.go
Outdated
} | ||
|
||
var replicationPrivilege string | ||
err := c.conn.QueryRowContext(ctx, "SHOW GRANTS FOR CURRENT_USER()").Scan(&replicationPrivilege) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's more robust to parse this into a string array and then check to avoid matching partial permission names later
e5df246
to
8a15869
Compare
No description provided.