Skip to content

Commit 05996ba

Browse files
committed
ALP: Add encoding comparison benchmark (ALP vs ZSTD vs BSS+ZSTD)
1 parent abbe36d commit 05996ba

1 file changed

Lines changed: 393 additions & 0 deletions

File tree

Lines changed: 393 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,393 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.hadoop;
20+
21+
import com.github.luben.zstd.Zstd;
22+
import java.io.IOException;
23+
import java.nio.ByteBuffer;
24+
import java.nio.ByteOrder;
25+
import java.util.Random;
26+
import org.apache.parquet.bytes.ByteBufferInputStream;
27+
import org.apache.parquet.bytes.HeapByteBufferAllocator;
28+
import org.apache.parquet.column.values.alp.AlpValuesReaderForDouble;
29+
import org.apache.parquet.column.values.alp.AlpValuesReaderForFloat;
30+
import org.apache.parquet.column.values.alp.AlpValuesWriter;
31+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble;
32+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFloat;
33+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter;
34+
import org.junit.BeforeClass;
35+
import org.junit.Test;
36+
37+
/**
38+
* Codec-level benchmark comparing ALP, ZSTD (plain), and ByteStreamSplit+ZSTD.
39+
*
40+
* <p>Comparable to C++ encoding_alp_benchmark.cc. Reports encode and decode
41+
* throughput in MB/s plus compression ratio for each encoding strategy.
42+
*/
43+
public class EncodingCompressionBenchmark {
44+
45+
private static final int NUM_VALUES = 1_000_000;
46+
private static final int WARMUP = 10;
47+
private static final int MEASURED = 30;
48+
49+
// Datasets
50+
private static double[] doubleDecimal;
51+
private static double[] doubleInteger;
52+
private static double[] doubleMixed;
53+
private static float[] floatDecimal;
54+
private static float[] floatInteger;
55+
private static float[] floatMixed;
56+
57+
@BeforeClass
58+
public static void setup() {
59+
Random rng = new Random(42);
60+
61+
doubleDecimal = new double[NUM_VALUES];
62+
for (int i = 0; i < NUM_VALUES; i++) {
63+
doubleDecimal[i] = Math.round(rng.nextDouble() * 10000) / 100.0;
64+
}
65+
66+
doubleInteger = new double[NUM_VALUES];
67+
for (int i = 0; i < NUM_VALUES; i++) {
68+
doubleInteger[i] = (double) rng.nextInt(100000);
69+
}
70+
71+
doubleMixed = new double[NUM_VALUES];
72+
for (int i = 0; i < NUM_VALUES; i++) {
73+
doubleMixed[i] = Math.round(rng.nextDouble() * 10000) / 100.0;
74+
}
75+
for (int i = 0; i < NUM_VALUES; i += 50) {
76+
doubleMixed[i] = Double.NaN;
77+
}
78+
79+
floatDecimal = new float[NUM_VALUES];
80+
for (int i = 0; i < NUM_VALUES; i++) {
81+
floatDecimal[i] = Math.round(rng.nextFloat() * 10000) / 100.0f;
82+
}
83+
84+
floatInteger = new float[NUM_VALUES];
85+
for (int i = 0; i < NUM_VALUES; i++) {
86+
floatInteger[i] = (float) rng.nextInt(100000);
87+
}
88+
89+
floatMixed = new float[NUM_VALUES];
90+
for (int i = 0; i < NUM_VALUES; i++) {
91+
floatMixed[i] = Math.round(rng.nextFloat() * 10000) / 100.0f;
92+
}
93+
for (int i = 0; i < NUM_VALUES; i += 50) {
94+
floatMixed[i] = Float.NaN;
95+
}
96+
}
97+
98+
@Test
99+
public void measureThroughput() throws IOException {
100+
System.out.println();
101+
System.out.println("=== Encoding/Compression Benchmark (1M values, " + MEASURED + " iters) ===");
102+
System.out.println();
103+
104+
String hdr = String.format(
105+
"%-35s %10s %10s %10s %10s %8s", "Dataset / Encoding", "Enc MB/s", "Dec MB/s", "Raw KB", "Comp KB", "Ratio");
106+
String sep = "-".repeat(hdr.length());
107+
108+
// --- Double datasets ---
109+
System.out.println("=== DOUBLE (8 bytes/value) ===");
110+
System.out.println(hdr);
111+
System.out.println(sep);
112+
113+
benchAllDouble("double_decimal", doubleDecimal);
114+
System.out.println();
115+
benchAllDouble("double_integer", doubleInteger);
116+
System.out.println();
117+
benchAllDouble("double_mixed(2%exc)", doubleMixed);
118+
System.out.println();
119+
120+
// --- Float datasets ---
121+
System.out.println("=== FLOAT (4 bytes/value) ===");
122+
System.out.println(hdr);
123+
System.out.println(sep);
124+
125+
benchAllFloat("float_decimal", floatDecimal);
126+
System.out.println();
127+
benchAllFloat("float_integer", floatInteger);
128+
System.out.println();
129+
benchAllFloat("float_mixed(2%exc)", floatMixed);
130+
System.out.println();
131+
}
132+
133+
// ---- Double benchmarks ----
134+
135+
private void benchAllDouble(String datasetName, double[] data) throws IOException {
136+
long rawBytes = (long) data.length * Double.BYTES;
137+
138+
// ALP
139+
byte[] alpComp = alpEncodeDoubles(data);
140+
benchEncodeDecode(
141+
datasetName + " ALP",
142+
rawBytes,
143+
alpComp.length,
144+
() -> alpEncodeDoubles(data),
145+
() -> alpDecodeDoubles(alpComp, data.length));
146+
147+
// PLAIN + ZSTD
148+
byte[] plainBytes = plainEncodeDoubles(data);
149+
byte[] plainZstd = Zstd.compress(plainBytes);
150+
benchEncodeDecode(
151+
datasetName + " PLAIN+ZSTD",
152+
rawBytes,
153+
plainZstd.length,
154+
() -> Zstd.compress(plainEncodeDoubles(data)),
155+
() -> {
156+
byte[] dec = Zstd.decompress(plainZstd, plainBytes.length);
157+
consumeDoublesFromPlain(dec, data.length);
158+
});
159+
160+
// BSS
161+
byte[] bssBytes = bssEncodeDoubles(data);
162+
benchEncodeDecode(
163+
datasetName + " BSS",
164+
rawBytes,
165+
bssBytes.length,
166+
() -> bssEncodeDoubles(data),
167+
() -> bssDecodeDoubles(bssBytes, data.length));
168+
169+
// BSS + ZSTD
170+
byte[] bssZstd = Zstd.compress(bssBytes);
171+
benchEncodeDecode(
172+
datasetName + " BSS+ZSTD",
173+
rawBytes,
174+
bssZstd.length,
175+
() -> Zstd.compress(bssEncodeDoubles(data)),
176+
() -> {
177+
byte[] dec = Zstd.decompress(bssZstd, bssBytes.length);
178+
bssDecodeDoublesFromRaw(dec, data.length);
179+
});
180+
}
181+
182+
// ---- Float benchmarks ----
183+
184+
private void benchAllFloat(String datasetName, float[] data) throws IOException {
185+
long rawBytes = (long) data.length * Float.BYTES;
186+
187+
// ALP
188+
byte[] alpComp = alpEncodeFloats(data);
189+
benchEncodeDecode(
190+
datasetName + " ALP",
191+
rawBytes,
192+
alpComp.length,
193+
() -> alpEncodeFloats(data),
194+
() -> alpDecodeFloats(alpComp, data.length));
195+
196+
// PLAIN + ZSTD
197+
byte[] plainBytes = plainEncodeFloats(data);
198+
byte[] plainZstd = Zstd.compress(plainBytes);
199+
benchEncodeDecode(
200+
datasetName + " PLAIN+ZSTD",
201+
rawBytes,
202+
plainZstd.length,
203+
() -> Zstd.compress(plainEncodeFloats(data)),
204+
() -> {
205+
byte[] dec = Zstd.decompress(plainZstd, plainBytes.length);
206+
consumeFloatsFromPlain(dec, data.length);
207+
});
208+
209+
// BSS
210+
byte[] bssBytes = bssEncodeFloats(data);
211+
benchEncodeDecode(
212+
datasetName + " BSS",
213+
rawBytes,
214+
bssBytes.length,
215+
() -> bssEncodeFloats(data),
216+
() -> bssDecodeFloats(bssBytes, data.length));
217+
218+
// BSS + ZSTD
219+
byte[] bssZstd = Zstd.compress(bssBytes);
220+
benchEncodeDecode(
221+
datasetName + " BSS+ZSTD",
222+
rawBytes,
223+
bssZstd.length,
224+
() -> Zstd.compress(bssEncodeFloats(data)),
225+
() -> {
226+
byte[] dec = Zstd.decompress(bssZstd, bssBytes.length);
227+
bssDecodeFloatsFromRaw(dec, data.length);
228+
});
229+
}
230+
231+
// ---- Benchmark harness ----
232+
233+
@FunctionalInterface
234+
interface BenchRunnable {
235+
void run() throws Exception;
236+
}
237+
238+
private void benchEncodeDecode(String name, long rawBytes, long compBytes, BenchRunnable enc, BenchRunnable dec)
239+
throws IOException {
240+
try {
241+
// Warmup
242+
for (int i = 0; i < WARMUP; i++) {
243+
enc.run();
244+
}
245+
long encNanos = 0;
246+
for (int i = 0; i < MEASURED; i++) {
247+
long t0 = System.nanoTime();
248+
enc.run();
249+
encNanos += System.nanoTime() - t0;
250+
}
251+
252+
for (int i = 0; i < WARMUP; i++) {
253+
dec.run();
254+
}
255+
long decNanos = 0;
256+
for (int i = 0; i < MEASURED; i++) {
257+
long t0 = System.nanoTime();
258+
dec.run();
259+
decNanos += System.nanoTime() - t0;
260+
}
261+
262+
double encMBps = (rawBytes * (double) MEASURED / (encNanos / 1e9)) / (1024.0 * 1024.0);
263+
double decMBps = (rawBytes * (double) MEASURED / (decNanos / 1e9)) / (1024.0 * 1024.0);
264+
double ratio = (double) compBytes / rawBytes * 100.0;
265+
266+
System.out.printf(
267+
"%-35s %10.1f %10.1f %10d %10d %7.1f%%%n",
268+
name, encMBps, decMBps, rawBytes / 1024, compBytes / 1024, ratio);
269+
} catch (Exception e) {
270+
throw new IOException("Benchmark failed for " + name, e);
271+
}
272+
}
273+
274+
// ---- ALP encode/decode ----
275+
276+
private static byte[] alpEncodeDoubles(double[] values) throws IOException {
277+
AlpValuesWriter.DoubleAlpValuesWriter writer = new AlpValuesWriter.DoubleAlpValuesWriter();
278+
for (double v : values) {
279+
writer.writeDouble(v);
280+
}
281+
return writer.getBytes().toByteArray();
282+
}
283+
284+
private static void alpDecodeDoubles(byte[] compressed, int numValues) throws IOException {
285+
AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble();
286+
reader.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(compressed)));
287+
for (int i = 0; i < numValues; i++) {
288+
reader.readDouble();
289+
}
290+
}
291+
292+
private static byte[] alpEncodeFloats(float[] values) throws IOException {
293+
AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter();
294+
for (float v : values) {
295+
writer.writeFloat(v);
296+
}
297+
return writer.getBytes().toByteArray();
298+
}
299+
300+
private static void alpDecodeFloats(byte[] compressed, int numValues) throws IOException {
301+
AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat();
302+
reader.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(compressed)));
303+
for (int i = 0; i < numValues; i++) {
304+
reader.readFloat();
305+
}
306+
}
307+
308+
// ---- PLAIN encode/decode ----
309+
310+
private static byte[] plainEncodeDoubles(double[] values) {
311+
ByteBuffer buf = ByteBuffer.allocate(values.length * Double.BYTES).order(ByteOrder.LITTLE_ENDIAN);
312+
for (double v : values) {
313+
buf.putDouble(v);
314+
}
315+
return buf.array();
316+
}
317+
318+
private static void consumeDoublesFromPlain(byte[] raw, int numValues) {
319+
ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);
320+
for (int i = 0; i < numValues; i++) {
321+
buf.getDouble();
322+
}
323+
}
324+
325+
private static byte[] plainEncodeFloats(float[] values) {
326+
ByteBuffer buf = ByteBuffer.allocate(values.length * Float.BYTES).order(ByteOrder.LITTLE_ENDIAN);
327+
for (float v : values) {
328+
buf.putFloat(v);
329+
}
330+
return buf.array();
331+
}
332+
333+
private static void consumeFloatsFromPlain(byte[] raw, int numValues) {
334+
ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);
335+
for (int i = 0; i < numValues; i++) {
336+
buf.getFloat();
337+
}
338+
}
339+
340+
// ---- ByteStreamSplit encode/decode ----
341+
342+
private static byte[] bssEncodeDoubles(double[] values) throws IOException {
343+
ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter writer =
344+
new ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter(
345+
64 * 1024, 8 * 1024 * 1024, HeapByteBufferAllocator.getInstance());
346+
for (double v : values) {
347+
writer.writeDouble(v);
348+
}
349+
return writer.getBytes().toByteArray();
350+
}
351+
352+
private static void bssDecodeDoubles(byte[] encoded, int numValues) throws IOException {
353+
ByteStreamSplitValuesReaderForDouble reader = new ByteStreamSplitValuesReaderForDouble();
354+
reader.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(encoded)));
355+
for (int i = 0; i < numValues; i++) {
356+
reader.readDouble();
357+
}
358+
}
359+
360+
private static void bssDecodeDoublesFromRaw(byte[] encoded, int numValues) throws IOException {
361+
ByteStreamSplitValuesReaderForDouble reader = new ByteStreamSplitValuesReaderForDouble();
362+
reader.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(encoded)));
363+
for (int i = 0; i < numValues; i++) {
364+
reader.readDouble();
365+
}
366+
}
367+
368+
private static byte[] bssEncodeFloats(float[] values) throws IOException {
369+
ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter writer =
370+
new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter(
371+
64 * 1024, 8 * 1024 * 1024, HeapByteBufferAllocator.getInstance());
372+
for (float v : values) {
373+
writer.writeFloat(v);
374+
}
375+
return writer.getBytes().toByteArray();
376+
}
377+
378+
private static void bssDecodeFloats(byte[] encoded, int numValues) throws IOException {
379+
ByteStreamSplitValuesReaderForFloat reader = new ByteStreamSplitValuesReaderForFloat();
380+
reader.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(encoded)));
381+
for (int i = 0; i < numValues; i++) {
382+
reader.readFloat();
383+
}
384+
}
385+
386+
private static void bssDecodeFloatsFromRaw(byte[] encoded, int numValues) throws IOException {
387+
ByteStreamSplitValuesReaderForFloat reader = new ByteStreamSplitValuesReaderForFloat();
388+
reader.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(encoded)));
389+
for (int i = 0; i < numValues; i++) {
390+
reader.readFloat();
391+
}
392+
}
393+
}

0 commit comments

Comments
 (0)