@@ -16,6 +16,7 @@
 
 package io.confluent.connect.jdbc.sink;
 
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
@@ -27,8 +28,8 @@
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import io.confluent.connect.jdbc.dialect.DatabaseDialect;
@@ -48,11 +49,15 @@ public class BufferedRecords {
   private final Connection connection;
 
   private List<SinkRecord> records = new ArrayList<>();
-  private SchemaPair currentSchemaPair;
   private FieldsMetadata fieldsMetadata;
   private PreparedStatement preparedStatement;
   private StatementBinder preparedStatementBinder;
 
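+  // Per-batch state for delete support: key and value schemas are tracked
+  // separately because a delete arrives as a tombstone whose value and value
+  // schema are both null, and deletes are batched in their own prepared
+  // statement alongside the insert/upsert statement.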
+  private Schema keySchema;
+  private Schema valueSchema;
+  private boolean batchContainsDeletes = false;
+  private PreparedStatement deletePreparedStatement;
+
   public BufferedRecords(
       JdbcSinkConfig config,
       TableId tableId,
@@ -73,59 +78,87 @@ public List<SinkRecord> add(SinkRecord record) throws SQLException {
         record.valueSchema()
     );
 
-    if (currentSchemaPair == null) {
-      currentSchemaPair = schemaPair;
-      // re-initialize everything that depends on the record schema
-      fieldsMetadata = FieldsMetadata.extract(
-          tableId.tableName(),
-          config.pkMode,
-          config.pkFields,
-          config.fieldsWhitelist,
-          currentSchemaPair
-      );
-      dbStructure.createOrAmendIfNecessary(
-          config,
-          connection,
-          tableId,
-          fieldsMetadata
-      );
+    List<SinkRecord> flushed = new ArrayList<>();
 
-      final String sql = getInsertSql();
-      log.debug(
-          "{} sql: {}",
-          config.insertMode,
-          sql
-      );
-      close();
-      preparedStatement = connection.prepareStatement(sql);
-      preparedStatementBinder = dbDialect.statementBinder(
-          preparedStatement,
-          config.pkMode,
-          schemaPair,
-          fieldsMetadata,
-          config.insertMode
-      );
+    boolean schemaChanged = false;
+
+    if (!Objects.equals(keySchema, record.keySchema())) {
+      keySchema = record.keySchema();
+      schemaChanged = true;
     }
 
-    final List<SinkRecord> flushed;
-    if (currentSchemaPair.equals(schemaPair)) {
-      // Continue with current batch state
-      records.add(record);
-      if (records.size() >= config.batchSize) {
-        flushed = flush();
-      } else {
-        flushed = Collections.emptyList();
+    if (schemaPair.valueSchema == null) {
+      // For deletes, both the value and the value schema come in as null.
+      // We don't want to treat this as a schema change if the key schema is
+      // unchanged, otherwise we would flush unnecessarily.
+      if (config.deleteEnabled) {
+        batchContainsDeletes = true;
+      }
+    } else if (Objects.equals(valueSchema, record.valueSchema())) {
+      if (config.deleteEnabled && batchContainsDeletes) {
+        // Flush so that an insert after a delete of the same record isn't lost.
+        flushed.addAll(flush());
       }
     } else {
-      // Each batch needs to have the same SchemaPair, so get the buffered records out, reset
-      // state and re-attempt the add
-      flushed = flush();
-      currentSchemaPair = null;
-      flushed.addAll(add(record));
+      valueSchema = record.valueSchema();
+      schemaChanged = true;
+    }
+
+    if (schemaChanged) {
+      // Each batch needs to have the same schemas, so get the buffered records out
+      flushed.addAll(flush());
+
+      onSchemaChanged(schemaPair);
     }
+
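+    // The record is buffered only after any schema-change flush above, so
+    // every buffered batch is bound against a single key/value schema pair.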
+    records.add(record);
+    if (records.size() >= config.batchSize) {
+      log.debug("Flushing buffered records after exceeding configured batch size {}.",
+          config.batchSize);
+      flushed.addAll(flush());
+    }
+
     return flushed;
   }
 
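+  // Re-initialization that previously lived inline in add(): rebuild field
+  // metadata, create or evolve the table if necessary, and re-prepare the
+  // insert/upsert and delete statements for the new schema pair.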
+  private void onSchemaChanged(SchemaPair schemaPair) throws SQLException {
+    // re-initialize everything that depends on the record schema
+    fieldsMetadata = FieldsMetadata.extract(
+        tableId.tableName(),
+        config.pkMode,
+        config.pkFields,
+        config.fieldsWhitelist,
+        schemaPair
+    );
+    dbStructure.createOrAmendIfNecessary(
+        config,
+        connection,
+        tableId,
+        fieldsMetadata
+    );
+
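+    // Statements are prepared once per schema change and reused for every
+    // subsequent batch; getDeleteSql() returns null when deletes are disabled,
+    // so the delete statement is only prepared when config.deleteEnabled is set.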
+    final String sql = getInsertSql();
+    final String deleteSql = getDeleteSql();
+    log.debug(
+        "{} sql: {}, DELETE sql: {}",
+        config.insertMode,
+        sql,
+        deleteSql
+    );
+    close();
+    preparedStatement = connection.prepareStatement(sql);
+    deletePreparedStatement = config.deleteEnabled ? connection.prepareStatement(deleteSql) : null;
+    preparedStatementBinder = dbDialect.statementBinder(
+        preparedStatement,
+        deletePreparedStatement,
+        config.pkMode,
+        schemaPair,
+        fieldsMetadata,
+        config.insertMode,
+        config
+    );
+  }
+
   public List<SinkRecord> flush() throws SQLException {
     if (records.isEmpty()) {
       return new ArrayList<>();
@@ -142,21 +175,39 @@ public List<SinkRecord> flush() throws SQLException {
       }
       totalUpdateCount += updateCount;
     }
-    if (totalUpdateCount != records.size() && !successNoInfo) {
+    int totalDeleteCount = 0;
+    if (deletePreparedStatement != null) {
+      for (int updateCount : deletePreparedStatement.executeBatch()) {
+        if (updateCount != Statement.SUCCESS_NO_INFO) {
+          totalDeleteCount += updateCount;
+        }
+      }
+    }
+
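+    // Drivers may return Statement.SUCCESS_NO_INFO (-2) from executeBatch()
+    // when per-row counts are unavailable; such entries are skipped above and
+    // successNoInfo relaxes the row-count check below.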
+    checkAffectedRowCount(totalUpdateCount + totalDeleteCount, successNoInfo);
+
+    final List<SinkRecord> flushedRecords = records;
+    records = new ArrayList<>();
+    batchContainsDeletes = false;
+    return flushedRecords;
+  }
+
+  private void checkAffectedRowCount(int totalCount, boolean successNoInfo) {
+    if (totalCount != records.size() && !successNoInfo) {
       switch (config.insertMode) {
         case INSERT:
           throw new ConnectException(String.format(
-              "Update count (%d) did not sum up to total number of records inserted (%d)",
-              totalUpdateCount,
+              "Row count (%d) did not match the number of records inserted/deleted (%d)",
+              totalCount,
               records.size()
           ));
         case UPSERT:
         case UPDATE:
-          log.trace(
-              "{} records:{} resulting in in totalUpdateCount:{}",
+          log.debug(
+              "{}/deleted records:{} resulting in totalCount:{}",
               config.insertMode,
               records.size(),
-              totalUpdateCount
+              totalCount
           );
           break;
         default:
@@ -170,17 +221,20 @@ public List<SinkRecord> flush() throws SQLException {
           records.size()
       );
     }
-
-    final List<SinkRecord> flushedRecords = records;
-    records = new ArrayList<>();
-    return flushedRecords;
   }
 
   public void close() throws SQLException {
+    log.info("Closing BufferedRecords with preparedStatement: {} deletePreparedStatement: {}",
+        preparedStatement,
+        deletePreparedStatement);
     if (preparedStatement != null) {
       preparedStatement.close();
       preparedStatement = null;
     }
+    if (deletePreparedStatement != null) {
+      deletePreparedStatement.close();
+      deletePreparedStatement = null;
+    }
   }
 
   private String getInsertSql() {
@@ -223,6 +277,30 @@ private String getInsertSql() {
     }
   }
 
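+  // A tombstone record becomes a DELETE keyed on the table's primary key
+  // columns, which is why delete support requires pk.mode=record_key and a
+  // non-empty key.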
+  private String getDeleteSql() {
+    String sql = null;
+    if (config.deleteEnabled) {
+      switch (config.pkMode) {
+        case NONE:
+        case KAFKA:
+        case RECORD_VALUE:
+          throw new ConnectException("Deletes are only supported for pk.mode record_key");
+        case RECORD_KEY:
+          if (fieldsMetadata.keyFieldNames.isEmpty()) {
+            throw new ConnectException("Deletes require primary key fields, but none were found");
+          }
+          sql = dbDialect.buildDeleteStatement(
+              tableId,
+              asColumns(fieldsMetadata.keyFieldNames)
+          );
+          break;
+        default:
+          break;
+      }
+    }
+    return sql;
+  }
+
   private Collection<ColumnId> asColumns(Collection<String> names) {
     return names.stream()
         .map(name -> new ColumnId(tableId, name))