Draft: add mysql validations #2594

Draft: wants to merge 2 commits into base: main
Changes from 1 commit
171 changes: 171 additions & 0 deletions flow/connectors/mysql/validate.go
@@ -1,14 +1,172 @@
package connmysql


Review comment (Contributor):

Suggested change: remove the extra blank line.

import (
	"context"
	"database/sql"
Review comment (Contributor):

We have been avoiding database/sql; you can use the existing Execute method on MySqlConnector instead.

	"errors"
	"fmt"
	"strings"

	"github.com/go-mysql-org/go-mysql/mysql"
	_ "github.com/go-sql-driver/mysql"
Review comment (Contributor):

We're using go-mysql-org/go-mysql, so there is no need to mix in go-sql-driver/mysql.


	"github.com/PeerDB-io/peerdb/flow/connectors/utils"
	"github.com/PeerDB-io/peerdb/flow/generated/protos"
	"github.com/PeerDB-io/peerdb/flow/shared"
)

type MySQLConnector struct {
Review comment (Contributor, @serprex, Feb 18, 2025):

MySqlConnector already exists in mysql.go.

	conn   *sql.DB
	config *MySQLConfig
}

func (c *MySQLConnector) CheckSourceTables(ctx context.Context, tableNames []*utils.SchemaTable) error {
	if c.conn == nil {
		return errors.New("check tables: conn is nil")
Review comment (Contributor):

The MySQL connector doesn't maintain a single long-lived connection; let Execute manage this.

	}

	for _, parsedTable := range tableNames {
		query := fmt.Sprintf("SELECT 1 FROM `%s`.`%s` LIMIT 1", parsedTable.Schema, parsedTable.Table)
		_, err := c.conn.QueryContext(ctx, query)
		if err != nil {
			return fmt.Errorf("error checking table %s.%s: %v", parsedTable.Schema, parsedTable.Table, err)
		}
Review comment (Contributor):

Suggested change
-		query := fmt.Sprintf("SELECT 1 FROM `%s`.`%s` LIMIT 1", parsedTable.Schema, parsedTable.Table)
-		_, err := c.conn.QueryContext(ctx, query)
-		if err != nil {
-			return fmt.Errorf("error checking table %s.%s: %v", parsedTable.Schema, parsedTable.Table, err)
-		}
+		if _, err := c.conn.QueryContext(ctx, fmt.Sprintf("SELECT 1 FROM %s LIMIT 1", parsedTable.MySQL())); err != nil {
+			return fmt.Errorf("error checking table %s: %w", parsedTable.MySQL(), err)
+		}
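
The suggestion above delegates identifier quoting to parsedTable.MySQL() instead of hand-formatting backticks. The body of that method is not shown in this diff, so the following is only a minimal sketch of what safe quoting can look like. `schemaTable`, `quoteIdent`, and `qualified` are hypothetical names, not PeerDB's implementation. The one fact it relies on is that MySQL escapes a backtick inside a backtick-quoted identifier by doubling it.

```go
package main

import (
	"fmt"
	"strings"
)

// schemaTable is a hypothetical stand-in for utils.SchemaTable.
type schemaTable struct {
	Schema string
	Table  string
}

// quoteIdent wraps a MySQL identifier in backticks and doubles any
// embedded backtick so the name cannot end the quoting early.
func quoteIdent(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// qualified returns the schema-qualified, quoted table name.
func (t schemaTable) qualified() string {
	return quoteIdent(t.Schema) + "." + quoteIdent(t.Table)
}

func main() {
	// A table name containing a backtick, to show the escaping.
	t := schemaTable{Schema: "shop", Table: "order`items"}
	fmt.Printf("SELECT 1 FROM %s LIMIT 1\n", t.qualified())
}
```

Keeping the quoting in one helper means the query text and the error message are built from the same string, which is what the suggested change does with parsedTable.MySQL().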

	}
	return nil
}

func (c *MySQLConnector) CheckReplicationPermissions(ctx context.Context) error {
	if c.conn == nil {
		return errors.New("check replication permissions: conn is nil")
	}

	var replicationPrivilege string
	err := c.conn.QueryRowContext(ctx, "SHOW GRANTS FOR CURRENT_USER()").Scan(&replicationPrivilege)
Review comment (Contributor):

It's more robust to parse this into a string array and then check it, to avoid matching partial permission names later.
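
One way to read the comment above is to split each SHOW GRANTS row into its privilege names and compare them exactly, rather than using strings.Contains on the raw text. The sketch below assumes the usual "GRANT priv, priv ON ... TO ..." row shape. `parseGrantPrivileges` and `hasPrivilege` are hypothetical helpers, and rows without an ON clause (such as role grants) are not handled specially. SHOW GRANTS can return more than one row, so the sketch takes all rows as input.

```go
package main

import (
	"fmt"
	"strings"
)

// parseGrantPrivileges extracts the privilege names from one row of
// SHOW GRANTS output. Commas inside parentheses (column lists) do not
// split a privilege, and the column list itself is dropped.
func parseGrantPrivileges(grant string) []string {
	upper := strings.ToUpper(strings.TrimSpace(grant))
	if !strings.HasPrefix(upper, "GRANT ") {
		return nil
	}
	body := upper[len("GRANT "):]
	if idx := strings.Index(body, " ON "); idx >= 0 {
		body = body[:idx]
	}

	var privs []string
	depth, start := 0, 0
	for i, r := range body {
		switch r {
		case '(':
			depth++
		case ')':
			depth--
		case ',':
			if depth == 0 {
				privs = append(privs, body[start:i])
				start = i + 1
			}
		}
	}
	privs = append(privs, body[start:])

	for i, p := range privs {
		if j := strings.IndexByte(p, '('); j >= 0 {
			p = p[:j]
		}
		privs[i] = strings.TrimSpace(p)
	}
	return privs
}

// hasPrivilege reports whether any grant row lists want as a whole
// privilege name (case-insensitive, no substring matching).
func hasPrivilege(grantRows []string, want string) bool {
	want = strings.ToUpper(want)
	for _, row := range grantRows {
		for _, p := range parseGrantPrivileges(row) {
			if p == want {
				return true
			}
		}
	}
	return false
}

func main() {
	// Example rows; the user and table names are made up.
	rows := []string{
		"GRANT SELECT, CREATE USER, REPLICATION SLAVE ON *.* TO `app`@`%`",
		"GRANT SELECT (`id`, `name`) ON `shop`.`orders` TO `app`@`%`",
	}
	fmt.Println(hasPrivilege(rows, "REPLICATION SLAVE"))
	fmt.Println(hasPrivilege(rows, "CREATE")) // "CREATE USER" is not "CREATE"
}
```

With exact comparison, a privilege such as CREATE USER no longer satisfies a check for CREATE, which is the kind of partial match the comment warns about.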

	if err != nil {
		return fmt.Errorf("failed to check replication privileges: %v", err)
	}

	if !strings.Contains(replicationPrivilege, "REPLICATION SLAVE") && !strings.Contains(replicationPrivilege, "REPLICATION CLIENT") {
		return errors.New("MySQL user does not have replication privileges")
	}

	return nil
}

func (c *MySQLConnector) CheckReplicationConnectivity(ctx context.Context) error {
	if c.conn == nil {
		return errors.New("check replication connectivity: conn is nil")
	}

	var masterLogFile string
	var masterLogPos int

	err := c.conn.QueryRowContext(ctx, "SHOW MASTER STATUS").Scan(&masterLogFile, &masterLogPos)
Review comment (Contributor):

This has flavor and versioning considerations; see GetMasterPos and GetMasterGTIDSet. You can probably use those methods instead.

Review comment (Contributor, @serprex, Feb 19, 2025):

To check that binlog is enabled, it should only be necessary to select @@log_bin.
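
A minimal sketch of the @@log_bin check suggested above. To stay runnable without a server, the query is injected through a hypothetical `scalarQuery` function type. In the connector this would presumably go through the existing Execute method, whose exact signature is not shown in this diff. `checkBinlogEnabled` and `fixedValue` are likewise illustrative names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// scalarQuery is a hypothetical abstraction over the connector: it runs a
// query that yields one value and returns that value as a string.
type scalarQuery func(ctx context.Context, query string) (string, error)

// checkBinlogEnabled reads @@log_bin and returns an error when binary
// logging is off. SELECT @@log_bin yields 1 or 0; "ON" is also accepted
// in case the value was obtained through SHOW VARIABLES.
func checkBinlogEnabled(ctx context.Context, query scalarQuery) error {
	value, err := query(ctx, "SELECT @@log_bin")
	if err != nil {
		return fmt.Errorf("failed to read log_bin: %w", err)
	}
	if value != "1" && value != "ON" {
		return errors.New("binary logging is disabled on this MySQL server")
	}
	return nil
}

// fixedValue returns a fake scalarQuery for demonstration purposes.
func fixedValue(v string) scalarQuery {
	return func(context.Context, string) (string, error) { return v, nil }
}

func main() {
	ctx := context.Background()
	fmt.Println(checkBinlogEnabled(ctx, fixedValue("1")))
	fmt.Println(checkBinlogEnabled(ctx, fixedValue("0")))
}
```

Compared with scanning SHOW MASTER STATUS, this reads a single system variable and does not depend on the statement name or column layout, which is where the flavor and version differences mentioned in the previous comment come in.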

	if err != nil {
		// Handle case where SHOW MASTER STATUS returns no rows (binary logging disabled)
		if errors.Is(err, sql.ErrNoRows) {
			return errors.New("binary logging is disabled on this MySQL server")
		}
		return fmt.Errorf("failed to check replication status: %v", err)
	}

	// Additional validation: Check if the values are valid
	if masterLogFile == "" || masterLogPos <= 0 {
		return errors.New("invalid replication status: missing log file or position")
	}

	return nil
}

func (c *MySQLConnector) CheckBinlogSettings(ctx context.Context) error {
	if c.conn == nil {
		return errors.New("check binlog settings: conn is nil")
	}

	// Check binlog_expire_logs_seconds
	var expireSeconds int
	err := c.conn.QueryRowContext(ctx, "SELECT @@binlog_expire_logs_seconds").Scan(&expireSeconds)
Review comment (Contributor):

Settings are flavor-specific. It's fine to return nil when the flavor is not MySQL for now.
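
The flavor gate described above can be sketched as an early return before any setting is read. The `flavor` type and its constants are hypothetical stand-ins for the protos.MySqlFlavor enum (only MySqlFlavor_MYSQL_UNKNOWN appears in this diff), and `lookup` is an assumed callback that reads one system variable.

```go
package main

import (
	"errors"
	"fmt"
)

// flavor is a hypothetical stand-in for protos.MySqlFlavor.
type flavor int

const (
	flavorUnknown flavor = iota
	flavorMySQL
	flavorMariaDB
)

// checkBinlogFormat validates binlog_format for the MySQL flavor only.
// Other flavors are skipped and return nil, as the review suggests.
func checkBinlogFormat(f flavor, lookup func(name string) (string, error)) error {
	if f != flavorMySQL {
		return nil
	}
	format, err := lookup("binlog_format")
	if err != nil {
		return fmt.Errorf("failed to retrieve binlog_format: %w", err)
	}
	if format != "ROW" {
		return errors.New("binlog_format must be set to 'ROW'")
	}
	return nil
}

func main() {
	// A fake lookup that always reports MIXED.
	mixed := func(string) (string, error) { return "MIXED", nil }
	fmt.Println(checkBinlogFormat(flavorMariaDB, mixed)) // skipped
	fmt.Println(checkBinlogFormat(flavorMySQL, mixed))   // rejected
}
```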

	if err != nil {
		return fmt.Errorf("failed to retrieve binlog_expire_logs_seconds: %v", err)
	}
	if expireSeconds <= 86400 {
		return errors.New("binlog_expire_logs_seconds is too low. Must be greater than 1 day")
	}

	// Check binlog_format
	var binlogFormat string
	err = c.conn.QueryRowContext(ctx, "SELECT @@binlog_format").Scan(&binlogFormat)
	if err != nil {
		return fmt.Errorf("failed to retrieve binlog_format: %v", err)
Review comment (Contributor):

Suggested change
-	err = c.conn.QueryRowContext(ctx, "SELECT @@binlog_format").Scan(&binlogFormat)
-	if err != nil {
-		return fmt.Errorf("failed to retrieve binlog_format: %v", err)
+	if err := c.conn.QueryRowContext(ctx, "SELECT @@binlog_format").Scan(&binlogFormat); err != nil {
+		return fmt.Errorf("failed to retrieve binlog_format: %w", err)

We've been trying to keep error scoping consistent; this decision has avoided a couple of issues.

Using %w rather than %v in fmt.Errorf lets errors.As / errors.Is pick up the underlying error, which is important for us to get MySQL error codes.
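
The difference between %w and %v can be shown with a small sketch. `codedError` is a hypothetical stand-in for a driver error that carries a server error code (the role *mysql.MyError plays in ValidateCheck below), and `wrapWithW`, `wrapWithV`, and `errorCode` are illustrative helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// codedError is a hypothetical stand-in for a driver error that carries
// a server error code.
type codedError struct {
	Code    uint16
	Message string
}

func (e *codedError) Error() string {
	return fmt.Sprintf("ERROR %d: %s", e.Code, e.Message)
}

// wrapWithW keeps err in the error chain.
func wrapWithW(err error) error {
	return fmt.Errorf("failed to retrieve binlog_format: %w", err)
}

// wrapWithV only copies err's text; the original value is lost.
func wrapWithV(err error) error {
	return fmt.Errorf("failed to retrieve binlog_format: %v", err)
}

// errorCode walks the chain looking for a *codedError.
func errorCode(err error) (uint16, bool) {
	var ce *codedError
	if errors.As(err, &ce) {
		return ce.Code, true
	}
	return 0, false
}

func main() {
	// 1193 is MySQL's "unknown system variable" error code.
	base := &codedError{Code: 1193, Message: "Unknown system variable 'binlog_format'"}

	code, ok := errorCode(wrapWithW(base))
	fmt.Println(code, ok) // code is recoverable through %w

	code, ok = errorCode(wrapWithV(base))
	fmt.Println(code, ok) // chain is broken by %v
}
```

Both wrapped errors print the same message, so the difference only shows up when a caller needs to branch on the underlying error.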

	}
	if binlogFormat != "ROW" {
		return errors.New("binlog_format must be set to 'ROW'")
	}

	// Check binlog_row_metadata
	var binlogRowMetadata string
	err = c.conn.QueryRowContext(ctx, "SELECT @@binlog_row_metadata").Scan(&binlogRowMetadata)
	if err != nil {
		return fmt.Errorf("failed to retrieve binlog_row_metadata: %v", err)
	}
	if binlogRowMetadata != "FULL" {
		return errors.New("binlog_row_metadata must be set to 'FULL' for column exclusion support")
Review comment (Contributor):

This should only be a warning, not an error. It might be best to defer it until mirror validation.

	}

	// Check binlog_row_image
	var binlogRowImage string
	err = c.conn.QueryRowContext(ctx, "SELECT @@binlog_row_image").Scan(&binlogRowImage)
	if err != nil {
		return fmt.Errorf("failed to retrieve binlog_row_image: %v", err)
	}
	if binlogRowImage != "FULL" {
		return errors.New("binlog_row_image must be set to 'FULL' (equivalent to PostgreSQL's REPLICA IDENTITY FULL)")
Review comment (Contributor):

I would prefer to avoid REPLICA IDENTITY FULL comparisons.

	}

	// Check binlog_row_value_options
	var binlogRowValueOptions string
	err = c.conn.QueryRowContext(ctx, "SELECT @@binlog_row_value_options").Scan(&binlogRowValueOptions)
	if err != nil {
		return fmt.Errorf("failed to retrieve binlog_row_value_options: %v", err)
	}
	if binlogRowValueOptions != "" {
		return errors.New("binlog_row_value_options must be disabled to prevent JSON change deltas")
	}

	return nil
}

func (c *MySQLConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
	sourceTables := make([]*utils.SchemaTable, 0, len(cfg.TableMappings))
	for _, tableMapping := range cfg.TableMappings {
		parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier)
		if parseErr != nil {
			return fmt.Errorf("invalid source table identifier: %s", parseErr)
Review comment (Contributor):

Suggested change
-			return fmt.Errorf("invalid source table identifier: %s", parseErr)
+			return fmt.Errorf("invalid source table identifier: %w", parseErr)

		}
		sourceTables = append(sourceTables, parsedTable)
	}

	if err := c.CheckReplicationConnectivity(ctx); err != nil {
		return fmt.Errorf("unable to establish replication connectivity: %v", err)
	}

	if err := c.CheckReplicationPermissions(ctx); err != nil {
		return fmt.Errorf("failed to check replication permissions: %v", err)
	}

	if err := c.CheckSourceTables(ctx, sourceTables); err != nil {
		return fmt.Errorf("provided source tables invalidated: %v", err)
	}

	if err := c.CheckBinlogSettings(ctx); err != nil {
		return fmt.Errorf("binlog configuration error: %v", err)
	}

	return nil
}


func (c *MySqlConnector) ValidateCheck(ctx context.Context) error {
	if _, err := c.Execute(ctx, "select @@gtid_mode"); err != nil {
		var mErr *mysql.MyError
@@ -24,6 +182,19 @@ func (c *MySqlConnector) ValidateCheck(ctx context.Context) error {
	} else if c.config.Flavor == protos.MySqlFlavor_MYSQL_UNKNOWN {
		return errors.New("flavor is set to unknown")
	}

	if err := c.CheckReplicationConnectivity(ctx); err != nil {
		return fmt.Errorf("unable to establish replication connectivity: %v", err)
	}

	if err := c.CheckReplicationPermissions(ctx); err != nil {
		return fmt.Errorf("failed to check replication permissions: %v", err)
	}

	if err := c.CheckBinlogSettings(ctx); err != nil {
		return fmt.Errorf("binlog configuration error: %v", err)
	}

	return nil
}
