
Commit 3288872

feat: add native Parquet VARIANT column support for JSON and semi-structured fields
Write Kafka Connect JSON fields (Debezium CDC, Confluent Protobuf Struct, custom messages, maps, arrays) as native Parquet VARIANT columns instead of plain STRING, improving storage efficiency and query performance.

- Upgrade parquet-java to 1.17.0 for VARIANT logical type support
  (https://parquet.apache.org/blog/2026/02/27/variant-type-in-apache-parquet-for-semi-structured-data/)
- Add configs: parquet.variant.enabled, parquet.variant.connect.names, parquet.variant.field.names
- Auto-detect recursive schemas (google.protobuf.Struct) as VARIANT
- Port streaming JSON-to-Variant conversion from Apache Spark (also raised as a PR against parquet-java; once that is merged, the ported code in this repo can be cleaned up: apache/parquet-java#3415)
- Unwrap Protobuf Struct/Value/ListValue/map-as-array to clean JSON
- Fall back gracefully for non-JSON values (e.g. __debezium_unavailable_value)
- Fully opt-in (disabled by default); zero impact on existing connectors
1 parent bba6b97 commit 3288872
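For orientation, a minimal sketch of sink properties that opt in to the new behavior; the field names are illustrative, and only the parquet.variant.* keys come from this commit (format.class is the existing Parquet format class in this repo):

    # Variant writing is opt-in; everything else behaves as before.
    format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
    parquet.variant.enabled=true
    # Optional: Connect schema names whose fields become VARIANT
    # (the default already covers Debezium's io.debezium.data.Json)
    parquet.variant.connect.names=io.debezium.data.Json
    # Optional: force specific fields to VARIANT, e.g. JSON-carrying STRINGs
    parquet.variant.field.names=payload,inference_log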

13 files changed

Lines changed: 3444 additions & 14 deletions

kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java

Lines changed: 102 additions & 0 deletions
@@ -238,6 +238,45 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
   public static final String TOMBSTONE_ENCODED_PARTITION = "tombstone.encoded.partition";
   public static final String TOMBSTONE_ENCODED_PARTITION_DEFAULT = "tombstone";
 
+  // Parquet Variant configs
+  public static final String PARQUET_VARIANT_ENABLED_CONFIG = "parquet.variant.enabled";
+  public static final boolean PARQUET_VARIANT_ENABLED_DEFAULT = false;
+  public static final String PARQUET_VARIANT_ENABLED_DOC =
+      "When enabled, fields identified as carrying semi-structured "
+          + "data will be written as Parquet VARIANT columns. "
+          + "Fields with recursive schemas (e.g. Protobuf "
+          + "google.protobuf.Struct) are auto-detected and "
+          + "converted to VARIANT, preventing StackOverflowError "
+          + "in AvroSchemaConverter. Additionally supports STRING "
+          + "fields with JSON (Debezium io.debezium.data.Json), "
+          + "complex STRUCT/MAP fields, and ARRAY fields. "
+          + "Provides type-preserving binary encoding, efficient "
+          + "field access, and better query performance in "
+          + "downstream engines "
+          + "(Spark, DuckDB, Snowflake, Databricks, Trino).";
+
+  public static final String PARQUET_VARIANT_CONNECT_NAMES_CONFIG =
+      "parquet.variant.connect.names";
+  public static final String PARQUET_VARIANT_CONNECT_NAMES_DEFAULT =
+      "io.debezium.data.Json";
+  public static final String PARQUET_VARIANT_CONNECT_NAMES_DOC =
+      "Comma-separated list of Kafka Connect schema names whose "
+          + "fields should be written as Parquet VARIANT columns. "
+          + "Works for any field type (STRING, STRUCT, MAP, ARRAY).";
+
+  public static final String PARQUET_VARIANT_FIELD_NAMES_CONFIG =
+      "parquet.variant.field.names";
+  public static final String PARQUET_VARIANT_FIELD_NAMES_DEFAULT = "";
+  public static final String PARQUET_VARIANT_FIELD_NAMES_DOC =
+      "Comma-separated list of field names to explicitly write as "
+          + "Parquet VARIANT columns, regardless of their Connect "
+          + "schema name or type. Note: fields with recursive "
+          + "schemas (google.protobuf.Struct) are auto-detected "
+          + "and do NOT need to be listed here. Use this for "
+          + "STRING fields containing JSON (e.g. model inference "
+          + "logs) or custom Protobuf messages that you want "
+          + "stored as VARIANT.";
+
   /**
    * Append schema name in s3-path
    */
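To make the doc strings above concrete, here is a sketch of the kind of Connect schema these settings target; the record and field names are illustrative:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    public class VariantSchemaSketch {
      public static void main(String[] args) {
        // io.debezium.data.Json is the Connect schema *name* Debezium puts on
        // JSON-carrying STRING fields; it matches the default value of
        // parquet.variant.connect.names.
        Schema jsonString = SchemaBuilder.string()
            .name("io.debezium.data.Json")
            .optional()
            .build();

        // With parquet.variant.enabled=true, "details" would be written as a
        // Parquet VARIANT column; "id" stays an ordinary INT64 column.
        Schema value = SchemaBuilder.struct()
            .name("server1.inventory.orders.Value")
            .field("id", Schema.INT64_SCHEMA)
            .field("details", jsonString)
            .build();
        System.out.println(value.fields());
      }
    }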
@@ -820,6 +859,47 @@ public static ConfigDef newConfigDef() {
       );
     }
 
+    {
+      final String group = "Parquet Variant";
+      int orderInGroup = 0;
+
+      configDef.define(
+          PARQUET_VARIANT_ENABLED_CONFIG,
+          Type.BOOLEAN,
+          PARQUET_VARIANT_ENABLED_DEFAULT,
+          Importance.MEDIUM,
+          PARQUET_VARIANT_ENABLED_DOC,
+          group,
+          ++orderInGroup,
+          Width.SHORT,
+          "Enable Parquet Variant for JSON fields"
+      );
+
+      configDef.define(
+          PARQUET_VARIANT_CONNECT_NAMES_CONFIG,
+          Type.LIST,
+          PARQUET_VARIANT_CONNECT_NAMES_DEFAULT,
+          Importance.LOW,
+          PARQUET_VARIANT_CONNECT_NAMES_DOC,
+          group,
+          ++orderInGroup,
+          Width.LONG,
+          "Connect schema names to treat as Variant"
+      );
+
+      configDef.define(
+          PARQUET_VARIANT_FIELD_NAMES_CONFIG,
+          Type.LIST,
+          PARQUET_VARIANT_FIELD_NAMES_DEFAULT,
+          Importance.LOW,
+          PARQUET_VARIANT_FIELD_NAMES_DOC,
+          group,
+          ++orderInGroup,
+          Width.LONG,
+          "Explicit field names to write as Variant"
+      );
+    }
+
     {
       final String group = "Keys and Headers";
       int orderInGroup = 0;
@@ -1418,6 +1498,28 @@ public boolean shouldRotateOnPartitionChange() {
     return getBoolean(ROTATE_FILE_ON_PARTITION_CHANGE);
   }
 
+  public boolean isParquetVariantEnabled() {
+    return getBoolean(PARQUET_VARIANT_ENABLED_CONFIG);
+  }
+
+  public List<String> getParquetVariantConnectNames() {
+    return getList(PARQUET_VARIANT_CONNECT_NAMES_CONFIG);
+  }
+
+  public Set<String> getParquetVariantFieldNames() {
+    List<String> list = getList(PARQUET_VARIANT_FIELD_NAMES_CONFIG);
+    if (list == null) {
+      return Collections.emptySet();
+    }
+    Set<String> result = new HashSet<>();
+    for (String s : list) {
+      if (s != null && !s.trim().isEmpty()) {
+        result.add(s.trim());
+      }
+    }
+    return result;
+  }
+
   public enum IgnoreOrFailBehavior {
     IGNORE,
     FAIL;
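A small standalone sketch of what the list handling above amounts to (mirroring the loop in getParquetVariantFieldNames rather than calling it): ConfigDef's LIST type splits on commas without trimming, so the getter trims entries and drops blanks, making stray whitespace in the config harmless:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class FieldNameTrimSketch {
      public static void main(String[] args) {
        // Roughly what the LIST type would hand back for
        // parquet.variant.field.names = " payload , ,inference_log"
        List<String> raw = Arrays.asList(" payload ", " ", "inference_log");
        Set<String> result = new HashSet<>();
        for (String s : raw) {
          if (s != null && !s.trim().isEmpty()) {
            result.add(s.trim());
          }
        }
        System.out.println(result); // [payload, inference_log] (order may vary)
      }
    }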

kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetRecordWriterProvider.java

Lines changed: 82 additions & 14 deletions
@@ -25,6 +25,8 @@
 import io.confluent.connect.s3.storage.IORecordWriter;
 import io.confluent.connect.s3.format.RecordViewSetter;
 import io.confluent.connect.s3.format.S3RetriableRecordWriter;
+import io.confluent.connect.s3.format.parquet.variant.JsonFieldDetector;
+import io.confluent.connect.s3.format.parquet.variant.VariantAwareWriteSupport;
 import io.confluent.connect.s3.storage.S3ParquetOutputStream;
 import io.confluent.connect.s3.storage.S3Storage;
 import io.confluent.connect.storage.format.RecordWriter;
@@ -37,12 +39,14 @@
 import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.PositionOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;

@@ -66,12 +70,15 @@ public String getExtension() {
 
   @Override
   public RecordWriter getRecordWriter(final S3SinkConnectorConfig conf, final String filename) {
+    final boolean variantEnabled = conf.isParquetVariantEnabled();
+
     return new S3RetriableRecordWriter(
         new IORecordWriter() {
          final String adjustedFilename = getAdjustedFilename(recordView, filename, getExtension());
          Schema schema = null;
          ParquetWriter<GenericRecord> writer;
          S3ParquetOutputFile s3ParquetOutputFile;
+         Set<String> variantFieldPaths = Collections.emptySet();
 
          @Override
          public void write(SinkRecord record) throws IOException {
@@ -80,26 +87,49 @@ public void write(SinkRecord record) throws IOException {
               log.info("Opening record writer for: {}", adjustedFilename);
               org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(schema);
               s3ParquetOutputFile = new S3ParquetOutputFile(storage, adjustedFilename);
-              AvroParquetWriter.Builder<GenericRecord> builder =
-                  AvroParquetWriter.<GenericRecord>builder(s3ParquetOutputFile)
-                      .withSchema(avroSchema)
-                      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-                      .withDictionaryEncoding(true)
-                      .withCompressionCodec(storage.conf().parquetCompressionCodecName())
-                      .withPageSize(PAGE_SIZE);
-              if (schemaHasArrayOfOptionalItems(schema, /*seenSchemas=*/null)) {
-                // If the schema contains an array of optional items, then
-                // it is possible that the array may have null items during the
-                // writing process. In this case, we set a flag so as not to
-                // incur a NullPointerException
+
+              boolean useOldListStructure = !schemaHasArrayOfOptionalItems(
+                  schema, /*seenSchemas=*/null
+              );
+              if (!useOldListStructure) {
                log.debug(
                    "Setting \"" + AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE
                        + "\" to false because the schema contains an array "
                        + "with optional items"
                );
-                builder.config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false");
              }
-              writer = builder.build();
+
+              if (variantEnabled) {
+                variantFieldPaths = detectVariantFields(conf, schema);
+              }
+
+              if (variantEnabled && !variantFieldPaths.isEmpty()) {
+                log.info("Variant-aware Parquet writer enabled for fields: {}",
+                    variantFieldPaths);
+                VariantAwareWriteSupport writeSupport = new VariantAwareWriteSupport(
+                    avroSchema,
+                    variantFieldPaths,
+                    useOldListStructure
+                );
+                writer = new VariantParquetWriterBuilder(s3ParquetOutputFile, writeSupport)
+                    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                    .withDictionaryEncoding(true)
+                    .withCompressionCodec(storage.conf().parquetCompressionCodecName())
+                    .withPageSize(PAGE_SIZE)
+                    .build();
+              } else {
+                AvroParquetWriter.Builder<GenericRecord> builder =
+                    AvroParquetWriter.<GenericRecord>builder(s3ParquetOutputFile)
+                        .withSchema(avroSchema)
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .withDictionaryEncoding(true)
+                        .withCompressionCodec(storage.conf().parquetCompressionCodecName())
+                        .withPageSize(PAGE_SIZE);
+                if (!useOldListStructure) {
+                  builder.config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false");
+                }
+                writer = builder.build();
+              }
             }
             log.trace("Sink record with view {}: {}", recordView,
                 sinkRecordToLoggableString(record));
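One consequence of the branch above worth spelling out: the variant writer is only constructed when the feature flag is set and the detector returned at least one field path. A connector with parquet.variant.enabled=true whose schemas contain no matching fields still takes the original AvroParquetWriter path, which is consistent with the commit's zero-impact claim for existing connectors.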
@@ -125,6 +155,14 @@ public void commit() throws IOException {
           );
         }
 
+  private static Set<String> detectVariantFields(S3SinkConnectorConfig conf, Schema schema) {
+    JsonFieldDetector detector = new JsonFieldDetector(
+        conf.getParquetVariantConnectNames(),
+        conf.getParquetVariantFieldNames()
+    );
+    return detector.detect(schema);
+  }
+
   /**
    * Check if any schema (or nested schema) is an array of optional items
    * @param schema The schema to check
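JsonFieldDetector itself lives in a file not shown on this page, so the following is a hypothetical stand-in only: a minimal detector with the same constructor and detect() shape as used above, matching top-level fields by explicit name or by Connect schema name. The real class additionally auto-detects recursive schemas such as google.protobuf.Struct and can return nested paths.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Schema;

    // Hypothetical sketch, not the class shipped in this commit.
    class NaiveJsonFieldDetector {
      private final Set<String> connectNames;
      private final Set<String> explicitFieldNames;

      NaiveJsonFieldDetector(List<String> connectNames, Set<String> explicitFieldNames) {
        this.connectNames = new HashSet<>(connectNames);
        this.explicitFieldNames = explicitFieldNames;
      }

      Set<String> detect(Schema schema) {
        Set<String> paths = new HashSet<>();
        if (schema == null || schema.type() != Schema.Type.STRUCT) {
          return paths;
        }
        for (Field field : schema.fields()) {
          String schemaName = field.schema().name();
          if (explicitFieldNames.contains(field.name())
              || (schemaName != null && connectNames.contains(schemaName))) {
            paths.add(field.name());
          }
        }
        return paths;
      }
    }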
@@ -157,6 +195,36 @@ public static boolean schemaHasArrayOfOptionalItems(Schema schema, Set<Schema> s
     }
   }
 
+  private static class VariantParquetWriterBuilder
+      extends ParquetWriter.Builder<GenericRecord, VariantParquetWriterBuilder> {
+
+    private final VariantAwareWriteSupport writeSupport;
+
+    VariantParquetWriterBuilder(OutputFile outputFile, VariantAwareWriteSupport writeSupport) {
+      super(outputFile);
+      this.writeSupport = writeSupport;
+    }
+
+    @Override
+    protected VariantParquetWriterBuilder self() {
+      return this;
+    }
+
+    @Override
+    protected WriteSupport<GenericRecord> getWriteSupport(
+        org.apache.hadoop.conf.Configuration conf
+    ) {
+      return writeSupport;
+    }
+
+    @Override
+    protected WriteSupport<GenericRecord> getWriteSupport(
+        org.apache.parquet.conf.ParquetConfiguration conf
+    ) {
+      return writeSupport;
+    }
+  }
+
   private static class S3ParquetOutputFile implements OutputFile {
     private static final int DEFAULT_BLOCK_SIZE = 0;
     private S3Storage storage;
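A note on why the private builder above is needed at all: AvroParquetWriter.Builder constructs its own AvroWriteSupport internally and exposes no hook to substitute it, so injecting VariantAwareWriteSupport means extending ParquetWriter.Builder directly. Both getWriteSupport overloads return the same instance because newer parquet-java releases can drive the build through org.apache.parquet.conf.ParquetConfiguration as well as the older Hadoop Configuration, and either overload may be the one invoked.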
