feat(data-warehouse): duckgres sink backfill primer#63144
Conversation
# Conflicts: # posthog/temporal/data_imports/pipelines/pipeline_v3/postgres_queue/consumer.py
…shared engine Master added group-halting on non-success, structlog workflow_type/log_source_id binding, and a recovery-sweep error_response after this branch forked. The shared v3 batch consumer engine (extracted from the older base) lacked them, so the merged branch regressed four tests in test_consumer.py. Port the three behaviors into BatchConsumer so the Delta consumer keeps master's semantics. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…-consumer # Conflicts: # posthog/temporal/data_imports/pipelines/pipeline_v3/postgres_queue/consumer.py
…-consumer # Conflicts: # posthog/temporal/data_imports/pipelines/pipeline_v3/postgres_queue/consumer.py
# Conflicts: # products/warehouse_sources_queue/backend/migrations/max_migration.txt
The charts deployment (PostHog/charts#11198) tracks a warehouse-sources-duckgres-load release in state.yaml, but nothing in CD dispatched commit_state_update for it — the seeded image sha would never advance (and predates the run_warehouse_sources_duckgres_load command). Mirrors the warehouse-sources-load dispatch block. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…lter
posthog/ducklake/ and products/warehouse_sources{,_queue}/backend/ are
imported by the consumer at runtime; changes there must roll its image.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Addresses the design-audit findings on the duckgres batch sink: Correctness (critical): - Cross-run head-of-line gate in get_delta_succeeded_and_lock: a batch is ineligible while an older non-failed run of the same (team, schema) has unapplied data batches, so a newer run's CREATE OR REPLACE can never interleave with an older run's inserts/merges. - supersede_replaced_runs: when a newer replace-run (full refresh / first-ever incremental) is ready, older runs' pending duckgres work is retired as 'superseded' — keeps the gate live without applying stale rows. - Org gating: the consumer only claims batches for teams of DuckgresServer orgs with the duckgres-batch-sink flag on (cached 60s; empty set short- circuits the poll). Mutual exclusion: the DuckLake copy workflow gate now skips sink-enabled teams (same tables, two writers). - Extract-bucket credentials: the processor mints short-lived AWS creds and injects a session secret SCOPEd to s3://DATAWAREHOUSE_BUCKET, since the duckgres worker's org-scoped credentials cannot read the internal extract bucket. Session setup now installs only httpfs (bundled; 'delta' is not and would hang on egress-restricted workers). - Transactional apply-marker arbiter: the duckgres-side marker insert aborts (and rolls back the data write) when a concurrent processor already applied the batch — double-apply is impossible regardless of advisory-lock state. Recovery grace default raised to 900s. Reliability / operability: - Real parallelism: process_batch uses thread_sensitive=False (the default serialized all batches on asgiref's single thread); duckgres connections get connect_timeout + TCP keepalives; a stuck-batch watchdog withholds liveness when one batch exceeds --stuck-batch-timeout (default 30m). - Retry budget fit for an external sink: max_attempts 8, backoff base 300s (~3h coverage); reset_duckgres_failed_runs management command un-sticks a permanently failed run inside the retention window. - Lag observability: duckgres_sink_eligible_backlog and duckgres_sink_oldest_eligible_age_seconds gauges; engine metrics are now injectable so the sink emits duckgres_pg_consumer_* instead of overloading the Delta consumer's metric names. - Partition management covers sourcebatchduckgresstatus and prunes sourcebatchduckgresapply (30d, > the 14d eligibility window); the duckgres-side marker table is pruned opportunistically at batch 0. - Engine: restored batch_picked_up/batch_processed_ok log events dropped by the refactor; configure_logger wired into the duckgres entrypoint; fail_run no-raise contract documented + engine backstop; crash-recovered applied batches now converge to 'succeeded' instead of stranding in waiting_retry. Tests: DB-backed regression tests for the cross-run gate, supersede, team filter, and backlog stats; unit tests for enablement gating and the watchdog. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The gate test's catch-all feature_enabled fake would have tripped the new sink-exclusion check; make it key-aware and add a test asserting a sink-enabled team never runs the copy workflow. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
frozen.access_key/secret_key are typed str | None; guard explicitly (an empty key pair is unusable anyway) so the CREATE SECRET builder takes str. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Pre-existing incremental/append schemas have history in Delta that the
sink's per-batch stream never replays; the legacy full-table delta_scan
copy hangs/OOMs on huge tables (one giant statement, one worker session,
no resume). This backfills with bounded memory and checkpoints instead:
- DuckgresSinkSchemaState (app DB): PENDING_BACKFILL -> BACKFILLING ->
PRIMED lifecycle per schema (+ NEEDS_RESYNC for retention-loss repair).
Live batches for a schema are blocked until PRIMED — including schemas
with no state row yet, so flag-flip has no partial-history window.
- Planner (runs in the consumer's maintenance tick): pins a Delta snapshot
via deltalake metadata, derives ~1GiB parquet chunks — the table's own
live files when the snapshot has no deletion vectors (zero data
movement), else a streamed, resumable staging pass — and enqueues them
as a synthetic run pre-marked delta-succeeded (invisible to the Delta
consumer, immediately eligible for the duckgres fetch). Re-plans insert
only missing chunks, so queue-partition drops cost nothing.
- Processor: chunks apply into <table>__backfill via the normal apply-
marker machinery (bounded statements, retries, exactly-once); the last
chunk atomically DROP+RENAMEs it over the live table in the same
transaction, then flips the schema to PRIMED. Live runs queued behind
the backfill (cross-run gate) MERGE on top afterwards.
- A live full refresh superseding the backfill flips the schema straight
to PRIMED (the replace produces the complete table anyway); backfill
runs are is_resume=true so they can never act as replace-heads.
- reset_duckgres_failed_runs --replan-backfill retires a wedged backfill
run and re-enters planning; duckgres_backfill_schemas{state} gauge.
Tests: chunk grouping, processor backfill branch (create/swap/idempotent
no-op), planner fail-safe gating in the consumer, and DB-backed coverage
for the blocked-schema gate, backfill pass-through, and supersede
exclusion.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Migration SQL ChangesHey 👋, we've detected some migrations on this PR. Here's the SQL output for each migration, make sure they make sense:
|
🔍 Migration Risk AnalysisWe've analyzed your migrations for potential risks. Summary: 1 Safe | 0 Needs Review | 0 Blocked ✅ SafeBrief or no lock, backwards compatible Last updated: 2026-06-26 18:52 UTC (8c8f6c0) |
|
| if is_last: | ||
| from posthog.temporal.data_imports.pipelines.pipeline_v3.duckgres.backfill import mark_primed | ||
|
|
||
| mark_primed(batch.schema_id, chunks_applied=chunk_count) |
There was a problem hiding this comment.
mark_primed outside the committed transaction — permanently stuck BACKFILLING
mark_primed runs after with conn.transaction() exits, so the duckgres swap (DROP + RENAME + apply marker) can commit while the app-DB state update fails. When mark_primed raises:
- The exception propagates through
process_batch; the consumer marks this batch'failed'. - On retry,
_has_duckgres_batch_applied→True(apply marker committed in duckgres), so the processor returns at the early-exit guard —mark_primedis never reached again. after_batch_processedruns and marks the batch succeeded in the queue; the reconciler finds no failed batches for thisrun_uuid._reconcile_backfillingonly transitions toPRIMEDwhen a failed batch's error contains"superseded"; there is no path forchunks_applied == chunk_count → PRIMED.
The schema is permanently stuck in BACKFILLING with all live batches blocked, no last_error set, and no automated recovery. The only escape is operator intervention via reset_duckgres_failed_runs --replan-backfill.
Consider adding an auto-PRIME transition in _reconcile_backfilling when applied[0] >= state.chunk_count (and state.chunk_count is set), or catching the mark_primed error and scheduling a retry rather than propagating.
Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/temporal/data_imports/pipelines/pipeline_v3/duckgres/processor.py
Line: 312-315
Comment:
**`mark_primed` outside the committed transaction — permanently stuck BACKFILLING**
`mark_primed` runs after `with conn.transaction()` exits, so the duckgres swap (DROP + RENAME + apply marker) can commit while the app-DB state update fails. When `mark_primed` raises:
1. The exception propagates through `process_batch`; the consumer marks this batch `'failed'`.
2. On retry, `_has_duckgres_batch_applied` → `True` (apply marker committed in duckgres), so the processor returns at the early-exit guard — `mark_primed` is never reached again.
3. `after_batch_processed` runs and marks the batch succeeded in the queue; the reconciler finds no failed batches for this `run_uuid`.
4. `_reconcile_backfilling` only transitions to `PRIMED` when a failed batch's error contains `"superseded"`; there is no path for `chunks_applied == chunk_count → PRIMED`.
The schema is permanently stuck in `BACKFILLING` with all live batches blocked, no `last_error` set, and no automated recovery. The only escape is operator intervention via `reset_duckgres_failed_runs --replan-backfill`.
Consider adding an auto-PRIME transition in `_reconcile_backfilling` when `applied[0] >= state.chunk_count` (and `state.chunk_count` is set), or catching the `mark_primed` error and scheduling a retry rather than propagating.
How can I resolve this? If you propose a fix, please make it concise.Root-cause fixes for the 39 confirmed review findings on the initial implementation: Deadlock (critical): pre-enablement live runs are delta-succeeded, blocked (schema unprimed), and older than the backfill run — they gated every backfill chunk via the cross-run check forever, serialized org-wide by the backfill cap. _plan_one now retires the schema's pre-snapshot runs first (terminal 'covered by duckgres backfill snapshot vN' status; their data is inside the pinned snapshot by construction), exactly like the supersede sweep. DB test pins chunk eligibility despite an older unapplied live run. Replan correctness (critical): run uuids gain a per-attempt generation nonce, so replanning at an unadvanced Delta version produces a fresh, claimable run instead of reusing the terminally-failed one; reconcile's supersede match is narrowed to the live replace-run prefix and the replan reason is distinct, killing the false flip-to-PRIMED. Multi-pod safety (critical): PENDING->BACKFILLING is a CAS claim (single planner per schema; org-cap re-checked post-claim); each synthetic batch row commits WITH its pre-succeeded delta-status row (a row visible without it would be claimed by the Delta consumer and loaded into Delta); mark_primed and all reconcile transitions are CAS so stale passes can't regress state. PRIMED healing (critical): reconcile is now authoritative — full apply count proves the swap (the last chunk's marker shares its transaction) and flips PRIMED even when the post-swap call was lost; the processor's already- applied last-chunk retry path also calls mark_primed. Retention (critical): reconcile re-enqueues chunk rows dropped by 7-day queue retention, re-resolved at the pinned snapshot version (idempotent by (run_uuid, batch_index)). deltalake reality (critical): get_deltalake_storage_options needs DuckLake RDS env the consumer lacks — replaced with consumer-local options (ambient chain in prod, MinIO in dev); deltalake 1.4.0 cannot stream deletion-vector tables, so the staging path is removed and DV tables park in NEEDS_RESYNC with a clear error (full-refresh resync heals). Also: union_by_name on multi-file chunk reads (schema-evolved tables); percent-decoded add-action paths; 512-file cap per chunk; collision-proof __bf_<schema-fragment> staging table name; chunk_paths absent -> loud error instead of silently applying one file; blocked-vs-eligible backlog split (blocked batches get their own gauges instead of pinning the loss alert); planner threads close_old_connections; [:50] slice ordered by updated_at; --replan-backfill respects --dry-run and validates the schema id; DuckgresSinkSchemaState registered in the IDOR coverage exemptions. Tests: delta-fetch invisibility of pre-succeeded rows, retire-opens-gate, nonce uniqueness, blocked/eligible backlog split, updated processor and consumer expectations. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
5343813 to
c9ee56b
Compare
…ckfill reconciler Every terminal-retire writer (supersede sweep, retire-at-plan, replan) now stamps a structured 'kind' into error_response; the reconciler dispatches on it instead of error-message prose (prefix fallback kept for rows written before 'kind' existed). Also hoists the live-vs-backfill batch SQL predicate into a single LIVE_BATCH_SQL_PREDICATE constant (Python twin: processor._is_backfill_batch). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…cutoff, NEEDS_RESYNC healing Addresses the second external review round: 1. Half-planned wedge: planning is now a two-phase durable claim. The PENDING->BACKFILLING CAS only leases the schema; the real claim is the plan CAS that writes run_uuid/snapshot_version/plan_cutoff/chunk_count atomically BEFORE any queue rows exist. A crash before the plan CAS is healed by a planning lease (reconcile resets run-less BACKFILLING rows to PENDING after 15m); a crash after it is healed by reconcile re-running retire + enqueue from the stored plan (all idempotent). 2. Snapshot/queue boundary: retiring 'everything currently queued' assumed batches can't become delta-succeeded between snapshot resolution and the retire — false. Replaced with a provable containment rule: capture a queue-DB clock BEFORE resolving the snapshot, and retire only runs whose EVERY batch has a delta-succeeded status older than that cutoff (the Delta consumer marks success after the commit, so such runs committed at or below the pinned version). Runs straddling the cutoff survive and apply AFTER the swap — enabled by exempting backfill chunks from the cross-run gate against live runs (they still order behind older backfill runs so two generations can never interleave on the staging table), while live batches keep queueing behind the backfill run. plan_cutoff is persisted on the state row so crash-healing re-retires with identical semantics. 3. NEEDS_RESYNC deadlock: the documented healing path (a full-refresh resync) was itself blocked by blocked_schema_ids. Reconcile now watches NEEDS_RESYNC schemas and CAS-flips them to PRIMED when a pending live replace-head appears; the supersede sweep retires older queued runs around it on its own cadence. 4. Batch-kind convention centralized in batch_kind.py: one Python predicate, one SQL fragment, one metadata builder/parser — consumed by the queue SQL, the processor dispatch, and the planner enqueue. A typed kind column on sourcebatch remains the follow-up migration. Tests: cross-run exemption (chunks ignore older live runs; live batches still queue behind the backfill run), run-level cutoff retire incl. idempotent re-run and a straddling run surviving, updated processor/planner expectations. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…duckgres-backfill-primer # Conflicts: # posthog/migrations/max_migration.txt
Resolve conflicts from master's relocation of the data_imports tree into products/warehouse_sources (#65817): - Move the PR's duckgres backfill code and modifications to the new products/warehouse_sources/backend/temporal/data_imports path, 3-way merging the duckgres files (consumer, jobs_db, processor + tests) against the duckgres-v3-consumer base and relocating the backfill-only files. - Rewrite posthog.temporal.data_imports imports to the new package path in the management commands and the ducklake copy workflow. - Take master's feature_enabled_or_false gate in the ducklake copy workflow (and its tests) over the PR's posthoganalytics.feature_enabled. - Keep both new models (DuckgresServerTeam from master, DuckgresSinkSchemaState from the PR) in models/__init__ and the IDOR coverage allowlist. - Renumber the duckgres sink schema-state migration to 1239 behind master's 1238. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…e hot-table FKs The migration-risk policy (HotTableAlterPolicy) blocked the new table: real FK constraints on posthog_team and posthog_user take a lock on those hot tables when the table is created. Set db_constraint=False on both FKs (the policy's recommended lock-free path); the links stay enforced at the app level. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Two correctness fixes from review of the backfill state machine: - Superseded-by-replace no longer flips straight to PRIMED. The replacing live refresh has only reached Delta when it retires the backfill and may still fail in duckgres; park the schema in NEEDS_RESYNC so the resync path promotes to PRIMED only once the replace run's final marker reaches duckgres-succeeded. - enqueue_chunks serializes per run_uuid with a session advisory lock (distinct namespace from the batch-claim locks). (run_uuid, batch_index) has no DB uniqueness guard, so concurrent _reconcile_one replays could otherwise insert duplicate chunk rows. Presence/applied checks now count DISTINCT batch_index so a stray duplicate can never mask a genuinely missing chunk. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Reviews (2): Last reviewed commit: "Merge branch 'master' into eric/duckgres..." | Re-trigger Greptile |
The repo-root conftest.py registers an autouse _activate_personhog_fake fixture that imports posthog -> django. The ci-scripts.yml job runs pytest with only pytest installed (no django) and triggers on .github/scripts/** changes, so the fixture's import crashed every script test with ModuleNotFoundError: django. Shadow the fixture with a no-op in .github/scripts/conftest.py. These tests never read person/group data, so the personhog fake isn't needed; the main suite is unaffected (the override only applies under .github/scripts). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Master's move PR relocated the warehouse_sources management commands to products/warehouse_sources/backend/management/commands/, but this branch still carried copies at the old posthog/management/commands/ path (the move couldn't be detected as a rename against the branch's independently-added copies). Two apps defining run_warehouse_sources_duckgres_load / reset_duckgres_failed_runs is a Django command-name collision. Remove the old-path duplicates and fold this branch's only divergence — the --replan-backfill option on reset_duckgres_failed_runs — into the canonical products/ command. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
⏭️ Skipped snapshot commit because branch advanced to The new commit will trigger its own snapshot update workflow. If you expected this workflow to succeed: This can happen due to concurrent commits. To get a fresh workflow run, either:
|
Query snapshots: Backend query snapshots updatedChanges: 1 snapshots (1 modified, 0 added, 0 deleted) What this means:
Next steps:
|
Resolve conflicts from master's duckgres-model consolidation (#63834, migrations 1240-1242) which dropped DuckLakeBackfill and DuckLakeCatalog: - ducklake/models.py: take master's consolidated DuckgresServer + DuckgresServerTeam, keep this PR's DuckgresSinkSchemaState (with the db_constraint=False hot-table FKs). - models/__init__: drop DuckLakeBackfill/DuckLakeCatalog imports + __all__ entries (master removed the models), keep DuckgresSinkSchemaState. - Renumber the sink-state migration 1239 -> 1243 behind master's 1242 (master also added 1239_migrate_approvals_models), re-depending on 1242_consolidate_duckgres_models_drop. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Only conflict was the migration head: master added 1243_alter_team_options (off 1242), colliding with this PR's 1243_duckgres_sink_schema_state. Renumber the sink-state migration to 1244, re-depending on 1243_alter_team_options. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Query snapshots: Backend query snapshots updatedChanges: 1 snapshots (1 modified, 0 added, 0 deleted) What this means:
Next steps:
|
Migration-head conflict only: master added 1244_team_event_retention_months (off 1243), colliding with this PR's sink-state migration. Renumber to 1245, re-depending on 1244_team_event_retention_months. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Problem
When the duckgres batch sink (#59251) is enabled for a team, pre-existing incremental/append schemas get a permanently partial duckgres table: their history lives in Delta, the queue only retains ~7 days of batches, and the sink's per-batch stream never replays it. The legacy fix —
DuckLakeCopyDataImportsWorkflow's full-tableCREATE OR REPLACE … AS delta_scan(...)— hangs/OOMs on huge tables: one giant statement in one duckgres worker session, bounded by the session memory budget, with no checkpoint or resume.Design
Backfill becomes normal sink traffic instead of one heroic statement:
DuckgresSinkSchemaState(app DB):PENDING_BACKFILL → BACKFILLING → PRIMED(+NEEDS_RESYNC). Live batches for a schema are blocked until PRIMED — including schemas with no state row yet, so there is no partial-history window at flag flip. Full-refresh/new/CDC schemas go straight to PRIMED.duckgres_backfill_staging/(outside the extract-retention sweep). Re-planning inserts only missing chunks, so queue-partition drops cost no progress.sourcebatchrows get asourcebatchstatus='succeeded'row up front — the Delta consumer never claims them, the duckgres fetch sees them immediately. Zero Delta-side changes.DROP TABLE IF EXISTS live+ALTER TABLE __backfill RENAME TO live+ apply marker. Rename is metadata-only in DuckLake (integration-tested in duckgres:tests/integration/ddl_test.go:144), so swap cost is size-independent and readers never see a half-built table.is_resume=trueso they can never act as supersede replace-heads;MAX_CONCURRENT_BACKFILLS_PER_ORG=1keeps org workers usable.reset_duckgres_failed_runs --replan-backfill <schema-id>;duckgres_backfill_schemas{state}gauge; progress denormalized on the state row.How did you test this code?
Stacked on
#59251 (
duckgres-v3-consumer) — uses its queue SQL, apply markers, enablement, and processor machinery.Publish to changelog?
do not publish to changelog
🤖 Generated with Claude Code