From 218a78786d6043529c72886dd7c5690701dbfc9b Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Thu, 27 Jul 2023 19:41:43 +0800 Subject: [PATCH] Use config's metadata tenant when disable multi tenant metadata --- .../handlers/kop/KafkaProtocolHandler.java | 8 +++- .../handlers/kop/KafkaRequestHandlerTest.java | 45 +++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 660e81cf4f..ca83a70903 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -470,12 +470,16 @@ public ProducerStateManagerSnapshotBuffer apply(String tenant) { if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { return new MemoryProducerStateManagerSnapshotBuffer(); } - return getTransactionCoordinator(tenant) + if (kafkaConfig.isKafkaEnableMultiTenantMetadata()) { + return getTransactionCoordinator(tenant) + .getProducerStateManagerSnapshotBuffer(); + } + return getTransactionCoordinator(kafkaConfig.getKafkaMetadataTenant()) .getProducerStateManagerSnapshotBuffer(); } } - private Function getProducerStateManagerSnapshotBufferByTenant = + protected final Function getProducerStateManagerSnapshotBufferByTenant = new ProducerStateManagerSnapshotProvider(); // this is called after initialize, and with kafkaConfig, brokerService all set. diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 6ded463c1c..9870cdf040 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -19,6 +19,7 @@ import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -38,6 +39,9 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer; +import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshotBuffer; +import io.streamnative.pulsar.handlers.kop.storage.PulsarTopicProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils; import io.streamnative.pulsar.handlers.kop.utils.TopicNameUtils; import java.net.InetSocketAddress; @@ -58,10 +62,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.LongStream; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewPartitions; @@ -1098,6 +1104,45 @@ public void testCommitOffsetRetryWhenProducerClosed() assertEquals(fetchMessages, numMessages); } + @Test(timeOut = 30000) + public void testProducerStateManagerSnapshotProvider() throws IllegalAccessException { + boolean kafkaTransactionCoordinatorEnabled = conf.isKafkaTransactionCoordinatorEnabled(); + boolean kafkaEnableMultiTenantMetadata = conf.isKafkaEnableMultiTenantMetadata(); + try { + String tenant = "test-tenant"; + KafkaProtocolHandler protocolHandler = spy(getProtocolHandler()); + conf.setKafkaTransactionCoordinatorEnabled(false); + Function fun = + protocolHandler.getProducerStateManagerSnapshotBufferByTenant; + ProducerStateManagerSnapshotBuffer buffer = fun.apply(tenant); + assertNotNull(buffer); + assertTrue(buffer instanceof MemoryProducerStateManagerSnapshotBuffer); + + // test multi-tenant + conf.setKafkaEnableMultiTenantMetadata(true); + conf.setKafkaTransactionCoordinatorEnabled(true); + buffer = fun.apply(tenant); + assertNotNull(buffer); + assertTrue(buffer instanceof PulsarTopicProducerStateManagerSnapshotBuffer); + PulsarTopicProducerStateManagerSnapshotBuffer bufferImpl = + (PulsarTopicProducerStateManagerSnapshotBuffer) buffer; + String topic = (String) FieldUtils.readField(bufferImpl, "topic", true); + assertEquals(TopicName.get(topic).getTenant(), tenant); + + // test default tenant + conf.setKafkaEnableMultiTenantMetadata(false); + buffer = fun.apply(tenant); + assertNotNull(buffer); + assertTrue(buffer instanceof PulsarTopicProducerStateManagerSnapshotBuffer); + bufferImpl = (PulsarTopicProducerStateManagerSnapshotBuffer) buffer; + topic = (String) FieldUtils.readField(bufferImpl, "topic", true); + assertEquals(TopicName.get(topic).getTenant(), conf.getKafkaMetadataTenant()); + } finally { + conf.setKafkaTransactionCoordinatorEnabled(kafkaTransactionCoordinatorEnabled); + conf.setKafkaEnableMultiTenantMetadata(kafkaEnableMultiTenantMetadata); + } + } + private KafkaHeaderAndRequest createTopicMetadataRequest(List topics, boolean allowAutoTopicCreation) { AbstractRequest.Builder builder = new MetadataRequest.Builder(topics, allowAutoTopicCreation); return buildRequest(builder);