diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java
index d0af7d8d9..cda6a36ea 100644
--- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java
+++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java
@@ -17,6 +17,7 @@
 
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.header.Header;
 
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -31,6 +32,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.TABLE_NAME_FORMAT;
+import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.TABLE_NAME_FORMAT_RECORD_HEADER;
+
 public class JdbcDbWriter {
   private static final Logger log = LoggerFactory.getLogger(JdbcDbWriter.class);
 
@@ -68,7 +72,7 @@ void write(final Collection<SinkRecord> records)
     try {
       final Map<TableId, BufferedRecords> bufferByTable = new HashMap<>();
       for (SinkRecord record : records) {
-        final TableId tableId = destinationTable(record.topic(), schemaName, catalogName);
+        final TableId tableId = determineTableId(record, schemaName, catalogName);
         BufferedRecords buffer = bufferByTable.get(tableId);
         if (buffer == null) {
           buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, connection);
@@ -140,4 +144,34 @@ private Optional<String> getCatalogSafe(Connection connection) {
       return Optional.empty();
     }
   }
+
+  private TableId determineTableId(SinkRecord sinkRecord, String schemaName, String catalogName) {
+    // Only try to get the table name from the header if config.tableNameFormat is __RECORD_HEADER__
+    if (TABLE_NAME_FORMAT_RECORD_HEADER.equals(config.tableNameFormat)) {
+      String headerTableName = extractTableNameFormatFromHeader(sinkRecord);
+      if (headerTableName != null && !headerTableName.trim().isEmpty()) {
+        log.debug("Using table name from header: {} for record from topic: {}",
+            headerTableName, sinkRecord.topic());
+        return new TableId(catalogName, schemaName, headerTableName.trim());
+      } else {
+        throw new ConnectException(String.format(
+            "Header '%s' is not set or empty for record from topic '%s'. "
+                + "Please ensure the header is set correctly.",
+            TABLE_NAME_FORMAT, sinkRecord.topic()
+        ));
+      }
+    }
+
+    // Fall back to default behavior (topic-based table naming)
+    return destinationTable(sinkRecord.topic(), schemaName, catalogName);
+  }
+
+  private String extractTableNameFormatFromHeader(SinkRecord sinkRecord) {
+    Header header = sinkRecord.headers().lastWithName(TABLE_NAME_FORMAT);
+    log.debug("Extracting table name format from header: {}", header);
+    if (header != null && header.value() != null) {
+      return header.value().toString();
+    }
+    return null;
+  }
 }
diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java
index 282f0945e..ecfd628d9 100644
--- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java
+++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java
@@ -126,8 +126,16 @@ public enum TimestampPrecisionMode {
       "A format string for the destination table name, which may contain '${topic}' as a "
           + "placeholder for the originating topic name.\n"
           + "For example, ``kafka_${topic}`` for the topic 'orders' will map to the table name "
-          + "'kafka_orders'.";
+          + "'kafka_orders'.\n\n"
+          + "Special value: If set to ``__RECORD_HEADER__``, the table name will be dynamically "
+          + "determined from the Kafka message headers. In this mode, the connector will look for "
+          + "a header key ('table.name.format') "
+          + "that contains the actual table name. This allows routing messages from a single topic "
+          + "to different tables based on the message headers.\n"
+          + "Example header: ``{\"table.name.format\": \"user_events\"}`` will route the message"
+          + " to the 'user_events' table.";
   private static final String TABLE_NAME_FORMAT_DISPLAY = "Table Name Format";
+  public static final String TABLE_NAME_FORMAT_RECORD_HEADER = "__RECORD_HEADER__";
 
   public static final String MAX_RETRIES = "max.retries";
   private static final int MAX_RETRIES_DEFAULT = 10;
diff --git a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java
index 512847259..35e4648c5 100644
--- a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java
+++ b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java
@@ -15,6 +15,8 @@
 
 package io.confluent.connect.jdbc.sink;
 
+import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.TABLE_NAME_FORMAT;
+import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.TABLE_NAME_FORMAT_RECORD_HEADER;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
@@ -41,6 +43,7 @@
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Timestamp;
@@ -519,6 +522,214 @@ private List<SinkRecord> createRecordsList(int batchSize) {
     return records;
   }
 
+  @Test
+  public void putWithMultipleTableRoutingWithPkModeKafka() throws Exception {
+    Map<String, String> props = new HashMap<>();
+    props.put("connection.url", sqliteHelper.sqliteUri());
+    props.put(TABLE_NAME_FORMAT, TABLE_NAME_FORMAT_RECORD_HEADER);
+    props.put("auto.create", "true");
+    props.put("pk.mode", "kafka");
+    props.put("pk.fields", "kafka_topic,kafka_partition,kafka_offset");
+
+    JdbcSinkTask task = new JdbcSinkTask();
+    task.initialize(mock(SinkTaskContext.class));
+    task.start(props);
+
+    final Struct struct1 = new Struct(SCHEMA)
+        .put("firstName", "Alice")
+        .put("lastName", "Johnson")
+        .put("age", 28)
+        .put("modified", new Date(1474661402123L));
+
+    final Struct struct2 = new Struct(SCHEMA)
+        .put("firstName", "Bob")
+        .put("lastName", "Williams")
+        .put("age", 35)
+        .put("modified", new Date(1474661402123L));
+
+    final String topic = "source_topic";
+
+    // Create records with different target tables
+    SinkRecord record1 = new SinkRecord(topic, 1, null, null, SCHEMA, struct1, 44);
+    record1.headers().add(TABLE_NAME_FORMAT, new SchemaAndValue(Schema.STRING_SCHEMA, "users"));
+
+    SinkRecord record2 = new SinkRecord(topic, 1, null, null, SCHEMA, struct2, 45);
+    record2.headers().add(TABLE_NAME_FORMAT, new SchemaAndValue(Schema.STRING_SCHEMA, "employees"));
+
+    List<SinkRecord> records = new ArrayList<>();
+    records.add(record1);
+    records.add(record2);
+
+    task.put(records);
+
+    // Verify first record went to 'users' table
+    assertEquals(
+        1,
+        sqliteHelper.select(
+            "SELECT * FROM users",
+            new SqliteHelper.ResultSetReadCallback() {
+              @Override
+              public void read(ResultSet rs) throws SQLException {
+                assertEquals(struct1.getString("firstName"), rs.getString("firstName"));
+                assertEquals(struct1.getString("lastName"), rs.getString("lastName"));
+                assertEquals(44, rs.getLong("kafka_offset"));
+              }
+            }
+        )
+    );
+
+    // Verify second record went to 'employees' table
+    assertEquals(
+        1,
+        sqliteHelper.select(
+            "SELECT * FROM employees",
+            new SqliteHelper.ResultSetReadCallback() {
+              @Override
+              public void read(ResultSet rs) throws SQLException {
+                assertEquals(struct2.getString("firstName"), rs.getString("firstName"));
+                assertEquals(struct2.getString("lastName"), rs.getString("lastName"));
+                assertEquals(45, rs.getLong("kafka_offset"));
+              }
+            }
+        )
+    );
+  }
+
+  @Test
+  public void putWithMultipleTableRoutingWithPkModeRecordKey() throws Exception {
+    Map<String, String> props = new HashMap<>();
+    props.put("connection.url", sqliteHelper.sqliteUri());
+    props.put(TABLE_NAME_FORMAT, TABLE_NAME_FORMAT_RECORD_HEADER);
+    props.put("auto.create", "true");
+    props.put("pk.mode", "record_key");
+    props.put("pk.fields", ""); // Empty for record_key mode
+
+    JdbcSinkTask task = new JdbcSinkTask();
+    task.initialize(mock(SinkTaskContext.class));
+    task.start(props);
+
+    final Struct struct1 = new Struct(SCHEMA)
+        .put("firstName", "Alice")
+        .put("lastName", "Johnson")
+        .put("age", 28)
+        .put("modified", new Date(1474661402123L));
+
+    final Struct struct2 = new Struct(SCHEMA)
+        .put("firstName", "Bob")
+        .put("lastName", "Williams")
+        .put("age", 35)
+        .put("modified", new Date(1474661402123L));
+
+    final String topic = "source_topic";
+
+    // Define the key schema for record keys
+    final Schema keySchema = SchemaBuilder.struct()
+        .field("id", Schema.INT32_SCHEMA)
+        .field("type", Schema.STRING_SCHEMA)
+        .build();
+
+    // Create record keys
+    final Struct key1 = new Struct(keySchema)
+        .put("id", 1001)
+        .put("type", "user");
+
+    final Struct key2 = new Struct(keySchema)
+        .put("id", 2001)
+        .put("type", "employee");
+
+    // Create records with record keys and different target tables
+    SinkRecord record1 = new SinkRecord(topic, 1, keySchema, key1, SCHEMA, struct1, 44);
+    record1.headers().add(TABLE_NAME_FORMAT, new SchemaAndValue(Schema.STRING_SCHEMA, "users"));
+
+    SinkRecord record2 = new SinkRecord(topic, 1, keySchema, key2, SCHEMA, struct2, 45);
+    record2.headers().add(TABLE_NAME_FORMAT, new SchemaAndValue(Schema.STRING_SCHEMA, "employees"));
+
+    List<SinkRecord> records = new ArrayList<>();
+    records.add(record1);
+    records.add(record2);
+
+    task.put(records);
+
+    // Verify first record went to 'users' table with record key as PK
+    assertEquals(
+        1,
+        sqliteHelper.select(
+            "SELECT * FROM users",
+            new SqliteHelper.ResultSetReadCallback() {
+              @Override
+              public void read(ResultSet rs) throws SQLException {
+                // Verify data fields
+                assertEquals(struct1.getString("firstName"), rs.getString("firstName"));
+                assertEquals(struct1.getString("lastName"), rs.getString("lastName"));
+                assertEquals(struct1.getInt32("age").intValue(), rs.getInt("age"));
+
+                // Verify record key fields are used as primary key
+                assertEquals(key1.getInt32("id").intValue(), rs.getInt("id"));
+                assertEquals(key1.getString("type"), rs.getString("type"));
+              }
+            }
+        )
+    );
+
+    // Verify second record went to 'employees' table with record key as PK
+    assertEquals(
+        1,
+        sqliteHelper.select(
+            "SELECT * FROM employees",
+            new SqliteHelper.ResultSetReadCallback() {
+              @Override
+              public void read(ResultSet rs) throws SQLException {
+                // Verify data fields
+                assertEquals(struct2.getString("firstName"), rs.getString("firstName"));
+                assertEquals(struct2.getString("lastName"), rs.getString("lastName"));
+                assertEquals(struct2.getInt32("age").intValue(), rs.getInt("age"));
+
+                // Verify record key fields are used as primary key
+                assertEquals(key2.getInt32("id").intValue(), rs.getInt("id"));
+                assertEquals(key2.getString("type"), rs.getString("type"));
+              }
+            }
+        )
+    );
+  }
+
+  @Test
+  public void putWithInvalidHeaderShouldFail() throws ConnectException {
+    Map<String, String> props = new HashMap<>();
+    props.put("connection.url", sqliteHelper.sqliteUri());
+    props.put(TABLE_NAME_FORMAT, TABLE_NAME_FORMAT_RECORD_HEADER);
+    props.put("auto.create", "true");
+
+    JdbcSinkTask task = new JdbcSinkTask();
+    task.initialize(mock(SinkTaskContext.class));
+    task.start(props);
+
+    final Struct struct = new Struct(SCHEMA)
+        .put("firstName", "Test")
+        .put("lastName", "User")
+        .put("age", 30)
+        .put("modified", new Date(1474661402123L));
+
+    // Test case 1: Missing header
+    SinkRecord recordWithoutHeader = new SinkRecord("source_topic", 1, null, null, SCHEMA, struct, 46);
+    try {
+      task.put(Collections.singleton(recordWithoutHeader));
+      fail("Expected ConnectException for missing header");
+    } catch (ConnectException e) {
+      assertTrue("Exception should mention header issue",
+          e.getMessage().contains("Header 'table.name.format'"));
+    }
+
+    // Test case 2: Empty header value
+    SinkRecord recordWithEmptyHeader = new SinkRecord("source_topic", 1, null, null, SCHEMA, struct, 47);
+    recordWithEmptyHeader.headers().add(TABLE_NAME_FORMAT, new SchemaAndValue(Schema.STRING_SCHEMA, ""));
+    try {
+      task.put(Collections.singleton(recordWithEmptyHeader));
+      fail("Expected ConnectException for empty header value");
+    } catch (ConnectException e) {
+      assertTrue("Exception should mention header issue",
+          e.getMessage().contains("Header 'table.name.format'"));
+    }
+  }
+
   private Map<String, String> setupBasicProps(int maxRetries, long retryBackoffMs) {
     Map<String, String> props = new HashMap<>();
     props.put(JdbcSinkConfig.CONNECTION_URL, "stub");
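
Note (not part of the diff above): the following is a minimal producer-side sketch of how a record might carry the 'table.name.format' header that this change reads via headers().lastWithName(TABLE_NAME_FORMAT). The class name, topic, broker address, and the plain String serializers are illustrative assumptions only; in practice the value would be produced with a converter that yields structured records (for example Avro or JSON with schemas) so the sink can map fields to columns, and the sink connector itself would be configured with table.name.format=__RECORD_HEADER__.

// Hypothetical sketch; none of the names below are introduced by this PR.
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HeaderRoutingProducerSketch {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      ProducerRecord<String, String> record =
          new ProducerRecord<>("source_topic", "key-1", "{\"firstName\": \"Alice\"}");
      // With table.name.format=__RECORD_HEADER__ on the sink connector, this header value
      // becomes the destination table, so the record is routed to the 'user_events' table.
      record.headers().add("table.name.format", "user_events".getBytes(StandardCharsets.UTF_8));
      producer.send(record);
    }
  }
}

On the connector side, the only change relative to a standard configuration is setting "table.name.format" to "__RECORD_HEADER__"; the remaining properties (auto.create, pk.mode, and so on) behave exactly as in the tests above.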