@@ -342,6 +342,20 @@ String buildUpdateStatement(
Collection<ColumnId> nonKeyColumns
);

/**
* Build the DELETE prepared statement expression for the given table and its columns. Variables
* for each key column should also appear in the WHERE clause of the statement.
*
* @param table the identifier of the table; may not be null
* @param keyColumns the identifiers of the columns in the primary/unique key; may not be null
* but may be empty
* @return the DELETE statement; may not be null
*/
String buildDeleteStatement(
TableId table,
Collection<ColumnId> keyColumns
);

/**
* Build the UPSERT or MERGE prepared statement expression to either insert a new record into the
* given table or update an existing record in that table. Variables for each key column should
@@ -401,10 +415,12 @@ String buildUpsertQueryStatement(
*/
StatementBinder statementBinder(
PreparedStatement statement,
PreparedStatement deleteStatement,
JdbcSinkConfig.PrimaryKeyMode pkMode,
SchemaPair schemaPair,
FieldsMetadata fieldsMetadata,
JdbcSinkConfig.InsertMode insertMode
JdbcSinkConfig.InsertMode insertMode,
JdbcSinkConfig config
);

/**
@@ -1348,6 +1348,23 @@ public String buildInsertStatement(
return builder.toString();
}

public String buildDeleteStatement(
TableId table,
Collection<ColumnId> keyColumns
) {
ExpressionBuilder builder = expressionBuilder();
builder.append("DELETE FROM ");
builder.append(table);
if (!keyColumns.isEmpty()) {
builder.append(" WHERE ");
builder.appendList()
.delimitedBy(" AND ")
.transformedBy(ExpressionBuilder.columnNamesWith(" = ?"))
.of(keyColumns);
}
return builder.toString();
}
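For illustration only, the self-contained sketch below mirrors what buildDeleteStatement produces: each key column becomes a "col = ?" term and the terms are joined with AND. It is not the connector's code; the dialect's identifier quoting is omitted and the table and column names are assumptions.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Simplified stand-in for the ExpressionBuilder-based logic above; no identifier quoting,
// table/column names are illustrative only.
public class DeleteSqlSketch {
  static String buildDeleteStatement(String table, List<String> keyColumns) {
    StringBuilder sql = new StringBuilder("DELETE FROM ").append(table);
    if (!keyColumns.isEmpty()) {
      sql.append(" WHERE ")
         .append(keyColumns.stream()
             .map(col -> col + " = ?")
             .collect(Collectors.joining(" AND ")));
    }
    return sql.toString();
  }

  public static void main(String[] args) {
    // Prints: DELETE FROM customers WHERE id = ? AND region = ?
    System.out.println(buildDeleteStatement("customers", Arrays.asList("id", "region")));
  }
}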

@Override
public String buildUpdateStatement(
TableId table,
@@ -1384,18 +1401,22 @@ public String buildUpsertQueryStatement(
@Override
public StatementBinder statementBinder(
PreparedStatement statement,
PreparedStatement deleteStatement,
PrimaryKeyMode pkMode,
SchemaPair schemaPair,
FieldsMetadata fieldsMetadata,
InsertMode insertMode
InsertMode insertMode,
JdbcSinkConfig config
) {
return new PreparedStatementBinder(
this,
statement,
deleteStatement,
pkMode,
schemaPair,
fieldsMetadata,
insertMode
insertMode,
config
);
}

190 changes: 134 additions & 56 deletions src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java
@@ -16,6 +16,7 @@

package io.confluent.connect.jdbc.sink;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
@@ -27,8 +28,8 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
@@ -48,11 +49,15 @@ public class BufferedRecords {
private final Connection connection;

private List<SinkRecord> records = new ArrayList<>();
private SchemaPair currentSchemaPair;
private FieldsMetadata fieldsMetadata;
private PreparedStatement preparedStatement;
private StatementBinder preparedStatementBinder;

private Schema keySchema;
private Schema valueSchema;
private boolean batchContainsDeletes = false;
private PreparedStatement deletePreparedStatement;

public BufferedRecords(
JdbcSinkConfig config,
TableId tableId,
@@ -73,59 +78,87 @@ public List<SinkRecord> add(SinkRecord record) throws SQLException {
record.valueSchema()
);

if (currentSchemaPair == null) {
currentSchemaPair = schemaPair;
// re-initialize everything that depends on the record schema
fieldsMetadata = FieldsMetadata.extract(
tableId.tableName(),
config.pkMode,
config.pkFields,
config.fieldsWhitelist,
currentSchemaPair
);
dbStructure.createOrAmendIfNecessary(
config,
connection,
tableId,
fieldsMetadata
);
List<SinkRecord> flushed = new ArrayList<>();

final String sql = getInsertSql();
log.debug(
"{} sql: {}",
config.insertMode,
sql
);
close();
preparedStatement = connection.prepareStatement(sql);
preparedStatementBinder = dbDialect.statementBinder(
preparedStatement,
config.pkMode,
schemaPair,
fieldsMetadata,
config.insertMode
);
boolean schemaChanged = false;

if (!Objects.equals(keySchema, record.keySchema())) {
keySchema = record.keySchema();
schemaChanged = true;
}

final List<SinkRecord> flushed;
if (currentSchemaPair.equals(schemaPair)) {
// Continue with current batch state
records.add(record);
if (records.size() >= config.batchSize) {
flushed = flush();
} else {
flushed = Collections.emptyList();
if (schemaPair.valueSchema == null) {
// For deletes, both the value and value schema come in as null.
// We don't want to treat this as a schema change if the key schema is the same;
// otherwise we would flush unnecessarily.
if (config.deleteEnabled) {
batchContainsDeletes = true;
}
} else if (Objects.equals(valueSchema, record.valueSchema())) {
if (config.deleteEnabled && batchContainsDeletes) {
// flush so that an insert after a delete of the same record isn't lost
flushed.addAll(flush());
}
} else {
// Each batch needs to have the same SchemaPair, so get the buffered records out, reset
// state and re-attempt the add
flushed = flush();
currentSchemaPair = null;
flushed.addAll(add(record));
valueSchema = record.valueSchema();
schemaChanged = true;
}

if (schemaChanged) {
// Each batch needs to have the same schemas, so get the buffered records out
flushed.addAll(flush());

onSchemaChanged(schemaPair);
}

records.add(record);
if (records.size() >= config.batchSize) {
log.debug("Flushing buffered records after exceeding configured batch size {}.",
config.batchSize);
flushed.addAll(flush());
}

return flushed;
}
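For readers skimming the new add() flow, the stripped-down sketch below mimics just the buffering decisions introduced above; it is not the connector's API, and it deliberately ignores key-schema changes, the delete.enabled and batch-size checks, and table evolution. A record with a null value schema is treated as a tombstone, a value-schema change flushes first so every batch shares one schema, and a non-delete record arriving after a buffered delete also flushes first so a delete-then-insert sequence for the same key keeps its order.

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Stripped-down illustration of the buffering rules in add(); not the connector's API.
class BufferSketch {
  private final List<String> batch = new ArrayList<>();
  private Object valueSchema;
  private boolean batchContainsDeletes;

  List<String> add(String record, Object recordValueSchema) {
    List<String> flushed = new ArrayList<>();
    if (recordValueSchema == null) {
      batchContainsDeletes = true;                 // tombstone: value schema is null
    } else if (Objects.equals(valueSchema, recordValueSchema)) {
      if (batchContainsDeletes) {
        flushed.addAll(flush());                   // keep delete-then-insert ordering
      }
    } else {
      flushed.addAll(flush());                     // value schema changed: start a new batch
      valueSchema = recordValueSchema;
    }
    batch.add(record);
    return flushed;
  }

  List<String> flush() {
    List<String> out = new ArrayList<>(batch);
    batch.clear();
    batchContainsDeletes = false;
    return out;
  }

  public static void main(String[] args) {
    BufferSketch buffer = new BufferSketch();
    buffer.add("insert k1", "schemaA");
    buffer.add("delete k1", null);
    // The insert below forces a flush of [insert k1, delete k1] before it is buffered.
    System.out.println(buffer.add("insert k1 again", "schemaA"));
  }
}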

private void onSchemaChanged(SchemaPair schemaPair) throws SQLException {
// re-initialize everything that depends on the record schema
fieldsMetadata = FieldsMetadata.extract(
tableId.tableName(),
config.pkMode,
config.pkFields,
config.fieldsWhitelist,
schemaPair
);
dbStructure.createOrAmendIfNecessary(
config,
connection,
tableId,
fieldsMetadata
);

final String sql = getInsertSql();
final String deleteSql = getDeleteSql();
log.debug(
"{} sql: {}, DELETE sql: {}",
config.insertMode,
sql,
deleteSql
);
close();
preparedStatement = connection.prepareStatement(sql);
deletePreparedStatement = config.deleteEnabled ? connection.prepareStatement(deleteSql) : null;
preparedStatementBinder = dbDialect.statementBinder(
preparedStatement,
deletePreparedStatement,
config.pkMode,
schemaPair,
fieldsMetadata,
config.insertMode,
config
);
}

public List<SinkRecord> flush() throws SQLException {
if (records.isEmpty()) {
return new ArrayList<>();
@@ -142,21 +175,39 @@ public List<SinkRecord> flush() throws SQLException {
}
totalUpdateCount += updateCount;
}
if (totalUpdateCount != records.size() && !successNoInfo) {
int totalDeleteCount = 0;
if (deletePreparedStatement != null) {
for (int updateCount : deletePreparedStatement.executeBatch()) {
if (updateCount != Statement.SUCCESS_NO_INFO) {
totalDeleteCount += updateCount;
}
}
}

checkAffectedRowCount(totalUpdateCount + totalDeleteCount, successNoInfo);

final List<SinkRecord> flushedRecords = records;
records = new ArrayList<>();
batchContainsDeletes = false;
return flushedRecords;
}

private void checkAffectedRowCount(int totalCount, boolean successNoInfo) {
if (totalCount != records.size() && !successNoInfo) {
switch (config.insertMode) {
case INSERT:
throw new ConnectException(String.format(
"Update count (%d) did not sum up to total number of records inserted (%d)",
totalUpdateCount,
"Row count (%d) did not sum up to total number of records inserted/deleted (%d)",
totalCount,
records.size()
));
case UPSERT:
case UPDATE:
log.trace(
"{} records:{} resulting in in totalUpdateCount:{}",
log.debug(
"{}/deleted records:{} resulting in in totalUpdateCount:{}",
config.insertMode,
records.size(),
totalUpdateCount
totalCount
);
break;
default:
@@ -170,17 +221,20 @@ public List<SinkRecord> flush() throws SQLException {
records.size()
);
}

final List<SinkRecord> flushedRecords = records;
records = new ArrayList<>();
return flushedRecords;
}

public void close() throws SQLException {
log.info("Closing BufferedRecords with preparedStatement: {} deletePreparedStatement: {}",
preparedStatement,
deletePreparedStatement);
if (preparedStatement != null) {
preparedStatement.close();
preparedStatement = null;
}
if (deletePreparedStatement != null) {
deletePreparedStatement.close();
deletePreparedStatement = null;
}
}

private String getInsertSql() {
@@ -223,6 +277,30 @@ private String getInsertSql() {
}
}

private String getDeleteSql() {
String sql = null;
if (config.deleteEnabled) {
switch (config.pkMode) {
case NONE:
case KAFKA:
case RECORD_VALUE:
throw new ConnectException("Deletes are only supported for pk.mode record_key");
case RECORD_KEY:
if (fieldsMetadata.keyFieldNames.isEmpty()) {
throw new ConnectException("Require primary keys to support delete");
}
sql = dbDialect.buildDeleteStatement(
tableId,
asColumns(fieldsMetadata.keyFieldNames)
);
break;
default:
break;
}
}
return sql;
}
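As getDeleteSql() shows, DELETE statements are only generated when deletes are enabled and the primary key comes from the record key, so a sink that should honor tombstone records needs both settings. The fragment below is an assumed configuration: the property names delete.enabled, pk.mode and pk.fields are the usual JdbcSinkConfig names behind config.deleteEnabled and config.pkMode referenced here, but verify them against the connector version in use.

# Assumed JDBC sink settings for delete support (names per JdbcSinkConfig; verify for your version)
delete.enabled=true
pk.mode=record_key
# Optional: restrict which key fields form the primary key
pk.fields=id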

private Collection<ColumnId> asColumns(Collection<String> names) {
return names.stream()
.map(name -> new ColumnId(tableId, name))