# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=W0621
"""Avro reader for reading Avro files."""
from __future__ import annotations
import io
import json
import os
import threading
from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum
from types import TracebackType
from typing import (
Generic,
TypeVar,
)
from cachetools import LRUCache
from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
from pyiceberg.avro.codecs.codec import Codec
from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
from pyiceberg.avro.encoder import BinaryEncoder
from pyiceberg.avro.reader import Reader
from pyiceberg.avro.resolver import construct_reader, construct_writer, resolve_reader, resolve_writer
from pyiceberg.avro.writer import Writer
from pyiceberg.io import InputFile, OutputFile, OutputStream
from pyiceberg.schema import Schema
from pyiceberg.typedef import EMPTY_DICT, Record, StructProtocol
from pyiceberg.types import (
FixedType,
MapType,
NestedField,
StringType,
StructType,
)
from pyiceberg.utils.schema_conversion import AvroSchemaConversion
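
# File layout per the Avro object-container spec: a 4-byte magic ("Obj" plus a
# format version byte of 1), a file metadata map, and a 16-byte sync marker
# separating consecutive data blocks.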
VERSION = 1
MAGIC = bytes(b"Obj" + bytearray([VERSION]))
MAGIC_SIZE = len(MAGIC)
SYNC_SIZE = 16

META_SCHEMA = StructType(
    NestedField(name="magic", field_id=100, field_type=FixedType(length=MAGIC_SIZE), required=True),
    NestedField(
        field_id=200,
        name="meta",
        field_type=MapType(key_id=201, key_type=StringType(), value_id=202, value_type=StringType(), value_required=True),
        required=True,
    ),
    NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), required=True),
)

_SCHEMA_KEY = "avro.schema"

# Cache for Avro-to-Iceberg schema conversion, keyed by the raw schema JSON string.
# Manifests of the same type share the same Avro schema, so this avoids
# redundant JSON parsing and schema conversion on every manifest open.
_schema_cache: LRUCache[str, Schema] = LRUCache(maxsize=32)
_schema_cache_lock = threading.Lock()

# Cache for resolved reader trees, keyed by the object identities of (file_schema,
# read_schema, read_types, read_enums). Reader objects are stateless: read()
# takes a decoder and returns decoded data without mutating self, so sharing
# cached readers across threads and calls is safe.
_reader_cache: LRUCache[tuple[int, ...], Reader] = LRUCache(maxsize=32)
_reader_cache_lock = threading.Lock()


def _cached_avro_to_iceberg(avro_schema_string: str) -> Schema:
    """Convert an Avro schema JSON string to an Iceberg Schema, with caching."""
    with _schema_cache_lock:
        if avro_schema_string in _schema_cache:
            return _schema_cache[avro_schema_string]
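    # Convert outside the lock so a slow conversion does not block other threads;
    # a duplicate conversion after a race is harmless since the result is identical.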
    schema = AvroSchemaConversion().avro_to_iceberg(json.loads(avro_schema_string))
    with _schema_cache_lock:
        _schema_cache[avro_schema_string] = schema
    return schema


def _cached_resolve_reader(
    file_schema: Schema,
    read_schema: Schema,
    read_types: dict[int, Callable[..., StructProtocol]],
    read_enums: dict[int, Callable[..., Enum]],
) -> Reader:
    """Resolve a reader tree for the given schema pair, with caching."""
    key = (id(file_schema), id(read_schema), id(read_types), id(read_enums))
    with _reader_cache_lock:
        if key in _reader_cache:
            return _reader_cache[key]
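    # Resolve outside the lock; as above, a duplicate resolve after a race is harmless.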
    reader = resolve_reader(file_schema, read_schema, read_types, read_enums)
    with _reader_cache_lock:
        _reader_cache[key] = reader
    return reader


class AvroFileHeader(Record):
    @property
    def magic(self) -> bytes:
        return self._data[0]

    @property
    def meta(self) -> dict[str, str]:
        return self._data[1]

    @property
    def sync(self) -> bytes:
        return self._data[2]

    def compression_codec(self) -> type[Codec] | None:
        """Get the file's compression codec algorithm from the file's metadata.

        For the null codec this returns None, indicating that no
        compression or decompression is needed.
        """
        from pyiceberg.table import TableProperties

        codec_name = self.meta.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
        if codec_name not in KNOWN_CODECS:
            raise ValueError(f"Unsupported codec: {codec_name}")
        return KNOWN_CODECS[codec_name]  # type: ignore

    def get_schema(self) -> Schema:
        if _SCHEMA_KEY in self.meta:
            return _cached_avro_to_iceberg(self.meta[_SCHEMA_KEY])
        else:
            raise ValueError("No schema found in Avro file headers")


D = TypeVar("D", bound=StructProtocol)


@dataclass
class Block(Generic[D]):
    reader: Reader
    block_records: int
    block_decoder: BinaryDecoder
    position: int = 0

    def __iter__(self) -> Block[D]:
        """Return an iterator for the Block class."""
        return self

    def has_next(self) -> bool:
        return self.position < self.block_records

    def __next__(self) -> D:
        """Return the next item when iterating over the Block class."""
        if self.has_next():
            self.position += 1
            return self.reader.read(self.block_decoder)
        raise StopIteration


class AvroFile(Generic[D]):
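    """An iterable context manager that reads an Avro object-container file into records.

    A minimal usage sketch; the PyArrowFileIO import and the path are
    illustrative and not part of this module:

        from pyiceberg.io.pyarrow import PyArrowFileIO

        input_file = PyArrowFileIO().new_input("/tmp/example.avro")
        with AvroFile[Record](input_file) as avro_file:
            for record in avro_file:
                ...  # each record is decoded via the resolved reader tree
    """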

    __slots__ = (
        "input_file",
        "read_schema",
        "read_types",
        "read_enums",
        "header",
        "schema",
        "reader",
        "decoder",
        "block",
    )
    input_file: InputFile
    read_schema: Schema | None
    read_types: dict[int, Callable[..., StructProtocol]]
    read_enums: dict[int, Callable[..., Enum]]
    header: AvroFileHeader
    schema: Schema
    reader: Reader
    decoder: BinaryDecoder
    block: Block[D] | None

    def __init__(
        self,
        input_file: InputFile,
        read_schema: Schema | None = None,
        read_types: dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
        read_enums: dict[int, Callable[..., Enum]] = EMPTY_DICT,
    ) -> None:
        self.input_file = input_file
        self.read_schema = read_schema
        self.read_types = read_types
        self.read_enums = read_enums
        self.block = None

    def __enter__(self) -> AvroFile[D]:
        """Open the file, read the header, and resolve the reader tree for the payload.

        Returns:
            The opened AvroFile, which can be iterated over to fetch the records.
        """
        with self.input_file.open() as f:
            self.decoder = new_decoder(f.read())
        self.header = self._read_header()
        self.schema = self.header.get_schema()

        if not self.read_schema:
            self.read_schema = self.schema

        self.reader = _cached_resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)

        return self

    def __exit__(
        self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None
    ) -> None:
        """Perform cleanup when exiting the scope of a 'with' statement."""

    def __iter__(self) -> AvroFile[D]:
        """Return an iterator for the AvroFile class."""
        return self

    def _read_block(self) -> int:
        # If a block was already read, the next block is preceded by the sync marker
        if self.block:
            sync_marker = self.decoder.read(SYNC_SIZE)
            if sync_marker != self.header.sync:
                raise ValueError(f"Expected sync bytes {self.header.sync!r}, but got {sync_marker!r}")
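        # Per the Avro spec, a block consists of the record count, the byte length
        # of the (possibly compressed) payload, and then the payload itself.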
        block_records = self.decoder.read_int()
        block_bytes = self.decoder.read_bytes()
        if codec := self.header.compression_codec():
            block_bytes = codec.decompress(block_bytes)

        self.block = Block(reader=self.reader, block_records=block_records, block_decoder=new_decoder(block_bytes))
        return block_records

    def __next__(self) -> D:
        """Return the next item when iterating over the AvroFile class."""
        if self.block and self.block.has_next():
            return next(self.block)

        try:
            new_block = self._read_block()
        except EOFError as exc:
            raise StopIteration from exc
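        # Only recurse into the new block if it actually holds records;
        # a zero-record count ends the iteration below.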
        if new_block > 0:
            return self.__next__()
        raise StopIteration

    def _read_header(self) -> AvroFileHeader:
        return construct_reader(META_SCHEMA, {-1: AvroFileHeader}).read(self.decoder)


class AvroOutputFile(Generic[D]):
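    """A context manager that writes records to an Avro object-container file in blocks.

    A minimal usage sketch; the PyArrowFileIO import, the path, and the
    `schema`/`records` variables are illustrative and not part of this module:

        from pyiceberg.io.pyarrow import PyArrowFileIO

        output_file = PyArrowFileIO().new_output("/tmp/example.avro")
        with AvroOutputFile[Record](output_file, file_schema=schema, schema_name="example") as writer:
            writer.write_block(records)
    """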

    output_file: OutputFile
    output_stream: OutputStream
    file_schema: Schema
    schema_name: str
    encoder: BinaryEncoder
    sync_bytes: bytes
    writer: Writer

    def __init__(
        self,
        output_file: OutputFile,
        file_schema: Schema,
        schema_name: str,
        record_schema: Schema | None = None,
        metadata: dict[str, str] = EMPTY_DICT,
    ) -> None:
        self.output_file = output_file
        self.file_schema = file_schema
        self.schema_name = schema_name
        self.sync_bytes = os.urandom(SYNC_SIZE)
        self.writer = (
            construct_writer(file_schema=self.file_schema)
            if record_schema is None
            else resolve_writer(record_schema=record_schema, file_schema=self.file_schema)
        )
        self.metadata = metadata

    def __enter__(self) -> AvroOutputFile[D]:
        """Open the file and write the header.

        Returns:
            The file object to write records to.
        """
        self.output_stream = self.output_file.create(overwrite=True)
        self.encoder = BinaryEncoder(self.output_stream)

        self._write_header()

        return self

    def __exit__(
        self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None
    ) -> None:
        """Perform cleanup when exiting the scope of a 'with' statement."""
        self.output_stream.close()

    def _write_header(self) -> None:
        from pyiceberg.table import TableProperties

        codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
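        # Iceberg and Avro sometimes use different names for the same codec;
        # the Avro-spec name is what gets stored in the file metadata.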
        if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name):
            codec_name = avro_codec_name

        json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))

        meta = {**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec_name}
        header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
        construct_writer(META_SCHEMA).write(self.encoder, header)

    def compression_codec(self) -> type[Codec] | None:
        """Get the file's compression codec algorithm from the file's metadata.

        For the null codec this returns None, indicating that no
        compression or decompression is needed.
        """
        from pyiceberg.table import TableProperties

        codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
        if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name):
            codec_name = avro_codec_name

        if codec_name not in KNOWN_CODECS:
            raise ValueError(f"Unsupported codec: {codec_name}")

        return KNOWN_CODECS[codec_name]  # type: ignore

    def write_block(self, objects: list[D]) -> None:
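        # Records are first serialized into an in-memory buffer: the Avro block
        # layout requires the record count and the (possibly compressed) byte
        # length to be written before the payload itself.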
        in_memory = io.BytesIO()
        block_content_encoder = BinaryEncoder(output_stream=in_memory)
        for obj in objects:
            self.writer.write(block_content_encoder, obj)
        block_content = in_memory.getvalue()

        self.encoder.write_int(len(objects))

        if codec := self.compression_codec():
            content, content_length = codec.compress(block_content)
            self.encoder.write_int(content_length)
            self.encoder.write(content)
        else:
            self.encoder.write_int(len(block_content))
            self.encoder.write(block_content)

        self.encoder.write(self.sync_bytes)

    def tell(self) -> int:
        return self.output_stream.tell()