From 83e746b8bffa6d4393ca7cb0a909d3ded010de6d Mon Sep 17 00:00:00 2001 From: amoxic Date: Wed, 25 Mar 2026 10:40:27 +0800 Subject: [PATCH] Handle response stream bind failures gracefully --- src/brpc/controller.cpp | 10 ++- src/brpc/policy/baidu_rpc_protocol.cpp | 45 +++++++++-- src/brpc/stream.cpp | 31 +++++--- src/brpc/stream_impl.h | 4 + test/brpc_streaming_rpc_unittest.cpp | 100 +++++++++++++++++++++++-- 5 files changed, 164 insertions(+), 26 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 133d1f0453..c72db7c910 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1459,7 +1459,15 @@ void Controller::HandleStreamConnection(Socket *host_socket) { if(!ptrs[i]) continue; Stream* extra_stream = (Stream *) ptrs[i]->conn(); _remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]); - s->SetHostSocket(host_socket); + if (s->SetHostSocket(host_socket) != 0) { + const int ec = errno; + Stream::SetFailed(_request_streams, ec, + "Fail to bind response stream to %s", + host_socket->description().c_str()); + SetFailed(ec, "Fail to bind response stream to %s", + host_socket->description().c_str()); + return; + } extra_stream->SetConnected(_remote_stream_settings); } } diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 0dba01624a..0917915994 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -360,7 +360,19 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, Stream* s = (Stream *) stream_ptr->conn(); StreamSettings *stream_settings = meta.mutable_stream_settings(); s->FillSettings(stream_settings); - s->SetHostSocket(sock); + if (s->SetHostSocket(sock) != 0) { + const int errcode = errno; + LOG_IF(WARNING, errcode != EPIPE) + << "Fail to bind response stream=" << response_stream_id + << " to " << sock->description() << ": " + << berror(errcode); + cntl->SetFailed(errcode, "Fail to bind response stream to %s", + sock->description().c_str()); + Stream::SetFailed(response_stream_ids, errcode, + "Fail to bind response stream to %s", + sock->description().c_str()); + return; + } for (size_t i = 1; i < response_stream_ids.size(); ++i) { stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]); } @@ -390,6 +402,15 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, ResponseWriteInfo args; bthread_id_t response_id = INVALID_BTHREAD_ID; + auto response_write_guard = butil::MakeScopeGuard([&response_id, &args, span] { + if (response_id == INVALID_BTHREAD_ID) { + return; + } + bthread_id_join(response_id); + // Do not care about the result of background writing. + // TODO: this is not sent + span->set_sent_us(args.sent_us); + }); if (span) { span->set_response_size(res_buf.size()); CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); @@ -426,7 +447,21 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, SocketUniquePtr extra_stream_ptr; if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) { Stream* extra_stream = (Stream *) extra_stream_ptr->conn(); - extra_stream->SetHostSocket(sock); + if (extra_stream->SetHostSocket(sock) != 0) { + const int errcode = errno; + LOG_IF(WARNING, errcode != EPIPE) + << "Fail to bind response stream=" << extra_stream_id + << " to " << sock->description() << ": " + << berror(errcode); + cntl->SetFailed(errcode, "Fail to bind response stream to %s", + sock->description().c_str()); + StreamIds remaining_stream_ids(response_stream_ids.begin() + i, + response_stream_ids.end()); + Stream::SetFailed(remaining_stream_ids, errcode, + "Fail to bind response stream to %s", + sock->description().c_str()); + return; + } extra_stream->SetConnected(); } else { LOG(WARNING) << "Stream=" << extra_stream_id @@ -451,12 +486,6 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, } } - if (span) { - bthread_id_join(response_id); - // Do not care about the result of background writing. - // TODO: this is not sent - span->set_sent_us(args.sent_us); - } } namespace { diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index a2a106a8b1..8a985c5243 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -19,6 +19,7 @@ #include "brpc/stream.h" #include +#include "butil/string_printf.h" #include "butil/time.h" #include "butil/object_pool.h" #include "butil/unique_ptr.h" @@ -57,6 +58,7 @@ Stream::Stream() , _pending_buf(NULL) , _start_idle_timer_us(0) , _idle_timer(0) + , _set_host_socket_ec(0) { _connect_meta.on_connect = NULL; CHECK_EQ(0, bthread_mutex_init(&_connect_mutex, NULL)); @@ -665,13 +667,16 @@ int Stream::SetHostSocket(Socket *host_socket) { std::call_once(_set_host_socket_flag, [this, host_socket]() { SocketUniquePtr ptr; host_socket->ReAddress(&ptr); - // TODO add *this to host socke if (ptr->AddStream(id()) != 0) { - CHECK(false) << id() << " fail to add stream to host socket"; + _set_host_socket_ec = errno ? errno : ptr->non_zero_error_code(); return; } _host_socket = ptr.release(); }); + if (_host_socket == NULL) { + errno = _set_host_socket_ec ? _set_host_socket_ec : EFAILEDSOCKET; + return -1; + } return 0; } @@ -731,27 +736,35 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) { return TriggerOnConnectIfNeed(); } -int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) { +int Stream::SetFailedWithReason(StreamId id, int error_code, + const std::string& reason) { SocketUniquePtr ptr; if (Socket::AddressFailedAsWell(id, &ptr) == -1) { - // Don't care recycled stream return 0; } Stream* s = (Stream*)ptr->conn(); + s->Close(error_code, "%s", reason.c_str()); + return 0; +} + +int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) { va_list ap; va_start(ap, reason_fmt); - s->Close(error_code, reason_fmt, ap); + std::string reason; + butil::string_vprintf(&reason, reason_fmt, ap); va_end(ap); - return 0; + return SetFailedWithReason(id, error_code, reason); } int Stream::SetFailed(const StreamIds& ids, int error_code, const char* reason_fmt, ...) { va_list ap; va_start(ap, reason_fmt); - for(size_t i = 0; i< ids.size(); ++i) { - Stream::SetFailed(ids[i], error_code, reason_fmt, ap); - } + std::string reason; + butil::string_vprintf(&reason, reason_fmt, ap); va_end(ap); + for (size_t i = 0; i < ids.size(); ++i) { + Stream::SetFailedWithReason(ids[i], error_code, reason); + } return 0; } diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index 284b33ca33..fa3658a13e 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -20,6 +20,7 @@ #define BRPC_STREAM_IMPL_H #include +#include #include "bthread/bthread.h" #include "bthread/execution_queue.h" #include "brpc/socket.h" @@ -91,6 +92,8 @@ friend struct butil::DefaultDeleter; static int TriggerOnWritable(bthread_id_t id, void *data, int error_code); static void *RunOnWritable(void* arg); static void* RunOnConnect(void* arg); + static int SetFailedWithReason(StreamId id, int error_code, + const std::string& reason); struct ConnectMeta { int (*on_connect)(int, int, void*); @@ -136,6 +139,7 @@ friend struct butil::DefaultDeleter; int64_t _start_idle_timer_us; bthread_timer_t _idle_timer; std::once_flag _set_host_socket_flag; + int _set_host_socket_ec; }; } // namespace brpc diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp index ecb88c6150..9fa0cf99d0 100644 --- a/test/brpc_streaming_rpc_unittest.cpp +++ b/test/brpc_streaming_rpc_unittest.cpp @@ -21,12 +21,14 @@ #include #include +#include #include "brpc/server.h" #include "brpc/controller.h" #include "brpc/channel.h" #include "brpc/callback.h" #include "brpc/socket.h" +#include "brpc/details/controller_private_accessor.h" #include "brpc/stream_impl.h" #include "brpc/policy/streaming_rpc_protocol.h" #include "echo.pb.h" @@ -38,12 +40,12 @@ class AfterAcceptStream { class MyServiceWithStream : public test::EchoService { public: - MyServiceWithStream(const brpc::StreamOptions& options) + MyServiceWithStream(const brpc::StreamOptions& options) : _options(options) , _after_accept_stream(NULL) {} MyServiceWithStream(const brpc::StreamOptions& options, - AfterAcceptStream* after_accept_stream) + AfterAcceptStream* after_accept_stream) : _options(options) , _after_accept_stream(after_accept_stream) {} @@ -53,9 +55,9 @@ class MyServiceWithStream : public test::EchoService { {} void Echo(::google::protobuf::RpcController* controller, - const ::test::EchoRequest* request, - ::test::EchoResponse* response, - ::google::protobuf::Closure* done) { + const ::test::EchoRequest* request, + ::test::EchoResponse* response, + ::google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); response->set_message(request->message()); brpc::Controller* cntl = (brpc::Controller*)controller; @@ -125,7 +127,8 @@ class BatchStreamClientHandler : public brpc::StreamInputHandler { void on_closed(brpc::StreamId /*id*/) override {} - void on_failed(brpc::StreamId /*id*/, int /*error_code*/, const std::string& /*error_text*/) override {} + void on_failed(brpc::StreamId /*id*/, int /*error_code*/, + const std::string& /*error_text*/) override {} private: BatchStreamFeedbackRaceState* _state; @@ -162,7 +165,8 @@ static void* SendTwoMessagesOnServerExtraStream(void* arg) { std::string payload(64, 'a'); butil::IOBuf out; out.append(payload); - state->server_first_write_rc.store(brpc::StreamWrite(sid, out), std::memory_order_relaxed); + state->server_first_write_rc.store(brpc::StreamWrite(sid, out), + std::memory_order_relaxed); } // 2) Then send another byte. This write should become writable only after @@ -226,12 +230,59 @@ static void SetAtomicTrue(std::atomic* f) { static bool WaitForTrue(const std::atomic& f, int timeout_ms) { const int64_t deadline_us = butil::gettimeofday_us() + (int64_t)timeout_ms * 1000L; - while (!f.load(std::memory_order_acquire) && butil::gettimeofday_us() < deadline_us) { + while (!f.load(std::memory_order_acquire) && + butil::gettimeofday_us() < deadline_us) { usleep(1000); } return f.load(std::memory_order_acquire); } +class MyServiceWithStreamAndFailedSocket : public test::EchoService { +public: + explicit MyServiceWithStreamAndFailedSocket(const brpc::StreamOptions& options) + : _options(options) {} + + void Echo(::google::protobuf::RpcController* controller, + const ::test::EchoRequest* request, + ::test::EchoResponse* response, + ::google::protobuf::Closure* done) override { + brpc::ClosureGuard done_guard(done); + response->set_message(request->message()); + brpc::Controller* cntl = static_cast(controller); + brpc::StreamId response_stream; + ASSERT_EQ(0, StreamAccept(&response_stream, *cntl, &_options)); + brpc::ControllerPrivateAccessor accessor(cntl); + ASSERT_TRUE(accessor.get_sending_socket() != NULL); + accessor.get_sending_socket()->SetFailed(); + } + +private: + brpc::StreamOptions _options; +}; + +TEST_F(StreamingRpcTest, set_host_socket_returns_error_when_socket_is_failed) { + brpc::SocketOptions socket_options; + brpc::SocketId host_socket_id; + ASSERT_EQ(0, brpc::Socket::Create(socket_options, &host_socket_id)); + brpc::SocketUniquePtr host_socket; + ASSERT_EQ(0, brpc::Socket::Address(host_socket_id, &host_socket)); + ASSERT_EQ(0, host_socket->SetFailed()); + + brpc::StreamId stream_id; + brpc::StreamOptions stream_options; + ASSERT_EQ(0, brpc::Stream::Create(stream_options, NULL, &stream_id, false)); + brpc::ScopedStream stream_guard(stream_id); + + brpc::SocketUniquePtr stream_socket; + ASSERT_EQ(0, brpc::Socket::Address(stream_id, &stream_socket)); + brpc::Stream* stream = static_cast(stream_socket->conn()); + + errno = 0; + ASSERT_EQ(-1, stream->SetHostSocket(host_socket.get())); + ASSERT_NE(0, errno); + ASSERT_TRUE(stream->_host_socket == NULL); +} + TEST_F(StreamingRpcTest, sanity) { brpc::Server server; MyServiceWithStream service; @@ -393,6 +444,39 @@ class OrderedInputHandler : public brpc::StreamInputHandler { HandlerControl* _cntl; }; +TEST_F(StreamingRpcTest, server_failed_socket_before_response_closes_stream_without_abort) { + OrderedInputHandler handler; + brpc::StreamOptions response_stream_options; + response_stream_options.handler = &handler; + brpc::Server server; + MyServiceWithStreamAndFailedSocket service(response_stream_options); + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + ASSERT_EQ(0, server.Start(9007, NULL)); + + brpc::Channel channel; + ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); + brpc::Controller cntl; + brpc::StreamId request_stream; + ASSERT_EQ(0, StreamCreate(&request_stream, cntl, NULL)); + brpc::ScopedStream stream_guard(request_stream); + + test::EchoService_Stub stub(&channel); + stub.Echo(&cntl, &request, &response, NULL); + ASSERT_TRUE(cntl.Failed()); + + for (int i = 0; i < 10000 && !handler.stopped(); ++i) { + usleep(100); + } + + server.Stop(0); + server.Join(); + + ASSERT_TRUE(handler.stopped()); + ASSERT_TRUE(handler.failed()); + ASSERT_EQ(0, handler.idle_times()); + ASSERT_EQ(0, handler._expected_next_value); +} + TEST_F(StreamingRpcTest, received_in_order) { OrderedInputHandler handler; brpc::StreamOptions opt;