From 26243512886e078513fef7d7f5b7e3a5bdb60eee Mon Sep 17 00:00:00 2001 From: stakafum Date: Fri, 30 Jun 2017 19:37:57 +0900 Subject: [PATCH 1/5] FLUME-3327 add useKafkaHeader option to convert flume headers as kafka ones transparently in KafkaSource, KafkaChannel, and KafkaSink --- .../flume/channel/kafka/KafkaChannel.java | 48 +++++-- .../kafka/KafkaChannelConfiguration.java | 3 + .../channel/kafka/TestKafkaChannelBase.java | 18 ++- .../channel/kafka/TestParseAsFlumeEvent.java | 8 +- .../channel/kafka/TestUseKafkaHeader.java | 134 ++++++++++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 3 + .../apache/flume/sink/kafka/KafkaSink.java | 24 +++- .../flume/sink/kafka/KafkaSinkConstants.java | 3 + .../flume/sink/kafka/TestConstants.java | 1 + .../flume/sink/kafka/TestKafkaSink.java | 61 +++++++- .../flume/source/kafka/KafkaSource.java | 25 +++- .../source/kafka/KafkaSourceConstants.java | 3 + .../kafka/KafkaSourceEmbeddedKafka.java | 12 +- .../flume/source/kafka/TestKafkaSource.java | 118 ++++++++++----- 14 files changed, 400 insertions(+), 61 deletions(-) create mode 100644 flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 852b4bd586..4289f9183c 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -53,6 +53,9 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; @@ -72,6 +75,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.UUID; import java.util.concurrent.Future; @@ -97,6 +101,7 @@ public class KafkaChannel extends BasicChannelSemantics { private AtomicReference topic = new AtomicReference(); private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT; + private boolean useKafkaHeader = DEFAULT_USE_KAFKA_HEADER; private String zookeeperConnect = null; private String topicStr = DEFAULT_TOPIC; private String groupId = DEFAULT_GROUP_ID; @@ -193,6 +198,7 @@ public void configure(Context ctx) { setConsumerProps(ctx, bootStrapServers); parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT); + useKafkaHeader = ctx.getBoolean(USE_KAFKA_HEADER, DEFAULT_USE_KAFKA_HEADER); pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT); staticPartitionId = ctx.getInteger(STATIC_PARTITION_CONF); @@ -453,7 +459,11 @@ protected void doPut(Event event) throws InterruptedException { partitionId = Integer.parseInt(headerVal); } } - if (partitionId != null) { + if (useKafkaHeader) { + producerRecords.get().add( + new ProducerRecord(topic.get(), partitionId, key, + serializeValue(event, parseAsFlumeEvent), toKafkaHeaders(event.getHeaders()))); + } else if (partitionId != null) { producerRecords.get().add( new ProducerRecord(topic.get(), partitionId, key, serializeValue(event, parseAsFlumeEvent))); @@ -507,7 +517,7 @@ protected Event doTake() throws InterruptedException { } if (consumerAndRecords.get().recordIterator.hasNext()) { ConsumerRecord record = consumerAndRecords.get().recordIterator.next(); - e = deserializeValue(record.value(), parseAsFlumeEvent); + e = deserializeEvent(record, parseAsFlumeEvent, useKafkaHeader); TopicPartition tp = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID); consumerAndRecords.get().saveOffsets(tp,oam); @@ -636,23 +646,26 @@ private byte[] serializeValue(Event event, boolean parseAsFlumeEvent) throws IOE return bytes; } - private Event deserializeValue(byte[] value, boolean parseAsFlumeEvent) throws IOException { - Event e; - if (parseAsFlumeEvent) { + private Event deserializeEvent(ConsumerRecord record, boolean parseAsFlumeEvent, boolean useKafkaHeader) throws IOException { + byte[] body; + if(parseAsFlumeEvent) { ByteArrayInputStream in = - new ByteArrayInputStream(value); + new ByteArrayInputStream(record.value()); decoder = DecoderFactory.get().directBinaryDecoder(in, decoder); if (!reader.isPresent()) { reader = Optional.of( new SpecificDatumReader(AvroFlumeEvent.class)); } AvroFlumeEvent event = reader.get().read(null, decoder); - e = EventBuilder.withBody(event.getBody().array(), - toStringMap(event.getHeaders())); + body = event.getBody(); } else { - e = EventBuilder.withBody(value, Collections.EMPTY_MAP); + body = record.value(); + } + if(useKafkaHeader) { + return EventBuilder.withBody(body, toStringMap(record.headers())); + } else { + return EventBuilder.withBody(body); } - return e; } } @@ -678,6 +691,21 @@ private static Map toStringMap(Map c } return stringMap; } + private static Headers toKafkaHeaders(Map headers) { + Headers kafkaHeaders = new RecordHeaders(); + for (Entry header : headers.entrySet()) { + kafkaHeaders.add(header.getKey(), header.getValue().getBytes()); + } + return kafkaHeaders; + } + + private static Map toStringMap(Headers headers) { + Map stringMap = new HashMap(); + for (Header header : headers) { + stringMap.put(header.key(), new String(header.value())); + } + return stringMap; + } /* Object to store our consumer */ private class ConsumerAndRecords { diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java index ad5a15bf8b..1c75cd3000 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java @@ -70,4 +70,7 @@ public class KafkaChannelConfiguration { public static final String READ_SMALLEST_OFFSET = "readSmallestOffset"; public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false; + + public static final String USE_KAFKA_HEADER = "useKafkaHeader"; + public static final boolean DEFAULT_USE_KAFKA_HEADER = false; } diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java index e1279a39c5..c874d8cb4d 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java @@ -47,6 +47,8 @@ import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.USE_KAFKA_HEADER; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_USE_KAFKA_HEADER; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG; public class TestKafkaChannelBase { @@ -96,23 +98,33 @@ static void deleteTopic(String topicName) { testUtil.deleteTopic(topicName); } - KafkaChannel startChannel(boolean parseAsFlume) throws Exception { - Context context = prepareDefaultContext(parseAsFlume); + KafkaChannel startChannel(boolean parseAsFlume, boolean useKafkaHeader) throws Exception { + Context context = prepareDefaultContext(parseAsFlume, useKafkaHeader); KafkaChannel channel = createChannel(context); channel.start(); return channel; } - Context prepareDefaultContext(boolean parseAsFlume) { + KafkaChannel startChannel(boolean parseAsFlume) throws Exception { + return startChannel(parseAsFlume, DEFAULT_USE_KAFKA_HEADER); + } + + + Context prepareDefaultContext(boolean parseAsFlume, boolean useKafkaHeader) { // Prepares a default context with Kafka Server Properties Context context = new Context(); context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl()); context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume)); + context.put(USE_KAFKA_HEADER, String.valueOf(useKafkaHeader)); context.put(TOPIC_CONFIG, topic); context.put(KAFKA_CONSUMER_PREFIX + "max.poll.interval.ms", "10000"); return context; } + + Context prepareDefaultContext(boolean parseAsFlume) { + return prepareDefaultContext(parseAsFlume, DEFAULT_USE_KAFKA_HEADER); + } KafkaChannel createChannel(Context context) throws Exception { final KafkaChannel channel = new KafkaChannel(); diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java index 6baad77055..3d359bdbff 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java @@ -23,6 +23,7 @@ import org.apache.flume.event.EventBuilder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.Assert; import org.junit.Test; @@ -64,9 +65,12 @@ private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception { KafkaProducer producer = new KafkaProducer<>(props); for (int i = 0; i < 50; i++) { + RecordHeaders headers = new RecordHeaders(); + headers.add(KEY_HEADER, (String.valueOf(i) + "-header").getBytes()); ProducerRecord data = - new ProducerRecord<>(topic, String.valueOf(i) + "-header", - String.valueOf(i).getBytes()); + new ProducerRecord<>(topic, null, String.valueOf(i) + "-header", + String.valueOf(i).getBytes(), headers); + producer.send(data).get(); } ExecutorCompletionService submitterSvc = new diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java new file mode 100644 index 0000000000..788c143aee --- /dev/null +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.kafka; + +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.event.EventBuilder; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Executors; + +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER; + +public class TestUseKafkaHeader extends TestKafkaChannelBase { + + @Test + public void testUseKafkaHeaderFalse() throws Exception { + doUseKafkaHeaderFalse(false); + } + + @Test + public void testUseKafkaHeaderFalseCheckHeader() throws Exception { + doUseKafkaHeaderFalse(true); + } + + @Test + public void tesUserKafkaHeaderFalseAsSource() throws Exception { + doUseKafkaHeaderFalseAsSource(false); + } + + @Test + public void testUseKafkaHeaderFalseAsSourceCheckHeader() throws Exception { + doUseKafkaHeaderFalseAsSource(true); + } + + private void doUseKafkaHeaderFalse(Boolean checkHeaders) throws Exception { + final KafkaChannel channel = startChannel(false, true); + Properties props = channel.getProducerProps(); + KafkaProducer producer = new KafkaProducer<>(props); + + for (int i = 0; i < 50; i++) { + ProducerRecord data = + new ProducerRecord<>(topic, String.valueOf(i) + "-header", + String.valueOf(i).getBytes()); + producer.send(data).get(); + } + ExecutorCompletionService submitterSvc = new + ExecutorCompletionService<>(Executors.newCachedThreadPool()); + List events = pullEvents(channel, submitterSvc, 50, false, false); + wait(submitterSvc, 5); + Map finals = new HashMap<>(); + for (int i = 0; i < 50; i++) { + finals.put(Integer.parseInt(new String(events.get(i).getBody())), + events.get(i).getHeaders().get(KEY_HEADER)); + } + for (int i = 0; i < 50; i++) { + Assert.assertTrue(finals.keySet().contains(i)); + if (checkHeaders) { + Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header")); + } + finals.remove(i); + } + Assert.assertTrue(finals.isEmpty()); + channel.stop(); + } + + /** + * Like the previous test but here we write to the channel like a Flume source would do + * to verify that the events are written as text and not as an Avro object + * + * @throws Exception + */ + private void doUseKafkaHeaderFalseAsSource(Boolean checkHeaders) throws Exception { + final KafkaChannel channel = startChannel(false); + + List msgs = new ArrayList<>(); + Map headers = new HashMap<>(); + for (int i = 0; i < 50; i++) { + msgs.add(String.valueOf(i)); + } + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < msgs.size(); i++) { + headers.put(KEY_HEADER, String.valueOf(i) + "-header"); + channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers)); + } + tx.commit(); + ExecutorCompletionService submitterSvc = + new ExecutorCompletionService<>(Executors.newCachedThreadPool()); + List events = pullEvents(channel, submitterSvc, 50, false, false); + wait(submitterSvc, 5); + Map finals = new HashMap<>(); + for (int i = 0; i < 50; i++) { + finals.put(Integer.parseInt(new String(events.get(i).getBody())), + events.get(i).getHeaders().get(KEY_HEADER)); + } + for (int i = 0; i < 50; i++) { + Assert.assertTrue(finals.keySet().contains(i)); + if (checkHeaders) { + Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header")); + } + finals.remove(i); + } + Assert.assertTrue(finals.isEmpty()); + channel.stop(); + } +} diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index cccb0283a1..1e417f821d 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1483,6 +1483,7 @@ topicHeader topic Defines the name of the header from, if the ``setTopicHeader`` property is set to ``true``. Care should be taken if combining with the Kafka Sink ``topicHeader`` property so as to avoid sending the message back to the same topic in a loop. +useKafkaHeader false Set true to read Kafka record headers and convert them as event header. kafka.consumer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. *more consumer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security `_ for additional properties that need to be set on consumer. @@ -3166,6 +3167,7 @@ useFlumeEventFormat false By default events are p true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side. +useKafkaHeader false Set to true to store event headers as Kafka record headers. defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by ``partitionIdHeader``. By default, if this property is not set, events will be distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a @@ -3575,6 +3577,7 @@ parseAsFlumeEvent true Expecting A This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact +useKafkaHeader false Set true to use Kafka record headers as Flume event ones transparently. pollTimeout 500 The amount of time(in milliseconds) to wait in the "poll()" call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index d32753f8e1..bc281fb6c4 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -43,6 +43,8 @@ Licensed to the Apache Software Foundation (ASF) under one or more import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +55,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.Future; @@ -72,7 +75,6 @@ Licensed to the Apache Software Foundation (ASF) under one or more import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY; import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY; - /** * A Flume Sink that can publish messages to Kafka. * This is a general implementation that can be used with any Flume agent and @@ -116,6 +118,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu private List> kafkaFutures; private KafkaSinkCounter counter; private boolean useAvroEventFormat; + private boolean useKafkaHeader; private String partitionHeader = null; private Integer staticPartitionId = null; private boolean allowTopicOverride; @@ -203,7 +206,6 @@ public Status process() throws EventDeliveryException { // create a message and add to buffer long startTime = System.currentTimeMillis(); - Integer partitionId = null; try { ProducerRecord record; @@ -217,7 +219,10 @@ public Status process() throws EventDeliveryException { partitionId = Integer.parseInt(headerVal); } } - if (partitionId != null) { + if (useKafkaHeader) { + record = new ProducerRecord(eventTopic, partitionId, eventKey, + serializeEvent(event, useAvroEventFormat), toKafkaHeaders(headers)); + } else if (partitionId != null) { record = new ProducerRecord(eventTopic, partitionId, eventKey, serializeEvent(event, useAvroEventFormat)); } else { @@ -329,6 +334,9 @@ public void configure(Context context) { useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT, KafkaSinkConstants.DEFAULT_AVRO_EVENT); + useKafkaHeader = context.getBoolean(KafkaSinkConstants.KAFKA_HEADER, + KafkaSinkConstants.DEFAULT_KAFKA_HEADER); + partitionHeader = context.getString(KafkaSinkConstants.PARTITION_HEADER_NAME); staticPartitionId = context.getInteger(KafkaSinkConstants.STATIC_PARTITION_CONF); @@ -425,7 +433,15 @@ private void setProducerProps(Context context, String bootStrapServers) { KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); } - + + private Headers toKafkaHeaders(Map headers) { + Headers kafkaHeaders = new RecordHeaders(); + for (Entry header : headers.entrySet()) { + kafkaHeaders.add(header.getKey(), header.getValue().getBytes()); + } + return kafkaHeaders; + } + protected Properties getKafkaProps() { return kafkaProps; } diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java index ffca3df72d..af29e4c6a4 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java @@ -41,6 +41,9 @@ public class KafkaSinkConstants { public static final String AVRO_EVENT = "useFlumeEventFormat"; public static final boolean DEFAULT_AVRO_EVENT = false; + public static final String KAFKA_HEADER = "useKafkaHeader"; + public static final boolean DEFAULT_KAFKA_HEADER = false; + public static final String PARTITION_HEADER_NAME = "partitionIdHeader"; public static final String STATIC_PARTITION_CONF = "defaultPartitionId"; diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java index 8d6dce74c8..79665e3ca2 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java @@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more public class TestConstants { public static final String STATIC_TOPIC = "static-topic"; public static final String HEADER_TOPIC = "%{header1}-topic"; + public static final String KAFKA_HEADER_TOPIC = "kafka-header-topic"; public static final String CUSTOM_KEY = "custom-key"; public static final String CUSTOM_TOPIC = "custom-topic"; public static final String HEADER_1_VALUE = "test-avro-header"; diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index 5a94c82db9..e83df50dbd 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -19,6 +19,11 @@ Licensed to the Apache Software Foundation (ASF) under one or more package org.apache.flume.sink.kafka; import com.google.common.base.Charsets; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.utils.ZkUtils; + import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; @@ -43,6 +48,10 @@ Licensed to the Apache Software Foundation (ASF) under one or more import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -51,6 +60,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -66,6 +76,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more import static org.apache.flume.sink.kafka.KafkaSinkConstants.BROKER_LIST_FLUME_KEY; import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER; import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_HEADER; import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PREFIX; import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX; import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE; @@ -87,11 +98,12 @@ public class TestKafkaSink { @BeforeClass public static void setup() { testUtil.prepare(); - List topics = new ArrayList(3); + List topics = new ArrayList(5); topics.add(DEFAULT_TOPIC); topics.add(TestConstants.STATIC_TOPIC); topics.add(TestConstants.CUSTOM_TOPIC); topics.add(TestConstants.HEADER_1_VALUE + "-topic"); + topics.add(TestConstants.KAFKA_HEADER_TOPIC); testUtil.initTopicList(topics); } @@ -187,6 +199,15 @@ private void checkMessageArrived(String msg, String topic) { assertEquals(msg, consumerRecord.value()); } + private void checkMessageAndHeaderArrived(String msg, String topic, Headers headers) { + ConsumerRecords recs = pollConsumerRecords(topic); + assertNotNull(recs); + assertTrue(recs.count() > 0); + ConsumerRecord consumerRecord = (ConsumerRecord) recs.iterator().next(); + assertEquals(msg, consumerRecord.value()); + assertEquals(headers, consumerRecord.headers()); + } + @Test public void testStaticTopic() { Context context = prepareDefaultContext(); @@ -330,7 +351,7 @@ public void testReplaceSubStringOfTopicWithHeaders() { kafkaSink.start(); String msg = "test-replace-substring-of-topic-with-headers"; - Map headers = new HashMap<>(); + Map headers = new HashMap(); headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE); Transaction tx = memoryChannel.getTransaction(); tx.begin(); @@ -426,6 +447,42 @@ private ConsumerRecords pollConsumerRecords(String topic, int ma return recs; } + @Test + public void testKafkaHeaders() { + Sink kafkaSink = new KafkaSink(); + Context context = prepareDefaultContext(); + context.put(TOPIC_CONFIG, TestConstants.KAFKA_HEADER_TOPIC); + context.put(KAFKA_HEADER, "true"); + Configurables.configure(kafkaSink, context); + Channel memoryChannel = new MemoryChannel(); + Configurables.configure(memoryChannel, context); + kafkaSink.setChannel(memoryChannel); + kafkaSink.start(); + + String msg = "test-kafka-headers"; + Map headers = new HashMap(); + headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE); + Transaction tx = memoryChannel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody(msg.getBytes(), headers); + memoryChannel.put(event); + tx.commit(); + tx.close(); + + try { + Sink.Status status = kafkaSink.process(); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (EventDeliveryException ex) { + // ignore + } + + checkMessageAndHeaderArrived(msg, TestConstants.KAFKA_HEADER_TOPIC, new RecordHeaders(new Header[]{ + new RecordHeader(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE.getBytes())})); + + } + @Test public void testEmptyChannel() throws EventDeliveryException { Sink kafkaSink = new KafkaSink(); diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index b02285d913..9a0423cbea 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -17,6 +17,7 @@ package org.apache.flume.source.kafka; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -63,6 +64,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; @@ -124,6 +127,7 @@ public class KafkaSource extends AbstractPollableSource private BinaryDecoder decoder = null; private boolean useAvroEventFormat; + private boolean useKafkaHeader; private int batchUpperLimit; private int maxBatchDurationMillis; @@ -253,7 +257,7 @@ protected Status doProcess() throws EventDeliveryException { } else { eventBody = message.value(); headers.clear(); - headers = new HashMap(4); + headers = createHeaders(message, useKafkaHeader); } // Add headers to event (timestamp, topic, partition, key) only if they don't exist @@ -332,6 +336,14 @@ protected Status doProcess() throws EventDeliveryException { return Status.BACKOFF; } } + + private static Map createHeaders(ConsumerRecord message, boolean useKafkaHeader) { + if (useKafkaHeader) { + return toStringMap(message.headers()); + } else { + return new HashMap(4); + } + } /** * We configure the source and generate properties for the Kafka Consumer @@ -376,6 +388,9 @@ protected void doConfigure(Context context) throws FlumeException { useAvroEventFormat = context.getBoolean(KafkaSourceConstants.AVRO_EVENT, KafkaSourceConstants.DEFAULT_AVRO_EVENT); + useKafkaHeader = context.getBoolean(KafkaSourceConstants.USE_KAFKA_HEADER, + KafkaSourceConstants.DEFAULT_USE_KAFKA_HEADER); + if (log.isDebugEnabled()) { log.debug(KafkaSourceConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat); } @@ -515,6 +530,14 @@ private static Map toStringMap(Map c return stringMap; } + private static Map toStringMap(Headers kafkaHeaders) { + Map stringMap = new HashMap(); + for (Header kafkaHeader : kafkaHeaders) { + stringMap.put(kafkaHeader.key(), new String(kafkaHeader.value())); + } + return stringMap; + } + Subscriber getSubscriber() { return subscriber; } diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 0e15e73800..3ad120238b 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -43,6 +43,9 @@ public class KafkaSourceConstants { public static final String AVRO_EVENT = "useFlumeEventFormat"; public static final boolean DEFAULT_AVRO_EVENT = false; + public static final String USE_KAFKA_HEADER = "useKafkaHeader"; + public static final boolean DEFAULT_USE_KAFKA_HEADER = false; + /* Old Properties */ public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect"; public static final String TOPIC = "topic"; diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java index 56a582a188..5ac66241eb 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -112,8 +113,17 @@ public void produce(String topic, String k, String v) { produce(topic, k, v.getBytes()); } + public void produce(String topic, String k, String v, Headers headers) { + produce(topic, k, v.getBytes(), headers); + } + public void produce(String topic, String k, byte[] v) { - ProducerRecord rec = new ProducerRecord(topic, k, v); + produce(topic, k, v, null); + } + + public void produce(String topic, String k, byte[] v, Headers headers) { + ProducerRecord rec = + new ProducerRecord(topic, null, k, v, headers); try { producer.send(rec).get(); } catch (InterruptedException e) { diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index d866c98ed5..426cc3f77d 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -17,11 +17,43 @@ package org.apache.flume.source.kafka; -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import junit.framework.Assert; +import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX; +import static org.apache.flume.source.kafka.KafkaSourceConstants.USE_KAFKA_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID; +import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX; +import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; -import kafka.zk.KafkaZkClient; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; @@ -38,7 +70,6 @@ import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; @@ -46,6 +77,10 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Time; @@ -61,39 +96,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Pattern; +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; + +import junit.framework.Assert; +import kafka.zk.KafkaZkClient; + -import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT; -import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS; -import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE; -import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS; -import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT; -import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX; -import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID; -import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER; -import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER; -import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC; -import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS; -import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX; -import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER; -import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; public class TestKafkaSource { private static final Logger log = LoggerFactory.getLogger(TestKafkaSource.class); @@ -106,6 +115,7 @@ public class TestKafkaSource { private final List usedTopics = new ArrayList<>(); private String topic0; private String topic1; + private String topic2; @BeforeClass @@ -113,7 +123,6 @@ public static void startKafkaServer() { kafkaServer = new KafkaSourceEmbeddedKafka(null); startupCheck(); } - @SuppressWarnings("unchecked") @Before public void setup() throws Exception { @@ -125,6 +134,9 @@ public void setup() throws Exception { topic1 = findUnusedTopic(); kafkaServer.createTopic(topic1, 3); usedTopics.add(topic1); + topic2 = findUnusedTopic(); + kafkaServer.createTopic(topic2, 1); + usedTopics.add(topic2); } catch (TopicExistsException e) { //do nothing e.printStackTrace(); @@ -697,6 +709,36 @@ public void testAvroEvent() throws InterruptedException, EventDeliveryException, } + @Test + public void testKafkaHeader() throws EventDeliveryException, + SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { + context.put(TOPICS, topic0); + context.put(BATCH_SIZE, "1"); + context.put(USE_KAFKA_HEADER, "true"); + kafkaSource.configure(context); + startKafkaSource(); + + Thread.sleep(500L); + + kafkaServer.produce(topic0, "", "kafka header", + new RecordHeaders(new Header[]{new RecordHeader("k1", "v1".getBytes())})); + + Thread.sleep(500L); + Assert.assertEquals(Status.READY, kafkaSource.process()); + Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); + Assert.assertEquals(1, events.size()); + + Assert.assertEquals("kafka header", new String(events.get(0).getBody(), + Charsets.UTF_8)); + Assert.assertEquals(6, events.get(0).getHeaders().size()); + Assert.assertEquals(topic0, events.get(0).getHeaders().get("topic")); + Assert.assertEquals("0", events.get(0).getHeaders().get("partition")); + Assert.assertEquals("0", events.get(0).getHeaders().get("offset")); + Assert.assertTrue(events.get(0).getHeaders().containsKey("key")); + Assert.assertTrue(events.get(0).getHeaders().containsKey("timestamp")); + } + @Test public void testBootstrapLookup() { Context context = new Context(); From dbc3773bda8f19ef2f5deacb0eeba823270638aa Mon Sep 17 00:00:00 2001 From: stakafum Date: Wed, 3 Apr 2019 20:05:42 +0900 Subject: [PATCH 2/5] fromatted --- .../org/apache/flume/sink/kafka/TestKafkaSink.java | 12 ++++-------- .../org/apache/flume/source/kafka/KafkaSource.java | 4 ++-- .../apache/flume/source/kafka/TestKafkaSource.java | 6 +----- 3 files changed, 7 insertions(+), 15 deletions(-) diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index e83df50dbd..fd76c5a168 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -19,11 +19,6 @@ Licensed to the Apache Software Foundation (ASF) under one or more package org.apache.flume.sink.kafka; import com.google.common.base.Charsets; - -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.utils.ZkUtils; - import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; @@ -60,7 +55,6 @@ Licensed to the Apache Software Foundation (ASF) under one or more import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -478,8 +472,10 @@ public void testKafkaHeaders() { // ignore } - checkMessageAndHeaderArrived(msg, TestConstants.KAFKA_HEADER_TOPIC, new RecordHeaders(new Header[]{ - new RecordHeader(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE.getBytes())})); + checkMessageAndHeaderArrived(msg, TestConstants.KAFKA_HEADER_TOPIC, + new RecordHeaders(new Header[]{ + new RecordHeader(TestConstants.HEADER_1_KEY, + TestConstants.HEADER_1_VALUE.getBytes())})); } diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 9a0423cbea..ed35318735 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -17,7 +17,6 @@ package org.apache.flume.source.kafka; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -337,7 +336,8 @@ protected Status doProcess() throws EventDeliveryException { } } - private static Map createHeaders(ConsumerRecord message, boolean useKafkaHeader) { + private static Map createHeaders( + ConsumerRecord message, boolean useKafkaHeader) { if (useKafkaHeader) { return toStringMap(message.headers()); } else { diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index 426cc3f77d..1f468f8dd3 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -47,11 +47,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.regex.Pattern; import org.apache.avro.io.BinaryEncoder; @@ -102,8 +100,6 @@ import junit.framework.Assert; import kafka.zk.KafkaZkClient; - - public class TestKafkaSource { private static final Logger log = LoggerFactory.getLogger(TestKafkaSource.class); @@ -112,7 +108,7 @@ public class TestKafkaSource { private Context context; private List events; - private final List usedTopics = new ArrayList<>(); + private final List usedTopics = new ArrayList(); private String topic0; private String topic1; private String topic2; From 42c5cf836bb00b6bf79ca7458486f843df214472 Mon Sep 17 00:00:00 2001 From: stakafum Date: Thu, 4 Apr 2019 16:20:23 +0900 Subject: [PATCH 3/5] bug fix to add charset to getByte() --- .../src/main/java/org/apache/flume/sink/kafka/KafkaSink.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index bc281fb6c4..fad9ad202a 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more package org.apache.flume.sink.kafka; +import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.base.Throwables; import org.apache.avro.io.BinaryEncoder; @@ -437,7 +438,7 @@ private void setProducerProps(Context context, String bootStrapServers) { private Headers toKafkaHeaders(Map headers) { Headers kafkaHeaders = new RecordHeaders(); for (Entry header : headers.entrySet()) { - kafkaHeaders.add(header.getKey(), header.getValue().getBytes()); + kafkaHeaders.add(header.getKey(), header.getValue().getBytes(Charsets.UTF_8)); } return kafkaHeaders; } From ebb8cb6e80306fdaf1f194e4a2f5ec362d4afe14 Mon Sep 17 00:00:00 2001 From: stakafum Date: Thu, 4 Apr 2019 19:24:00 +0900 Subject: [PATCH 4/5] bug fix --- .../main/java/org/apache/flume/channel/kafka/KafkaChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 4289f9183c..14b63f4135 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -657,7 +657,7 @@ private Event deserializeEvent(ConsumerRecord record, boolean pa new SpecificDatumReader(AvroFlumeEvent.class)); } AvroFlumeEvent event = reader.get().read(null, decoder); - body = event.getBody(); + body = event.getBody().array(); } else { body = record.value(); } From cd69e5036e34b580a8f115aee94b14189119b199 Mon Sep 17 00:00:00 2001 From: stakafum Date: Fri, 5 Apr 2019 15:14:31 +0900 Subject: [PATCH 5/5] bug fix and formatted --- .../flume/channel/kafka/KafkaChannel.java | 43 +++++++++++-------- .../channel/kafka/TestUseKafkaHeader.java | 2 - 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 14b63f4135..5572ba1838 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -462,7 +462,8 @@ protected void doPut(Event event) throws InterruptedException { if (useKafkaHeader) { producerRecords.get().add( new ProducerRecord(topic.get(), partitionId, key, - serializeValue(event, parseAsFlumeEvent), toKafkaHeaders(event.getHeaders()))); + serializeValue(event, parseAsFlumeEvent), + toKafkaHeaders(event.getHeaders()))); } else if (partitionId != null) { producerRecords.get().add( new ProducerRecord(topic.get(), partitionId, key, @@ -646,9 +647,10 @@ private byte[] serializeValue(Event event, boolean parseAsFlumeEvent) throws IOE return bytes; } - private Event deserializeEvent(ConsumerRecord record, boolean parseAsFlumeEvent, boolean useKafkaHeader) throws IOException { - byte[] body; - if(parseAsFlumeEvent) { + private Event deserializeEvent(ConsumerRecord record, + boolean parseAsFlumeEvent, + boolean useKafkaHeader) throws IOException { + if (parseAsFlumeEvent) { ByteArrayInputStream in = new ByteArrayInputStream(record.value()); decoder = DecoderFactory.get().directBinaryDecoder(in, decoder); @@ -657,18 +659,22 @@ private Event deserializeEvent(ConsumerRecord record, boolean pa new SpecificDatumReader(AvroFlumeEvent.class)); } AvroFlumeEvent event = reader.get().read(null, decoder); - body = event.getBody().array(); - } else { - body = record.value(); - } - if(useKafkaHeader) { - return EventBuilder.withBody(body, toStringMap(record.headers())); + return EventBuilder.withBody(event.getBody().array(), toStringMap(event.getHeaders())); } else { - return EventBuilder.withBody(body); + return EventBuilder.withBody(record.value(), createHeaders(record, useKafkaHeader)); } } } + private static Map createHeaders( + ConsumerRecord record, boolean useKafkaHeader) { + if (useKafkaHeader) { + return toStringMap(record.headers()); + } else { + return new HashMap(4); + } + } + /** * Helper function to convert a map of String to a map of CharSequence. */ @@ -691,13 +697,6 @@ private static Map toStringMap(Map c } return stringMap; } - private static Headers toKafkaHeaders(Map headers) { - Headers kafkaHeaders = new RecordHeaders(); - for (Entry header : headers.entrySet()) { - kafkaHeaders.add(header.getKey(), header.getValue().getBytes()); - } - return kafkaHeaders; - } private static Map toStringMap(Headers headers) { Map stringMap = new HashMap(); @@ -707,6 +706,14 @@ private static Map toStringMap(Headers headers) { return stringMap; } + private static Headers toKafkaHeaders(Map headers) { + Headers kafkaHeaders = new RecordHeaders(); + for (Entry header : headers.entrySet()) { + kafkaHeaders.add(header.getKey(), header.getValue().getBytes()); + } + return kafkaHeaders; + } + /* Object to store our consumer */ private class ConsumerAndRecords { final KafkaConsumer consumer; diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java index 788c143aee..f790e7f10a 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java @@ -23,8 +23,6 @@ import org.apache.flume.event.EventBuilder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.Assert; import org.junit.Test;