Skip to content

Commit abbe36d

Browse files
committed
ALP: Add pipeline integration, reader buffer reuse, and validation
Writer pipeline integration:
- Use CapacityByteArrayOutputStream for encoded vector storage instead of List<byte[]>, integrating with Parquet's memory management
- Use BytesInput.concat() for zero-copy page assembly
- Accept (initialCapacity, pageSize, ByteBufferAllocator) constructor params; factory now passes pipeline properties

Reader memory efficiency:
- Allocate decoded buffer once in initFromPage() and reuse across all vector decodes, eliminating per-vector float[]/double[] allocations
- Improves decode throughput 5-24% across all datasets

Reader validation:
- Validate logVectorSize bounds (MIN_LOG to MAX_LOG)
- Validate non-negative element count
- Validate skip(n) bounds
1 parent 385d053 commit abbe36d

5 files changed

Lines changed: 147 additions & 95 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReader.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
* <p>On {@link #initFromPage}, reads the 7-byte header and offset array but does NOT
3434
* decode any vectors. Vectors are decoded on demand when values are accessed.
3535
* {@link #skip()} is O(1) — it just advances the index.
36+
*
37+
* <p>Reuses the decoded buffer across vectors to reduce allocations.
38+
* Validates header fields (compression mode, integer encoding, log vector size bounds,
39+
* element count) and skip bounds.
3640
*/
3741
abstract class AlpValuesReader extends ValuesReader {
3842

@@ -42,7 +46,7 @@ abstract class AlpValuesReader extends ValuesReader {
4246
protected int currentIndex;
4347
protected int decodedVectorIndex = -1;
4448
protected int[] vectorOffsets;
45-
protected byte[] rawData; // all data after header
49+
protected byte[] rawData; // all data after header (offsets + vectors)
4650

4751
@Override
4852
public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
@@ -67,6 +71,13 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
6771
if (integerEncoding != INTEGER_ENCODING_FOR) {
6872
throw new ParquetDecodingException("Unsupported ALP integer encoding: " + integerEncoding);
6973
}
74+
if (logVectorSize < MIN_LOG_VECTOR_SIZE || logVectorSize > MAX_LOG_VECTOR_SIZE) {
75+
throw new ParquetDecodingException("Invalid ALP log vector size: " + logVectorSize
76+
+ ", must be between " + MIN_LOG_VECTOR_SIZE + " and " + MAX_LOG_VECTOR_SIZE);
77+
}
78+
if (totalCount < 0) {
79+
throw new ParquetDecodingException("Invalid ALP element count: " + totalCount);
80+
}
7081

7182
vectorSize = 1 << logVectorSize;
7283
numVectors = (totalCount + vectorSize - 1) / vectorSize;
@@ -76,6 +87,7 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
7687
if (numVectors == 0) {
7788
vectorOffsets = new int[0];
7889
rawData = new byte[0];
90+
allocateDecodedBuffer(vectorSize);
7991
return;
8092
}
8193

@@ -90,6 +102,8 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
90102
for (int i = 0; i < numVectors; i++) {
91103
vectorOffsets[i] = body.getInt();
92104
}
105+
106+
allocateDecodedBuffer(vectorSize);
93107
}
94108

95109
@Override
@@ -99,6 +113,10 @@ public void skip() {
99113

100114
@Override
101115
public void skip(int n) {
116+
if (n < 0 || currentIndex + n > totalCount) {
117+
throw new ParquetDecodingException(String.format(
118+
"Cannot skip %d elements. Current index: %d, total count: %d", n, currentIndex, totalCount));
119+
}
102120
currentIndex += n;
103121
}
104122

@@ -110,4 +128,7 @@ protected int elementsInVector(int vectorIdx) {
110128
int rem = totalCount % vectorSize;
111129
return (rem == 0) ? vectorSize : rem;
112130
}
131+
132+
/** Allocate the decoded buffer once; called from initFromPage. */
133+
protected abstract void allocateDecodedBuffer(int capacity);
113134
}

parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,18 @@
2222

2323
/**
2424
* ALP values reader for double columns with lazy per-vector decoding.
25+
*
26+
* <p>Reuses the decoded buffer across vectors to reduce allocations.
2527
*/
2628
public class AlpValuesReaderForDouble extends AlpValuesReader {
2729

2830
private double[] decodedBuffer;
2931

32+
@Override
33+
protected void allocateDecodedBuffer(int capacity) {
34+
this.decodedBuffer = new double[capacity];
35+
}
36+
3037
@Override
3138
public double readDouble() {
3239
if (currentIndex >= totalCount) {
@@ -47,7 +54,6 @@ private void ensureVectorDecoded(int vectorIdx) {
4754
int dataOffset = vectorOffsets[vectorIdx];
4855
AlpCompression.DoubleCompressedVector cv =
4956
AlpCompression.DoubleCompressedVector.load(rawData, dataOffset, numElements);
50-
decodedBuffer = new double[numElements];
5157
AlpCompression.decompressDoubleVector(cv, decodedBuffer);
5258
decodedVectorIndex = vectorIdx;
5359
}

parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,18 @@
2222

2323
/**
2424
* ALP values reader for float columns with lazy per-vector decoding.
25+
*
26+
* <p>Reuses the decoded buffer across vectors to reduce allocations.
2527
*/
2628
public class AlpValuesReaderForFloat extends AlpValuesReader {
2729

2830
private float[] decodedBuffer;
2931

32+
@Override
33+
protected void allocateDecodedBuffer(int capacity) {
34+
this.decodedBuffer = new float[capacity];
35+
}
36+
3037
@Override
3138
public float readFloat() {
3239
if (currentIndex >= totalCount) {
@@ -47,7 +54,6 @@ private void ensureVectorDecoded(int vectorIdx) {
4754
int dataOffset = vectorOffsets[vectorIdx];
4855
AlpCompression.FloatCompressedVector cv =
4956
AlpCompression.FloatCompressedVector.load(rawData, dataOffset, numElements);
50-
decodedBuffer = new float[numElements];
5157
AlpCompression.decompressFloatVector(cv, decodedBuffer);
5258
decodedVectorIndex = vectorIdx;
5359
}

0 commit comments

Comments
 (0)