From 2ca4e68e5c3511f23f108d4309b2355c7c50adcd Mon Sep 17 00:00:00 2001 From: jay Date: Thu, 12 Mar 2026 13:20:34 +0000 Subject: [PATCH 1/3] port ipc statistics fix --- .../subscription_intra_process.hpp | 36 +++++++++++++++++-- rclcpp/include/rclcpp/subscription.hpp | 5 +-- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 0624f92c62..7fed14bb69 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -30,9 +30,19 @@ #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" #include "rclcpp/qos.hpp" +#include "rclcpp/time.hpp" #include "rclcpp/type_support_decl.hpp" +#include "rclcpp/logging.hpp" #include "tracetools/tracetools.h" +namespace rclcpp +{ +namespace topic_statistics +{ +class SubscriptionTopicStatistics; +} // namespace topic_statistics +} // namespace rclcpp + namespace rclcpp { namespace experimental @@ -70,6 +80,8 @@ class SubscriptionIntraProcess using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr; using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr; using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; + using SubscriptionTopicStatisticsSharedPtr = + std::shared_ptr; SubscriptionIntraProcess( AnySubscriptionCallback callback, @@ -77,7 +89,8 @@ class SubscriptionIntraProcess rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile, - rclcpp::IntraProcessBufferType buffer_type) + rclcpp::IntraProcessBufferType buffer_type, + SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr) : SubscriptionIntraProcessBuffer( std::make_shared(*allocator), @@ -85,7 +98,8 @@ class SubscriptionIntraProcess topic_name, qos_profile, buffer_type), - any_callback_(callback) + any_callback_(callback), + subscription_topic_statistics_(std::move(subscription_topic_statistics)) { TRACETOOLS_TRACEPOINT( rclcpp_subscription_callback_added, @@ -174,6 +188,14 @@ class SubscriptionIntraProcess msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; + std::chrono::time_point now; + if (subscription_topic_statistics_) { + RCLCPP_WARN_ONCE( + rclcpp::get_logger("rclcpp"), + "Intra-process communication does not support accurate message age statistics"); + now = std::chrono::system_clock::now(); + } + auto shared_ptr = std::static_pointer_cast>( data); @@ -185,12 +207,20 @@ class SubscriptionIntraProcess any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info); } shared_ptr.reset(); + + if (subscription_topic_statistics_) { + const auto nanos = std::chrono::time_point_cast(now); + const auto time = rclcpp::Time(nanos.time_since_epoch().count()); + subscription_topic_statistics_->handle_message(msg_info, time); + } } AnySubscriptionCallback any_callback_; + /// Optional statistics calculator for intra-process deliveries. + SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_; }; } // namespace experimental } // namespace rclcpp -#endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ +#endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ \ No newline at end of file diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 20f2b575f7..8555264477 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -172,7 +172,8 @@ class Subscription : public SubscriptionBase context, this->get_topic_name(), // important to get like this, as it has the fully-qualified name qos_profile, - resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback)); + resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback), + subscription_topic_statistics); TRACETOOLS_TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), @@ -454,4 +455,4 @@ class Subscription : public SubscriptionBase } // namespace rclcpp -#endif // RCLCPP__SUBSCRIPTION_HPP_ +#endif // RCLCPP__SUBSCRIPTION_HPP_ \ No newline at end of file From 98ee7bc0d50e990c30eafa4f29c58715ee767f18 Mon Sep 17 00:00:00 2001 From: jay Date: Thu, 12 Mar 2026 14:02:00 +0000 Subject: [PATCH 2/3] lambda fix --- .../subscription_intra_process.hpp | 32 ++++++++----------- rclcpp/include/rclcpp/subscription.hpp | 15 ++++++++- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 7fed14bb69..ae938c04a9 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -29,20 +30,12 @@ #include "rclcpp/context.hpp" #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/time.hpp" #include "rclcpp/type_support_decl.hpp" -#include "rclcpp/logging.hpp" #include "tracetools/tracetools.h" -namespace rclcpp -{ -namespace topic_statistics -{ -class SubscriptionTopicStatistics; -} // namespace topic_statistics -} // namespace rclcpp - namespace rclcpp { namespace experimental @@ -80,8 +73,10 @@ class SubscriptionIntraProcess using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr; using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr; using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; - using SubscriptionTopicStatisticsSharedPtr = - std::shared_ptr; + + /// Type-erased stats handler to avoid pulling in subscription_topic_statistics.hpp + /// which creates a circular include via publisher.hpp -> callback_group.hpp. + using StatsHandlerFn = std::function; SubscriptionIntraProcess( AnySubscriptionCallback callback, @@ -90,7 +85,7 @@ class SubscriptionIntraProcess const std::string & topic_name, const rclcpp::QoS & qos_profile, rclcpp::IntraProcessBufferType buffer_type, - SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr) + StatsHandlerFn stats_handler = nullptr) : SubscriptionIntraProcessBuffer( std::make_shared(*allocator), @@ -99,7 +94,7 @@ class SubscriptionIntraProcess qos_profile, buffer_type), any_callback_(callback), - subscription_topic_statistics_(std::move(subscription_topic_statistics)) + stats_handler_(std::move(stats_handler)) { TRACETOOLS_TRACEPOINT( rclcpp_subscription_callback_added, @@ -189,7 +184,7 @@ class SubscriptionIntraProcess msg_info.from_intra_process = true; std::chrono::time_point now; - if (subscription_topic_statistics_) { + if (stats_handler_) { RCLCPP_WARN_ONCE( rclcpp::get_logger("rclcpp"), "Intra-process communication does not support accurate message age statistics"); @@ -208,16 +203,15 @@ class SubscriptionIntraProcess } shared_ptr.reset(); - if (subscription_topic_statistics_) { + if (stats_handler_) { const auto nanos = std::chrono::time_point_cast(now); - const auto time = rclcpp::Time(nanos.time_since_epoch().count()); - subscription_topic_statistics_->handle_message(msg_info, time); + stats_handler_(msg_info, rclcpp::Time(nanos.time_since_epoch().count())); } } AnySubscriptionCallback any_callback_; - /// Optional statistics calculator for intra-process deliveries. - SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_; + /// Type-erased statistics callback, populated when topic statistics are enabled. + StatsHandlerFn stats_handler_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 8555264477..26d2359ecb 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -164,6 +164,19 @@ class Subscription : public SubscriptionBase ROSMessageT, AllocatorT>; + // Build a type-erased stats handler to pass into the IPC subscription. + // This avoids pulling subscription_topic_statistics.hpp into the IPC + // header where it causes circular include issues. + typename SubscriptionIntraProcessT::StatsHandlerFn stats_handler = nullptr; + if (subscription_topic_statistics) { + stats_handler = + [subscription_topic_statistics]( + const rmw_message_info_t & info, const rclcpp::Time & time) + { + subscription_topic_statistics->handle_message(info, time); + }; + } + // First create a SubscriptionIntraProcess which will be given to the intra-process manager. auto context = node_base->get_context(); subscription_intra_process_ = std::make_shared( @@ -173,7 +186,7 @@ class Subscription : public SubscriptionBase this->get_topic_name(), // important to get like this, as it has the fully-qualified name qos_profile, resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback), - subscription_topic_statistics); + std::move(stats_handler)); TRACETOOLS_TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), From 64dbc6516b3e99b7bb385eb0641a1deeb7576368 Mon Sep 17 00:00:00 2001 From: jay Date: Thu, 12 Mar 2026 14:35:51 +0000 Subject: [PATCH 3/3] timestamp fix --- .../include/rclcpp/experimental/subscription_intra_process.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index ae938c04a9..32a2bdf92a 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -189,6 +189,8 @@ class SubscriptionIntraProcess rclcpp::get_logger("rclcpp"), "Intra-process communication does not support accurate message age statistics"); now = std::chrono::system_clock::now(); + const auto nanos = std::chrono::time_point_cast(now); + msg_info.source_timestamp = nanos.time_since_epoch().count(); } auto shared_ptr = std::static_pointer_cast>(