From 4eefd27d8bd4fdf4beb4aa8f43791896f5a0add7 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 26 Aug 2023 17:44:21 +0800 Subject: [PATCH 1/9] Avoid copying OpSendMsg when sending messages Fixes https://github.com/apache/pulsar-client-cpp/issues/306 ### Motivation `OpSendMsg` is a struct whose size is 400 bytes. We should avoid the copy operation on it. ### Modifications Pass `unique_ptr<OpSendMsg>` everywhere instead of copying `OpSendMsg`. - Use `unique_ptr<OpSendMsg>` as the element of the pending message queue in `ProducerImpl` and disable the copy constructor and copy assignment for `OpSendMsg`. - Add `SendArguments`, which includes the necessary fields to construct a `CommandSend` request. Use `shared_ptr<SendArguments>` rather than `unique_ptr<SendArguments>` to store `SendArguments` in `OpSendMsg`, because the producer might need to resend the message, so the `SendArguments` object can be shared by `ProducerImpl` and `ClientConnection`. This patch is mostly a refactoring, because compiler optimizations might already eliminate some of the unnecessary copying. --- lib/BatchMessageContainer.cc | 13 +- lib/BatchMessageContainer.h | 13 +- lib/BatchMessageContainerBase.cc | 57 ++------- lib/BatchMessageContainerBase.h | 51 ++------ lib/BatchMessageKeyBasedContainer.cc | 29 ++--- lib/BatchMessageKeyBasedContainer.h | 12 +- lib/ClientConnection.cc | 46 +++---- lib/ClientConnection.h | 6 +- lib/Commands.cc | 16 ++- lib/Commands.h | 6 +- lib/MessageAndCallbackBatch.cc | 13 +- lib/MessageAndCallbackBatch.h | 10 +- lib/OpSendMsg.h | 85 ++++++++----- lib/ProducerImpl.cc | 183 ++++++++++++++------------- lib/ProducerImpl.h | 12 +- 15 files changed, 250 insertions(+), 302 deletions(-) diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc index ae0425e2..777b2c35 100644 --- a/lib/BatchMessageContainer.cc +++ b/lib/BatchMessageContainer.cc @@ -21,6 +21,7 @@ #include #include "LogUtils.h" +#include "OpSendMsg.h" DECLARE_LOG_OBJECT() @@ -52,14 +53,10 @@ void BatchMessageContainer::clear() { LOG_DEBUG(*this << " clear() called"); } -Result 
BatchMessageContainer::createOpSendMsg(OpSendMsg& opSendMsg, - const FlushCallback& flushCallback) const { - return createOpSendMsgHelper(opSendMsg, flushCallback, batch_); -} - -std::vector BatchMessageContainer::createOpSendMsgs(std::vector& opSendMsgs, - const FlushCallback& flushCallback) const { - throw std::runtime_error("createOpSendMsgs is not supported for BatchMessageContainer"); +std::unique_ptr BatchMessageContainer::createOpSendMsg(const FlushCallback& flushCallback) { + auto op = createOpSendMsgHelper(flushCallback, batch_); + clear(); + return op; } void BatchMessageContainer::serialize(std::ostream& os) const { diff --git a/lib/BatchMessageContainer.h b/lib/BatchMessageContainer.h index cd8a62cb..5b1213c0 100644 --- a/lib/BatchMessageContainer.h +++ b/lib/BatchMessageContainer.h @@ -39,25 +39,22 @@ class BatchMessageContainer : public BatchMessageContainerBase { ~BatchMessageContainer(); - size_t getNumBatches() const override { return 1; } + bool hasMultiOpSendMsgs() const override { return false; } bool isFirstMessageToAdd(const Message& msg) const override { return batch_.empty(); } bool add(const Message& msg, const SendCallback& callback) override; - void clear() override; - - Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback) const override; - - std::vector createOpSendMsgs(std::vector& opSendMsgs, - const FlushCallback& flushCallback) const override; - void serialize(std::ostream& os) const override; + std::unique_ptr createOpSendMsg(const FlushCallback& flushCallback) override; + private: MessageAndCallbackBatch batch_; size_t numberOfBatchesSent_ = 0; double averageBatchSize_ = 0; + + void clear() override; }; } // namespace pulsar diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc index 807a2615..96ea94bc 100644 --- a/lib/BatchMessageContainerBase.cc +++ b/lib/BatchMessageContainerBase.cc @@ -37,23 +37,13 @@ BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& 
produce producerId_(producer.producerId_), msgCryptoWeakPtr_(producer.msgCrypto_) {} -Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg, - const FlushCallback& flushCallback, - const MessageAndCallbackBatch& batch) const { - opSendMsg.sendCallback_ = batch.createSendCallback(); - opSendMsg.messagesCount_ = batch.messagesCount(); - opSendMsg.messagesSize_ = batch.messagesSize(); - - if (flushCallback) { - auto sendCallback = opSendMsg.sendCallback_; - opSendMsg.sendCallback_ = [sendCallback, flushCallback](Result result, const MessageId& id) { - sendCallback(result, id); - flushCallback(result); - }; - } +BatchMessageContainerBase::~BatchMessageContainerBase() {} +std::unique_ptr BatchMessageContainerBase::createOpSendMsgHelper( + const FlushCallback& flushCallback, const MessageAndCallbackBatch& batch) const { + auto sendCallback = batch.createSendCallback(flushCallback); if (batch.empty()) { - return ResultOperationNotSupported; + return OpSendMsg::create(ResultOperationNotSupported, std::move(sendCallback)); } MessageImplPtr impl = batch.msgImpl(); @@ -70,45 +60,18 @@ Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg, SharedBuffer encryptedPayload; if (!msgCrypto->encrypt(producerConfig_.getEncryptionKeys(), producerConfig_.getCryptoKeyReader(), impl->metadata, impl->payload, encryptedPayload)) { - return ResultCryptoError; + return OpSendMsg::create(ResultCryptoError, std::move(sendCallback)); } impl->payload = encryptedPayload; } if (impl->payload.readableBytes() > ClientConnection::getMaxMessageSize()) { - return ResultMessageTooBig; + return OpSendMsg::create(ResultMessageTooBig, std::move(sendCallback)); } - opSendMsg.metadata_ = impl->metadata; - opSendMsg.payload_ = impl->payload; - opSendMsg.sequenceId_ = impl->metadata.sequence_id(); - opSendMsg.producerId_ = producerId_; - opSendMsg.timeout_ = TimeUtils::now() + milliseconds(producerConfig_.getSendTimeout()); - - return ResultOk; -} - -void 
BatchMessageContainerBase::processAndClear( - std::function opSendMsgCallback, FlushCallback flushCallback) { - if (isEmpty()) { - if (flushCallback) { - // do nothing, flushCallback complete until the lastOpSend complete - } - } else { - const auto numBatches = getNumBatches(); - if (numBatches == 1) { - OpSendMsg opSendMsg; - Result result = createOpSendMsg(opSendMsg, flushCallback); - opSendMsgCallback(result, opSendMsg); - } else if (numBatches > 1) { - std::vector opSendMsgs; - std::vector results = createOpSendMsgs(opSendMsgs, flushCallback); - for (size_t i = 0; i < results.size(); i++) { - opSendMsgCallback(results[i], opSendMsgs[i]); - } - } // else numBatches is 0, do nothing - } - clear(); + return OpSendMsg::create(impl->metadata, batch.messagesCount(), batch.messagesSize(), + producerConfig_.getSendTimeout(), batch.createSendCallback(flushCallback), + nullptr, producerId_, impl->payload); } } // namespace pulsar diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h index fe4e5df7..fb460196 100644 --- a/lib/BatchMessageContainerBase.h +++ b/lib/BatchMessageContainerBase.h @@ -26,6 +26,7 @@ #include #include +#include #include namespace pulsar { @@ -44,14 +45,9 @@ class BatchMessageContainerBase : public boost::noncopyable { public: BatchMessageContainerBase(const ProducerImpl& producer); - virtual ~BatchMessageContainerBase() {} + virtual ~BatchMessageContainerBase(); - /** - * Get number of batches in the batch message container - * - * @return number of batches - */ - virtual size_t getNumBatches() const = 0; + virtual bool hasMultiOpSendMsgs() const = 0; /** * Check the message will be the 1st message to be added to the batch @@ -73,32 +69,14 @@ class BatchMessageContainerBase : public boost::noncopyable { */ virtual bool add(const Message& msg, const SendCallback& callback) = 0; - /** - * Clear the batch message container - */ - virtual void clear() = 0; - - /** - * Create a OpSendMsg object to send - * - * @param opSendMsg 
the OpSendMsg object to create - * @param flushCallback the callback to trigger after the OpSendMsg was completed - * @return ResultOk if create successfully - * @note OpSendMsg's sendCallback_ must be set even if it failed - */ - virtual Result createOpSendMsg(OpSendMsg& opSendMsg, - const FlushCallback& flushCallback = nullptr) const = 0; + virtual std::unique_ptr createOpSendMsg(const FlushCallback& flushCallback = nullptr) { + throw std::runtime_error("createOpSendMsg is not supported"); + } - /** - * Create a OpSendMsg list to send - * - * @param opSendMsgList the OpSendMsg list to create - * @param flushCallback the callback to trigger after the OpSendMsg was completed - * @return all create results of `opSendMsgs`, ResultOk means create successfully - * @note OpSendMsg's sendCallback_ must be set even if it failed - */ - virtual std::vector createOpSendMsgs(std::vector& opSendMsgs, - const FlushCallback& flushCallback = nullptr) const = 0; + virtual std::vector> createOpSendMsgs( + const FlushCallback& flushCallback = nullptr) { + throw std::runtime_error("createOpSendMsgs is not supported"); + } /** * Serialize into a std::ostream for logging @@ -110,9 +88,6 @@ class BatchMessageContainerBase : public boost::noncopyable { bool hasEnoughSpace(const Message& msg) const noexcept; bool isEmpty() const noexcept; - void processAndClear(std::function opSendMsgCallback, - FlushCallback flushCallback); - protected: // references to ProducerImpl's fields const std::shared_ptr topicName_; @@ -134,8 +109,10 @@ class BatchMessageContainerBase : public boost::noncopyable { void updateStats(const Message& msg); void resetStats(); - Result createOpSendMsgHelper(OpSendMsg& opSendMsg, const FlushCallback& flushCallback, - const MessageAndCallbackBatch& batch) const; + std::unique_ptr createOpSendMsgHelper(const FlushCallback& flushCallback, + const MessageAndCallbackBatch& batch) const; + + virtual void clear() = 0; }; inline bool 
BatchMessageContainerBase::hasEnoughSpace(const Message& msg) const noexcept { diff --git a/lib/BatchMessageKeyBasedContainer.cc b/lib/BatchMessageKeyBasedContainer.cc index 05baf342..e88674b5 100644 --- a/lib/BatchMessageKeyBasedContainer.cc +++ b/lib/BatchMessageKeyBasedContainer.cc @@ -72,16 +72,8 @@ void BatchMessageKeyBasedContainer::clear() { LOG_DEBUG(*this << " clear() called"); } -Result BatchMessageKeyBasedContainer::createOpSendMsg(OpSendMsg& opSendMsg, - const FlushCallback& flushCallback) const { - if (batches_.size() < 1) { - return ResultOperationNotSupported; - } - return createOpSendMsgHelper(opSendMsg, flushCallback, batches_.begin()->second); -} - -std::vector BatchMessageKeyBasedContainer::createOpSendMsgs( - std::vector& opSendMsgs, const FlushCallback& flushCallback) const { +std::vector> BatchMessageKeyBasedContainer::createOpSendMsgs( + const FlushCallback& flushCallback) { // Sorted the batches by sequence id std::vector sortedBatches; for (const auto& kv : batches_) { @@ -92,18 +84,15 @@ std::vector BatchMessageKeyBasedContainer::createOpSendMsgs( return lhs->sequenceId() < rhs->sequenceId(); }); - size_t numBatches = sortedBatches.size(); - opSendMsgs.resize(numBatches); - - std::vector results(numBatches); - for (size_t i = 0; i + 1 < numBatches; i++) { - results[i] = createOpSendMsgHelper(opSendMsgs[i], nullptr, *sortedBatches[i]); + std::vector> opSendMsgs{sortedBatches.size()}; + for (size_t i = 0; i + 1 < opSendMsgs.size(); i++) { + opSendMsgs[i].reset(createOpSendMsgHelper(nullptr, *sortedBatches[i]).release()); } - if (numBatches > 0) { - // Add flush callback to the last batch - results.back() = createOpSendMsgHelper(opSendMsgs.back(), flushCallback, *sortedBatches.back()); + if (!opSendMsgs.empty()) { + opSendMsgs.back().reset(createOpSendMsgHelper(flushCallback, *sortedBatches.back()).release()); } - return results; + clear(); + return opSendMsgs; } void BatchMessageKeyBasedContainer::serialize(std::ostream& os) const { diff 
--git a/lib/BatchMessageKeyBasedContainer.h b/lib/BatchMessageKeyBasedContainer.h index f580a053..e534fbaa 100644 --- a/lib/BatchMessageKeyBasedContainer.h +++ b/lib/BatchMessageKeyBasedContainer.h @@ -32,18 +32,13 @@ class BatchMessageKeyBasedContainer : public BatchMessageContainerBase { ~BatchMessageKeyBasedContainer(); - size_t getNumBatches() const override { return batches_.size(); } + bool hasMultiOpSendMsgs() const override { return true; } bool isFirstMessageToAdd(const Message& msg) const override; bool add(const Message& msg, const SendCallback& callback) override; - void clear() override; - - Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback) const override; - - std::vector createOpSendMsgs(std::vector& opSendMsgs, - const FlushCallback& flushCallback) const override; + std::vector> createOpSendMsgs(const FlushCallback& flushCallback) override; void serialize(std::ostream& os) const override; @@ -53,8 +48,7 @@ class BatchMessageKeyBasedContainer : public BatchMessageContainerBase { size_t numberOfBatchesSent_ = 0; double averageBatchSize_ = 0; - Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback, - MessageAndCallbackBatch& batch) const; + void clear() override; }; } // namespace pulsar diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 2a63c7ab..4e1a667c 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1025,37 +1025,30 @@ void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) { std::bind(&ClientConnection::handleSend, shared_from_this(), std::placeholders::_1, cmd))); } -void ClientConnection::sendMessage(const OpSendMsg& opSend) { +void ClientConnection::sendMessage(const std::shared_ptr& args) { Lock lock(mutex_); - - if (pendingWriteOperations_++ == 0) { - // Write immediately to socket - if (tlsSocket_) { + if (pendingWriteOperations_++ > 0) { + pendingWriteBuffers_.emplace_back(args); + return; + } + auto self = shared_from_this(); + auto 
sendMessageInternal = [this, self, args] { + BaseCommand outgoingCmd; + auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); + asyncWrite(buffer, customAllocReadHandler(std::bind(&ClientConnection::handleSendPair, + shared_from_this(), std::placeholders::_1))); + }; + if (tlsSocket_) { #if BOOST_VERSION >= 106600 - boost::asio::post(strand_, - std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend)); + boost::asio::post(strand_, sendMessageInternal); #else - strand_.post(std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend)); + strand_.post(sendMessageInternal); #endif - } else { - sendMessageInternal(opSend); - } } else { - // Queue to send later - pendingWriteBuffers_.push_back(opSend); + sendMessageInternal(); } } -void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) { - BaseCommand outgoingCmd; - PairSharedBuffer buffer = - Commands::newSend(outgoingBuffer_, outgoingCmd, opSend.producerId_, opSend.sequenceId_, - getChecksumType(), opSend.metadata_, opSend.payload_); - - asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair, - shared_from_this(), std::placeholders::_1))); -} - void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) { if (err) { LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message()); @@ -1088,13 +1081,12 @@ void ClientConnection::sendPendingCommands() { customAllocWriteHandler(std::bind(&ClientConnection::handleSend, shared_from_this(), std::placeholders::_1, buffer))); } else { - assert(any.type() == typeid(OpSendMsg)); + assert(any.type() == typeid(std::shared_ptr)); - const OpSendMsg& op = boost::any_cast(any); + auto args = boost::any_cast>(any); BaseCommand outgoingCmd; PairSharedBuffer buffer = - Commands::newSend(outgoingBuffer_, outgoingCmd, op.producerId_, op.sequenceId_, - getChecksumType(), op.metadata_, op.payload_); + 
Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair, shared_from_this(), std::placeholders::_1))); diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 38b814ce..9abff9d4 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -69,8 +69,7 @@ typedef std::weak_ptr ConsumerImplWeakPtr; class LookupDataResult; class BrokerConsumerStatsImpl; class PeriodicTask; - -struct OpSendMsg; +struct SendArguments; namespace proto { class BaseCommand; @@ -153,8 +152,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this& args); void registerProducer(int producerId, ProducerImplPtr producer); void registerConsumer(int consumerId, ConsumerImplPtr consumer); diff --git a/lib/Commands.cc b/lib/Commands.cc index d9251a0a..5b1734c3 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -32,6 +32,7 @@ #include "ChunkMessageIdImpl.h" #include "LogUtils.h" #include "MessageImpl.h" +#include "OpSendMsg.h" #include "PulsarApi.pb.h" #include "Url.h" #include "checksum/ChecksumProvider.h" @@ -193,13 +194,13 @@ SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) return buffer; } -PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint64_t producerId, - uint64_t sequenceId, ChecksumType checksumType, - const proto::MessageMetadata& metadata, const SharedBuffer& payload) { +PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, ChecksumType checksumType, + const SendArguments& args) { cmd.set_type(BaseCommand::SEND); CommandSend* send = cmd.mutable_send(); - send->set_producer_id(producerId); - send->set_sequence_id(sequenceId); + send->set_producer_id(args.producerId); + send->set_sequence_id(args.sequenceId); + const auto& metadata = args.metadata; if (metadata.has_num_messages_in_batch()) { send->set_num_messages(metadata.num_messages_in_batch()); } @@ -210,8 
+211,9 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint // / Wire format // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD] - int cmdSize = cmd.ByteSize(); - int msgMetadataSize = metadata.ByteSize(); + int cmdSize = cmd.ByteSizeLong(); + int msgMetadataSize = metadata.ByteSizeLong(); + const auto& payload = args.payload; int payloadSize = payload.readableBytes(); int magicAndChecksumLength = (Crc32c == (checksumType)) ? (2 + 4 /* magic + checksumLength*/) : 0; diff --git a/lib/Commands.h b/lib/Commands.h index c21adb4f..22a4b7bd 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -40,6 +40,7 @@ using BatchMessageAckerPtr = std::shared_ptr; class MessageIdImpl; using MessageIdImplPtr = std::shared_ptr; class BitSet; +struct SendArguments; namespace proto { class BaseCommand; @@ -98,9 +99,8 @@ class Commands { static SharedBuffer newGetSchema(const std::string& topic, const std::string& version, uint64_t requestId); - static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId, - uint64_t sequenceId, ChecksumType checksumType, - const proto::MessageMetadata& metadata, const SharedBuffer& payload); + static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, ChecksumType checksumType, + const SendArguments& args); static SharedBuffer newSubscribe( const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId, diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc index 56725389..e17330c1 100644 --- a/lib/MessageAndCallbackBatch.cc +++ b/lib/MessageAndCallbackBatch.cc @@ -64,10 +64,17 @@ void MessageAndCallbackBatch::complete(Result result, const MessageId& id) const completeSendCallbacks(callbacks_, result, id); } -SendCallback MessageAndCallbackBatch::createSendCallback() const { +SendCallback MessageAndCallbackBatch::createSendCallback(const FlushCallback& flushCallback) 
const { const auto& callbacks = callbacks_; - return [callbacks] // save a copy of `callbacks_` - (Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); }; + if (flushCallback) { + return [callbacks, flushCallback](Result result, const MessageId& id) { + completeSendCallbacks(callbacks, result, id); + flushCallback(result); + }; + } else { + return [callbacks] // save a copy of `callbacks_` + (Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); }; + } } } // namespace pulsar diff --git a/lib/MessageAndCallbackBatch.h b/lib/MessageAndCallbackBatch.h index 3d107c63..e8717b3e 100644 --- a/lib/MessageAndCallbackBatch.h +++ b/lib/MessageAndCallbackBatch.h @@ -30,6 +30,7 @@ namespace pulsar { class MessageImpl; using MessageImplPtr = std::shared_ptr; +using FlushCallback = std::function; class MessageAndCallbackBatch : public boost::noncopyable { public: @@ -58,14 +59,7 @@ class MessageAndCallbackBatch : public boost::noncopyable { */ void complete(Result result, const MessageId& id) const; - /** - * Create a single callback to trigger all the internal callbacks in order - * It's used when you want to clear and add new messages and callbacks but current callbacks need to be - * triggered later. 
- * - * @return the merged send callback - */ - SendCallback createSendCallback() const; + SendCallback createSendCallback(const FlushCallback& flushCallback) const; const MessageImplPtr& msgImpl() const { return msgImpl_; } uint64_t sequenceId() const noexcept { return sequenceId_; } diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index d365b906..0b062852 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -21,6 +21,7 @@ #include #include +#include #include @@ -31,46 +32,72 @@ namespace pulsar { -struct OpSendMsg { - proto::MessageMetadata metadata_; - SharedBuffer payload_; - SendCallback sendCallback_; - uint64_t producerId_; - uint64_t sequenceId_; - boost::posix_time::ptime timeout_; - uint32_t messagesCount_; - uint64_t messagesSize_; - std::vector> trackerCallbacks_; - ChunkMessageIdImplPtr chunkedMessageId_; +struct SendArguments { + const uint64_t producerId; + const uint64_t sequenceId; + const proto::MessageMetadata metadata; + const SharedBuffer payload; - OpSendMsg() = default; + SendArguments(uint64_t producerId, uint64_t sequenceId, const proto::MessageMetadata& metadata, + const SharedBuffer& payload) + : producerId(producerId), sequenceId(sequenceId), metadata(metadata), payload(payload) {} + SendArguments(const SendArguments&) = delete; + SendArguments& operator=(const SendArguments&) = delete; +}; - OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& payload, - const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId, int sendTimeoutMs, - uint32_t messagesCount, uint64_t messagesSize, ChunkMessageIdImplPtr chunkedMessageId = nullptr) - : metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with - // a shared metadata object - payload_(payload), - sendCallback_(sendCallback), - producerId_(producerId), - sequenceId_(sequenceId), - timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)), - messagesCount_(messagesCount), - messagesSize_(messagesSize), - 
chunkedMessageId_(chunkedMessageId) {} +struct OpSendMsg { + const Result result; + const int32_t chunkId; + const int32_t numChunks; + const uint32_t messagesCount; + const uint64_t messagesSize; + const boost::posix_time::ptime timeout; + const SendCallback sendCallback; + std::vector> trackerCallbacks; + ChunkMessageIdImplPtr chunkedMessageId; + // Use shared_ptr here because producer might resend the message with the same arguments + const std::shared_ptr sendArgs; + + template + static std::unique_ptr create(Args&&... args) { + return std::unique_ptr(new OpSendMsg(std::forward(args)...)); + } void complete(Result result, const MessageId& messageId) const { - if (sendCallback_) { - sendCallback_(result, messageId); + if (sendCallback) { + sendCallback(result, messageId); } - for (const auto& trackerCallback : trackerCallbacks_) { + for (const auto& trackerCallback : trackerCallbacks) { trackerCallback(result); } } void addTrackerCallback(std::function trackerCallback) { - trackerCallbacks_.emplace_back(trackerCallback); + trackerCallbacks.emplace_back(trackerCallback); } + + private: + OpSendMsg(Result result, SendCallback&& callback) + : result(result), + chunkId(-1), + numChunks(-1), + messagesCount(0), + messagesSize(0), + sendCallback(std::move(callback)), + sendArgs(nullptr) {} + + OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize, + int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdImplPtr chunkedMessageId, + uint64_t producerId, SharedBuffer payload) + : result(ResultOk), + chunkId(metadata.chunk_id()), + numChunks(metadata.num_chunks_from_msg()), + messagesCount(messagesCount), + messagesSize(messagesSize), + timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)), + sendCallback(std::move(callback)), + chunkedMessageId(chunkedMessageId), + sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {} }; } // namespace pulsar diff --git a/lib/ProducerImpl.cc 
b/lib/ProducerImpl.cc index 71559ffa..d8f02be6 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -45,16 +45,6 @@ namespace pulsar { DECLARE_LOG_OBJECT() -struct ProducerImpl::PendingCallbacks { - std::vector opSendMsgs; - - void complete(Result result) { - for (const auto& opSendMsg : opSendMsgs) { - opSendMsg.complete(result, {}); - } - } -}; - ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, const ProducerConfiguration& conf, const ProducerInterceptorsPtr& interceptors, int32_t partition) @@ -64,7 +54,6 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, milliseconds(std::max(100, conf.getSendTimeout() - 100)))), conf_(conf), semaphore_(), - pendingMessagesQueue_(), partition_(partition), producerName_(conf_.getProducerName()), userProvidedProducerName_(false), @@ -297,43 +286,46 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r } } -std::shared_ptr ProducerImpl::getPendingCallbacksWhenFailed() { - auto callbacks = std::make_shared(); - callbacks->opSendMsgs.reserve(pendingMessagesQueue_.size()); +auto ProducerImpl::getPendingCallbacksWhenFailed() -> decltype(pendingMessagesQueue_) { + decltype(pendingMessagesQueue_) pendingMessages; LOG_DEBUG(getName() << "# messages in pending queue : " << pendingMessagesQueue_.size()); - // Iterate over a copy of the pending messages queue, to trigger the future completion - // without holding producer mutex. 
- for (auto& op : pendingMessagesQueue_) { - callbacks->opSendMsgs.push_back(op); - releaseSemaphoreForSendOp(op); + pendingMessages.swap(pendingMessagesQueue_); + for (const auto& op : pendingMessages) { + releaseSemaphoreForSendOp(*op); } - if (batchMessageContainer_) { - batchMessageContainer_->processAndClear( - [this, &callbacks](Result result, const OpSendMsg& opSendMsg) { - if (result == ResultOk) { - callbacks->opSendMsgs.emplace_back(opSendMsg); - } - releaseSemaphoreForSendOp(opSendMsg); - }, - nullptr); + if (!batchMessageContainer_ || batchMessageContainer_->isEmpty()) { + return pendingMessages; } - pendingMessagesQueue_.clear(); - return callbacks; + auto handleOp = [this, &pendingMessages](std::unique_ptr&& op) { + releaseSemaphoreForSendOp(*op); + if (op->result == ResultOk) { + pendingMessages.emplace_back(std::move(op)); + } + }; + + if (batchMessageContainer_->hasMultiOpSendMsgs()) { + auto opSendMsgs = batchMessageContainer_->createOpSendMsgs(); + for (auto&& op : opSendMsgs) { + handleOp(std::move(op)); + } + } else { + handleOp(batchMessageContainer_->createOpSendMsg()); + } + return pendingMessages; } -std::shared_ptr ProducerImpl::getPendingCallbacksWhenFailedWithLock() { +auto ProducerImpl::getPendingCallbacksWhenFailedWithLock() -> decltype(pendingMessagesQueue_) { Lock lock(mutex_); return getPendingCallbacksWhenFailed(); } void ProducerImpl::failPendingMessages(Result result, bool withLock) { - if (withLock) { - getPendingCallbacksWhenFailedWithLock()->complete(result); - } else { - getPendingCallbacksWhenFailed()->complete(result); + auto opSendMsgs = withLock ? 
getPendingCallbacksWhenFailedWithLock() : getPendingCallbacksWhenFailed(); + for (const auto& op : opSendMsgs) { + op->complete(result, {}); } } @@ -345,8 +337,8 @@ void ProducerImpl::resendMessages(ClientConnectionPtr cnx) { LOG_DEBUG(getName() << "Re-Sending " << pendingMessagesQueue_.size() << " messages to server"); for (const auto& op : pendingMessagesQueue_) { - LOG_DEBUG(getName() << "Re-Sending " << op.sequenceId_); - cnx->sendMessage(op); + LOG_DEBUG(getName() << "Re-Sending " << op->sendArgs->sequenceId); + cnx->sendMessage(op->sendArgs); } } @@ -378,7 +370,7 @@ void ProducerImpl::flushAsync(FlushCallback callback) { auto& opSendMsg = pendingMessagesQueue_.back(); lock.unlock(); failures.complete(); - opSendMsg.addTrackerCallback(callback); + opSendMsg->addTrackerCallback(callback); } else { lock.unlock(); failures.complete(); @@ -389,7 +381,7 @@ void ProducerImpl::flushAsync(FlushCallback callback) { if (!pendingMessagesQueue_.empty()) { auto& opSendMsg = pendingMessagesQueue_.back(); lock.unlock(); - opSendMsg.addTrackerCallback(callback); + opSendMsg->addTrackerCallback(callback); } else { lock.unlock(); callback(ResultOk); @@ -462,7 +454,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { }); } -void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback) { +void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& callback) { if (!isValidProducerState(callback)) { return; } @@ -601,17 +593,18 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba handleFailedResult(ResultCryptoError); return; } - OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, - producerId_, sequenceId, conf_.getSendTimeout(), - 1, uncompressedSize, chunkMessageId}; + + auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(), + (chunkId == totalChunks - 1) ? 
callback : nullptr, chunkMessageId, + producerId_, encryptedPayload); if (!chunkingEnabled_) { - const uint32_t msgMetadataSize = op.metadata_.ByteSize(); - const uint32_t payloadSize = op.payload_.readableBytes(); + const uint32_t msgMetadataSize = op->sendArgs->metadata.ByteSizeLong(); + const uint32_t payloadSize = op->sendArgs->payload.readableBytes(); const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize; if (msgHeadersAndPayloadSize > maxMessageSize) { lock.unlock(); - releaseSemaphoreForSendOp(op); + releaseSemaphoreForSendOp(*op); LOG_WARN(getName() << " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed " << maxMessageSize << " bytes unless chunking is enabled"); @@ -620,7 +613,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba } } - sendMessage(op); + sendMessage(std::move(op)); } } } @@ -667,10 +660,10 @@ void ProducerImpl::releaseSemaphore(uint32_t payloadSize) { void ProducerImpl::releaseSemaphoreForSendOp(const OpSendMsg& op) { if (semaphore_) { - semaphore_->release(op.messagesCount_); + semaphore_->release(op.messagesCount); } - memoryLimitController_.releaseMemory(op.messagesSize_); + memoryLimitController_.releaseMemory(op.messagesSize); } // It must be called while `mutex_` is acquired @@ -678,37 +671,50 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall PendingFailures failures; LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_); batchTimer_->cancel(); + if (batchMessageContainer_->isEmpty()) { + return failures; + } - batchMessageContainer_->processAndClear( - [this, &failures](Result result, const OpSendMsg& opSendMsg) { - if (result == ResultOk) { - sendMessage(opSendMsg); - } else { - // A spot has been reserved for this batch, but the batch failed to be pushed to the queue, so - // we need to release the spot manually - LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: " << result); - 
releaseSemaphoreForSendOp(opSendMsg); - failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); }); - } - }, - flushCallback); + auto handleOp = [this, &failures](std::unique_ptr&& op) { + if (op->result == ResultOk) { + sendMessage(std::move(op)); + } else { + LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: " << op->result); + releaseSemaphoreForSendOp(*op); + auto rawOpPtr = op.release(); + failures.add([rawOpPtr] { + std::unique_ptr op{rawOpPtr}; + op->complete(op->result, {}); + }); + } + }; + + if (batchMessageContainer_->hasMultiOpSendMsgs()) { + auto opSendMsgs = batchMessageContainer_->createOpSendMsgs(flushCallback); + for (auto&& op : opSendMsgs) { + handleOp(std::move(op)); + } + } else { + handleOp(batchMessageContainer_->createOpSendMsg(flushCallback)); + } return failures; } // Precondition - // a. we have a reserved spot on the queue // b. call this function after acquiring the ProducerImpl mutex_ -void ProducerImpl::sendMessage(const OpSendMsg& op) { - const auto sequenceId = op.metadata_.sequence_id(); +void ProducerImpl::sendMessage(std::unique_ptr opSendMsg) { + const auto sequenceId = opSendMsg->sendArgs->sequenceId; LOG_DEBUG("Inserting data to pendingMessagesQueue_"); - pendingMessagesQueue_.push_back(op); + auto args = opSendMsg->sendArgs; + pendingMessagesQueue_.emplace_back(std::move(opSendMsg)); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { // If we do have a connection, the message is sent immediately, otherwise // we'll try again once a new connection is established LOG_DEBUG(getName() << "Sending msg immediately - seq: " << sequenceId); - cnx->sendMessage(op); + cnx->sendMessage(args); } else { LOG_DEBUG(getName() << "Connection is not ready - seq: " << sequenceId); } @@ -808,7 +814,7 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { return; } - std::shared_ptr pendingCallbacks; + decltype(pendingMessagesQueue_) pendingMessages; if (pendingMessagesQueue_.empty()) { // If there 
are no pending messages, reset the timeout to the configured value. LOG_DEBUG(getName() << "Producer timeout triggered on empty pending message queue"); @@ -816,11 +822,11 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { } else { // If there is at least one message, calculate the diff between the message timeout and // the current time. - time_duration diff = pendingMessagesQueue_.front().timeout_ - TimeUtils::now(); + time_duration diff = pendingMessagesQueue_.front()->timeout - TimeUtils::now(); if (diff.total_milliseconds() <= 0) { // The diff is less than or equal to zero, meaning that the message has been expired. LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks."); - pendingCallbacks = getPendingCallbacksWhenFailed(); + pendingMessages = getPendingCallbacksWhenFailed(); // Since the pending queue is cleared now, set timer to expire after configured value. asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout())); } else { @@ -831,8 +837,8 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { } lock.unlock(); - if (pendingCallbacks) { - pendingCallbacks->complete(ResultTimeout); + for (const auto& op : pendingMessages) { + op->complete(ResultTimeout, {}); } } @@ -844,8 +850,8 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) { return true; } - OpSendMsg op = pendingMessagesQueue_.front(); - uint64_t expectedSequenceId = op.sequenceId_; + std::unique_ptr op{std::move(pendingMessagesQueue_.front().release())}; + uint64_t expectedSequenceId = op->sendArgs->sequenceId; if (sequenceId > expectedSequenceId) { LOG_WARN(getName() << "Got ack failure for msg " << sequenceId // << " expecting: " << expectedSequenceId << " queue size=" // @@ -860,11 +866,11 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) { lock.unlock(); try { // to protect from client callback exception - op.complete(ResultChecksumError, {}); + op->complete(ResultChecksumError, {}); } catch (const 
std::exception& e) { LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); } - releaseSemaphoreForSendOp(op); + releaseSemaphoreForSendOp(*op); return true; } } @@ -880,8 +886,14 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { return true; } - OpSendMsg op = pendingMessagesQueue_.front(); - uint64_t expectedSequenceId = op.sequenceId_; + const auto& op = *pendingMessagesQueue_.front(); + if (op.result != ResultOk) { + LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and " + << rawMessageId); + return false; + } + + uint64_t expectedSequenceId = op.sendArgs->sequenceId; if (sequenceId > expectedSequenceId) { LOG_WARN(getName() << "Got ack for msg " << sequenceId // << " expecting: " << expectedSequenceId << " queue size=" // @@ -898,24 +910,25 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { // Message was persisted correctly LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); - if (op.chunkedMessageId_) { + if (op.chunkedMessageId) { // Handling the chunk message id. 
- if (op.metadata_.chunk_id() == 0) { - op.chunkedMessageId_->setFirstChunkMessageId(messageId); - } else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) { - op.chunkedMessageId_->setLastChunkMessageId(messageId); - messageId = op.chunkedMessageId_->build(); + if (op.chunkId == 0) { + op.chunkedMessageId->setFirstChunkMessageId(messageId); + } else if (op.chunkId == op.numChunks - 1) { + op.chunkedMessageId->setLastChunkMessageId(messageId); + messageId = op.chunkedMessageId->build(); } } releaseSemaphoreForSendOp(op); - lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1; + lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1; + std::unique_ptr opSendMsg{pendingMessagesQueue_.front().release()}; pendingMessagesQueue_.pop_front(); lock.unlock(); try { - op.complete(ResultOk, messageId); + opSendMsg->complete(ResultOk, messageId); } catch (const std::exception& e) { LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); } diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index afc6346b..8611bfef 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -104,11 +104,9 @@ class ProducerImpl : public HandlerBase, protected: ProducerStatsBasePtr producerStatsBasePtr_; - typedef std::deque MessageQueue; - void setMessageMetadata(const Message& msg, const uint64_t& sequenceId, const uint32_t& uncompressedSize); - void sendMessage(const OpSendMsg& opSendMsg); + void sendMessage(std::unique_ptr opSendMsg); void startSendTimeoutTimer(); @@ -138,7 +136,7 @@ class ProducerImpl : public HandlerBase, bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload, SharedBuffer& encryptedPayload); - void sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback); + void sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& callback); /** * Reserve a spot in the messages queue before acquiring the ProducerImpl mutex. 
When the queue is full, @@ -163,7 +161,7 @@ class ProducerImpl : public HandlerBase, ProducerConfiguration conf_; std::unique_ptr semaphore_; - MessageQueue pendingMessagesQueue_; + std::list> pendingMessagesQueue_; const int32_t partition_; // -1 if topic is non-partitioned std::string producerName_; @@ -187,8 +185,8 @@ class ProducerImpl : public HandlerBase, Promise producerCreatedPromise_; struct PendingCallbacks; - std::shared_ptr getPendingCallbacksWhenFailed(); - std::shared_ptr getPendingCallbacksWhenFailedWithLock(); + decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailed(); + decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailedWithLock(); void failPendingMessages(Result result, bool withLock); From 2723e1e524f7c3a1b45bbc982ca83c9ab9a2338e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 28 Aug 2023 21:31:00 +0800 Subject: [PATCH 2/9] Avoid calling serializeSingleMessageInBatchWithPayload each time a message is added ### Motivation Currently, each time a message is added to the batch message container, `serializeSingleMessageInBatchWithPayload` will be called. In this method, if the payload buffer's size is not enough, it will double in size. After the batch is cleared, the payload buffer will be reset. For example, here is a typical sequence of buffer size increases during the lifetime of a batch: ``` increase buffer size from 0 to 1033 increase buffer size from 1033 to 2066 increase buffer size from 2066 to 4132 increase buffer size from 3099 to 6198 increase buffer size from 5165 to 10330 increase buffer size from 9297 to 18594 increase buffer size from 17561 to 35122 increase buffer size from 34089 to 68178 increase buffer size from 67145 to 134290 ``` ### Modifications Refactor the `MessageAndCallbackBatch` design: the `add` method now only stores the message and callback. Provide a `createOpSendMsg` method to serialize the messages and callbacks into an `OpSendMsg`. 
--- lib/BatchMessageContainer.cc | 5 +- lib/BatchMessageContainerBase.cc | 39 +---------- lib/BatchMessageContainerBase.h | 3 +- lib/BatchMessageKeyBasedContainer.cc | 25 ++++---- lib/Commands.cc | 5 +- lib/MessageAndCallbackBatch.cc | 96 +++++++++++++++++++--------- lib/MessageAndCallbackBatch.h | 42 ++++++------ lib/OpSendMsg.h | 6 +- 8 files changed, 111 insertions(+), 110 deletions(-) diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc index 777b2c35..cd7ddc85 100644 --- a/lib/BatchMessageContainer.cc +++ b/lib/BatchMessageContainer.cc @@ -54,7 +54,10 @@ void BatchMessageContainer::clear() { } std::unique_ptr BatchMessageContainer::createOpSendMsg(const FlushCallback& flushCallback) { - auto op = createOpSendMsgHelper(flushCallback, batch_); + auto op = createOpSendMsgHelper(batch_); + if (flushCallback) { + op->addTrackerCallback(flushCallback); + } clear(); return op; } diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc index 96ea94bc..c1b17270 100644 --- a/lib/BatchMessageContainerBase.cc +++ b/lib/BatchMessageContainerBase.cc @@ -18,14 +18,10 @@ */ #include "BatchMessageContainerBase.h" -#include "ClientConnection.h" -#include "CompressionCodec.h" #include "MessageAndCallbackBatch.h" #include "MessageCrypto.h" -#include "MessageImpl.h" #include "OpSendMsg.h" #include "ProducerImpl.h" -#include "PulsarApi.pb.h" #include "SharedBuffer.h" namespace pulsar { @@ -40,38 +36,9 @@ BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& produce BatchMessageContainerBase::~BatchMessageContainerBase() {} std::unique_ptr BatchMessageContainerBase::createOpSendMsgHelper( - const FlushCallback& flushCallback, const MessageAndCallbackBatch& batch) const { - auto sendCallback = batch.createSendCallback(flushCallback); - if (batch.empty()) { - return OpSendMsg::create(ResultOperationNotSupported, std::move(sendCallback)); - } - - MessageImplPtr impl = batch.msgImpl(); - 
impl->metadata.set_num_messages_in_batch(batch.size()); - auto compressionType = producerConfig_.getCompressionType(); - if (compressionType != CompressionNone) { - impl->metadata.set_compression(static_cast(compressionType)); - impl->metadata.set_uncompressed_size(impl->payload.readableBytes()); - } - impl->payload = CompressionCodecProvider::getCodec(compressionType).encode(impl->payload); - - auto msgCrypto = msgCryptoWeakPtr_.lock(); - if (msgCrypto && producerConfig_.isEncryptionEnabled()) { - SharedBuffer encryptedPayload; - if (!msgCrypto->encrypt(producerConfig_.getEncryptionKeys(), producerConfig_.getCryptoKeyReader(), - impl->metadata, impl->payload, encryptedPayload)) { - return OpSendMsg::create(ResultCryptoError, std::move(sendCallback)); - } - impl->payload = encryptedPayload; - } - - if (impl->payload.readableBytes() > ClientConnection::getMaxMessageSize()) { - return OpSendMsg::create(ResultMessageTooBig, std::move(sendCallback)); - } - - return OpSendMsg::create(impl->metadata, batch.messagesCount(), batch.messagesSize(), - producerConfig_.getSendTimeout(), batch.createSendCallback(flushCallback), - nullptr, producerId_, impl->payload); + MessageAndCallbackBatch& batch) const { + auto crypto = msgCryptoWeakPtr_.lock(); + return batch.createOpSendMsg(producerId_, producerConfig_, crypto.get()); } } // namespace pulsar diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h index fb460196..cd17d6d6 100644 --- a/lib/BatchMessageContainerBase.h +++ b/lib/BatchMessageContainerBase.h @@ -109,8 +109,7 @@ class BatchMessageContainerBase : public boost::noncopyable { void updateStats(const Message& msg); void resetStats(); - std::unique_ptr createOpSendMsgHelper(const FlushCallback& flushCallback, - const MessageAndCallbackBatch& batch) const; + std::unique_ptr createOpSendMsgHelper(MessageAndCallbackBatch& flushCallback) const; virtual void clear() = 0; }; diff --git a/lib/BatchMessageKeyBasedContainer.cc 
b/lib/BatchMessageKeyBasedContainer.cc index e88674b5..20067363 100644 --- a/lib/BatchMessageKeyBasedContainer.cc +++ b/lib/BatchMessageKeyBasedContainer.cc @@ -74,22 +74,19 @@ void BatchMessageKeyBasedContainer::clear() { std::vector> BatchMessageKeyBasedContainer::createOpSendMsgs( const FlushCallback& flushCallback) { - // Sorted the batches by sequence id - std::vector sortedBatches; - for (const auto& kv : batches_) { - sortedBatches.emplace_back(&kv.second); + // Store raw pointers to use std::sort + std::vector rawOpSendMsgs; + for (auto& kv : batches_) { + rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release()); } - std::sort(sortedBatches.begin(), sortedBatches.end(), - [](const MessageAndCallbackBatch* lhs, const MessageAndCallbackBatch* rhs) { - return lhs->sequenceId() < rhs->sequenceId(); - }); + std::sort(rawOpSendMsgs.begin(), rawOpSendMsgs.end(), [](const OpSendMsg* lhs, const OpSendMsg* rhs) { + return lhs->sendArgs->sequenceId < rhs->sendArgs->sequenceId; + }); + rawOpSendMsgs.back()->addTrackerCallback(flushCallback); - std::vector> opSendMsgs{sortedBatches.size()}; - for (size_t i = 0; i + 1 < opSendMsgs.size(); i++) { - opSendMsgs[i].reset(createOpSendMsgHelper(nullptr, *sortedBatches[i]).release()); - } - if (!opSendMsgs.empty()) { - opSendMsgs.back().reset(createOpSendMsgHelper(flushCallback, *sortedBatches.back()).release()); + std::vector> opSendMsgs{rawOpSendMsgs.size()}; + for (size_t i = 0; i < opSendMsgs.size(); i++) { + opSendMsgs[i].reset(rawOpSendMsgs[i]); } clear(); return opSendMsgs; diff --git a/lib/Commands.cc b/lib/Commands.cc index 5b1734c3..b3362610 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -839,7 +839,8 @@ void Commands::initBatchMessageMetadata(const Message& msg, pulsar::proto::Messa uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad, unsigned long maxMessageSizeInBytes) { const auto& msgMetadata = msg.impl_->metadata; - SingleMessageMetadata 
metadata; + thread_local SingleMessageMetadata metadata; + metadata.Clear(); if (msgMetadata.has_partition_key()) { metadata.set_partition_key(msgMetadata.partition_key()); } @@ -868,7 +869,7 @@ uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, int payloadSize = msg.impl_->payload.readableBytes(); metadata.set_payload_size(payloadSize); - int msgMetadataSize = metadata.ByteSize(); + auto msgMetadataSize = metadata.ByteSizeLong(); unsigned long requiredSpace = sizeof(uint32_t) + msgMetadataSize + payloadSize; if (batchPayLoad.writableBytes() <= sizeof(uint32_t) + msgMetadataSize + payloadSize) { diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc index e17330c1..e9dc0e15 100644 --- a/lib/MessageAndCallbackBatch.cc +++ b/lib/MessageAndCallbackBatch.cc @@ -22,59 +22,95 @@ #include "ClientConnection.h" #include "Commands.h" -#include "LogUtils.h" -#include "MessageImpl.h" - -DECLARE_LOG_OBJECT() +#include "CompressionCodec.h" +#include "MessageCrypto.h" +#include "OpSendMsg.h" +#include "PulsarApi.pb.h" namespace pulsar { +MessageAndCallbackBatch::MessageAndCallbackBatch() {} + +MessageAndCallbackBatch::~MessageAndCallbackBatch() {} + void MessageAndCallbackBatch::add(const Message& msg, const SendCallback& callback) { - if (empty()) { - msgImpl_.reset(new MessageImpl); - Commands::initBatchMessageMetadata(msg, msgImpl_->metadata); + if (callbacks_.empty()) { + metadata_.reset(new proto::MessageMetadata); + Commands::initBatchMessageMetadata(msg, *metadata_); + sequenceId_ = metadata_->sequence_id(); } - LOG_DEBUG(" Before serialization payload size in bytes = " << msgImpl_->payload.readableBytes()); - sequenceId_ = Commands::serializeSingleMessageInBatchWithPayload(msg, msgImpl_->payload, - ClientConnection::getMaxMessageSize()); - LOG_DEBUG(" After serialization payload size in bytes = " << msgImpl_->payload.readableBytes()); + messages_.emplace_back(msg); callbacks_.emplace_back(callback); - - ++messagesCount_; 
messagesSize_ += msg.getLength(); } +std::unique_ptr MessageAndCallbackBatch::createOpSendMsg( + uint64_t producerId, const ProducerConfiguration& producerConfig, MessageCrypto* crypto) { + auto callback = createSendCallback(); + if (empty()) { + return OpSendMsg::create(ResultOperationNotSupported, std::move(callback)); + } + + // The magic number 64 is just an estimated size increment after setting some fields of the + // SingleMessageMetadata. It does not have to be accurate because it's only used to reduce the + // reallocation of the payload buffer. + static const size_t kEstimatedHeaderSize = + sizeof(uint32_t) + proto::MessageMetadata{}.ByteSizeLong() + 64; + const auto maxMessageSize = ClientConnection::getMaxMessageSize(); + // Estimate the buffer size just to avoid resizing the buffer + size_t maxBufferSize = kEstimatedHeaderSize * messages_.size(); + for (const auto& msg : messages_) { + maxBufferSize += msg.getLength(); + } + auto payload = SharedBuffer::allocate(maxBufferSize); + for (const auto& msg : messages_) { + sequenceId_ = Commands::serializeSingleMessageInBatchWithPayload(msg, payload, maxMessageSize); + } + metadata_->set_sequence_id(sequenceId_); + metadata_->set_num_messages_in_batch(messages_.size()); + auto compressionType = producerConfig.getCompressionType(); + if (compressionType != CompressionNone) { + metadata_->set_compression(static_cast(compressionType)); + metadata_->set_uncompressed_size(payload.readableBytes()); + } + payload = CompressionCodecProvider::getCodec(compressionType).encode(payload); + + if (producerConfig.isEncryptionEnabled() && crypto) { + SharedBuffer encryptedPayload; + if (!crypto->encrypt(producerConfig.getEncryptionKeys(), producerConfig.getCryptoKeyReader(), + *metadata_, payload, encryptedPayload)) { + return OpSendMsg::create(ResultCryptoError, std::move(callback)); + } + payload = encryptedPayload; + } + + if (payload.readableBytes() > ClientConnection::getMaxMessageSize()) { + return 
OpSendMsg::create(ResultMessageTooBig, std::move(callback)); + } + + auto op = OpSendMsg::create(*metadata_, callbacks_.size(), messagesSize_, producerConfig.getSendTimeout(), + std::move(callback), nullptr, producerId, payload); + clear(); + return op; +} + void MessageAndCallbackBatch::clear() { - msgImpl_.reset(); + messages_.clear(); callbacks_.clear(); - messagesCount_ = 0; messagesSize_ = 0; } static void completeSendCallbacks(const std::vector& callbacks, Result result, const MessageId& id) { int32_t numOfMessages = static_cast(callbacks.size()); - LOG_DEBUG("Batch complete [Result = " << result << "] [numOfMessages = " << numOfMessages << "]"); for (int32_t i = 0; i < numOfMessages; i++) { callbacks[i](result, MessageIdBuilder::from(id).batchIndex(i).batchSize(numOfMessages).build()); } } -void MessageAndCallbackBatch::complete(Result result, const MessageId& id) const { - completeSendCallbacks(callbacks_, result, id); -} - -SendCallback MessageAndCallbackBatch::createSendCallback(const FlushCallback& flushCallback) const { +SendCallback MessageAndCallbackBatch::createSendCallback() const { const auto& callbacks = callbacks_; - if (flushCallback) { - return [callbacks, flushCallback](Result result, const MessageId& id) { - completeSendCallbacks(callbacks, result, id); - flushCallback(result); - }; - } else { - return [callbacks] // save a copy of `callbacks_` - (Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); }; - } + return [callbacks](Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); }; } } // namespace pulsar diff --git a/lib/MessageAndCallbackBatch.h b/lib/MessageAndCallbackBatch.h index e8717b3e..1dce6bf9 100644 --- a/lib/MessageAndCallbackBatch.h +++ b/lib/MessageAndCallbackBatch.h @@ -24,16 +24,24 @@ #include #include +#include #include namespace pulsar { -class MessageImpl; -using MessageImplPtr = std::shared_ptr; +struct OpSendMsg; +class MessageCrypto; using FlushCallback = 
std::function; -class MessageAndCallbackBatch : public boost::noncopyable { +namespace proto { +class MessageMetadata; +} + +class MessageAndCallbackBatch final : public boost::noncopyable { public: + MessageAndCallbackBatch(); + ~MessageAndCallbackBatch(); + // Wrapper methods of STL container bool empty() const noexcept { return callbacks_.empty(); } size_t size() const noexcept { return callbacks_.size(); } @@ -46,34 +54,22 @@ class MessageAndCallbackBatch : public boost::noncopyable { */ void add(const Message& msg, const SendCallback& callback); - /** - * Clear the internal stats - */ - void clear(); + std::unique_ptr createOpSendMsg(uint64_t producerId, + const ProducerConfiguration& producerConfig, + MessageCrypto* crypto); - /** - * Complete all the callbacks with given parameters - * - * @param result this batch's send result - * @param id this batch's message id - */ - void complete(Result result, const MessageId& id) const; - - SendCallback createSendCallback(const FlushCallback& flushCallback) const; - - const MessageImplPtr& msgImpl() const { return msgImpl_; } uint64_t sequenceId() const noexcept { return sequenceId_; } - uint32_t messagesCount() const { return messagesCount_; } - uint64_t messagesSize() const { return messagesSize_; } + void clear(); private: - MessageImplPtr msgImpl_; + std::unique_ptr metadata_; + std::vector messages_; std::vector callbacks_; std::atomic sequenceId_{static_cast(-1L)}; - - uint32_t messagesCount_{0}; uint64_t messagesSize_{0ull}; + + SendCallback createSendCallback() const; }; } // namespace pulsar diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index 0b062852..06fa77f4 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -36,7 +36,7 @@ struct SendArguments { const uint64_t producerId; const uint64_t sequenceId; const proto::MessageMetadata metadata; - const SharedBuffer payload; + SharedBuffer payload; SendArguments(uint64_t producerId, uint64_t sequenceId, const proto::MessageMetadata& metadata, const 
SharedBuffer& payload) @@ -73,7 +73,9 @@ struct OpSendMsg { } void addTrackerCallback(std::function trackerCallback) { - trackerCallbacks.emplace_back(trackerCallback); + if (trackerCallback) { + trackerCallbacks.emplace_back(trackerCallback); + } } private: From 3b31f315d010e4c465ac451856514f18098385ff Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Aug 2023 19:43:31 +0800 Subject: [PATCH 3/9] Optimize metadata update and callback in sendAsync --- lib/ProducerImpl.cc | 61 +++++++++++++++---------------- lib/ProducerImpl.h | 2 - lib/ProducerInterceptors.h | 2 + lib/stats/ProducerStatsDisabled.h | 31 ---------------- 4 files changed, 32 insertions(+), 64 deletions(-) delete mode 100644 lib/stats/ProducerStatsDisabled.h diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index d8f02be6..1aab065f 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -39,7 +39,6 @@ #include "Semaphore.h" #include "TimeUtils.h" #include "TopicName.h" -#include "stats/ProducerStatsDisabled.h" #include "stats/ProducerStatsImpl.h" namespace pulsar { @@ -82,13 +81,11 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, } unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds(); - if (statsIntervalInSeconds) { + if (statsIntervalInSeconds > 0) { producerStatsBasePtr_ = std::make_shared(producerStr_, executor_, statsIntervalInSeconds); - } else { - producerStatsBasePtr_ = std::make_shared(); + producerStatsBasePtr_->start(); } - producerStatsBasePtr_->start(); if (conf_.isEncryptionEnabled()) { std::ostringstream logCtxStream; @@ -342,22 +339,6 @@ void ProducerImpl::resendMessages(ClientConnectionPtr cnx) { } } -void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequenceId, - const uint32_t& uncompressedSize) { - // Call this function after acquiring the mutex_ - proto::MessageMetadata& msgMetadata = msg.impl_->metadata; - msgMetadata.set_producer_name(producerName_); - 
msgMetadata.set_publish_time(TimeUtils::currentTimeMillis()); - msgMetadata.set_sequence_id(sequenceId); - if (conf_.getCompressionType() != CompressionNone) { - msgMetadata.set_compression(static_cast(conf_.getCompressionType())); - msgMetadata.set_uncompressed_size(uncompressedSize); - } - if (!this->getSchemaVersion().empty()) { - msgMetadata.set_schema_version(this->getSchemaVersion()); - } -} - void ProducerImpl::flushAsync(FlushCallback callback) { if (state_ != Ready) { callback(ResultAlreadyClosed); @@ -435,18 +416,24 @@ static SharedBuffer applyCompression(const SharedBuffer& uncompressedPayload, } void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { - producerStatsBasePtr_->messageSent(msg); + if (!producerStatsBasePtr_ && !interceptors_) { + return sendAsyncWithStatsUpdate(msg, std::move(callback)); + } + auto self = shared_from_this(); + if (producerStatsBasePtr_) { + producerStatsBasePtr_->messageSent(msg); + } - Producer producer = Producer(shared_from_this()); - auto interceptorMessage = interceptors_->beforeSend(producer, msg); + const auto interceptorMessage = interceptors_ ? interceptors_->beforeSend(Producer{self}, msg) : msg; + const auto now = producerStatsBasePtr_ ? 
TimeUtils::now() : boost::posix_time::ptime{}; - const auto now = boost::posix_time::microsec_clock::universal_time(); - auto self = shared_from_this(); - sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, producer, interceptorMessage]( + sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, interceptorMessage]( Result result, const MessageId& messageId) { - producerStatsBasePtr_->messageReceived(result, now); + if (producerStatsBasePtr_) { + producerStatsBasePtr_->messageReceived(result, now); + } - interceptors_->onSendAcknowledgement(producer, result, interceptorMessage, messageId); + interceptors_->onSendAcknowledgement(Producer{self}, result, interceptorMessage, messageId); if (callback) { callback(result, messageId); @@ -501,10 +488,22 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c uint64_t sequenceId; if (!msgMetadata.has_sequence_id()) { sequenceId = msgSequenceGenerator_++; + msgMetadata.set_sequence_id(sequenceId); } else { sequenceId = msgMetadata.sequence_id(); } - setMessageMetadata(msg, sequenceId, uncompressedSize); + + if (compressed || batchMessageContainer_->isFirstMessageToAdd(msg)) { + msgMetadata.set_producer_name(producerName_); + msgMetadata.set_publish_time(TimeUtils::currentTimeMillis()); + if (conf_.getCompressionType() != CompressionNone) { + msgMetadata.set_compression(static_cast(conf_.getCompressionType())); + msgMetadata.set_uncompressed_size(uncompressedSize); + } + if (!schemaVersion_.empty()) { + msgMetadata.set_schema_version(schemaVersion_); + } + } // else: The 2nd or later message in the batch only needs to set the sequence id auto payloadChunkSize = maxMessageSize; int totalChunks; @@ -531,7 +530,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c } } - if (canAddToBatch(msg)) { + if (!compressed) { // Batching is enabled and the message is not delayed if (!batchMessageContainer_->hasEnoughSpace(msg)) { 
batchMessageAndSend().complete(); diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index 8611bfef..0226d060 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -104,8 +104,6 @@ class ProducerImpl : public HandlerBase, protected: ProducerStatsBasePtr producerStatsBasePtr_; - void setMessageMetadata(const Message& msg, const uint64_t& sequenceId, const uint32_t& uncompressedSize); - void sendMessage(std::unique_ptr opSendMsg); void startSendTimeoutTimer(); diff --git a/lib/ProducerInterceptors.h b/lib/ProducerInterceptors.h index f83394f1..d9c8c10b 100644 --- a/lib/ProducerInterceptors.h +++ b/lib/ProducerInterceptors.h @@ -41,6 +41,8 @@ class ProducerInterceptors { void close(); + operator bool() const noexcept { return interceptors_.empty(); } + private: enum State { diff --git a/lib/stats/ProducerStatsDisabled.h b/lib/stats/ProducerStatsDisabled.h deleted file mode 100644 index df1df0f8..00000000 --- a/lib/stats/ProducerStatsDisabled.h +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#ifndef PULSAR_PRODUCER_STATS_DISABLED_HEADER -#define PULSAR_PRODUCER_STATS_DISABLED_HEADER -#include "ProducerStatsBase.h" - -namespace pulsar { -class ProducerStatsDisabled : public ProducerStatsBase { - public: - virtual void messageSent(const Message& msg){}; - virtual void messageReceived(Result, const boost::posix_time::ptime&){}; -}; -} // namespace pulsar -#endif // PULSAR_PRODUCER_STATS_DISABLED_HEADER From 7533f288f57812241aa65dba22c0f7e4f9c4656f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Aug 2023 19:52:47 +0800 Subject: [PATCH 4/9] Pass std::function by rvalue reference --- lib/BatchMessageContainer.cc | 4 ++-- lib/BatchMessageContainer.h | 2 +- lib/BatchMessageContainerBase.h | 2 +- lib/BatchMessageKeyBasedContainer.cc | 4 ++-- lib/BatchMessageKeyBasedContainer.h | 2 +- lib/MessageAndCallbackBatch.cc | 4 ++-- lib/MessageAndCallbackBatch.h | 2 +- lib/ProducerImpl.cc | 2 +- 8 files changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc index cd7ddc85..1f8bee9f 100644 --- a/lib/BatchMessageContainer.cc +++ b/lib/BatchMessageContainer.cc @@ -36,9 +36,9 @@ BatchMessageContainer::~BatchMessageContainer() { << "] [averageBatchSize_ = " << averageBatchSize_ << "]"); } -bool BatchMessageContainer::add(const Message& msg, const SendCallback& callback) { +bool BatchMessageContainer::add(const Message& msg, SendCallback&& callback) { LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]"); - batch_.add(msg, callback); + batch_.add(msg, std::move(callback)); updateStats(msg); LOG_DEBUG("After add: " << *this); return isFull(); diff --git a/lib/BatchMessageContainer.h b/lib/BatchMessageContainer.h index 5b1213c0..4c55a9fb 100644 --- a/lib/BatchMessageContainer.h +++ b/lib/BatchMessageContainer.h @@ -43,7 +43,7 @@ class BatchMessageContainer : public BatchMessageContainerBase { bool isFirstMessageToAdd(const Message& msg) const override { return batch_.empty(); } - bool 
add(const Message& msg, const SendCallback& callback) override; + bool add(const Message& msg, SendCallback&& callback) override; void serialize(std::ostream& os) const override; diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h index cd17d6d6..b372f69d 100644 --- a/lib/BatchMessageContainerBase.h +++ b/lib/BatchMessageContainerBase.h @@ -67,7 +67,7 @@ class BatchMessageContainerBase : public boost::noncopyable { * @param callback message send callback * @return true if the batch is full, otherwise false */ - virtual bool add(const Message& msg, const SendCallback& callback) = 0; + virtual bool add(const Message& msg, SendCallback&& callback) = 0; virtual std::unique_ptr createOpSendMsg(const FlushCallback& flushCallback = nullptr) { throw std::runtime_error("createOpSendMsg is not supported"); diff --git a/lib/BatchMessageKeyBasedContainer.cc b/lib/BatchMessageKeyBasedContainer.cc index 20067363..8ab7f071 100644 --- a/lib/BatchMessageKeyBasedContainer.cc +++ b/lib/BatchMessageKeyBasedContainer.cc @@ -55,9 +55,9 @@ bool BatchMessageKeyBasedContainer::isFirstMessageToAdd(const Message& msg) cons } } -bool BatchMessageKeyBasedContainer::add(const Message& msg, const SendCallback& callback) { +bool BatchMessageKeyBasedContainer::add(const Message& msg, SendCallback&& callback) { LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]"); - batches_[getKey(msg)].add(msg, callback); + batches_[getKey(msg)].add(msg, std::move(callback)); updateStats(msg); LOG_DEBUG("After add: " << *this); return isFull(); diff --git a/lib/BatchMessageKeyBasedContainer.h b/lib/BatchMessageKeyBasedContainer.h index e534fbaa..7fa6b136 100644 --- a/lib/BatchMessageKeyBasedContainer.h +++ b/lib/BatchMessageKeyBasedContainer.h @@ -36,7 +36,7 @@ class BatchMessageKeyBasedContainer : public BatchMessageContainerBase { bool isFirstMessageToAdd(const Message& msg) const override; - bool add(const Message& msg, const SendCallback& callback) override; + bool 
add(const Message& msg, SendCallback&& callback) override; std::vector> createOpSendMsgs(const FlushCallback& flushCallback) override; diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc index e9dc0e15..c22bdc17 100644 --- a/lib/MessageAndCallbackBatch.cc +++ b/lib/MessageAndCallbackBatch.cc @@ -33,14 +33,14 @@ MessageAndCallbackBatch::MessageAndCallbackBatch() {} MessageAndCallbackBatch::~MessageAndCallbackBatch() {} -void MessageAndCallbackBatch::add(const Message& msg, const SendCallback& callback) { +void MessageAndCallbackBatch::add(const Message& msg, SendCallback&& callback) { if (callbacks_.empty()) { metadata_.reset(new proto::MessageMetadata); Commands::initBatchMessageMetadata(msg, *metadata_); sequenceId_ = metadata_->sequence_id(); } messages_.emplace_back(msg); - callbacks_.emplace_back(callback); + callbacks_.emplace_back(std::move(callback)); messagesSize_ += msg.getLength(); } diff --git a/lib/MessageAndCallbackBatch.h b/lib/MessageAndCallbackBatch.h index 1dce6bf9..91ae8091 100644 --- a/lib/MessageAndCallbackBatch.h +++ b/lib/MessageAndCallbackBatch.h @@ -52,7 +52,7 @@ class MessageAndCallbackBatch final : public boost::noncopyable { * @param message * @callback the associated send callback */ - void add(const Message& msg, const SendCallback& callback); + void add(const Message& msg, SendCallback&& callback); std::unique_ptr createOpSendMsg(uint64_t producerId, const ProducerConfiguration& producerConfig, diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 1aab065f..d68473b2 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -536,7 +536,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c batchMessageAndSend().complete(); } bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg); - bool isFull = batchMessageContainer_->add(msg, callback); + bool isFull = batchMessageContainer_->add(msg, std::move(callback)); if (isFirstMessage) { 
batchTimer_->expires_from_now( boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs())); From 696a941b14673329d0e5bfb9ece9a760ff4e1c27 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Aug 2023 20:14:28 +0800 Subject: [PATCH 5/9] Do not create a callback based on the previous callbacks --- include/pulsar/MessageId.h | 1 + lib/MessageAndCallbackBatch.cc | 22 ++++--------------- lib/MessageAndCallbackBatch.h | 2 -- lib/OpSendMsg.h | 39 ++++++++++++++++++++++++++++------ 4 files changed, 38 insertions(+), 26 deletions(-) diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h index a05a8fb6..071b2446 100644 --- a/include/pulsar/MessageId.h +++ b/include/pulsar/MessageId.h @@ -107,6 +107,7 @@ class PULSAR_PUBLIC MessageId { friend class MultiTopicsConsumerImpl; friend class UnAckedMessageTrackerEnabled; friend class BatchAcknowledgementTracker; + friend struct OpSendMsg; friend class PulsarWrapper; friend class PulsarFriend; friend class NegativeAcksTracker; diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc index c22bdc17..9c1f5813 100644 --- a/lib/MessageAndCallbackBatch.cc +++ b/lib/MessageAndCallbackBatch.cc @@ -46,9 +46,8 @@ void MessageAndCallbackBatch::add(const Message& msg, SendCallback&& callback) { std::unique_ptr MessageAndCallbackBatch::createOpSendMsg( uint64_t producerId, const ProducerConfiguration& producerConfig, MessageCrypto* crypto) { - auto callback = createSendCallback(); if (empty()) { - return OpSendMsg::create(ResultOperationNotSupported, std::move(callback)); + return OpSendMsg::create(ResultOperationNotSupported, std::move(callbacks_)); } // The magic number 64 is just an estimated size increment after setting some fields of the @@ -79,17 +78,17 @@ std::unique_ptr MessageAndCallbackBatch::createOpSendMsg( SharedBuffer encryptedPayload; if (!crypto->encrypt(producerConfig.getEncryptionKeys(), producerConfig.getCryptoKeyReader(), *metadata_, payload, encryptedPayload)) { - return 
OpSendMsg::create(ResultCryptoError, std::move(callback)); + return OpSendMsg::create(ResultCryptoError, std::move(callbacks_)); } payload = encryptedPayload; } if (payload.readableBytes() > ClientConnection::getMaxMessageSize()) { - return OpSendMsg::create(ResultMessageTooBig, std::move(callback)); + return OpSendMsg::create(ResultMessageTooBig, std::move(callbacks_)); } auto op = OpSendMsg::create(*metadata_, callbacks_.size(), messagesSize_, producerConfig.getSendTimeout(), - std::move(callback), nullptr, producerId, payload); + std::move(callbacks_), nullptr, producerId, payload); clear(); return op; } @@ -100,17 +99,4 @@ void MessageAndCallbackBatch::clear() { messagesSize_ = 0; } -static void completeSendCallbacks(const std::vector& callbacks, Result result, - const MessageId& id) { - int32_t numOfMessages = static_cast(callbacks.size()); - for (int32_t i = 0; i < numOfMessages; i++) { - callbacks[i](result, MessageIdBuilder::from(id).batchIndex(i).batchSize(numOfMessages).build()); - } -} - -SendCallback MessageAndCallbackBatch::createSendCallback() const { - const auto& callbacks = callbacks_; - return [callbacks](Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); }; -} - } // namespace pulsar diff --git a/lib/MessageAndCallbackBatch.h b/lib/MessageAndCallbackBatch.h index 91ae8091..dbd837c0 100644 --- a/lib/MessageAndCallbackBatch.h +++ b/lib/MessageAndCallbackBatch.h @@ -68,8 +68,6 @@ class MessageAndCallbackBatch final : public boost::noncopyable { std::vector callbacks_; std::atomic sequenceId_{static_cast(-1L)}; uint64_t messagesSize_{0ull}; - - SendCallback createSendCallback() const; }; } // namespace pulsar diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index 06fa77f4..421f3af3 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -26,6 +26,7 @@ #include #include "ChunkMessageIdImpl.h" +#include "MessageIdImpl.h" #include "PulsarApi.pb.h" #include "SharedBuffer.h" #include "TimeUtils.h" @@ -52,7 +53,8 @@ struct 
OpSendMsg { const uint32_t messagesCount; const uint64_t messagesSize; const boost::posix_time::ptime timeout; - const SendCallback sendCallback; + SendCallback nonBatchSendCallback; + std::vector batchSendCallbacks; std::vector> trackerCallbacks; ChunkMessageIdImplPtr chunkedMessageId; // Use shared_ptr here because producer might resend the message with the same arguments @@ -64,8 +66,20 @@ struct OpSendMsg { } void complete(Result result, const MessageId& messageId) const { - if (sendCallback) { - sendCallback(result, messageId); + for (size_t i = 0; i < batchSendCallbacks.size(); i++) { + auto& callback = batchSendCallbacks[i]; + if (callback) { + if (result == ResultOk) { + auto msgIdImpl = std::make_shared( + messageId.partition(), messageId.ledgerId(), messageId.entryId(), i); + callback(result, MessageId{msgIdImpl}); + } else { + callback(result, {}); + } + } + } + if (nonBatchSendCallback) { + nonBatchSendCallback(result, messageId); } for (const auto& trackerCallback : trackerCallbacks) { trackerCallback(result); @@ -79,13 +93,13 @@ struct OpSendMsg { } private: - OpSendMsg(Result result, SendCallback&& callback) + OpSendMsg(Result result, std::vector&& callbacks) : result(result), chunkId(-1), numChunks(-1), messagesCount(0), messagesSize(0), - sendCallback(std::move(callback)), + batchSendCallbacks(std::move(callbacks)), sendArgs(nullptr) {} OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize, @@ -97,7 +111,20 @@ struct OpSendMsg { messagesCount(messagesCount), messagesSize(messagesSize), timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)), - sendCallback(std::move(callback)), + nonBatchSendCallback(std::move(callback)), + chunkedMessageId(chunkedMessageId), + sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {} + + OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize, + int sendTimeoutMs, std::vector&& 
callbacks, + ChunkMessageIdImplPtr chunkedMessageId, uint64_t producerId, SharedBuffer payload) + : result(ResultOk), + chunkId(metadata.chunk_id()), + numChunks(metadata.num_chunks_from_msg()), + messagesCount(messagesCount), + messagesSize(messagesSize), + timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)), + batchSendCallbacks(std::move(callbacks)), chunkedMessageId(chunkedMessageId), sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {} }; From 03373796f9abe7da85bcb38aeae0250980ffd084 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Aug 2023 20:26:26 +0800 Subject: [PATCH 6/9] Preallocate the MessageAndCallbackBatch --- lib/BatchMessageContainer.cc | 2 +- lib/BatchMessageKeyBasedContainer.cc | 7 ++++++- lib/MessageAndCallbackBatch.cc | 5 ++++- lib/MessageAndCallbackBatch.h | 4 +++- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc index 1f8bee9f..d8c5de1a 100644 --- a/lib/BatchMessageContainer.cc +++ b/lib/BatchMessageContainer.cc @@ -28,7 +28,7 @@ DECLARE_LOG_OBJECT() namespace pulsar { BatchMessageContainer::BatchMessageContainer(const ProducerImpl& producer) - : BatchMessageContainerBase(producer) {} + : BatchMessageContainerBase(producer), batch_(producerConfig_.getBatchingMaxMessages()) {} BatchMessageContainer::~BatchMessageContainer() { LOG_DEBUG(*this << " destructed"); diff --git a/lib/BatchMessageKeyBasedContainer.cc b/lib/BatchMessageKeyBasedContainer.cc index 8ab7f071..22a07081 100644 --- a/lib/BatchMessageKeyBasedContainer.cc +++ b/lib/BatchMessageKeyBasedContainer.cc @@ -57,7 +57,12 @@ bool BatchMessageKeyBasedContainer::isFirstMessageToAdd(const Message& msg) cons bool BatchMessageKeyBasedContainer::add(const Message& msg, SendCallback&& callback) { LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]"); - batches_[getKey(msg)].add(msg, std::move(callback)); + const auto key = getKey(msg); + auto it = 
batches_.find(getKey(msg)); + if (it == batches_.end()) { + it = batches_.emplace(key, static_cast(producerConfig_.getBatchingMaxMessages())).first; + } + it->second.add(msg, std::move(callback)); updateStats(msg); LOG_DEBUG("After add: " << *this); return isFull(); diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc index 9c1f5813..c4b644df 100644 --- a/lib/MessageAndCallbackBatch.cc +++ b/lib/MessageAndCallbackBatch.cc @@ -29,7 +29,10 @@ namespace pulsar { -MessageAndCallbackBatch::MessageAndCallbackBatch() {} +MessageAndCallbackBatch::MessageAndCallbackBatch(size_t capacity) { + messages_.reserve(capacity); + callbacks_.reserve(capacity); +} MessageAndCallbackBatch::~MessageAndCallbackBatch() {} diff --git a/lib/MessageAndCallbackBatch.h b/lib/MessageAndCallbackBatch.h index dbd837c0..d1fd3d5a 100644 --- a/lib/MessageAndCallbackBatch.h +++ b/lib/MessageAndCallbackBatch.h @@ -39,7 +39,9 @@ class MessageMetadata; class MessageAndCallbackBatch final : public boost::noncopyable { public: - MessageAndCallbackBatch(); + // This default constructor is added just to make this class able to be stored in a map + MessageAndCallbackBatch() = default; + MessageAndCallbackBatch(size_t capacity); ~MessageAndCallbackBatch(); // Wrapper methods of STL container From 579730bef2da7327df5c7d7767b8336acbc81eae Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Aug 2023 21:20:05 +0800 Subject: [PATCH 7/9] Add thread pool for send callback: --- lib/ClientImpl.cc | 1 + lib/ClientImpl.h | 9 +++++++++ lib/ProducerImpl.cc | 21 +++++++++++++++++---- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 762d2b83..ff149a03 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -694,6 +694,7 @@ void ClientImpl::shutdown() { LOG_DEBUG(producers.size() << " producers and " << consumers.size() << " consumers have been shutdown."); } + sendCallbackPool_.stop(); if (!pool_.close()) { // pool_ has already been 
closed. It means shutdown() has been called before. return; diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 9ee70951..fd195953 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -22,6 +22,8 @@ #include #include +#include +#include #include #include "ConnectionPool.h" @@ -126,6 +128,11 @@ class ClientImpl : public std::enable_shared_from_this { friend class PulsarFriend; + template + void dispatch(Function&& function) { + boost::asio::dispatch(pool_, function); + } + private: void handleCreateProducer(const Result result, const LookupDataResultPtr partitionMetadata, TopicNamePtr topicName, ProducerConfiguration conf, @@ -176,6 +183,8 @@ class ClientImpl : public std::enable_shared_from_this { ExecutorServiceProviderPtr ioExecutorProvider_; ExecutorServiceProviderPtr listenerExecutorProvider_; ExecutorServiceProviderPtr partitionListenerExecutorProvider_; + // TODO: make it configurable + boost::asio::thread_pool sendCallbackPool_{1}; LookupServicePtr lookupServicePtr_; ConnectionPool pool_; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index d68473b2..3aea15e7 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -926,10 +926,23 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { pendingMessagesQueue_.pop_front(); lock.unlock(); - try { - opSendMsg->complete(ResultOk, messageId); - } catch (const std::exception& e) { - LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); + auto client = client_.lock(); + if (client) { + auto rawOpPtr{opSendMsg.release()}; + client->dispatch([rawOpPtr, messageId] { + std::unique_ptr opSendMsg{rawOpPtr}; + try { + opSendMsg->complete(ResultOk, messageId); + } catch (const std::exception& e) { + LOG_ERROR("Exception thrown from callback " << e.what()); + } + }); + } else { + try { + opSendMsg->complete(ResultOk, messageId); + } catch (const std::exception& e) { + LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); + } } return true; } From 
1f9743e974e15515452a9640008300877ad6b449 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Aug 2023 21:24:25 +0800 Subject: [PATCH 8/9] Fix compile error --- lib/ClientImpl.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index fd195953..16720d96 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -129,8 +129,8 @@ class ClientImpl : public std::enable_shared_from_this { friend class PulsarFriend; template - void dispatch(Function&& function) { - boost::asio::dispatch(pool_, function); + void dispatch(Function&& f) { + boost::asio::dispatch(sendCallbackPool_, std::move(f)); } private: From a6d4a6b4810c7d5725d2d141891b92310870868c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 30 Aug 2023 10:38:12 +0800 Subject: [PATCH 9/9] Do not preallocate for key based batching --- lib/BatchMessageKeyBasedContainer.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/BatchMessageKeyBasedContainer.cc b/lib/BatchMessageKeyBasedContainer.cc index 22a07081..aeb90675 100644 --- a/lib/BatchMessageKeyBasedContainer.cc +++ b/lib/BatchMessageKeyBasedContainer.cc @@ -60,7 +60,8 @@ bool BatchMessageKeyBasedContainer::add(const Message& msg, SendCallback&& callb const auto key = getKey(msg); auto it = batches_.find(getKey(msg)); if (it == batches_.end()) { - it = batches_.emplace(key, static_cast(producerConfig_.getBatchingMaxMessages())).first; + // Do not preallocate for key based batching in case there are many keys + it = batches_.emplace(key, 1).first; } it->second.add(msg, std::move(callback)); updateStats(msg);