Skip to content

Commit

Permalink
didn't actually make _peerdb_match_data JSON
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 9, 2024
1 parent 874e3c8 commit ce43855
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
6 changes: 3 additions & 3 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.Cr
}
}

createRawTableSQL := `CREATE TABLE IF NOT EXISTS %s (
createRawTableSQL := `CREATE TABLE IF NOT EXISTS %[1]s (
_peerdb_uid UUID,
_peerdb_timestamp Int64,
_peerdb_destination_table_name String,
_peerdb_data %s,
_peerdb_data %[2]s,
_peerdb_record_type Int,
_peerdb_match_data String,
_peerdb_match_data %[2]s,
_peerdb_batch_id Int64,
_peerdb_unchanged_toast_columns String
) ENGINE = MergeTree() ORDER BY (_peerdb_batch_id, _peerdb_destination_table_name);`
Expand Down
20 changes: 14 additions & 6 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,17 @@ func (c *ClickHouseConnector) NormalizeRecords(
return nil, err
}

useJSON, err := c.peerdbDataTypeIsJSON(ctx, req.FlowJobName)
useJSONForData, err := c.columnIsJSON(ctx, req.FlowJobName, "_peerdb_data")
if err != nil {
c.logger.Error("[clickhouse] error while checking data type of _peerdb_data", "error", err)
return nil, err
}
if useJSON {
useJSONForMatchData, err := c.columnIsJSON(ctx, req.FlowJobName, "_peerdb_match_data")
if err != nil {
c.logger.Error("[clickhouse] error while checking data type of _peerdb_match_data", "error", err)
return nil, err
}
if useJSONForData || useJSONForMatchData {
if err := c.enableExperimentalJSONType(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -375,10 +380,12 @@ func (c *ClickHouseConnector) NormalizeRecords(
jsonExtractStringMatchData := "JSONExtractString(_peerdb_match_data, '%s')"
jsonExtractData := "JSONExtract(_peerdb_data, '%s', '%s')"
jsonExtractMatchData := "JSONExtract(_peerdb_match_data, '%s', '%s')"
if useJSON {
if useJSONForData {
jsonExtractStringData = "_peerdb_data.`%s`"
jsonExtractStringMatchData = "_peerdb_match_data.`%s`"
jsonExtractData = "_peerdb_data.`%s`::%s"
}
if useJSONForMatchData {
jsonExtractStringMatchData = "_peerdb_match_data.`%s`"
jsonExtractMatchData = "_peerdb_match_data.`%s`::%s"
}

Expand Down Expand Up @@ -534,16 +541,17 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
return tableNames, nil
}

func (c *ClickHouseConnector) peerdbDataTypeIsJSON(
func (c *ClickHouseConnector) columnIsJSON(
ctx context.Context,
flowJobName string,
columnName string,
) (bool, error) {
rawTable := c.getRawTableName(flowJobName)

var isJSON bool
if err := c.queryRow(ctx,
`SELECT type='JSON' FROM system.columns
WHERE database=currentDatabase() AND table=? AND name='_peerdb_data'`, rawTable).Scan(&isJSON); err != nil {
WHERE database=currentDatabase() AND table=? AND name=?`, rawTable, columnName).Scan(&isJSON); err != nil {
return false, fmt.Errorf("error while querying raw table for data type: %w", err)
}
return isJSON, nil
Expand Down

0 comments on commit ce43855

Please sign in to comment.