
[Feature-4221-1.2][cdc] CDCSOURCE supports kafka source #4222

Draft
Wants to merge 1 commit into base: 1.2
@@ -81,16 +81,11 @@ public String getSchemaFieldName() {
         return "schema";
     }
 
-    public Map<String, Map<String, String>> parseMetaDataConfigs() {
-        Map<String, Map<String, String>> allConfigMap = new HashMap<>();
-        for (String schema : getSchemaList()) {
-            String url = generateUrl(schema);
-            allConfigMap.put(schema, parseMetaDataSingleConfig(url));
-        }
-        return allConfigMap;
+    public Map<String, String> generateMetaDataConfig(String schema) {
+        return parseMetaDataSingleConfig(generateUrl(schema));
     }
 
-    public Map<String, String> parseMetaDataSingleConfig(String url) {
+    protected Map<String, String> parseMetaDataSingleConfig(String url) {
         Map<String, String> configMap = new HashMap<>();
         configMap.put(ClientConstant.METADATA_NAME, url);
         configMap.put(ClientConstant.METADATA_URL, url);
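
Callers that previously relied on parseMetaDataConfigs() now receive the metadata config one schema at a time. A minimal sketch, assuming the caller still has the schema list at hand; the helper class below is illustrative, not part of this PR:

```java
import java.util.HashMap;
import java.util.Map;

import org.dinky.cdc.CDCBuilder;

// Illustrative only: rebuilds the old "all schemas at once" view on top of the
// new per-schema generateMetaDataConfig(schema) method introduced by this PR.
public final class MetaDataConfigExample {

    public static Map<String, Map<String, String>> collect(CDCBuilder builder, Iterable<String> schemas) {
        Map<String, Map<String, String>> allConfigs = new HashMap<>();
        for (String schema : schemas) {
            // each entry maps the schema name to its generated metadata config
            allConfigs.put(schema, builder.generateMetaDataConfig(schema));
        }
        return allConfigs;
    }
}
```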
@@ -19,7 +19,6 @@
 
 package org.dinky.cdc;
 
-import org.dinky.data.exception.SplitTableException;
 import org.dinky.data.model.FlinkCDCConfig;
 
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -40,11 +39,7 @@ public interface CDCBuilder {
 
     List<String> getTableList();
 
-    Map<String, Map<String, String>> parseMetaDataConfigs();
+    Map<String, String> generateMetaDataConfig(String schema);
 
     String getSchemaFieldName();
-
-    default Map<String, String> parseMetaDataConfig() {
-        throw new SplitTableException("此数据源并未实现分库分表");
-    }
 }
@@ -20,6 +20,7 @@
 package org.dinky.cdc;
 
 import org.dinky.assertion.Asserts;
+import org.dinky.cdc.kafka.KafkaSourceBuilder;
 import org.dinky.cdc.mysql.MysqlCDCBuilder;
 import org.dinky.cdc.oracle.OracleCDCBuilder;
 import org.dinky.cdc.postgres.PostgresCDCBuilder;
@@ -41,6 +42,7 @@ private CDCBuilderFactory() {}
             .put(OracleCDCBuilder.KEY_WORD, OracleCDCBuilder::new)
             .put(PostgresCDCBuilder.KEY_WORD, PostgresCDCBuilder::new)
             .put(SqlServerCDCBuilder.KEY_WORD, SqlServerCDCBuilder::new)
+            .put(KafkaSourceBuilder.KEY_WORD, KafkaSourceBuilder::new)
             .build();
 
     public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
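
The registration above is what makes the new source reachable. The body of buildCDCBuilder is collapsed in this diff, so the lookup below is only a hedged sketch of the keyword-to-supplier pattern the map enables, not the actual factory code:

```java
import java.util.Map;
import java.util.function.Supplier;

import org.dinky.cdc.CDCBuilder;
import org.dinky.data.model.FlinkCDCConfig;

// Hedged sketch: resolves a builder by the source type keyword, so a config with
// type "kafka" now yields a KafkaSourceBuilder bound to that config.
final class CdcBuilderLookupExample {

    static CDCBuilder resolve(Map<String, Supplier<CDCBuilder>> builderMap, FlinkCDCConfig config) {
        Supplier<CDCBuilder> supplier = builderMap.get(config.getType());
        if (supplier == null) {
            throw new IllegalArgumentException("Unsupported CDC source type: " + config.getType());
        }
        return supplier.get().create(config);
    }
}
```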
@@ -0,0 +1,136 @@
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.dinky.cdc.kafka;

import org.dinky.assertion.Asserts;
import org.dinky.cdc.AbstractCDCBuilder;
import org.dinky.cdc.CDCBuilder;
import org.dinky.data.model.FlinkCDCConfig;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Map;

public class KafkaSourceBuilder extends AbstractCDCBuilder {

    public static final String KEY_WORD = "kafka";

    public KafkaSourceBuilder() {}

    public KafkaSourceBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new KafkaSourceBuilder(config);
    }

    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        Map<String, String> source = config.getSource();
        String brokers = source.get("properties.bootstrap.servers");
        String topic = source.get("topic");
        String groupId = source.get("properties.group.id");
        String scanBoundedSpecificOffsets = source.get("scan.bounded.specific-offsets");
        String scanBoundedTimestampMillis = source.get("scan.bounded.timestamp-millis");

        final org.apache.flink.connector.kafka.source.KafkaSourceBuilder<String> sourceBuilder =
                KafkaSource.<String>builder()
                        .setBootstrapServers(brokers)
                        .setValueOnlyDeserializer(new SimpleStringSchema());

        if (Asserts.isNotNullString(topic)) {
            sourceBuilder.setTopics(topic);
        }

        if (Asserts.isNotNullString(groupId)) {
            sourceBuilder.setGroupId(groupId);
        }

        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toLowerCase()) {
                case "earliest-offset":
                    sourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
                    break;
                case "latest-offset":
                    sourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
                    break;
                case "group-offsets":
                    sourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
                    break;
                    /*If specific-offsets is specified, another config option scan.bounded.specific-offsets
                    is required to specify specific bounded offsets for each partition, e.g. an option value
                    partition:0,offset:42;partition:1,offset:300 indicates offset 42 for partition 0 and offset 300
                    for partition 1. If an offset for a partition is not provided it will not consume from that partition.*/
                    /*case "specific-offset":
                    if (Asserts.isNotNullString(scanBoundedSpecificOffsets)) {
                        sourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(scanBoundedSpecificOffsets));
                    } else {
                        throw new RuntimeException("No specific offset parameter specified.");
                    }
                    break;*/
                case "timestamp":
                    if (Asserts.isNotNullString(scanBoundedTimestampMillis)) {
                        sourceBuilder.setStartingOffsets(
                                OffsetsInitializer.timestamp(Long.valueOf(scanBoundedTimestampMillis)));
                    } else {
                        throw new RuntimeException("No timestamp parameter specified.");
                    }
                    break;
                default:
            }
        } else {
            sourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
        }

        return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "Kafka Source");
    }

    @Override
    public String getSchemaFieldName() {
        return "db";
    }

    @Override
    public String getSchema() {
        return config.getDatabase();
    }

    @Override
    protected String getMetadataType() {
        return null;
    }

    @Override
    protected String generateUrl(String schema) {
        return null;
    }
}
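
For context, a minimal sketch of how the new builder is expected to be driven from a Flink job. The FlinkCDCConfig is assumed to have been populated by the CDCSOURCE statement parser with type "kafka" and source options such as properties.bootstrap.servers, topic and properties.group.id; the wrapper class below is illustrative, not part of this PR:

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.dinky.cdc.CDCBuilder;
import org.dinky.cdc.CDCBuilderFactory;
import org.dinky.data.model.FlinkCDCConfig;

// Hedged sketch: `config` stands in for a parsed CDCSOURCE configuration whose type is "kafka".
public final class KafkaCdcSourceExample {

    public static DataStreamSource<String> buildKafkaStream(StreamExecutionEnvironment env, FlinkCDCConfig config) {
        // the factory resolves KafkaSourceBuilder for type "kafka" (see CDCBuilderFactory above)
        CDCBuilder builder = CDCBuilderFactory.buildCDCBuilder(config);
        // reads topic, group id and startup mode from the source options and creates the KafkaSource
        return builder.build(env);
    }
}
```

Note that the startup-mode switch only handles earliest-offset, latest-offset, group-offsets and timestamp (the last one additionally requires scan.bounded.timestamp-millis). An absent startup mode falls back to OffsetsInitializer.latest(), while an unrecognized one falls through the empty default branch and leaves the connector's own default in place.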
@@ -213,14 +213,6 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
         return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
     }
 
-    @Override
-    public Map<String, String> parseMetaDataConfig() {
-        String url = String.format(
-                "jdbc:mysql://%s:%d/%s",
-                config.getHostname(), config.getPort(), composeJdbcProperties(config.getJdbc()));
-        return parseMetaDataSingleConfig(url);
-    }
-
     @Override
     public String getSchemaFieldName() {
         return "db";
13 changes: 6 additions & 7 deletions dinky-common/src/main/java/org/dinky/data/enums/TableType.java
@@ -19,17 +19,16 @@
 
 package org.dinky.data.enums;
 
-/** 分库分表的类型 */
+/** The types of database and table division */
 public enum TableType {
 
-    /** 分库分表 */
+    /** Separation of databases and tables */
     SPLIT_DATABASE_AND_TABLE,
-    /** 分表单库 */
+    /** Database division with single table */
     SPLIT_DATABASE_AND_SINGLE_TABLE,
-    /** 单库分表 */
-    SINGLE_DATABASE_AND_SPLIT_TABLE
-    /** 单库单表 */
-    ,
+    /** Single database with table division */
+    SINGLE_DATABASE_AND_SPLIT_TABLE,
+    /** Single database and single table */
     SINGLE_DATABASE_AND_TABLE;
 
     public static TableType type(boolean splitDatabase, boolean splitTable) {
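
The body of type(boolean, boolean) is collapsed in this diff; the mapping below is an assumption inferred from the constant names, shown only to make the enum's intent concrete:

```java
// Hedged sketch of the presumed mapping; the real body is not visible in this diff.
// The enum is re-declared here only so the example compiles on its own.
public enum TableTypeSketch {
    SPLIT_DATABASE_AND_TABLE,
    SPLIT_DATABASE_AND_SINGLE_TABLE,
    SINGLE_DATABASE_AND_SPLIT_TABLE,
    SINGLE_DATABASE_AND_TABLE;

    public static TableTypeSketch type(boolean splitDatabase, boolean splitTable) {
        if (splitDatabase) {
            return splitTable ? SPLIT_DATABASE_AND_TABLE : SPLIT_DATABASE_AND_SINGLE_TABLE;
        }
        return splitTable ? SINGLE_DATABASE_AND_SPLIT_TABLE : SINGLE_DATABASE_AND_TABLE;
    }
}
```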
@@ -19,6 +19,8 @@
 
 package org.dinky.data.model;
 
+import org.dinky.assertion.Asserts;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -176,6 +178,10 @@ public String getSinkConfigurationString() {
                 .collect(Collectors.joining(",\n"));
     }
 
+    public boolean isAutoCreateSchemaAndTables() {
+        return Asserts.isEqualsIgnoreCase(sink.get(FlinkCDCConfig.AUTO_CREATE), "true");
+    }
+
     public String getType() {
         return type;
     }
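
A short sketch of how sink-side code could consume the new flag; the class and method names here are hypothetical and only illustrate the intended check (the flag is true when the sink option bound to FlinkCDCConfig.AUTO_CREATE equals "true", case-insensitively):

```java
import org.dinky.data.model.FlinkCDCConfig;

// Hedged sketch: `SchemaPreparer` and `prepare` are hypothetical names, not part of this PR.
final class SchemaPreparer {

    static void prepare(FlinkCDCConfig config) {
        if (config.isAutoCreateSchemaAndTables()) {
            // e.g. create missing target schemas and tables before the sync job starts
            System.out.println("auto-create enabled for sink of job type " + config.getType());
        }
    }
}
```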