From 1169f0c354b3ca8cd4596f3022c227b31178b22d Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 12 Mar 2026 14:52:54 -0700 Subject: [PATCH 1/5] [cleanup] Convert 10 test classes to SharedPulsarBaseTest BytesKeyTest, ConsumerAckListTest, ConsumerCleanupTest, CustomMessageIdTest, FailoverSubscriptionTest, NonPartitionedTopicExpectedTest, PersistentTopicTerminateTest, ProducerCleanupTest, ProducerQueueSizeTest, ReplicateSubscriptionTest --- .../pulsar/client/api/BytesKeyTest.java | 23 +++--------- .../client/api/ConsumerAckListTest.java | 26 ++++---------- .../client/api/ConsumerCleanupTest.java | 29 +++++---------- .../client/api/CustomMessageIdTest.java | 23 +++--------- .../client/api/FailoverSubscriptionTest.java | 22 +++--------- .../api/NonPartitionedTopicExpectedTest.java | 35 ++++++------------- .../api/PersistentTopicTerminateTest.java | 24 +++---------- .../client/api/ProducerCleanupTest.java | 23 ++---------- .../client/api/ProducerQueueSizeTest.java | 20 ++--------- .../client/api/ReplicateSubscriptionTest.java | 27 +++----------- 10 files changed, 52 insertions(+), 200 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java index fdc34add89bf7..69453f9265e9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java @@ -20,38 +20,25 @@ import java.util.Random; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-api") -public class BytesKeyTest extends ProducerConsumerBase { - - @BeforeMethod - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterMethod(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class BytesKeyTest extends SharedPulsarBaseTest { private void byteKeysTest(boolean batching) throws Exception { Random r = new Random(0); + String topic = newTopicName(); Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .subscriptionName("my-subscriber-name").subscribe(); Producer producer = pulsarClient.newProducer(Schema.STRING) .enableBatching(batching) .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.SECONDS) .batchingMaxMessages(Integer.MAX_VALUE) - .topic("persistent://my-property/my-ns/my-topic1").create(); + .topic(topic).create(); byte[] byteKey = new byte[1000]; r.nextBytes(byteKey); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java index ec5540dbbd0b3..3d085425b13b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java @@ -25,27 +25,13 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker-api") -public class ConsumerAckListTest extends ProducerConsumerBase { - - @BeforeClass - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class ConsumerAckListTest extends SharedPulsarBaseTest { @DataProvider(name = "ackReceiptEnabled") public Object[][] ackReceiptEnabled() { @@ -61,7 +47,7 @@ public void testBatchListAck(boolean ackReceiptEnabled) throws Exception { } private void ackListMessage(boolean isBatch, boolean isPartitioned, boolean ackReceiptEnabled) throws Exception { - final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID(); + final String topic = newTopicName(); final String subName = "testBatchAck-sub" + UUID.randomUUID(); final int messageNum = ThreadLocalRandom.current().nextInt(50, 100); if (isPartitioned) { @@ -113,9 +99,9 @@ private void sendMessagesAsyncAndWait(Producer producer, int messages) t @Test(timeOut = 30000) public void testAckMessageInAnotherTopic() throws Exception { final String[] topics = { - "persistent://my-property/my-ns/test-ack-message-in-other-topic1" + UUID.randomUUID(), - "persistent://my-property/my-ns/test-ack-message-in-other-topic2" + UUID.randomUUID(), - "persistent://my-property/my-ns/test-ack-message-in-other-topic3" + UUID.randomUUID() + newTopicName(), + newTopicName(), + newTopicName() }; @Cleanup final Consumer allTopicsConsumer = pulsarClient.newConsumer(Schema.STRING) .topic(topics) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java index 0cbfde0638172..c043182f6b950 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java @@ -19,32 +19,16 @@ package org.apache.pulsar.client.api; import io.netty.util.HashedWheelTimer; -import java.util.UUID; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker-api") -public class ConsumerCleanupTest extends ProducerConsumerBase { - - @BeforeClass - @Override - protected void setup() throws Exception { - // use Pulsar binary lookup since the HTTP client shares the Pulsar client timer - isTcpLookup = true; - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class ConsumerCleanupTest extends SharedPulsarBaseTest { @DataProvider(name = "ackReceiptEnabled") public Object[][] ackReceiptEnabled() { @@ -55,9 +39,12 @@ public Object[][] ackReceiptEnabled() { public void testAllTimerTaskShouldCanceledAfterConsumerClosed(boolean ackReceiptEnabled) throws PulsarClientException, InterruptedException { @Cleanup - PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1); + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .statsInterval(1, TimeUnit.SECONDS) + .build(); Consumer consumer = pulsarClient.newConsumer() - .topic("persistent://public/default/" + UUID.randomUUID().toString()) + .topic(newTopicName()) .subscriptionName("test") .isAckReceiptEnabled(ackReceiptEnabled) .subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java index d64a2d8b7e465..e8a73aa5a3274 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java @@ -25,26 +25,12 @@ import java.util.ArrayList; import java.util.concurrent.TimeUnit; import lombok.Cleanup; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker-api") -public class CustomMessageIdTest extends ProducerConsumerBase { - - @BeforeClass - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class CustomMessageIdTest extends SharedPulsarBaseTest { @DataProvider public static Object[][] enableBatching() { @@ -56,7 +42,7 @@ public static Object[][] enableBatching() { @Test public void testSeek() throws Exception { - final var topic = "persistent://my-property/my-ns/test-seek-" + System.currentTimeMillis(); + final var topic = newTopicName(); @Cleanup final var producer = pulsarClient.newProducer(Schema.INT32).topic(topic).create(); final var msgIds = new ArrayList(); for (int i = 0; i < 10; i++) { @@ -72,8 +58,7 @@ public void testSeek() throws Exception { @Test(dataProvider = "enableBatching") public void testAcknowledgment(boolean enableBatching) throws Exception { - final var topic = "persistent://my-property/my-ns/test-ack-" - + enableBatching + System.currentTimeMillis(); + final var topic = newTopicName(); final var producer = pulsarClient.newProducer(Schema.INT32) .topic(topic) .enableBatching(enableBatching) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java index 6263193b0a4a2..04d6aaf39ed9d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java @@ -23,38 +23,24 @@ import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; -import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.ConsumerImpl; import org.awaitility.Awaitility; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Slf4j @Test(groups = "broker-api") -public class FailoverSubscriptionTest extends ProducerConsumerBase { - @BeforeClass(alwaysRun = true) - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class FailoverSubscriptionTest extends SharedPulsarBaseTest { @Test(timeOut = 30_000, invocationCount = 5) public void testWaitingCursorsCountAfterSwitchingActiveConsumers() throws Exception { - final String tp = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp"); + final String tp = newTopicName(); final String subscription = "s1"; admin.topics().createNonPartitionedTopic(tp); admin.topics().createSubscription(tp, subscription, MessageId.earliest); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(tp, false).join().get(); + PersistentTopic topic = (PersistentTopic) getTopic(tp, false).join().get(); Map> consumerMap = new HashMap<>(); ConsumerImpl firstConsumer = (ConsumerImpl) pulsarClient.newConsumer().topic(tp) .subscriptionType(SubscriptionType.Failover).subscriptionName(subscription).subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java index 7b0edd314d055..928a23b0a4f23 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java @@ -19,36 +19,22 @@ package org.apache.pulsar.client.api; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; +import org.apache.pulsar.broker.service.SharedPulsarCluster; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.TopicType; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j -public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase { - - @BeforeClass - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class NonPartitionedTopicExpectedTest extends SharedPulsarBaseTest { @Test public void testWhenNonPartitionedTopicExists() throws Exception { - final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String topic = newTopicName(); admin.topics().createNonPartitionedTopic(topic); ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl) pulsarClient.newProducer(Schema.STRING).topic(topic); @@ -62,7 +48,7 @@ public void testWhenNonPartitionedTopicExists() throws Exception { @Test public void testWhenPartitionedTopicExists() throws Exception { - final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String topic = newTopicName(); admin.topics().createPartitionedTopic(topic, 2); ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl) pulsarClient.newProducer(Schema.STRING).topic(topic); @@ -89,8 +75,8 @@ public Object[][] topicTypes() { @Test(dataProvider = "topicTypes") public void testWhenTopicNotExists(TopicType topicType) throws Exception { - final String namespace = "public/default"; - final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp"); + final String namespace = getNamespace(); + final String topic = newTopicName(); final TopicName topicName = TopicName.get(topic); AutoTopicCreationOverride.Builder policyBuilder = AutoTopicCreationOverride.builder() .topicType(topicType.toString()).allowAutoTopicCreation(true); @@ -106,9 +92,10 @@ public void testWhenTopicNotExists(TopicType topicType) throws Exception { // Verify: create successfully. Producer producer = producerBuilder.create(); // Verify: only create non-partitioned topic. - Assert.assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .partitionedTopicExists(topicName)); - Assert.assertTrue(pulsar.getNamespaceService().checkNonPartitionedTopicExists(topicName).join()); + Assert.assertFalse(SharedPulsarCluster.get().getPulsarService().getPulsarResources() + .getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName)); + Assert.assertTrue(SharedPulsarCluster.get().getPulsarService().getNamespaceService() + .checkNonPartitionedTopicExists(topicName).join()); // cleanup. producer.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java index 69c012b7622a2..4cd9371cd9d74 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java @@ -25,34 +25,18 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; -import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.awaitility.Awaitility; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Test(groups = "broker-api") @Slf4j -public class PersistentTopicTerminateTest extends ProducerConsumerBase { - - - @BeforeClass - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class PersistentTopicTerminateTest extends SharedPulsarBaseTest { @Test public void testRecoverAfterTerminate() throws Exception { - final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String topicName = newTopicName(); final String subscriptionName = "s1"; admin.topics().createNonPartitionedTopic(topicName); admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); @@ -83,7 +67,7 @@ public void testRecoverAfterTerminate() throws Exception { admin.topics().skipAllMessages(topicName, subscriptionName); Awaitility.await().untilAsserted(() -> { PersistentTopic persistentTopic = - (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + (PersistentTopic) getTopic(topicName, false).join().get(); ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); CompletableFuture trimLedgersFuture = new CompletableFuture<>(); ml.trimConsumedLedgersInBackground(trimLedgersFuture); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java index f4daba3bf0536..2a7fcb3eb17af 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java @@ -19,36 +19,19 @@ package org.apache.pulsar.client.api; import io.netty.util.HashedWheelTimer; -import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-api") -public class ProducerCleanupTest extends ProducerConsumerBase { - - @BeforeMethod - @Override - protected void setup() throws Exception { - // use Pulsar binary lookup since the HTTP client shares the Pulsar client timer - isTcpLookup = true; - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterMethod(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class ProducerCleanupTest extends SharedPulsarBaseTest { @Test public void testAllTimerTaskShouldCanceledAfterProducerClosed() throws PulsarClientException, InterruptedException { Producer producer = pulsarClient.newProducer() - .topic("persistent://public/default/" + UUID.randomUUID().toString()) + .topic(newTopicName()) .sendTimeout(1, TimeUnit.SECONDS) .create(); producer.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java index 0c470ae587512..2e17913599da5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java @@ -22,25 +22,11 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import lombok.Cleanup; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -public class ProducerQueueSizeTest extends ProducerConsumerBase { - - @BeforeMethod - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterMethod(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class ProducerQueueSizeTest extends SharedPulsarBaseTest { @DataProvider(name = "matrix") public Object[][] matrix() { @@ -62,7 +48,7 @@ public void testRemoveMaxQueueLimit(boolean blockIfQueueFull, boolean partitione @Cleanup PulsarClient client = PulsarClient.builder() - .serviceUrl(brokerUrl.toString()) + .serviceUrl(getWebServiceUrl()) .memoryLimit(10, SizeUnit.KILO_BYTES) .build(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java index 327081bf1b9c8..dd468c80d37eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java @@ -26,34 +26,15 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -public class ReplicateSubscriptionTest extends ProducerConsumerBase { - - @BeforeClass - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - - @Override - protected void doInitConf() throws Exception { - super.doInitConf(); - } +public class ReplicateSubscriptionTest extends SharedPulsarBaseTest { @DataProvider public Object[] replicateSubscriptionState() { @@ -67,7 +48,7 @@ public Object[] replicateSubscriptionState() { @Test(dataProvider = "replicateSubscriptionState") public void testReplicateSubscriptionState(Boolean replicateSubscriptionState) throws Exception { - String topic = "persistent://my-property/my-ns/" + System.nanoTime(); + String topic = newTopicName(); String subName = "sub-" + System.nanoTime(); ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.STRING) .topic(topic) @@ -79,7 +60,7 @@ public void testReplicateSubscriptionState(Boolean replicateSubscriptionState) assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), replicateSubscriptionState); @Cleanup Consumer ignored = consumerBuilder.subscribe(); - CompletableFuture> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic); + CompletableFuture> topicIfExists = getTopicIfExists(topic); assertThat(topicIfExists) .succeedsWithin(3, TimeUnit.SECONDS) .matches(optionalTopic -> { From d5649023a7db6b9e78b1debc96570b1558a804ec Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 12 Mar 2026 14:52:54 -0700 Subject: [PATCH 2/5] [cleanup] Convert 13 test classes to SharedPulsarBaseTest CompactedOutBatchMessageTest, ConsumeBaseExceptionTest, ConsumerCloseTest, ConsumerDedupPermitsUpdateTest, ConsumerMemoryLimitTest, ConsumerUnsubscribeIntegrationTest, DispatchAccordingPermitsTest, ExposeMessageRedeliveryCountTest, HierarchyTopicAutoCreationTest, MemoryLimitTest, ProduceWithMessageIdTest, SimpleTypedProducerConsumerTest, TopicFromMessageTest --- .../ConsumerUnsubscribeIntegrationTest.java | 21 +-- .../api/ExposeMessageRedeliveryCountTest.java | 24 +-- .../pulsar/client/api/MemoryLimitTest.java | 22 +-- .../api/SimpleTypedProducerConsumerTest.java | 178 ++++++++++-------- .../impl/CompactedOutBatchMessageTest.java | 21 +-- .../client/impl/ConsumeBaseExceptionTest.java | 23 +-- .../pulsar/client/impl/ConsumerCloseTest.java | 26 +-- .../impl/ConsumerDedupPermitsUpdateTest.java | 21 +-- .../client/impl/ConsumerMemoryLimitTest.java | 21 +-- .../impl/DispatchAccordingPermitsTest.java | 21 +-- .../impl/HierarchyTopicAutoCreationTest.java | 23 +-- .../client/impl/ProduceWithMessageIdTest.java | 16 +- .../client/impl/TopicFromMessageTest.java | 42 +++-- 13 files changed, 173 insertions(+), 286 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java index 547958de8452d..b8cd072a1d14f 100644 --- a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java @@ -22,33 +22,18 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.awaitility.Awaitility; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Slf4j @Test(groups = "broker-impl") -public class ConsumerUnsubscribeIntegrationTest extends ProducerConsumerBase { - - @BeforeClass(alwaysRun = true) - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class ConsumerUnsubscribeIntegrationTest extends SharedPulsarBaseTest { @Test public void testUnSubscribeWhenCursorNotExists() throws Exception { @@ -65,7 +50,7 @@ public void testUnSubscribeWhenCursorNotExists() throws Exception { consumer.acknowledge(consumer.receive(2, TimeUnit.SECONDS)); PersistentTopic persistentTopic = - (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + (PersistentTopic) getTopic(topic, false).join().get(); ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(subscription); Awaitility.await().untilAsserted(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java index a9e21f77aa143..0ddaad34e0d11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java @@ -21,32 +21,18 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-api") -public class ExposeMessageRedeliveryCountTest extends ProducerConsumerBase { - - @BeforeMethod - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterMethod(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class ExposeMessageRedeliveryCountTest extends SharedPulsarBaseTest { @Test(timeOut = 30000) public void testRedeliveryCount() throws PulsarClientException { - final String topic = "persistent://my-property/my-ns/redeliveryCount"; + final String topic = newTopicName(); Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) .topic(topic) @@ -81,7 +67,7 @@ public void testRedeliveryCount() throws PulsarClientException { @Test(timeOut = 30000) public void testRedeliveryCountWithPartitionedTopic() throws PulsarClientException, PulsarAdminException { - final String topic = "persistent://my-property/my-ns/redeliveryCount.partitioned"; + final String topic = newTopicName(); admin.topics().createPartitionedTopic(topic, 3); @@ -119,7 +105,7 @@ public void testRedeliveryCountWithPartitionedTopic() throws PulsarClientExcepti @Test(timeOut = 30000) public void testRedeliveryCountWhenConsumerDisconnected() throws PulsarClientException { - String topic = "persistent://my-property/my-ns/testRedeliveryCountWhenConsumerDisconnected"; + String topic = newTopicName(); Consumer consumer0 = pulsarClient.newConsumer(Schema.STRING) .topic(topic) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java index 60eb79e77bc88..6ed5a1118b16e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java @@ -23,17 +23,16 @@ import java.time.Duration; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.api.PulsarClientException.MemoryBufferIsFullError; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarTestClient; import org.awaitility.Awaitility; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker-api") -public class MemoryLimitTest extends ProducerConsumerBase { +public class MemoryLimitTest extends SharedPulsarBaseTest { @DataProvider(name = "batchingAndMemoryLimit") public Object[][] provider() { @@ -44,26 +43,13 @@ public Object[][] provider() { }; } - @BeforeMethod - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterMethod(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - @Test(dataProvider = "batchingAndMemoryLimit") public void testRejectMessages(boolean batching, int memoryLimit) throws Exception { String topic = newTopicName(); ClientBuilder clientBuilder = PulsarClient.builder() - .serviceUrl(pulsar.getBrokerServiceUrl()) + .serviceUrl(getBrokerServiceUrl()) .memoryLimit(memoryLimit, SizeUnit.KILO_BYTES); @Cleanup @@ -120,7 +106,7 @@ public void testRejectMessagesOnMultipleTopics(boolean batching, int memoryLimit String t2 = newTopicName(); ClientBuilder clientBuilder = PulsarClient.builder() - .serviceUrl(pulsar.getBrokerServiceUrl()) + .serviceUrl(getBrokerServiceUrl()) .memoryLimit(memoryLimit, SizeUnit.KILO_BYTES); @Cleanup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index 293d3adcb876b..0477856359659 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; +import org.apache.pulsar.broker.service.SharedPulsarCluster; import org.apache.pulsar.broker.service.schema.SchemaRegistry; import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -46,43 +48,37 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-api") -public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase { +public class SimpleTypedProducerConsumerTest extends SharedPulsarBaseTest { private static final Logger log = LoggerFactory.getLogger(SimpleTypedProducerConsumerTest.class); - @BeforeMethod - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterMethod(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); + private void testMessageOrderAndDuplicates(Set messagesReceived, T receivedMessage, T expectedMessage) { + Assert.assertEquals(receivedMessage, expectedMessage, + "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); + Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); } @Test public void testJsonProducerAndConsumer() throws Exception { - log.info("-- Starting {} test --", methodName); + log.info("-- Starting {} test --", "testJsonProducerAndConsumer"); + + final String topic = newTopicName(); + final String schemaKey = topic.replace("persistent://", ""); JSONSchema jsonSchema = JSONSchema.of(SchemaDefinition.builder().withPojo(JsonEncodedPojo.class).build()); Consumer consumer = pulsarClient .newConsumer(jsonSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .subscriptionName("my-subscriber-name") .subscribe(); Producer producer = pulsarClient .newProducer(jsonSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .create(); for (int i = 0; i < 10; i++) { @@ -103,24 +99,28 @@ public void testJsonProducerAndConsumer() throws Exception { consumer.acknowledgeCumulative(msg); consumer.close(); - SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() - .getSchema("my-property/my-ns/my-topic1") + SchemaRegistry.SchemaAndMetadata storedSchema = SharedPulsarCluster.get().getPulsarService() + .getSchemaRegistryService() + .getSchema(schemaKey) .get(); Assert.assertEquals(storedSchema.schema.getData(), jsonSchema.getSchemaInfo().getSchema()); - log.info("-- Exiting {} test --", methodName); + log.info("-- Exiting {} test --", "testJsonProducerAndConsumer"); } @Test public void testJsonProducerAndConsumerWithPrestoredSchema() throws Exception { - log.info("-- Starting {} test --", methodName); + log.info("-- Starting {} test --", "testJsonProducerAndConsumerWithPrestoredSchema"); + + final String topic = newTopicName(); + final String schemaKey = topic.replace("persistent://", ""); JSONSchema jsonSchema = JSONSchema.of(SchemaDefinition.builder().withPojo(JsonEncodedPojo.class).build()); - pulsar.getSchemaRegistryService() - .putSchemaIfAbsent("my-property/my-ns/my-topic1", + SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService() + .putSchemaIfAbsent(schemaKey, SchemaData.builder() .type(SchemaType.JSON) .isDeleted(false) @@ -134,36 +134,40 @@ public void testJsonProducerAndConsumerWithPrestoredSchema() throws Exception { Consumer consumer = pulsarClient .newConsumer(jsonSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .subscriptionName("my-subscriber-name") .subscribe(); Producer producer = pulsarClient .newProducer(jsonSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .create(); consumer.close(); producer.close(); - SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() - .getSchema("my-property/my-ns/my-topic1") + SchemaRegistry.SchemaAndMetadata storedSchema = SharedPulsarCluster.get().getPulsarService() + .getSchemaRegistryService() + .getSchema(schemaKey) .get(); Assert.assertEquals(storedSchema.schema.getData(), jsonSchema.getSchemaInfo().getSchema()); - log.info("-- Exiting {} test --", methodName); + log.info("-- Exiting {} test --", "testJsonProducerAndConsumerWithPrestoredSchema"); } @Test public void testWrongCorruptedSchema() throws Exception { - log.info("-- Starting {} test --", methodName); + log.info("-- Starting {} test --", "testWrongCorruptedSchema"); + + final String topic = newTopicName(); + final String schemaKey = topic.replace("persistent://", ""); byte[] randomSchemaBytes = "hello".getBytes(); try { - pulsar.getSchemaRegistryService() - .putSchemaIfAbsent("my-property/my-ns/my-topic1", + SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService() + .putSchemaIfAbsent(schemaKey, SchemaData.builder() .type(SchemaType.JSON) .isDeleted(false) @@ -179,25 +183,28 @@ public void testWrongCorruptedSchema() throws Exception { assertTrue(e.getCause() instanceof InvalidSchemaDataException); } - log.info("-- Exiting {} test --", methodName); + log.info("-- Exiting {} test --", "testWrongCorruptedSchema"); } @Test public void testProtobufProducerAndConsumer() throws Exception { - log.info("-- Starting {} test --", methodName); + log.info("-- Starting {} test --", "testProtobufProducerAndConsumer"); + + final String topic = newTopicName(); + final String schemaKey = topic.replace("persistent://", ""); ProtobufSchema protobufSchema = ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class); Consumer consumer = pulsarClient .newConsumer(protobufSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .subscriptionName("my-subscriber-name") .subscribe(); Producer producer = pulsarClient .newProducer(protobufSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .create(); for (int i = 0; i < 10; i++) { @@ -222,24 +229,28 @@ public void testProtobufProducerAndConsumer() throws Exception { consumer.acknowledgeCumulative(msg); consumer.close(); - SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() - .getSchema("my-property/my-ns/my-topic1") + SchemaRegistry.SchemaAndMetadata storedSchema = SharedPulsarCluster.get().getPulsarService() + .getSchemaRegistryService() + .getSchema(schemaKey) .get(); Assert.assertEquals(storedSchema.schema.getData(), protobufSchema.getSchemaInfo().getSchema()); - log.info("-- Exiting {} test --", methodName); + log.info("-- Exiting {} test --", "testProtobufProducerAndConsumer"); } @Test(expectedExceptions = {PulsarClientException.class}) public void testProtobufConsumerWithWrongPrestoredSchema() throws Exception { - log.info("-- Starting {} test --", methodName); + log.info("-- Starting {} test --", "testProtobufConsumerWithWrongPrestoredSchema"); + + final String topic = newTopicName(); + final String schemaKey = topic.replace("persistent://", ""); ProtobufSchema schema = ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class); - pulsar.getSchemaRegistryService() - .putSchemaIfAbsent("my-property/my-ns/my-topic1", + SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService() + .putSchemaIfAbsent(schemaKey, SchemaData.builder() .type(SchemaType.PROTOBUF) .isDeleted(false) @@ -255,16 +266,19 @@ public void testProtobufConsumerWithWrongPrestoredSchema() throws Exception { .newConsumer(AvroSchema.of (SchemaDefinition.builder(). withPojo(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class).build())) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .subscriptionName("my-subscriber-name") .subscribe(); - log.info("-- Exiting {} test --", methodName); + log.info("-- Exiting {} test --", "testProtobufConsumerWithWrongPrestoredSchema"); } @Test public void testAvroProducerAndConsumer() throws Exception { - log.info("-- Starting {} test --", methodName); + log.info("-- Starting {} test --", "testAvroProducerAndConsumer"); + + final String topic = newTopicName(); + final String schemaKey = topic.replace("persistent://", ""); AvroSchema avroSchema = AvroSchema.of(SchemaDefinition.builder(). @@ -272,13 +286,13 @@ public void testAvroProducerAndConsumer() throws Exception { Consumer consumer = pulsarClient .newConsumer(avroSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .subscriptionName("my-subscriber-name") .subscribe(); Producer producer = pulsarClient .newProducer(avroSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .create(); for (int i = 0; i < 10; i++) { @@ -299,19 +313,23 @@ public void testAvroProducerAndConsumer() throws Exception { consumer.acknowledgeCumulative(msg); consumer.close(); - SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() - .getSchema("my-property/my-ns/my-topic1") + SchemaRegistry.SchemaAndMetadata storedSchema = SharedPulsarCluster.get().getPulsarService() + .getSchemaRegistryService() + .getSchema(schemaKey) .get(); Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema()); - log.info("-- Exiting {} test --", methodName); + log.info("-- Exiting {} test --", "testAvroProducerAndConsumer"); } @Test(expectedExceptions = {PulsarClientException.class}) public void testAvroConsumerWithWrongRestoredSchema() throws Exception { - log.info("-- Starting {} test --", methodName); + log.info("-- Starting {} test --", "testAvroConsumerWithWrongRestoredSchema"); + + final String topic = newTopicName(); + final String schemaKey = topic.replace("persistent://", ""); byte[] randomSchemaBytes = ("{\n" + " \"type\": \"record\",\n" @@ -323,8 +341,8 @@ public void testAvroConsumerWithWrongRestoredSchema() throws Exception { + " ]\n" + "} ").getBytes(); - pulsar.getSchemaRegistryService() - .putSchemaIfAbsent("my-property/my-ns/my-topic1", + SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService() + .putSchemaIfAbsent(schemaKey, SchemaData.builder() .type(SchemaType.AVRO) .isDeleted(false) @@ -338,11 +356,11 @@ public void testAvroConsumerWithWrongRestoredSchema() throws Exception { Consumer consumer = pulsarClient .newConsumer(AvroSchema.of(SchemaDefinition.builder(). withPojo(AvroEncodedPojo.class).withAlwaysAllowNull(false).build())) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .subscriptionName("my-subscriber-name") .subscribe(); - log.info("-- Exiting {} test --", methodName); + log.info("-- Exiting {} test --", "testAvroConsumerWithWrongRestoredSchema"); } public static class AvroEncodedPojo { @@ -433,7 +451,10 @@ public String toString() { @Test public void testAvroProducerAndAutoSchemaConsumer() throws Exception { - log.info("-- Starting {} test --", methodName); + log.info("-- Starting {} test --", "testAvroProducerAndAutoSchemaConsumer"); + + final String topic = newTopicName(); + final String schemaKey = topic.replace("persistent://", ""); AvroSchema avroSchema = AvroSchema.of(SchemaDefinition.builder(). @@ -441,7 +462,7 @@ public void testAvroProducerAndAutoSchemaConsumer() throws Exception { Producer producer = pulsarClient .newProducer(avroSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .create(); for (int i = 0; i < 10; i++) { @@ -451,7 +472,7 @@ public void testAvroProducerAndAutoSchemaConsumer() throws Exception { Consumer consumer = pulsarClient .newConsumer(Schema.AUTO_CONSUME()) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .subscriptionName("my-subscriber-name") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -470,19 +491,23 @@ public void testAvroProducerAndAutoSchemaConsumer() throws Exception { consumer.acknowledgeCumulative(msg); consumer.close(); - SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() - .getSchema("my-property/my-ns/my-topic1") + SchemaRegistry.SchemaAndMetadata storedSchema = SharedPulsarCluster.get().getPulsarService() + .getSchemaRegistryService() + .getSchema(schemaKey) .get(); Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema()); - log.info("-- Exiting {} test --", methodName); + log.info("-- Exiting {} test --", "testAvroProducerAndAutoSchemaConsumer"); } @Test public void testAvroProducerAndAutoSchemaReader() throws Exception { - log.info("-- Starting {} test --", methodName); + log.info("-- Starting {} test --", "testAvroProducerAndAutoSchemaReader"); + + final String topic = newTopicName(); + final String schemaKey = topic.replace("persistent://", ""); AvroSchema avroSchema = AvroSchema.of(SchemaDefinition.builder(). @@ -490,7 +515,7 @@ public void testAvroProducerAndAutoSchemaReader() throws Exception { Producer producer = pulsarClient .newProducer(avroSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .create(); for (int i = 0; i < 10; i++) { @@ -500,7 +525,7 @@ public void testAvroProducerAndAutoSchemaReader() throws Exception { Reader reader = pulsarClient .newReader(Schema.AUTO_CONSUME()) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .startMessageId(MessageId.earliest) .create(); @@ -517,19 +542,23 @@ public void testAvroProducerAndAutoSchemaReader() throws Exception { // Acknowledge the consumption of all messages at once reader.close(); - SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() - .getSchema("my-property/my-ns/my-topic1") + SchemaRegistry.SchemaAndMetadata storedSchema = SharedPulsarCluster.get().getPulsarService() + .getSchemaRegistryService() + .getSchema(schemaKey) .get(); Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema()); - log.info("-- Exiting {} test --", methodName); + log.info("-- Exiting {} test --", "testAvroProducerAndAutoSchemaReader"); } @Test public void testAutoBytesProducer() throws Exception { - log.info("-- Starting {} test --", methodName); + log.info("-- Starting {} test --", "testAutoBytesProducer"); + + final String topic = newTopicName(); + final String schemaKey = topic.replace("persistent://", ""); AvroSchema avroSchema = AvroSchema.of(SchemaDefinition.builder(). @@ -537,7 +566,7 @@ public void testAutoBytesProducer() throws Exception { try (Producer producer = pulsarClient .newProducer(avroSchema) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .create()) { for (int i = 0; i < 10; i++) { String message = "my-message-" + i; @@ -547,7 +576,7 @@ public void testAutoBytesProducer() throws Exception { try (Producer producer = pulsarClient .newProducer(Schema.AUTO_PRODUCE_BYTES()) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .create()) { // try to produce junk data for (int i = 10; i < 20; i++) { @@ -572,7 +601,7 @@ public void testAutoBytesProducer() throws Exception { Consumer consumer = pulsarClient .newConsumer(Schema.AUTO_CONSUME()) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .subscriptionName("my-subscriber-name") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -591,19 +620,20 @@ public void testAutoBytesProducer() throws Exception { consumer.acknowledgeCumulative(msg); consumer.close(); - SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() - .getSchema("my-property/my-ns/my-topic1") + SchemaRegistry.SchemaAndMetadata storedSchema = SharedPulsarCluster.get().getPulsarService() + .getSchemaRegistryService() + .getSchema(schemaKey) .get(); Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema()); - log.info("-- Exiting {} test --", methodName); + log.info("-- Exiting {} test --", "testAutoBytesProducer"); } @Test public void testMessageBuilderLoadConf() throws Exception { - String topic = BrokerTestUtil.newUniqueName("my-topic"); + String topic = newTopicName(); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java index 0eff19d31a364..56bf88c64299f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java @@ -22,36 +22,21 @@ import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.protocol.Commands; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-impl") -public class CompactedOutBatchMessageTest extends ProducerConsumerBase { - - @BeforeMethod - @Override - protected void setup() throws Exception { - super.internalSetup(); - producerBaseSetup(); - } - - @AfterMethod(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class CompactedOutBatchMessageTest extends SharedPulsarBaseTest { @Test public void testCompactedOutMessages() throws Exception { - final String topic1 = "persistent://my-property/my-ns/my-topic"; + final String topic1 = newTopicName(); BrokerEntryMetadata brokerEntryMetadata = new BrokerEntryMetadata().setBrokerTimestamp(1).setBrokerTimestamp(1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java index ef1c993642b4d..e64871f9d750a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java @@ -18,33 +18,18 @@ */ package org.apache.pulsar.client.impl; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-impl") -public class ConsumeBaseExceptionTest extends ProducerConsumerBase { - - @BeforeMethod - @Override - protected void setup() throws Exception { - super.internalSetup(); - producerBaseSetup(); - } - - @AfterMethod(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class ConsumeBaseExceptionTest extends SharedPulsarBaseTest { @Test public void testClosedConsumer() throws PulsarClientException { - Consumer consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/topicName") + Consumer consumer = pulsarClient.newConsumer().topic(newTopicName()) .subscriptionName("my-subscription").subscribe(); consumer.close(); Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally()); @@ -62,7 +47,7 @@ public void testClosedConsumer() throws PulsarClientException { @Test public void testListener() throws PulsarClientException { - Consumer consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/topicName") + Consumer consumer = pulsarClient.newConsumer().topic(newTopicName()) .subscriptionName("my-subscription").messageListener((consumer1, msg) -> { }).subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java index b9355d19c2767..6d29e30f46be4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java @@ -21,36 +21,20 @@ import static org.testng.Assert.assertTrue; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.awaitility.Awaitility; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Slf4j @Test(groups = "broker-api") -public class ConsumerCloseTest extends ProducerConsumerBase { - - @BeforeClass - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class ConsumerCloseTest extends SharedPulsarBaseTest { @Test public void testReceiveWillDoneAfterClosedConsumer() throws Exception { - String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + String tpName = newTopicName(); String subName = "test-sub"; admin.topics().createNonPartitionedTopic(tpName); admin.topics().createSubscription(tpName, subName, MessageId.earliest); @@ -65,10 +49,10 @@ public void testReceiveWillDoneAfterClosedConsumer() throws Exception { @Test public void testReceiveWillDoneAfterTopicDeleted() throws Exception { - String namespace = "public/default"; + String namespace = getNamespace(); admin.namespaces().setAutoTopicCreation(namespace, AutoTopicCreationOverride.builder() .allowAutoTopicCreation(false).build()); - String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + String tpName = newTopicName(); String subName = "test-sub"; admin.topics().createNonPartitionedTopic(tpName); admin.topics().createSubscription(tpName, subName, MessageId.earliest); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java index 612af814d3a11..bc608cef7cd7e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java @@ -23,31 +23,16 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker-impl") -public class ConsumerDedupPermitsUpdateTest extends ProducerConsumerBase { - - @BeforeClass - @Override - protected void setup() throws Exception { - super.internalSetup(); - producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class ConsumerDedupPermitsUpdateTest extends SharedPulsarBaseTest { @DataProvider(name = "combinations") public Object[][] combinations() { @@ -65,7 +50,7 @@ public Object[][] combinations() { @Test(timeOut = 30000, dataProvider = "combinations") public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) throws Exception { - String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/my-topic"); + String topic = newTopicName(); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java index cf647eeda2e9f..f3bf80a646cad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java @@ -20,40 +20,25 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SizeUnit; import org.awaitility.Awaitility; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-impl") @Slf4j -public class ConsumerMemoryLimitTest extends ProducerConsumerBase { - - @BeforeMethod - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterMethod(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class ConsumerMemoryLimitTest extends SharedPulsarBaseTest { @Test public void testConsumerMemoryLimit() throws Exception { String topic = newTopicName(); ClientBuilder clientBuilder = PulsarClient.builder() - .serviceUrl(pulsar.getBrokerServiceUrl()) + .serviceUrl(getBrokerServiceUrl()) .memoryLimit(10, SizeUnit.KILO_BYTES); @Cleanup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java index 878a368e4736b..f989a60d36c09 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java @@ -19,36 +19,21 @@ package org.apache.pulsar.client.impl; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.TopicStats; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-impl") -public class DispatchAccordingPermitsTest extends ProducerConsumerBase { - - @Override - @BeforeMethod - public void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @Override - @AfterMethod(alwaysRun = true) - public void cleanup() throws Exception { - super.internalCleanup(); - } +public class DispatchAccordingPermitsTest extends SharedPulsarBaseTest { /** * The test case is to simulate dispatch batches with different batch size to the consumer. @@ -59,7 +44,7 @@ public void cleanup() throws Exception { */ @Test public void testFlowPermitsWithMultiBatchesDispatch() throws PulsarAdminException, PulsarClientException { - final String topic = "persistent://public/default/testFlowPermitsWithMultiBatchesDispatch"; + final String topic = newTopicName(); final String subName = "test"; admin.topics().createSubscription(topic, "test", MessageId.earliest); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java index f7e6f1c60d21c..662b18420e30e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java @@ -23,33 +23,19 @@ import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; +import org.apache.pulsar.broker.service.SharedPulsarCluster; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.metadata.api.MetadataCache; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-impl") @Slf4j -public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase { - - @Override - @BeforeMethod - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @Override - @AfterMethod(alwaysRun = true) - protected void cleanup() throws Exception { - super.internalCleanup(); - } +public class HierarchyTopicAutoCreationTest extends SharedPulsarBaseTest { @Test(invocationCount = 3) @SneakyThrows @@ -69,7 +55,8 @@ public void testPartitionedTopicAutoCreation() { .getAutoTopicCreation(namespace); Assert.assertEquals(nsAutoTopicCreationOverride, expectedPolicies); // Background invalidate cache - final MetadataCache nsCache = pulsar.getPulsarResources().getNamespaceResources().getCache(); + final MetadataCache nsCache = SharedPulsarCluster.get().getPulsarService() + .getPulsarResources().getNamespaceResources().getCache(); @Cleanup("interrupt") final Thread t1 = new Thread(() -> { while (!Thread.currentThread().isInterrupted()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java index bb69c0daefcea..ff72f23b0a2bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java @@ -27,9 +27,9 @@ import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MockBrokerService; -import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -42,25 +42,21 @@ @Test(groups = "broker-impl") @Slf4j -public class ProduceWithMessageIdTest extends ProducerConsumerBase { +public class ProduceWithMessageIdTest extends SharedPulsarBaseTest { MockBrokerService mockBrokerService; @BeforeClass(alwaysRun = true) - public void setup() throws Exception { + public void setupMockBroker() throws Exception { mockBrokerService = new MockBrokerService(); mockBrokerService.start(); - super.internalSetup(); - super.producerBaseSetup(); } - @Override @AfterClass(alwaysRun = true) - public void cleanup() throws Exception { + public void cleanupMockBroker() throws Exception { if (mockBrokerService != null) { mockBrokerService.stop(); mockBrokerService = null; } - super.internalCleanup(); } @Test @@ -81,7 +77,7 @@ public void testSend() throws Exception { .serviceUrl(mockBrokerService.getBrokerAddress()) .build(); - String topic = "persistent://public/default/t1"; + String topic = newTopicName(); ProducerImpl producer = (ProducerImpl) client.newProducer().topic(topic).enableBatching(false).create(); @@ -129,7 +125,7 @@ public void sendWithCallBack() throws Exception { int batchSize = 10; - String topic = "persistent://public/default/testSendWithCallBack"; + String topic = newTopicName(); ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer().topic(topic) .enableBatching(true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java index 2856de00bf656..68f0f6efedb80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java @@ -21,31 +21,38 @@ import com.google.common.collect.Lists; import java.util.HashSet; import java.util.Set; +import org.apache.pulsar.broker.service.SharedPulsarBaseTest; +import org.apache.pulsar.broker.service.SharedPulsarCluster; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Test(groups = "broker-impl") -public class TopicFromMessageTest extends ProducerConsumerBase { +public class TopicFromMessageTest extends SharedPulsarBaseTest { private static final long TEST_TIMEOUT = 90000; // 1.5 min private static final int BATCHING_MAX_MESSAGES_THRESHOLD = 2; @Override - @BeforeMethod - public void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @Override - @AfterMethod(alwaysRun = true) - public void cleanup() throws Exception { - super.internalCleanup(); + @BeforeClass(alwaysRun = true) + public void setupSharedCluster() throws Exception { + super.setupSharedCluster(); + // These tests use short topic names (e.g. "topic1") which resolve to public/default + try { + admin.tenants().createTenant("public", + new TenantInfoImpl(Set.of(), Set.of(SharedPulsarCluster.CLUSTER_NAME))); + } catch (Exception e) { + // tenant may already exist + } + try { + admin.namespaces().createNamespace("public/default", + Set.of(SharedPulsarCluster.CLUSTER_NAME)); + } catch (Exception e) { + // namespace may already exist + } } @Test(timeOut = TEST_TIMEOUT) @@ -61,12 +68,13 @@ public void testSingleTopicConsumerNoBatchShortName() throws Exception { @Test(timeOut = TEST_TIMEOUT) public void testSingleTopicConsumerNoBatchFullName() throws Exception { + final String topic = newTopicName(); try (Consumer consumer = pulsarClient.newConsumer() - .topic("my-property/my-ns/topic1").subscriptionName("sub1").subscribe(); + .topic(topic).subscriptionName("sub1").subscribe(); Producer producer = pulsarClient.newProducer() - .topic("my-property/my-ns/topic1").enableBatching(false).create()) { + .topic(topic).enableBatching(false).create()) { producer.send("foobar".getBytes()); - Assert.assertEquals(consumer.receive().getTopicName(), "persistent://my-property/my-ns/topic1"); + Assert.assertEquals(consumer.receive().getTopicName(), topic); } } From 21df6fe7682dd59ad2ea520faea4627f470c71e3 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 16 Mar 2026 21:47:41 -0700 Subject: [PATCH 3/5] Remove unused BrokerTestUtil imports --- .../pulsar/client/api/SimpleTypedProducerConsumerTest.java | 1 - .../pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java | 1 - 2 files changed, 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index 0477856359659..d03f6746d1eff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -32,7 +32,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Cleanup; -import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.broker.service.SharedPulsarCluster; import org.apache.pulsar.broker.service.schema.SchemaRegistry; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java index bc608cef7cd7e..d5fd0bf1a1895 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java @@ -22,7 +22,6 @@ import static org.testng.Assert.assertNull; import java.util.concurrent.TimeUnit; import lombok.Cleanup; -import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; From b76373c6d4ced867450f5d4b4243d3a2fd5d85ad Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 17 Mar 2026 08:52:39 -0700 Subject: [PATCH 4/5] Fix HierarchyTopicAutoCreationTest to use base class namespace --- .../client/impl/HierarchyTopicAutoCreationTest.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java index 662b18420e30e..133cf1ad28f3e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl; import java.util.List; -import java.util.UUID; import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -40,9 +39,7 @@ public class HierarchyTopicAutoCreationTest extends SharedPulsarBaseTest { @Test(invocationCount = 3) @SneakyThrows public void testPartitionedTopicAutoCreation() { - // Create namespace - final String namespace = "public/testPartitionedTopicAutoCreation"; - admin.namespaces().createNamespace(namespace); + final String namespace = getNamespace(); // Set policies final AutoTopicCreationOverride expectedPolicies = AutoTopicCreationOverride.builder() .allowAutoTopicCreation(true) @@ -66,7 +63,7 @@ public void testPartitionedTopicAutoCreation() { t1.start(); // trigger auto-creation - final String topicName = "persistent://" + namespace + "/test-" + UUID.randomUUID(); + final String topicName = newTopicName(); @Cleanup final Producer producer = pulsarClient.newProducer() .topic(topicName) .create(); @@ -76,7 +73,8 @@ public void testPartitionedTopicAutoCreation() { TopicName.get(topicName).getPartition(0).toString()); // expect partitioned topic // double-check policies - final AutoTopicCreationOverride actualPolicies2 = admin.namespaces().getAutoTopicCreation(namespace); + final AutoTopicCreationOverride actualPolicies2 = admin.namespaces() + .getAutoTopicCreation(namespace); Assert.assertEquals(actualPolicies2, expectedPolicies); } } From 3224b818bf4094acadde340e4689355d5eb2eba2 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 17 Mar 2026 09:00:04 -0700 Subject: [PATCH 5/5] Fix ConsumerUnsubscribeIntegrationTest to use newTopicName() --- .../mledger/impl/ConsumerUnsubscribeIntegrationTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java index b8cd072a1d14f..ef33a61139d35 100644 --- a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java @@ -21,7 +21,6 @@ import static org.testng.Assert.assertEquals; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; @@ -37,7 +36,7 @@ public class ConsumerUnsubscribeIntegrationTest extends SharedPulsarBaseTest { @Test public void testUnSubscribeWhenCursorNotExists() throws Exception { - final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String topic = newTopicName(); final String subscription = "s1"; admin.topics().createNonPartitionedTopic(topic); admin.topics().createSubscription(topic, subscription, MessageId.earliest);