Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ state "Standalone (1)" as Standalone1 <<NonBGActive>> {

Standalone1 --> Active1 : InitDomain

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders
2. [new] ns-1.ms-1.orders-v1-a_i-2023-07-07_10-30-00
Expand Down Expand Up @@ -62,7 +62,7 @@ state "Active (2)" as Active1 <<Green>> {

Active1 --> BGPrepare1 : Warmup

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-a_i-2023-07-07_10-30-00
2. [new] ns-1.ms-1.orders-v1-a_c-2023-07-07_11-30-00
Expand Down Expand Up @@ -99,7 +99,7 @@ state "BG Prepare (3)" as BGPrepare1 <<Green>> {

BGPrepare1 --> Active2 : Commit

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-a_c-2023-07-07_11-30-00
2. ns-1.ms-1.orders-v2-c_a-2023-07-07_11-30-00
Expand Down Expand Up @@ -136,7 +136,7 @@ state "Active (4)" as Active2 <<Green>> {

Active2 --> BGPrepare2 : Warmup

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-a_i-2023-07-07_12-30-00
2. [new] ns-1.ms-1.orders-v1-a_c-2023-07-07_13-30-00
Expand Down Expand Up @@ -173,7 +173,7 @@ state "BG Prepare (5)" as BGPrepare2 <<Green>> {

BGPrepare2 --> BGFinalize2 : Promote

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-a_c-2023-07-07_13-30-00
2. ns-1.ms-1.orders-v3-c_a-2023-07-07_13-30-00
Expand Down Expand Up @@ -219,7 +219,7 @@ state "BG Finalize (6)" as BGFinalize2 <<Blue>> {

BGFinalize2 --> BGPrepare3 : Rollback

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-l_a-2023-07-07_14-30-00
2. ns-1.ms-1.orders-v3-a_l-2023-07-07_14-30-00
Expand Down Expand Up @@ -265,7 +265,7 @@ state "BG Prepare (7)" as BGPrepare3 <<Green>> {

BGPrepare3 --> BGFinalize3 : Promote

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-a_c-2023-07-07_15-30-00
2. ns-1.ms-1.orders-v3-c_a-2023-07-07_15-30-00
Expand Down Expand Up @@ -311,7 +311,7 @@ state "BG Finalize (8)" as BGFinalize3 <<Blue>> {

BGFinalize3 --> Active4 : Commit

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-l_a-2023-07-07_16-30-00
2. ns-1.ms-1.orders-v3-a_l-2023-07-07_16-30-00
Expand Down Expand Up @@ -347,7 +347,7 @@ state "Active (9)" as Active4 <<Green>> {

Active4 --> Standalone2 : DestroyDomain

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v3-a_i-2023-07-07_17-30-00
2. [new] ns-1.ms-1.orders
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,9 @@ public interface BGKafkaConsumer<K, V> extends AutoCloseable {

Collection<TopicPartition> assignment();

void setPartitionsAssignedListener(PartitionsAssignedListener listener);

void close();
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.netcracker.cloud.maas.bluegreen.kafka;

import org.apache.kafka.common.TopicPartition;

import java.util.Collection;

@FunctionalInterface
public interface PartitionsAssignedListener {
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.netcracker.cloud.bluegreen.api.service.BlueGreenStatePublisher;
import com.netcracker.cloud.maas.bluegreen.kafka.BGKafkaConsumer;
import com.netcracker.cloud.maas.bluegreen.kafka.CommitMarker;
import com.netcracker.cloud.maas.bluegreen.kafka.PartitionsAssignedListener;
import com.netcracker.cloud.maas.bluegreen.kafka.Record;
import com.netcracker.cloud.maas.bluegreen.kafka.RecordsBatch;
import com.netcracker.cloud.maas.bluegreen.versiontracker.impl.VersionFilterConstructor;
Expand Down Expand Up @@ -38,6 +39,7 @@ public class BGKafkaConsumerImpl<K, V> implements BGKafkaConsumer<K, V> {

private BlueGreenState activeState;
private final AtomicReference<BlueGreenState> bgStateRef = new AtomicReference<>();
private PartitionsAssignedListener partitionsAssignedListener;

public BGKafkaConsumerImpl(BGKafkaConsumerConfig config) {
this.config = config;
Expand Down Expand Up @@ -125,6 +127,11 @@ public void commitSync(CommitMarker marker) {
}
}

@Override
public void setPartitionsAssignedListener(PartitionsAssignedListener listener) {
this.partitionsAssignedListener = listener;
}

@SneakyThrows
public void close() {
log.info("Closing consumer");
Expand Down Expand Up @@ -209,6 +216,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
log.debug("Adjusting current consumer offsets to: {}", alignedOffsets);
alignedOffsets.forEach(((topicPartition, offsetAndMetadata) -> kafkaConsumer.seek(topicPartition, offsetAndMetadata)));
if (partitionsAssignedListener != null) {
partitionsAssignedListener.onPartitionsAssigned(partitions);
}
}
};
log.debug("Subscribing kafka consumer to the topics: {}", config.getTopics().stream().sorted().toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netcracker.cloud.maas.bluegreen.kafka.BGKafkaConsumer;
import com.netcracker.cloud.maas.bluegreen.kafka.CommitMarker;
import com.netcracker.cloud.maas.bluegreen.kafka.PartitionsAssignedListener;
import com.netcracker.cloud.maas.bluegreen.kafka.Record;
import com.netcracker.cloud.maas.bluegreen.kafka.RecordsBatch;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -21,6 +22,7 @@
public class DefaultKafkaConsumer<K, V> implements BGKafkaConsumer<K, V> {

private final Consumer<K, V> consumer;
private PartitionsAssignedListener partitionsAssignedListener;

public DefaultKafkaConsumer(BGKafkaConsumerConfig config) {
this.consumer = config.getConsumerSupplier().apply(config.getProperties());
Expand All @@ -36,6 +38,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.debug("Partitions were assigned: {}", partitions);
consumer.committed(new HashSet<>(partitions)).forEach(((topicPartition, offsetAndMetadata) ->
consumer.seek(topicPartition, Optional.ofNullable(offsetAndMetadata).map(OffsetAndMetadata::offset).orElse(0L))));
if (partitionsAssignedListener != null) {
partitionsAssignedListener.onPartitionsAssigned(partitions);
}
}
});
}
Expand Down Expand Up @@ -63,6 +68,11 @@ public void commitSync(CommitMarker marker) {
consumer.commitSync(marker.getPosition());
}

@Override
public void setPartitionsAssignedListener(PartitionsAssignedListener listener) {
this.partitionsAssignedListener = listener;
}

@Override
public void pause() {
this.consumer.pause(this.assignment());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Assertions;
Expand All @@ -23,12 +24,14 @@

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -221,6 +224,40 @@ void testIdleState() throws Exception {
}
}

@Test
void partitionsAssignedListenerIsCalledAfterRebalance() {
var connectionProperties = Map.<String, Object>of("group.id", TEST_GROUP_NAME);
InMemoryBlueGreenStatePublisher statePublisher = new InMemoryBlueGreenStatePublisher(NAMESPACE_1);
Consumer consumer = Mockito.mock(Consumer.class);
ConsumerRebalanceListener[] rebalanceListener = new ConsumerRebalanceListener[1];
Mockito.doAnswer(i -> {
rebalanceListener[0] = i.getArgument(1);
return null;
}).when(consumer).subscribe(Mockito.any(Collection.class), Mockito.any(ConsumerRebalanceListener.class));

AdminAdapter adminAdapter = Mockito.mock(AdminAdapter.class);
Mockito.when(adminAdapter.listConsumerGroup()).thenReturn(List.of());
Mockito.when(consumer.partitionsFor(TEST_TOPIC_NAME)).thenReturn(List.of(partitionInfo(0)));
Mockito.when(consumer.offsetsForTimes(Mockito.any())).thenReturn(Map.of());
Mockito.when(consumer.endOffsets(Mockito.any())).thenReturn(topicOffsetLongMap(topicOffsetLong(0, 1)));

Mockito.when(consumer.poll(Mockito.any())).thenReturn(new ConsumerRecords<>(Map.of()));

try (BGKafkaConsumer<String, String> bgConsumer = new BGKafkaConsumerImpl<>(BGKafkaConsumerConfig.builder(connectionProperties,
TEST_TOPIC_NAME, M2M_TOKEN_SUPPLIER, statePublisher)
.deserializers(new StringDeserializer(), new StringDeserializer())
.consistencyMode(ConsumerConsistencyMode.GUARANTEE_CONSUMPTION)
.consumerSupplier(props -> consumer)
.adminAdapterSupplier(props -> adminAdapter).build())) {

var called = new AtomicBoolean();
bgConsumer.setPartitionsAssignedListener(partitions -> called.set(true));
bgConsumer.poll(Duration.ofMillis(10));
rebalanceListener[0].onPartitionsAssigned(List.of(new TopicPartition(TEST_TOPIC_NAME, 0)));
assertTrue(called.get());
}
}

@Test
void testXVersionHeader() {
ConsumerRecord<String, String> record = new ConsumerRecord<>(TEST_TOPIC_NAME, 1, 1, "key-1", "value-1");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.netcracker.cloud.maas.bluegreen.kafka.impl;

import com.netcracker.cloud.bluegreen.impl.service.InMemoryBlueGreenStatePublisher;
import com.netcracker.cloud.maas.bluegreen.kafka.PartitionsAssignedListener;
import com.netcracker.cloud.maas.client.impl.Env;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class DefaultKafkaConsumerPartitionsAssignedListenerTest {

@Test
void partitionsAssignedListenerIsInvokedOnRebalance() {
System.setProperty(Env.PROP_NAMESPACE, "test-namespace");
Consumer<String, String> kafkaConsumer = mock(Consumer.class);
ConsumerRebalanceListener[] rebalanceListener = new ConsumerRebalanceListener[1];
doAnswer(inv -> {
rebalanceListener[0] = inv.getArgument(1);
return null;
}).when(kafkaConsumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
committedOffsets.put(new TopicPartition("orders", 0), null);
when(kafkaConsumer.committed(anySet())).thenReturn(committedOffsets);

var config = BGKafkaConsumerConfig.builder(
Map.of("group.id", "g1", "bootstrap.servers", "localhost:9092"),
"orders",
() -> "token",
new InMemoryBlueGreenStatePublisher(Env.namespace()))
.consumerSupplier(props -> kafkaConsumer)
.build();

var invoked = new AtomicBoolean();
try (var consumer = new DefaultKafkaConsumer<String, String>(config)) {
consumer.setPartitionsAssignedListener((PartitionsAssignedListener) partitions -> invoked.set(true));
var partitions = List.of(new TopicPartition("orders", 0));
rebalanceListener[0].onPartitionsAssigned(partitions);
}

assertTrue(invoked.get());
verify(kafkaConsumer).seek(new TopicPartition("orders", 0), 0L);
}

@Test
void partitionsAssignedListenerUsesCommittedOffsetWhenPresent() {
System.setProperty(Env.PROP_NAMESPACE, "test-namespace");
Consumer<String, String> kafkaConsumer = mock(Consumer.class);
ConsumerRebalanceListener[] rebalanceListener = new ConsumerRebalanceListener[1];
doAnswer(inv -> {
rebalanceListener[0] = inv.getArgument(1);
return null;
}).when(kafkaConsumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));

var tp = new TopicPartition("orders", 1);
when(kafkaConsumer.committed(anySet())).thenReturn(Map.of(tp, new OffsetAndMetadata(42L)));

var config = BGKafkaConsumerConfig.builder(
Map.of("group.id", "g1", "bootstrap.servers", "localhost:9092"),
"orders",
() -> "token",
new InMemoryBlueGreenStatePublisher(Env.namespace()))
.consumerSupplier(props -> kafkaConsumer)
.build();

try (var consumer = new DefaultKafkaConsumer<String, String>(config)) {
consumer.setPartitionsAssignedListener(partitions -> {});
rebalanceListener[0].onPartitionsAssigned(List.of(tp));
}

verify(kafkaConsumer).seek(tp, 42L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,6 @@ private ConsumerExecContext createExecContext(TopicAddress topic) {
execContext.setDeserializerHolder(deserializerHolder);
execContext.setPollDuration(Duration.ofMillis(pollDuration));

MaasConsumingExecutor executor;
executor = new MaasConsumingExecutor(execContext,
errorHandler, kafkaClientCreationService, recordFilters, statePublisher);
execContext.setExecutor(executor);

MaasKafkaBlueGreenDefinition blueGreenDefinition = consumerDefinition.getBlueGreenDefinition();
KafkaConsumerConfiguration.Builder kafkaConsumerConfigurationBuilder = KafkaConsumerConfiguration.builder(consumerCfg);
if (blueGreenDefinition != null) {
Expand All @@ -291,7 +286,12 @@ private ConsumerExecContext createExecContext(TopicAddress topic) {
.setLocaldev(blueGreenDefinition.isLocaldev())
.setVersioned(topic.isVersioned());
}
// Must be set before MaasConsumingExecutor construction: executor reads max.poll.records from here.
execContext.setBlueGreenConfiguration(kafkaConsumerConfigurationBuilder.build());

MaasConsumingExecutor executor = new MaasConsumingExecutor(execContext,
errorHandler, kafkaClientCreationService, recordFilters, statePublisher);
execContext.setExecutor(executor);
return execContext;
}

Expand Down
Loading
Loading