diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 22f3be27d58b5..ae1177d295e89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1273,34 +1273,29 @@ public CompletableFuture unsubscribe(String subscriptionName) { .getTransactionPendingAckStoreSuffix(topic, Codec.encode(subscriptionName))); if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) { - CompletableFuture managedLedgerConfig = getBrokerService().getManagedLedgerConfig(tn); - managedLedgerConfig.thenAccept(config -> { - ManagedLedgerFactory managedLedgerFactory = - getBrokerService().getManagedLedgerFactoryForTopic(tn, config.getStorageClassName()); + ManagedLedgerConfig managedLedgerConfig = ledger.getConfig(); + ManagedLedgerFactory managedLedgerFactory = getBrokerService() + .getManagedLedgerFactoryForTopic(tn, managedLedgerConfig.getStorageClassName()); managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), - managedLedgerConfig, - new AsyncCallbacks.DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { + CompletableFuture.completedFuture(managedLedgerConfig), + new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture); + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + if (exception instanceof MetadataNotFoundException) { asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture); + return; } - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { - if (exception instanceof MetadataNotFoundException) { - asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture); - return; - } - - unsubscribeFuture.completeExceptionally(exception); - log.error("[{}][{}] Error deleting subscription pending ack store", - topic, subscriptionName, exception); - } - }, null); - }).exceptionally(ex -> { - unsubscribeFuture.completeExceptionally(ex); - return null; - }); + unsubscribeFuture.completeExceptionally(exception); + log.error("[{}][{}] Error deleting subscription pending ack store", + topic, subscriptionName, exception); + } + }, null); } else { asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 60672845b5e75..db16963f20874 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -495,6 +495,7 @@ public void testRemoveCluster() throws Exception { admin1.namespaces().createNamespace(ns1); admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); admin1.topics().createNonPartitionedTopic(topic); + admin1.topics().createSubscription(topic, "s1", MessageId.earliest); // Wait for loading topic up. Producer p = client1.newProducer(Schema.STRING).topic(topic).create();