diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java index 7a32c9bd7..2df33acee 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java @@ -342,6 +342,20 @@ String buildUpdateStatement( Collection nonKeyColumns ); + /** + * Build the DELETE prepared statement expression for the given table and its columns. Variables + * for each key column should also appear in the WHERE clause of the statement. + * + * @param table the identifier of the table; may not be null + * @param keyColumns the identifiers of the columns in the primary/unique key; may not be null + * but may be empty + * @return the DELETE statement; may not be null + */ + String buildDeleteStatement( + TableId table, + Collection keyColumns + ); + /** * Build the UPSERT or MERGE prepared statement expression to either insert a new record into the * given table or update an existing record in that table Variables for each key column should @@ -401,10 +415,12 @@ String buildUpsertQueryStatement( */ StatementBinder statementBinder( PreparedStatement statement, + PreparedStatement deleteStatement, JdbcSinkConfig.PrimaryKeyMode pkMode, SchemaPair schemaPair, FieldsMetadata fieldsMetadata, - JdbcSinkConfig.InsertMode insertMode + JdbcSinkConfig.InsertMode insertMode, + JdbcSinkConfig config ); /** diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index af5440de3..7a72b4c5b 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -1348,6 +1348,23 @@ public String buildInsertStatement( return builder.toString(); } + public String buildDeleteStatement( + TableId table, + Collection keyColumns + ) { + ExpressionBuilder builder = expressionBuilder(); + builder.append("DELETE FROM "); + builder.append(table); + if (!keyColumns.isEmpty()) { + builder.append(" WHERE "); + builder.appendList() + .delimitedBy(" AND ") + .transformedBy(ExpressionBuilder.columnNamesWith(" = ?")) + .of(keyColumns); + } + return builder.toString(); + } + @Override public String buildUpdateStatement( TableId table, @@ -1384,18 +1401,22 @@ public String buildUpsertQueryStatement( @Override public StatementBinder statementBinder( PreparedStatement statement, + PreparedStatement deleteStatement, PrimaryKeyMode pkMode, SchemaPair schemaPair, FieldsMetadata fieldsMetadata, - InsertMode insertMode + InsertMode insertMode, + JdbcSinkConfig config ) { return new PreparedStatementBinder( this, statement, + deleteStatement, pkMode, schemaPair, fieldsMetadata, - insertMode + insertMode, + config ); } diff --git a/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java b/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java index 522296cc4..14e547207 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java @@ -16,6 +16,7 @@ package io.confluent.connect.jdbc.sink; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; @@ -27,8 +28,8 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collection; 
-import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import io.confluent.connect.jdbc.dialect.DatabaseDialect; @@ -48,11 +49,15 @@ public class BufferedRecords { private final Connection connection; private List records = new ArrayList<>(); - private SchemaPair currentSchemaPair; private FieldsMetadata fieldsMetadata; private PreparedStatement preparedStatement; private StatementBinder preparedStatementBinder; + private Schema keySchema; + private Schema valueSchema; + private boolean batchContainsDeletes = false; + private PreparedStatement deletePreparedStatement; + public BufferedRecords( JdbcSinkConfig config, TableId tableId, @@ -73,59 +78,87 @@ public List add(SinkRecord record) throws SQLException { record.valueSchema() ); - if (currentSchemaPair == null) { - currentSchemaPair = schemaPair; - // re-initialize everything that depends on the record schema - fieldsMetadata = FieldsMetadata.extract( - tableId.tableName(), - config.pkMode, - config.pkFields, - config.fieldsWhitelist, - currentSchemaPair - ); - dbStructure.createOrAmendIfNecessary( - config, - connection, - tableId, - fieldsMetadata - ); + List flushed = new ArrayList<>(); - final String sql = getInsertSql(); - log.debug( - "{} sql: {}", - config.insertMode, - sql - ); - close(); - preparedStatement = connection.prepareStatement(sql); - preparedStatementBinder = dbDialect.statementBinder( - preparedStatement, - config.pkMode, - schemaPair, - fieldsMetadata, - config.insertMode - ); + boolean schemaChanged = false; + + if (!Objects.equals(keySchema, record.keySchema())) { + keySchema = record.keySchema(); + schemaChanged = true; } - final List flushed; - if (currentSchemaPair.equals(schemaPair)) { - // Continue with current batch state - records.add(record); - if (records.size() >= config.batchSize) { - flushed = flush(); - } else { - flushed = Collections.emptyList(); + if (schemaPair.valueSchema == null) { + // For deletes, both the value and value schema come in as null. + // We don't want to treat this as a schema change if the key schema is the same, + // otherwise we flush unnecessarily.
+ if (config.deleteEnabled) { + batchContainsDeletes = true; + } + } else if (Objects.equals(valueSchema, record.valueSchema())) { + if (config.deleteEnabled && batchContainsDeletes) { + // flush so an insert after a delete of the same record isn't lost + flushed.addAll(flush()); + } } else { - // Each batch needs to have the same SchemaPair, so get the buffered records out, reset - // state and re-attempt the add - flushed = flush(); - currentSchemaPair = null; - flushed.addAll(add(record)); + valueSchema = record.valueSchema(); + schemaChanged = true; + } + + if (schemaChanged) { + // Each batch needs to have the same schemas, so get the buffered records out + flushed.addAll(flush()); + + onSchemaChanged(schemaPair); } + + records.add(record); + if (records.size() >= config.batchSize) { + log.debug("Flushing buffered records after exceeding configured batch size {}.", + config.batchSize); + flushed.addAll(flush()); + } + return flushed; } + private void onSchemaChanged(SchemaPair schemaPair) throws SQLException { + // re-initialize everything that depends on the record schema + fieldsMetadata = FieldsMetadata.extract( + tableId.tableName(), + config.pkMode, + config.pkFields, + config.fieldsWhitelist, + schemaPair + ); + dbStructure.createOrAmendIfNecessary( + config, + connection, + tableId, + fieldsMetadata + ); + + final String sql = getInsertSql(); + final String deleteSql = getDeleteSql(); + log.debug( + "{} sql: {}, DELETE sql: {}", + config.insertMode, + sql, + deleteSql + ); + close(); + preparedStatement = connection.prepareStatement(sql); + deletePreparedStatement = config.deleteEnabled ? connection.prepareStatement(deleteSql) : null; + preparedStatementBinder = dbDialect.statementBinder( + preparedStatement, + deletePreparedStatement, + config.pkMode, + schemaPair, + fieldsMetadata, + config.insertMode, + config + ); + } + public List flush() throws SQLException { if (records.isEmpty()) { return new ArrayList<>(); @@ -142,21 +175,39 @@ public List flush() throws SQLException { } totalUpdateCount += updateCount; } - if (totalUpdateCount != records.size() && !successNoInfo) { + int totalDeleteCount = 0; + if (deletePreparedStatement != null) { + for (int updateCount : deletePreparedStatement.executeBatch()) { + if (updateCount != Statement.SUCCESS_NO_INFO) { + totalDeleteCount += updateCount; + } + } + } + + checkAffectedRowCount(totalUpdateCount + totalDeleteCount, successNoInfo); + + final List flushedRecords = records; + records = new ArrayList<>(); + batchContainsDeletes = false; + return flushedRecords; + } + + private void checkAffectedRowCount(int totalCount, boolean successNoInfo) { + if (totalCount != records.size() && !successNoInfo) { switch (config.insertMode) { case INSERT: throw new ConnectException(String.format( - "Update count (%d) did not sum up to total number of records inserted (%d)", - totalUpdateCount, + "Row count (%d) did not sum up to total number of records inserted/deleted (%d)", + totalCount, records.size() )); case UPSERT: case UPDATE: - log.trace( - "{} records:{} resulting in in totalUpdateCount:{}", + log.debug( + "{}/deleted records:{} resulting in totalUpdateCount:{}", config.insertMode, records.size(), - totalUpdateCount + totalCount ); break; default: @@ -170,17 +221,20 @@ public List flush() throws SQLException { records.size() ); } - - final List flushedRecords = records; - records = new ArrayList<>(); - return flushedRecords; } public void close() throws SQLException { + log.info("Closing BufferedRecords with preparedStatement: {}
deletePreparedStatement: {}", + preparedStatement, + deletePreparedStatement); if (preparedStatement != null) { preparedStatement.close(); preparedStatement = null; } + if (deletePreparedStatement != null) { + deletePreparedStatement.close(); + deletePreparedStatement = null; + } } private String getInsertSql() { @@ -223,6 +277,30 @@ private String getInsertSql() { } } + private String getDeleteSql() { + String sql = null; + if (config.deleteEnabled) { + switch (config.pkMode) { + case NONE: + case KAFKA: + case RECORD_VALUE: + throw new ConnectException("Deletes are only supported for pk.mode record_key"); + case RECORD_KEY: + if (fieldsMetadata.keyFieldNames.isEmpty()) { + throw new ConnectException("Require primary keys to support delete"); + } + sql = dbDialect.buildDeleteStatement( + tableId, + asColumns(fieldsMetadata.keyFieldNames) + ); + break; + default: + break; + } + } + return sql; + } + private Collection asColumns(Collection names) { return names.stream() .map(name -> new ColumnId(tableId, name)) diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java index cb3ada95c..037e5c356 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java @@ -98,6 +98,13 @@ public enum PrimaryKeyMode { + " table, when possible."; private static final String BATCH_SIZE_DISPLAY = "Batch Size"; + public static final String DELETE_ENABLED = "delete.enabled"; + private static final String DELETE_ENABLED_DEFAULT = "false"; + private static final String DELETE_ENABLED_DOC = + "Whether to treat ``null`` record values as deletes. Requires ``pk.mode`` to be" + + " ``record_key``."; + private static final String DELETE_ENABLED_DISPLAY = "Enable deletes"; + public static final String AUTO_CREATE = "auto.create"; private static final String AUTO_CREATE_DEFAULT = "false"; private static final String AUTO_CREATE_DOC = @@ -259,6 +266,17 @@ public enum PrimaryKeyMode { ConfigDef.Width.SHORT, BATCH_SIZE_DISPLAY ) + .define( + DELETE_ENABLED, + ConfigDef.Type.BOOLEAN, + DELETE_ENABLED_DEFAULT, + ConfigDef.Importance.MEDIUM, + DELETE_ENABLED_DOC, + WRITES_GROUP, + 3, + ConfigDef.Width.SHORT, + DELETE_ENABLED_DISPLAY + ) // Data Mapping .define( TABLE_NAME_FORMAT, @@ -356,6 +374,7 @@ public enum PrimaryKeyMode { public final String connectionPassword; public final String tableNameFormat; public final int batchSize; + public final boolean deleteEnabled; public final int maxRetries; public final int retryBackoffMs; public final boolean autoCreate; @@ -373,6 +392,7 @@ public JdbcSinkConfig(Map props) { connectionPassword = getPasswordValue(CONNECTION_PASSWORD); tableNameFormat = getString(TABLE_NAME_FORMAT).trim(); batchSize = getInt(BATCH_SIZE); + deleteEnabled = getBoolean(DELETE_ENABLED); maxRetries = getInt(MAX_RETRIES); retryBackoffMs = getInt(RETRY_BACKOFF_MS); autoCreate = getBoolean(AUTO_CREATE); diff --git a/src/main/java/io/confluent/connect/jdbc/sink/PreparedStatementBinder.java b/src/main/java/io/confluent/connect/jdbc/sink/PreparedStatementBinder.java index 9592ca041..53e412e84 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/PreparedStatementBinder.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/PreparedStatementBinder.java @@ -34,25 +34,31 @@ public class PreparedStatementBinder implements StatementBinder { private final JdbcSinkConfig.PrimaryKeyMode pkMode; private final PreparedStatement statement; + private 
final PreparedStatement deleteStatement; private final SchemaPair schemaPair; private final FieldsMetadata fieldsMetadata; private final JdbcSinkConfig.InsertMode insertMode; + private final JdbcSinkConfig config; private final DatabaseDialect dialect; public PreparedStatementBinder( DatabaseDialect dialect, PreparedStatement statement, + PreparedStatement deleteStatement, JdbcSinkConfig.PrimaryKeyMode pkMode, SchemaPair schemaPair, FieldsMetadata fieldsMetadata, - JdbcSinkConfig.InsertMode insertMode + JdbcSinkConfig.InsertMode insertMode, + JdbcSinkConfig config ) { this.dialect = dialect; this.pkMode = pkMode; this.statement = statement; + this.deleteStatement = deleteStatement; this.schemaPair = schemaPair; this.fieldsMetadata = fieldsMetadata; this.insertMode = insertMode; + this.config = config; } @Override @@ -61,29 +67,40 @@ public void bindRecord(SinkRecord record) throws SQLException { // Assumption: the relevant SQL has placeholders for keyFieldNames first followed by // nonKeyFieldNames, in iteration order for all INSERT/ UPSERT queries + // the relevant SQL has placeholders for keyFieldNames, in iteration order + // for all DELETE queries // the relevant SQL has placeholders for nonKeyFieldNames first followed by // keyFieldNames, in iteration order for all UPDATE queries int index = 1; - switch (insertMode) { - case INSERT: - case UPSERT: - index = bindKeyFields(record, index); - bindNonKeyFields(record, valueStruct, index); - break; - - case UPDATE: - index = bindNonKeyFields(record, valueStruct, index); - bindKeyFields(record, index); - break; - default: - throw new AssertionError(); + if (valueStruct == null && config.deleteEnabled) { + bindKeyFields(record, index, deleteStatement); + deleteStatement.addBatch(); + } else { + switch (insertMode) { + case INSERT: + case UPSERT: + index = bindKeyFields(record, index, statement); + bindNonKeyFields(record, valueStruct, index, statement); + break; + + case UPDATE: + index = bindNonKeyFields(record, valueStruct, index, statement); + bindKeyFields(record, index, statement); + break; + default: + throw new AssertionError(); + } + statement.addBatch(); } - statement.addBatch(); } - protected int bindKeyFields(SinkRecord record, int index) throws SQLException { + protected int bindKeyFields( + SinkRecord record, + int index, + PreparedStatement statement + ) throws SQLException { switch (pkMode) { case NONE: if (!fieldsMetadata.keyFieldNames.isEmpty()) { @@ -93,20 +110,20 @@ protected int bindKeyFields(SinkRecord record, int index) throws SQLException { case KAFKA: { assert fieldsMetadata.keyFieldNames.size() == 3; - bindField(index++, Schema.STRING_SCHEMA, record.topic()); - bindField(index++, Schema.INT32_SCHEMA, record.kafkaPartition()); - bindField(index++, Schema.INT64_SCHEMA, record.kafkaOffset()); + bindField(statement, index++, Schema.STRING_SCHEMA, record.topic()); + bindField(statement, index++, Schema.INT32_SCHEMA, record.kafkaPartition()); + bindField(statement, index++, Schema.INT64_SCHEMA, record.kafkaOffset()); } break; case RECORD_KEY: { if (schemaPair.keySchema.type().isPrimitive()) { assert fieldsMetadata.keyFieldNames.size() == 1; - bindField(index++, schemaPair.keySchema, record.key()); + bindField(statement, index++, schemaPair.keySchema, record.key()); } else { for (String fieldName : fieldsMetadata.keyFieldNames) { final Field field = schemaPair.keySchema.field(fieldName); - bindField(index++, field.schema(), ((Struct) record.key()).get(field)); + bindField(statement, index++, field.schema(), ((Struct) 
record.key()).get(field)); } } } @@ -115,7 +132,7 @@ protected int bindKeyFields(SinkRecord record, int index) throws SQLException { case RECORD_VALUE: { for (String fieldName : fieldsMetadata.keyFieldNames) { final Field field = schemaPair.valueSchema.field(fieldName); - bindField(index++, field.schema(), ((Struct) record.value()).get(field)); + bindField(statement, index++, field.schema(), ((Struct) record.value()).get(field)); } } break; @@ -129,16 +146,22 @@ protected int bindKeyFields(SinkRecord record, int index) throws SQLException { protected int bindNonKeyFields( SinkRecord record, Struct valueStruct, - int index + int index, + PreparedStatement statement ) throws SQLException { for (final String fieldName : fieldsMetadata.nonKeyFieldNames) { final Field field = record.valueSchema().field(fieldName); - bindField(index++, field.schema(), valueStruct.get(field)); + bindField(statement, index++, field.schema(), valueStruct.get(field)); } return index; } - protected void bindField(int index, Schema schema, Object value) throws SQLException { + protected void bindField( + PreparedStatement statement, + int index, + Schema schema, + Object value + ) throws SQLException { dialect.bindField(statement, index, schema, value); } } diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java index 94a62148d..594d835cd 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java @@ -217,4 +217,12 @@ public void upsertOnlyKeyCols() { actor, columns(actor, "actor_id"), columns(actor)); assertEquals(expected, sql); } + + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java index b616c10b4..4bb9bf36e 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java @@ -220,4 +220,12 @@ public void upsertOnlyKeyCols() { actor, columns(actor, "actor_id"), columns(actor)); assertEquals(expected, sql); } + + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java index 1817d5837..2e7862d3e 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java @@ -123,6 +123,14 @@ public void shouldBuildUpsertStatement() { assertEquals(expected, sql); } + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM `myTable` WHERE `id1` = ? 
AND `id2` = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } + @Test public void createOneColNoPk() { verifyCreateOneColNoPk( diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java index 14c8d9e4e..930726543 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java @@ -130,6 +130,14 @@ public void shouldBuildUpsertStatement() { assertEquals(expected, sql); } + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } + @Test public void createOneColNoPk() { verifyCreateOneColNoPk( diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java index 19af3e1cf..3444ceb72 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java @@ -123,6 +123,14 @@ public void shouldBuildUpsertStatement() { assertEquals(expected, sql); } + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } + @Test public void createOneColNoPk() { verifyCreateOneColNoPk( diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java index c9213b6ca..256f73657 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java @@ -124,6 +124,13 @@ public void shouldBuildUpsertStatement() { assertEquals(expected, sql); } + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } @Test public void createOneColNoPk() { diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java index 62d94df8e..9ae283140 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java @@ -128,6 +128,14 @@ public void shouldBuildUpsertStatement() { assertEquals(expected, sql); } + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM [myTable] WHERE [id1] = ? 
AND [id2] = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } + @Test public void createOneColNoPk() { verifyCreateOneColNoPk( diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java index 5601e7fa6..eb7bc7636 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java @@ -142,6 +142,14 @@ public void shouldBuildUpsertStatement() { assertEquals(expected, sql); } + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM `myTable` WHERE `id1` = ? AND `id2` = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } + @Test public void createOneColNoPk() { verifyCreateOneColNoPk( diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java index d11376d11..722d15d3c 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java @@ -158,6 +158,14 @@ public void shouldBuildUpsertStatement() { assertEquals(expected, sql); } + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } + @Test public void createOneColNoPk() { verifyCreateOneColNoPk( diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java index 00df56ee1..1dbb8b190 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java @@ -114,6 +114,14 @@ public void shouldBuildAlterTableStatement() { assertStatements(sql, statements); } + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? 
AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } + @Test(expected = UnsupportedOperationException.class) public void shouldBuildUpsertStatement() { dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); diff --git a/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java b/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java index 417758bac..5e9765810 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java @@ -58,6 +58,126 @@ public void tearDown() throws IOException, SQLException { sqliteHelper.tearDown(); } + @Test + public void insertThenDeleteInBatchNoFlush() throws SQLException { + final HashMap props = new HashMap<>(); + props.put("connection.url", sqliteHelper.sqliteUri()); + props.put("auto.create", true); + props.put("auto.evolve", true); + props.put("delete.enabled", true); + props.put("insert.mode", "upsert"); + props.put("pk.mode", "record_key"); + props.put("batch.size", 1000); // sufficiently high to not cause flushes due to buffer being full + final JdbcSinkConfig config = new JdbcSinkConfig(props); + + final String url = sqliteHelper.sqliteUri(); + final DatabaseDialect dbDialect = DatabaseDialects.findBestFor(url, config); + final DbStructure dbStructure = new DbStructure(dbDialect); + + final TableId tableId = new TableId(null, null, "dummy"); + final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, sqliteHelper.connection); + + final Schema keySchemaA = SchemaBuilder.struct() + .field("id", Schema.INT64_SCHEMA) + .build(); + final Schema valueSchemaA = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .build(); + final Struct keyA = new Struct(keySchemaA) + .put("id", 1234L); + final Struct valueA = new Struct(valueSchemaA) + .put("name", "cuba"); + final SinkRecord recordA = new SinkRecord("dummy", 0, keySchemaA, keyA, valueSchemaA, valueA, 0); + final SinkRecord recordADelete = new SinkRecord("dummy", 0, keySchemaA, keyA, null, null, 0); + + final Schema schemaB = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.OPTIONAL_INT32_SCHEMA) + .build(); + final Struct valueB = new Struct(schemaB) + .put("name", "cuba") + .put("age", 4); + final SinkRecord recordB = new SinkRecord("dummy", 1, keySchemaA, keyA, schemaB, valueB, 1); + + // test records are batched correctly based on schema equality as records are added + // (schemaA,schemaA,schemaA,schemaB,schemaA) -> ([schemaA,schemaA,schemaA],[schemaB],[schemaA]) + + assertEquals(Collections.emptyList(), buffer.add(recordA)); + assertEquals(Collections.emptyList(), buffer.add(recordA)); + + // delete should not cause a flush (i.e. 
not treated as a schema change) + assertEquals(Collections.emptyList(), buffer.add(recordADelete)); + + assertEquals(Arrays.asList(recordA, recordA, recordADelete), buffer.add(recordB)); + + assertEquals(Collections.singletonList(recordB), buffer.add(recordA)); + + assertEquals(Collections.singletonList(recordA), buffer.flush()); + } + + @Test + public void insertThenDeleteThenInsertInBatchFlush() throws SQLException { + + + final HashMap props = new HashMap<>(); + props.put("connection.url", sqliteHelper.sqliteUri()); + props.put("auto.create", true); + props.put("auto.evolve", true); + props.put("delete.enabled", true); + props.put("insert.mode", "upsert"); + props.put("pk.mode", "record_key"); + props.put("batch.size", 1000); // sufficiently high to not cause flushes due to buffer being full + final JdbcSinkConfig config = new JdbcSinkConfig(props); + + final String url = sqliteHelper.sqliteUri(); + final DatabaseDialect dbDialect = DatabaseDialects.findBestFor(url, config); + final DbStructure dbStructure = new DbStructure(dbDialect); + + final TableId tableId = new TableId(null, null, "dummy"); + final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, sqliteHelper.connection); + + final Schema keySchemaA = SchemaBuilder.struct() + .field("id", Schema.INT64_SCHEMA) + .build(); + final Schema valueSchemaA = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .build(); + final Struct keyA = new Struct(keySchemaA) + .put("id", 1234L); + final Struct valueA = new Struct(valueSchemaA) + .put("name", "cuba"); + final SinkRecord recordA = new SinkRecord("dummy", 0, keySchemaA, keyA, valueSchemaA, valueA, 0); + final SinkRecord recordADelete = new SinkRecord("dummy", 0, keySchemaA, keyA, null, null, 0); + + final Schema schemaB = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.OPTIONAL_INT32_SCHEMA) + .build(); + final Struct valueB = new Struct(schemaB) + .put("name", "cuba") + .put("age", 4); + final SinkRecord recordB = new SinkRecord("dummy", 1, keySchemaA, keyA, schemaB, valueB, 1); + + // test records are batched correctly based on schema equality as records are added + // (schemaA,schemaA,schemaA,schemaB,schemaA) -> ([schemaA,schemaA,schemaA],[schemaB],[schemaA]) + + assertEquals(Collections.emptyList(), buffer.add(recordA)); + assertEquals(Collections.emptyList(), buffer.add(recordA)); + + // delete should not cause a flush (i.e. 
not treated as a schema change) + assertEquals(Collections.emptyList(), buffer.add(recordADelete)); + + // insert after delete should flush to ensure insert isn't lost in batching + assertEquals(Arrays.asList(recordA, recordA, recordADelete), buffer.add(recordA)); + + assertEquals(Collections.singletonList(recordA), buffer.add(recordB)); + + assertEquals(Collections.singletonList(recordB), buffer.add(recordA)); + + assertEquals(Collections.singletonList(recordA), buffer.flush()); + } + + @Test public void correctBatching() throws SQLException { final HashMap props = new HashMap<>(); diff --git a/src/test/java/io/confluent/connect/jdbc/sink/JdbcDbWriterTest.java b/src/test/java/io/confluent/connect/jdbc/sink/JdbcDbWriterTest.java index aea8a038c..c4d0de12e 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/JdbcDbWriterTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/JdbcDbWriterTest.java @@ -74,6 +74,99 @@ private JdbcDbWriter newWriter(Map props) { return new JdbcDbWriter(config, dialect, dbStructure); } + @Test + public void idempotentDeletes() throws SQLException { + String topic = "books"; + int partition = 7; + long offset = 42; + + Map props = new HashMap<>(); + props.put("connection.url", sqliteHelper.sqliteUri()); + props.put("auto.create", "true"); + props.put("delete.enabled", "true"); + props.put("pk.mode", "record_key"); + props.put("insert.mode", "upsert"); + + writer = newWriter(props); + + Schema keySchema = SchemaBuilder.struct() + .field("id", SchemaBuilder.INT64_SCHEMA); + + Struct keyStruct = new Struct(keySchema).put("id", 0L); + + Schema valueSchema = SchemaBuilder.struct() + .field("author", Schema.STRING_SCHEMA) + .field("title", Schema.STRING_SCHEMA) + .build(); + + Struct valueStruct = new Struct(valueSchema) + .put("author", "Tom Robbins") + .put("title", "Villa Incognito"); + + SinkRecord record = new SinkRecord(topic, partition, keySchema, keyStruct, valueSchema, valueStruct, offset); + + writer.write(Collections.nCopies(2, record)); + + SinkRecord deleteRecord = new SinkRecord(topic, partition, keySchema, keyStruct, null, null, offset); + writer.write(Collections.nCopies(2, deleteRecord)); + + assertEquals( + 1, + sqliteHelper.select("select count(*) from books", new SqliteHelper.ResultSetReadCallback() { + @Override + public void read(ResultSet rs) throws SQLException { + assertEquals(0, rs.getInt(1)); + } + }) + ); + } + + @Test + public void insertDeleteInsertSameRecord() throws SQLException { + String topic = "books"; + int partition = 7; + long offset = 42; + + Map props = new HashMap<>(); + props.put("connection.url", sqliteHelper.sqliteUri()); + props.put("auto.create", "true"); + props.put("delete.enabled", "true"); + props.put("pk.mode", "record_key"); + props.put("insert.mode", "upsert"); + + writer = newWriter(props); + + Schema keySchema = SchemaBuilder.struct() + .field("id", SchemaBuilder.INT64_SCHEMA); + + Struct keyStruct = new Struct(keySchema).put("id", 0L); + + Schema valueSchema = SchemaBuilder.struct() + .field("author", Schema.STRING_SCHEMA) + .field("title", Schema.STRING_SCHEMA) + .build(); + + Struct valueStruct = new Struct(valueSchema) + .put("author", "Tom Robbins") + .put("title", "Villa Incognito"); + + SinkRecord record = new SinkRecord(topic, partition, keySchema, keyStruct, valueSchema, valueStruct, offset); + SinkRecord deleteRecord = new SinkRecord(topic, partition, keySchema, keyStruct, null, null, offset); + writer.write(Collections.singletonList(record)); +
writer.write(Collections.singletonList(deleteRecord)); + writer.write(Collections.singletonList(record)); + + assertEquals( + 1, + sqliteHelper.select("select count(*) from books", new SqliteHelper.ResultSetReadCallback() { + @Override + public void read(ResultSet rs) throws SQLException { + assertEquals(1, rs.getInt(1)); + } + }) + ); + } + @Test public void autoCreateWithAutoEvolve() throws SQLException { String topic = "books"; diff --git a/src/test/java/io/confluent/connect/jdbc/sink/PreparedStatementBinderTest.java b/src/test/java/io/confluent/connect/jdbc/sink/PreparedStatementBinderTest.java index 62903c671..478fd63dc 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/PreparedStatementBinderTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/PreparedStatementBinderTest.java @@ -55,6 +55,7 @@ public class PreparedStatementBinderTest { private DatabaseDialect dialect; + private JdbcSinkConfig config; @Before public void beforeEach() { @@ -64,6 +65,7 @@ public void beforeEach() { props.put(JdbcSinkConfig.CONNECTION_PASSWORD, "password"); JdbcSinkConfig config = new JdbcSinkConfig(props); dialect = new GenericDatabaseDialect(config); + this.config = config; } @Test @@ -115,10 +117,12 @@ public void bindRecordInsert() throws SQLException, ParseException { PreparedStatementBinder binder = new PreparedStatementBinder( dialect, statement, + null, pkMode, schemaPair, fieldsMetadata, - JdbcSinkConfig.InsertMode.INSERT + JdbcSinkConfig.InsertMode.INSERT, + config ); binder.bindRecord(new SinkRecord("topic", 0, null, null, valueSchema, valueStruct, 0)); @@ -168,9 +172,11 @@ public void bindRecordUpsertMode() throws SQLException, ParseException { PreparedStatementBinder binder = new PreparedStatementBinder( dialect, statement, + null, pkMode, schemaPair, - fieldsMetadata, JdbcSinkConfig.InsertMode.UPSERT + fieldsMetadata, JdbcSinkConfig.InsertMode.UPSERT, + config ); binder.bindRecord(new SinkRecord("topic", 0, null, null, valueSchema, valueStruct, 0)); @@ -207,9 +213,11 @@ public void bindRecordUpdateMode() throws SQLException, ParseException { PreparedStatementBinder binder = new PreparedStatementBinder( dialect, statement, + null, pkMode, schemaPair, - fieldsMetadata, JdbcSinkConfig.InsertMode.UPDATE + fieldsMetadata, JdbcSinkConfig.InsertMode.UPDATE, + config ); binder.bindRecord(new SinkRecord("topic", 0, null, null, valueSchema, valueStruct, 0));