diff --git a/include/libp2p/protocol/gossip/gossip.hpp b/include/libp2p/protocol/gossip/gossip.hpp index 0b289db4d..b1f9b3601 100644 --- a/include/libp2p/protocol/gossip/gossip.hpp +++ b/include/libp2p/protocol/gossip/gossip.hpp @@ -36,7 +36,6 @@ namespace libp2p { } // namespace libp2p namespace libp2p::protocol::gossip { - /// Gossip pub-sub protocol config struct Config { /// Network density factors for gossip meshes @@ -93,6 +92,26 @@ namespace libp2p::protocol::gossip { bool sign_messages = false; }; + /// RPC limits to control message processing + struct RPCLimits { + /// Maximum subscriptions that will be processed in a single message and the + /// rest will be ignored + size_t max_subscriptions = 5000; + + /// Maximum messages that will be processed in a single message and the rest + /// will be ignored + size_t max_ihave_messages = 5000; + size_t max_iwant_messages = 5000; + size_t max_graft_messages = 5000; + size_t max_prune_messages = 5000; + + /// Maximum message ids that will be processed in a single message and the + /// rest will be ignored + size_t max_ihave_message_ids = 5000; + size_t max_iwant_message_ids = 5000; + size_t max_prune_peer_infos = 16; + }; + using TopicId = std::string; using TopicList = std::vector; using TopicSet = std::set; @@ -157,6 +176,7 @@ namespace libp2p::protocol::gossip { std::shared_ptr idmgr, std::shared_ptr crypto_provider, std::shared_ptr key_marshaller, - Config config = Config{}); + Config config = Config{}, + RPCLimits limits = RPCLimits{}); } // namespace libp2p::protocol::gossip diff --git a/src/protocol/gossip/impl/connectivity.cpp b/src/protocol/gossip/impl/connectivity.cpp index 8a308d64d..306dc0a90 100644 --- a/src/protocol/gossip/impl/connectivity.cpp +++ b/src/protocol/gossip/impl/connectivity.cpp @@ -30,12 +30,14 @@ namespace libp2p::protocol::gossip { std::shared_ptr scheduler, std::shared_ptr host, std::shared_ptr msg_receiver, - ConnectionStatusFeedback on_connected) + ConnectionStatusFeedback on_connected, + std::shared_ptr limits) : config_(std::move(config)), scheduler_(std::move(scheduler)), host_(std::move(host)), msg_receiver_(std::move(msg_receiver)), connected_cb_(std::move(on_connected)), + limits_(std::move(limits)), log_("gossip", "Connectivity", host_->getPeerInfo().id.toBase58().substr(46)) {} @@ -191,7 +193,8 @@ namespace libp2p::protocol::gossip { on_stream_event_, *msg_receiver_, std::move(stream), - ctx); + ctx, + limits_); gossip_stream->read(); @@ -308,7 +311,8 @@ namespace libp2p::protocol::gossip { on_stream_event_, *msg_receiver_, std::move(stream), - ctx); + ctx, + limits_); gossip_stream->read(); diff --git a/src/protocol/gossip/impl/connectivity.hpp b/src/protocol/gossip/impl/connectivity.hpp index fe7a603a2..194b96dbc 100644 --- a/src/protocol/gossip/impl/connectivity.hpp +++ b/src/protocol/gossip/impl/connectivity.hpp @@ -36,7 +36,8 @@ namespace libp2p::protocol::gossip { std::shared_ptr scheduler, std::shared_ptr host, std::shared_ptr msg_receiver, - ConnectionStatusFeedback on_connected); + ConnectionStatusFeedback on_connected, + std::shared_ptr limits); ~Connectivity() override; @@ -106,6 +107,7 @@ namespace libp2p::protocol::gossip { ConnectionStatusFeedback connected_cb_; Stream::Feedback on_stream_event_; bool started_ = false; + std::shared_ptr limits_; /// All known peers PeerSet all_peers_; diff --git a/src/protocol/gossip/impl/gossip_core.cpp b/src/protocol/gossip/impl/gossip_core.cpp index 2ee17239e..b1cb25fb4 100644 --- a/src/protocol/gossip/impl/gossip_core.cpp +++ b/src/protocol/gossip/impl/gossip_core.cpp @@ -26,13 +26,15 @@ namespace libp2p::protocol::gossip { std::shared_ptr idmgr, std::shared_ptr crypto_provider, std::shared_ptr key_marshaller, - Config config) { + Config config, + RPCLimits limits) { return std::make_shared(std::move(config), std::move(scheduler), std::move(host), std::move(idmgr), std::move(crypto_provider), - std::move(key_marshaller)); + std::move(key_marshaller), + std::move(limits)); } // clang-format off @@ -41,8 +43,10 @@ namespace libp2p::protocol::gossip { std::shared_ptr host, std::shared_ptr idmgr, std::shared_ptr crypto_provider, - std::shared_ptr key_marshaller) + std::shared_ptr key_marshaller, + RPCLimits limits) : config_(std::move(config)), + limits_(std::move(limits)), create_message_id_([](const Bytes &from, const Bytes &seq, const Bytes &data){ return createMessageId(from, seq, data); @@ -62,7 +66,7 @@ namespace libp2p::protocol::gossip { onLocalSubscriptionChanged(subscribe, topic); } )), - msg_seq_(scheduler_->now().count()), + msg_seq_(scheduler_->now().count()), log_("gossip", "Gossip", local_peer_id_.toBase58().substr(46)) {} // clang-format on @@ -100,7 +104,8 @@ namespace libp2p::protocol::gossip { shared_from_this(), [this](bool connected, const PeerContextPtr &ctx) { onPeerConnection(connected, ctx); - } + }, + std::make_shared(limits_) ); // clang-format on diff --git a/src/protocol/gossip/impl/gossip_core.hpp b/src/protocol/gossip/impl/gossip_core.hpp index 0d7e42509..b5cabd724 100644 --- a/src/protocol/gossip/impl/gossip_core.hpp +++ b/src/protocol/gossip/impl/gossip_core.hpp @@ -40,7 +40,8 @@ namespace libp2p::protocol::gossip { std::shared_ptr host, std::shared_ptr idmgr, std::shared_ptr crypto_provider, - std::shared_ptr key_marshaller); + std::shared_ptr key_marshaller, + RPCLimits limits); ~GossipCore() override = default; @@ -90,6 +91,9 @@ namespace libp2p::protocol::gossip { /// Configuration parameters const Config config_; + /// RPC Parsing limits + const RPCLimits limits_; + /// Message ID function MessageIdFn create_message_id_; diff --git a/src/protocol/gossip/impl/message_parser.cpp b/src/protocol/gossip/impl/message_parser.cpp index 15fc22924..8c214c3b2 100644 --- a/src/protocol/gossip/impl/message_parser.cpp +++ b/src/protocol/gossip/impl/message_parser.cpp @@ -12,6 +12,8 @@ #include +#include "peer_context.hpp" + namespace libp2p::protocol::gossip { namespace { @@ -23,7 +25,8 @@ namespace libp2p::protocol::gossip { // need to define default ctor/dtor here in translation unit due to unique_ptr // to type which is incomplete in header - MessageParser::MessageParser() = default; + MessageParser::MessageParser(std::shared_ptr limits) + : limits_(std::move(limits)) {} MessageParser::~MessageParser() = default; bool MessageParser::parse(BytesIn bytes) { @@ -42,49 +45,91 @@ namespace libp2p::protocol::gossip { return; } + size_t curr_subscriptions = 0; + for (const auto &s : pb_msg_->subscriptions()) { if (!s.has_subscribe() || !s.has_topicid()) { continue; } + + if (curr_subscriptions == limits_->max_subscriptions) { + break; + } receiver.onSubscription(from, s.subscribe(), s.topicid()); + + curr_subscriptions++; } if (pb_msg_->has_control()) { const auto &c = pb_msg_->control(); + size_t curr_ihave_messages = 0; + size_t curr_iwant_messages = 0; + size_t curr_graft_messages = 0; + size_t curr_prune_messages = 0; for (const auto &h : c.ihave()) { + if (curr_ihave_messages == limits_->max_ihave_messages) { + break; + } + size_t curr_ihave_message_ids = 0; if (!h.has_topicid() || h.messageids_size() == 0) { continue; } const TopicId &topic = h.topicid(); for (const auto &msg_id : h.messageids()) { + if (curr_ihave_message_ids == limits_->max_ihave_message_ids) { + break; + } if (msg_id.empty()) { continue; } receiver.onIHave(from, topic, fromString(msg_id)); + + curr_ihave_message_ids++; } + + curr_ihave_messages++; } for (const auto &w : c.iwant()) { + if (curr_iwant_messages == limits_->max_iwant_messages) { + break; + } + size_t curr_iwant_message_ids = 0; if (w.messageids_size() == 0) { continue; } for (const auto &msg_id : w.messageids()) { + if (curr_iwant_message_ids == limits_->max_iwant_message_ids) { + break; + } if (msg_id.empty()) { continue; } receiver.onIWant(from, fromString(msg_id)); + + curr_iwant_message_ids++; } + curr_iwant_messages++; } for (const auto &gr : c.graft()) { + if (curr_graft_messages == limits_->max_graft_messages) { + break; + } if (!gr.has_topicid()) { continue; } receiver.onGraft(from, gr.topicid()); + + curr_graft_messages++; } for (const auto &pr : c.prune()) { + if (curr_prune_messages == limits_->max_prune_messages) { + break; + } + size_t curr_prune_peer_infos = 0; if (!pr.has_topicid()) { continue; } @@ -95,13 +140,20 @@ namespace libp2p::protocol::gossip { log()->debug( "prune backoff={}, {} peers", backoff_time, pr.peers_size()); for (const auto &peer : pr.peers()) { + if (curr_prune_peer_infos == limits_->max_prune_peer_infos) { + break; + } // TODO(artem): meshsub 1.1.0 + signed peer records NYI log()->debug("peer id size={}, signed peer record size={}", peer.peerid().size(), peer.signedpeerrecord().size()); + + curr_prune_peer_infos++; } receiver.onPrune(from, pr.topicid(), backoff_time); + + curr_prune_messages++; } } diff --git a/src/protocol/gossip/impl/message_parser.hpp b/src/protocol/gossip/impl/message_parser.hpp index 6523004c5..d43ab0e86 100644 --- a/src/protocol/gossip/impl/message_parser.hpp +++ b/src/protocol/gossip/impl/message_parser.hpp @@ -8,6 +8,8 @@ #include "common.hpp" +#include + namespace pubsub::pb { // protobuf message forward declaration class RPC; @@ -20,7 +22,7 @@ namespace libp2p::protocol::gossip { /// Protobuf message parser. class MessageParser { public: - MessageParser(); + MessageParser(std::shared_ptr limits); ~MessageParser(); @@ -33,6 +35,8 @@ namespace libp2p::protocol::gossip { private: /// Parsed protobuf message std::unique_ptr pb_msg_; + /// Initialised RPC Limits + std::shared_ptr limits_; }; } // namespace libp2p::protocol::gossip diff --git a/src/protocol/gossip/impl/stream.cpp b/src/protocol/gossip/impl/stream.cpp index 2b18a9f47..5f9f31937 100644 --- a/src/protocol/gossip/impl/stream.cpp +++ b/src/protocol/gossip/impl/stream.cpp @@ -12,12 +12,13 @@ #include #include -#include "message_parser.hpp" #include "peer_context.hpp" #define TRACE_ENABLED 0 #include +#include "message_parser.hpp" + namespace libp2p::protocol::gossip { Stream::Stream(size_t stream_id, @@ -26,7 +27,8 @@ namespace libp2p::protocol::gossip { const Feedback &feedback, MessageReceiver &msg_receiver, std::shared_ptr stream, - PeerContextPtr peer) + PeerContextPtr peer, + std::shared_ptr limits) : stream_id_(stream_id), timeout_(config.rw_timeout_msec), scheduler_(scheduler), @@ -35,6 +37,7 @@ namespace libp2p::protocol::gossip { msg_receiver_(msg_receiver), stream_(std::move(stream)), peer_(std::move(peer)), + limits_(std::move(limits)), read_buffer_(std::make_shared>()) { assert(feedback_); assert(stream_); @@ -109,7 +112,7 @@ namespace libp2p::protocol::gossip { peer_->str, stream_id_); - MessageParser parser; + MessageParser parser{limits_}; if (!parser.parse(*read_buffer_)) { feedback_(peer_, Error::MESSAGE_PARSE_ERROR); return; diff --git a/src/protocol/gossip/impl/stream.hpp b/src/protocol/gossip/impl/stream.hpp index cc5b317be..7e626176f 100644 --- a/src/protocol/gossip/impl/stream.hpp +++ b/src/protocol/gossip/impl/stream.hpp @@ -36,7 +36,8 @@ namespace libp2p::protocol::gossip { const Feedback &feedback, MessageReceiver &msg_receiver, std::shared_ptr stream, - PeerContextPtr peer); + PeerContextPtr peer, + std::shared_ptr limits); /// Begins reading messages from stream void read(); @@ -64,6 +65,7 @@ namespace libp2p::protocol::gossip { MessageReceiver &msg_receiver_; std::shared_ptr stream_; PeerContextPtr peer_; + std::shared_ptr limits_; std::deque pending_buffers_; diff --git a/test/libp2p/protocol/gossip/CMakeLists.txt b/test/libp2p/protocol/gossip/CMakeLists.txt index dcd6aba15..4b0db7854 100644 --- a/test/libp2p/protocol/gossip/CMakeLists.txt +++ b/test/libp2p/protocol/gossip/CMakeLists.txt @@ -20,3 +20,11 @@ target_link_libraries(gossip_local_subs_test p2p_gossip p2p_testutil_peer ) + +addtest(gossip_rpc_limits_test + gossip_rpc_limits_test.cpp + ) +target_link_libraries(gossip_rpc_limits_test + p2p_gossip + p2p_testutil_peer + ) diff --git a/test/libp2p/protocol/gossip/gossip_rpc_limits_test.cpp b/test/libp2p/protocol/gossip/gossip_rpc_limits_test.cpp new file mode 100644 index 000000000..6658bfce8 --- /dev/null +++ b/test/libp2p/protocol/gossip/gossip_rpc_limits_test.cpp @@ -0,0 +1,259 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "protocol/gossip/protobuf/rpc.pb.h" +#include "src/protocol/gossip/impl/common.hpp" +#include "src/protocol/gossip/impl/message_parser.hpp" +#include "src/protocol/gossip/impl/message_receiver.hpp" +#include "src/protocol/gossip/impl/peer_context.hpp" +#include "testutil/libp2p/peer.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace g = libp2p::protocol::gossip; + +namespace { + class TestMessageReceiver : public g::MessageReceiver { + public: + ~TestMessageReceiver() = default; + + void onSubscription(const g::PeerContextPtr &from, + bool subscribe, + const g::TopicId &topic) { + subscriptions_processed++; + } + + void onIHave(const g::PeerContextPtr &from, + const g::TopicId &topic, + const g::MessageId &msg_id) { + if (!ihave_topicIds_processed.contains(topic)) { + ihave_topicIds_processed.insert(topic); + ihave_messages_processed++; + } + ihave_message_ids_processed[topic]++; + } + + void onIWant(const g::PeerContextPtr &from, const g::MessageId &msg_id) { + total_iwant_message_Ids_processed++; + } + + void onGraft(const g::PeerContextPtr &from, const g::TopicId &topic) { + graft_processed++; + } + + void onPrune(const g::PeerContextPtr &from, + const g::TopicId &topic, + uint64_t backoff_time) { + prune_processed++; + } + + void onTopicMessage(const g::PeerContextPtr &from, + g::TopicMessage::Ptr msg) {} + + void onMessageEnd(const g::PeerContextPtr &from) {} + + size_t subscriptions_processed = 0; + + /// IHAVE Control messages can have multiple message ids and each control + /// message group can be uniquely identified using a topic Id + size_t ihave_messages_processed = 0; + std::set ihave_topicIds_processed{}; + std::map ihave_message_ids_processed{}; + + /// IWANT Control messages can have multiple message ids but each control + /// message group cannot be uniquely identified + size_t total_iwant_message_Ids_processed = 0; + + size_t graft_processed = 0; + + /// NOTE: Add Peer Info count after MessageParser Update + /// PRUNE messages can have multiple peer infos, but it's currently not + /// implemented in the parser. + size_t prune_processed = 0; + }; + + void serializeAndDispatch(pubsub::pb::RPC &rpc, + g::MessageParser &parser, + TestMessageReceiver &receiver) { + std::string serialized; + rpc.SerializeToString(&serialized); + libp2p::BytesIn pubsub_message{ + reinterpret_cast(serialized.data()), + serialized.size()}; + parser.parse(pubsub_message); + const g::PeerContextPtr mock_context_ptr = + std::make_shared(testutil::randomPeerId()); + parser.dispatch(mock_context_ptr, receiver); + } +} // namespace + +/** + * @given Limit on subscriptions in RPC + * @when we parse the RPC message + * @then subscriptions after the limit has been reached are ignored + */ +TEST(Gossip, RPCSubscriptionLimit) { + srand(0); + g::RPCLimits limits{}; + limits.max_subscriptions = rand() % 10; + + pubsub::pb::RPC rpc; + for (int i = 0; i < 100; i++) { + auto *sub = rpc.add_subscriptions(); + sub->set_subscribe(i % 2); + sub->set_topicid(std::to_string(i)); + } + g::MessageParser parser{std::make_shared(limits)}; + TestMessageReceiver receiver; + serializeAndDispatch(rpc, parser, receiver); + ASSERT_EQ(receiver.subscriptions_processed, limits.max_subscriptions); +} + +/** + * @given Limit on IHAVE control messages and IHAVE message ids on RPC + * @when we parse the RPC message + * @then messages and message ids after the limit are ignored + */ +TEST(Gossip, RPCIHaveLimit) { + srand(0); + g::RPCLimits limits{}; + limits.max_ihave_messages = rand() % 10; + limits.max_ihave_message_ids = rand() % 10; + + pubsub::pb::RPC rpc; + auto *control = rpc.mutable_control(); + for (int i = 0; i < 100; i++) { + auto *ihave = control->add_ihave(); + ihave->set_topicid(std::to_string(i)); + for (int j = 0; j < 100; j++) { + ihave->add_messageids(std::to_string(j)); + } + } + + g::MessageParser parser{std::make_shared(limits)}; + TestMessageReceiver receiver; + serializeAndDispatch(rpc, parser, receiver); + ASSERT_EQ(receiver.ihave_messages_processed, limits.max_ihave_messages); + for (size_t i = 0; i < limits.max_ihave_messages; i++) { + ASSERT_EQ(receiver.ihave_message_ids_processed[std::to_string(i)], + limits.max_ihave_message_ids); + } + for (int i = limits.max_ihave_messages; i < 100; i++) { + ASSERT_EQ(receiver.ihave_message_ids_processed[std::to_string(i)], 0); + } +} + +/** + * @given Limit on IWANT control messages and IWANT message ids on RPC + * @when we parse the RPC message + * @then messages and message ids after the limit are ignored + */ +TEST(Gossip, RPCIWantLimit) { + srand(0); + g::RPCLimits limits{}; + limits.max_iwant_messages = rand() % 10; + limits.max_iwant_message_ids = rand() % 10; + + pubsub::pb::RPC rpc; + auto *control = rpc.mutable_control(); + for (int i = 0; i < 100; i++) { + auto *iwant = control->add_iwant(); + for (int j = 0; j < 100; j++) { + iwant->add_messageids(std::to_string(j)); + } + } + + g::MessageParser parser{std::make_shared(limits)}; + TestMessageReceiver receiver; + serializeAndDispatch(rpc, parser, receiver); + ASSERT_EQ(receiver.total_iwant_message_Ids_processed, + limits.max_iwant_messages * limits.max_iwant_message_ids); +} + +/** + * @given Limit on GRAFT control messages on RPC + * @when we parse the RPC message + * @then messages after the limit are ignored + */ +TEST(Gossip, RPCGraftLimit) { + srand(0); + g::RPCLimits limits{}; + limits.max_graft_messages = rand() % 10; + + pubsub::pb::RPC rpc; + auto *control = rpc.mutable_control(); + for (int i = 0; i < 100; i++) { + auto *graft = control->add_graft(); + graft->set_topicid(std::to_string(i)); + } + + g::MessageParser parser{std::make_shared(limits)}; + TestMessageReceiver receiver; + serializeAndDispatch(rpc, parser, receiver); + + ASSERT_EQ(receiver.graft_processed, limits.max_graft_messages); +} + +/** + * @given Limit on PRUNE control messages on RPC + * @when we parse the RPC message + * @then messages after the limit are ignored + */ +TEST(Gossip, RPCPruneLimit) { + // Logging system is required as PRUNE control message parsing logs + const std::string logger_config(R"( +# ---------------- +sinks: + - name: console + type: console + color: true +groups: + - name: main + sink: console + level: debug + children: + - name: libp2p +# ---------------- + )"); + + auto logging_system = std::make_shared( + std::make_shared( + // Original LibP2P logging config + std::make_shared(), + // Additional logging config for application + logger_config)); + auto result = logging_system->configure(); + libp2p::log::setLoggingSystem(logging_system); + + srand(0); + g::RPCLimits limits{}; + limits.max_prune_messages = rand() % 10; + + pubsub::pb::RPC rpc; + auto *control = rpc.mutable_control(); + for (int i = 0; i < 100; i++) { + auto *prune = control->add_prune(); + prune->set_topicid(std::to_string(i)); + } + + g::MessageParser parser{std::make_shared(limits)}; + TestMessageReceiver receiver; + serializeAndDispatch(rpc, parser, receiver); + + ASSERT_EQ(receiver.prune_processed, limits.max_prune_messages); +}