[clickhouse] experiment with new JSON type for raw table #2304

Draft: wants to merge 1 commit into main
32 changes: 25 additions & 7 deletions flow/connectors/clickhouse/cdc.go
@@ -15,6 +15,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

@@ -45,20 +46,29 @@ func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseNa
func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

createRawTableSQL := `CREATE TABLE IF NOT EXISTS %s (
peerdbDataType := "String"
useJSON, err := peerdbenv.PeerDBClickHouseUseJSONType(ctx, req.Env)
if err != nil {
return nil, fmt.Errorf("unable to get fetch status of setting PEERDB_CLICKHOUSE_USE_JSON_TYPE: %w", err)
}
if useJSON {
if err := c.enableExperimentalJSONType(ctx); err != nil {
return nil, err
}
peerdbDataType = "JSON"
}

createRawTableSQL := `CREATE TABLE IF NOT EXISTS %[1]s (
_peerdb_uid UUID,
_peerdb_timestamp Int64,
_peerdb_destination_table_name String,
_peerdb_data String,
_peerdb_data %[2]s,
_peerdb_record_type Int,
_peerdb_match_data String,
_peerdb_match_data %[2]s,
_peerdb_batch_id Int64,
_peerdb_unchanged_toast_columns String
) ENGINE = MergeTree() ORDER BY (_peerdb_batch_id, _peerdb_destination_table_name);`

err := c.execWithLogging(ctx,
fmt.Sprintf(createRawTableSQL, rawTableName))
if err != nil {
if err := c.execWithLogging(ctx, fmt.Sprintf(createRawTableSQL, rawTableName, peerdbDataType)); err != nil {
return nil, fmt.Errorf("unable to create raw table: %w", err)
}
return &protos.CreateRawTableOutput{
@@ -267,3 +277,11 @@ func (c *ClickHouseConnector) RemoveTableEntriesFromRawTable(

return nil
}

func (c *ClickHouseConnector) enableExperimentalJSONType(ctx context.Context) error {
if err := c.exec(ctx, "SET allow_experimental_json_type = 1"); err != nil {
return fmt.Errorf("unable to enable experimental JSON type: %w", err)
}
c.logger.Info("[clickhouse] enabled experimental JSON type")
return nil
}
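
A minimal sketch (not part of this diff; the table name is hypothetical) of how the indexed %[1]s/%[2]s format verbs above render the raw-table DDL under both values of PEERDB_CLICKHOUSE_USE_JSON_TYPE. When JSON is chosen, the connector first runs SET allow_experimental_json_type = 1 via enableExperimentalJSONType, as shown above.

// Sketch: prints the raw-table DDL for both column types. Trimmed to the
// columns the setting affects, plus those referenced by ORDER BY.
package main

import "fmt"

const createRawTableSQL = `CREATE TABLE IF NOT EXISTS %[1]s (
	_peerdb_uid UUID,
	_peerdb_destination_table_name String,
	_peerdb_data %[2]s,
	_peerdb_match_data %[2]s,
	_peerdb_batch_id Int64
) ENGINE = MergeTree() ORDER BY (_peerdb_batch_id, _peerdb_destination_table_name)`

func main() {
	for _, peerdbDataType := range []string{"String", "JSON"} {
		// "peerdb_raw_my_flow" is a hypothetical raw table name.
		fmt.Println(fmt.Sprintf(createRawTableSQL, "peerdb_raw_my_flow", peerdbDataType))
	}
}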
122 changes: 77 additions & 45 deletions flow/connectors/clickhouse/normalize.go
@@ -149,9 +149,7 @@ func generateCreateTableSQLForNormalizedTable(
}

var engine string
if tableMapping == nil {
engine = fmt.Sprintf("ReplacingMergeTree(`%s`)", versionColName)
} else if tableMapping.Engine == protos.TableEngine_CH_ENGINE_MERGE_TREE {
if tableMapping != nil && tableMapping.Engine == protos.TableEngine_CH_ENGINE_MERGE_TREE {
engine = "MergeTree()"
} else {
engine = fmt.Sprintf("ReplacingMergeTree(`%s`)", versionColName)
@@ -263,6 +261,17 @@ func (c *ClickHouseConnector) NormalizeRecords(
return model.NormalizeResponse{}, err
}

useJSONMap, err := c.columnsAreJSON(ctx, req.FlowJobName, []string{"_peerdb_data", "_peerdb_match_data"})
if err != nil {
c.logger.Error("[clickhouse] error while checking data types of raw table", "error", err)
return model.NormalizeResponse{}, err
}
if useJSONMap["_peerdb_data"] || useJSONMap["_peerdb_match_data"] {
if err := c.enableExperimentalJSONType(ctx); err != nil {
return model.NormalizeResponse{}, err
}
}

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return model.NormalizeResponse{}, err
@@ -305,7 +314,6 @@ }
}

for _, tbl := range destinationTableNames {
// SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
selectQuery := strings.Builder{}
selectQuery.WriteString("SELECT ")

@@ -362,29 +370,37 @@ }
}
}

switch clickHouseType {
case "Date32", "Nullable(Date32)":
projection.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6)) AS `%s`,",
colName, dstColName,
))
jsonExtractStringData := "JSONExtractString(_peerdb_data, '%[1]s')"
jsonExtractStringMatchData := "JSONExtractString(_peerdb_match_data, '%[1]s')"
jsonExtractData := "JSONExtract(_peerdb_data, '%[1]s', '%[2]s')"
jsonExtractMatchData := "JSONExtract(_peerdb_match_data, '%[1]s', '%[2]s')"
if useJSONMap["_peerdb_data"] {
jsonExtractStringData = "_peerdb_data.`%[1]s`"
jsonExtractData = "_peerdb_data.`%[1]s`::%[2]s"
}
if useJSONMap["_peerdb_match_data"] {
jsonExtractStringMatchData = "_peerdb_match_data.`%[1]s`"
jsonExtractMatchData = "_peerdb_match_data.`%[1]s`::%[2]s"
}

writeColumnExtractStringData := func(extractPrefix string, extractSuffix string, args ...any) {
projection.WriteString(fmt.Sprintf(extractPrefix+jsonExtractStringData+extractSuffix, args...))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6)) AS `%s`,",
colName, dstColName,
))
projectionUpdate.WriteString(fmt.Sprintf(extractPrefix+jsonExtractStringMatchData+extractSuffix, args...))
}
case "DateTime64(6)", "Nullable(DateTime64(6))":
projection.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6) AS `%s`,",
colName, dstColName,
))
}
writeColumnExtractData := func(extractSuffix string, args ...any) {
projection.WriteString(fmt.Sprintf(jsonExtractData+extractSuffix, args...))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6) AS `%s`,",
colName, dstColName,
))
projectionUpdate.WriteString(fmt.Sprintf(jsonExtractMatchData+extractSuffix, args...))
}
}

switch clickHouseType {
case "Date32", "Nullable(Date32)":
writeColumnExtractStringData("toDate32(parseDateTime64BestEffortOrNull(", ",6)) AS `%[2]s`,", colName, dstColName)
case "DateTime64(6)", "Nullable(DateTime64(6))":
writeColumnExtractStringData("parseDateTime64BestEffortOrNull(", ",6) AS `%[2]s`,", colName, dstColName)
default:
projLen := projection.Len()
if colType == qvalue.QValueKindBytes {
@@ -394,34 +410,15 @@
}
switch format {
case peerdbenv.BinaryFormatRaw:
projection.WriteString(fmt.Sprintf("base64Decode(JSONExtractString(_peerdb_data, '%s')) AS `%s`,", colName, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"base64Decode(JSONExtractString(_peerdb_match_data, '%s')) AS `%s`,",
colName, dstColName,
))
}
writeColumnExtractStringData("base64Encode(", ") AS `%[2]s`,", colName, dstColName)
case peerdbenv.BinaryFormatHex:
projection.WriteString(fmt.Sprintf("hex(base64Decode(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,",
colName, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"hex(base64Decode(JSONExtractString(_peerdb_match_data, '%s'))) AS `%s`,",
colName, dstColName,
))
}
writeColumnExtractStringData("hex(base64Encode(", ")) AS `%[2]s`,", colName, dstColName)
}
}

// fall back to the default extraction if the cases above didn't emit any SQL
if projection.Len() == projLen {
projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,",
colName, clickHouseType, dstColName,
))
}
writeColumnExtractData(" AS `%[3]s`,", colName, clickHouseType, dstColName)
}
}
}
@@ -542,6 +539,41 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
return tableNames, nil
}

func (c *ClickHouseConnector) columnsAreJSON(
ctx context.Context,
flowJobName string,
columnNames []string,
) (map[string]bool, error) {
rawTable := c.getRawTableName(flowJobName)
columnTypes := make(map[string]bool)

query := fmt.Sprintf(
`SELECT name, type='JSON' FROM system.columns
WHERE database=currentDatabase() AND table='%s' AND name IN ('%s')`,
rawTable, strings.Join(columnNames, "','"))

rows, err := c.query(ctx, query)
if err != nil {
return nil, fmt.Errorf("error while querying raw table for column types: %w", err)
}
defer rows.Close()

for rows.Next() {
var columnName string
var isJSON bool
if err := rows.Scan(&columnName, &isJSON); err != nil {
return nil, fmt.Errorf("error while scanning column type: %w", err)
}
columnTypes[columnName] = isJSON
}

if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to read rows: %w", err)
}

return columnTypes, nil
}

func (c *ClickHouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error {
avroSyncMethod := c.avroSyncMethod(flowJobName)
avroFile, err := GetAvroStage(ctx, flowJobName, syncBatchID)
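To make the template swap above concrete, here is a small sketch (not part of this diff; column "id" and its ClickHouse type Int64 are hypothetical) of the two projection shapes NormalizeRecords can now emit, depending on whether columnsAreJSON reports the raw columns as the JSON type:

package main

import "fmt"

func main() {
	// Raw column is a plain String: re-parse the JSON text for every extract.
	jsonExtractData := "JSONExtract(_peerdb_data, '%[1]s', '%[2]s')"
	// Raw column is the experimental JSON type: typed subcolumn access plus a cast.
	jsonSubcolumn := "_peerdb_data.`%[1]s`::%[2]s"

	fmt.Printf(jsonExtractData+" AS `%[3]s`,\n", "id", "Int64", "id")
	fmt.Printf(jsonSubcolumn+" AS `%[3]s`,\n", "id", "Int64", "id")
}

This prints JSONExtract(_peerdb_data, 'id', 'Int64') AS `id`, and _peerdb_data.`id`::Int64 AS `id`, respectively; the subcolumn form avoids re-parsing the whole JSON document once per projected column.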
5 changes: 4 additions & 1 deletion flow/connectors/clickhouse/qrep_avro_sync.go
@@ -51,7 +51,10 @@ func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
if creds.AWS.SessionToken != "" {
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}
query := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM s3('%s','%s','%s'%s, 'Avro')",

// use_structure_from_insertion_table_in_table_functions is disabled because ClickHouse's Avro parser won't read an Avro string into a JSON column
query := fmt.Sprintf(
`INSERT INTO "%s" SELECT * FROM s3('%s','%s','%s'%s, 'Avro') SETTINGS use_structure_from_insertion_table_in_table_functions=0`,
s.config.DestinationTableIdentifier, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

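A sketch (bucket, file path, credentials, and table name are all hypothetical) of the INSERT the change above produces. With SETTINGS use_structure_from_insertion_table_in_table_functions=0, the s3() table function infers its schema from the Avro file itself instead of adopting the insertion table's JSON column type, which the Avro reader cannot populate directly:

package main

import "fmt"

func main() {
	sessionTokenPart := "" // becomes ", '<session-token>'" for temporary credentials
	query := fmt.Sprintf(
		`INSERT INTO "%s" SELECT * FROM s3('%s','%s','%s'%s, 'Avro') SETTINGS use_structure_from_insertion_table_in_table_functions=0`,
		"peerdb_raw_my_flow",
		"https://my-bucket.s3.amazonaws.com/stage/batch-42.avro",
		"<access-key-id>", "<secret-access-key>", sessionTokenPart)
	fmt.Println(query)
}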
13 changes: 13 additions & 0 deletions flow/peerdbenv/dynamicconf.go
@@ -237,6 +237,15 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_CLICKHOUSE_USE_JSON_TYPE",
Description: "Use ClickHouse's experimental JSON type for _peerdb_data in raw table " +
"https://clickhouse.com/docs/en/sql-reference/data-types/newjson",
DefaultValue: "false",
ValueType: protos.DynconfValueType_BOOL,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
}

var DynamicIndex = func() map[string]int {
@@ -471,6 +480,10 @@ func PeerDBClickHouseAWSS3BucketName(ctx context.Context, env map[string]string)
return dynLookup(ctx, env, "PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME")
}

func PeerDBClickHouseUseJSONType(ctx context.Context, env map[string]string) (bool, error) {
return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_USE_JSON_TYPE")
}

func PeerDBS3PartSize(ctx context.Context, env map[string]string) (int64, error) {
return dynamicConfSigned[int64](ctx, env, "PEERDB_S3_PART_SIZE")
}
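A sketch (hypothetical caller; it assumes a per-mirror env override resolves before the declared "false" default, consistent with how CreateRawTable passes req.Env above) of consulting the new setting through the accessor:

package main

import (
	"context"
	"fmt"

	"github.com/PeerDB-io/peer-flow/peerdbenv"
)

func main() {
	// Per-mirror override; without this key the DefaultValue "false"
	// declared in DynamicSettings applies.
	env := map[string]string{"PEERDB_CLICKHOUSE_USE_JSON_TYPE": "true"}
	useJSON, err := peerdbenv.PeerDBClickHouseUseJSONType(context.Background(), env)
	if err != nil {
		panic(err)
	}
	fmt.Println("use JSON type:", useJSON)
}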
1 change: 1 addition & 0 deletions flow/workflows/setup_flow.go
@@ -156,6 +156,7 @@ func (s *SetupFlowExecution) createRawTable(
PeerName: config.DestinationName,
FlowJobName: s.cdcFlowName,
TableNameMapping: s.tableNameMapping,
Env: config.Env,
}

rawTblFuture := workflow.ExecuteActivity(ctx, flowable.CreateRawTable, createRawTblInput)
1 change: 1 addition & 0 deletions protos/flow.proto
@@ -154,6 +154,7 @@ message CreateRawTableInput {
string flow_job_name = 2;
map<string, string> table_name_mapping = 3;
string peer_name = 4;
map<string, string> env = 5;
}

message CreateRawTableOutput { string table_identifier = 1; }