@@ -182,6 +182,40 @@ class TransactionCoordinatorTest {
182182 assertEquals(Errors .NONE , result.error)
183183 }
184184
185+ @ Test
186+ def shouldGenerateNewProducerIdIfEpochsExhaustedV2 (): Unit = {
187+ initPidGenericMocks(transactionalId)
188+
189+ val txnMetadata1 = new TransactionMetadata (transactionalId, producerId, producerId, RecordBatch .NO_PRODUCER_ID , (Short .MaxValue - 1 ).toShort,
190+ (Short .MaxValue - 2 ).toShort, txnTimeoutMs, Ongoing , mutable.Set .empty, time.milliseconds(), time.milliseconds(), TV_2 )
191+ // We start with txnMetadata1 so we can transform the metadata to PrepareCommit.
192+ val txnMetadata2 = new TransactionMetadata (transactionalId, producerId, producerId, RecordBatch .NO_PRODUCER_ID , (Short .MaxValue - 1 ).toShort,
193+ (Short .MaxValue - 2 ).toShort, txnTimeoutMs, Ongoing , mutable.Set .empty, time.milliseconds(), time.milliseconds(), TV_2 )
194+ val transitMetadata = txnMetadata2.prepareAbortOrCommit(PrepareCommit , TV_2 , producerId2, time.milliseconds(), false )
195+ txnMetadata2.completeTransitionTo(transitMetadata)
196+
197+ assertEquals(producerId, txnMetadata2.producerId)
198+ assertEquals(Short .MaxValue , txnMetadata2.producerEpoch)
199+
200+ when(transactionManager.getTransactionState(ArgumentMatchers .eq(transactionalId)))
201+ .thenReturn(Right (Some (CoordinatorEpochAndTxnMetadata (coordinatorEpoch, txnMetadata1))))
202+ .thenReturn(Right (Some (CoordinatorEpochAndTxnMetadata (coordinatorEpoch, txnMetadata2))))
203+
204+ when(transactionManager.appendTransactionToLog(
205+ ArgumentMatchers .eq(transactionalId),
206+ ArgumentMatchers .eq(coordinatorEpoch),
207+ any[TxnTransitMetadata ],
208+ capturedErrorsCallback.capture(),
209+ any(),
210+ any()
211+ )).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors .NONE ))
212+
213+ coordinator.handleEndTransaction(transactionalId, producerId, (Short .MaxValue - 1 ).toShort, TransactionResult .COMMIT , TV_2 , endTxnCallback)
214+ assertEquals(producerId2, newProducerId)
215+ assertEquals(0 , newEpoch)
216+ assertEquals(Errors .NONE , error)
217+ }
218+
185219 @ Test
186220 def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator (): Unit = {
187221 when(transactionManager.validateTransactionTimeoutMs(anyInt()))
@@ -519,7 +553,7 @@ class TransactionCoordinatorTest {
519553 .thenReturn(Right (Some (CoordinatorEpochAndTxnMetadata (coordinatorEpoch, txnMetadata))))
520554
521555 val nextProducerEpoch = if (isRetry) producerEpoch - 1 else producerEpoch
522- coordinator.handleEndTransaction(transactionalId, producerId, nextProducerEpoch.toShort , TransactionResult .ABORT , clientTransactionVersion, endTxnCallback)
556+ coordinator.handleEndTransaction(transactionalId, producerId, nextProducerEpoch.toShort, TransactionResult .ABORT , clientTransactionVersion, endTxnCallback)
523557 if (isRetry) {
524558 assertEquals(Errors .PRODUCER_FENCED , error)
525559 } else {
@@ -770,6 +804,39 @@ class TransactionCoordinatorTest {
770804 verify(transactionManager, times(2 )).getTransactionState(ArgumentMatchers .eq(transactionalId))
771805 }
772806
807+ @ Test
808+ def shouldReturnConcurrentTxnOnAddPartitionsIfEndTxnV2EpochOverflowAndNotComplete (): Unit = {
809+ val prepareWithPending = new TransactionMetadata (transactionalId, producerId, producerId,
810+ producerId2, Short .MaxValue , (Short .MaxValue - 1 ).toShort, 1 , PrepareCommit , collection.mutable.Set .empty[TopicPartition ], 0 , time.milliseconds(), TV_2 )
811+ val txnTransitMetadata = prepareWithPending.prepareComplete(time.milliseconds())
812+
813+ when(transactionManager.getTransactionState(ArgumentMatchers .eq(transactionalId)))
814+ .thenReturn(Right (Some (CoordinatorEpochAndTxnMetadata (coordinatorEpoch, prepareWithPending))))
815+
816+ // Return CONCURRENT_TRANSACTIONS while transaction is still completing
817+ coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2, 0 , partitions, errorsCallback, TV_2 )
818+ assertEquals(Errors .CONCURRENT_TRANSACTIONS , error)
819+ verify(transactionManager).getTransactionState(ArgumentMatchers .eq(transactionalId))
820+
821+ prepareWithPending.completeTransitionTo(txnTransitMetadata)
822+ assertEquals(CompleteCommit , prepareWithPending.state)
823+ when(transactionManager.getTransactionState(ArgumentMatchers .eq(transactionalId)))
824+ .thenReturn(Right (Some (CoordinatorEpochAndTxnMetadata (coordinatorEpoch, prepareWithPending))))
825+ when(transactionManager.appendTransactionToLog(
826+ ArgumentMatchers .eq(transactionalId),
827+ ArgumentMatchers .eq(coordinatorEpoch),
828+ any[TxnTransitMetadata ],
829+ capturedErrorsCallback.capture(),
830+ any(),
831+ any())
832+ ).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors .NONE ))
833+
834+ coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2, 0 , partitions, errorsCallback, TV_2 )
835+
836+ assertEquals(Errors .NONE , error)
837+ verify(transactionManager, times(2 )).getTransactionState(ArgumentMatchers .eq(transactionalId))
838+ }
839+
773840 @ ParameterizedTest
774841 @ ValueSource (shorts = Array (0 , 2 ))
775842 def shouldAppendPrepareCommitToLogOnEndTxnWhenStatusIsOngoingAndResultIsCommit (transactionVersion : Short ): Unit = {
0 commit comments