Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,22 @@
import static org.testng.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-impl")
public class ConsumerUnsubscribeIntegrationTest extends ProducerConsumerBase {

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
public class ConsumerUnsubscribeIntegrationTest extends SharedPulsarBaseTest {

@Test
public void testUnSubscribeWhenCursorNotExists() throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String topic = newTopicName();
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createSubscription(topic, subscription, MessageId.earliest);
Expand All @@ -65,7 +49,7 @@ public void testUnSubscribeWhenCursorNotExists() throws Exception {
consumer.acknowledge(consumer.receive(2, TimeUnit.SECONDS));

PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
(PersistentTopic) getTopic(topic, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(subscription);
Awaitility.await().untilAsserted(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,25 @@

import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class BytesKeyTest extends ProducerConsumerBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
public class BytesKeyTest extends SharedPulsarBaseTest {

private void byteKeysTest(boolean batching) throws Exception {
Random r = new Random(0);
String topic = newTopicName();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic1")
.topic(topic)
.subscriptionName("my-subscriber-name").subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(batching)
.batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.SECONDS)
.batchingMaxMessages(Integer.MAX_VALUE)
.topic("persistent://my-property/my-ns/my-topic1").create();
.topic(topic).create();

byte[] byteKey = new byte[1000];
r.nextBytes(byteKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,13 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class ConsumerAckListTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
public class ConsumerAckListTest extends SharedPulsarBaseTest {

@DataProvider(name = "ackReceiptEnabled")
public Object[][] ackReceiptEnabled() {
Expand All @@ -61,7 +47,7 @@ public void testBatchListAck(boolean ackReceiptEnabled) throws Exception {
}

private void ackListMessage(boolean isBatch, boolean isPartitioned, boolean ackReceiptEnabled) throws Exception {
final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID();
final String topic = newTopicName();
final String subName = "testBatchAck-sub" + UUID.randomUUID();
final int messageNum = ThreadLocalRandom.current().nextInt(50, 100);
if (isPartitioned) {
Expand Down Expand Up @@ -113,9 +99,9 @@ private void sendMessagesAsyncAndWait(Producer<String> producer, int messages) t
@Test(timeOut = 30000)
public void testAckMessageInAnotherTopic() throws Exception {
final String[] topics = {
"persistent://my-property/my-ns/test-ack-message-in-other-topic1" + UUID.randomUUID(),
"persistent://my-property/my-ns/test-ack-message-in-other-topic2" + UUID.randomUUID(),
"persistent://my-property/my-ns/test-ack-message-in-other-topic3" + UUID.randomUUID()
newTopicName(),
newTopicName(),
newTopicName()
};
@Cleanup final Consumer<String> allTopicsConsumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,16 @@
package org.apache.pulsar.client.api;

import io.netty.util.HashedWheelTimer;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class ConsumerCleanupTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
// use Pulsar binary lookup since the HTTP client shares the Pulsar client timer
isTcpLookup = true;
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
public class ConsumerCleanupTest extends SharedPulsarBaseTest {

@DataProvider(name = "ackReceiptEnabled")
public Object[][] ackReceiptEnabled() {
Expand All @@ -55,9 +39,12 @@ public Object[][] ackReceiptEnabled() {
public void testAllTimerTaskShouldCanceledAfterConsumerClosed(boolean ackReceiptEnabled)
throws PulsarClientException, InterruptedException {
@Cleanup
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(getBrokerServiceUrl())
.statsInterval(1, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/" + UUID.randomUUID().toString())
.topic(newTopicName())
.subscriptionName("test")
.isAckReceiptEnabled(ackReceiptEnabled)
.subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,12 @@
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class CustomMessageIdTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
public class CustomMessageIdTest extends SharedPulsarBaseTest {

@DataProvider
public static Object[][] enableBatching() {
Expand All @@ -56,7 +42,7 @@ public static Object[][] enableBatching() {

@Test
public void testSeek() throws Exception {
final var topic = "persistent://my-property/my-ns/test-seek-" + System.currentTimeMillis();
final var topic = newTopicName();
@Cleanup final var producer = pulsarClient.newProducer(Schema.INT32).topic(topic).create();
final var msgIds = new ArrayList<SimpleMessageIdImpl>();
for (int i = 0; i < 10; i++) {
Expand All @@ -72,8 +58,7 @@ public void testSeek() throws Exception {

@Test(dataProvider = "enableBatching")
public void testAcknowledgment(boolean enableBatching) throws Exception {
final var topic = "persistent://my-property/my-ns/test-ack-"
+ enableBatching + System.currentTimeMillis();
final var topic = newTopicName();
final var producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(enableBatching)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,18 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class ExposeMessageRedeliveryCountTest extends ProducerConsumerBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
public class ExposeMessageRedeliveryCountTest extends SharedPulsarBaseTest {

@Test(timeOut = 30000)
public void testRedeliveryCount() throws PulsarClientException {

final String topic = "persistent://my-property/my-ns/redeliveryCount";
final String topic = newTopicName();

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
Expand Down Expand Up @@ -81,7 +67,7 @@ public void testRedeliveryCount() throws PulsarClientException {
@Test(timeOut = 30000)
public void testRedeliveryCountWithPartitionedTopic() throws PulsarClientException, PulsarAdminException {

final String topic = "persistent://my-property/my-ns/redeliveryCount.partitioned";
final String topic = newTopicName();

admin.topics().createPartitionedTopic(topic, 3);

Expand Down Expand Up @@ -119,7 +105,7 @@ public void testRedeliveryCountWithPartitionedTopic() throws PulsarClientExcepti
@Test(timeOut = 30000)
public void testRedeliveryCountWhenConsumerDisconnected() throws PulsarClientException {

String topic = "persistent://my-property/my-ns/testRedeliveryCountWhenConsumerDisconnected";
String topic = newTopicName();

Consumer<String> consumer0 = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,24 @@
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-api")
public class FailoverSubscriptionTest extends ProducerConsumerBase {
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
public class FailoverSubscriptionTest extends SharedPulsarBaseTest {

@Test(timeOut = 30_000, invocationCount = 5)
public void testWaitingCursorsCountAfterSwitchingActiveConsumers() throws Exception {
final String tp = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
final String tp = newTopicName();
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(tp);
admin.topics().createSubscription(tp, subscription, MessageId.earliest);
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(tp, false).join().get();
PersistentTopic topic = (PersistentTopic) getTopic(tp, false).join().get();
Map<String, ConsumerImpl<byte[]>> consumerMap = new HashMap<>();
ConsumerImpl<byte[]> firstConsumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(tp)
.subscriptionType(SubscriptionType.Failover).subscriptionName(subscription).subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.PulsarClientException.MemoryBufferIsFullError;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarTestClient;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class MemoryLimitTest extends ProducerConsumerBase {
public class MemoryLimitTest extends SharedPulsarBaseTest {

@DataProvider(name = "batchingAndMemoryLimit")
public Object[][] provider() {
Expand All @@ -44,26 +43,13 @@ public Object[][] provider() {
};
}

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(dataProvider = "batchingAndMemoryLimit")
public void testRejectMessages(boolean batching, int memoryLimit)
throws Exception {
String topic = newTopicName();

ClientBuilder clientBuilder = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.serviceUrl(getBrokerServiceUrl())
.memoryLimit(memoryLimit, SizeUnit.KILO_BYTES);

@Cleanup
Expand Down Expand Up @@ -120,7 +106,7 @@ public void testRejectMessagesOnMultipleTopics(boolean batching, int memoryLimit
String t2 = newTopicName();

ClientBuilder clientBuilder = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.serviceUrl(getBrokerServiceUrl())
.memoryLimit(memoryLimit, SizeUnit.KILO_BYTES);

@Cleanup
Expand Down
Loading
Loading