Kafka Connect: Surface commit failures instead of silently swallowing them#16237
Kafka Connect: Surface commit failures instead of silently swallowing them#16237yadavay-amzn wants to merge 1 commit into
Conversation
Baunsgaard
left a comment
There was a problem hiding this comment.
Good catch and cleanup.
However, the error logging strategy you are proposing seems to be double-logging every commit failure in CoordinatorThread.run(). I have left some specific suggestions.
| LOG.error( | ||
| "Coordinator {} failed to commit for commit {}; propagating failure to terminate task", | ||
| taskId, | ||
| commitState.currentCommitId(), | ||
| e); | ||
| throw e; |
There was a problem hiding this comment.
change it to
throw new RuntimeException(
String.format("Coordinator %s failed to commit %s",
taskId, commitState.currentCommitId()),
e);
This allows the further up CoordinatorThread.run() catch to log the error once, and still attribute the error to this location.
There was a problem hiding this comment.
Done, updated in latest revision
| ImmutableList.of(), | ||
| EventTestUtil.now())) | ||
| .isInstanceOf(CommitFailedException.class) | ||
| .hasMessageContaining("Glue detected concurrent update"); |
There was a problem hiding this comment.
if you do the above change, then i think this need to be :
.hasRootCauseMessage("Glue detected concurrent update");
dd4620e to
8467e0d
Compare
|
Thanks @Baunsgaard for taking a look, you're right about the double-logging. |
Baunsgaard
left a comment
There was a problem hiding this comment.
LGTM, left one nit for production code. Tests looks fine!
| // Do not swallow commit failures: wrap with Coordinator context and propagate so | ||
| // CoordinatorThread.run() terminates and the Kafka Connect task transitions to FAILED | ||
| // instead of silently dropping data (e.g., CommitFailedException from catalogs that | ||
| // detect concurrent updates). The taskId and commitId are embedded in the wrapper | ||
| // message so that the single log emitted by CoordinatorThread retains the context. |
There was a problem hiding this comment.
nit: I think it is too much to leave this comment. It is a personal preference, but I would remove it.
8467e0d to
97cdeb1
Compare
|
Done, removed the comment block. |
laskoviymishka
left a comment
There was a problem hiding this comment.
Thanks for tackling #15878, the underlying data-loss bug is real.
Silently swallowing CommitFailedException means Connect can keep the task RUNNING while dropping in-flight data, so the fix direction is right: failures need to surface and move the task to FAILED.
Before merge, I’d resolve a few mismatches between the PR description and the actual diff, since these will be confusing later in git log / blame:
-
“log at ERROR level ... before rethrowing”
The code does not rethrow the original exception. It wraps it in a newRuntimeException(String.format(...), e), soCommitFailedExceptionbecomes a genericRuntimeExceptionwith the original only available asgetCause(). That may lose useful signal for operator alerting / log pattern matching. I’d either rethrowedirectly after logging, or update the description to say “wrap and rethrow with context.” -
“removes the catch-all around
doCommit()”
The catch is not removed; it is narrowed fromcatch (Exception)tocatch (RuntimeException). That may be fine, but the PR should say that explicitly. Otherwise, either keepcatch (Exception)or remove the block entirely and rely on the outer catch inCoordinatorThread.run(). -
“
CoordinatorThread.run()terminates the thread on uncaught exceptions, which transitions the Kafka Connect task toFAILED”
The end state is right, but the mechanism is different.run()catchesException, logs, and setsterminated = true. The Connect task transitions later whenCommitterImpl.save()→processControlEvents()seescoordinatorThread.isTerminated()and throwsNotRunningException. Worth tightening the description so future readers don’t learn the wrong invariant.
One behavioral change I’d like a Kafka Connect domain opinion on: this also makes the partial-commit path — commit(true), triggered by commitState.isCommitTimedOut() — fatal on any RuntimeException. Previously, a transient blip during partial commit was swallowed and retried; now it terminates the coordinator and needs operator intervention.
Is that the intended trade-off, or should the rethrow be gated on !partialCommit?
@AnatolyPopov, can you take a look, would value your read on two things:
- should partial-commit failures be equally fatal?
- does wrapping vs rethrowing the original
CommitFailedExceptionmatter for downstream alerting in your deployments?
Inline comments below cover the test-side observations: spy reuse, brittle post-throw assertions, and missing partial-commit test.
|
Thanks for the thorough review @laskoviymishka. Addressed all points:
Will update the PR description to accurately reflect the narrowed catch and the mechanism for task FAILED transition. |
There was a problem hiding this comment.
Looks good now, all the description/code mismatches are resolved and the partial-commit gate is in.
Small nit: the Testing section still references testCoordinatorWithBadDataFile, but the actually-modified test is testCommitError, worth a quick fix. Plus CI is a bit red.
Approving; will wait for @AnatolyPopov's domain read before merge.
6a6728a to
ebccb88
Compare
Narrow the catch around doCommit() and selectively handle exceptions: - CommitFailedException (retryable conflict): log at WARN and retry on the next commit cycle. This preserves the existing retry behavior for transient catalog conflicts. - All other RuntimeExceptions (fatal): log at ERROR and rethrow, terminating the coordinator and transitioning the task to FAILED. This ensures genuine data-integrity failures surface to operators while transient conflicts (e.g., concurrent commits) are handled gracefully. Fixes apache#15878
ebccb88 to
5a9e254
Compare
Fixes #15878.
Problem
The Kafka Connect
Coordinatorpreviously caughtExceptionarounddoCommit()and only logged a warning, so when a commit failed fatally (e.g.,ValidationExceptionfrom stale offsets,IllegalArgumentExceptionfrom bad partition spec), the connector stayedRUNNINGwhile silently dropping the data that was in flight.Fix
Selectively handle exceptions from
doCommit():CommitFailedException(retryable conflict): log at WARN and retry on the next commit cycle. This preserves the existing retry behavior for transient catalog conflicts (e.g., concurrent commits winning the race).RuntimeException(fatal): log at ERROR and rethrow, terminating the coordinator.CoordinatorThread.run()catches the propagated exception, setsterminated = true, and the Connect task transitions toFAILEDwhenCommitterImpl.save()seescoordinatorThread.isTerminated().The
finallyblock that callscommitState.endCurrentCommit()is preserved so per-commit state is cleaned up regardless of the outcome.Testing
testCommitFailedExceptionSwallowed: verifies thatCommitFailedExceptionis caught and does NOT propagate (coordinator stays alive for retry).testCommitError: verifies thatIllegalArgumentException(bad partition spec) propagates and kills the coordinator.testCoordinatorCommittedOffsetValidation: verifies thatValidationException(stale offsets) propagates.TestCoordinatorsuite passes locally.spotlessApply+checkstyleMain+checkstyleTestpass.