Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 904a630e81059af3c25d5e0488a20ccdcb04f877
Author: Kevin Biju <[email protected]>
Date:   Sun Jan 5 13:28:01 2025 +0530

    fix review feedback pt.1

commit ce43855
Author: Kevin Biju <[email protected]>
Date:   Mon Dec 9 19:20:47 2024 +0530

    didn't actually make _peerdb_match_data JSON

commit 874e3c8
Author: Kevin Biju <[email protected]>
Date:   Sat Nov 30 00:17:30 2024 +0530

    workaround for getting isJSON, fixes

commit ab0959b
Author: Kevin Biju <[email protected]>
Date:   Fri Nov 29 18:55:52 2024 +0530

    [clickhouse] experiment with new JSON type for raw table
  • Loading branch information
heavycrystal committed Jan 5, 2025
1 parent a4d6525 commit 5261711
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 53 deletions.
32 changes: 25 additions & 7 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -45,20 +46,29 @@ func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseNa
func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

createRawTableSQL := `CREATE TABLE IF NOT EXISTS %s (
peerdbDataType := "String"
useJSON, err := peerdbenv.PeerDBClickHouseUseJSONType(ctx, req.Env)
if err != nil {
return nil, fmt.Errorf("unable to get fetch status of setting PEERDB_CLICKHOUSE_USE_JSON_TYPE: %w", err)
}
if useJSON {
if err := c.enableExperimentalJSONType(ctx); err != nil {
return nil, err
}
peerdbDataType = "JSON"
}

createRawTableSQL := `CREATE TABLE IF NOT EXISTS %[1]s (
_peerdb_uid UUID,
_peerdb_timestamp Int64,
_peerdb_destination_table_name String,
_peerdb_data String,
_peerdb_data %[2]s,
_peerdb_record_type Int,
_peerdb_match_data String,
_peerdb_match_data %[2]s,
_peerdb_batch_id Int64,
_peerdb_unchanged_toast_columns String
) ENGINE = MergeTree() ORDER BY (_peerdb_batch_id, _peerdb_destination_table_name);`

err := c.execWithLogging(ctx,
fmt.Sprintf(createRawTableSQL, rawTableName))
if err != nil {
if err := c.execWithLogging(ctx, fmt.Sprintf(createRawTableSQL, rawTableName, peerdbDataType)); err != nil {
return nil, fmt.Errorf("unable to create raw table: %w", err)
}
return &protos.CreateRawTableOutput{
Expand Down Expand Up @@ -267,3 +277,11 @@ func (c *ClickHouseConnector) RemoveTableEntriesFromRawTable(

return nil
}

// enableExperimentalJSONType switches on ClickHouse's experimental JSON data
// type for this session so that subsequent DDL/DML touching JSON columns is
// accepted by the server. Logs on success; returns a wrapped error otherwise.
func (c *ClickHouseConnector) enableExperimentalJSONType(ctx context.Context) error {
	const enableStmt = "SET allow_experimental_json_type = 1"
	if execErr := c.exec(ctx, enableStmt); execErr != nil {
		return fmt.Errorf("unable to enable experimental JSON type: %w", execErr)
	}
	c.logger.Info("[clickhouse] enabled experimental JSON type")
	return nil
}
122 changes: 77 additions & 45 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ func generateCreateTableSQLForNormalizedTable(
}

var engine string
if tableMapping == nil {
engine = fmt.Sprintf("ReplacingMergeTree(`%s`)", versionColName)
} else if tableMapping.Engine == protos.TableEngine_CH_ENGINE_MERGE_TREE {
if tableMapping != nil && tableMapping.Engine == protos.TableEngine_CH_ENGINE_MERGE_TREE {
engine = "MergeTree()"
} else {
engine = fmt.Sprintf("ReplacingMergeTree(`%s`)", versionColName)
Expand Down Expand Up @@ -263,6 +261,17 @@ func (c *ClickHouseConnector) NormalizeRecords(
return model.NormalizeResponse{}, err
}

useJSONMap, err := c.columnsAreJSON(ctx, req.FlowJobName, []string{"_peerdb_data", "_peerdb_match_data"})
if err != nil {
c.logger.Error("[clickhouse] error while checking data types of raw table", "error", err)
return model.NormalizeResponse{}, err
}
if useJSONMap["_peerdb_data"] || useJSONMap["_peerdb_match_data"] {
if err := c.enableExperimentalJSONType(ctx); err != nil {
return model.NormalizeResponse{}, err
}
}

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return model.NormalizeResponse{}, err
Expand Down Expand Up @@ -305,7 +314,6 @@ func (c *ClickHouseConnector) NormalizeRecords(
}

for _, tbl := range destinationTableNames {
// SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
selectQuery := strings.Builder{}
selectQuery.WriteString("SELECT ")

Expand Down Expand Up @@ -362,29 +370,37 @@ func (c *ClickHouseConnector) NormalizeRecords(
}
}

switch clickHouseType {
case "Date32", "Nullable(Date32)":
projection.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6)) AS `%s`,",
colName, dstColName,
))
jsonExtractStringData := "JSONExtractString(_peerdb_data, '%[1]s')"
jsonExtractStringMatchData := "JSONExtractString(_peerdb_match_data, '%[1]s')"
jsonExtractData := "JSONExtract(_peerdb_data, '%[1]s', '%[2]s')"
jsonExtractMatchData := "JSONExtract(_peerdb_match_data, '%[1]s', '%[2]s')"
if useJSONMap["_peerdb_data"] {
jsonExtractStringData = "_peerdb_data.`%[1]s`"
jsonExtractData = "_peerdb_data.`%[1]s`::%[2]s"
}
if useJSONMap["_peerdb_match_data"] {
jsonExtractStringMatchData = "_peerdb_match_data.`%[1]s`"
jsonExtractMatchData = "_peerdb_match_data.`%[1]s`::%[2]s"
}

writeColumnExtractStringData := func(extractPrefix string, extractSuffix string, args ...any) {
projection.WriteString(fmt.Sprintf(extractPrefix+jsonExtractStringData+extractSuffix, args...))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6)) AS `%s`,",
colName, dstColName,
))
projectionUpdate.WriteString(fmt.Sprintf(extractPrefix+jsonExtractStringMatchData+extractSuffix, args...))
}
case "DateTime64(6)", "Nullable(DateTime64(6))":
projection.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6) AS `%s`,",
colName, dstColName,
))
}
writeColumnExtractData := func(extractSuffix string, args ...any) {
projection.WriteString(fmt.Sprintf(jsonExtractData+extractSuffix, args...))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6) AS `%s`,",
colName, dstColName,
))
projectionUpdate.WriteString(fmt.Sprintf(jsonExtractMatchData+extractSuffix, args...))
}
}

switch clickHouseType {
case "Date32", "Nullable(Date32)":
writeColumnExtractStringData("toDate32(parseDateTime64BestEffortOrNull(", ",6)) AS `%[2]s`,", colName, dstColName)
case "DateTime64(6)", "Nullable(DateTime64(6))":
writeColumnExtractStringData("parseDateTime64BestEffortOrNull(", ",6) AS `%[2]s`,", colName, dstColName)
default:
projLen := projection.Len()
if colType == qvalue.QValueKindBytes {
Expand All @@ -394,34 +410,15 @@ func (c *ClickHouseConnector) NormalizeRecords(
}
switch format {
case peerdbenv.BinaryFormatRaw:
projection.WriteString(fmt.Sprintf("base64Decode(JSONExtractString(_peerdb_data, '%s')) AS `%s`,", colName, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"base64Decode(JSONExtractString(_peerdb_match_data, '%s')) AS `%s`,",
colName, dstColName,
))
}
writeColumnExtractStringData("base64Encode(", ") AS `%[2]s`,", colName, dstColName)
case peerdbenv.BinaryFormatHex:
projection.WriteString(fmt.Sprintf("hex(base64Decode(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,",
colName, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"hex(base64Decode(JSONExtractString(_peerdb_match_data, '%s'))) AS `%s`,",
colName, dstColName,
))
}
writeColumnExtractStringData("hex(base64Encode(", ")) AS `%[2]s`,", colName, dstColName)
}
}

// proceed with default logic if logic above didn't add any sql
if projection.Len() == projLen {
projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,",
colName, clickHouseType, dstColName,
))
}
writeColumnExtractData(" AS `%[3]s`,", colName, clickHouseType, dstColName)
}
}
}
Expand Down Expand Up @@ -542,6 +539,41 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
return tableNames, nil
}

// columnsAreJSON reports, for each requested column of the flow's raw table,
// whether that column is declared with the ClickHouse JSON type. Columns not
// present in the table simply do not appear in the returned map, so lookups
// for them yield false.
func (c *ClickHouseConnector) columnsAreJSON(
	ctx context.Context,
	flowJobName string,
	columnNames []string,
) (map[string]bool, error) {
	tableName := c.getRawTableName(flowJobName)
	isJSONByColumn := make(map[string]bool, len(columnNames))

	query := fmt.Sprintf(
		`SELECT name, type='JSON' FROM system.columns
	WHERE database=currentDatabase() AND table='%s' AND name IN ('%s')`,
		tableName, strings.Join(columnNames, "','"))

	rows, err := c.query(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("error while querying raw table for column types: %w", err)
	}
	defer rows.Close()

	var (
		name   string
		isJSON bool
	)
	for rows.Next() {
		if err := rows.Scan(&name, &isJSON); err != nil {
			return nil, fmt.Errorf("error while scanning column type: %w", err)
		}
		isJSONByColumn[name] = isJSON
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to read rows: %w", err)
	}

	return isJSONByColumn, nil
}

func (c *ClickHouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error {
avroSyncMethod := c.avroSyncMethod(flowJobName)
avroFile, err := GetAvroStage(ctx, flowJobName, syncBatchID)
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
if creds.AWS.SessionToken != "" {
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}
query := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM s3('%s','%s','%s'%s, 'Avro')",

// use_structure_from_insertion_table_in_table_functions needed because Avro parser in CH won't accept Avro string into JSON
query := fmt.Sprintf(
`INSERT INTO "%s" SELECT * FROM s3('%s','%s','%s'%s, 'Avro') SETTINGS use_structure_from_insertion_table_in_table_functions=0`,
s.config.DestinationTableIdentifier, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

Expand Down
13 changes: 13 additions & 0 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_CLICKHOUSE_USE_JSON_TYPE",
Description: "Use ClickHouse's experimental JSON type for _peerdb_data in raw table " +
"https://clickhouse.com/docs/en/sql-reference/data-types/newjson",
DefaultValue: "false",
ValueType: protos.DynconfValueType_BOOL,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
}

var DynamicIndex = func() map[string]int {
Expand Down Expand Up @@ -471,6 +480,10 @@ func PeerDBClickHouseAWSS3BucketName(ctx context.Context, env map[string]string)
return dynLookup(ctx, env, "PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME")
}

// PeerDBClickHouseUseJSONType reports whether the dynamic setting
// PEERDB_CLICKHOUSE_USE_JSON_TYPE is enabled for the given environment map,
// i.e. whether ClickHouse raw tables should use the experimental JSON type.
func PeerDBClickHouseUseJSONType(ctx context.Context, env map[string]string) (bool, error) {
	return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_USE_JSON_TYPE")
}

func PeerDBS3PartSize(ctx context.Context, env map[string]string) (int64, error) {
return dynamicConfSigned[int64](ctx, env, "PEERDB_S3_PART_SIZE")
}
Expand Down
1 change: 1 addition & 0 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (s *SetupFlowExecution) createRawTable(
PeerName: config.DestinationName,
FlowJobName: s.cdcFlowName,
TableNameMapping: s.tableNameMapping,
Env: config.Env,
}

rawTblFuture := workflow.ExecuteActivity(ctx, flowable.CreateRawTable, createRawTblInput)
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ message CreateRawTableInput {
string flow_job_name = 2;
map<string, string> table_name_mapping = 3;
string peer_name = 4;
map<string, string> env = 5;
}

message CreateRawTableOutput { string table_identifier = 1; }
Expand Down

0 comments on commit 5261711

Please sign in to comment.