Skip to content

Kafka Connect: Surface commit failures instead of silently swallowing them#16237

Open
yadavay-amzn wants to merge 1 commit into
apache:mainfrom
yadavay-amzn:fix/iceberg_15878
Open

Kafka Connect: Surface commit failures instead of silently swallowing them#16237
yadavay-amzn wants to merge 1 commit into
apache:mainfrom
yadavay-amzn:fix/iceberg_15878

Conversation

@yadavay-amzn
Copy link
Copy Markdown
Contributor

@yadavay-amzn yadavay-amzn commented May 7, 2026

Fixes #15878.

Problem

The Kafka Connect Coordinator previously caught Exception around doCommit() and only logged a warning, so when a commit failed fatally (e.g., ValidationException from stale offsets, IllegalArgumentException from bad partition spec), the connector stayed RUNNING while silently dropping the data that was in flight.

Fix

Selectively handle exceptions from doCommit():

  • CommitFailedException (retryable conflict): log at WARN and retry on the next commit cycle. This preserves the existing retry behavior for transient catalog conflicts (e.g., concurrent commits winning the race).
  • All other RuntimeException (fatal): log at ERROR and rethrow, terminating the coordinator. CoordinatorThread.run() catches the propagated exception, sets terminated = true, and the Connect task transitions to FAILED when CommitterImpl.save() sees coordinatorThread.isTerminated().

The finally block that calls commitState.endCurrentCommit() is preserved so per-commit state is cleaned up regardless of the outcome.

Testing

  • testCommitFailedExceptionSwallowed: verifies that CommitFailedException is caught and does NOT propagate (coordinator stays alive for retry).
  • testCommitError: verifies that IllegalArgumentException (bad partition spec) propagates and kills the coordinator.
  • testCoordinatorCommittedOffsetValidation: verifies that ValidationException (stale offsets) propagates.
  • Full TestCoordinator suite passes locally.
  • spotlessApply + checkstyleMain + checkstyleTest pass.

Copy link
Copy Markdown
Contributor

@Baunsgaard Baunsgaard left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch and cleanup.

However, the error logging strategy you are proposing seems to be double-logging every commit failure in CoordinatorThread.run(). I have left some specific suggestions.

Comment on lines +157 to +162
LOG.error(
"Coordinator {} failed to commit for commit {}; propagating failure to terminate task",
taskId,
commitState.currentCommitId(),
e);
throw e;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change it to

    throw new RuntimeException(
          String.format("Coordinator %s failed to commit %s",
              taskId, commitState.currentCommitId()),
          e);

This allows the further up CoordinatorThread.run() catch to log the error once, and still attribute the error to this location.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, updated in latest revision

ImmutableList.of(),
EventTestUtil.now()))
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("Glue detected concurrent update");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you do the above change, then i think this need to be :

        .hasRootCauseMessage("Glue detected concurrent update");

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@yadavay-amzn
Copy link
Copy Markdown
Contributor Author

Thanks @Baunsgaard for taking a look, you're right about the double-logging.
I've pushed an update with your recommended changes, please take a look when you get a chance. Thanks!

Copy link
Copy Markdown
Contributor

@Baunsgaard Baunsgaard left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, left one nit for production code. Tests looks fine!

Comment on lines +151 to +155
// Do not swallow commit failures: wrap with Coordinator context and propagate so
// CoordinatorThread.run() terminates and the Kafka Connect task transitions to FAILED
// instead of silently dropping data (e.g., CommitFailedException from catalogs that
// detect concurrent updates). The taskId and commitId are embedded in the wrapper
// message so that the single log emitted by CoordinatorThread retains the context.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it is too much to leave this comment. It is a personal preference, but I would remove it.

@yadavay-amzn
Copy link
Copy Markdown
Contributor Author

yadavay-amzn commented May 11, 2026

Done, removed the comment block.

Copy link
Copy Markdown

@laskoviymishka laskoviymishka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for tackling #15878, the underlying data-loss bug is real.

Silently swallowing CommitFailedException means Connect can keep the task RUNNING while dropping in-flight data, so the fix direction is right: failures need to surface and move the task to FAILED.

Before merge, I’d resolve a few mismatches between the PR description and the actual diff, since these will be confusing later in git log / blame:

  1. “log at ERROR level ... before rethrowing”
    The code does not rethrow the original exception. It wraps it in a new RuntimeException(String.format(...), e), so CommitFailedException becomes a generic RuntimeException with the original only available as getCause(). That may lose useful signal for operator alerting / log pattern matching. I’d either rethrow e directly after logging, or update the description to say “wrap and rethrow with context.”

  2. “removes the catch-all around doCommit()
    The catch is not removed; it is narrowed from catch (Exception) to catch (RuntimeException). That may be fine, but the PR should say that explicitly. Otherwise, either keep catch (Exception) or remove the block entirely and rely on the outer catch in CoordinatorThread.run().

  3. CoordinatorThread.run() terminates the thread on uncaught exceptions, which transitions the Kafka Connect task to FAILED
    The end state is right, but the mechanism is different. run() catches Exception, logs, and sets terminated = true. The Connect task transitions later when CommitterImpl.save()processControlEvents() sees coordinatorThread.isTerminated() and throws NotRunningException. Worth tightening the description so future readers don’t learn the wrong invariant.

One behavioral change I’d like a Kafka Connect domain opinion on: this also makes the partial-commit path — commit(true), triggered by commitState.isCommitTimedOut() — fatal on any RuntimeException. Previously, a transient blip during partial commit was swallowed and retried; now it terminates the coordinator and needs operator intervention.

Is that the intended trade-off, or should the rethrow be gated on !partialCommit?

@AnatolyPopov, can you take a look, would value your read on two things:

  • should partial-commit failures be equally fatal?
  • does wrapping vs rethrowing the original CommitFailedException matter for downstream alerting in your deployments?

Inline comments below cover the test-side observations: spy reuse, brittle post-throw assertions, and missing partial-commit test.

@yadavay-amzn
Copy link
Copy Markdown
Contributor Author

yadavay-amzn commented May 15, 2026

Thanks for the thorough review @laskoviymishka. Addressed all points:

  1. Rethrow original exception: no longer wrapping in RuntimeException. The original CommitFailedException (or whatever the commit throws) is logged at ERROR and rethrown directly, preserving the type for alerting.

  2. Partial-commit gating: rethrow is now gated on !partialCommit. Transient failures during partial commits are logged but not fatal, matching the previous retry behavior. Only full-commit failures terminate the coordinator.

  3. Updated test assertions: tests now expect the original exception types directly.

Will update the PR description to accurately reflect the narrowed catch and the mechanism for task FAILED transition.

Copy link
Copy Markdown

@laskoviymishka laskoviymishka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good now, all the description/code mismatches are resolved and the partial-commit gate is in.

Small nit: the Testing section still references testCoordinatorWithBadDataFile, but the actually-modified test is testCommitError, worth a quick fix. Plus CI is a bit red.

Approving; will wait for @AnatolyPopov's domain read before merge.

@yadavay-amzn yadavay-amzn force-pushed the fix/iceberg_15878 branch 2 times, most recently from 6a6728a to ebccb88 Compare May 16, 2026 00:18
Narrow the catch around doCommit() and selectively handle exceptions:
- CommitFailedException (retryable conflict): log at WARN and retry on
  the next commit cycle. This preserves the existing retry behavior for
  transient catalog conflicts.
- All other RuntimeExceptions (fatal): log at ERROR and rethrow,
  terminating the coordinator and transitioning the task to FAILED.

This ensures genuine data-integrity failures surface to operators while
transient conflicts (e.g., concurrent commits) are handled gracefully.

Fixes apache#15878
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Kafka Connect] Connector enters silent broken state after CommitFailedException (Glue concurrent update) — no data written, no error surfaced

3 participants