fix(ws): per-group connection ownership to prevent shared-connection deadlock in v3 batch consumer#66385
fix(ws): per-group connection ownership to prevent shared-connection deadlock in v3 batch consumer#66385estefaniarabadan wants to merge 1 commit into
Conversation
|
Reviews (1): Last reviewed commit: "fix(data-imports): improve connection ow..." | Re-trigger Greptile |
| ) | ||
| break | ||
| finally: | ||
| self._semaphore.release() | ||
| try: | ||
| assert lock_conn is not None | ||
| await self._adapter.unlock(lock_conn, batches=batches) | ||
| await self._adapter.unlock_group(group_conn, team_id=team_id, schema_id=schema_id) | ||
| except Exception as e: | ||
| # A dead session already released its locks server-side; don't crash every concurrent group. | ||
| logger.exception( | ||
| self._event("unlock_for_batches_failed"), | ||
| self._event("unlock_group_failed"), | ||
| team_id=team_id, | ||
| external_data_schema_id=schema_id, | ||
| ) |
There was a problem hiding this comment.
unlock_group called unconditionally when lock was never acquired
When try_lock_group returns False, the function does an early return inside the try block. The finally block still runs and calls unlock_group, which issues pg_advisory_unlock on a lock this session never holds. PostgreSQL handles it gracefully (returns false, emits a server-side WARNING), but it generates noise in DB logs and wastes a round-trip on every skipped group. The local variable locked is in scope in the finally block and can guard this call.
| exc = task.exception() if not task.cancelled() else None | ||
| if exc is not None: | ||
| logger.exception( | ||
| self._event("group_task_unhandled_exception"), | ||
| team_id=key[0], | ||
| schema_id=key[1], | ||
| exc_info=exc, | ||
| ) | ||
| capture_exception(exc) |
There was a problem hiding this comment.
Dead exception-logging branch in
_reap_finished_tasks
_process_group_tracked catches all Exceptions and never re-raises, so completed tasks in _in_flight will always have task.exception() == None. The if exc is not None: block is unreachable — exceptions are already logged and captured inside _process_group_tracked. This is superfluous code that can be removed without loss of coverage.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Gilbert09
left a comment
There was a problem hiding this comment.
Some mad SQL in here, but nothing jumped out at me
Problem
The V3 batch consumer uses
psycopgasync connections, which are not safe for concurrent coroutine access. After #65181 replaced theasyncio.gather()barrier with fire-and-forget tasks, all concurrent group tasks, heartbeats, and the poll loop shared a single connection (self._conn). Under load, this caused the connection to wedge silently - pods stayed Running but stopped processing batches entirely.The root cause is connection ownership: Postgres advisory locks are session-scoped, so all operations for a
(team_id, schema_id)group must happen on the same connection. But concurrent coroutines on the samepsycopgasync connection causes undefined behavior.Changes
Gives each in-flight group its own dedicated connection while preserving the fire-and-forget parallelism from #65181.
Connection model:
_poll_conn(was_conn)fetch_candidates(no locks acquired)_recovery_connTotal max connections per pod:
max_concurrency(16) + 2 = 18.SQL change - split fetch-and-lock:
The old
get_unprocessed_and_lockcombined candidate selection with lock acquisition in one query. This is split into:get_candidates/get_delta_succeeded_candidates- same MATERIALIZED CTE, returns all candidates without callingpg_try_advisory_locktry_lock_group(conn, team_id, schema_id)- acquires the advisory lock on the group's dedicated connectionunlock_group(conn, team_id, schema_id)- releases the lock (depth 1, matchingtry_lock_group)The old per-row lock methods (
get_unprocessed_and_lock,unlock_for_batches) are preserved for the recovery sweep path.Other fixes included:
_verify_ownership,update_status(succeeded)), not after. Thefinallyblock remains as a safety net.should_process_batchandafter_batch_processednow use the group connection instead of_ensure_main_conn(), which matters for the Duckgres adapter (real DB work)._fail_runcalls in_process_single_innernow passconn=lock_connexplicitly.How did you test this code?
Agent-authored. Ran the full consumer test suite (50 tests across Delta and Duckgres consumers, all passing).
New regression tests:
TestPerGroupConnectionIsolation.test_each_group_gets_distinct_connection- verifies each group task receives a unique connection object from_connect(). Fails against the oldlock_conn = self._conncode.TestPerGroupConnectionIsolation.test_group_connection_not_shared_with_poll- verifies group connections are distinct from_poll_conn.TestProcessGroup.test_skips_group_when_lock_not_acquired- verifiestry_lock_groupreturningFalsecauses the group to be skipped without processing.TestProcessGroup.test_closes_connection_on_exit- verifies group connection is closed in the finally block.TestInFlightTaskRegistry.test_reap_finished_tasks_removes_completed- verifies completed tasks are cleaned from_in_flight.TestInFlightTaskRegistry.test_process_group_tracked_swallows_exceptions- verifies unhandled exceptions don't propagate to the event loop.TestBatchQueueGetCandidates- integration tests verifying candidates returned without acquiring advisory locks.TestBatchQueueGroupLock- integration tests fortry_lock_groupexclusion across connections andunlock_grouprelease.get_delta_succeeded_candidates,try_lock_group,unlock_group.Automatic notifications
🤖 Agent context
Autonomy: Human-driven (agent-assisted)
Used Claude Code (Opus). The human provided detailed root cause analysis of the shared-connection deadlock, design constraints (advisory locks are session-scoped, psycopg async connections not safe for concurrent
use), and the per-group connection approach. Key design decisions during implementation:
fetch_and_lockintofetch_candidates+try_lock_groupto keep the poll connection lock-freeunlock_group(single unlock, depth 1) alongside existingunlock_for_batches(per-row unlock for recovery sweep compatibility)_process_batchand post-processing writes, rather than only in thefinallyblockSkills invoked: none (changes are to internal async infrastructure, not DRF/Django/frontend).