From 922daaa4486d4ab372a853a2cdd756549cedf316 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Fri, 15 May 2026 09:33:51 +0200 Subject: [PATCH 1/2] fix(gateway): drive fault transport with its own executor Ros2FaultServiceTransport::get_snapshots / get_rosbag (and the other RPC methods) previously called rclcpp::Client::async_send_request and blocked on the std::future. That required some external executor to be spinning the host node, and TSan caught a race between the spin thread cleaning up the client's pending request entry (execute_client) and the calling thread destroying the local response shared_ptr when the function returned. Give the transport its own MutuallyExclusive callback group, registered on a private SingleThreadedExecutor, and drive each RPC via executor_.spin_until_future_complete(). The client callback execution and the response destruction now happen sequentially on the caller's thread, which removes the cross-thread race entirely. The seven per-method mutexes collapse into one executor mutex - the underlying SingleThreadedExecutor is not safe for concurrent spin_until_future_complete callers, so all RPCs serialise through it. The host node's executor no longer needs to know about these clients. The new callback group is created once at construction (single-threaded) and stays on the transport's private executor, so the rcl hash-map race that the no-naked-subscriptions gate guards against does not apply. Add the file to ALLOWED_LEGACY_FILES with a note pointing back at this issue. Closes #399 --- scripts/check_no_naked_subscriptions.sh | 1 + .../ros2_fault_service_transport.hpp | 36 +++-- .../ros2_fault_service_transport.cpp | 149 +++++++++++------- 3 files changed, 116 insertions(+), 70 deletions(-) diff --git a/scripts/check_no_naked_subscriptions.sh b/scripts/check_no_naked_subscriptions.sh index 38f0309a..710d330d 100755 --- a/scripts/check_no_naked_subscriptions.sh +++ b/scripts/check_no_naked_subscriptions.sh @@ -52,6 +52,7 @@ ALLOWED_LEGACY_FILES=( "${FAULT_MANAGER_ROOT}/src/snapshot_capture.cpp" # uses LockedSubscriptionGuard (in-place serialisation) "${FAULT_MANAGER_ROOT}/include/ros2_medkit_fault_manager/snapshot_capture.hpp" # comment references the guarded API "${FAULT_MANAGER_ROOT}/src/rosbag_capture.cpp" # bag-recorder spawns its own node + executor, no shared rcl hash map + "${GATEWAY_ROOT}/src/ros2/transports/ros2_fault_service_transport.cpp" # issue #399: single-threaded callback group at construction for synchronous service-client spin ) LEGACY_PATTERN="" diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp index 7d8ddf96..71ca2a17 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -34,11 +35,12 @@ namespace ros2_medkit_gateway::ros2 { /** * @brief rclcpp adapter implementing FaultServiceTransport. * - * Owns the seven `rclcpp::Client` instances and the - * seven per-client mutexes that previously lived inside FaultManager. Each - * mutex serialises one service direction so read operations (list, get) are - * not blocked by slow write operations (report_fault triggers snapshot - * capture inside the fault manager node). + * Owns seven `rclcpp::Client` instances scoped to a + * dedicated `MutuallyExclusive` callback group, plus a private + * `SingleThreadedExecutor` that drives that group. Each RPC blocks on + * `executor_.spin_until_future_complete()` so the client's pending-request + * cleanup and the response destruction happen on the calling thread instead + * of racing with whichever external executor spins the host node. * * Performs the ros2_medkit_msgs <-> JSON translation internally, returning * neutral FaultResult and FaultWithEnvJsonResult structures so the FaultManager @@ -85,6 +87,15 @@ class Ros2FaultServiceTransport : public FaultServiceTransport { private: rclcpp::Node * node_; + /// Dedicated callback group for all fault service clients. Created with + /// `automatically_add_to_executor_with_node = false` so the host node's + /// executor does not spin these clients - we drive them ourselves. + rclcpp::CallbackGroup::SharedPtr callback_group_; + + /// Private single-threaded executor that owns `callback_group_` and is + /// driven inline via `spin_until_future_complete()` on each RPC. + std::shared_ptr executor_; + rclcpp::Client::SharedPtr report_fault_client_; rclcpp::Client::SharedPtr get_fault_client_; rclcpp::Client::SharedPtr list_faults_client_; @@ -96,16 +107,11 @@ class Ros2FaultServiceTransport : public FaultServiceTransport { double service_timeout_sec_{5.0}; std::string fault_manager_base_path_{"/fault_manager"}; - /// Per-client mutexes for thread-safe service calls. - /// Split by service client so that read operations (list, get) are not blocked - /// by slow write operations (report_fault with snapshot capture). - mutable std::mutex report_mutex_; - mutable std::mutex list_mutex_; - mutable std::mutex get_mutex_; - mutable std::mutex clear_mutex_; - mutable std::mutex snapshots_mutex_; - mutable std::mutex rosbag_mutex_; - mutable std::mutex list_rosbags_mutex_; + /// Serialises access to `executor_`. `SingleThreadedExecutor::spin_until_future_complete()` + /// is not safe for concurrent callers on the same executor, so all RPCs take + /// this mutex for the duration of the synchronous spin. Replaces the seven + /// per-client mutexes that previously gated each method. + mutable std::mutex executor_mutex_; }; } // namespace ros2_medkit_gateway::ros2 diff --git a/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp b/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp index 1e531d63..5eb8938d 100644 --- a/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp +++ b/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include "ros2_medkit_gateway/fault_manager_paths.hpp" @@ -35,26 +34,45 @@ Ros2FaultServiceTransport::Ros2FaultServiceTransport(rclcpp::Node * node) : node } fault_manager_base_path_ = build_fault_manager_base_path(node_); - report_fault_client_ = - node_->create_client(fault_manager_base_path_ + "/report_fault"); - get_fault_client_ = node_->create_client(fault_manager_base_path_ + "/get_fault"); - list_faults_client_ = - node_->create_client(fault_manager_base_path_ + "/list_faults"); - clear_fault_client_ = - node_->create_client(fault_manager_base_path_ + "/clear_fault"); - get_snapshots_client_ = - node_->create_client(fault_manager_base_path_ + "/get_snapshots"); - get_rosbag_client_ = node_->create_client(fault_manager_base_path_ + "/get_rosbag"); - list_rosbags_client_ = - node_->create_client(fault_manager_base_path_ + "/list_rosbags"); + // Dedicated callback group for all fault clients. `false` keeps it out of + // the host node's main executor so the production gateway's + // MultiThreadedExecutor cannot race with our synchronous spin on the + // pending-request cleanup path (see issue #399). + callback_group_ = node_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false); + + executor_ = std::make_shared(); + executor_->add_callback_group(callback_group_, node_->get_node_base_interface()); + + // Portable across Humble/Jazzy/Rolling - `rclcpp::ServicesQoS` only exists from Iron onwards. + const auto service_qos = rclcpp::QoS(rclcpp::QoSInitialization::from_rmw(rmw_qos_profile_services_default)); + + report_fault_client_ = node_->create_client( + fault_manager_base_path_ + "/report_fault", service_qos, callback_group_); + get_fault_client_ = node_->create_client(fault_manager_base_path_ + "/get_fault", + service_qos, callback_group_); + list_faults_client_ = node_->create_client( + fault_manager_base_path_ + "/list_faults", service_qos, callback_group_); + clear_fault_client_ = node_->create_client( + fault_manager_base_path_ + "/clear_fault", service_qos, callback_group_); + get_snapshots_client_ = node_->create_client( + fault_manager_base_path_ + "/get_snapshots", service_qos, callback_group_); + get_rosbag_client_ = node_->create_client(fault_manager_base_path_ + "/get_rosbag", + service_qos, callback_group_); + list_rosbags_client_ = node_->create_client( + fault_manager_base_path_ + "/list_rosbags", service_qos, callback_group_); RCLCPP_INFO(node_->get_logger(), "Ros2FaultServiceTransport initialized (base_path=%s, timeout=%.1fs)", fault_manager_base_path_.c_str(), service_timeout_sec_); } Ros2FaultServiceTransport::~Ros2FaultServiceTransport() { - // Reset clients before implicit member destruction so any in-flight - // future-callback paths drop their references in a defined order. + // Tear down under the executor mutex so any in-flight RPC sees a consistent + // view, then drop the executor before the clients/callback group so it can + // unregister cleanly while the underlying entities still exist. + std::lock_guard lock(executor_mutex_); + + executor_.reset(); + report_fault_client_.reset(); get_fault_client_.reset(); list_faults_client_.reset(); @@ -62,6 +80,8 @@ Ros2FaultServiceTransport::~Ros2FaultServiceTransport() { get_snapshots_client_.reset(); get_rosbag_client_.reset(); list_rosbags_client_.reset(); + + callback_group_.reset(); } bool Ros2FaultServiceTransport::wait_for_services(std::chrono::duration timeout) { @@ -76,7 +96,6 @@ bool Ros2FaultServiceTransport::is_available() const { FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_code, uint8_t severity, const std::string & description, const std::string & source_id) { - std::lock_guard lock(report_mutex_); FaultResult result; auto timeout = std::chrono::duration(service_timeout_sec_); @@ -93,9 +112,10 @@ FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_co request->description = description; request->source_id = source_id; + std::lock_guard lock(executor_mutex_); auto future = report_fault_client_->async_send_request(request); - if (future.wait_for(timeout) != std::future_status::ready) { + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { result.success = false; result.error_message = "ReportFault service call timed out"; return result; @@ -114,7 +134,6 @@ FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_co FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id, bool include_prefailed, bool include_confirmed, bool include_cleared, bool include_healed, bool include_muted, bool include_clusters) { - std::lock_guard lock(list_mutex_); FaultResult result; auto timeout = std::chrono::duration(service_timeout_sec_); @@ -145,15 +164,19 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id request->include_muted = include_muted; request->include_clusters = include_clusters; - auto future = list_faults_client_->async_send_request(request); + std::shared_ptr response; + { + std::lock_guard lock(executor_mutex_); + auto future = list_faults_client_->async_send_request(request); - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "ListFaults service call timed out"; - return result; - } + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "ListFaults service call timed out"; + return result; + } - auto response = future.get(); + response = future.get(); + } // Filter by source_id if provided (uses prefix matching) json faults_array = json::array(); @@ -216,7 +239,6 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id FaultWithEnvJsonResult Ros2FaultServiceTransport::get_fault_with_env(const std::string & fault_code, const std::string & source_id) { - std::lock_guard lock(get_mutex_); FaultWithEnvJsonResult result; auto timeout = std::chrono::duration(service_timeout_sec_); @@ -229,15 +251,20 @@ FaultWithEnvJsonResult Ros2FaultServiceTransport::get_fault_with_env(const std:: auto request = std::make_shared(); request->fault_code = fault_code; - auto future = get_fault_client_->async_send_request(request); + std::shared_ptr response; + { + std::lock_guard lock(executor_mutex_); + auto future = get_fault_client_->async_send_request(request); - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "GetFault service call timed out"; - return result; + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "GetFault service call timed out"; + return result; + } + + response = future.get(); } - auto response = future.get(); if (!response->success) { result.success = false; result.error_message = response->error_message; @@ -282,7 +309,6 @@ FaultResult Ros2FaultServiceTransport::get_fault(const std::string & fault_code, } FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_code) { - std::lock_guard lock(clear_mutex_); FaultResult result; auto timeout = std::chrono::duration(service_timeout_sec_); @@ -295,9 +321,10 @@ FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_cod auto request = std::make_shared(); request->fault_code = fault_code; + std::lock_guard lock(executor_mutex_); auto future = clear_fault_client_->async_send_request(request); - if (future.wait_for(timeout) != std::future_status::ready) { + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { result.success = false; result.error_message = "ClearFault service call timed out"; return result; @@ -318,7 +345,6 @@ FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_cod } FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_code, const std::string & topic) { - std::lock_guard lock(snapshots_mutex_); FaultResult result; auto timeout = std::chrono::duration(service_timeout_sec_); @@ -332,15 +358,20 @@ FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_c request->fault_code = fault_code; request->topic = topic; - auto future = get_snapshots_client_->async_send_request(request); + std::shared_ptr response; + { + std::lock_guard lock(executor_mutex_); + auto future = get_snapshots_client_->async_send_request(request); - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "GetSnapshots service call timed out"; - return result; + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "GetSnapshots service call timed out"; + return result; + } + + response = future.get(); } - auto response = future.get(); result.success = response->success; if (response->success) { @@ -361,7 +392,6 @@ FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_c } FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code) { - std::lock_guard lock(rosbag_mutex_); FaultResult result; auto timeout = std::chrono::duration(service_timeout_sec_); @@ -374,15 +404,20 @@ FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code auto request = std::make_shared(); request->fault_code = fault_code; - auto future = get_rosbag_client_->async_send_request(request); + std::shared_ptr response; + { + std::lock_guard lock(executor_mutex_); + auto future = get_rosbag_client_->async_send_request(request); - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "GetRosbag service call timed out"; - return result; + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "GetRosbag service call timed out"; + return result; + } + + response = future.get(); } - auto response = future.get(); result.success = response->success; if (response->success) { @@ -398,7 +433,6 @@ FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code } FaultResult Ros2FaultServiceTransport::list_rosbags(const std::string & entity_fqn) { - std::lock_guard lock(list_rosbags_mutex_); FaultResult result; auto timeout = std::chrono::duration(service_timeout_sec_); @@ -411,15 +445,20 @@ FaultResult Ros2FaultServiceTransport::list_rosbags(const std::string & entity_f auto request = std::make_shared(); request->entity_fqn = entity_fqn; - auto future = list_rosbags_client_->async_send_request(request); + std::shared_ptr response; + { + std::lock_guard lock(executor_mutex_); + auto future = list_rosbags_client_->async_send_request(request); - if (future.wait_for(timeout) != std::future_status::ready) { - result.success = false; - result.error_message = "ListRosbags service call timed out"; - return result; + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "ListRosbags service call timed out"; + return result; + } + + response = future.get(); } - auto response = future.get(); result.success = response->success; if (response->success) { From 10328c89fb6c5d083dd3a2f34f27a6fdec53e0d6 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Fri, 15 May 2026 10:21:44 +0200 Subject: [PATCH 2/2] fix(gateway): host fault service clients on a private node Build the seven fault service clients on a private rclcpp::Node owned by Ros2FaultServiceTransport instead of a dedicated callback group on the host node. The private node is driven by the transport's own SingleThreadedExecutor via spin_until_future_complete(), so the host gateway node's executor never processes these clients and the client callback / response destruction stay on the caller's thread. This avoids the create_client(name, rclcpp::QoS, group) overload, which does not exist on Humble (rclcpp there only ships the rmw_qos_profile_t form), and keeps the no-naked-subscriptions regression gate satisfied without an allowlist entry, since create_client and create_node are not gated APIs. All seven RPC methods now share one consistent shape: build the request unlocked, then under executor_mutex_ wait for the service, send, spin to completion and take the response; response translation happens unlocked. Adds a regression test that completes a get_snapshots RPC while the host node is never spun, proving the transport drives its private client itself. --- scripts/check_no_naked_subscriptions.sh | 1 - .../ros2_fault_service_transport.hpp | 30 +-- .../ros2_fault_service_transport.cpp | 205 ++++++++---------- .../test/test_fault_manager.cpp | 34 +++ 4 files changed, 144 insertions(+), 126 deletions(-) diff --git a/scripts/check_no_naked_subscriptions.sh b/scripts/check_no_naked_subscriptions.sh index 710d330d..38f0309a 100755 --- a/scripts/check_no_naked_subscriptions.sh +++ b/scripts/check_no_naked_subscriptions.sh @@ -52,7 +52,6 @@ ALLOWED_LEGACY_FILES=( "${FAULT_MANAGER_ROOT}/src/snapshot_capture.cpp" # uses LockedSubscriptionGuard (in-place serialisation) "${FAULT_MANAGER_ROOT}/include/ros2_medkit_fault_manager/snapshot_capture.hpp" # comment references the guarded API "${FAULT_MANAGER_ROOT}/src/rosbag_capture.cpp" # bag-recorder spawns its own node + executor, no shared rcl hash map - "${GATEWAY_ROOT}/src/ros2/transports/ros2_fault_service_transport.cpp" # issue #399: single-threaded callback group at construction for synchronous service-client spin ) LEGACY_PATTERN="" diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp index 71ca2a17..78e81e51 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp @@ -35,12 +35,12 @@ namespace ros2_medkit_gateway::ros2 { /** * @brief rclcpp adapter implementing FaultServiceTransport. * - * Owns seven `rclcpp::Client` instances scoped to a - * dedicated `MutuallyExclusive` callback group, plus a private - * `SingleThreadedExecutor` that drives that group. Each RPC blocks on - * `executor_.spin_until_future_complete()` so the client's pending-request - * cleanup and the response destruction happen on the calling thread instead - * of racing with whichever external executor spins the host node. + * Owns seven `rclcpp::Client` instances created on a + * private `rclcpp::Node` plus a private `SingleThreadedExecutor` that drives + * that node. Each RPC blocks on `executor_->spin_until_future_complete()`, so + * the client's pending-request cleanup and the response destruction both run + * on the calling thread instead of racing with whatever executor spins the + * host gateway node. * * Performs the ros2_medkit_msgs <-> JSON translation internally, returning * neutral FaultResult and FaultWithEnvJsonResult structures so the FaultManager @@ -49,8 +49,10 @@ namespace ros2_medkit_gateway::ros2 { class Ros2FaultServiceTransport : public FaultServiceTransport { public: /** - * @param node Non-owning ROS node used for client creation and to derive - * the configured fault_manager namespace. + * @param node Non-owning ROS node used to read the service timeout + * parameter and to derive the configured fault_manager + * namespace. The service clients themselves live on a private + * node owned by this transport. */ explicit Ros2FaultServiceTransport(rclcpp::Node * node); @@ -87,13 +89,13 @@ class Ros2FaultServiceTransport : public FaultServiceTransport { private: rclcpp::Node * node_; - /// Dedicated callback group for all fault service clients. Created with - /// `automatically_add_to_executor_with_node = false` so the host node's - /// executor does not spin these clients - we drive them ourselves. - rclcpp::CallbackGroup::SharedPtr callback_group_; + /// Private node hosting the fault service clients. Kept off the host node's + /// executor so the gateway's MultiThreadedExecutor never processes these + /// clients' responses concurrently with our synchronous spin. + std::shared_ptr client_node_; - /// Private single-threaded executor that owns `callback_group_` and is - /// driven inline via `spin_until_future_complete()` on each RPC. + /// Private single-threaded executor that drives `client_node_`, spun inline + /// via `spin_until_future_complete()` on each RPC. std::shared_ptr executor_; rclcpp::Client::SharedPtr report_fault_client_; diff --git a/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp b/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp index 5eb8938d..accd57cd 100644 --- a/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp +++ b/src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp @@ -34,45 +34,40 @@ Ros2FaultServiceTransport::Ros2FaultServiceTransport(rclcpp::Node * node) : node } fault_manager_base_path_ = build_fault_manager_base_path(node_); - // Dedicated callback group for all fault clients. `false` keeps it out of - // the host node's main executor so the production gateway's - // MultiThreadedExecutor cannot race with our synchronous spin on the - // pending-request cleanup path (see issue #399). - callback_group_ = node_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false); - + // The fault service clients live on a private node driven by a private + // executor. The host gateway node's MultiThreadedExecutor therefore never + // processes these clients, so the client's pending-request cleanup cannot + // race against the calling thread destroying the response shared_ptr - both + // happen inline on the caller's thread inside spin_until_future_complete(). + client_node_ = std::make_shared(std::string(node_->get_name()) + "_fault_clients"); executor_ = std::make_shared(); - executor_->add_callback_group(callback_group_, node_->get_node_base_interface()); - - // Portable across Humble/Jazzy/Rolling - `rclcpp::ServicesQoS` only exists from Iron onwards. - const auto service_qos = rclcpp::QoS(rclcpp::QoSInitialization::from_rmw(rmw_qos_profile_services_default)); - - report_fault_client_ = node_->create_client( - fault_manager_base_path_ + "/report_fault", service_qos, callback_group_); - get_fault_client_ = node_->create_client(fault_manager_base_path_ + "/get_fault", - service_qos, callback_group_); - list_faults_client_ = node_->create_client( - fault_manager_base_path_ + "/list_faults", service_qos, callback_group_); - clear_fault_client_ = node_->create_client( - fault_manager_base_path_ + "/clear_fault", service_qos, callback_group_); - get_snapshots_client_ = node_->create_client( - fault_manager_base_path_ + "/get_snapshots", service_qos, callback_group_); - get_rosbag_client_ = node_->create_client(fault_manager_base_path_ + "/get_rosbag", - service_qos, callback_group_); - list_rosbags_client_ = node_->create_client( - fault_manager_base_path_ + "/list_rosbags", service_qos, callback_group_); + executor_->add_node(client_node_); + + report_fault_client_ = + client_node_->create_client(fault_manager_base_path_ + "/report_fault"); + get_fault_client_ = + client_node_->create_client(fault_manager_base_path_ + "/get_fault"); + list_faults_client_ = + client_node_->create_client(fault_manager_base_path_ + "/list_faults"); + clear_fault_client_ = + client_node_->create_client(fault_manager_base_path_ + "/clear_fault"); + get_snapshots_client_ = + client_node_->create_client(fault_manager_base_path_ + "/get_snapshots"); + get_rosbag_client_ = + client_node_->create_client(fault_manager_base_path_ + "/get_rosbag"); + list_rosbags_client_ = + client_node_->create_client(fault_manager_base_path_ + "/list_rosbags"); RCLCPP_INFO(node_->get_logger(), "Ros2FaultServiceTransport initialized (base_path=%s, timeout=%.1fs)", fault_manager_base_path_.c_str(), service_timeout_sec_); } Ros2FaultServiceTransport::~Ros2FaultServiceTransport() { - // Tear down under the executor mutex so any in-flight RPC sees a consistent - // view, then drop the executor before the clients/callback group so it can - // unregister cleanly while the underlying entities still exist. + // Tear down under the executor mutex so any in-flight RPC has finished. Drop + // the clients first, then the executor, then the node, so the executor never + // references a freed node. std::lock_guard lock(executor_mutex_); - executor_.reset(); - report_fault_client_.reset(); get_fault_client_.reset(); list_faults_client_.reset(); @@ -81,7 +76,8 @@ Ros2FaultServiceTransport::~Ros2FaultServiceTransport() { get_rosbag_client_.reset(); list_rosbags_client_.reset(); - callback_group_.reset(); + executor_.reset(); + client_node_.reset(); } bool Ros2FaultServiceTransport::wait_for_services(std::chrono::duration timeout) { @@ -97,13 +93,7 @@ bool Ros2FaultServiceTransport::is_available() const { FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_code, uint8_t severity, const std::string & description, const std::string & source_id) { FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!report_fault_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "ReportFault service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->fault_code = fault_code; @@ -112,16 +102,23 @@ FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_co request->description = description; request->source_id = source_id; - std::lock_guard lock(executor_mutex_); - auto future = report_fault_client_->async_send_request(request); - - if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { - result.success = false; - result.error_message = "ReportFault service call timed out"; - return result; + ros2_medkit_msgs::srv::ReportFault::Response::SharedPtr response; + { + std::lock_guard lock(executor_mutex_); + if (!report_fault_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "ReportFault service not available"; + return result; + } + auto future = report_fault_client_->async_send_request(request); + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "ReportFault service call timed out"; + return result; + } + response = future.get(); } - auto response = future.get(); result.success = response->accepted; result.data = {{"accepted", response->accepted}}; if (!response->accepted) { @@ -135,13 +132,7 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id bool include_confirmed, bool include_cleared, bool include_healed, bool include_muted, bool include_clusters) { FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!list_faults_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "ListFaults service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->filter_by_severity = false; @@ -164,17 +155,20 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id request->include_muted = include_muted; request->include_clusters = include_clusters; - std::shared_ptr response; + ros2_medkit_msgs::srv::ListFaults::Response::SharedPtr response; { std::lock_guard lock(executor_mutex_); + if (!list_faults_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "ListFaults service not available"; + return result; + } auto future = list_faults_client_->async_send_request(request); - if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { result.success = false; result.error_message = "ListFaults service call timed out"; return result; } - response = future.get(); } @@ -240,28 +234,25 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id FaultWithEnvJsonResult Ros2FaultServiceTransport::get_fault_with_env(const std::string & fault_code, const std::string & source_id) { FaultWithEnvJsonResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!get_fault_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "GetFault service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->fault_code = fault_code; - std::shared_ptr response; + ros2_medkit_msgs::srv::GetFault::Response::SharedPtr response; { std::lock_guard lock(executor_mutex_); + if (!get_fault_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "GetFault service not available"; + return result; + } auto future = get_fault_client_->async_send_request(request); - if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { result.success = false; result.error_message = "GetFault service call timed out"; return result; } - response = future.get(); } @@ -310,27 +301,28 @@ FaultResult Ros2FaultServiceTransport::get_fault(const std::string & fault_code, FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_code) { FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!clear_fault_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "ClearFault service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->fault_code = fault_code; - std::lock_guard lock(executor_mutex_); - auto future = clear_fault_client_->async_send_request(request); - - if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { - result.success = false; - result.error_message = "ClearFault service call timed out"; - return result; + ros2_medkit_msgs::srv::ClearFault::Response::SharedPtr response; + { + std::lock_guard lock(executor_mutex_); + if (!clear_fault_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "ClearFault service not available"; + return result; + } + auto future = clear_fault_client_->async_send_request(request); + if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { + result.success = false; + result.error_message = "ClearFault service call timed out"; + return result; + } + response = future.get(); } - auto response = future.get(); result.success = response->success; result.data = {{"success", response->success}, {"message", response->message}}; if (!response->success) { @@ -346,29 +338,26 @@ FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_cod FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_code, const std::string & topic) { FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!get_snapshots_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "GetSnapshots service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->fault_code = fault_code; request->topic = topic; - std::shared_ptr response; + ros2_medkit_msgs::srv::GetSnapshots::Response::SharedPtr response; { std::lock_guard lock(executor_mutex_); + if (!get_snapshots_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "GetSnapshots service not available"; + return result; + } auto future = get_snapshots_client_->async_send_request(request); - if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { result.success = false; result.error_message = "GetSnapshots service call timed out"; return result; } - response = future.get(); } @@ -393,28 +382,25 @@ FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_c FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code) { FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!get_rosbag_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "GetRosbag service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->fault_code = fault_code; - std::shared_ptr response; + ros2_medkit_msgs::srv::GetRosbag::Response::SharedPtr response; { std::lock_guard lock(executor_mutex_); + if (!get_rosbag_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "GetRosbag service not available"; + return result; + } auto future = get_rosbag_client_->async_send_request(request); - if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { result.success = false; result.error_message = "GetRosbag service call timed out"; return result; } - response = future.get(); } @@ -434,28 +420,25 @@ FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code FaultResult Ros2FaultServiceTransport::list_rosbags(const std::string & entity_fqn) { FaultResult result; - - auto timeout = std::chrono::duration(service_timeout_sec_); - if (!list_rosbags_client_->wait_for_service(timeout)) { - result.success = false; - result.error_message = "ListRosbags service not available"; - return result; - } + const auto timeout = std::chrono::duration(service_timeout_sec_); auto request = std::make_shared(); request->entity_fqn = entity_fqn; - std::shared_ptr response; + ros2_medkit_msgs::srv::ListRosbags::Response::SharedPtr response; { std::lock_guard lock(executor_mutex_); + if (!list_rosbags_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "ListRosbags service not available"; + return result; + } auto future = list_rosbags_client_->async_send_request(request); - if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) { result.success = false; result.error_message = "ListRosbags service call timed out"; return result; } - response = future.get(); } diff --git a/src/ros2_medkit_gateway/test/test_fault_manager.cpp b/src/ros2_medkit_gateway/test/test_fault_manager.cpp index 99569ae1..1631e54b 100644 --- a/src/ros2_medkit_gateway/test/test_fault_manager.cpp +++ b/src/ros2_medkit_gateway/test/test_fault_manager.cpp @@ -168,6 +168,40 @@ TEST_F(FaultManagerTest, GetSnapshotsSuccessWithValidJson) { EXPECT_TRUE(result.data["topics"].contains("/joint_states")); } +// Regression guard: Ros2FaultServiceTransport must drive its service clients +// on its own private executor. The transport's host node (node_) is left +// unspun here - the mock service runs on a separate node with its own spin +// thread - so the RPC can only complete if the transport spins its private +// client node itself. +TEST_F(FaultManagerTest, GetSnapshotsDrivesPrivateClientWithoutSpinningHostNode) { + auto service_node = std::make_shared("mock_fault_service_node_" + std::to_string(test_counter_++)); + auto service = service_node->create_service( + "/fault_manager/get_snapshots", [](const std::shared_ptr & request, + const std::shared_ptr & response) { + response->success = true; + nlohmann::json snapshot_data; + snapshot_data["fault_code"] = request->fault_code; + response->data = snapshot_data.dump(); + }); + + rclcpp::executors::SingleThreadedExecutor service_executor; + service_executor.add_node(service_node); + std::thread service_thread([&service_executor]() { + service_executor.spin(); + }); + + // node_ is deliberately never spun by this test. + FaultManager fault_manager(std::make_shared(node_.get())); + auto result = fault_manager.get_snapshots("SELF_DRIVEN_FAULT"); + + service_executor.cancel(); + service_thread.join(); + + EXPECT_TRUE(result.success); + EXPECT_TRUE(result.error_message.empty()); + EXPECT_EQ(result.data["fault_code"], "SELF_DRIVEN_FAULT"); +} + // @verifies REQ_INTEROP_088 TEST_F(FaultManagerTest, GetSnapshotsUsesConfiguredFaultManagerNamespace) { node_ = std::make_shared(