Skip to content

Commit e358c3c

Browse files
committed
ALP: Add writer pipeline integration and bidirectional cross-language tests

Wire ALP encoding into Java's writer pipeline so ParquetWriter can produce ALP-encoded parquet files. This enables bidirectional interop testing: Java writes ALP parquet that C++ reads, complementing the existing C++ writes / Java reads direction.

- Add alpEnabled ColumnProperty to ParquetProperties with isAlpEnabled()
- Update DefaultV2ValuesWriterFactory to select ALP writers for float/double
- Add withAlpEncoding() builder methods to ParquetWriter
- Add GenerateAlpParquet utility to produce test files from CSV data
- Add Java interop tests reading back Java-generated ALP parquet files
1 parent 9386419 commit e358c3c

7 files changed

Lines changed: 242 additions & 2 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class ParquetProperties {
5050
public static final int DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
5151
public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
5252
public static final boolean DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED = false;
53+
public static final boolean DEFAULT_IS_ALP_ENABLED = false;
5354
public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0;
5455
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
5556
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
@@ -132,6 +133,7 @@ public static WriterVersion fromString(String name) {
132133
private final int pageRowCountLimit;
133134
private final boolean pageWriteChecksumEnabled;
134135
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
136+
private final ColumnProperty<Boolean> alpEnabled;
135137
private final Map<String, String> extraMetaData;
136138
private final ColumnProperty<Boolean> statistics;
137139
private final ColumnProperty<Boolean> sizeStatistics;
@@ -164,6 +166,7 @@ private ParquetProperties(Builder builder) {
164166
this.pageRowCountLimit = builder.pageRowCountLimit;
165167
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
166168
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
169+
this.alpEnabled = builder.alpEnabled.build();
167170
this.extraMetaData = builder.extraMetaData;
168171
this.statistics = builder.statistics.build();
169172
this.sizeStatistics = builder.sizeStatistics.build();
@@ -259,6 +262,20 @@ public boolean isByteStreamSplitEnabled(ColumnDescriptor column) {
259262
}
260263
}
261264

265+
/**
266+
* Returns true if ALP encoding is enabled for the given column.
267+
* ALP encoding is only applicable to FLOAT and DOUBLE columns.
268+
*/
269+
public boolean isAlpEnabled(ColumnDescriptor column) {
270+
switch (column.getPrimitiveType().getPrimitiveTypeName()) {
271+
case FLOAT:
272+
case DOUBLE:
273+
return alpEnabled.getValue(column);
274+
default:
275+
return false;
276+
}
277+
}
278+
262279
public ByteBufferAllocator getAllocator() {
263280
return allocator;
264281
}
@@ -416,6 +433,7 @@ public static class Builder {
416433
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
417434
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
418435
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
436+
private final ColumnProperty.Builder<Boolean> alpEnabled;
419437
private Map<String, String> extraMetaData = new HashMap<>();
420438
private final ColumnProperty.Builder<Boolean> statistics;
421439
private final ColumnProperty.Builder<Boolean> sizeStatistics;
@@ -427,6 +445,7 @@ private Builder() {
427445
DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED
428446
? ByteStreamSplitMode.FLOATING_POINT
429447
: ByteStreamSplitMode.NONE);
448+
alpEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_ALP_ENABLED);
430449
bloomFilterEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED);
431450
bloomFilterNDVs = ColumnProperty.<Long>builder().withDefaultValue(null);
432451
bloomFilterFPPs = ColumnProperty.<Double>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP);
@@ -457,6 +476,7 @@ private Builder(ParquetProperties toCopy) {
457476
this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates);
458477
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
459478
this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled);
479+
this.alpEnabled = ColumnProperty.builder(toCopy.alpEnabled);
460480
this.extraMetaData = toCopy.extraMetaData;
461481
this.statistics = ColumnProperty.builder(toCopy.statistics);
462482
this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics);
@@ -534,6 +554,29 @@ public Builder withExtendedByteStreamSplitEncoding(boolean enable) {
534554
return this;
535555
}
536556

557+
/**
558+
* Enable or disable ALP encoding for FLOAT and DOUBLE columns.
559+
*
560+
* @param enable whether ALP encoding should be enabled
561+
* @return this builder for method chaining.
562+
*/
563+
public Builder withAlpEncoding(boolean enable) {
564+
this.alpEnabled.withDefaultValue(enable);
565+
return this;
566+
}
567+
568+
/**
569+
* Enable or disable ALP encoding for the specified column.
570+
*
571+
* @param columnPath the path of the column (dot-string)
572+
* @param enable whether ALP encoding should be enabled
573+
* @return this builder for method chaining.
574+
*/
575+
public Builder withAlpEncoding(String columnPath, boolean enable) {
576+
this.alpEnabled.withValue(columnPath, enable);
577+
return this;
578+
}
579+
537580
/**
538581
* Set the Parquet format dictionary page size.
539582
*

parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.parquet.column.Encoding;
2626
import org.apache.parquet.column.ParquetProperties;
2727
import org.apache.parquet.column.values.ValuesWriter;
28+
import org.apache.parquet.column.values.alp.AlpValuesWriter;
2829
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter;
2930
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
3031
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
@@ -159,7 +160,9 @@ private ValuesWriter getInt96ValuesWriter(ColumnDescriptor path) {
159160

160161
private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path) {
161162
final ValuesWriter fallbackWriter;
162-
if (this.parquetProperties.isByteStreamSplitEnabled(path)) {
163+
if (this.parquetProperties.isAlpEnabled(path)) {
164+
fallbackWriter = new AlpValuesWriter.DoubleAlpValuesWriter();
165+
} else if (this.parquetProperties.isByteStreamSplitEnabled(path)) {
163166
fallbackWriter = new ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter(
164167
parquetProperties.getInitialSlabSize(),
165168
parquetProperties.getPageSizeThreshold(),
@@ -176,7 +179,9 @@ private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path) {
176179

177180
private ValuesWriter getFloatValuesWriter(ColumnDescriptor path) {
178181
final ValuesWriter fallbackWriter;
179-
if (this.parquetProperties.isByteStreamSplitEnabled(path)) {
182+
if (this.parquetProperties.isAlpEnabled(path)) {
183+
fallbackWriter = new AlpValuesWriter.FloatAlpValuesWriter();
184+
} else if (this.parquetProperties.isByteStreamSplitEnabled(path)) {
180185
fallbackWriter = new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter(
181186
parquetProperties.getInitialSlabSize(),
182187
parquetProperties.getPageSizeThreshold(),

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,16 @@ public SELF withByteStreamSplitEncoding(String columnPath, boolean enableByteStr
705705
return self();
706706
}
707707

708+
public SELF withAlpEncoding(boolean enableAlp) {
709+
encodingPropsBuilder.withAlpEncoding(enableAlp);
710+
return self();
711+
}
712+
713+
public SELF withAlpEncoding(String columnPath, boolean enableAlp) {
714+
encodingPropsBuilder.withAlpEncoding(columnPath, enableAlp);
715+
return self();
716+
}
717+
708718
/**
709719
* Enable or disable dictionary encoding of the specified column for the constructed writer.
710720
*
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.hadoop;
20+
21+
import java.io.BufferedReader;
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.io.InputStreamReader;
25+
import java.nio.charset.StandardCharsets;
26+
import java.nio.file.Files;
27+
import java.nio.file.Paths;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import org.apache.hadoop.fs.Path;
31+
import org.apache.parquet.example.data.Group;
32+
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
33+
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
34+
import org.apache.parquet.schema.MessageType;
35+
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
36+
import org.apache.parquet.schema.Types;
37+
38+
/**
39+
* Standalone utility to generate ALP-encoded parquet files from CSV test data.
40+
*
41+
* <p>Reads the existing expect CSV files (alp_spotify1_expect.csv, alp_arade_expect.csv)
42+
* from test resources and writes ALP-encoded parquet files using the Java ALP encoder.
43+
*
44+
* <p>Usage: java GenerateAlpParquet [output_directory]
45+
* If no output directory is specified, files are written to the current directory.
46+
*/
47+
public class GenerateAlpParquet {
48+
49+
public static void main(String[] args) throws IOException {
50+
String outputDir = args.length > 0 ? args[0] : ".";
51+
Files.createDirectories(Paths.get(outputDir));
52+
53+
generateAlpParquet("/alp_arade_expect.csv", outputDir + "/alp_java_arade.parquet");
54+
System.out.println("Generated: " + outputDir + "/alp_java_arade.parquet");
55+
56+
generateAlpParquet("/alp_spotify1_expect.csv", outputDir + "/alp_java_spotify1.parquet");
57+
System.out.println("Generated: " + outputDir + "/alp_java_spotify1.parquet");
58+
}
59+
60+
private static void generateAlpParquet(String csvResource, String outputPath) throws IOException {
61+
// Read CSV
62+
String[] columnNames;
63+
List<double[]> rows = new ArrayList<>();
64+
65+
try (InputStream is = GenerateAlpParquet.class.getResourceAsStream(csvResource);
66+
BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
67+
// Parse header
68+
String header = br.readLine();
69+
columnNames = header.split(",");
70+
71+
// Parse data rows
72+
String line;
73+
while ((line = br.readLine()) != null) {
74+
String[] parts = line.split(",");
75+
double[] values = new double[parts.length];
76+
for (int i = 0; i < parts.length; i++) {
77+
values[i] = Double.parseDouble(parts[i]);
78+
}
79+
rows.add(values);
80+
}
81+
}
82+
83+
// Build schema: all required DOUBLE columns
84+
Types.MessageTypeBuilder schemaBuilder = Types.buildMessage();
85+
for (String name : columnNames) {
86+
schemaBuilder.required(PrimitiveTypeName.DOUBLE).named(name);
87+
}
88+
MessageType schema = schemaBuilder.named("schema");
89+
90+
// Delete output file if it exists
91+
java.io.File outFile = new java.io.File(outputPath);
92+
if (outFile.exists()) {
93+
outFile.delete();
94+
}
95+
96+
// Write ALP-encoded parquet
97+
Path path = new Path(outFile.getAbsolutePath());
98+
SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
99+
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
100+
.withType(schema)
101+
.withWriterVersion(org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0)
102+
.withAlpEncoding(true)
103+
.withDictionaryEncoding(false)
104+
.build()) {
105+
for (double[] row : rows) {
106+
Group group = groupFactory.newGroup();
107+
for (int c = 0; c < columnNames.length; c++) {
108+
group.append(columnNames[c], row[c]);
109+
}
110+
writer.write(group);
111+
}
112+
}
113+
}
114+
}

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropAlpEncoding.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,74 @@ public void testReadAlpSpotify1Parquet() throws IOException {
131131
}
132132
}
133133

134+
/**
135+
* Read the Java-generated ALP-encoded arade parquet file and verify all values
136+
* match the expected CSV.
137+
*/
138+
@Test
139+
public void testReadAlpJavaAradeParquet() throws IOException {
140+
Path parquetPath = resourcePath("alp_java_arade.parquet");
141+
String[] columnNames = {"value1", "value2", "value3", "value4"};
142+
int expectedRows = 15000;
143+
144+
double[][] expected = readExpectedCsv("/alp_arade_expect.csv", columnNames.length, expectedRows);
145+
146+
List<Group> rows = readParquetGroups(parquetPath);
147+
assertEquals("Row count should match", expectedRows, rows.size());
148+
149+
verifyAlpEncoding(parquetPath);
150+
151+
for (int r = 0; r < expectedRows; r++) {
152+
Group group = rows.get(r);
153+
for (int c = 0; c < columnNames.length; c++) {
154+
double actual = group.getDouble(columnNames[c], 0);
155+
assertEquals(
156+
String.format("Mismatch at row %d, column %s", r, columnNames[c]),
157+
Double.doubleToLongBits(expected[c][r]),
158+
Double.doubleToLongBits(actual));
159+
}
160+
}
161+
}
162+
163+
/**
164+
* Read the Java-generated ALP-encoded spotify1 parquet file and verify all values
165+
* match the expected CSV.
166+
*/
167+
@Test
168+
public void testReadAlpJavaSpotify1Parquet() throws IOException {
169+
Path parquetPath = resourcePath("alp_java_spotify1.parquet");
170+
String[] columnNames = {
171+
"danceability",
172+
"energy",
173+
"loudness",
174+
"speechiness",
175+
"acousticness",
176+
"instrumentalness",
177+
"liveness",
178+
"valence",
179+
"tempo"
180+
};
181+
int expectedRows = 15000;
182+
183+
double[][] expected = readExpectedCsv("/alp_spotify1_expect.csv", columnNames.length, expectedRows);
184+
185+
List<Group> rows = readParquetGroups(parquetPath);
186+
assertEquals("Row count should match", expectedRows, rows.size());
187+
188+
verifyAlpEncoding(parquetPath);
189+
190+
for (int r = 0; r < expectedRows; r++) {
191+
Group group = rows.get(r);
192+
for (int c = 0; c < columnNames.length; c++) {
193+
double actual = group.getDouble(columnNames[c], 0);
194+
assertEquals(
195+
String.format("Mismatch at row %d, column %s", r, columnNames[c]),
196+
Double.doubleToLongBits(expected[c][r]),
197+
Double.doubleToLongBits(actual));
198+
}
199+
}
200+
}
201+
134202
private List<Group> readParquetGroups(Path path) throws IOException {
135203
List<Group> rows = new ArrayList<>();
136204
try (ParquetReader<Group> reader =
176 KB
Binary file not shown.
304 KB
Binary file not shown.

0 commit comments

Comments
 (0)