Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
Expand All @@ -72,6 +75,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;
Expand All @@ -97,6 +101,7 @@ public class KafkaChannel extends BasicChannelSemantics {

private AtomicReference<String> topic = new AtomicReference<String>();
private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT;
private boolean useKafkaHeader = DEFAULT_USE_KAFKA_HEADER;
private String zookeeperConnect = null;
private String topicStr = DEFAULT_TOPIC;
private String groupId = DEFAULT_GROUP_ID;
Expand Down Expand Up @@ -193,6 +198,7 @@ public void configure(Context ctx) {
setConsumerProps(ctx, bootStrapServers);

parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
useKafkaHeader = ctx.getBoolean(USE_KAFKA_HEADER, DEFAULT_USE_KAFKA_HEADER);
pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT);

staticPartitionId = ctx.getInteger(STATIC_PARTITION_CONF);
Expand Down Expand Up @@ -453,7 +459,12 @@ protected void doPut(Event event) throws InterruptedException {
partitionId = Integer.parseInt(headerVal);
}
}
if (partitionId != null) {
if (useKafkaHeader) {
producerRecords.get().add(
new ProducerRecord<String, byte[]>(topic.get(), partitionId, key,
serializeValue(event, parseAsFlumeEvent),
toKafkaHeaders(event.getHeaders())));
} else if (partitionId != null) {
producerRecords.get().add(
new ProducerRecord<String, byte[]>(topic.get(), partitionId, key,
serializeValue(event, parseAsFlumeEvent)));
Expand Down Expand Up @@ -507,7 +518,7 @@ protected Event doTake() throws InterruptedException {
}
if (consumerAndRecords.get().recordIterator.hasNext()) {
ConsumerRecord<String, byte[]> record = consumerAndRecords.get().recordIterator.next();
e = deserializeValue(record.value(), parseAsFlumeEvent);
e = deserializeEvent(record, parseAsFlumeEvent, useKafkaHeader);
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
consumerAndRecords.get().saveOffsets(tp,oam);
Expand Down Expand Up @@ -636,23 +647,31 @@ private byte[] serializeValue(Event event, boolean parseAsFlumeEvent) throws IOE
return bytes;
}

private Event deserializeValue(byte[] value, boolean parseAsFlumeEvent) throws IOException {
Event e;
private Event deserializeEvent(ConsumerRecord<String, byte[]> record,
boolean parseAsFlumeEvent,
boolean useKafkaHeader) throws IOException {
if (parseAsFlumeEvent) {
ByteArrayInputStream in =
new ByteArrayInputStream(value);
new ByteArrayInputStream(record.value());
decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
if (!reader.isPresent()) {
reader = Optional.of(
new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
}
AvroFlumeEvent event = reader.get().read(null, decoder);
e = EventBuilder.withBody(event.getBody().array(),
toStringMap(event.getHeaders()));
return EventBuilder.withBody(event.getBody().array(), toStringMap(event.getHeaders()));
} else {
e = EventBuilder.withBody(value, Collections.EMPTY_MAP);
return EventBuilder.withBody(record.value(), createHeaders(record, useKafkaHeader));
}
return e;
}
}

private static Map<String, String> createHeaders(
ConsumerRecord<String, byte[]> record, boolean useKafkaHeader) {
if (useKafkaHeader) {
return toStringMap(record.headers());
} else {
return new HashMap<String, String>(4);
}
}

Expand All @@ -679,6 +698,22 @@ private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> c
return stringMap;
}

private static Map<String, String> toStringMap(Headers headers) {
Map<String, String> stringMap = new HashMap<String, String>();
for (Header header : headers) {
stringMap.put(header.key(), new String(header.value()));
}
return stringMap;
}

private static Headers toKafkaHeaders(Map<String, String> headers) {
Headers kafkaHeaders = new RecordHeaders();
for (Entry<String, String> header : headers.entrySet()) {
kafkaHeaders.add(header.getKey(), header.getValue().getBytes());
}
return kafkaHeaders;
}

/* Object to store our consumer */
private class ConsumerAndRecords {
final KafkaConsumer<String, byte[]> consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,7 @@ public class KafkaChannelConfiguration {

public static final String READ_SMALLEST_OFFSET = "readSmallestOffset";
public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false;

public static final String USE_KAFKA_HEADER = "useKafkaHeader";
public static final boolean DEFAULT_USE_KAFKA_HEADER = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.USE_KAFKA_HEADER;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_USE_KAFKA_HEADER;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;

public class TestKafkaChannelBase {
Expand Down Expand Up @@ -96,23 +98,33 @@ static void deleteTopic(String topicName) {
testUtil.deleteTopic(topicName);
}

KafkaChannel startChannel(boolean parseAsFlume) throws Exception {
Context context = prepareDefaultContext(parseAsFlume);
KafkaChannel startChannel(boolean parseAsFlume, boolean useKafkaHeader) throws Exception {
Context context = prepareDefaultContext(parseAsFlume, useKafkaHeader);
KafkaChannel channel = createChannel(context);
channel.start();
return channel;
}

Context prepareDefaultContext(boolean parseAsFlume) {
KafkaChannel startChannel(boolean parseAsFlume) throws Exception {
return startChannel(parseAsFlume, DEFAULT_USE_KAFKA_HEADER);
}


Context prepareDefaultContext(boolean parseAsFlume, boolean useKafkaHeader) {
// Prepares a default context with Kafka Server Properties
Context context = new Context();
context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume));
context.put(USE_KAFKA_HEADER, String.valueOf(useKafkaHeader));
context.put(TOPIC_CONFIG, topic);
context.put(KAFKA_CONSUMER_PREFIX + "max.poll.interval.ms", "10000");

return context;
}

Context prepareDefaultContext(boolean parseAsFlume) {
return prepareDefaultContext(parseAsFlume, DEFAULT_USE_KAFKA_HEADER);
}

KafkaChannel createChannel(Context context) throws Exception {
final KafkaChannel channel = new KafkaChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flume.event.EventBuilder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -64,9 +65,12 @@ private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception {
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

for (int i = 0; i < 50; i++) {
RecordHeaders headers = new RecordHeaders();
headers.add(KEY_HEADER, (String.valueOf(i) + "-header").getBytes());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the significance of -header here? Can we make this configurable, leaving -header as default?

ProducerRecord<String, byte[]> data =
new ProducerRecord<>(topic, String.valueOf(i) + "-header",
String.valueOf(i).getBytes());
new ProducerRecord<>(topic, null, String.valueOf(i) + "-header",
String.valueOf(i).getBytes(), headers);

producer.send(data).get();
}
ExecutorCompletionService<Void> submitterSvc = new
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.channel.kafka;

import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.event.EventBuilder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;

import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;

public class TestUseKafkaHeader extends TestKafkaChannelBase {

@Test
public void testUseKafkaHeaderFalse() throws Exception {
doUseKafkaHeaderFalse(false);
}

@Test
public void testUseKafkaHeaderFalseCheckHeader() throws Exception {
doUseKafkaHeaderFalse(true);
}

@Test
public void tesUserKafkaHeaderFalseAsSource() throws Exception {
doUseKafkaHeaderFalseAsSource(false);
}

@Test
public void testUseKafkaHeaderFalseAsSourceCheckHeader() throws Exception {
doUseKafkaHeaderFalseAsSource(true);
}

private void doUseKafkaHeaderFalse(Boolean checkHeaders) throws Exception {
final KafkaChannel channel = startChannel(false, true);
Properties props = channel.getProducerProps();
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

for (int i = 0; i < 50; i++) {
ProducerRecord<String, byte[]> data =
new ProducerRecord<>(topic, String.valueOf(i) + "-header",
String.valueOf(i).getBytes());
producer.send(data).get();
}
ExecutorCompletionService<Void> submitterSvc = new
ExecutorCompletionService<>(Executors.newCachedThreadPool());
List<Event> events = pullEvents(channel, submitterSvc, 50, false, false);
wait(submitterSvc, 5);
Map<Integer, String> finals = new HashMap<>();
for (int i = 0; i < 50; i++) {
finals.put(Integer.parseInt(new String(events.get(i).getBody())),
events.get(i).getHeaders().get(KEY_HEADER));
}
for (int i = 0; i < 50; i++) {
Assert.assertTrue(finals.keySet().contains(i));
if (checkHeaders) {
Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header"));
}
finals.remove(i);
}
Assert.assertTrue(finals.isEmpty());
channel.stop();
}

/**
* Like the previous test but here we write to the channel like a Flume source would do
* to verify that the events are written as text and not as an Avro object
*
* @throws Exception
*/
private void doUseKafkaHeaderFalseAsSource(Boolean checkHeaders) throws Exception {
final KafkaChannel channel = startChannel(false);

List<String> msgs = new ArrayList<>();
Map<String, String> headers = new HashMap<>();
for (int i = 0; i < 50; i++) {
msgs.add(String.valueOf(i));
}
Transaction tx = channel.getTransaction();
tx.begin();
for (int i = 0; i < msgs.size(); i++) {
headers.put(KEY_HEADER, String.valueOf(i) + "-header");
channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers));
}
tx.commit();
ExecutorCompletionService<Void> submitterSvc =
new ExecutorCompletionService<>(Executors.newCachedThreadPool());
List<Event> events = pullEvents(channel, submitterSvc, 50, false, false);
wait(submitterSvc, 5);
Map<Integer, String> finals = new HashMap<>();
for (int i = 0; i < 50; i++) {
finals.put(Integer.parseInt(new String(events.get(i).getBody())),
events.get(i).getHeaders().get(KEY_HEADER));
}
for (int i = 0; i < 50; i++) {
Assert.assertTrue(finals.keySet().contains(i));
if (checkHeaders) {
Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header"));
}
finals.remove(i);
}
Assert.assertTrue(finals.isEmpty());
channel.stop();
}
}
3 changes: 3 additions & 0 deletions flume-ng-doc/sphinx/FlumeUserGuide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1483,6 +1483,7 @@ topicHeader topic Defines the name of the header
from, if the ``setTopicHeader`` property is set to ``true``. Care should be taken if combining
with the Kafka Sink ``topicHeader`` property so as to avoid sending the message back to the same
topic in a loop.
useKafkaHeader false Set true to read Kafka record headers and convert them as event header.
kafka.consumer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
*more consumer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
properties that need to be set on consumer.
Expand Down Expand Up @@ -3166,6 +3167,7 @@ useFlumeEventFormat false By default events are p
true to store events as the Flume Avro binary format. Used in conjunction with the same property
on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
any Flume headers for the producing side.
useKafkaHeader false Set to true to store event headers as Kafka record headers.
defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
overriden by ``partitionIdHeader``. By default, if this property is not set, events will be
distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a
Expand Down Expand Up @@ -3575,6 +3577,7 @@ parseAsFlumeEvent true Expecting A
This should be true if Flume source is writing to the channel and false if other producers are
writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using
org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
useKafkaHeader false Set true to use Kafka record headers as Flume event ones transparently.
pollTimeout 500 The amount of time(in milliseconds) to wait in the "poll()" call of the consumer.
https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
Expand Down
Loading