
Commit f989787

ALP: Add codec-level and pipeline throughput benchmarks
Add two benchmarks for measuring ALP performance:

- AlpCodecThroughput: codec-level encode/decode throughput in MB/s with 1M values across decimal, integer, and mixed datasets for both float and double types. Directly comparable to the C++ encoding_alp_benchmark.
- AlpDecompressionThroughput: full Parquet pipeline read throughput, measuring end-to-end decompression on real parquet files.

Also add Hadoop CRC files for the Java-generated test parquet resources.
1 parent c7c798b commit f989787
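Both benchmarks are plain JUnit 4 tests, so besides running them through the build they can be driven programmatically. A minimal sketch (the RunAlpBenchmarks driver class is hypothetical; it assumes both benchmark classes and JUnit 4 are on the classpath):

import org.junit.runner.JUnitCore;
import org.junit.runner.Result;

// Hypothetical driver: runs both benchmark classes in one JVM.
public class RunAlpBenchmarks {
  public static void main(String[] args) {
    Result result = JUnitCore.runClasses(
        org.apache.parquet.column.values.alp.benchmark.AlpCodecThroughput.class,
        org.apache.parquet.hadoop.AlpDecompressionThroughput.class);
    // The throughput tables are printed to stdout by the tests themselves.
    System.out.println("Failures: " + result.getFailureCount());
  }
}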

6 files changed: 350 additions & 0 deletions


org/apache/parquet/column/values/alp/benchmark/AlpCodecThroughput.java (221 additions & 0 deletions)

@@ -0,0 +1,221 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.column.values.alp.benchmark;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.alp.AlpValuesReaderForDouble;
import org.apache.parquet.column.values.alp.AlpValuesReaderForFloat;
import org.apache.parquet.column.values.alp.AlpValuesWriter;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Codec-level ALP throughput benchmark reporting MB/s.
 *
 * <p>Comparable to C++ encoding_alp_benchmark.cc. Measures encode and decode
 * throughput at the codec level (no Parquet pipeline overhead).
 */
public class AlpCodecThroughput {

  private static final int NUM_VALUES = 1_000_000;
  private static final int WARMUP = 10;
  private static final int MEASURED = 30;

  // Datasets
  private static double[] doubleDecimal;
  private static double[] doubleInteger;
  private static double[] doubleMixed;
  private static float[] floatDecimal;
  private static float[] floatInteger;
  private static float[] floatMixed;

  // Pre-compressed
  private static byte[] doubleDecimalComp;
  private static byte[] doubleIntegerComp;
  private static byte[] doubleMixedComp;
  private static byte[] floatDecimalComp;
  private static byte[] floatIntegerComp;
  private static byte[] floatMixedComp;

  @BeforeClass
  public static void setup() throws IOException {
    Random rng = new Random(42);

    doubleDecimal = new double[NUM_VALUES];
    for (int i = 0; i < NUM_VALUES; i++) {
      doubleDecimal[i] = Math.round(rng.nextDouble() * 10000) / 100.0;
    }

    doubleInteger = new double[NUM_VALUES];
    for (int i = 0; i < NUM_VALUES; i++) {
      doubleInteger[i] = (double) rng.nextInt(100000);
    }

    doubleMixed = new double[NUM_VALUES];
    for (int i = 0; i < NUM_VALUES; i++) {
      doubleMixed[i] = Math.round(rng.nextDouble() * 10000) / 100.0;
    }
    for (int i = 0; i < NUM_VALUES; i += 50) {
      doubleMixed[i] = Double.NaN;
    }

    floatDecimal = new float[NUM_VALUES];
    for (int i = 0; i < NUM_VALUES; i++) {
      floatDecimal[i] = Math.round(rng.nextFloat() * 10000) / 100.0f;
    }

    floatInteger = new float[NUM_VALUES];
    for (int i = 0; i < NUM_VALUES; i++) {
      floatInteger[i] = (float) rng.nextInt(100000);
    }

    floatMixed = new float[NUM_VALUES];
    for (int i = 0; i < NUM_VALUES; i++) {
      floatMixed[i] = Math.round(rng.nextFloat() * 10000) / 100.0f;
    }
    for (int i = 0; i < NUM_VALUES; i += 50) {
      floatMixed[i] = Float.NaN;
    }

    doubleDecimalComp = compressDoubles(doubleDecimal);
    doubleIntegerComp = compressDoubles(doubleInteger);
    doubleMixedComp = compressDoubles(doubleMixed);
    floatDecimalComp = compressFloats(floatDecimal);
    floatIntegerComp = compressFloats(floatInteger);
    floatMixedComp = compressFloats(floatMixed);
  }

  @Test
  public void measureThroughput() throws IOException {
    System.out.println();
    System.out.println("=== ALP Codec-Level Throughput (1M values) ===");
    System.out.printf("%-30s %10s %10s %10s %10s%n",
        "Dataset", "Enc MB/s", "Dec MB/s", "Raw KB", "Comp KB");
    System.out.println("------------------------------" +
        " ---------- ---------- ---------- ----------");

    benchDouble("double_decimal", doubleDecimal, doubleDecimalComp);
    benchDouble("double_integer", doubleInteger, doubleIntegerComp);
    benchDouble("double_mixed(2%exc)", doubleMixed, doubleMixedComp);
    benchFloat("float_decimal", floatDecimal, floatDecimalComp);
    benchFloat("float_integer", floatInteger, floatIntegerComp);
    benchFloat("float_mixed(2%exc)", floatMixed, floatMixedComp);

    System.out.println();
  }

  private void benchDouble(String name, double[] data, byte[] compressed) throws IOException {
    long rawBytes = (long) data.length * Double.BYTES;

    // Warmup encode
    for (int i = 0; i < WARMUP; i++) {
      compressDoubles(data);
    }
    // Measure encode
    long encNanos = 0;
    for (int i = 0; i < MEASURED; i++) {
      long t0 = System.nanoTime();
      compressDoubles(data);
      encNanos += System.nanoTime() - t0;
    }

    // Warmup decode
    for (int i = 0; i < WARMUP; i++) {
      decompressDoubles(compressed, data.length);
    }
    // Measure decode
    long decNanos = 0;
    for (int i = 0; i < MEASURED; i++) {
      long t0 = System.nanoTime();
      decompressDoubles(compressed, data.length);
      decNanos += System.nanoTime() - t0;
    }

    double encMBps = (rawBytes * MEASURED / (encNanos / 1e9)) / (1024.0 * 1024.0);
    double decMBps = (rawBytes * MEASURED / (decNanos / 1e9)) / (1024.0 * 1024.0);

    System.out.printf("%-30s %10.1f %10.1f %10d %10d%n",
        name, encMBps, decMBps, rawBytes / 1024, compressed.length / 1024);
  }

  private void benchFloat(String name, float[] data, byte[] compressed) throws IOException {
    long rawBytes = (long) data.length * Float.BYTES;

    for (int i = 0; i < WARMUP; i++) {
      compressFloats(data);
    }
    long encNanos = 0;
    for (int i = 0; i < MEASURED; i++) {
      long t0 = System.nanoTime();
      compressFloats(data);
      encNanos += System.nanoTime() - t0;
    }

    for (int i = 0; i < WARMUP; i++) {
      decompressFloats(compressed, data.length);
    }
    long decNanos = 0;
    for (int i = 0; i < MEASURED; i++) {
      long t0 = System.nanoTime();
      decompressFloats(compressed, data.length);
      decNanos += System.nanoTime() - t0;
    }

    double encMBps = (rawBytes * MEASURED / (encNanos / 1e9)) / (1024.0 * 1024.0);
    double decMBps = (rawBytes * MEASURED / (decNanos / 1e9)) / (1024.0 * 1024.0);

    System.out.printf("%-30s %10.1f %10.1f %10d %10d%n",
        name, encMBps, decMBps, rawBytes / 1024, compressed.length / 1024);
  }

  private static byte[] compressDoubles(double[] values) throws IOException {
    AlpValuesWriter.DoubleAlpValuesWriter writer = new AlpValuesWriter.DoubleAlpValuesWriter();
    for (double v : values) {
      writer.writeDouble(v);
    }
    return writer.getBytes().toByteArray();
  }

  private static byte[] compressFloats(float[] values) throws IOException {
    AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter();
    for (float v : values) {
      writer.writeFloat(v);
    }
    return writer.getBytes().toByteArray();
  }

  private static void decompressDoubles(byte[] compressed, int numValues) throws IOException {
    AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble();
    reader.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(compressed)));
    for (int i = 0; i < numValues; i++) {
      reader.readDouble();
    }
  }

  private static void decompressFloats(byte[] compressed, int numValues) throws IOException {
    AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat();
    reader.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(compressed)));
    for (int i = 0; i < numValues; i++) {
      reader.readFloat();
    }
  }
}
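The MB/s columns above divide the total raw bytes processed across all measured iterations by the total elapsed seconds, then scale to MiB. A worked instance of that formula with a hypothetical timing (the 0.6 s total is invented for illustration):

public class ThroughputMath {
  public static void main(String[] args) {
    long rawBytes = 1_000_000L * Double.BYTES; // 8,000,000 bytes per iteration
    int measured = 30;                         // matches MEASURED above
    long encNanos = 600_000_000L;              // hypothetical: 0.6 s summed over 30 runs
    // Same expression as in benchDouble: total bytes / total seconds, in MiB/s.
    double encMBps = (rawBytes * measured / (encNanos / 1e9)) / (1024.0 * 1024.0);
    System.out.printf("Enc MB/s = %.1f%n", encMBps); // prints 381.5
  }
}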
org/apache/parquet/hadoop/AlpDecompressionThroughput.java (129 additions & 0 deletions)

@@ -0,0 +1,129 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.hadoop;

import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.junit.Test;

/**
 * Measures ALP decompression throughput in bytes/second for real-world datasets.
 * Reports raw (uncompressed) bytes/second and compressed bytes/second.
 */
public class AlpDecompressionThroughput {

  private static final int WARMUP_ITERS = 5;
  private static final int MEASURED_ITERS = 20;

  private static Path resourcePath(String name) {
    try {
      return new Path(AlpDecompressionThroughput.class.getResource("/" + name).toURI());
    } catch (URISyntaxException e) {
      throw new RuntimeException(e);
    }
  }

  @Test
  public void measureDecompressionThroughput() throws IOException {
    System.out.println();
    System.out.println("=== ALP Decompression Throughput ===");
    System.out.printf(
        "%-40s %8s %6s %6s %12s %12s %12s%n",
        "File", "Rows", "Cols", "Type", "Compressed", "Raw MB/s", "Comp MB/s");
    System.out.println(
        "---------------------------------------- -------- ------ ------ ------------ ------------ ------------");

    // Double datasets
    benchmarkFile("alp_arade.parquet", 15000, 4, "double", 8);
    benchmarkFile("alp_spotify1.parquet", 15000, 9, "double", 8);
    benchmarkFile("alp_java_arade.parquet", 15000, 4, "double", 8);
    benchmarkFile("alp_java_spotify1.parquet", 15000, 9, "double", 8);

    // Float datasets
    benchmarkFile("alp_float_arade.parquet", 15000, 4, "float", 4);
    benchmarkFile("alp_float_spotify1.parquet", 15000, 9, "float", 4);
    benchmarkFile("alp_java_float_arade.parquet", 15000, 4, "float", 4);
    benchmarkFile("alp_java_float_spotify1.parquet", 15000, 9, "float", 4);

    System.out.println();
  }

  private void benchmarkFile(String fileName, int expectedRows, int numCols, String type, int bytesPerValue)
      throws IOException {
    Path path = resourcePath(fileName);

    // Get compressed file size from parquet metadata
    long compressedSize = 0;
    try (ParquetFileReader pfr = ParquetFileReader.open(
        org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(
            path, new org.apache.hadoop.conf.Configuration()))) {
      ParquetMetadata footer = pfr.getFooter();
      for (org.apache.parquet.hadoop.metadata.BlockMetaData block : footer.getBlocks()) {
        compressedSize += block.getTotalByteSize();
      }
    }

    long rawBytes = (long) expectedRows * numCols * bytesPerValue;

    // Warmup
    for (int i = 0; i < WARMUP_ITERS; i++) {
      readAllValues(path, type, numCols);
    }

    // Measured runs
    long totalNanos = 0;
    for (int i = 0; i < MEASURED_ITERS; i++) {
      long start = System.nanoTime();
      readAllValues(path, type, numCols);
      totalNanos += System.nanoTime() - start;
    }

    double avgSeconds = (totalNanos / (double) MEASURED_ITERS) / 1_000_000_000.0;
    double rawMBps = (rawBytes / avgSeconds) / (1024.0 * 1024.0);
    double compMBps = (compressedSize / avgSeconds) / (1024.0 * 1024.0);

    System.out.printf(
        "%-40s %8d %6d %6s %12d %12.1f %12.1f%n",
        fileName, expectedRows, numCols, type, compressedSize, rawMBps, compMBps);
  }

  private void readAllValues(Path path, String type, int numCols) throws IOException {
    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), path).build()) {
      Group group;
      if ("double".equals(type)) {
        while ((group = reader.read()) != null) {
          for (int c = 0; c < numCols; c++) {
            group.getDouble(c, 0);
          }
        }
      } else {
        while ((group = reader.read()) != null) {
          for (int c = 0; c < numCols; c++) {
            group.getFloat(c, 0);
          }
        }
      }
    }
  }
}
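Note that the Raw MB/s column is computed from the expected uncompressed size (rows × cols × bytesPerValue), not from a measured buffer. A small sketch of that arithmetic for alp_spotify1.parquet, assuming a hypothetical 10 ms average read:

public class RawBytesMath {
  public static void main(String[] args) {
    // alp_spotify1.parquet: 15000 rows x 9 double columns x 8 bytes per value
    long rawBytes = 15000L * 9 * 8;  // 1,080,000 bytes
    double avgSeconds = 0.010;       // hypothetical 10 ms average per read
    double rawMBps = (rawBytes / avgSeconds) / (1024.0 * 1024.0);
    System.out.printf("Raw MB/s = %.1f%n", rawMBps); // prints 103.0
  }
}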
4 binary .crc files (Hadoop CRC checksums for the Java-generated test parquet resources; 1.39 KB and 2.38 KB among the shown sizes): Binary files not shown.

0 commit comments
