Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ state "Standalone (1)" as Standalone1 <<NonBGActive>> {

Standalone1 --> Active1 : InitDomain

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders
2. [new] ns-1.ms-1.orders-v1-a_i-2023-07-07_10-30-00
Expand Down Expand Up @@ -62,7 +62,7 @@ state "Active (2)" as Active1 <<Green>> {

Active1 --> BGPrepare1 : Warmup

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-a_i-2023-07-07_10-30-00
2. [new] ns-1.ms-1.orders-v1-a_c-2023-07-07_11-30-00
Expand Down Expand Up @@ -99,7 +99,7 @@ state "BG Prepare (3)" as BGPrepare1 <<Green>> {

BGPrepare1 --> Active2 : Commit

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-a_c-2023-07-07_11-30-00
2. ns-1.ms-1.orders-v2-c_a-2023-07-07_11-30-00
Expand Down Expand Up @@ -136,7 +136,7 @@ state "Active (4)" as Active2 <<Green>> {

Active2 --> BGPrepare2 : Warmup

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-a_i-2023-07-07_12-30-00
2. [new] ns-1.ms-1.orders-v1-a_c-2023-07-07_13-30-00
Expand Down Expand Up @@ -173,7 +173,7 @@ state "BG Prepare (5)" as BGPrepare2 <<Green>> {

BGPrepare2 --> BGFinalize2 : Promote

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-a_c-2023-07-07_13-30-00
2. ns-1.ms-1.orders-v3-c_a-2023-07-07_13-30-00
Expand Down Expand Up @@ -219,7 +219,7 @@ state "BG Finalize (6)" as BGFinalize2 <<Blue>> {

BGFinalize2 --> BGPrepare3 : Rollback

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-l_a-2023-07-07_14-30-00
2. ns-1.ms-1.orders-v3-a_l-2023-07-07_14-30-00
Expand Down Expand Up @@ -265,7 +265,7 @@ state "BG Prepare (7)" as BGPrepare3 <<Green>> {

BGPrepare3 --> BGFinalize3 : Promote

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-a_c-2023-07-07_15-30-00
2. ns-1.ms-1.orders-v3-c_a-2023-07-07_15-30-00
Expand Down Expand Up @@ -311,7 +311,7 @@ state "BG Finalize (8)" as BGFinalize3 <<Blue>> {

BGFinalize3 --> Active4 : Commit

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v1-l_a-2023-07-07_16-30-00
2. ns-1.ms-1.orders-v3-a_l-2023-07-07_16-30-00
Expand Down Expand Up @@ -347,7 +347,7 @@ state "Active (9)" as Active4 <<Green>> {

Active4 --> Standalone2 : DestroyDomain

note right #white
note on link #white
existing consumer-groups:
1. ns-1.ms-1.orders-v3-a_i-2023-07-07_17-30-00
2. [new] ns-1.ms-1.orders
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,9 @@ public interface BGKafkaConsumer<K, V> extends AutoCloseable {

Collection<TopicPartition> assignment();

void setPartitionsAssignedListener(PartitionsAssignedListener listener);

void close();
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.netcracker.cloud.maas.bluegreen.kafka;

import org.apache.kafka.common.TopicPartition;

import java.util.Collection;

@FunctionalInterface
public interface PartitionsAssignedListener {
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.netcracker.cloud.bluegreen.api.service.BlueGreenStatePublisher;
import com.netcracker.cloud.maas.bluegreen.kafka.BGKafkaConsumer;
import com.netcracker.cloud.maas.bluegreen.kafka.CommitMarker;
import com.netcracker.cloud.maas.bluegreen.kafka.PartitionsAssignedListener;
import com.netcracker.cloud.maas.bluegreen.kafka.Record;
import com.netcracker.cloud.maas.bluegreen.kafka.RecordsBatch;
import com.netcracker.cloud.maas.bluegreen.versiontracker.impl.VersionFilterConstructor;
Expand Down Expand Up @@ -38,6 +39,7 @@ public class BGKafkaConsumerImpl<K, V> implements BGKafkaConsumer<K, V> {

private BlueGreenState activeState;
private final AtomicReference<BlueGreenState> bgStateRef = new AtomicReference<>();
private PartitionsAssignedListener partitionsAssignedListener;

public BGKafkaConsumerImpl(BGKafkaConsumerConfig config) {
this.config = config;
Expand Down Expand Up @@ -125,6 +127,11 @@ public void commitSync(CommitMarker marker) {
}
}

@Override
public void setPartitionsAssignedListener(PartitionsAssignedListener listener) {
this.partitionsAssignedListener = listener;
}

@SneakyThrows
public void close() {
log.info("Closing consumer");
Expand Down Expand Up @@ -209,6 +216,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
log.debug("Adjusting current consumer offsets to: {}", alignedOffsets);
alignedOffsets.forEach(((topicPartition, offsetAndMetadata) -> kafkaConsumer.seek(topicPartition, offsetAndMetadata)));
if (partitionsAssignedListener != null) {
partitionsAssignedListener.onPartitionsAssigned(partitions);
}
}
};
log.debug("Subscribing kafka consumer to the topics: {}", config.getTopics().stream().sorted().toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netcracker.cloud.maas.bluegreen.kafka.BGKafkaConsumer;
import com.netcracker.cloud.maas.bluegreen.kafka.CommitMarker;
import com.netcracker.cloud.maas.bluegreen.kafka.PartitionsAssignedListener;
import com.netcracker.cloud.maas.bluegreen.kafka.Record;
import com.netcracker.cloud.maas.bluegreen.kafka.RecordsBatch;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -21,6 +22,7 @@
public class DefaultKafkaConsumer<K, V> implements BGKafkaConsumer<K, V> {

private final Consumer<K, V> consumer;
private PartitionsAssignedListener partitionsAssignedListener;

public DefaultKafkaConsumer(BGKafkaConsumerConfig config) {
this.consumer = config.getConsumerSupplier().apply(config.getProperties());
Expand All @@ -36,6 +38,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.debug("Partitions were assigned: {}", partitions);
consumer.committed(new HashSet<>(partitions)).forEach(((topicPartition, offsetAndMetadata) ->
consumer.seek(topicPartition, Optional.ofNullable(offsetAndMetadata).map(OffsetAndMetadata::offset).orElse(0L))));
if (partitionsAssignedListener != null) {
partitionsAssignedListener.onPartitionsAssigned(partitions);
}
}
});
}
Expand Down Expand Up @@ -63,6 +68,11 @@ public void commitSync(CommitMarker marker) {
consumer.commitSync(marker.getPosition());
}

@Override
public void setPartitionsAssignedListener(PartitionsAssignedListener listener) {
this.partitionsAssignedListener = listener;
}

@Override
public void pause() {
this.consumer.pause(this.assignment());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,6 @@ private ConsumerExecContext createExecContext(TopicAddress topic) {
execContext.setDeserializerHolder(deserializerHolder);
execContext.setPollDuration(Duration.ofMillis(pollDuration));

MaasConsumingExecutor executor;
executor = new MaasConsumingExecutor(execContext,
errorHandler, kafkaClientCreationService, recordFilters, statePublisher);
execContext.setExecutor(executor);

MaasKafkaBlueGreenDefinition blueGreenDefinition = consumerDefinition.getBlueGreenDefinition();
KafkaConsumerConfiguration.Builder kafkaConsumerConfigurationBuilder = KafkaConsumerConfiguration.builder(consumerCfg);
if (blueGreenDefinition != null) {
Expand All @@ -291,7 +286,12 @@ private ConsumerExecContext createExecContext(TopicAddress topic) {
.setLocaldev(blueGreenDefinition.isLocaldev())
.setVersioned(topic.isVersioned());
}
// Must be set before MaasConsumingExecutor construction: executor reads max.poll.records from here.
execContext.setBlueGreenConfiguration(kafkaConsumerConfigurationBuilder.build());

MaasConsumingExecutor executor = new MaasConsumingExecutor(execContext,
errorHandler, kafkaClientCreationService, recordFilters, statePublisher);
execContext.setExecutor(executor);
return execContext;
}

Expand Down
Loading
Loading