
KAFKA-15370: Support Participation in 2PC (KIP-939) (K/N) #19751


Open
wants to merge 1 commit into trunk

Conversation

artemlivshits
Contributor

Support keepPreparedTxn=true on transaction coordinator. The ongoing transaction needs to be preserved (and its producerId / epoch needs to be preserved as well so that we can send transaction markers), but the client-facing producerId / epoch needs to move forward, so now we have 2 id / epoch pairs that need to be maintained during the Ongoing and Prepare states.

To simplify the state machine implementation, the transaction coordinator now eagerly rejects requests during the prepareCommit / prepareAbort states with the CONCURRENT_TRANSACTIONS error - the client would get this error anyway, and we avoid handling a complex combination of conditions that try to distinguish between retries and invalid requests.
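As a rough illustration of the dual bookkeeping described above, here is a minimal sketch assuming a simplified shape for the metadata (the field names nextProducerId / nextProducerEpoch appear in the diff excerpts below; everything else, including the class names, is illustrative rather than the actual TransactionMetadata code):

// Simplified sketch, not the coordinator implementation: shows the two
// producerId / epoch pairs a kept ("prepared") transaction has to track.
final case class TxnIdAndEpoch(producerId: Long, producerEpoch: Short)

final case class PreparedTxnStateSketch(
  // Pair of the ongoing transaction itself; preserved so the coordinator
  // can still write transaction markers with it.
  ongoing: TxnIdAndEpoch,
  // Client-facing pair returned by InitProducerId(keepPreparedTxn=true);
  // it moves forward so stale producers can be fenced.
  clientFacing: TxnIdAndEpoch
)

object PreparedTxnStateSketch {
  // keepPreparedTxn=true: keep the ongoing pair, bump only the client-facing epoch.
  def keepPrepared(current: TxnIdAndEpoch): PreparedTxnStateSketch =
    PreparedTxnStateSketch(
      ongoing = current,
      clientFacing = current.copy(producerEpoch = (current.producerEpoch + 1).toShort)
    )
}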

github-actions bot added the triage (PRs from the community), core (Kafka Broker), and transactions (Transactions and EOS) labels on May 17, 2025
// (including PrepareCommit and PrepareAbort) in already complex state machine.
if (txnMetadata.pendingTransitionInProgress && !txnMetadata.pendingState.contains(TransactionState.PREPARE_EPOCH_FENCE)) {
Left(Errors.CONCURRENT_TRANSACTIONS)
} else if (txnMetadata.state == TransactionState.PREPARE_COMMIT || txnMetadata.state == TransactionState.PREPARE_ABORT) {
Contributor Author

Here we eagerly reject requests if the transaction is in the PrepareCommit / PrepareAbort state. Previously the code tried to validate the state and return a fenced error on mismatch, but in the end, if the checks were successful, the request still got rejected with the concurrent transactions error. The checks grew extremely complex with the new nextProducerEpoch state, so we avoid this complexity altogether by just returning the concurrent transactions error for this transient state.

@@ -109,7 +125,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
def prepareIncrementProducerEpoch(newTxnTimeoutMs: Int,
expectedProducerEpoch: Option[Short],
updateTimestamp: Long): Either[Errors, TxnTransitMetadata] = {
- if (isProducerEpochExhausted)
+ if (TransactionMetadata.isEpochExhausted(producerEpoch))
Contributor Author

This changed because isProducerEpochExhausted used to check producerEpoch, but it now checks clientProducerEpoch (which can sometimes be nextProducerEpoch). In this place we still want to check producerEpoch, hence the explicit call.
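A minimal sketch of the distinction, assuming clientProducerEpoch resolves to nextProducerEpoch once one has been assigned (only the identifiers quoted from the diff are real; the class shape and the exhaustion threshold are assumptions):

// Illustrative sketch only, not the actual TransactionMetadata implementation.
object TransactionMetadataSketch {
  // Companion-style helper: checks a specific epoch value.
  def isEpochExhausted(epoch: Short): Boolean = epoch >= Short.MaxValue - 1
}

class TransactionMetadataSketch(var producerEpoch: Short, var nextProducerEpoch: Short) {
  // Client-facing epoch: nextProducerEpoch once keepPreparedTxn has set it.
  def clientProducerEpoch: Short =
    if (nextProducerEpoch != -1) nextProducerEpoch else producerEpoch

  // The instance method now checks the client-facing epoch ...
  def isProducerEpochExhausted: Boolean =
    TransactionMetadataSketch.isEpochExhausted(clientProducerEpoch)

  // ... so a call site that still cares about the ongoing transaction's epoch
  // (as prepareIncrementProducerEpoch does here) calls the helper explicitly.
  def ongoingEpochExhausted: Boolean =
    TransactionMetadataSketch.isEpochExhausted(producerEpoch)
}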

Member

To clarify for myself:
This method would not be called from keepPreparedTxn=true InitProducerId calls, and thus we should update the ongoing transaction's producer epoch and not the client epoch (which in some cases is nextProducerEpoch).

If we call this method, how does that affect nextProducerEpoch/Id? I could imagine we bump the epoch here such that it matches the nextProducerEpoch.

github-actions bot removed the triage (PRs from the community) label on May 18, 2025
responseCallback(initTransactionError(Errors.UNSUPPORTED_VERSION))
} else if (keepPreparedTxn && expectedProducerIdAndEpoch.nonEmpty) {
// keepPreparedTxn can only be specified in the initial call of InitProducerId, so
// no exposed producerId / epoch should be set
Contributor

nit: did we mean to say expected or exposed?

Contributor Author

Should be "expected", thanks

// When InitProducerId keeps an Ongoing transaction (which is "prepared" from
// the 2PC protocol perspective) it needs to preserve the producerId / epoch of
// the transaction (that will be used for sending markers), but update the
// producerId / epoch that's going ot be used by client (so that we could fence
Contributor

nit: ot -> to

@@ -174,6 +190,29 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
)
}

def prepareIncrementProducerEpochForOngoing(updateTimestamp: Long): TxnTransitMetadata = {
Contributor
rreddy-22 May 21, 2025

nit: could we maybe name this to specify that we're updating the "next" producer epoch, and only in case of prepared txns? Perhaps even add a Javadoc. Something like prepareNextProducerEpochForOngoingTxn?
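For what that could look like, a sketch under the assumption that the renamed method keeps the body introduced in this PR (the name prepareNextProducerEpochForOngoingTxn is taken from the suggestion above; the Javadoc wording is an assumption, not the PR's):

/**
 * Prepare a transition that sets the next (client-facing) producer epoch for an
 * ongoing transaction kept across InitProducerId with keepPreparedTxn=true,
 * i.e. a transaction that is "prepared" from the 2PC protocol perspective.
 * The ongoing transaction's producerId / producerEpoch are preserved so the
 * coordinator can still write transaction markers with them.
 */
def prepareNextProducerEpochForOngoingTxn(updateTimestamp: Long): TxnTransitMetadata =
  prepareIncrementProducerEpochForOngoing(updateTimestamp) // delegates to / renames the method above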

@@ -250,7 +300,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
* Check if the epochs have been exhausted for the current producerId. We do not allow the client to use an
Contributor

Can we update the javadoc to specify that this method checks whether the client producer epoch is exhausted, and what that entails with respect to nextProducerEpoch and producerEpoch?

// then we also need to bump this epoch -- we want to fence requests that were issued with that
// epoch after the transaction is complete. Consider this example:
// 1. (initial state): Ongoing, producerId = 42, producerEpoch = 100, nextProducerId = -1, nextProducerEpoch = -1
// 2. InitProducerId(keepPreparedTxn): Ongoing, producerId = 42, producerEpoch = 100, nextProducerId = 42, nextProducerEpoch = 101
Member

At step 2, are we returning the epoch 100 or 101 to the client? In other words, what is the epoch we send at step 3?

Labels
ci-approved, core (Kafka Broker), transactions (Transactions and EOS)

4 participants