FLUME-3327 add useKafkaHeader option to convert flume headers as kafk…#280
FLUME-3327 add useKafkaHeader option to convert flume headers as kafk…#280stakafum wants to merge 5 commits into
Conversation
…a ones transparently in KafkaSource, KafkaChannel, and KafkaSink
| Headers kafkaHeaders = new RecordHeaders(); | ||
| for (Entry<String, String> header : headers.entrySet()) { | ||
| kafkaHeaders.add(header.getKey(), header.getValue().getBytes()); | ||
| kafkaHeaders.add(header.getKey(), header.getValue().getBytes(Charsets.UTF_8)); |
There was a problem hiding this comment.
I think expected Charsets should be configurable through flume context parameters
| private static Map<String, String> toStringMap(Headers kafkaHeaders) { | ||
| Map<String, String> stringMap = new HashMap<String, String>(); | ||
| for (Header kafkaHeader : kafkaHeaders) { | ||
| stringMap.put(kafkaHeader.key(), new String(kafkaHeader.value())); |
There was a problem hiding this comment.
Should be construct through public String(byte bytes[], String charsetName) with a configurable Charset value
|
+1 for this feature ^^ |
|
|
||
| for (int i = 0; i < 50; i++) { | ||
| RecordHeaders headers = new RecordHeaders(); | ||
| headers.add(KEY_HEADER, (String.valueOf(i) + "-header").getBytes()); |
There was a problem hiding this comment.
What's the significance of -header here? Can we make this configurable, leaving -header as default?
|
Is there a chance for it to get in if all the comments are addressed? |
I don't have the time and resources to do a proper review here, but when the comments are addressed and @tmgstevens approves the change, then I don't see why not. |
|
Yes, I'm happy to push this once the comments have been resolved. |
|
This has conflicts with the patch I just applied for FLUME-3435 which adds support for the timestamp and headers to KafkaSink and KafkaSource. I did not add them to KafkaChannel as I wasn't sure if the semantics should be the same. |
…a ones transparently in KafkaSource,
KafkaChannel, and KafkaSink