Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ def send_offsets_to_transaction(self, offsets, group_metadata):
"send_offsets_to_transaction expects group_metadata to be a "
"ConsumerGroupMetadata or a group_id str, got %r" % (type(group_metadata),))

if group_metadata.generation_id > 0 and not group_metadata.member_id:
raise ValueError(
"Invalid ConsumerGroupMetadata: generation_id=%s implies a"
" joined group but member_id is empty" % (group_metadata.generation_id,))

with self._lock:
self._ensure_transactional()
self._maybe_fail_with_error()
Expand Down Expand Up @@ -1191,6 +1196,11 @@ def handle_response(self, response):
# Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED
# on the txn-coordinator RPC paths (KIP-360).
self.fatal_error(Errors.ProducerFencedError())
elif error_type in (Errors.UnknownProducerIdError, Errors.InvalidProducerIdMappingError):
if self.transaction_manager._supports_epoch_bump():
self.abortable_error(error_type())
else:
self.fatal_error(error_type())
elif error_type is Errors.TransactionalIdAuthorizationFailedError:
self.fatal_error(error_type())
elif error_type is Errors.GroupAuthorizationFailedError:
Expand Down Expand Up @@ -1275,6 +1285,24 @@ def handle_response(self, response):
# on the txn-coordinator RPC paths (KIP-360).
self.fatal_error(Errors.ProducerFencedError())
return
elif error_type is Errors.FencedInstanceIdError:
# KIP-447: static-membership fencing - another consumer
# instance with this group_instance_id displaced ours. The
# transaction must be aborted, but the producer can be
# reused for a fresh transaction.
self.abortable_error(error_type())
return
elif error_type in (Errors.IllegalGenerationError,
Errors.UnknownMemberIdError):
# KIP-447: the consumer generation / member_id we passed
# in are stale (the consumer rebalanced between when we
# snapshotted group_metadata and when the broker checked
# it). Abort the txn so the application can re-snapshot
# and retry.
self.abortable_error(Errors.CommitFailedError(
"Transaction offset commit failed due to consumer group"
" metadata mismatch: %s" % (error_type.__name__,)))
return
elif error_type in (Errors.TransactionalIdAuthorizationFailedError,
Errors.UnsupportedForMessageFormatError):
self.fatal_error(error_type())
Expand Down
92 changes: 92 additions & 0 deletions test/producer/test_transaction_manager_mock_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,48 @@ def test_group_authz_failed_is_abortable(self, broker, client):
assert isinstance(tm.last_error, Errors.GroupAuthorizationFailedError)
assert handler._result.failed

@pytest.mark.parametrize("error", [
    Errors.UnknownProducerIdError,
    Errors.InvalidProducerIdMappingError,
])
def test_unknown_producer_id_abortable_on_modern_broker(
        self, broker, client, error):
    """AddOffsetsToTxn producer-id errors are abortable on a 2.5 broker.

    KIP-360: on a broker that supports epoch bumping (>= 2.5),
    UNKNOWN_PRODUCER_ID / INVALID_PRODUCER_ID_MAPPING only require the
    application to abort the transaction and retry, with the producer
    epoch bumped under the hood. Matches Java's abortableErrorIfPossible.
    """
    manager = _make_manager(client, api_version=(2, 5))
    add_offsets_handler, _, _ = self._enqueue_add_offsets(manager)

    # Register the error reply with the broker fixture, then drive the
    # queued request through the client until its future resolves.
    failing_response = self._response(error_code=error.errno)
    broker.respond(AddOffsetsToTxnResponse, failing_response)
    _, pending = _dispatch_next(client, manager)
    _poll_for_future(client, pending)

    assert add_offsets_handler._result.failed
    assert isinstance(manager.last_error, error)
    assert manager._current_state == TransactionState.ABORTABLE_ERROR

@pytest.mark.parametrize("error", [
    Errors.UnknownProducerIdError,
    Errors.InvalidProducerIdMappingError,
])
def test_unknown_producer_id_fatal_on_old_broker(
        self, broker, client, error):
    """AddOffsetsToTxn producer-id errors stay fatal on a pre-2.5 broker.

    A broker older than 2.5 has no epoch-bump path, so the same error
    codes must put the manager into FATAL_ERROR.
    """
    manager = _make_manager(client, api_version=(2, 4))
    add_offsets_handler, _, _ = self._enqueue_add_offsets(manager)

    # Register the error reply with the broker fixture, then drive the
    # queued request through the client until its future resolves.
    failing_response = self._response(error_code=error.errno)
    broker.respond(AddOffsetsToTxnResponse, failing_response)
    _, pending = _dispatch_next(client, manager)
    _poll_for_future(client, pending)

    assert add_offsets_handler._result.failed
    assert isinstance(manager.last_error, error)
    assert manager._current_state == TransactionState.FATAL_ERROR

def test_unknown_error_is_fatal(self, broker, client):
tm = _make_manager(client)
handler, _, _ = self._enqueue_add_offsets(tm)
Expand Down Expand Up @@ -1077,6 +1119,44 @@ def test_group_authz_failed_is_abortable(self, broker, client):
assert isinstance(tm.last_error, Errors.GroupAuthorizationFailedError)
assert handler._result.failed

def test_fenced_instance_id_is_abortable(self, broker, client):
    """TxnOffsetCommit: FENCED_INSTANCE_ID is an abortable error.

    KIP-447: the error means another static-membership instance
    displaced us. The transaction must abort, but the producer can be
    reused afterwards (matches Java).
    """
    manager = _make_manager(client)
    commit_handler, tp = self._enqueue_offset_commit(manager)

    # Per-partition error map handed to the response builder.
    partition_errors = {
        (tp.topic, tp.partition): Errors.FencedInstanceIdError.errno,
    }
    broker.respond(TxnOffsetCommitResponse, self._response(partition_errors))
    _, pending = _dispatch_next(client, manager)
    _poll_for_future(client, pending)

    assert commit_handler._result.failed
    assert isinstance(manager.last_error, Errors.FencedInstanceIdError)
    assert manager._current_state == TransactionState.ABORTABLE_ERROR

@pytest.mark.parametrize("error", [
    Errors.IllegalGenerationError,
    Errors.UnknownMemberIdError,
])
def test_consumer_group_metadata_mismatch_is_abortable(
        self, broker, client, error):
    """TxnOffsetCommit: stale group metadata aborts the transaction.

    KIP-447: ILLEGAL_GENERATION / UNKNOWN_MEMBER_ID indicate the consumer
    rebalanced between snapshot and commit. The failure surfaces as an
    abortable CommitFailedError naming the broker error, so the app can
    re-snapshot and retry (matches Java's CommitFailedException).
    """
    manager = _make_manager(client)
    commit_handler, tp = self._enqueue_offset_commit(manager)

    # Per-partition error map handed to the response builder.
    partition_errors = {(tp.topic, tp.partition): error.errno}
    broker.respond(TxnOffsetCommitResponse, self._response(partition_errors))
    _, pending = _dispatch_next(client, manager)
    _poll_for_future(client, pending)

    assert commit_handler._result.failed
    assert manager._current_state == TransactionState.ABORTABLE_ERROR
    surfaced = manager.last_error
    assert isinstance(surfaced, Errors.CommitFailedError)
    # The wrapped error message must name the original broker error class.
    assert error.__name__ in str(surfaced)

def test_unknown_partition_error_is_fatal(self, broker, client):
tm = _make_manager(client)
handler, tp = self._enqueue_offset_commit(tm)
Expand Down Expand Up @@ -1211,6 +1291,18 @@ def test_send_offsets_to_transaction_rejects_garbage(self, broker, client):
with pytest.raises(TypeError):
tm.send_offsets_to_transaction(self._offsets(), 42)

def test_send_offsets_to_transaction_rejects_incoherent_metadata(
        self, broker, client):
    """A positive generation_id with an empty member_id raises ValueError.

    Mirrors Java's throwIfInvalidGroupMetadata: such metadata is
    incoherent and should be rejected at the API boundary rather than
    sent to the broker.
    """
    manager = _make_manager(client)
    manager._current_state = TransactionState.IN_TRANSACTION

    incoherent = ConsumerGroupMetadata(
        group_id='g',
        generation_id=5,
        member_id='',
        group_instance_id=None,
    )
    with pytest.raises(ValueError, match='generation_id'):
        manager.send_offsets_to_transaction(self._offsets(), incoherent)

def test_group_metadata_propagates_through_add_offsets_to_commit_handler(
self, broker, client):
"""The AddOffsetsToTxn -> TxnOffsetCommit chain must preserve the
Expand Down
Loading