diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 852b4bd586..5572ba1838 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -53,6 +53,9 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; @@ -72,6 +75,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.UUID; import java.util.concurrent.Future; @@ -97,6 +101,7 @@ public class KafkaChannel extends BasicChannelSemantics { private AtomicReference topic = new AtomicReference(); private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT; + private boolean useKafkaHeader = DEFAULT_USE_KAFKA_HEADER; private String zookeeperConnect = null; private String topicStr = DEFAULT_TOPIC; private String groupId = DEFAULT_GROUP_ID; @@ -193,6 +198,7 @@ public void configure(Context ctx) { setConsumerProps(ctx, bootStrapServers); parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT); + useKafkaHeader = ctx.getBoolean(USE_KAFKA_HEADER, DEFAULT_USE_KAFKA_HEADER); pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT); staticPartitionId = ctx.getInteger(STATIC_PARTITION_CONF); @@ -453,7 +459,12 @@ protected void doPut(Event event) throws InterruptedException { partitionId = Integer.parseInt(headerVal); } } - if (partitionId != null) { + if (useKafkaHeader) { + producerRecords.get().add( + new ProducerRecord(topic.get(), partitionId, key, + serializeValue(event, parseAsFlumeEvent), + toKafkaHeaders(event.getHeaders()))); + } else if (partitionId != null) { producerRecords.get().add( new ProducerRecord(topic.get(), partitionId, key, serializeValue(event, parseAsFlumeEvent))); @@ -507,7 +518,7 @@ protected Event doTake() throws InterruptedException { } if (consumerAndRecords.get().recordIterator.hasNext()) { ConsumerRecord record = consumerAndRecords.get().recordIterator.next(); - e = deserializeValue(record.value(), parseAsFlumeEvent); + e = deserializeEvent(record, parseAsFlumeEvent, useKafkaHeader); TopicPartition tp = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID); consumerAndRecords.get().saveOffsets(tp,oam); @@ -636,23 +647,31 @@ private byte[] serializeValue(Event event, boolean parseAsFlumeEvent) throws IOE return bytes; } - private Event deserializeValue(byte[] value, boolean parseAsFlumeEvent) throws IOException { - Event e; + private Event deserializeEvent(ConsumerRecord record, + boolean parseAsFlumeEvent, + boolean useKafkaHeader) throws IOException { if (parseAsFlumeEvent) { ByteArrayInputStream in = - new ByteArrayInputStream(value); + new ByteArrayInputStream(record.value()); decoder = DecoderFactory.get().directBinaryDecoder(in, decoder); if (!reader.isPresent()) { reader = Optional.of( new SpecificDatumReader(AvroFlumeEvent.class)); } AvroFlumeEvent event = reader.get().read(null, decoder); - e = EventBuilder.withBody(event.getBody().array(), - toStringMap(event.getHeaders())); + return EventBuilder.withBody(event.getBody().array(), toStringMap(event.getHeaders())); } else { - e = EventBuilder.withBody(value, Collections.EMPTY_MAP); + return EventBuilder.withBody(record.value(), createHeaders(record, useKafkaHeader)); } - return e; + } + } + + private static Map createHeaders( + ConsumerRecord record, boolean useKafkaHeader) { + if (useKafkaHeader) { + return toStringMap(record.headers()); + } else { + return new HashMap(4); } } @@ -679,6 +698,22 @@ private static Map toStringMap(Map c return stringMap; } + private static Map toStringMap(Headers headers) { + Map stringMap = new HashMap(); + for (Header header : headers) { + stringMap.put(header.key(), new String(header.value())); + } + return stringMap; + } + + private static Headers toKafkaHeaders(Map headers) { + Headers kafkaHeaders = new RecordHeaders(); + for (Entry header : headers.entrySet()) { + kafkaHeaders.add(header.getKey(), header.getValue().getBytes()); + } + return kafkaHeaders; + } + /* Object to store our consumer */ private class ConsumerAndRecords { final KafkaConsumer consumer; diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java index ad5a15bf8b..1c75cd3000 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java @@ -70,4 +70,7 @@ public class KafkaChannelConfiguration { public static final String READ_SMALLEST_OFFSET = "readSmallestOffset"; public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false; + + public static final String USE_KAFKA_HEADER = "useKafkaHeader"; + public static final boolean DEFAULT_USE_KAFKA_HEADER = false; } diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java index e1279a39c5..c874d8cb4d 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java @@ -47,6 +47,8 @@ import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.USE_KAFKA_HEADER; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_USE_KAFKA_HEADER; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG; public class TestKafkaChannelBase { @@ -96,23 +98,33 @@ static void deleteTopic(String topicName) { testUtil.deleteTopic(topicName); } - KafkaChannel startChannel(boolean parseAsFlume) throws Exception { - Context context = prepareDefaultContext(parseAsFlume); + KafkaChannel startChannel(boolean parseAsFlume, boolean useKafkaHeader) throws Exception { + Context context = prepareDefaultContext(parseAsFlume, useKafkaHeader); KafkaChannel channel = createChannel(context); channel.start(); return channel; } - Context prepareDefaultContext(boolean parseAsFlume) { + KafkaChannel startChannel(boolean parseAsFlume) throws Exception { + return startChannel(parseAsFlume, DEFAULT_USE_KAFKA_HEADER); + } + + + Context prepareDefaultContext(boolean parseAsFlume, boolean useKafkaHeader) { // Prepares a default context with Kafka Server Properties Context context = new Context(); context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl()); context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume)); + context.put(USE_KAFKA_HEADER, String.valueOf(useKafkaHeader)); context.put(TOPIC_CONFIG, topic); context.put(KAFKA_CONSUMER_PREFIX + "max.poll.interval.ms", "10000"); return context; } + + Context prepareDefaultContext(boolean parseAsFlume) { + return prepareDefaultContext(parseAsFlume, DEFAULT_USE_KAFKA_HEADER); + } KafkaChannel createChannel(Context context) throws Exception { final KafkaChannel channel = new KafkaChannel(); diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java index 6baad77055..3d359bdbff 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java @@ -23,6 +23,7 @@ import org.apache.flume.event.EventBuilder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.Assert; import org.junit.Test; @@ -64,9 +65,12 @@ private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception { KafkaProducer producer = new KafkaProducer<>(props); for (int i = 0; i < 50; i++) { + RecordHeaders headers = new RecordHeaders(); + headers.add(KEY_HEADER, (String.valueOf(i) + "-header").getBytes()); ProducerRecord data = - new ProducerRecord<>(topic, String.valueOf(i) + "-header", - String.valueOf(i).getBytes()); + new ProducerRecord<>(topic, null, String.valueOf(i) + "-header", + String.valueOf(i).getBytes(), headers); + producer.send(data).get(); } ExecutorCompletionService submitterSvc = new diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java new file mode 100644 index 0000000000..f790e7f10a --- /dev/null +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestUseKafkaHeader.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.kafka; + +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.event.EventBuilder; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Executors; + +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER; + +public class TestUseKafkaHeader extends TestKafkaChannelBase { + + @Test + public void testUseKafkaHeaderFalse() throws Exception { + doUseKafkaHeaderFalse(false); + } + + @Test + public void testUseKafkaHeaderFalseCheckHeader() throws Exception { + doUseKafkaHeaderFalse(true); + } + + @Test + public void tesUserKafkaHeaderFalseAsSource() throws Exception { + doUseKafkaHeaderFalseAsSource(false); + } + + @Test + public void testUseKafkaHeaderFalseAsSourceCheckHeader() throws Exception { + doUseKafkaHeaderFalseAsSource(true); + } + + private void doUseKafkaHeaderFalse(Boolean checkHeaders) throws Exception { + final KafkaChannel channel = startChannel(false, true); + Properties props = channel.getProducerProps(); + KafkaProducer producer = new KafkaProducer<>(props); + + for (int i = 0; i < 50; i++) { + ProducerRecord data = + new ProducerRecord<>(topic, String.valueOf(i) + "-header", + String.valueOf(i).getBytes()); + producer.send(data).get(); + } + ExecutorCompletionService submitterSvc = new + ExecutorCompletionService<>(Executors.newCachedThreadPool()); + List events = pullEvents(channel, submitterSvc, 50, false, false); + wait(submitterSvc, 5); + Map finals = new HashMap<>(); + for (int i = 0; i < 50; i++) { + finals.put(Integer.parseInt(new String(events.get(i).getBody())), + events.get(i).getHeaders().get(KEY_HEADER)); + } + for (int i = 0; i < 50; i++) { + Assert.assertTrue(finals.keySet().contains(i)); + if (checkHeaders) { + Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header")); + } + finals.remove(i); + } + Assert.assertTrue(finals.isEmpty()); + channel.stop(); + } + + /** + * Like the previous test but here we write to the channel like a Flume source would do + * to verify that the events are written as text and not as an Avro object + * + * @throws Exception + */ + private void doUseKafkaHeaderFalseAsSource(Boolean checkHeaders) throws Exception { + final KafkaChannel channel = startChannel(false); + + List msgs = new ArrayList<>(); + Map headers = new HashMap<>(); + for (int i = 0; i < 50; i++) { + msgs.add(String.valueOf(i)); + } + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < msgs.size(); i++) { + headers.put(KEY_HEADER, String.valueOf(i) + "-header"); + channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers)); + } + tx.commit(); + ExecutorCompletionService submitterSvc = + new ExecutorCompletionService<>(Executors.newCachedThreadPool()); + List events = pullEvents(channel, submitterSvc, 50, false, false); + wait(submitterSvc, 5); + Map finals = new HashMap<>(); + for (int i = 0; i < 50; i++) { + finals.put(Integer.parseInt(new String(events.get(i).getBody())), + events.get(i).getHeaders().get(KEY_HEADER)); + } + for (int i = 0; i < 50; i++) { + Assert.assertTrue(finals.keySet().contains(i)); + if (checkHeaders) { + Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header")); + } + finals.remove(i); + } + Assert.assertTrue(finals.isEmpty()); + channel.stop(); + } +} diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index cccb0283a1..1e417f821d 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1483,6 +1483,7 @@ topicHeader topic Defines the name of the header from, if the ``setTopicHeader`` property is set to ``true``. Care should be taken if combining with the Kafka Sink ``topicHeader`` property so as to avoid sending the message back to the same topic in a loop. +useKafkaHeader false Set true to read Kafka record headers and convert them as event header. kafka.consumer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. *more consumer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security `_ for additional properties that need to be set on consumer. @@ -3166,6 +3167,7 @@ useFlumeEventFormat false By default events are p true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side. +useKafkaHeader false Set to true to store event headers as Kafka record headers. defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by ``partitionIdHeader``. By default, if this property is not set, events will be distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a @@ -3575,6 +3577,7 @@ parseAsFlumeEvent true Expecting A This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact +useKafkaHeader false Set true to use Kafka record headers as Flume event ones transparently. pollTimeout 500 The amount of time(in milliseconds) to wait in the "poll()" call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index d32753f8e1..fad9ad202a 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more package org.apache.flume.sink.kafka; +import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.base.Throwables; import org.apache.avro.io.BinaryEncoder; @@ -43,6 +44,8 @@ Licensed to the Apache Software Foundation (ASF) under one or more import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +56,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.Future; @@ -72,7 +76,6 @@ Licensed to the Apache Software Foundation (ASF) under one or more import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY; import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY; - /** * A Flume Sink that can publish messages to Kafka. * This is a general implementation that can be used with any Flume agent and @@ -116,6 +119,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu private List> kafkaFutures; private KafkaSinkCounter counter; private boolean useAvroEventFormat; + private boolean useKafkaHeader; private String partitionHeader = null; private Integer staticPartitionId = null; private boolean allowTopicOverride; @@ -203,7 +207,6 @@ public Status process() throws EventDeliveryException { // create a message and add to buffer long startTime = System.currentTimeMillis(); - Integer partitionId = null; try { ProducerRecord record; @@ -217,7 +220,10 @@ public Status process() throws EventDeliveryException { partitionId = Integer.parseInt(headerVal); } } - if (partitionId != null) { + if (useKafkaHeader) { + record = new ProducerRecord(eventTopic, partitionId, eventKey, + serializeEvent(event, useAvroEventFormat), toKafkaHeaders(headers)); + } else if (partitionId != null) { record = new ProducerRecord(eventTopic, partitionId, eventKey, serializeEvent(event, useAvroEventFormat)); } else { @@ -329,6 +335,9 @@ public void configure(Context context) { useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT, KafkaSinkConstants.DEFAULT_AVRO_EVENT); + useKafkaHeader = context.getBoolean(KafkaSinkConstants.KAFKA_HEADER, + KafkaSinkConstants.DEFAULT_KAFKA_HEADER); + partitionHeader = context.getString(KafkaSinkConstants.PARTITION_HEADER_NAME); staticPartitionId = context.getInteger(KafkaSinkConstants.STATIC_PARTITION_CONF); @@ -425,7 +434,15 @@ private void setProducerProps(Context context, String bootStrapServers) { KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); } - + + private Headers toKafkaHeaders(Map headers) { + Headers kafkaHeaders = new RecordHeaders(); + for (Entry header : headers.entrySet()) { + kafkaHeaders.add(header.getKey(), header.getValue().getBytes(Charsets.UTF_8)); + } + return kafkaHeaders; + } + protected Properties getKafkaProps() { return kafkaProps; } diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java index ffca3df72d..af29e4c6a4 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java @@ -41,6 +41,9 @@ public class KafkaSinkConstants { public static final String AVRO_EVENT = "useFlumeEventFormat"; public static final boolean DEFAULT_AVRO_EVENT = false; + public static final String KAFKA_HEADER = "useKafkaHeader"; + public static final boolean DEFAULT_KAFKA_HEADER = false; + public static final String PARTITION_HEADER_NAME = "partitionIdHeader"; public static final String STATIC_PARTITION_CONF = "defaultPartitionId"; diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java index 8d6dce74c8..79665e3ca2 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java @@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more public class TestConstants { public static final String STATIC_TOPIC = "static-topic"; public static final String HEADER_TOPIC = "%{header1}-topic"; + public static final String KAFKA_HEADER_TOPIC = "kafka-header-topic"; public static final String CUSTOM_KEY = "custom-key"; public static final String CUSTOM_TOPIC = "custom-topic"; public static final String HEADER_1_VALUE = "test-avro-header"; diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index 5a94c82db9..fd76c5a168 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -43,6 +43,10 @@ Licensed to the Apache Software Foundation (ASF) under one or more import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -66,6 +70,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more import static org.apache.flume.sink.kafka.KafkaSinkConstants.BROKER_LIST_FLUME_KEY; import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER; import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_HEADER; import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PREFIX; import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX; import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE; @@ -87,11 +92,12 @@ public class TestKafkaSink { @BeforeClass public static void setup() { testUtil.prepare(); - List topics = new ArrayList(3); + List topics = new ArrayList(5); topics.add(DEFAULT_TOPIC); topics.add(TestConstants.STATIC_TOPIC); topics.add(TestConstants.CUSTOM_TOPIC); topics.add(TestConstants.HEADER_1_VALUE + "-topic"); + topics.add(TestConstants.KAFKA_HEADER_TOPIC); testUtil.initTopicList(topics); } @@ -187,6 +193,15 @@ private void checkMessageArrived(String msg, String topic) { assertEquals(msg, consumerRecord.value()); } + private void checkMessageAndHeaderArrived(String msg, String topic, Headers headers) { + ConsumerRecords recs = pollConsumerRecords(topic); + assertNotNull(recs); + assertTrue(recs.count() > 0); + ConsumerRecord consumerRecord = (ConsumerRecord) recs.iterator().next(); + assertEquals(msg, consumerRecord.value()); + assertEquals(headers, consumerRecord.headers()); + } + @Test public void testStaticTopic() { Context context = prepareDefaultContext(); @@ -330,7 +345,7 @@ public void testReplaceSubStringOfTopicWithHeaders() { kafkaSink.start(); String msg = "test-replace-substring-of-topic-with-headers"; - Map headers = new HashMap<>(); + Map headers = new HashMap(); headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE); Transaction tx = memoryChannel.getTransaction(); tx.begin(); @@ -426,6 +441,44 @@ private ConsumerRecords pollConsumerRecords(String topic, int ma return recs; } + @Test + public void testKafkaHeaders() { + Sink kafkaSink = new KafkaSink(); + Context context = prepareDefaultContext(); + context.put(TOPIC_CONFIG, TestConstants.KAFKA_HEADER_TOPIC); + context.put(KAFKA_HEADER, "true"); + Configurables.configure(kafkaSink, context); + Channel memoryChannel = new MemoryChannel(); + Configurables.configure(memoryChannel, context); + kafkaSink.setChannel(memoryChannel); + kafkaSink.start(); + + String msg = "test-kafka-headers"; + Map headers = new HashMap(); + headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE); + Transaction tx = memoryChannel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody(msg.getBytes(), headers); + memoryChannel.put(event); + tx.commit(); + tx.close(); + + try { + Sink.Status status = kafkaSink.process(); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (EventDeliveryException ex) { + // ignore + } + + checkMessageAndHeaderArrived(msg, TestConstants.KAFKA_HEADER_TOPIC, + new RecordHeaders(new Header[]{ + new RecordHeader(TestConstants.HEADER_1_KEY, + TestConstants.HEADER_1_VALUE.getBytes())})); + + } + @Test public void testEmptyChannel() throws EventDeliveryException { Sink kafkaSink = new KafkaSink(); diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index b02285d913..ed35318735 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -63,6 +63,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; @@ -124,6 +126,7 @@ public class KafkaSource extends AbstractPollableSource private BinaryDecoder decoder = null; private boolean useAvroEventFormat; + private boolean useKafkaHeader; private int batchUpperLimit; private int maxBatchDurationMillis; @@ -253,7 +256,7 @@ protected Status doProcess() throws EventDeliveryException { } else { eventBody = message.value(); headers.clear(); - headers = new HashMap(4); + headers = createHeaders(message, useKafkaHeader); } // Add headers to event (timestamp, topic, partition, key) only if they don't exist @@ -332,6 +335,15 @@ protected Status doProcess() throws EventDeliveryException { return Status.BACKOFF; } } + + private static Map createHeaders( + ConsumerRecord message, boolean useKafkaHeader) { + if (useKafkaHeader) { + return toStringMap(message.headers()); + } else { + return new HashMap(4); + } + } /** * We configure the source and generate properties for the Kafka Consumer @@ -376,6 +388,9 @@ protected void doConfigure(Context context) throws FlumeException { useAvroEventFormat = context.getBoolean(KafkaSourceConstants.AVRO_EVENT, KafkaSourceConstants.DEFAULT_AVRO_EVENT); + useKafkaHeader = context.getBoolean(KafkaSourceConstants.USE_KAFKA_HEADER, + KafkaSourceConstants.DEFAULT_USE_KAFKA_HEADER); + if (log.isDebugEnabled()) { log.debug(KafkaSourceConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat); } @@ -515,6 +530,14 @@ private static Map toStringMap(Map c return stringMap; } + private static Map toStringMap(Headers kafkaHeaders) { + Map stringMap = new HashMap(); + for (Header kafkaHeader : kafkaHeaders) { + stringMap.put(kafkaHeader.key(), new String(kafkaHeader.value())); + } + return stringMap; + } + Subscriber getSubscriber() { return subscriber; } diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 0e15e73800..3ad120238b 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -43,6 +43,9 @@ public class KafkaSourceConstants { public static final String AVRO_EVENT = "useFlumeEventFormat"; public static final boolean DEFAULT_AVRO_EVENT = false; + public static final String USE_KAFKA_HEADER = "useKafkaHeader"; + public static final boolean DEFAULT_USE_KAFKA_HEADER = false; + /* Old Properties */ public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect"; public static final String TOPIC = "topic"; diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java index 56a582a188..5ac66241eb 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -112,8 +113,17 @@ public void produce(String topic, String k, String v) { produce(topic, k, v.getBytes()); } + public void produce(String topic, String k, String v, Headers headers) { + produce(topic, k, v.getBytes(), headers); + } + public void produce(String topic, String k, byte[] v) { - ProducerRecord rec = new ProducerRecord(topic, k, v); + produce(topic, k, v, null); + } + + public void produce(String topic, String k, byte[] v, Headers headers) { + ProducerRecord rec = + new ProducerRecord(topic, null, k, v, headers); try { producer.send(rec).get(); } catch (InterruptedException e) { diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index d866c98ed5..1f468f8dd3 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -17,11 +17,41 @@ package org.apache.flume.source.kafka; -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import junit.framework.Assert; +import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX; +import static org.apache.flume.source.kafka.KafkaSourceConstants.USE_KAFKA_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID; +import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX; +import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; -import kafka.zk.KafkaZkClient; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; @@ -38,7 +68,6 @@ import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; @@ -46,6 +75,10 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Time; @@ -61,39 +94,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Pattern; +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; -import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT; -import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS; -import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE; -import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS; -import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT; -import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX; -import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID; -import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER; -import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER; -import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC; -import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS; -import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX; -import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER; -import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; +import junit.framework.Assert; +import kafka.zk.KafkaZkClient; public class TestKafkaSource { private static final Logger log = LoggerFactory.getLogger(TestKafkaSource.class); @@ -103,9 +108,10 @@ public class TestKafkaSource { private Context context; private List events; - private final List usedTopics = new ArrayList<>(); + private final List usedTopics = new ArrayList(); private String topic0; private String topic1; + private String topic2; @BeforeClass @@ -113,7 +119,6 @@ public static void startKafkaServer() { kafkaServer = new KafkaSourceEmbeddedKafka(null); startupCheck(); } - @SuppressWarnings("unchecked") @Before public void setup() throws Exception { @@ -125,6 +130,9 @@ public void setup() throws Exception { topic1 = findUnusedTopic(); kafkaServer.createTopic(topic1, 3); usedTopics.add(topic1); + topic2 = findUnusedTopic(); + kafkaServer.createTopic(topic2, 1); + usedTopics.add(topic2); } catch (TopicExistsException e) { //do nothing e.printStackTrace(); @@ -697,6 +705,36 @@ public void testAvroEvent() throws InterruptedException, EventDeliveryException, } + @Test + public void testKafkaHeader() throws EventDeliveryException, + SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { + context.put(TOPICS, topic0); + context.put(BATCH_SIZE, "1"); + context.put(USE_KAFKA_HEADER, "true"); + kafkaSource.configure(context); + startKafkaSource(); + + Thread.sleep(500L); + + kafkaServer.produce(topic0, "", "kafka header", + new RecordHeaders(new Header[]{new RecordHeader("k1", "v1".getBytes())})); + + Thread.sleep(500L); + Assert.assertEquals(Status.READY, kafkaSource.process()); + Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); + Assert.assertEquals(1, events.size()); + + Assert.assertEquals("kafka header", new String(events.get(0).getBody(), + Charsets.UTF_8)); + Assert.assertEquals(6, events.get(0).getHeaders().size()); + Assert.assertEquals(topic0, events.get(0).getHeaders().get("topic")); + Assert.assertEquals("0", events.get(0).getHeaders().get("partition")); + Assert.assertEquals("0", events.get(0).getHeaders().get("offset")); + Assert.assertTrue(events.get(0).getHeaders().containsKey("key")); + Assert.assertTrue(events.get(0).getHeaders().containsKey("timestamp")); + } + @Test public void testBootstrapLookup() { Context context = new Context();