-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19999 Transaction coordinator livelock caused by invalid producer epoch #21176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
|
Test the test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.3.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=4.1.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.4.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.5.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.6.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.7.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.9.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.8.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=4.0.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None |
chia7712
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@m1a2st thanks for this quick fix.
...c/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
Outdated
Show resolved
Hide resolved
| Errors.NOT_LEADER_OR_FOLLOWER | ||
| case error => | ||
| error | ||
| if (IdempotentTransactionMarkerException.isInstanceOf(exception)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please also add comments
| // TV2 Idempotent Retry Detection: | ||
| // When markerEpoch == currentEpoch and no transaction is ongoing, this is a retry | ||
| // of a marker that was already successfully written. Common scenarios: | ||
| // 1. Coordinator recovery: reloading PREPARE_COMMIT/ABORT from transaction log |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: from the transaction log
| public List<RecordError> recordErrors; | ||
| public String errorMessage; | ||
| public ProduceResponseData.LeaderIdAndEpoch currentLeader; | ||
| private Throwable exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception field might be feeling lonely since it was not invited to the hashCode, equals, and toString party
| // Verify initial state | ||
| ProducerStateEntry initialEntry = getLastEntryOrElseThrownByProducerId(stateManager, producerId); | ||
| assertEquals(0, initialEntry.producerEpoch()); | ||
| assertEquals(2, initialEntry.lastSeq()); | ||
| assertFalse(initialEntry.isEmpty()); // Has batch metadata | ||
|
|
||
| ProducerAppendInfo appendInfo = stateManager.prepareUpdate(producerId, AppendOrigin.CLIENT); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would you mind removing those unrelated changes?
|
Thanks for @chia7712 review, I have addressed all comments |
In Transaction Version 2, strict epoch validation (
markerEpoch > currentEpoch) causes hanging transactions in two scenarios:transaction log, retried markers are rejected with
InvalidProducerEpochExceptionbecause they use the same epochcoordinator retries are rejected for the same reason
Both cases leave transactions permanently hanging in PREPARE state,
causing clients to fail with
CONCURRENT_TRANSACTIONS.Detect idempotent marker retries in
ProducerStateManager.checkProducerEpoch()by checking threeconditions:
When all conditions are met, treat the marker as a successful idempotent
retry instead of throwing an error.