Skip to content

fix(ws): per-group connection ownership to prevent shared-connection deadlock in v3 batch consumer#66385

Open
estefaniarabadan wants to merge 1 commit into
masterfrom
estefania/improve-connextion-ownership
Open

fix(ws): per-group connection ownership to prevent shared-connection deadlock in v3 batch consumer#66385
estefaniarabadan wants to merge 1 commit into
masterfrom
estefania/improve-connextion-ownership

Conversation

@estefaniarabadan

Copy link
Copy Markdown
Contributor

Problem

The V3 batch consumer uses psycopg async connections, which are not safe for concurrent coroutine access. After #65181 replaced the asyncio.gather() barrier with fire-and-forget tasks, all concurrent group tasks, heartbeats, and the poll loop shared a single connection (self._conn). Under load, this caused the connection to wedge silently - pods stayed Running but stopped processing batches entirely.

The root cause is connection ownership: Postgres advisory locks are session-scoped, so all operations for a (team_id, schema_id) group must happen on the same connection. But concurrent coroutines on the same psycopg async connection causes undefined behavior.

Changes

Gives each in-flight group its own dedicated connection while preserving the fire-and-forget parallelism from #65181.

Connection model:

Connection Owner Purpose
_poll_conn (was _conn) poll loop only fetch_candidates (no locks acquired)
_recovery_conn recovery loop only stale-batch sweeps, reconcile
per-group connection group task advisory lock, status writes, heartbeat, unlock

Total max connections per pod: max_concurrency (16) + 2 = 18.

SQL change - split fetch-and-lock:

The old get_unprocessed_and_lock combined candidate selection with lock acquisition in one query. This is split into:

  • get_candidates / get_delta_succeeded_candidates - same MATERIALIZED CTE, returns all candidates without calling pg_try_advisory_lock
  • try_lock_group(conn, team_id, schema_id) - acquires the advisory lock on the group's dedicated connection
  • unlock_group(conn, team_id, schema_id) - releases the lock (depth 1, matching try_lock_group)

The old per-row lock methods (get_unprocessed_and_lock, unlock_for_batches) are preserved for the recovery sweep path.

Other fixes included:

  • Heartbeat race: heartbeat task is now cancelled before post-processing writes (_verify_ownership, update_status(succeeded)), not after. The finally block remains as a safety net.
  • Connection routing: should_process_batch and after_batch_processed now use the group connection instead of _ensure_main_conn(), which matters for the Duckgres adapter (real DB work).
  • _fail_run calls in _process_single_inner now pass conn=lock_conn explicitly.
  • Graceful shutdown drains in-flight tasks with a 30s timeout before cancelling stragglers.

How did you test this code?

Agent-authored. Ran the full consumer test suite (50 tests across Delta and Duckgres consumers, all passing).

New regression tests:

  • TestPerGroupConnectionIsolation.test_each_group_gets_distinct_connection - verifies each group task receives a unique connection object from _connect(). Fails against the old lock_conn = self._conn code.
  • TestPerGroupConnectionIsolation.test_group_connection_not_shared_with_poll - verifies group connections are distinct from _poll_conn.
  • TestProcessGroup.test_skips_group_when_lock_not_acquired - verifies try_lock_group returning False causes the group to be skipped without processing.
  • TestProcessGroup.test_closes_connection_on_exit - verifies group connection is closed in the finally block.
  • TestInFlightTaskRegistry.test_reap_finished_tasks_removes_completed - verifies completed tasks are cleaned from _in_flight.
  • TestInFlightTaskRegistry.test_process_group_tracked_swallows_exceptions - verifies unhandled exceptions don't propagate to the event loop.
  • TestBatchQueueGetCandidates - integration tests verifying candidates returned without acquiring advisory locks.
  • TestBatchQueueGroupLock - integration tests for try_lock_group exclusion across connections and unlock_group release.
  • Equivalent Duckgres integration tests for get_delta_succeeded_candidates, try_lock_group, unlock_group.

Automatic notifications

  • Publish to changelog?
  • Alert Sales and Marketing teams?

🤖 Agent context

Autonomy: Human-driven (agent-assisted)

Used Claude Code (Opus). The human provided detailed root cause analysis of the shared-connection deadlock, design constraints (advisory locks are session-scoped, psycopg async connections not safe for concurrent
use), and the per-group connection approach. Key design decisions during implementation:

  • Chose per-group dedicated connections over a connection pool for simplicity (18 connections per pod is acceptable)
  • Split fetch_and_lock into fetch_candidates + try_lock_group to keep the poll connection lock-free
  • Added unlock_group (single unlock, depth 1) alongside existing unlock_for_batches (per-row unlock for recovery sweep compatibility)
  • Fixed the heartbeat race by cancelling the heartbeat task between _process_batch and post-processing writes, rather than only in the finally block

Skills invoked: none (changes are to internal async infrastructure, not DRF/Django/frontend).

@greptile-apps

greptile-apps Bot commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

Reviews (1): Last reviewed commit: "fix(data-imports): improve connection ow..." | Re-trigger Greptile

Comment on lines 383 to 393
)
break
finally:
self._semaphore.release()
try:
assert lock_conn is not None
await self._adapter.unlock(lock_conn, batches=batches)
await self._adapter.unlock_group(group_conn, team_id=team_id, schema_id=schema_id)
except Exception as e:
# A dead session already released its locks server-side; don't crash every concurrent group.
logger.exception(
self._event("unlock_for_batches_failed"),
self._event("unlock_group_failed"),
team_id=team_id,
external_data_schema_id=schema_id,
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 unlock_group called unconditionally when lock was never acquired

When try_lock_group returns False, the function does an early return inside the try block. The finally block still runs and calls unlock_group, which issues pg_advisory_unlock on a lock this session never holds. PostgreSQL handles it gracefully (returns false, emits a server-side WARNING), but it generates noise in DB logs and wastes a round-trip on every skipped group. The local variable locked is in scope in the finally block and can guard this call.

Comment on lines +313 to +321
exc = task.exception() if not task.cancelled() else None
if exc is not None:
logger.exception(
self._event("group_task_unhandled_exception"),
team_id=key[0],
schema_id=key[1],
exc_info=exc,
)
capture_exception(exc)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Dead exception-logging branch in _reap_finished_tasks

_process_group_tracked catches all Exceptions and never re-raises, so completed tasks in _in_flight will always have task.exception() == None. The if exc is not None: block is unreachable — exceptions are already logged and captured inside _process_group_tracked. This is superfluous code that can be removed without loss of coverage.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

@Gilbert09 Gilbert09 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some mad SQL in here, but nothing jumped out at me

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants