From 024be22263cc799e80f652252293b6038ac0367a Mon Sep 17 00:00:00 2001
From: Hicham Bouzkraoui
Date: Sun, 14 Sep 2025 12:39:10 -0600
Subject: [PATCH 1/3] issue 80: SQL Server Change tracking

---
 .../jdbc/source/ChangeTrackingOffset.java     | 106 +++++++++
 .../source/ChangeTrackingTableQuerier.java    | 200 ++++++++++++++++
 .../source/JdbcSourceConnectorConfig.java     |  32 ++-
 .../connect/jdbc/source/JdbcSourceTask.java   |  16 +-
 .../jdbc/source/ChangeTrackingOffsetTest.java | 102 ++++++++
 .../ChangeTrackingTableQuerierTest.java       | 217 ++++++++++++++++++
 6 files changed, 665 insertions(+), 8 deletions(-)
 create mode 100644 src/main/java/io/confluent/connect/jdbc/source/ChangeTrackingOffset.java
 create mode 100644 src/main/java/io/confluent/connect/jdbc/source/ChangeTrackingTableQuerier.java
 create mode 100644 src/test/java/io/confluent/connect/jdbc/source/ChangeTrackingOffsetTest.java
 create mode 100644 src/test/java/io/confluent/connect/jdbc/source/ChangeTrackingTableQuerierTest.java

diff --git a/src/main/java/io/confluent/connect/jdbc/source/ChangeTrackingOffset.java b/src/main/java/io/confluent/connect/jdbc/source/ChangeTrackingOffset.java
new file mode 100644
index 000000000..d48ca4f0c
--- /dev/null
+++ b/src/main/java/io/confluent/connect/jdbc/source/ChangeTrackingOffset.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.jdbc.source;
+
+import io.confluent.connect.jdbc.dialect.DatabaseDialect;
+import io.confluent.connect.jdbc.util.TableId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class ChangeTrackingOffset {
+  private static final Logger log = LoggerFactory.getLogger(ChangeTrackingOffset.class);
+  static final String CHANGE_TRACKING_OFFSET_FIELD = "sys_change_version";
+  static final String MIN_CHANGE_TRACKING_OFFSET_FIELD = "min_valid_version";
+
+  private final Long changeVersionOffset;
+
+  /**
+   * @param changeVersionOffset the incrementing offset.
+   *                            If null, {@link #getChangeVersionOffset()} will return 0.
+   */
+  public ChangeTrackingOffset(Long changeVersionOffset) {
+    this.changeVersionOffset = changeVersionOffset;
+  }
+
+  public long getChangeVersionOffset() {
+    return changeVersionOffset == null ? 0 : changeVersionOffset;
+  }
+
+  public long getChangeVersionOffset(DatabaseDialect dialect, Connection db, TableId tableId)
+      throws SQLException {
+    return changeVersionOffset == null
+        ? getMinChangeVersionOffset(dialect, db, tableId) : changeVersionOffset;
+  }
+
+  private long getMinChangeVersionOffset(DatabaseDialect dialect, Connection db, TableId tableId)
+      throws SQLException {
+    String minChangeTrackingSQL = "SELECT CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('"
+        + tableId.schemaName() + "."
+        + tableId.tableName()
+        + "')) as " + MIN_CHANGE_TRACKING_OFFSET_FIELD;
+
+    try (PreparedStatement stm = dialect.createPreparedStatement(db, minChangeTrackingSQL)) {
+      try (ResultSet resultSet = stm.executeQuery()) {
+        while (resultSet.next()) {
+          return resultSet.getLong(MIN_CHANGE_TRACKING_OFFSET_FIELD);
+        }
+        return 0;
+      }
+    }
+  }
+
+  public Map<String, Object> toMap() {
+    Map<String, Object> map = new HashMap<>(1);
+    if (changeVersionOffset != null) {
+      map.put(CHANGE_TRACKING_OFFSET_FIELD, changeVersionOffset);
+    }
+    return map;
+  }
+
+  public static ChangeTrackingOffset fromMap(Map<String, ?> map) {
+    if (map == null || map.isEmpty()) {
+      return new ChangeTrackingOffset(null);
+    }
+    Long versionOffset = (Long) map.get(CHANGE_TRACKING_OFFSET_FIELD);
+    return new ChangeTrackingOffset(versionOffset);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ChangeTrackingOffset that = (ChangeTrackingOffset) o;
+
+    return Objects.equals(changeVersionOffset, that.changeVersionOffset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(changeVersionOffset);
+  }
+}
diff --git a/src/main/java/io/confluent/connect/jdbc/source/ChangeTrackingTableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/ChangeTrackingTableQuerier.java
new file mode 100644
index 000000000..60606ab8a
--- /dev/null
+++ b/src/main/java/io/confluent/connect/jdbc/source/ChangeTrackingTableQuerier.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.jdbc.source;
+
+import io.confluent.connect.jdbc.dialect.DatabaseDialect;
+import io.confluent.connect.jdbc.source.SchemaMapping.FieldSetter;
+import io.confluent.connect.jdbc.util.ColumnDefinition;
+import io.confluent.connect.jdbc.util.ColumnId;
+import io.confluent.connect.jdbc.util.ExpressionBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * ChangeTrackingTableQuerier always returns the latest changed rows after a specific
+ * change_version.
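+ *
+ * <p>A sketch of the generated query shape, assuming a hypothetical table dbo.Users with
+ * primary key id, one non-key column name, and a stored offset of 42 (identifier quoting
+ * omitted for brevity):
+ *
+ * <pre>
+ * SELECT CT.id, dbo.Users.name, CT.SYS_CHANGE_OPERATION AS operation_ind,
+ *        CT.SYS_CHANGE_VERSION AS sys_change_version
+ * FROM dbo.Users
+ * RIGHT OUTER JOIN CHANGETABLE(CHANGES dbo.Users, 42) AS CT
+ * ON dbo.Users.id = CT.id ORDER BY CT.SYS_CHANGE_VERSION
+ * </pre>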
+ */
+public class ChangeTrackingTableQuerier extends TableQuerier {
+  private static final Logger log = LoggerFactory.getLogger(ChangeTrackingTableQuerier.class);
+  protected final Map<String, String> partition;
+  protected final String topic;
+  private List<ColumnId> columns;
+  private ColumnId primaryKeyColumn;
+  protected ChangeTrackingOffset committedOffset;
+  protected ChangeTrackingOffset offset;
+  private static final String CHANGE_TRACKING_SQL =
+      "SELECT CT.%s,%s,CT.SYS_CHANGE_OPERATION AS operation_ind,"
+          + "CT.SYS_CHANGE_VERSION as %s FROM %s.%s "
+          + "RIGHT OUTER JOIN CHANGETABLE(CHANGES %s.%s, %s) AS CT "
+          + "ON %s.%s.%s = CT.%s ORDER BY CT.SYS_CHANGE_VERSION";
+
+  public ChangeTrackingTableQuerier(
+      DatabaseDialect dialect,
+      QueryMode mode,
+      String name,
+      String topicPrefix,
+      Map<String, Object> offsetMap,
+      String suffix
+  ) {
+    super(dialect, mode, name, topicPrefix, suffix);
+    ChangeTrackingOffset initialOffset = ChangeTrackingOffset.fromMap(offsetMap);
+    this.committedOffset = initialOffset;
+    this.offset = initialOffset;
+    String tableName = tableId.tableName();
+    topic = topicPrefix + tableName; // backward compatible
+    partition = OffsetProtocols.sourcePartitionForProtocolV1(tableId);
+  }
+
+  @Override
+  protected void createPreparedStatement(Connection db) throws SQLException {
+    findAllNonPKColumns(db);
+    findPrimaryKeyColumn(db);
+
+    String schemaName = tableId.schemaName();
+    String tableName = tableId.tableName();
+
+    ExpressionBuilder builder = dialect.expressionBuilder();
+    builder.append(CHANGE_TRACKING_SQL);
+    String columnsString = columns.stream()
+        .map(columnId -> schemaName + '.' + tableName + "." + columnId.name())
+        .collect(Collectors.joining(","));
+    long changeVersionOffset = offset.getChangeVersionOffset(dialect, db, tableId);
+    String queryString = String.format(builder.toString(),
+        primaryKeyColumn.name(),
+        columnsString,
+        ChangeTrackingOffset.CHANGE_TRACKING_OFFSET_FIELD,
+        schemaName,
+        tableName,
+        schemaName,
+        tableName,
+        changeVersionOffset,
+        schemaName,
+        tableName,
+        primaryKeyColumn.name(),
+        primaryKeyColumn.name());
+    recordQuery(queryString);
+    log.trace("{} prepared SQL query: {}", this, queryString);
+    stmt = dialect.createPreparedStatement(db, queryString);
+  }
+
+  @Override
+  protected ResultSet executeQuery() throws SQLException {
+    return stmt.executeQuery();
+  }
+
+  @Override
+  public SourceRecord extractRecord() throws SQLException {
+    Struct record = new Struct(schemaMapping.schema());
+    for (FieldSetter setter : schemaMapping.fieldSetters()) {
+      try {
+        setter.setField(record, resultSet);
+      } catch (IOException e) {
+        log.warn("Error mapping fields into Connect record", e);
+        throw new ConnectException(e);
+      } catch (SQLException e) {
+        log.warn("SQL error mapping fields into Connect record", e);
+        throw new DataException(e);
+      }
+    }
+    offset = extractOffset();
+    return new SourceRecord(partition, offset.toMap(), topic, record.schema(), record);
+  }
+
+  @Override
+  public void reset(long now, boolean resetOffset) {
+    // the task is being reset, any uncommitted offset needs to be reset as well;
+    // use the previous committedOffset to set the running offset
+    if (resetOffset) {
+      this.offset = this.committedOffset;
+    }
+    super.reset(now, resetOffset);
+  }
+
+  @Override
+  public String toString() {
+    return "ChangeTrackingTableQuerier{" + "table='" + tableId + '\'' + ", query='" + query + '\''
+        + ", topicPrefix='" + topicPrefix + '\'' + '}';
+  }
+
+  @Override
+  public void maybeStartQuery(Connection db) throws SQLException, ConnectException {
+    if (resultSet == null) {
+      this.db = db;
+      stmt = getOrCreatePreparedStatement(db);
+      resultSet = executeQuery();
+      String schemaName = tableId != null ? tableId.tableName() : null;
+      ResultSetMetaData metadata = resultSet.getMetaData();
+      dialect.validateSpecificColumnTypes(metadata, columns);
+      schemaMapping = SchemaMapping.create(schemaName, metadata, dialect);
+    } else {
+      log.trace("Current ResultSet {} isn't null. Continuing to seek.", resultSet.hashCode());
+    }
+
+    // This is called every time during poll() before extracting records,
+    // to ensure that the previous run succeeded, allowing us to move the committedOffset forward.
+    // This action is a no-op for the first poll()
+    this.committedOffset = this.offset;
+    log.trace("Set the committed offset: {}",
+        committedOffset.getChangeVersionOffset(dialect, db, tableId));
+  }
+
+  private void findPrimaryKeyColumn(Connection db) throws SQLException {
+    for (ColumnDefinition defn : dialect.describeColumns(
+        db,
+        tableId.catalogName(),
+        tableId.schemaName(),
+        tableId.tableName(),
+        null).values()) {
+      if (defn.isPrimaryKey()) {
+        primaryKeyColumn = defn.id();
+        break;
+      }
+    }
+  }
+
+  private void findAllNonPKColumns(Connection db) throws SQLException {
+    columns = new ArrayList<>();
+    for (ColumnDefinition defn : dialect.describeColumns(
+        db,
+        tableId.catalogName(),
+        tableId.schemaName(),
+        tableId.tableName(),
+        null).values()) {
+      if (!defn.isPrimaryKey()) {
+        columns.add(defn.id());
+      }
+    }
+  }
+
+  private ChangeTrackingOffset extractOffset() throws SQLException {
+    return new ChangeTrackingOffset(
+        resultSet.getLong(ChangeTrackingOffset.CHANGE_TRACKING_OFFSET_FIELD));
+  }
+}
diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
index e5b3d32a0..40d182bd2 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
@@ -912,21 +912,41 @@ public boolean visible(String name, Map<String, Object> config) {
       switch (mode) {
         case MODE_BULK:
           return false;
+        case MODE_CHANGE_TRACKING:
+          return isChangeTrackingConfig(name);
         case MODE_TIMESTAMP:
-          return name.equals(TIMESTAMP_COLUMN_NAME_CONFIG) || name.equals(VALIDATE_NON_NULL_CONFIG);
+          return isTimestampOrValidationConfig(name);
         case MODE_INCREMENTING:
-          return name.equals(INCREMENTING_COLUMN_NAME_CONFIG)
-              || name.equals(VALIDATE_NON_NULL_CONFIG);
+          return isIncrementingOrValidationConfig(name);
         case MODE_TIMESTAMP_INCREMENTING:
-          return name.equals(TIMESTAMP_COLUMN_NAME_CONFIG)
-              || name.equals(INCREMENTING_COLUMN_NAME_CONFIG)
-              || name.equals(VALIDATE_NON_NULL_CONFIG);
+          return isTimestampIncrementingConfig(name);
         case MODE_UNSPECIFIED:
           throw new ConfigException("Query mode must be specified");
         default:
           throw new ConfigException("Invalid mode: " + mode);
       }
     }
+
+    private boolean isTimestampOrValidationConfig(String name) {
+      return name.equals(TIMESTAMP_COLUMN_NAME_CONFIG) || name.equals(VALIDATE_NON_NULL_CONFIG);
+    }
+
+    private boolean isIncrementingOrValidationConfig(String name) {
+      return name.equals(INCREMENTING_COLUMN_NAME_CONFIG)
+          || name.equals(VALIDATE_NON_NULL_CONFIG);
+    }
+
+    private boolean isTimestampIncrementingConfig(String name) {
+      return name.equals(TIMESTAMP_COLUMN_NAME_CONFIG)
+          || name.equals(INCREMENTING_COLUMN_NAME_CONFIG)
+          || name.equals(VALIDATE_NON_NULL_CONFIG);
+    }
+
+    private boolean isChangeTrackingConfig(String name) {
+      return !name.equals(INCREMENTING_COLUMN_NAME_CONFIG)
+          && !name.equals(TIMESTAMP_COLUMN_NAME_CONFIG)
+          && !name.equals(VALIDATE_NON_NULL_CONFIG);
+    }
   }
 
   public enum NumericMapping {
diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
index 6c41acfec..6b2c97947 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
@@ -161,7 +161,8 @@ public void start(Map<String, String> properties) {
     Map<Map<String, String>, Map<String, Object>> offsets = null;
     if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)
         || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)
-        || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {
+        || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)
+        || mode.equals(JdbcSourceTaskConfig.MODE_CHANGE_TRACKING)) {
       List<Map<String, String>> partitions = new ArrayList<>(tables.size());
       switch (queryMode) {
         case TABLE:
@@ -207,7 +208,7 @@ public void start(Map<String, String> properties) {
     log.trace("Task executing in {} mode",queryMode);
     switch (queryMode) {
       case TABLE:
-        if (validateNonNulls) {
+        if (validateNonNulls && !mode.equals(JdbcSourceTaskConfig.MODE_CHANGE_TRACKING)) {
           validateNonNullable(
               mode,
               tableOrQuery,
@@ -305,6 +306,17 @@ public void start(Map<String, String> properties) {
               timestampGranularity
           )
       );
+    } else if (mode.equals(JdbcSourceTaskConfig.MODE_CHANGE_TRACKING)) {
+      tableQueue.add(
+          new ChangeTrackingTableQuerier(
+              dialect,
+              queryMode,
+              tableOrQuery,
+              topicPrefix,
+              offset,
+              suffix
+          )
+      );
     }
   }
 
diff --git a/src/test/java/io/confluent/connect/jdbc/source/ChangeTrackingOffsetTest.java b/src/test/java/io/confluent/connect/jdbc/source/ChangeTrackingOffsetTest.java
new file mode 100644
index 000000000..d0a084a57
--- /dev/null
+++ b/src/test/java/io/confluent/connect/jdbc/source/ChangeTrackingOffsetTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.jdbc.source;
+
+import io.confluent.connect.jdbc.dialect.DatabaseDialect;
+import io.confluent.connect.jdbc.util.TableId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collections;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+public class ChangeTrackingOffsetTest {
+  private final long expectedOffset = 1001L;
+  private final long MIN_CHANGE_TRACKING_OFFSET = 1000L;
+  private final String MIN_CHANGE_TRACKING_OFFSET_FIELD = "min_valid_version";
+  private static final TableId tableId = new TableId("", "", "table");
+  private final ChangeTrackingOffset unset = new ChangeTrackingOffset(null);
+  private final ChangeTrackingOffset set = new ChangeTrackingOffset(expectedOffset);
+  @Mock
+  private PreparedStatement stmt;
+  @Mock
+  private Connection db;
+  @Mock
+  private ResultSet resultSet;
+  private DatabaseDialect dialect;
+
+  @Before
+  public void setUp() throws SQLException {
+    dialect = mock(DatabaseDialect.class);
+    expect(dialect.createPreparedStatement(eq(db), anyString())).andReturn(stmt);
+    replay(dialect);
+  }
+
+  @Test
+  public void testDefaults() throws Exception {
+    assertEquals(0, unset.getChangeVersionOffset());
+    expectNewQuery();
+    assertEquals(MIN_CHANGE_TRACKING_OFFSET, unset.getChangeVersionOffset(dialect, db, tableId));
+  }
+
+  private void expectNewQuery() throws Exception {
+    expect(stmt.executeQuery()).andReturn(resultSet);
+    expect(resultSet.next()).andReturn(true);
+    expect(resultSet.getLong(MIN_CHANGE_TRACKING_OFFSET_FIELD))
+        .andReturn(MIN_CHANGE_TRACKING_OFFSET);
+    resultSet.close();
+    stmt.close();
+    replayAll();
+  }
+
+  @Test
+  public void testToMap() {
+    assertEquals(0, unset.toMap().size());
+    assertEquals(1, set.toMap().size());
+  }
+
+  @Test
+  public void testGetChangeVersionOffset() throws Exception {
+    assertEquals(0, unset.getChangeVersionOffset());
+    assertEquals(expectedOffset, set.getChangeVersionOffset());
+    expectNewQuery();
+    assertEquals(MIN_CHANGE_TRACKING_OFFSET, unset.getChangeVersionOffset(dialect, db, tableId));
+    assertEquals(expectedOffset, set.getChangeVersionOffset(dialect, db, tableId));
+  }
+
+  @Test
+  public void testFromMap() {
+    assertEquals(unset, ChangeTrackingOffset.fromMap(unset.toMap()));
+    assertEquals(set, ChangeTrackingOffset.fromMap(set.toMap()));
+  }
+
+  @Test
+  public void testEquals() {
+    assertEquals(unset, new ChangeTrackingOffset(null));
+    assertEquals(set, new ChangeTrackingOffset(expectedOffset));
+  }
+}
diff --git a/src/test/java/io/confluent/connect/jdbc/source/ChangeTrackingTableQuerierTest.java b/src/test/java/io/confluent/connect/jdbc/source/ChangeTrackingTableQuerierTest.java
new file mode 100644
index 000000000..69f6b27b2
--- /dev/null
+++ b/src/test/java/io/confluent/connect/jdbc/source/ChangeTrackingTableQuerierTest.java
@@ -0,0 +1,217 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package io.confluent.connect.jdbc.source;
+
+import io.confluent.connect.jdbc.dialect.DatabaseDialect;
+import io.confluent.connect.jdbc.util.ColumnDefinition;
+import io.confluent.connect.jdbc.util.ColumnId;
+import io.confluent.connect.jdbc.util.ExpressionBuilder;
+import io.confluent.connect.jdbc.util.TableId;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockNice;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Collections;
+
+import static org.easymock.EasyMock.*;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.*;
+import static org.powermock.api.easymock.PowerMock.expectLastCall;
+import static org.powermock.api.easymock.PowerMock.replay;
+import static org.powermock.api.easymock.PowerMock.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(SchemaMapping.class)
+public class ChangeTrackingTableQuerierTest {
+
+  private static final long INITIAL_CHANGE_VERSION_OFFSET = 1000;
+  private static final String PK_COLUMN = "col1";
+  private static final String tableName = "table";
+  private static final TableId tableId = new TableId("", "", tableName);
+  private static final ColumnId columnId = new ColumnId(tableId, PK_COLUMN);
+
+  @Mock
+  private PreparedStatement stmt;
+  @Mock
+  private ResultSet resultSet;
+  @Mock
+  private Connection db;
+  @MockNice
+  private ExpressionBuilder expressionBuilder;
+  @Mock
+  private SchemaMapping schemaMapping;
+  private DatabaseDialect dialect;
+
+  @Mock
+  ColumnDefinition columnDefinition;
+
+  @Before
+  public void setUp() {
+    dialect = mock(DatabaseDialect.class);
+    mockStatic(SchemaMapping.class);
+  }
+
+  private Schema schema() {
+    SchemaBuilder result = SchemaBuilder.struct();
+    result.field(PK_COLUMN, Schema.INT64_SCHEMA);
+    return result.build();
+  }
+
+  private ChangeTrackingTableQuerier querier(long changeVersionOffset) {
+    expect(dialect.parseTableIdentifier(tableName)).andReturn(tableId);
+    replay(dialect);
+    // Have to replay the dialect here since it's used to parse the table ID
+    // in the querier's constructor
+    return new ChangeTrackingTableQuerier(
+        dialect,
+        TableQuerier.QueryMode.TABLE,
+        tableName,
+        "",
+        new ChangeTrackingOffset(changeVersionOffset).toMap(),
+        ""
+    );
+  }
+
+  private void expectNewQuery() throws Exception {
+    expect(columnDefinition.isPrimaryKey()).andStubReturn(true);
+    expect(columnDefinition.id()).andStubReturn(columnId);
+    expect(dialect.createPreparedStatement(eq(db), anyObject())).andReturn(stmt);
+    expect(dialect.describeColumns(eq(db), anyString(), anyString(), anyString(), anyString()))
+        .andStubReturn(Collections.singletonMap(columnId, columnDefinition));
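+    // The stubs above expose a single primary-key column via describeColumns();
+    // the expectations below cover statement preparation, execution, and schema mapping.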
+    expect(dialect.expressionBuilder()).andReturn(expressionBuilder);
+    dialect.validateSpecificColumnTypes(anyObject(), anyObject());
+    expectLastCall();
+    expect(stmt.executeQuery()).andReturn(resultSet);
+    expect(resultSet.getMetaData()).andReturn(null);
+    expect(SchemaMapping.create(anyObject(), anyObject(), anyObject())).andReturn(schemaMapping);
+  }
+
+  @Test
+  public void testEmptyResultSet() throws Exception {
+    expectNewQuery();
+    ChangeTrackingTableQuerier querier = querier(INITIAL_CHANGE_VERSION_OFFSET);
+    expect(resultSet.next()).andReturn(false);
+
+    replayAll();
+
+    querier.maybeStartQuery(db);
+
+    assertFalse(querier.next());
+  }
+
+  @Test
+  public void testSingleRecordInResultSet() throws Exception {
+    long newChangeVersionOffset = INITIAL_CHANGE_VERSION_OFFSET + 1;
+    expectNewQuery();
+    ChangeTrackingTableQuerier querier = querier(INITIAL_CHANGE_VERSION_OFFSET);
+    expectRecord(newChangeVersionOffset);
+    expect(resultSet.next()).andReturn(false);
+
+    replayAll();
+
+    querier.maybeStartQuery(db);
+
+    assertNextRecord(querier, newChangeVersionOffset);
+
+    assertFalse(querier.next());
+  }
+
+  @Test
+  public void testChangeTrackingMode() throws Exception {
+    long firstNewChangeVersionOffset = INITIAL_CHANGE_VERSION_OFFSET + 1;
+    long secondNewChangeVersionOffset = INITIAL_CHANGE_VERSION_OFFSET + 2;
+    expectNewQuery();
+    ChangeTrackingTableQuerier querier = querier(INITIAL_CHANGE_VERSION_OFFSET);
+    expectRecord(firstNewChangeVersionOffset);
+    expectRecord(secondNewChangeVersionOffset);
+    expect(resultSet.next()).andReturn(false);
+
+    replayAll();
+
+    querier.maybeStartQuery(db);
+
+    // We commit offsets immediately in this mode
+    assertNextRecord(querier, firstNewChangeVersionOffset);
+    assertNextRecord(querier, secondNewChangeVersionOffset);
+
+    assertFalse(querier.next());
+  }
+
+  @Test
+  public void testMultipleSingleRecordResultSets() throws Exception {
+    expectNewQuery();
+    expectNewQuery();
+    ChangeTrackingTableQuerier querier = querier(INITIAL_CHANGE_VERSION_OFFSET);
+    expectRecord(INITIAL_CHANGE_VERSION_OFFSET);
+    expect(resultSet.next()).andReturn(false);
+    expectReset();
+    expectRecord(INITIAL_CHANGE_VERSION_OFFSET);
+    expect(resultSet.next()).andReturn(false);
+
+    replayAll();
+
+    querier.maybeStartQuery(db);
+
+    assertNextRecord(querier, INITIAL_CHANGE_VERSION_OFFSET);
+
+    assertFalse(querier.next());
+
+    querier.reset(0, true);
+    querier.maybeStartQuery(db);
+
+    assertNextRecord(querier, INITIAL_CHANGE_VERSION_OFFSET);
+
+    assertFalse(querier.next());
+  }
+
+  private void assertNextRecord(
+      ChangeTrackingTableQuerier querier, long expectedChangeVersionOffset
+  ) throws Exception {
+    assertTrue(querier.next());
+    SourceRecord record = querier.extractRecord();
+    ChangeTrackingOffset actualOffset = ChangeTrackingOffset.fromMap(record.sourceOffset());
+    assertEquals(expectedChangeVersionOffset, actualOffset.getChangeVersionOffset());
+  }
+
+  private void expectRecord(long changeVersionOffset) throws Exception {
+    expect(schemaMapping.schema()).andReturn(schema()).times(2);
+    expect(resultSet.next()).andReturn(true);
+    expect(schemaMapping.fieldSetters()).andReturn(Collections.emptyList());
+    expect(resultSet.getLong(anyString())).andReturn(changeVersionOffset);
+  }
+
+  private void expectReset() throws Exception {
+    resultSet.close();
+    expectLastCall();
+    stmt.close();
+    expectLastCall();
+    db.commit();
+    expectLastCall();
+  }
+}

From 681c0764071941a7666c4a2a2da771af97cc4f02 Mon Sep 17 00:00:00 2001
From: Hicham Bouzkraoui
Date: Sun, 14 Sep 2025 22:59:01 -0600
Subject: [PATCH 2/3] add change tracking integration test

---
 .../source/JdbcSourceConnectorConfig.java     |   4 +-
 .../integration/MSSQLChangeTrackingIT.java    | 273 ++++++++++++++++++
 2 files changed, 276 insertions(+), 1 deletion(-)
 create mode 100644 src/test/java/io/confluent/connect/jdbc/source/integration/MSSQLChangeTrackingIT.java

diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
index 40d182bd2..764192ae2 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
@@ -179,6 +179,7 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
   public static final String MODE_TIMESTAMP = "timestamp";
   public static final String MODE_INCREMENTING = "incrementing";
   public static final String MODE_TIMESTAMP_INCREMENTING = "timestamp+incrementing";
+  public static final String MODE_CHANGE_TRACKING = "change_tracking";
 
   public static final String INCREMENTING_COLUMN_NAME_CONFIG = "incrementing.column.name";
   private static final String INCREMENTING_COLUMN_NAME_DOC =
@@ -598,7 +599,8 @@ private static final void addModeOptions(ConfigDef config) {
           MODE_BULK,
           MODE_TIMESTAMP,
           MODE_INCREMENTING,
-          MODE_TIMESTAMP_INCREMENTING
+          MODE_TIMESTAMP_INCREMENTING,
+          MODE_CHANGE_TRACKING
         ),
         Importance.HIGH,
         MODE_DOC,
diff --git a/src/test/java/io/confluent/connect/jdbc/source/integration/MSSQLChangeTrackingIT.java b/src/test/java/io/confluent/connect/jdbc/source/integration/MSSQLChangeTrackingIT.java
new file mode 100644
index 000000000..29d609f5c
--- /dev/null
+++ b/src/test/java/io/confluent/connect/jdbc/source/integration/MSSQLChangeTrackingIT.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright [2017 - 2019] Confluent Inc.
+ */
+
+package io.confluent.connect.jdbc.source.integration;
+
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.connect.storage.StringConverter;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import org.apache.kafka.test.IntegrationTest;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.DriverManager;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+
+import io.confluent.connect.jdbc.integration.BaseConnectorIT;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import static org.junit.Assert.assertEquals;
+import static io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG;
+import static io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG;
+import static io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG;
+import static io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG;
+import static io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.MODE_CONFIG;
+import static io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.MODE_CHANGE_TRACKING;
+import static io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG;
+import static io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+
+/**
+ * Integration test for JDBC source connector change tracking mode with MSSQL Server.
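+ *
+ * <p>Change tracking must be enabled on the database and on the table before the
+ * connector can stream changes; the statements used by this test are:
+ *
+ * <pre>
+ * ALTER DATABASE testdb SET CHANGE_TRACKING = ON
+ *     (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)
+ * ALTER TABLE TestChangeTrackingTable ENABLE CHANGE_TRACKING
+ *     WITH (TRACK_COLUMNS_UPDATED = ON)
+ * </pre>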
+ */
+@Category(IntegrationTest.class)
+public class MSSQLChangeTrackingIT extends BaseConnectorIT {
+
+  private static final Logger log = LoggerFactory.getLogger(MSSQLChangeTrackingIT.class);
+  private static final String CONNECTOR_NAME = "JdbcSourceConnector";
+  private static final int NUM_RECORDS_PRODUCED = 3;
+  private static final long CONSUME_MAX_DURATION_MS = TimeUnit.MINUTES.toMillis(2);
+  private static final int TASKS_MAX = 3;
+  private static final String MSSQL_URL = "jdbc:sqlserver://0.0.0.0:1433";
+  private static final String MSSQL_URL_TESTDB = "jdbc:sqlserver://0.0.0.0:1433;databaseName=testdb";
+  private static final String MSSQL_Table = "TestChangeTrackingTable";
+  private static final String TOPIC_PREFIX = "test-";
+  private static final List<String> KAFKA_TOPICS =
+      Collections.singletonList(TOPIC_PREFIX + MSSQL_Table);
+
+  private Map<String, String> props;
+  private static final String USER = "sa";
+  private static final String PASS = "reallyStrongPwd123";
+  private Connection connection;
+
+  @ClassRule
+  @SuppressWarnings("deprecation")
+  public static final FixedHostPortGenericContainer mssqlServer =
+      new FixedHostPortGenericContainer<>("mcr.microsoft.com/mssql/server:2019-latest")
+          .withEnv("ACCEPT_EULA", "Y")
+          .withEnv("SA_PASSWORD", "reallyStrongPwd123")
+          .withFixedExposedPort(1433, 1433)
+          .waitingFor(Wait.forLogMessage(".*SQL Server is now ready for client connections.*", 1)
+              .withStartupTimeout(Duration.ofMinutes(2)));
+
+  @Before
+  public void setup() throws Exception {
+    Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+    Thread.sleep(5000);
+    connection = DriverManager.getConnection(MSSQL_URL, USER, PASS);
+    startConnect();
+  }
+
+  @After
+  public void close() throws SQLException {
+    deleteTable();
+    try {
+      connect.deleteConnector(CONNECTOR_NAME);
+    } catch (Exception e) {
+      log.warn("Failed to delete connector: {}", e.getMessage());
+    }
+    connection.close();
+    stopConnect();
+  }
+
+  @Test
+  public void verifyChangeTrackingModeWorksWithInserts() throws Exception {
+    props = configProperties();
+    props.put(MODE_CONFIG, MODE_CHANGE_TRACKING);
+
+    enableChangeTracking();
+    createTableWithChangeTracking();
+
+    KAFKA_TOPICS.forEach(topic -> connect.kafka().createTopic(topic, 1));
+    configureAndWaitForConnector();
+
+    insertTestRecord(1, "initial record");
+    insertTestRecord(2, "second record");
+
+    ConsumerRecords<byte[], byte[]> records = connect.kafka().consume(
+        NUM_RECORDS_PRODUCED - 1,
+        CONSUME_MAX_DURATION_MS,
+        KAFKA_TOPICS.toArray(new String[0])
+    );
+
+    assertEquals(2, records.count());
+  }
+
+  @Test
+  public void verifyChangeTrackingModeWorksWithUpdates() throws Exception {
+    props = configProperties();
+    props.put(MODE_CONFIG, MODE_CHANGE_TRACKING);
+
+    enableChangeTracking();
+    createTableWithChangeTracking();
+
+    insertTestRecord(1, "initial record");
+
+    KAFKA_TOPICS.forEach(topic -> connect.kafka().createTopic(topic, 1));
+    configureAndWaitForConnector();
+
+    updateTestRecord(1, "updated record");
+
+    ConsumerRecords<byte[], byte[]> records = connect.kafka().consume(
+        1,
+        CONSUME_MAX_DURATION_MS,
+        KAFKA_TOPICS.toArray(new String[0])
+    );
+
+    assertEquals(1, records.count());
+  }
+
+  @Test
+  public void verifyChangeTrackingModeWorksWithDeletes() throws Exception {
+    props = configProperties();
+    props.put(MODE_CONFIG, MODE_CHANGE_TRACKING);
+
+    enableChangeTracking();
+    createTableWithChangeTracking();
+
+    insertTestRecord(1, "record to delete");
+
+    KAFKA_TOPICS.forEach(topic -> connect.kafka().createTopic(topic, 1));
+    configureAndWaitForConnector();
+
+    deleteTestRecord(1);
+
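+    // The querier's RIGHT OUTER JOIN against CHANGETABLE surfaces deletes as change
+    // rows, so one record is still expected for the removed id.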
+    ConsumerRecords<byte[], byte[]> records = connect.kafka().consume(
+        1,
+        CONSUME_MAX_DURATION_MS,
+        KAFKA_TOPICS.toArray(new String[0])
+    );
+
+    assertEquals(1, records.count());
+  }
+
+  private void enableChangeTracking() throws SQLException {
+    try {
+      String sql = "CREATE DATABASE testdb";
+      PreparedStatement stmt = connection.prepareStatement(sql);
+      executeSQL(stmt);
+    } catch (SQLException e) {
+      if (!e.getMessage().contains("already exists")) {
+        throw e;
+      }
+    }
+
+    try {
+      String sql = "ALTER DATABASE testdb SET CHANGE_TRACKING = ON "
+          + "(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)";
+      PreparedStatement stmt = connection.prepareStatement(sql);
+      executeSQL(stmt);
+    } catch (SQLException e) {
+      if (!e.getMessage().contains("already enabled")) {
+        throw e;
+      }
+    }
+
+    connection.close();
+    connection = DriverManager.getConnection(MSSQL_URL_TESTDB, USER, PASS);
+  }
+
+  private void createTableWithChangeTracking() throws SQLException {
+    String sql = "CREATE TABLE " + MSSQL_Table + " (id INT PRIMARY KEY, record VARCHAR(255))";
+    PreparedStatement createStmt = connection.prepareStatement(sql);
+    executeSQL(createStmt);
+
+    sql = "ALTER TABLE " + MSSQL_Table
+        + " ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)";
+    PreparedStatement enableStmt = connection.prepareStatement(sql);
+    executeSQL(enableStmt);
+  }
+
+  private void insertTestRecord(int id, String record) throws SQLException {
+    String sql = "INSERT INTO " + MSSQL_Table + " (id, record) VALUES (?, ?)";
+    PreparedStatement stmt = connection.prepareStatement(sql);
+    stmt.setInt(1, id);
+    stmt.setString(2, record);
+    executeSQL(stmt);
+  }
+
+  private void updateTestRecord(int id, String record) throws SQLException {
+    String sql = "UPDATE " + MSSQL_Table + " SET record = ? WHERE id = ?";
+    PreparedStatement stmt = connection.prepareStatement(sql);
+    stmt.setString(1, record);
+    stmt.setInt(2, id);
+    executeSQL(stmt);
+  }
+
+  private void deleteTestRecord(int id) throws SQLException {
+    String sql = "DELETE FROM " + MSSQL_Table + " WHERE id = ?";
+    PreparedStatement stmt = connection.prepareStatement(sql);
+    stmt.setInt(1, id);
+    executeSQL(stmt);
+  }
+
+  private void deleteTable() throws SQLException {
+    try {
+      String sql = "DROP TABLE IF EXISTS " + MSSQL_Table;
+      PreparedStatement stmt = connection.prepareStatement(sql);
+      executeSQL(stmt);
+    } catch (SQLException e) {
+      log.warn("Failed to delete table: {}", e.getMessage());
+    }
+  }
+
+  private void executeSQL(PreparedStatement stmt) throws SQLException {
+    try {
+      stmt.execute();
+    } finally {
+      stmt.close();
+    }
+  }
+
+  private Map<String, String> configProperties() {
+    Map<String, String> props = new HashMap<>();
+    props.put(CONNECTOR_CLASS_CONFIG, "io.confluent.connect.jdbc.JdbcSourceConnector");
+    props.put(TASKS_MAX_CONFIG, String.valueOf(TASKS_MAX));
+    props.put(CONNECTION_URL_CONFIG, MSSQL_URL_TESTDB);
+    props.put(CONNECTION_USER_CONFIG, USER);
+    props.put(CONNECTION_PASSWORD_CONFIG, PASS);
+    props.put(TABLE_WHITELIST_CONFIG, MSSQL_Table);
+    props.put(TOPIC_PREFIX_CONFIG, TOPIC_PREFIX);
+    props.put(POLL_INTERVAL_MS_CONFIG, "1000");
+    props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+    props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+    return props;
+  }
+
+  private void configureAndWaitForConnector() throws InterruptedException {
+    connect.configureConnector(CONNECTOR_NAME, props);
+    connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(
+        CONNECTOR_NAME,
+        Math.min(KAFKA_TOPICS.size(), TASKS_MAX),
+        "Connector tasks did not start in time."
+    );
+  }
+}
\ No newline at end of file

From e906e09603f747d3db917cdf226d9a4bcb07567c Mon Sep 17 00:00:00 2001
From: Hicham Bouzkraoui
Date: Mon, 15 Sep 2025 09:13:21 -0600
Subject: [PATCH 3/3] Empty commit to re-trigger CLA checks