From 559eef3a6d6c406daafdf81d924503b2c6342013 Mon Sep 17 00:00:00 2001 From: Le Zhang Date: Fri, 10 Apr 2026 22:17:36 -0700 Subject: [PATCH] perf: Speed up purge_table by deduplicating manifest reads and parallelizing file deletion Three changes to reduce purge_table wall time from ~7s to ~0.13s (54x) on a table with 200 snapshots: 1. Deduplicate manifests by path before iterating in delete_data_files(). The same manifest appears across many snapshots' manifest lists. For 200 snapshots this reduces 20,100 manifest opens to 200. 2. Parallelize file deletion using the existing ExecutorFactory ThreadPoolExecutor, matching the pattern already used for manifest reading in plan_files() and data file reading in to_arrow(). This aligns with the Java reference implementation (CatalogUtil.dropTableData) which also deletes files concurrently via a worker thread pool. 3. Cache Avro-to-Iceberg schema conversion and reader tree resolution. All manifests of the same type share the same Avro schema, but it was being JSON-parsed, converted, and resolved into a reader tree on every open. Uses explicit threading.Lock for thread safety across all Python implementations. --- pyiceberg/avro/file.py | 51 ++++++++++++++++++++++++++++++++--- pyiceberg/catalog/__init__.py | 31 ++++++++++++++------- 2 files changed, 68 insertions(+), 14 deletions(-) diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py index 7db92818fe..b10ef9247c 100644 --- a/pyiceberg/avro/file.py +++ b/pyiceberg/avro/file.py @@ -22,6 +22,7 @@ import io import json import os +import threading from collections.abc import Callable from dataclasses import dataclass from enum import Enum @@ -31,6 +32,8 @@ TypeVar, ) +from cachetools import LRUCache + from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS from pyiceberg.avro.codecs.codec import Codec from pyiceberg.avro.decoder import BinaryDecoder, new_decoder @@ -68,6 +71,48 @@ _SCHEMA_KEY = "avro.schema" +# Cache for Avro-to-Iceberg schema conversion, keyed by raw schema JSON string. +# Manifests of the same type share the same Avro schema, so this avoids +# redundant JSON parsing and schema conversion on every manifest open. +_schema_cache: LRUCache[str, Schema] = LRUCache(maxsize=32) +_schema_cache_lock = threading.Lock() + +# Cache for resolved reader trees, keyed by object identity of (file_schema, +# read_schema, read_types, read_enums). Reader objects are stateless — read() +# takes a decoder and returns decoded data without mutating self, so sharing +# cached readers across threads and calls is safe. +_reader_cache: LRUCache[tuple[int, ...], Reader] = LRUCache(maxsize=32) +_reader_cache_lock = threading.Lock() + + +def _cached_avro_to_iceberg(avro_schema_string: str) -> Schema: + """Convert an Avro schema JSON string to an Iceberg Schema, with caching.""" + with _schema_cache_lock: + if avro_schema_string in _schema_cache: + return _schema_cache[avro_schema_string] + schema = AvroSchemaConversion().avro_to_iceberg(json.loads(avro_schema_string)) + with _schema_cache_lock: + _schema_cache[avro_schema_string] = schema + return schema + + +def _cached_resolve_reader( + file_schema: Schema, + read_schema: Schema, + read_types: dict[int, Callable[..., StructProtocol]], + read_enums: dict[int, Callable[..., Enum]], +) -> Reader: + """Resolve a reader tree for the given schema pair, with caching.""" + key = (id(file_schema), id(read_schema), id(read_types), id(read_enums)) + with _reader_cache_lock: + if key in _reader_cache: + return _reader_cache[key] + reader = resolve_reader(file_schema, read_schema, read_types, read_enums) + with _reader_cache_lock: + _reader_cache[key] = reader + return reader + + class AvroFileHeader(Record): @property def magic(self) -> bytes: @@ -97,9 +142,7 @@ def compression_codec(self) -> type[Codec] | None: def get_schema(self) -> Schema: if _SCHEMA_KEY in self.meta: - avro_schema_string = self.meta[_SCHEMA_KEY] - avro_schema = json.loads(avro_schema_string) - return AvroSchemaConversion().avro_to_iceberg(avro_schema) + return _cached_avro_to_iceberg(self.meta[_SCHEMA_KEY]) else: raise ValueError("No schema found in Avro file headers") @@ -178,7 +221,7 @@ def __enter__(self) -> AvroFile[D]: if not self.read_schema: self.read_schema = self.schema - self.reader = resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums) + self.reader = _cached_resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums) return self diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 5797e1f050..603788f4db 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -66,6 +66,7 @@ RecursiveDict, TableVersion, ) +from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config, merge_config from pyiceberg.utils.properties import property_as_bool from pyiceberg.view import View @@ -90,6 +91,7 @@ MANIFEST_LIST = "manifest list" PREVIOUS_METADATA = "previous metadata" METADATA = "metadata" +DATA_FILE = "data" URI = "uri" LOCATION = "location" EXTERNAL_TABLE = "EXTERNAL_TABLE" @@ -284,7 +286,7 @@ def list_catalogs() -> list[str]: def delete_files(io: FileIO, files_to_delete: set[str], file_type: str) -> None: - """Delete files. + """Delete files in parallel. Log warnings if failing to delete any file. @@ -293,32 +295,41 @@ def delete_files(io: FileIO, files_to_delete: set[str], file_type: str) -> None: files_to_delete: A set of file paths to be deleted. file_type: The type of the file. """ - for file in files_to_delete: + + def _delete_file(file: str) -> None: try: io.delete(file) except OSError: logger.warning(f"Failed to delete {file_type} file {file}", exc_info=logger.isEnabledFor(logging.DEBUG)) + executor = ExecutorFactory.get_or_create() + list(executor.map(_delete_file, files_to_delete)) + def delete_data_files(io: FileIO, manifests_to_delete: list[ManifestFile]) -> None: """Delete data files linked to given manifests. + Deduplicates manifests by path before reading entries, since the same manifest + appears across multiple snapshots' manifest lists. Deletes data files in parallel. + Log warnings if failing to delete any file. Args: io: The FileIO used to delete the object. manifests_to_delete: A list of manifest contains paths of data files to be deleted. """ - deleted_files: dict[str, bool] = {} + unique_manifests: dict[str, ManifestFile] = {} for manifest_file in manifests_to_delete: + unique_manifests.setdefault(manifest_file.manifest_path, manifest_file) + + # Collect all unique data file paths + data_file_paths: set[str] = set() + for manifest_file in unique_manifests.values(): for entry in manifest_file.fetch_manifest_entry(io, discard_deleted=False): - path = entry.data_file.file_path - if not deleted_files.get(path, False): - try: - io.delete(path) - except OSError: - logger.warning(f"Failed to delete data file {path}", exc_info=logger.isEnabledFor(logging.DEBUG)) - deleted_files[path] = True + data_file_paths.add(entry.data_file.file_path) + + # Delete in parallel + delete_files(io, data_file_paths, DATA_FILE) def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Catalog | None: