diff --git a/pcip/pcip-3.md b/pcip/pcip-3.md new file mode 100644 index 0000000..da0466a --- /dev/null +++ b/pcip/pcip-3.md @@ -0,0 +1,239 @@ +# PCIP-3:Pulsar Extended Transaction API Enhancement Proposal +# Background knowledge +When users consume messages using the Fail-over subscription mode and confirm messages using cumulative Ack, duplicate consumption may occur. In this case, even if users use Transaction, they cannot achieve Just-Once. +As shown in the figure below, in failover mode, the two consumer1 and consumer2 started simultaneously frequently undergo two disconnection switches. +Finally, Consumer1 consumed M1, M2, M4, M5; Consumer2 consumed M1, M2, M3. M1 and M2 were consumed twice. + +![cumulative-ack-issue.png](static/img/pcip-3/cumulative-ack-issue.png) + +# Goals +Solve the problem of cumulative ack consumption duplication by designing a new transaction API. +Why not fix this issue in the Pulsar main repository? +- The complexity of fixing problems without modifying the Client API is high, and the problem-solving cycle is long. +- There are indeed many confusing usage postures for existing transaction APIs + - For example, the abort () and commit () methods may seem synchronous in name, but they are actually asynchronous. In actual use, you need to use abort ().get () and commit ().get (). +- Modifying the Client API in the Pulsar main repository is a difficult task because we cannot determine whether the new solution is necessarily perfect, and the time cycle for updating the API is long. + The benefit of solving this problem in the contributor repository is: +- By wrapping the original Transaction API, this problem can be solved in a concise way. This wrapping can be seen as a best practice that does not affect the use of existing users, while providing a reference solution for users who encounter similar problems. +# High Level Design +Design a new API to place the context of message sending and consumption within Transaction, which not only solves the problem of repeated consumption, but also retains sufficient scalability for possible optimization in the future. +- Solve the problem of repeated consumption - use the function of individual ack messagelist to batch ack messages instead of the original cumulative ack. +- Retained sufficient scalability - sending messages requires using Transaction to construct messages, and consumed transaction messages are recorded in Transaction. Later, more optimizations can be added using this information without changing the interface. + +## Public-facing Changes +### Public API +The org.apache.pulsar.txn.api.Transaction interface is an optimized and extended transaction interface designed to enhance usability and clarity in the Pulsar contributors' library. It addresses issues with CumulativeAck and transactions not preventing repeated message consumption, and it refines ambiguous methods for better clarity. Key features include: +- Message Recording: Records messages in a transaction without automatic acknowledgment. +- Asynchronous and Synchronous Acknowledgment: Supports both asynchronous and synchronous acknowledgment of all received messages for specific consumers or across all consumers. +- Transactional Message Builder: Creates a new transactional message builder for a given producer to construct and send messages within a transaction context. +- Committing and Aborting Transactions: Offers both asynchronous and synchronous methods to commit or abort transactions, ensuring the effectiveness of message sends and acknowledgments. +- Transaction ID and State Retrieval: Provides methods to retrieve the unique transaction ID and its current state to determine the transaction's lifecycle phase. +```java +/** +* Interface representing an optimized and extended transaction interface in the Pulsar +* contributors' library. +* +*

This interface provides enhancements and extensions to the base transaction interface in +* Pulsar. It specifically addresses the issue where using CumulativeAck with transactions could not +* prevent message consumed repeated. Additionally, it clarifies and optimizes ambiguous methods for +* better usability and clarity. + */ + public interface Transaction { + +/** +* Records a message in the transaction. +* +*

This method is used to include a message in the current transaction. The message will not be +* automatically acknowledged when the transaction is committed. Instead, it must be explicitly +* acknowledged by calling one of the ack methods. +* +* @param messageId the ID of the message to record +* @param consumer the consumer that received the message + */ + void recordMsg(MessageId messageId, Consumer consumer); + +/** +* Asynchronously acknowledges all received messages for a specific consumer in the transaction. +* +*

This method is used to acknowledge all messages that have been recorded for the specified +* consumer in the transaction. The acknowledgment is asynchronous, and the future can be used to +* determine when the operation is complete. +* +* @param consumer the consumer that received the messages +* @return a CompletableFuture that will be completed when the acknowledgment is complete + */ + CompletableFuture ackAllReceivedMsgsAsync(Consumer consumer); + +/** +* Acknowledges all received messages for a specific consumer in the transaction. +* +*

This method is a synchronous version of {@link #ackAllReceivedMsgsAsync(Consumer)}. It will +* block until the acknowledgment is complete. +* +* @param consumer the consumer that received the messages +* @throws ExecutionException if the acknowledgment fails +* @throws InterruptedException if the thread is interrupted while waiting for the acknowledgment +* to complete +*/ +void ackAllReceivedMsgs(Consumer consumer) throws ExecutionException, InterruptedException; + +/** +* Acknowledges all received messages in the transaction. +* +*

This method is a convenience method that acknowledges all messages across all consumers. It +* will block until the acknowledgment is complete. +* +* @throws ExecutionException if the acknowledgment fails +* @throws InterruptedException if the thread is interrupted while waiting for the acknowledgment +* to complete +*/ +void ackAllReceivedMsgs() throws ExecutionException, InterruptedException; + +/** +* Asynchronously acknowledges all received messages in the transaction. +* +*

This method is a convenience method that acknowledges all messages across all consumers. The +* acknowledgment is asynchronous, and the future can be used to determine when the operation is +* complete. +* +* @return a CompletableFuture that will be completed when the acknowledgment is complete + */ + CompletableFuture ackAllReceivedMsgsAsync(); + +/** +* Creates a new transactional message builder for the given producer. +* +*

This method returns a {@link TypedMessageBuilder} instance that is bound to the specified +* producer and transaction. The returned message builder can be used to construct and send +* messages within the context of a transaction. +* +* @param producer the producer instance used to send messages +* @param the type of messages produced by the producer +* @return a TypedMessageBuilder instance for building transactional messages + */ + TypedMessageBuilder newTransactionMessage(Producer producer); + +/** +* Asynchronously commits the transaction. +* +*

This method is used to commit the transaction, making all sent messages and acknowledgments +* effective. When the transaction is committed, consumers receive the transaction messages and +* the pending-ack state becomes ack state. The commit is asynchronous, and the future can be used +* to determine when the operation is complete. +* +* @return a CompletableFuture that will be completed when the commit is complete + */ + CompletableFuture commitAsync(); + +/** +* Asynchronously aborts the transaction. +* +*

This method is used to abort the transaction, discarding all send messages and +* acknowledgments. The abort is asynchronous, and the future can be used to determine when the +* operation is complete. +* +* @return a CompletableFuture that will be completed when the abort is complete + */ + CompletableFuture abortAsync(); + +/** +* Commits the transaction. +* +*

This method is a synchronous version of {@link #commitAsync()}. It will block until the +* commit is complete. +* +* @throws ExecutionException if the commit fails +* @throws InterruptedException if the thread is interrupted while waiting for the commit to +* complete +*/ +void commit() throws ExecutionException, InterruptedException; + +/** +* Aborts the transaction. +* +*

This method is a synchronous version of {@link #abortAsync()}. It will block until the abort +* is complete. +* +* @throws ExecutionException if the abort fails +* @throws InterruptedException if the thread is interrupted while waiting for the abort to +* complete +*/ +void abort() throws ExecutionException, InterruptedException; + +/** +* Gets the transaction ID. +* +*

This method returns the unique identifier for the transaction. +* +* @return the transaction ID + */ + TxnID getTxnID(); + +/** +* Gets the current state of the transaction. +* +*

This method returns the current state of the transaction, which can be used to determine if +* the transaction is open, committed, aborted, error or timeout. +* +* @return the current state of the transaction + */ + org.apache.pulsar.client.api.transaction.Transaction.State getState(); + } +``` + +# Get started +## Quick Start +```java +public void transactionDemo() throws Exception { +String pubTopic = "persistent://public/default/my-pub-topic"; +String subTopic = "persistent://public/default/my-sub-topic"; +String subscription = "my-subscription"; + +// Create a Pulsar client instance +PulsarClient client = SingletonPulsarContainer.createPulsarClient(); + +// Create a Transaction object +// Use TransactionFactory to create a transaction object with a timeout of 5 seconds +Transaction transaction = +TransactionFactory.createTransaction(client, 5, TimeUnit.SECONDS).get(); + +// Create producers and a consumer +// Create two producers to send messages to different topics +Producer producerToPubTopic = client.newProducer(Schema.STRING).topic(pubTopic).create(); +Producer producerToSubTopic = client.newProducer(Schema.STRING).topic(subTopic).create(); + +// Create a consumer to receive messages from the subTopic +Consumer consumerFromSubTopic = client +.newConsumer(Schema.STRING) +.subscriptionName(subscription) +.topic(subTopic) +.subscribe(); + +// Send a message to the Sub Topic +producerToSubTopic.send("Hello World"); + +// Receive a message +Message receivedMessage = consumerFromSubTopic.receive(); +MessageId receivedMessageId = receivedMessage.getMessageId(); + +// Record the message in the transaction +transaction.recordMsg(receivedMessageId, consumerFromSubTopic); + +// Forward the transaction message to the pub topic +// Use the transaction message builder to forward the received message to the pubTopic +transaction.newTransactionMessage(producerToSubTopic).value(receivedMessage.getValue()).send(); + +// Acknowledge all received messages +// Acknowledge all messages received from the subTopic within the transaction +transaction.ackAllReceivedMsgs(consumerFromSubTopic); + +// Commit the transaction +// Commit the transaction to ensure all recorded messages and acknowledgments take effect +transaction.commit(); + +// Close the consumer, producers, and client to release resources +consumerFromSubTopic.close(); +producerToSubTopic.close(); +client.close(); +} +``` \ No newline at end of file diff --git a/pcip/static/img/pcip-3/cumulative-ack-issue.png b/pcip/static/img/pcip-3/cumulative-ack-issue.png new file mode 100644 index 0000000..f64cced Binary files /dev/null and b/pcip/static/img/pcip-3/cumulative-ack-issue.png differ diff --git a/pom.xml b/pom.xml index 749f363..e57a6e2 100644 --- a/pom.xml +++ b/pom.xml @@ -48,6 +48,8 @@ 2.12.0 4.2.2 1.20.1 + 4.13.1 + 5.12.0 diff --git a/pulsar-transaction-contrib/pom.xml b/pulsar-transaction-contrib/pom.xml index c9f0077..01b4501 100644 --- a/pulsar-transaction-contrib/pom.xml +++ b/pulsar-transaction-contrib/pom.xml @@ -21,8 +21,45 @@ pulsar-java-contrib 1.0.0-SNAPSHOT - 2024 pulsar-transaction-contrib + + + junit + junit + ${junit.version} + test + + + org.apache.pulsar + pulsar-client-all + ${pulsar.version} + + + org.mockito + mockito-core + ${mockito.version} + test + + + org.testcontainers + pulsar + test + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 17 + 17 + + + + + 2024 + diff --git a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/Transaction.java b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/Transaction.java new file mode 100644 index 0000000..d673032 --- /dev/null +++ b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/Transaction.java @@ -0,0 +1,174 @@ +package org.apache.pulsar.txn.api; + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.transaction.TxnID; + +/** + * Interface representing an optimized and extended transaction interface in the Pulsar + * contributors' library. + * + *

This interface provides enhancements and extensions to the base transaction interface in + * Pulsar. It specifically addresses the issue where using CumulativeAck with transactions could not + * prevent message consumed repeated. Additionally, it clarifies and optimizes ambiguous methods for + * better usability and clarity. + */ +public interface Transaction { + + /** + * Records a message in the transaction. + * + *

This method is used to include a message in the current transaction. The message will not be + * automatically acknowledged when the transaction is committed. Instead, it must be explicitly + * acknowledged by calling one of the ack methods. + * + * @param messageId the ID of the message to record + * @param consumer the consumer that received the message + */ + void recordMsg(MessageId messageId, Consumer consumer); + + /** + * Asynchronously acknowledges all received messages for a specific consumer in the transaction. + * + *

This method is used to acknowledge all messages that have been recorded for the specified + * consumer in the transaction. The acknowledgment is asynchronous, and the future can be used to + * determine when the operation is complete. + * + * @param consumer the consumer that received the messages + * @return a CompletableFuture that will be completed when the acknowledgment is complete + */ + CompletableFuture ackAllReceivedMsgsAsync(Consumer consumer); + + /** + * Acknowledges all received messages for a specific consumer in the transaction. + * + *

This method is a synchronous version of {@link #ackAllReceivedMsgsAsync(Consumer)}. It will + * block until the acknowledgment is complete. + * + * @param consumer the consumer that received the messages + * @throws ExecutionException if the acknowledgment fails + * @throws InterruptedException if the thread is interrupted while waiting for the acknowledgment + * to complete + */ + void ackAllReceivedMsgs(Consumer consumer) throws ExecutionException, InterruptedException; + + /** + * Acknowledges all received messages in the transaction. + * + *

This method is a convenience method that acknowledges all messages across all consumers. It + * will block until the acknowledgment is complete. + * + * @throws ExecutionException if the acknowledgment fails + * @throws InterruptedException if the thread is interrupted while waiting for the acknowledgment + * to complete + */ + void ackAllReceivedMsgs() throws ExecutionException, InterruptedException; + + /** + * Asynchronously acknowledges all received messages in the transaction. + * + *

This method is a convenience method that acknowledges all messages across all consumers. The + * acknowledgment is asynchronous, and the future can be used to determine when the operation is + * complete. + * + * @return a CompletableFuture that will be completed when the acknowledgment is complete + */ + CompletableFuture ackAllReceivedMsgsAsync(); + + /** + * Creates a new transactional message builder for the given producer. + * + *

This method returns a {@link TypedMessageBuilder} instance that is bound to the specified + * producer and transaction. The returned message builder can be used to construct and send + * messages within the context of a transaction. + * + * @param producer the producer instance used to send messages + * @param the type of messages produced by the producer + * @return a TypedMessageBuilder instance for building transactional messages + */ + TypedMessageBuilder newTransactionMessage(Producer producer); + + /** + * Asynchronously commits the transaction. + * + *

This method is used to commit the transaction, making all sent messages and acknowledgments + * effective. When the transaction is committed, consumers receive the transaction messages and + * the pending-ack state becomes ack state. The commit is asynchronous, and the future can be used + * to determine when the operation is complete. + * + * @return a CompletableFuture that will be completed when the commit is complete + */ + CompletableFuture commitAsync(); + + /** + * Asynchronously aborts the transaction. + * + *

This method is used to abort the transaction, discarding all send messages and + * acknowledgments. The abort is asynchronous, and the future can be used to determine when the + * operation is complete. + * + * @return a CompletableFuture that will be completed when the abort is complete + */ + CompletableFuture abortAsync(); + + /** + * Commits the transaction. + * + *

This method is a synchronous version of {@link #commitAsync()}. It will block until the + * commit is complete. + * + * @throws ExecutionException if the commit fails + * @throws InterruptedException if the thread is interrupted while waiting for the commit to + * complete + */ + void commit() throws ExecutionException, InterruptedException; + + /** + * Aborts the transaction. + * + *

This method is a synchronous version of {@link #abortAsync()}. It will block until the abort + * is complete. + * + * @throws ExecutionException if the abort fails + * @throws InterruptedException if the thread is interrupted while waiting for the abort to + * complete + */ + void abort() throws ExecutionException, InterruptedException; + + /** + * Gets the transaction ID. + * + *

This method returns the unique identifier for the transaction. + * + * @return the transaction ID + */ + TxnID getTxnID(); + + /** + * Gets the current state of the transaction. + * + *

This method returns the current state of the transaction, which can be used to determine if + * the transaction is open, committed, aborted, error or timeout. + * + * @return the current state of the transaction + */ + org.apache.pulsar.client.api.transaction.Transaction.State getState(); +} diff --git a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/TransactionFactory.java b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/TransactionFactory.java new file mode 100644 index 0000000..b540b64 --- /dev/null +++ b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/TransactionFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.txn.api; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.txn.impl.TransactionImpl; + +public class TransactionFactory { + + /** + * Creates a new transaction with the specified timeout. + * + * @param pulsarClient the Pulsar client instance + * @param timeout the transaction timeout + * @param unit the time unit of the timeout + * @return a CompletableFuture that will be completed with the new transaction + */ + public static CompletableFuture createTransaction( + PulsarClient pulsarClient, long timeout, TimeUnit unit) { + // Create a transaction with the specified timeout + return pulsarClient + .newTransaction() + .withTransactionTimeout(timeout, unit) + .build() + .thenApply(TransactionImpl::new); + } +} diff --git a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/package-info.java b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/package-info.java new file mode 100644 index 0000000..a1cf663 --- /dev/null +++ b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/package-info.java @@ -0,0 +1,14 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.txn.api; diff --git a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/TransactionImpl.java b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/TransactionImpl.java new file mode 100644 index 0000000..94434cd --- /dev/null +++ b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/TransactionImpl.java @@ -0,0 +1,109 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pulsar.txn.impl; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import lombok.Getter; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.txn.api.Transaction; + +public class TransactionImpl implements Transaction { + @Getter + ConcurrentHashMap, List> receivedMessages = new ConcurrentHashMap<>(); + + org.apache.pulsar.client.api.transaction.Transaction transaction; + + public TransactionImpl(org.apache.pulsar.client.api.transaction.Transaction transaction) { + this.transaction = transaction; + } + + @Override + public void recordMsg(MessageId messageId, Consumer consumer) { + receivedMessages.computeIfAbsent(consumer, k -> new CopyOnWriteArrayList<>()).add(messageId); + } + + @Override + public CompletableFuture ackAllReceivedMsgsAsync(Consumer consumer) { + List messageIds = receivedMessages.remove(consumer); + if (messageIds != null) { + return consumer.acknowledgeAsync(messageIds, transaction); + } + return CompletableFuture.completedFuture(null); + } + + @Override + public void ackAllReceivedMsgs(Consumer consumer) + throws ExecutionException, InterruptedException { + ackAllReceivedMsgsAsync(consumer).get(); + } + + @Override + public void ackAllReceivedMsgs() throws ExecutionException, InterruptedException { + ackAllReceivedMsgsAsync().get(); + } + + @Override + public CompletableFuture ackAllReceivedMsgsAsync() { + return FutureUtil.waitForAll( + receivedMessages.keySet().stream() + .map(this::ackAllReceivedMsgsAsync) + .collect(Collectors.toList())); + } + + @Override + public TypedMessageBuilder newTransactionMessage(Producer producer) { + return producer.newMessage(transaction); + } + + @Override + public CompletableFuture commitAsync() { + return transaction.commit(); + } + + @Override + public CompletableFuture abortAsync() { + return transaction.abort(); + } + + @Override + public void commit() throws ExecutionException, InterruptedException { + transaction.commit().get(); + } + + @Override + public void abort() throws ExecutionException, InterruptedException { + transaction.abort().get(); + } + + @Override + public TxnID getTxnID() { + return transaction.getTxnID(); + } + + @Override + public org.apache.pulsar.client.api.transaction.Transaction.State getState() { + return transaction.getState(); + } +} diff --git a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/package-info.java b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/package-info.java new file mode 100644 index 0000000..2ec84e4 --- /dev/null +++ b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/package-info.java @@ -0,0 +1,14 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.txn.impl; diff --git a/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/SingletonPulsarContainer.java b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/SingletonPulsarContainer.java new file mode 100644 index 0000000..afd3bdc --- /dev/null +++ b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/SingletonPulsarContainer.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pulsar.txn; +import java.io.IOException; +import java.time.Duration; +import java.util.Properties; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.utility.DockerImageName; + +@Slf4j +public class SingletonPulsarContainer { + + private static final PulsarContainer PULSAR_CONTAINER; + + static { + PULSAR_CONTAINER = new PulsarContainer(getPulsarImage()) + .withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true") + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true") + .withStartupTimeout(Duration.ofMinutes(3)); + PULSAR_CONTAINER.start(); + } + + private static DockerImageName getPulsarImage() { + return DockerImageName.parse("apachepulsar/pulsar:" + getPulsarImageVersion()); + } + + private static String getPulsarImageVersion() { + String pulsarVersion = ""; + Properties properties = new Properties(); + try { + properties.load(SingletonPulsarContainer.class.getClassLoader() + .getResourceAsStream("pulsar-container.properties")); + if (!properties.isEmpty()) { + pulsarVersion = properties.getProperty("pulsar.version"); + } + } catch (IOException e) { + log.error("Failed to load pulsar version. " + e.getCause()); + } + return pulsarVersion; + } + + static PulsarClient createPulsarClient() throws PulsarClientException { + return PulsarClient.builder() + .serviceUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getPulsarBrokerUrl()) + .enableTransaction(true) + .build(); + } + + static PulsarAdmin createPulsarAdmin() throws PulsarClientException { + return PulsarAdmin.builder() + .serviceHttpUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getHttpServiceUrl()) + .build(); + } +} diff --git a/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionDemo.java b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionDemo.java new file mode 100644 index 0000000..308c521 --- /dev/null +++ b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionDemo.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.txn; + +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.txn.api.Transaction; +import org.apache.pulsar.txn.api.TransactionFactory; +import org.testng.annotations.Test; + +public class TransactionDemo { + + @Test + public void transactionDemo() throws Exception { + String pubTopic = "persistent://public/default/my-pub-topic"; + String subTopic = "persistent://public/default/my-sub-topic"; + String subscription = "my-subscription"; + + // Create a Pulsar client instance + PulsarClient client = SingletonPulsarContainer.createPulsarClient(); + + // Create a Transaction object + // Use TransactionFactory to create a transaction object with a timeout of 5 seconds + Transaction transaction = + TransactionFactory.createTransaction(client, 5, TimeUnit.SECONDS).get(); + + // Create producers and a consumer + // Create two producers to send messages to different topics + Producer producerToPubTopic = client.newProducer(Schema.STRING).topic(pubTopic).create(); + Producer producerToSubTopic = client.newProducer(Schema.STRING).topic(subTopic).create(); + + // Create a consumer to receive messages from the subTopic + Consumer consumerFromSubTopic = client + .newConsumer(Schema.STRING) + .subscriptionName(subscription) + .topic(subTopic) + .subscribe(); + + // Send a message to the Sub Topic + producerToSubTopic.send("Hello World"); + + // Receive a message + Message receivedMessage = consumerFromSubTopic.receive(); + MessageId receivedMessageId = receivedMessage.getMessageId(); + + // Record the message in the transaction + transaction.recordMsg(receivedMessageId, consumerFromSubTopic); + + // Forward the transaction message to the pub topic + // Use the transaction message builder to forward the received message to the pubTopic + transaction.newTransactionMessage(producerToSubTopic).value(receivedMessage.getValue()).send(); + + // Acknowledge all received messages + // Acknowledge all messages received from the subTopic within the transaction + transaction.ackAllReceivedMsgs(consumerFromSubTopic); + + // Commit the transaction + // Commit the transaction to ensure all recorded messages and acknowledgments take effect + transaction.commit(); + + // Close the consumer, producers, and client to release resources + consumerFromSubTopic.close(); + producerToSubTopic.close(); + client.close(); + } +} \ No newline at end of file diff --git a/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionImplTest.java b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionImplTest.java new file mode 100644 index 0000000..e4171a6 --- /dev/null +++ b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionImplTest.java @@ -0,0 +1,241 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pulsar.txn; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.txn.impl.TransactionImpl; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class TransactionImplTest { + + private TransactionImpl transactionImpl; + private org.apache.pulsar.client.api.transaction.Transaction mockTransaction; + private List> mockConsumers; + private List messageIds; + + @BeforeMethod + public void setUp() { + mockTransaction = mock(org.apache.pulsar.client.api.transaction.Transaction.class); + mockConsumers = new ArrayList<>(); + messageIds = new ArrayList<>(); + + // Create two mock consumers and two message IDs + for (int i = 0; i < 2; i++) { + Consumer mockConsumer = mock(Consumer.class); + MessageId messageId = mock(MessageId.class); + mockConsumers.add(mockConsumer); + messageIds.add(messageId); + } + + transactionImpl = new TransactionImpl(mockTransaction); + } + + @Test + public void testRecordMsg() { + // Record a message for a consumer + Consumer consumer = mockConsumers.get(0); + MessageId messageId = messageIds.get(0); + transactionImpl.recordMsg(messageId, consumer); + + // Verify the message is recorded for the consumer + assertTrue(transactionImpl.getReceivedMessages().get(consumer).contains(messageId)); + } + + @Test + public void testAckAllReceivedMsgsAsync() throws ExecutionException, InterruptedException { + // Record messages for different consumers + for (int i = 0; i < mockConsumers.size(); i++) { + Consumer consumer = mockConsumers.get(i); + MessageId messageId = messageIds.get(i); + transactionImpl.recordMsg(messageId, consumer); + } + + // Mock the acknowledgeAsync method for each consumer + for (Consumer consumer : mockConsumers) { + when(consumer.acknowledgeAsync(anyList(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + } + + // Call the ackAllReceivedMsgsAsync method + CompletableFuture future = transactionImpl.ackAllReceivedMsgsAsync(); + future.get(); + + // Verify each consumer called the correct acknowledgeAsync method with the correct message IDs + for (int i = 0; i < mockConsumers.size(); i++) { + Consumer consumer = mockConsumers.get(i); + MessageId messageId = messageIds.get(i); + verify(consumer).acknowledgeAsync(eq(List.of(messageId)), eq(mockTransaction)); + } + } + + @Test + public void testAckAllReceivedMsgs() throws ExecutionException, InterruptedException { + // Record messages for a consumer + Consumer consumer = mockConsumers.get(0); + MessageId messageId = messageIds.get(0); + transactionImpl.recordMsg(messageId, consumer); + + // Mock the acknowledgeAsync method for the consumer + when(consumer.acknowledgeAsync(anyList(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + + // Call the ackAllReceivedMsgs method + transactionImpl.ackAllReceivedMsgs(consumer); + + // Verify the consumer called the correct acknowledgeAsync method with the correct message IDs + verify(consumer).acknowledgeAsync(eq(List.of(messageId)), eq(mockTransaction)); + // Verify the message Ids were removed from the transaction context after acked. + assertEquals(transactionImpl.getReceivedMessages().size(), 0); + } + + @Test + public void testAckAllReceivedMsgsAll() throws ExecutionException, InterruptedException { + // Record messages for different consumers + for (int i = 0; i < mockConsumers.size(); i++) { + Consumer consumer = mockConsumers.get(i); + MessageId messageId = messageIds.get(i); + transactionImpl.recordMsg(messageId, consumer); + } + + // Mock the acknowledgeAsync method for each consumer + for (Consumer consumer : mockConsumers) { + when(consumer.acknowledgeAsync(anyList(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + } + + // Call the ackAllReceivedMsgs method + transactionImpl.ackAllReceivedMsgs(); + + // Verify each consumer called the correct acknowledgeAsync method with the correct message IDs + for (int i = 0; i < mockConsumers.size(); i++) { + Consumer consumer = mockConsumers.get(i); + MessageId messageId = messageIds.get(i); + verify(consumer).acknowledgeAsync(eq(List.of(messageId)), eq(mockTransaction)); + } + } + + @Test + public void testAckAllReceivedMsgsAsyncAll() throws ExecutionException, InterruptedException { + // Record messages for different consumers + for (int i = 0; i < mockConsumers.size(); i++) { + Consumer consumer = mockConsumers.get(i); + MessageId messageId = messageIds.get(i); + transactionImpl.recordMsg(messageId, consumer); + } + + // Mock the acknowledgeAsync method for each consumer + for (Consumer consumer : mockConsumers) { + when(consumer.acknowledgeAsync(anyList(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + } + + // Call the ackAllReceivedMsgsAsync method + CompletableFuture future = transactionImpl.ackAllReceivedMsgsAsync(); + future.get(); + + // Verify each consumer called the correct acknowledgeAsync method with the correct message IDs + for (int i = 0; i < mockConsumers.size(); i++) { + Consumer consumer = mockConsumers.get(i); + MessageId messageId = messageIds.get(i); + verify(consumer).acknowledgeAsync(eq(List.of(messageId)), eq(mockTransaction)); + } + } + + @Test + public void testCommitAsync() throws ExecutionException, InterruptedException { + // Mock the commit method of the transaction + when(mockTransaction.commit()).thenReturn(CompletableFuture.completedFuture(null)); + + // Call the commitAsync method + CompletableFuture future = transactionImpl.commitAsync(); + future.get(); + + // Verify the commit method was called + verify(mockTransaction).commit(); + } + + @Test + public void testAbortAsync() throws ExecutionException, InterruptedException { + // Mock the abort method of the transaction + when(mockTransaction.abort()).thenReturn(CompletableFuture.completedFuture(null)); + + // Call the abortAsync method + CompletableFuture future = transactionImpl.abortAsync(); + future.get(); + + // Verify the abort method was called + verify(mockTransaction).abort(); + } + + @Test + public void testCommit() throws ExecutionException, InterruptedException { + // Mock the commit method of the transaction + when(mockTransaction.commit()).thenReturn(CompletableFuture.completedFuture(null)); + + // Call the commit method + transactionImpl.commit(); + + // Verify the commit method was called + verify(mockTransaction).commit(); + } + + @Test + public void testAbort() throws ExecutionException, InterruptedException { + // Mock the abort method of the transaction + when(mockTransaction.abort()).thenReturn(CompletableFuture.completedFuture(null)); + + // Call the abort method + transactionImpl.abort(); + + // Verify the abort method was called + verify(mockTransaction).abort(); + } + + @Test + public void testGetTxnID() { + // Mock the getTxnID method of the transaction + TxnID txnID = mock(TxnID.class); + when(mockTransaction.getTxnID()).thenReturn(txnID); + + // Call the getTxnID method + assertEquals(txnID, transactionImpl.getTxnID()); + } + + @Test + public void testGetState() { + // Mock the getState method of the transaction + org.apache.pulsar.client.api.transaction.Transaction.State state = + org.apache.pulsar.client.api.transaction.Transaction.State.OPEN; + when(mockTransaction.getState()).thenReturn(state); + + // Call the getState method + assertEquals(state, transactionImpl.getState()); + } +}