Skip to content

FLUME-3327 add useKafkaHeader option to convert flume headers as kafk…#280

Open
stakafum wants to merge 5 commits into
apache:trunkfrom
stakafum:FLUME-3327
Open

FLUME-3327 add useKafkaHeader option to convert flume headers as kafk…#280
stakafum wants to merge 5 commits into
apache:trunkfrom
stakafum:FLUME-3327

Conversation

@stakafum

@stakafum stakafum commented Apr 3, 2019

Copy link
Copy Markdown
Contributor

…a ones transparently in KafkaSource,

KafkaChannel, and KafkaSink

Headers kafkaHeaders = new RecordHeaders();
for (Entry<String, String> header : headers.entrySet()) {
kafkaHeaders.add(header.getKey(), header.getValue().getBytes());
kafkaHeaders.add(header.getKey(), header.getValue().getBytes(Charsets.UTF_8));

@slem1 slem1 Sep 3, 2019

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think expected Charsets should be configurable through flume context parameters

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

private static Map<String, String> toStringMap(Headers kafkaHeaders) {
Map<String, String> stringMap = new HashMap<String, String>();
for (Header kafkaHeader : kafkaHeaders) {
stringMap.put(kafkaHeader.key(), new String(kafkaHeader.value()));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be construct through public String(byte bytes[], String charsetName) with a configurable Charset value

@slem1

slem1 commented Sep 3, 2019

Copy link
Copy Markdown

+1 for this feature ^^


for (int i = 0; i < 50; i++) {
RecordHeaders headers = new RecordHeaders();
headers.add(KEY_HEADER, (String.valueOf(i) + "-header").getBytes());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the significance of -header here? Can we make this configurable, leaving -header as default?

@kamazee

kamazee commented Sep 3, 2020

Copy link
Copy Markdown

Is there a chance for it to get in if all the comments are addressed?

@bessbd

bessbd commented Sep 7, 2020

Copy link
Copy Markdown
Member

Is there a chance for it to get in if all the comments are addressed?

I don't have the time and resources to do a proper review here, but when the comments are addressed and @tmgstevens approves the change, then I don't see why not.

@tmgstevens

Copy link
Copy Markdown
Contributor

Yes, I'm happy to push this once the comments have been resolved.

@rgoers

rgoers commented Oct 8, 2022

Copy link
Copy Markdown
Member

This has conflicts with the patch I just applied for FLUME-3435 which adds support for the timestamp and headers to KafkaSink and KafkaSource. I did not add them to KafkaChannel as I wasn't sure if the semantics should be the same.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants