fix(ws): cap V3 batcher output by real Arrow bytes#66375
Conversation
|
Reviews (1): Last reviewed commit: "fix(data-warehouse): cap V3 batcher outp..." | Re-trigger Greptile |
| if pa.types.is_list(col_type): | ||
| elements = int(pc.sum(pc.list_value_length(col)).as_py() or 0) | ||
| return elements + col_length * 4 |
There was a problem hiding this comment.
list column payload bytes: element count used instead of element bytes
For list columns, the function returns element_count + offset_bytes rather than element_value_bytes + offset_bytes. For a list<int64> column, each child int64 takes 8 bytes, so a column with 1,000 elements would return 1000 + offset_bytes instead of 8000 + offset_bytes — an 8× undercount. For lists containing strings or large fixed-width types, the gap grows further. The docstring's claim of "value bytes + offset buffer" is therefore inaccurate for list types, and could mislead a future refactor into thinking it's measuring actual payload. The offset guard already protects against 32-bit offset overflow for list, but the byte cap meant to bound loader memory won't fire for wide list-of-primitive columns the way the comment implies.
| if pa.types.is_list(col_type): | |
| elements = int(pc.sum(pc.list_value_length(col)).as_py() or 0) | |
| return elements + col_length * 4 | |
| if pa.types.is_list(col_type): | |
| # Use element count as a proxy for offset pressure (mirrors _column_offset_pressure). | |
| # NOTE: this does NOT measure the child array's actual value bytes; for lists of large | |
| # elements the byte cap will undercount. The offset guard already prevents 32-bit | |
| # overflow; the byte cap is a best-effort bound for the common OOM driver (large | |
| # string/JSON cells), not a precise accounting of list child values. | |
| elements = int(pc.sum(pc.list_value_length(col)).as_py() or 0) | |
| return elements + col_length * 4 |
| def _set_ready(self, table: pa.Table) -> None: | ||
| """Split `table` so no yielded chunk can overflow a 32-bit offset column.""" | ||
| self._ready = deque(_split_to_offset_limit(table, self._max_column_offset_bytes)) | ||
| """Split `table` so no yielded chunk overflows a 32-bit offset column or exceeds | ||
| the per-table Arrow-payload cap (keeping the loader's per-batch merge bounded).""" | ||
| chunks = _split_table(table, offset_limit=self._max_column_offset_bytes, bytes_limit=self._max_table_bytes) | ||
| if len(chunks) > 1: | ||
| payload_bytes = _table_payload_bytes(table) | ||
| if payload_bytes > self._max_table_bytes: |
There was a problem hiding this comment.
Redundant
_table_payload_bytes call after split
_split_table already evaluates _table_payload_bytes(table) internally on its first call, so recomputing it here for the logging check duplicates O(n_rows × n_columns) work for every split event. Hoisting the call before _split_table avoids the redundancy.
| def _set_ready(self, table: pa.Table) -> None: | |
| """Split `table` so no yielded chunk can overflow a 32-bit offset column.""" | |
| self._ready = deque(_split_to_offset_limit(table, self._max_column_offset_bytes)) | |
| """Split `table` so no yielded chunk overflows a 32-bit offset column or exceeds | |
| the per-table Arrow-payload cap (keeping the loader's per-batch merge bounded).""" | |
| chunks = _split_table(table, offset_limit=self._max_column_offset_bytes, bytes_limit=self._max_table_bytes) | |
| if len(chunks) > 1: | |
| payload_bytes = _table_payload_bytes(table) | |
| if payload_bytes > self._max_table_bytes: | |
| def _set_ready(self, table: pa.Table) -> None: | |
| """Split `table` so no yielded chunk overflows a 32-bit offset column or exceeds | |
| the per-table Arrow-payload cap (keeping the loader's per-batch merge bounded).""" | |
| payload_bytes = _table_payload_bytes(table) | |
| chunks = _split_table(table, offset_limit=self._max_column_offset_bytes, bytes_limit=self._max_table_bytes) | |
| if len(chunks) > 1: | |
| if payload_bytes > self._max_table_bytes: |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Problem
The V3 warehouse loader (
warehouse-sources-load) OOMs on a small tail of imports. An overnight health check found V3 healthy overall (~99.6% success) except for a loader-specific OOM class: pods get SIGKILLed (exit 137) writing tiny-by-row-count batches (1–1,078 rows) that are heavy by width / cell size — Convex JSON documents, wide ad-platform stat schemas, large Postgres JSON/text columns.Root cause: the extract batcher bounds each batch only by row count and a Python
sys.getsizeofestimate, neither of which reflects the materialized Arrow table's memory. A wide batch can materialize into a multi-GiB Arrow table from a few thousand rows, which becomes the loader's per-batch Delta merge working set and blows past the pod memory limit.Changes
Generalize the batcher's existing post-materialization splitter so each yielded table is bounded by real, slice-accurate Arrow payload bytes (256 MiB default), reusing the proven row-halving recursion already used for the 32-bit offset-overflow guard.
_column_payload_bytes/_table_payload_bytes— sum value bytes + offset buffers (string/binary/large_string/large_binary/list) and fixed-width columns, deliberately avoidingArray.nbytes(which reports the full shared buffer for a zero-copy slice and would stop the recursion from converging)._split_tablenow row-halves while either a column's offset pressure exceeds the offset limit or the table's total payload exceeds the byte limit.num_rows <= 1base case preserved.max_table_bytesknob onBatcher(defaults apply, soPipelineV3needs no change).batcher_split_by_byteslog line when the byte cap drives a split, so we can confirm it firing in production logs.The batcher runs on the extract worker but is the single point that sets each batch's size for the S3 Parquet and (transitively) the loader's per-batch merge. Since
loader_peak ≈ per_batch × concurrency, this shrinks theper_batchterm without touching the consumer's 16-way concurrency model.Per-column string/binary/list payloads stay under ~1.4 GiB (< 2 GiB / 2³¹), so no column can hit the Arrow offset overflow either.
How did you test this code?
I'm an agent (PostHog Code, with the
investigating-warehouse-sources-loadskill for the diagnosis). I did not do manual/prod testing. Automated only:test_batcher.py— 36 tests pass,ruff check+ruff formatclean.test_split_table_byte_cap_sums_across_columns— the byte cap measures the total across columns, not the worst column (guards asum→maxregression that every single-column test would miss).test_table_payload_bytes_is_slice_accurate— guards against a refactor reintroducing.nbytes, which would silently break the recursion on zero-copy slices.test_column_payload_bytes(parameterized) — per-type byte accounting incl. binary, large_binary, list, bool (sub-byte→0), nulls, nested struct→0.test_batcher_logs_when_splitting_by_bytes/test_batcher_does_not_log_byte_split_for_offset_only_split— the observability signal fires on a byte split and not on an offset-only split.test_split_table_single_oversized_row_terminates— termination on a lone oversized row.🤖 Agent context
Autonomy: Human-driven (agent-assisted)
Diagnosis-first: used the
investigating-warehouse-sources-loadskill to triage the overnight V3 run health (Loki, VictoriaMetrics, federated Postgres viaexecute-sql), which isolated the OOM tail to 5 teams / wide tables and confirmed real SIGKILLs (exit 137) rather than native delta-rs crashes.Scope was deliberately narrowed to this one change. Two alternatives were considered and rejected for this PR: a byte-budget concurrency semaphore (riskier concurrency-primitive change), and passing
max_rows_per_group/target_file_sizeto delta-rs (complementary, separate). The byte cap was chosen because it cuts the per-batch memory term safely and is self-contained to one file + tests.One honest limitation, documented inline at the fallback: deeply-nested struct/map and sub-byte bool columns undercount to 0 — acceptable because the OOM driver is large string/JSON payloads, which are counted.
Created with PostHog Code