diff --git a/docs/api/rest.rst b/docs/api/rest.rst index 982f0fac..793ac01d 100644 --- a/docs/api/rest.rst +++ b/docs/api/rest.rst @@ -933,11 +933,26 @@ Without such a plugin, all endpoints return ``501 Not Implemented``. Cyclic Subscriptions -------------------- -Cyclic subscriptions provide periodic push-based data delivery via Server-Sent Events (SSE). -A client creates a subscription specifying which data resource to observe and at what interval. -The server then pushes the latest value of that resource at the requested frequency. +Cyclic subscriptions provide periodic push-based delivery of any SOVD resource collection +via Server-Sent Events (SSE). A client creates a subscription specifying the resource URI +(data, faults, configurations, logs, or ``x-`` vendor extensions) and a delivery interval. +The server then pushes the latest value at the requested frequency. -Subscriptions are temporary — they do not survive server restart. +Subscriptions are temporary - they do not survive server restart. + +**Supported collections:** + +- ``data`` - Topic data (requires a resource path, e.g. ``/data/temperature``) +- ``faults`` - Fault list (resource path optional, e.g. ``/faults`` or ``/faults/fault_001``) +- ``configurations`` - Parameter values (resource path optional) +- ``logs`` - Application log entries from ``/rosout`` +- ``x-*`` - Vendor extensions (e.g. ``x-medkit-metrics``) + +**Interval values:** + +- ``fast`` - 50ms sampling period +- ``normal`` - 200ms sampling period (default) +- ``slow`` - 500ms sampling period ``POST /api/v1/{entity_type}/{entity_id}/cyclic-subscriptions`` Create a new cyclic subscription. @@ -957,10 +972,22 @@ Subscriptions are temporary — they do not survive server restart. **Fields:** - - ``resource`` (string, required): Full URI of the data resource to observe + - ``resource`` (string, required): Full SOVD resource URI to observe + (e.g. ``/api/v1/apps/{id}/data/{topic}``, ``/api/v1/apps/{id}/faults``) - ``protocol`` (string, optional): Transport protocol. Only ``"sse"`` supported. Default: ``"sse"`` - - ``interval`` (string, required): One of ``fast`` (<100ms), ``normal`` (100-250ms), ``slow`` (250-500ms) - - ``duration`` (integer, required): Subscription lifetime in seconds (must be > 0) + - ``interval`` (string, required): One of ``fast``, ``normal``, ``slow`` + - ``duration`` (integer, required): Subscription lifetime in seconds. + Must be > 0 and <= ``sse.max_duration_sec`` (default: 3600) + + **Error responses:** + + - **400** ``invalid-parameter`` - Invalid interval, duration <= 0, or duration exceeds max + - **400** ``x-medkit-invalid-resource-uri`` - Malformed resource URI or path traversal + - **400** ``x-medkit-entity-mismatch`` - Resource URI references different entity than route + - **400** ``x-medkit-collection-not-supported`` - Entity doesn't support the collection + - **400** ``x-medkit-collection-not-available`` - No data provider registered for collection + - **400** ``x-medkit-unsupported-protocol`` - Requested protocol not available + - **503** ``service-unavailable`` - Max subscription capacity reached **Response 201 Created:** @@ -982,7 +1009,8 @@ Subscriptions are temporary — they do not survive server restart. ``PUT /api/v1/{entity_type}/{entity_id}/cyclic-subscriptions/{id}`` Update ``interval`` and/or ``duration`` of an existing subscription. - Only provided fields are updated. + Only provided fields are updated. Updating ``duration`` resets the + expiry timer from the current time (not from the original creation time). **Request Body:** @@ -1016,6 +1044,28 @@ Subscriptions are temporary — they do not survive server restart. The stream auto-closes when the duration expires, the client disconnects, or the subscription is deleted. +**Multi-collection examples:** + +Subscribe to faults on a component: + +.. code-block:: json + + { + "resource": "/api/v1/components/ecu1/faults", + "interval": "slow", + "duration": 600 + } + +Subscribe to a specific configuration parameter: + +.. code-block:: json + + { + "resource": "/api/v1/apps/temp_sensor/configurations/calibration_offset", + "interval": "normal", + "duration": 120 + } + Rate Limiting ------------- diff --git a/docs/config/server.rst b/docs/config/server.rst index 6eb8e7d1..da1584e8 100644 --- a/docs/config/server.rst +++ b/docs/config/server.rst @@ -244,6 +244,10 @@ Configure limits for SSE-based streaming (fault events and cyclic subscriptions) - int - ``100`` - Maximum number of active cyclic subscriptions across all entities. Returns HTTP 503 when this limit is reached. + * - ``sse.max_duration_sec`` + - int + - ``3600`` + - Maximum allowed subscription duration in seconds. Requests exceeding this are rejected with HTTP 400. Example: @@ -254,6 +258,7 @@ Example: sse: max_clients: 10 max_subscriptions: 100 + max_duration_sec: 3600 Plugin Framework ---------------- @@ -364,6 +369,7 @@ Complete Example sse: max_clients: 10 max_subscriptions: 100 + max_duration_sec: 3600 See Also -------- diff --git a/docs/tutorials/plugin-system.rst b/docs/tutorials/plugin-system.rst index 5f1b74c2..0562c690 100644 --- a/docs/tutorials/plugin-system.rst +++ b/docs/tutorials/plugin-system.rst @@ -295,6 +295,42 @@ For entity-scoped endpoints, register a matching capability via ``register_capab or ``register_entity_capability()`` in ``set_context()`` so the endpoint appears in the entity's capabilities array in discovery responses. +Cyclic Subscription Extensions +------------------------------- + +Plugins can extend cyclic subscriptions by registering custom resource samplers +and transport providers during ``set_context()``. + +**Resource Samplers** provide the data for a collection when sampled by a subscription. +Built-in samplers (``data``, ``faults``, ``configurations``, ``updates``) are registered +by the gateway during startup. Custom samplers are registered via ``ResourceSamplerRegistry`` +on the ``GatewayNode``: + +.. code-block:: cpp + + auto* sampler_registry = node->get_sampler_registry(); + sampler_registry->register_sampler("x-medkit-metrics", + [this](const std::string& entity_id, const std::string& resource_path) + -> tl::expected { + return get_metrics(entity_id, resource_path); + }); + +Once registered, clients can create cyclic subscriptions on the ``x-medkit-metrics`` +collection for any entity. + +**Transport Providers** deliver subscription data via alternative protocols (beyond +the built-in SSE transport). Register via ``TransportRegistry`` on the ``GatewayNode``: + +.. code-block:: cpp + + auto* transport_registry = node->get_transport_registry(); + transport_registry->register_transport( + std::make_unique(mqtt_client_)); + +The transport must implement ``SubscriptionTransportProvider`` (``start``, ``stop``, +``notify_update``, ``protocol()``). Clients specify the protocol in the subscription +creation request. + Multiple Plugins ---------------- diff --git a/src/ros2_medkit_gateway/CMakeLists.txt b/src/ros2_medkit_gateway/CMakeLists.txt index 7d74a15c..c736a3c1 100644 --- a/src/ros2_medkit_gateway/CMakeLists.txt +++ b/src/ros2_medkit_gateway/CMakeLists.txt @@ -82,6 +82,8 @@ add_library(gateway_lib STATIC src/fault_manager.cpp src/log_manager.cpp src/subscription_manager.cpp + src/resource_sampler.cpp + src/subscription_transport.cpp # Entity resource model src/models/entity_types.cpp src/models/entity_capabilities.cpp @@ -124,6 +126,7 @@ add_library(gateway_lib STATIC src/http/handlers/bulkdata_handlers.cpp src/http/handlers/sse_fault_handler.cpp src/http/handlers/cyclic_subscription_handlers.cpp + src/http/handlers/sse_transport_provider.cpp src/http/handlers/auth_handlers.cpp # Bulk data storage src/bulk_data_store.cpp @@ -383,10 +386,22 @@ if(BUILD_TESTING) ament_add_gtest(test_subscription_manager test/test_subscription_manager.cpp) target_link_libraries(test_subscription_manager gateway_lib) + # Add resource sampler registry tests + ament_add_gtest(test_resource_sampler_registry test/test_resource_sampler_registry.cpp) + target_link_libraries(test_resource_sampler_registry gateway_lib) + + # Add transport registry tests + ament_add_gtest(test_transport_registry test/test_transport_registry.cpp) + target_link_libraries(test_transport_registry gateway_lib) + # Add cyclic subscription handler tests ament_add_gtest(test_cyclic_subscription_handlers test/test_cyclic_subscription_handlers.cpp) target_link_libraries(test_cyclic_subscription_handlers gateway_lib) + # Add SSE transport provider tests + ament_add_gtest(test_sse_transport_provider test/test_sse_transport_provider.cpp) + target_link_libraries(test_sse_transport_provider gateway_lib) + # Add update manager tests ament_add_gtest(test_update_manager test/test_update_manager.cpp) target_link_libraries(test_update_manager gateway_lib) @@ -488,7 +503,10 @@ if(BUILD_TESTING) test_bulk_data_store test_bulkdata_handlers test_subscription_manager + test_resource_sampler_registry + test_transport_registry test_cyclic_subscription_handlers + test_sse_transport_provider test_update_manager test_data_handlers test_auth_handlers diff --git a/src/ros2_medkit_gateway/config/gateway_params.yaml b/src/ros2_medkit_gateway/config/gateway_params.yaml index 3af7d64d..5ee65f23 100644 --- a/src/ros2_medkit_gateway/config/gateway_params.yaml +++ b/src/ros2_medkit_gateway/config/gateway_params.yaml @@ -112,6 +112,11 @@ ros2_medkit_gateway: # Default: 100 max_subscriptions: 100 + # Maximum allowed subscription duration in seconds + # Subscriptions requesting a longer duration are rejected with HTTP 400 + # Default: 3600 (1 hour) + max_duration_sec: 3600 + # Logging Configuration logs: # Ring buffer capacity per node name. diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp index cd431802..93b7b091 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp @@ -32,11 +32,14 @@ #include "ros2_medkit_gateway/fault_manager.hpp" #include "ros2_medkit_gateway/http/rate_limiter.hpp" #include "ros2_medkit_gateway/http/rest_server.hpp" +#include "ros2_medkit_gateway/http/sse_client_tracker.hpp" #include "ros2_medkit_gateway/log_manager.hpp" #include "ros2_medkit_gateway/models/thread_safe_entity_cache.hpp" #include "ros2_medkit_gateway/operation_manager.hpp" #include "ros2_medkit_gateway/plugins/plugin_manager.hpp" +#include "ros2_medkit_gateway/resource_sampler.hpp" #include "ros2_medkit_gateway/subscription_manager.hpp" +#include "ros2_medkit_gateway/subscription_transport.hpp" #include "ros2_medkit_gateway/updates/update_manager.hpp" namespace ros2_medkit_gateway { @@ -114,6 +117,24 @@ class GatewayNode : public rclcpp::Node { */ PluginManager * get_plugin_manager() const; + /** + * @brief Get the ResourceSamplerRegistry instance + * @return Raw pointer to ResourceSamplerRegistry (valid for lifetime of GatewayNode) + */ + ResourceSamplerRegistry * get_sampler_registry() const; + + /** + * @brief Get the TransportRegistry instance + * @return Raw pointer to TransportRegistry (valid for lifetime of GatewayNode) + */ + TransportRegistry * get_transport_registry() const; + + /** + * @brief Get the SSEClientTracker instance + * @return Shared pointer to SSEClientTracker + */ + std::shared_ptr get_sse_client_tracker() const; + private: void refresh_cache(); void start_rest_server(); @@ -137,6 +158,9 @@ class GatewayNode : public rclcpp::Node { std::unique_ptr log_mgr_; std::unique_ptr bulk_data_store_; std::unique_ptr subscription_mgr_; + std::unique_ptr sampler_registry_; + std::unique_ptr transport_registry_; + std::shared_ptr sse_client_tracker_; // IMPORTANT: plugin_mgr_ BEFORE update_mgr_ - C++ destroys in reverse order, // so update_mgr_ waits for async tasks before plugin_mgr_ destroys the plugin. // plugin_ctx_ is owned here (outlives plugins); plugin_mgr_ holds a non-owning ref. diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/error_codes.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/error_codes.hpp index 4d511d74..0e932ffd 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/error_codes.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/error_codes.hpp @@ -114,6 +114,21 @@ constexpr const char * ERR_X_MEDKIT_UPDATE_NOT_PREPARED = "x-medkit-update-not-p /// Automated mode not supported for this package constexpr const char * ERR_X_MEDKIT_UPDATE_NOT_AUTOMATED = "x-medkit-update-not-automated"; +/// Vendor-specific: invalid resource URI format for subscriptions +constexpr const char * ERR_X_MEDKIT_INVALID_RESOURCE_URI = "x-medkit-invalid-resource-uri"; + +/// Vendor-specific: collection not supported for entity type +constexpr const char * ERR_X_MEDKIT_COLLECTION_NOT_SUPPORTED = "x-medkit-collection-not-supported"; + +/// Vendor-specific: no data provider registered for collection +constexpr const char * ERR_X_MEDKIT_COLLECTION_NOT_AVAILABLE = "x-medkit-collection-not-available"; + +/// Vendor-specific: resource URI entity doesn't match route entity +constexpr const char * ERR_X_MEDKIT_ENTITY_MISMATCH = "x-medkit-entity-mismatch"; + +/// Vendor-specific: unsupported subscription protocol +constexpr const char * ERR_X_MEDKIT_UNSUPPORTED_PROTOCOL = "x-medkit-unsupported-protocol"; + /** * @brief Check if an error code is a vendor-specific code * @param error_code Error code to check diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/capability_builder.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/capability_builder.hpp index e5d09c29..602e6d17 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/capability_builder.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/capability_builder.hpp @@ -52,7 +52,7 @@ class CapabilityBuilder { RELATED_APPS, ///< Entity has related apps (components only) HOSTS, ///< Entity has host apps (functions/components) DEPENDS_ON, ///< Entity has dependencies (components only) - LOGS ///< Entity has communication logs (components and apps) + LOGS ///< Entity has application log entries (components and apps) }; /** diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/cyclic_subscription_handlers.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/cyclic_subscription_handlers.hpp index e5979345..3611502d 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/cyclic_subscription_handlers.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/cyclic_subscription_handlers.hpp @@ -16,17 +16,25 @@ #include -#include #include #include #include "ros2_medkit_gateway/http/handlers/handler_context.hpp" -#include "ros2_medkit_gateway/http/sse_client_tracker.hpp" +#include "ros2_medkit_gateway/resource_sampler.hpp" #include "ros2_medkit_gateway/subscription_manager.hpp" +#include "ros2_medkit_gateway/subscription_transport.hpp" namespace ros2_medkit_gateway { namespace handlers { +/// Result of parsing a SOVD resource URI for subscription +struct ParsedResourceUri { + std::string entity_type; // "apps" or "components" + std::string entity_id; + std::string collection; // "data", "faults", "x-medkit-metrics", etc. + std::string resource_path; // path within collection (may be empty) +}; + /** * @brief HTTP handlers for cyclic subscription CRUD and SSE streaming. * @@ -41,7 +49,8 @@ namespace handlers { class CyclicSubscriptionHandlers { public: CyclicSubscriptionHandlers(HandlerContext & ctx, SubscriptionManager & sub_mgr, - std::shared_ptr client_tracker); + ResourceSamplerRegistry & sampler_registry, TransportRegistry & transport_registry, + int max_duration_sec); /// POST /{entity}/cyclic-subscriptions — create subscription void handle_create(const httplib::Request & req, httplib::Response & res); @@ -64,6 +73,9 @@ class CyclicSubscriptionHandlers { /// Convert subscription info to JSON response static nlohmann::json subscription_to_json(const CyclicSubscriptionInfo & info, const std::string & event_source); + /// Parse resource URI to extract entity type, entity id, collection, and resource path. + static tl::expected parse_resource_uri(const std::string & resource); + private: /// Build event_source URI from subscription info static std::string build_event_source(const CyclicSubscriptionInfo & info); @@ -71,15 +83,11 @@ class CyclicSubscriptionHandlers { /// Extract entity type string ("apps" or "components") from request path static std::string extract_entity_type(const httplib::Request & req); - /// Parse resource URI to extract topic name. Returns topic or error. - static tl::expected parse_resource_uri(const std::string & resource); - HandlerContext & ctx_; SubscriptionManager & sub_mgr_; - std::shared_ptr client_tracker_; - - /// Keepalive interval for SSE streams - static constexpr int kKeepaliveIntervalSec = 15; + ResourceSamplerRegistry & sampler_registry_; + TransportRegistry & transport_registry_; + int max_duration_sec_; }; } // namespace handlers diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp new file mode 100644 index 00000000..9c7d9a43 --- /dev/null +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp @@ -0,0 +1,61 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include "ros2_medkit_gateway/http/sse_client_tracker.hpp" +#include "ros2_medkit_gateway/subscription_transport.hpp" + +namespace ros2_medkit_gateway { + +/// Built-in SSE transport provider. Refactored from handle_events() logic. +/// +/// Lifetime: must outlive all HTTP connections. GatewayNode ensures this by +/// stopping the REST server (joining HTTP threads) before destroying transports. +class SseTransportProvider : public SubscriptionTransportProvider { + public: + SseTransportProvider(SubscriptionManager & sub_mgr, std::shared_ptr client_tracker); + + std::string protocol() const override { + return "sse"; + } + + tl::expected start(const CyclicSubscriptionInfo & info, ResourceSamplerFn json_sampler, + GatewayNode * node) override; + + void notify_update(const std::string & sub_id) override; + void stop(const std::string & sub_id) override; + + bool handle_client_connect(const std::string & sub_id, const httplib::Request & req, + httplib::Response & res) override; + + private: + struct StreamState { + ResourceSamplerFn sampler; + }; + + SubscriptionManager & sub_mgr_; + std::shared_ptr client_tracker_; + std::mutex mutex_; + std::unordered_map streams_; + + static constexpr int kKeepaliveIntervalSec = 15; +}; + +} // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/models/entity_types.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/models/entity_types.hpp index 7f02c4b5..dbd72117 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/models/entity_types.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/models/entity_types.hpp @@ -49,7 +49,8 @@ enum class ResourceCollection { LOCKS, ///< Lock resources (lifecycle states) MODES, ///< Mode resources (node modes) CYCLIC_SUBSCRIPTIONS, ///< Cyclic subscriptions (topic polling) - COMMUNICATION_LOGS, ///< Communication logs (/rosout filtered) + LOGS, ///< Application log entries (/rosout) + COMMUNICATION_LOGS, ///< Communication logs (CAN/UDS/DoIP - not implemented) TRIGGERS, ///< Trigger resources (event topics) SCRIPTS, ///< Script resources (not mapped in ROS 2) UPDATES ///< Update packages (not mapped in ROS 2) diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/plugin_manager.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/plugin_manager.hpp index 11d7f66b..93fbdf56 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/plugin_manager.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/plugin_manager.hpp @@ -21,6 +21,8 @@ #include "ros2_medkit_gateway/providers/introspection_provider.hpp" #include "ros2_medkit_gateway/providers/log_provider.hpp" #include "ros2_medkit_gateway/providers/update_provider.hpp" +#include "ros2_medkit_gateway/resource_sampler.hpp" +#include "ros2_medkit_gateway/subscription_transport.hpp" #include #include @@ -98,6 +100,15 @@ class PluginManager { */ void register_routes(httplib::Server & server, const std::string & api_prefix); + /// Register a resource sampler for a vendor collection (must start with "x-") + void register_resource_sampler(const std::string & collection, ResourceSamplerFn fn); + + /// Register a custom transport provider + void register_transport(std::unique_ptr provider); + + /// Set registries (called during gateway init) + void set_registries(ResourceSamplerRegistry & samplers, TransportRegistry & transports); + /** * @brief Shutdown all plugins */ @@ -163,6 +174,8 @@ class PluginManager { PluginContext * context_ = nullptr; UpdateProvider * first_update_provider_ = nullptr; LogProvider * first_log_provider_ = nullptr; + ResourceSamplerRegistry * sampler_registry_ = nullptr; + TransportRegistry * transport_registry_ = nullptr; bool shutdown_called_ = false; mutable std::shared_mutex plugins_mutex_; }; diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/resource_sampler.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/resource_sampler.hpp new file mode 100644 index 00000000..78022dad --- /dev/null +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/resource_sampler.hpp @@ -0,0 +1,44 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +namespace ros2_medkit_gateway { + +/// Callback: (entity_id, resource_path) -> JSON payload or error string. +using ResourceSamplerFn = std::function(const std::string & entity_id, + const std::string & resource_path)>; + +/// Thread-safe registry mapping collection names to sampling functions. +class ResourceSamplerRegistry { + public: + void register_sampler(const std::string & collection, ResourceSamplerFn fn, bool is_builtin = false); + std::optional get_sampler(const std::string & collection) const; + bool has_sampler(const std::string & collection) const; + + private: + mutable std::shared_mutex mutex_; + std::unordered_map samplers_; +}; + +} // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_manager.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_manager.hpp index f8104230..389baccb 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_manager.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_manager.hpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -46,8 +47,9 @@ struct CyclicSubscriptionInfo { std::string entity_id; std::string entity_type; // "apps" or "components" std::string resource_uri; - std::string topic_name; - std::string protocol; // Always "sse" for now + std::string collection; // "data", "faults", "configurations", "x-medkit-*" + std::string resource_path; // path within collection (may be empty) + std::string protocol; // Always "sse" for now CyclicInterval interval{CyclicInterval::NORMAL}; int duration_sec{0}; std::chrono::steady_clock::time_point created_at; @@ -85,7 +87,8 @@ class SubscriptionManager { /// Create a new subscription. Returns subscription info or error string. tl::expected create(const std::string & entity_id, const std::string & entity_type, const std::string & resource_uri, - const std::string & topic_name, const std::string & protocol, CyclicInterval interval, int duration_sec); + const std::string & collection, const std::string & resource_path, const std::string & protocol, + CyclicInterval interval, int duration_sec); /// Get a subscription by ID std::optional get(const std::string & sub_id) const; @@ -112,6 +115,10 @@ class SubscriptionManager { /// Signal shutdown — wakes all waiting streams void shutdown(); + /// Set callback invoked when a subscription is removed (remove, cleanup_expired, shutdown). + /// MUST be called during initialization only, before any concurrent access. + void set_on_removed(std::function callback); + /** * @brief Wait for an update or removal on the given subscription. * @@ -134,6 +141,7 @@ class SubscriptionManager { size_t max_subscriptions_; std::atomic next_id_{1}; std::atomic shutdown_flag_{false}; + std::function on_removed_; }; } // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_transport.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_transport.hpp new file mode 100644 index 00000000..a461e18a --- /dev/null +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_transport.hpp @@ -0,0 +1,90 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include +#include + +#include + +#include "ros2_medkit_gateway/resource_sampler.hpp" +#include "ros2_medkit_gateway/subscription_manager.hpp" + +namespace ros2_medkit_gateway { + +class GatewayNode; + +/// Virtual interface for pluggable subscription transport protocols. +/// +/// SSE is the built-in default (SOVD-compliant). Non-SSE protocols +/// (MQTT, WebSocket, Zenoh) are vendor extensions. +class SubscriptionTransportProvider { + public: + virtual ~SubscriptionTransportProvider() = default; + + /// Protocol identifier: "sse" (standard), or vendor extension + virtual std::string protocol() const = 0; + + /// Start delivery for a subscription. Returns event_source URI for client. + virtual tl::expected start(const CyclicSubscriptionInfo & info, + ResourceSamplerFn json_sampler, GatewayNode * node) = 0; + + /// Subscription interval/duration changed. Transport must re-read info. + virtual void notify_update(const std::string & sub_id) = 0; + + /// Stop delivery for a subscription. + /// + /// For HTTP-based transports (SSE), the HTTP server thread join handles + /// synchronous stop - stop() removes state so the streaming loop exits + /// on its next is_active() check. GatewayNode::~GatewayNode() calls + /// stop_rest_server() before shutdown_all() to ensure this ordering. + /// + /// Non-HTTP transports MUST ensure all delivery threads have stopped + /// before returning from this method. + virtual void stop(const std::string & sub_id) = 0; + + /// Handle HTTP client connection (SSE: chunked provider, WebSocket: upgrade). + /// MQTT/Zenoh: return false (not HTTP-based). + virtual bool handle_client_connect(const std::string & sub_id, const httplib::Request & req, + httplib::Response & res) { + (void)sub_id; + (void)req; + (void)res; + return false; + } +}; + +/// Registry of transport providers keyed by protocol name. +class TransportRegistry { + public: + void register_transport(std::unique_ptr provider); + SubscriptionTransportProvider * get_transport(const std::string & protocol) const; + bool has_transport(const std::string & protocol) const; + + /// Shutdown all subscriptions by delegating to SubscriptionManager::shutdown(). + /// Transport cleanup happens via the manager's on_removed callback, which + /// calls SubscriptionTransportProvider::stop() for each active subscription. + void shutdown_all(SubscriptionManager & sub_mgr); + + private: + mutable std::shared_mutex mutex_; + std::unordered_map> transports_; +}; + +} // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/updates/update_types.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/updates/update_types.hpp index 9b315c00..e320f008 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/updates/update_types.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/updates/update_types.hpp @@ -91,4 +91,36 @@ class UpdateProgressReporter { std::mutex & mutex_; }; +/// Serialize UpdateStatusInfo to SOVD-compliant JSON +inline nlohmann::json update_status_to_json(const UpdateStatusInfo & status) { + nlohmann::json j; + switch (status.status) { + case UpdateStatus::Pending: + j["status"] = "pending"; + break; + case UpdateStatus::InProgress: + j["status"] = "inProgress"; + break; + case UpdateStatus::Completed: + j["status"] = "completed"; + break; + case UpdateStatus::Failed: + j["status"] = "failed"; + break; + } + if (status.progress.has_value()) { + j["progress"] = *status.progress; + } + if (status.sub_progress.has_value()) { + j["sub_progress"] = nlohmann::json::array(); + for (const auto & sp : *status.sub_progress) { + j["sub_progress"].push_back({{"name", sp.name}, {"progress", sp.progress}}); + } + } + if (status.error_message.has_value()) { + j["error"] = *status.error_message; + } + return j; +} + } // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/src/discovery/layers/runtime_layer.cpp b/src/ros2_medkit_gateway/src/discovery/layers/runtime_layer.cpp index 91714686..ff36ae55 100644 --- a/src/ros2_medkit_gateway/src/discovery/layers/runtime_layer.cpp +++ b/src/ros2_medkit_gateway/src/discovery/layers/runtime_layer.cpp @@ -56,6 +56,29 @@ size_t filter_by_namespace(std::vector & entities, const GapFillConfig & return before - entities.size(); } +// Extract namespace from a fully-qualified node name (e.g. "/ns/sub/node" -> "/ns/sub") +std::string namespace_from_fqn(const std::string & fqn) { + auto pos = fqn.rfind('/'); + if (pos == std::string::npos || pos == 0) { + return "/"; + } + return fqn.substr(0, pos); +} + +// Filter apps by namespace derived from bound_fqn +size_t filter_apps_by_namespace(std::vector & apps, const GapFillConfig & config) { + size_t before = apps.size(); + apps.erase(std::remove_if(apps.begin(), apps.end(), + [&config](const App & a) { + if (!a.bound_fqn.has_value()) { + return false; // Keep unbound apps + } + return !is_namespace_allowed(namespace_from_fqn(*a.bound_fqn), config); + }), + apps.end()); + return before - apps.size(); +} + } // namespace RuntimeLayer::RuntimeLayer(RuntimeDiscoveryStrategy * runtime_strategy) : runtime_strategy_(runtime_strategy) { @@ -94,6 +117,7 @@ LayerOutput RuntimeLayer::discover() { if (gap_fill_config_.allow_heuristic_apps) { output.apps = std::move(apps); + last_filtered_count_ += filter_apps_by_namespace(output.apps, gap_fill_config_); } if (gap_fill_config_.allow_heuristic_functions) { diff --git a/src/ros2_medkit_gateway/src/gateway_node.cpp b/src/ros2_medkit_gateway/src/gateway_node.cpp index 234e42b7..4f509bf8 100644 --- a/src/ros2_medkit_gateway/src/gateway_node.cpp +++ b/src/ros2_medkit_gateway/src/gateway_node.cpp @@ -20,6 +20,8 @@ #include #include +#include "ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp" + using namespace std::chrono_literals; namespace ros2_medkit_gateway { @@ -89,6 +91,7 @@ GatewayNode::GatewayNode() : Node("ros2_medkit_gateway") { // SSE (Server-Sent Events) parameters declare_parameter("sse.max_clients", 10); // Limit concurrent SSE connections to prevent resource exhaustion declare_parameter("sse.max_subscriptions", 100); // Maximum active cyclic subscriptions across all entities + declare_parameter("sse.max_duration_sec", 3600); // Maximum subscription duration in seconds (1 hour default) // Log management parameters declare_parameter("logs.buffer_size", @@ -435,8 +438,17 @@ GatewayNode::GatewayNode() : Node("ros2_medkit_gateway") { subscription_mgr_ = std::make_unique(max_subscriptions); RCLCPP_INFO(get_logger(), "Subscription manager: max_subscriptions=%zu", max_subscriptions); + // Create SSE client tracker (shared between SseTransportProvider and SSEFaultHandler) + auto max_sse_clients = static_cast(get_parameter("sse.max_clients").as_int()); + sse_client_tracker_ = std::make_shared(max_sse_clients); + + // Initialize resource sampler and transport registries + sampler_registry_ = std::make_unique(); + transport_registry_ = std::make_unique(); + // Initialize plugin manager plugin_mgr_ = std::make_unique(); + plugin_mgr_->set_registries(*sampler_registry_, *transport_registry_); auto plugin_names = get_parameter("plugins").as_string_array(); if (!plugin_names.empty()) { std::vector configs; @@ -517,6 +529,152 @@ GatewayNode::GatewayNode() : Node("ros2_medkit_gateway") { } } + // Register built-in resource samplers + sampler_registry_->register_sampler( + "data", + [this](const std::string & /*entity_id*/, + const std::string & resource_path) -> tl::expected { + auto * dam = get_data_access_manager(); + if (!dam) { + return tl::make_unexpected(std::string("DataAccessManager not available")); + } + auto * native_sampler = dam->get_native_sampler(); + if (!native_sampler) { + return tl::make_unexpected(std::string("Native topic sampler unavailable")); + } + auto sample = native_sampler->sample_topic(resource_path, dam->get_topic_sample_timeout()); + if (sample.has_data && sample.data.has_value()) { + nlohmann::json payload; + payload["id"] = resource_path; + payload["data"] = *sample.data; + return payload; + } + return tl::make_unexpected(std::string("Topic data not available: " + resource_path)); + }, + true); + + sampler_registry_->register_sampler( + "faults", + [this](const std::string & entity_id, + const std::string & /*resource_path*/) -> tl::expected { + auto * fault_mgr = get_fault_manager(); + if (!fault_mgr) { + return tl::make_unexpected(std::string("FaultManager not available")); + } + // Determine source_id for fault filtering based on entity type + const auto & cache = get_thread_safe_cache(); + auto entity_ref = cache.find_entity(entity_id); + std::string source_id; + if (entity_ref) { + if (entity_ref->type == SovdEntityType::APP) { + auto app = cache.get_app(entity_id); + if (app) { + source_id = app->bound_fqn.value_or(""); + } + } else if (entity_ref->type == SovdEntityType::COMPONENT) { + auto comp = cache.get_component(entity_id); + if (comp) { + source_id = comp->namespace_path; + } + } + // AREA and FUNCTION: leave source_id empty (returns all faults) + // Guard against TOCTOU: entity found but vanished before get_app/get_component + if (source_id.empty() && entity_ref->type != SovdEntityType::AREA && + entity_ref->type != SovdEntityType::FUNCTION) { + return tl::make_unexpected(std::string("Entity no longer available: " + entity_id)); + } + } + auto result = fault_mgr->list_faults(source_id); + if (!result.success) { + return tl::make_unexpected(result.error_message); + } + return result.data; + }, + true); + + sampler_registry_->register_sampler( + "configurations", + [this](const std::string & entity_id, + const std::string & /*resource_path*/) -> tl::expected { + auto * config_mgr = get_configuration_manager(); + if (!config_mgr) { + return tl::make_unexpected(std::string("ConfigurationManager not available")); + } + const auto & cache = get_thread_safe_cache(); + auto configs = cache.get_entity_configurations(entity_id); + nlohmann::json items = nlohmann::json::array(); + for (const auto & node : configs.nodes) { + auto result = config_mgr->list_parameters(node.node_fqn); + if (result.success && result.data.is_array()) { + for (auto & param : result.data) { + items.push_back(std::move(param)); + } + } + } + nlohmann::json payload; + payload["items"] = std::move(items); + return payload; + }, + true); + + sampler_registry_->register_sampler( + "logs", + [this](const std::string & entity_id, + const std::string & /*resource_path*/) -> tl::expected { + auto * log_mgr = get_log_manager(); + if (!log_mgr) { + return tl::make_unexpected(std::string("LogManager not available")); + } + const auto & cache = get_thread_safe_cache(); + auto configs = cache.get_entity_configurations(entity_id); + std::vector node_fqns; + node_fqns.reserve(configs.nodes.size()); + for (const auto & node : configs.nodes) { + node_fqns.push_back(node.node_fqn); + } + auto result = log_mgr->get_logs(node_fqns, false, "", "", entity_id); + if (!result.has_value()) { + return tl::make_unexpected(result.error()); + } + nlohmann::json payload; + payload["items"] = std::move(*result); + return payload; + }, + true); + + // Register update status sampler (server-level, uses UpdateManager) + if (update_mgr_) { + sampler_registry_->register_sampler( + "updates", + [this](const std::string & /*entity_id*/, + const std::string & resource_path) -> tl::expected { + auto * mgr = get_update_manager(); + if (!mgr || !mgr->has_backend()) { + return tl::make_unexpected(std::string("Update backend not available")); + } + auto result = mgr->get_status(resource_path); + if (!result) { + return tl::make_unexpected(result.error().message); + } + return update_status_to_json(*result); + }, + true); + } + + RCLCPP_INFO(get_logger(), "Registered built-in resource samplers: data, faults, configurations, logs%s", + update_mgr_ ? ", updates" : ""); + + // Register built-in SSE transport + transport_registry_->register_transport( + std::make_unique(*subscription_mgr_, sse_client_tracker_)); + + // Wire subscription lifecycle -> transport cleanup + subscription_mgr_->set_on_removed([this](const CyclicSubscriptionInfo & info) { + if (auto * transport = transport_registry_->get_transport(info.protocol)) { + transport->stop(info.id); + } + }); + // Connect topic sampler to discovery manager for component-topic mapping discovery_mgr_->set_topic_sampler(data_access_mgr_->get_native_sampler()); @@ -556,10 +714,18 @@ GatewayNode::GatewayNode() : Node("ros2_medkit_gateway") { GatewayNode::~GatewayNode() { RCLCPP_INFO(get_logger(), "Shutting down ROS 2 Medkit Gateway..."); + // 1. Stop REST server (kills HTTP connections, SSE streams exit) stop_rest_server(); + // 2. Shutdown subscriptions via transport registry (calls sub_mgr.shutdown(), + // which triggers on_removed -> transport->stop() for each active subscription) + if (transport_registry_) { + transport_registry_->shutdown_all(*subscription_mgr_); + } + // 3. Shutdown plugins if (plugin_mgr_) { plugin_mgr_->shutdown_all(); } + // 4. Normal member destruction (managers safe - all transports stopped) } const ThreadSafeEntityCache & GatewayNode::get_thread_safe_cache() const { @@ -606,6 +772,18 @@ PluginManager * GatewayNode::get_plugin_manager() const { return plugin_mgr_.get(); } +ResourceSamplerRegistry * GatewayNode::get_sampler_registry() const { + return sampler_registry_.get(); +} + +TransportRegistry * GatewayNode::get_transport_registry() const { + return transport_registry_.get(); +} + +std::shared_ptr GatewayNode::get_sse_client_tracker() const { + return sse_client_tracker_; +} + void GatewayNode::refresh_cache() { RCLCPP_DEBUG(get_logger(), "Refreshing entity cache..."); diff --git a/src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp b/src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp index 60e263b1..69609bfc 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp @@ -14,16 +14,14 @@ #include "ros2_medkit_gateway/http/handlers/cyclic_subscription_handlers.hpp" -#include -#include #include -#include -#include "ros2_medkit_gateway/data_access_manager.hpp" +#include + #include "ros2_medkit_gateway/gateway_node.hpp" #include "ros2_medkit_gateway/http/error_codes.hpp" #include "ros2_medkit_gateway/http/http_utils.hpp" -#include "ros2_medkit_gateway/native_topic_sampler.hpp" +#include "ros2_medkit_gateway/models/entity_types.hpp" using json = nlohmann::json; @@ -31,8 +29,13 @@ namespace ros2_medkit_gateway { namespace handlers { CyclicSubscriptionHandlers::CyclicSubscriptionHandlers(HandlerContext & ctx, SubscriptionManager & sub_mgr, - std::shared_ptr client_tracker) - : ctx_(ctx), sub_mgr_(sub_mgr), client_tracker_(std::move(client_tracker)) { + ResourceSamplerRegistry & sampler_registry, + TransportRegistry & transport_registry, int max_duration_sec) + : ctx_(ctx) + , sub_mgr_(sub_mgr) + , sampler_registry_(sampler_registry) + , transport_registry_(transport_registry) + , max_duration_sec_(max_duration_sec) { } // --------------------------------------------------------------------------- @@ -77,11 +80,14 @@ void CyclicSubscriptionHandlers::handle_create(const httplib::Request & req, htt std::string protocol = "sse"; if (body.contains("protocol")) { protocol = body["protocol"].get(); - if (protocol != "sse") { - HandlerContext::send_error(res, 400, ERR_INVALID_PARAMETER, "Unsupported protocol. Only 'sse' is supported.", - {{"parameter", "protocol"}, {"value", protocol}}); - return; - } + } + + // Check transport is registered + auto * transport = transport_registry_.get_transport(protocol); + if (!transport) { + HandlerContext::send_error(res, 400, ERR_X_MEDKIT_UNSUPPORTED_PROTOCOL, "Protocol '" + protocol + "' not available", + {{"parameter", "protocol"}, {"value", protocol}}); + return; } // Parse interval @@ -102,36 +108,90 @@ void CyclicSubscriptionHandlers::handle_create(const httplib::Request & req, htt {{"parameter", "duration"}, {"value", duration}}); return; } + if (duration > max_duration_sec_) { + HandlerContext::send_error(res, 400, ERR_INVALID_PARAMETER, + "Duration must not exceed " + std::to_string(max_duration_sec_) + " seconds.", + {{"parameter", "duration"}, {"max_value", max_duration_sec_}}); + return; + } - // Parse resource URI to extract topic name + // Parse resource URI to extract collection and resource path std::string resource = body["resource"].get(); - auto topic_result = parse_resource_uri(resource); - if (!topic_result) { - HandlerContext::send_error(res, 400, ERR_INVALID_PARAMETER, "Invalid resource URI: " + topic_result.error(), + auto parsed = parse_resource_uri(resource); + if (!parsed) { + HandlerContext::send_error(res, 400, ERR_X_MEDKIT_INVALID_RESOURCE_URI, "Invalid resource URI: " + parsed.error(), {{"parameter", "resource"}, {"value", resource}}); return; } std::string entity_type = extract_entity_type(req); - // Validate resource URI references the same entity as the route - std::string expected_prefix = "/api/v1/" + entity_type + "/" + entity_id + "/data/"; - if (resource.find(expected_prefix) != 0) { - HandlerContext::send_error(res, 400, ERR_INVALID_PARAMETER, - "Resource URI must reference the same entity as the route", - {{"parameter", "resource"}, {"value", resource}}); + // Server-level resources (e.g. updates) skip entity-mismatch and collection checks + bool is_server_level = parsed->entity_type.empty(); + + if (!is_server_level) { + // Validate resource URI references the same entity as the route + if (parsed->entity_type != entity_type || parsed->entity_id != entity_id) { + HandlerContext::send_error(res, 400, ERR_X_MEDKIT_ENTITY_MISMATCH, + "Resource URI must reference the same entity as the route", + {{"parameter", "resource"}, {"value", resource}}); + return; + } + + // Validate collection support + auto known_collection = parse_resource_collection(parsed->collection); + if (known_collection.has_value()) { + // Known SOVD collection - check entity supports it + if (!entity->supports_collection(*known_collection)) { + HandlerContext::send_error(res, 400, ERR_X_MEDKIT_COLLECTION_NOT_SUPPORTED, + "Collection '" + parsed->collection + "' not supported for " + entity_type, + {{"collection", parsed->collection}, {"entity_type", entity_type}}); + return; + } + } else if (parsed->collection.size() < 2 || parsed->collection.substr(0, 2) != "x-") { + // Not a known collection and not a vendor extension + HandlerContext::send_error(res, 400, ERR_X_MEDKIT_INVALID_RESOURCE_URI, + "Unknown collection '" + parsed->collection + + "'. Use a known SOVD collection or 'x-' vendor extension.", + {{"collection", parsed->collection}}); + return; + } + + // Data collection requires a resource path (topic name) + if (parsed->collection == "data" && parsed->resource_path.empty()) { + HandlerContext::send_error(res, 400, ERR_X_MEDKIT_INVALID_RESOURCE_URI, + "Data collection requires a resource path (e.g. /api/v1/apps/{id}/data/{topic})", + {{"parameter", "resource"}, {"value", resource}}); + return; + } + } + + // Check sampler is registered + if (!sampler_registry_.has_sampler(parsed->collection)) { + HandlerContext::send_error(res, 400, ERR_X_MEDKIT_COLLECTION_NOT_AVAILABLE, + "No data provider for collection '" + parsed->collection + "'", + {{"collection", parsed->collection}}); return; } // Create subscription - auto result = sub_mgr_.create(entity_id, entity_type, resource, *topic_result, protocol, interval, duration); + auto result = sub_mgr_.create(entity_id, entity_type, resource, parsed->collection, parsed->resource_path, protocol, + interval, duration); if (!result) { HandlerContext::send_error(res, 503, ERR_SERVICE_UNAVAILABLE, result.error()); return; } - std::string event_source = build_event_source(*result); - auto response_json = subscription_to_json(*result, event_source); + // Start transport delivery + auto sampler = sampler_registry_.get_sampler(parsed->collection); + auto event_source_result = transport->start(*result, *sampler, ctx_.node()); + if (!event_source_result) { + sub_mgr_.remove(result->id); + HandlerContext::send_error(res, 503, ERR_SERVICE_UNAVAILABLE, event_source_result.error()); + return; + } + + auto response_json = subscription_to_json(*result, *event_source_result); res.status = 201; HandlerContext::send_json(res, response_json); @@ -222,6 +282,12 @@ void CyclicSubscriptionHandlers::handle_update(const httplib::Request & req, htt {{"parameter", "duration"}, {"value", *new_duration}}); return; } + if (*new_duration > max_duration_sec_) { + HandlerContext::send_error(res, 400, ERR_INVALID_PARAMETER, + "Duration must not exceed " + std::to_string(max_duration_sec_) + " seconds.", + {{"parameter", "duration"}, {"max_value", max_duration_sec_}}); + return; + } } // Verify subscription exists and belongs to this entity before updating @@ -272,7 +338,7 @@ void CyclicSubscriptionHandlers::handle_delete(const httplib::Request & req, htt } // --------------------------------------------------------------------------- -// GET /events — SSE stream +// GET /events — SSE stream (delegates to transport provider) // --------------------------------------------------------------------------- void CyclicSubscriptionHandlers::handle_events(const httplib::Request & req, httplib::Response & res) { auto entity_id = req.matches[1].str(); @@ -289,121 +355,30 @@ void CyclicSubscriptionHandlers::handle_events(const httplib::Request & req, htt return; } - // Verify subscription belongs to this entity if (sub->entity_id != entity_id) { HandlerContext::send_error(res, 404, ERR_RESOURCE_NOT_FOUND, "Subscription not found", {{"subscription_id", sub_id}}); return; } - // Reject SSE connections for expired or inactive subscriptions if (!sub_mgr_.is_active(sub_id)) { HandlerContext::send_error(res, 404, ERR_RESOURCE_NOT_FOUND, "Subscription expired or inactive", {{"subscription_id", sub_id}}); return; } - // Check combined SSE client limit (shared with fault streams) - if (!client_tracker_->try_connect()) { - RCLCPP_WARN(HandlerContext::logger(), - "SSE client limit reached (%zu), rejecting cyclic subscription stream from %s", - client_tracker_->max_clients(), req.remote_addr.c_str()); - HandlerContext::send_error(res, 503, ERR_SERVICE_UNAVAILABLE, - "Maximum number of SSE clients reached. Please try again later."); + auto * transport = transport_registry_.get_transport(sub->protocol); + if (!transport) { + HandlerContext::send_error(res, 400, ERR_X_MEDKIT_UNSUPPORTED_PROTOCOL, + "Transport for protocol '" + sub->protocol + "' not available", + {{"protocol", sub->protocol}}); return; } - RCLCPP_INFO(HandlerContext::logger(), "SSE cyclic subscription client connected from %s (%zu/%zu)", - req.remote_addr.c_str(), client_tracker_->connected_clients(), client_tracker_->max_clients()); - - // Set SSE headers (Content-Type set by set_chunked_content_provider below) - res.set_header("Cache-Control", "no-cache"); - res.set_header("Connection", "keep-alive"); - res.set_header("X-Accel-Buffering", "no"); - - auto captured_sub_id = sub->id; - auto captured_topic = sub->topic_name; - - res.set_chunked_content_provider( - "text/event-stream", - [this, captured_sub_id, captured_topic](size_t /*offset*/, httplib::DataSink & sink) { - auto keepalive_timeout = std::chrono::seconds(kKeepaliveIntervalSec); - auto last_write = std::chrono::steady_clock::now(); - - while (true) { - // Check if subscription is still active - if (!sub_mgr_.is_active(captured_sub_id)) { - return false; // Subscription expired or removed - } - - // Send keepalive if no data was written recently (e.g. after a slow sample_topic) - auto since_last_write = std::chrono::steady_clock::now() - last_write; - if (since_last_write >= keepalive_timeout) { - const char * keepalive = ":keepalive\n\n"; - if (!sink.write(keepalive, std::strlen(keepalive))) { - return false; // Client disconnected - } - last_write = std::chrono::steady_clock::now(); - } - - // Get current interval from subscription (may have been updated) - auto current_sub = sub_mgr_.get(captured_sub_id); - if (!current_sub) { - return false; // Removed - } - - auto sample_interval = interval_to_duration(current_sub->interval); - - // Sample the topic - auto * data_access_mgr = ctx_.node()->get_data_access_manager(); - auto * native_sampler = data_access_mgr->get_native_sampler(); - auto sample = native_sampler->sample_topic(captured_topic, data_access_mgr->get_topic_sample_timeout()); - - // Build EventEnvelope - json envelope; - - // UTC timestamp in ISO 8601 format - auto now = std::chrono::system_clock::now(); - auto time_t = std::chrono::system_clock::to_time_t(now); - auto ms = std::chrono::duration_cast(now.time_since_epoch()).count() % 1000; - std::ostringstream ts; - struct tm tm_buf; - gmtime_r(&time_t, &tm_buf); - ts << std::put_time(&tm_buf, "%Y-%m-%dT%H:%M:%S"); - ts << "." << std::setw(3) << std::setfill('0') << ms << "Z"; - envelope["timestamp"] = ts.str(); - - if (sample.has_data && sample.data.has_value()) { - json payload; - payload["id"] = captured_topic; - payload["data"] = *sample.data; - envelope["payload"] = payload; - } else { - // No data available — send error event - json error; - error["error_code"] = ERR_X_MEDKIT_ROS2_TOPIC_UNAVAILABLE; - error["message"] = "Topic data not available: " + captured_topic; - envelope["error"] = error; - } - - // Format SSE data frame - std::string sse_msg = "data: " + envelope.dump() + "\n\n"; - if (!sink.write(sse_msg.data(), sse_msg.size())) { - return false; // Client disconnected - } - last_write = std::chrono::steady_clock::now(); - - // Wait for interval or notification (update/delete/shutdown) - sub_mgr_.wait_for_update(captured_sub_id, sample_interval); - } - - return true; - }, - [this, captured_sub_id](bool /*success*/) { - client_tracker_->disconnect(); - RCLCPP_DEBUG(rclcpp::get_logger("rest_server"), "SSE cyclic subscription stream disconnected: %s", - captured_sub_id.c_str()); - }); + if (!transport->handle_client_connect(sub_id, req, res)) { + HandlerContext::send_error(res, 404, ERR_RESOURCE_NOT_FOUND, "Subscription stream not found", + {{"subscription_id", sub_id}}); + } } // --------------------------------------------------------------------------- @@ -426,25 +401,61 @@ std::string CyclicSubscriptionHandlers::build_event_source(const CyclicSubscript } std::string CyclicSubscriptionHandlers::extract_entity_type(const httplib::Request & req) { - // Path is like /api/v1/apps/{id}/cyclic-subscriptions or /api/v1/components/{id}/cyclic-subscriptions - if (req.path.find("/apps/") != std::string::npos) { - return "apps"; - } - if (req.path.find("/components/") != std::string::npos) { - return "components"; + auto type = extract_entity_type_from_path(req.path); + switch (type) { + case SovdEntityType::APP: + return "apps"; + case SovdEntityType::COMPONENT: + return "components"; + default: + RCLCPP_WARN(HandlerContext::logger(), "Unexpected entity type in cyclic subscription path: %s", req.path.c_str()); + return "apps"; } - return "apps"; // Default fallback } -tl::expected CyclicSubscriptionHandlers::parse_resource_uri(const std::string & resource) { - // Expected format: /api/v1/{entity_type}/{entity_id}/data/{topic_path} - // We need to extract the topic path (everything after /data/) - static const std::regex resource_regex(R"(^/api/v1/(?:apps|components)/[^/]+/data(/.*))"); +tl::expected +CyclicSubscriptionHandlers::parse_resource_uri(const std::string & resource) { + // Try entity-scoped format first: /api/v1/{entity_type}/{entity_id}/{collection}[/{resource_path}] + static const std::regex entity_regex(R"(^/api/v1/(apps|components)/([^/]+)/([^/]+)(/.*)?$)"); std::smatch match; - if (!std::regex_match(resource, match, resource_regex)) { - return tl::make_unexpected("Resource URI must match /api/v1/{apps|components}/{id}/data/{topic}"); + if (std::regex_match(resource, match, entity_regex)) { + ParsedResourceUri parsed; + parsed.entity_type = match[1].str(); + parsed.entity_id = match[2].str(); + parsed.collection = match[3].str(); + parsed.resource_path = match[4].matched ? match[4].str() : ""; + + // Security: reject '..' as a path segment (not as substring of e.g. '/..foo') + if (!parsed.resource_path.empty()) { + std::string path = parsed.resource_path; + size_t pos = 0; + while (pos < path.size()) { + size_t next = path.find('/', pos + 1); + std::string segment = (next == std::string::npos) ? path.substr(pos) : path.substr(pos, next - pos); + if (segment == "/.." || segment == "..") { + return tl::make_unexpected("Resource path must not contain '..' as a path segment"); + } + pos = (next == std::string::npos) ? path.size() : next; + } + } + + return parsed; + } + + // Try server-level update status: /api/v1/updates/{update-package-id}/status + static const std::regex update_status_regex(R"(^/api/v1/updates/([^/]+)/status$)"); + if (std::regex_match(resource, match, update_status_regex)) { + ParsedResourceUri parsed; + parsed.entity_type = ""; // server-level, no entity + parsed.entity_id = ""; + parsed.collection = "updates"; + parsed.resource_path = match[1].str(); // update package ID + return parsed; } - return match[1].str(); + + return tl::make_unexpected( + "Resource URI must match /api/v1/{apps|components}/{id}/{collection}[/{path}] " + "or /api/v1/updates/{id}/status"); } } // namespace handlers diff --git a/src/ros2_medkit_gateway/src/http/handlers/sse_transport_provider.cpp b/src/ros2_medkit_gateway/src/http/handlers/sse_transport_provider.cpp new file mode 100644 index 00000000..7e9f7889 --- /dev/null +++ b/src/ros2_medkit_gateway/src/http/handlers/sse_transport_provider.cpp @@ -0,0 +1,182 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp" + +#include +#include +#include +#include + +#include + +#include "ros2_medkit_gateway/http/error_codes.hpp" +#include "ros2_medkit_gateway/http/handlers/handler_context.hpp" +#include "ros2_medkit_gateway/http/http_utils.hpp" + +using json = nlohmann::json; + +namespace ros2_medkit_gateway { + +SseTransportProvider::SseTransportProvider(SubscriptionManager & sub_mgr, + std::shared_ptr client_tracker) + : sub_mgr_(sub_mgr), client_tracker_(std::move(client_tracker)) { +} + +tl::expected SseTransportProvider::start(const CyclicSubscriptionInfo & info, + ResourceSamplerFn json_sampler, + GatewayNode * /*node*/) { + { + std::lock_guard lock(mutex_); + streams_[info.id] = StreamState{std::move(json_sampler)}; + } + + // SSE event_source is the events endpoint URL + return std::string(API_BASE_PATH) + "/" + info.entity_type + "/" + info.entity_id + "/cyclic-subscriptions/" + + info.id + "/events"; +} + +void SseTransportProvider::notify_update(const std::string & sub_id) { + // SSE transport re-reads from sub_mgr_ each iteration, so no-op here. + (void)sub_id; +} + +void SseTransportProvider::stop(const std::string & sub_id) { + std::lock_guard lock(mutex_); + streams_.erase(sub_id); + // The in-flight SSE streaming loop exits naturally because: + // 1. stop() is called from on_removed callback, after state->active is set to false + // 2. The loop checks sub_mgr_.is_active() each iteration, which returns false + // 3. Erasing from streams_ prevents new client connections for this subscription +} + +bool SseTransportProvider::handle_client_connect(const std::string & sub_id, const httplib::Request & req, + httplib::Response & res) { + ResourceSamplerFn sampler; + { + std::lock_guard lock(mutex_); + auto it = streams_.find(sub_id); + if (it == streams_.end()) { + return false; + } + sampler = it->second.sampler; + } + + // Check combined SSE client limit + if (!client_tracker_->try_connect()) { + RCLCPP_WARN(handlers::HandlerContext::logger(), + "SSE client limit reached (%zu), rejecting cyclic subscription stream from %s", + client_tracker_->max_clients(), req.remote_addr.c_str()); + handlers::HandlerContext::send_error(res, 503, ERR_SERVICE_UNAVAILABLE, + "Maximum number of SSE clients reached. Please try again later."); + return true; // We handled it (with an error) + } + + RCLCPP_INFO(handlers::HandlerContext::logger(), "SSE cyclic subscription client connected from %s (%zu/%zu)", + req.remote_addr.c_str(), client_tracker_->connected_clients(), client_tracker_->max_clients()); + + // Set SSE headers + res.set_header("Cache-Control", "no-cache"); + res.set_header("Connection", "keep-alive"); + res.set_header("X-Accel-Buffering", "no"); + + const auto & captured_sub_id = sub_id; + auto captured_sampler = std::move(sampler); + + res.set_chunked_content_provider( + "text/event-stream", + [this, captured_sub_id, captured_sampler](size_t, httplib::DataSink & sink) { + auto keepalive_timeout = std::chrono::seconds(kKeepaliveIntervalSec); + auto last_write = std::chrono::steady_clock::now(); + + while (true) { + // Check if subscription is still active + if (!sub_mgr_.is_active(captured_sub_id)) { + return false; + } + + // Send keepalive if no data was written recently + auto since_last_write = std::chrono::steady_clock::now() - last_write; + if (since_last_write >= keepalive_timeout) { + const char * keepalive = ":keepalive\n\n"; + if (!sink.write(keepalive, std::strlen(keepalive))) { + return false; + } + last_write = std::chrono::steady_clock::now(); + } + + // Get current subscription info (may have been updated via PUT) + auto current_sub = sub_mgr_.get(captured_sub_id); + if (!current_sub) { + return false; + } + + auto sample_interval = interval_to_duration(current_sub->interval); + + // Sample the resource via the registered sampler + json envelope; + + // UTC timestamp in ISO 8601 format + auto now = std::chrono::system_clock::now(); + auto time_t_val = std::chrono::system_clock::to_time_t(now); + auto ms = std::chrono::duration_cast(now.time_since_epoch()).count() % 1000; + std::ostringstream ts; + struct tm tm_buf; + gmtime_r(&time_t_val, &tm_buf); + ts << std::put_time(&tm_buf, "%Y-%m-%dT%H:%M:%S"); + ts << "." << std::setw(3) << std::setfill('0') << ms << "Z"; + envelope["timestamp"] = ts.str(); + + try { + auto sample_result = captured_sampler(current_sub->entity_id, current_sub->resource_path); + if (sample_result.has_value()) { + envelope["payload"] = *sample_result; + } else { + json error; + error["error_code"] = ERR_VENDOR_ERROR; + error["message"] = sample_result.error(); + envelope["error"] = error; + } + } catch (const std::exception & e) { + RCLCPP_WARN(rclcpp::get_logger("sse_transport"), "Sampler exception for sub %s: %s", + captured_sub_id.c_str(), e.what()); + json error; + error["error_code"] = ERR_INTERNAL_ERROR; + error["message"] = "Internal error during resource sampling"; + envelope["error"] = error; + } + + // Format SSE data frame + std::string sse_msg = "data: " + envelope.dump() + "\n\n"; + if (!sink.write(sse_msg.data(), sse_msg.size())) { + return false; + } + last_write = std::chrono::steady_clock::now(); + + // Wait for interval or notification + sub_mgr_.wait_for_update(captured_sub_id, sample_interval); + } + + return true; + }, + [this, captured_sub_id](bool) { + client_tracker_->disconnect(); + RCLCPP_DEBUG(rclcpp::get_logger("rest_server"), "SSE cyclic subscription stream disconnected: %s", + captured_sub_id.c_str()); + }); + + return true; +} + +} // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/src/http/handlers/update_handlers.cpp b/src/ros2_medkit_gateway/src/http/handlers/update_handlers.cpp index 1f9f81a5..731629f9 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/update_handlers.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/update_handlers.cpp @@ -35,34 +35,7 @@ bool UpdateHandlers::check_backend(httplib::Response & res) { } json UpdateHandlers::status_to_json(const UpdateStatusInfo & status) { - json j; - switch (status.status) { - case UpdateStatus::Pending: - j["status"] = "pending"; - break; - case UpdateStatus::InProgress: - j["status"] = "inProgress"; - break; - case UpdateStatus::Completed: - j["status"] = "completed"; - break; - case UpdateStatus::Failed: - j["status"] = "failed"; - break; - } - if (status.progress.has_value()) { - j["progress"] = *status.progress; - } - if (status.sub_progress.has_value()) { - j["sub_progress"] = json::array(); - for (const auto & sp : *status.sub_progress) { - j["sub_progress"].push_back({{"name", sp.name}, {"progress", sp.progress}}); - } - } - if (status.error_message.has_value()) { - j["error"] = *status.error_message; - } - return j; + return update_status_to_json(status); } void UpdateHandlers::handle_list_updates(const httplib::Request & req, httplib::Response & res) { diff --git a/src/ros2_medkit_gateway/src/http/rest_server.cpp b/src/ros2_medkit_gateway/src/http/rest_server.cpp index 44c0ce6c..825d6f76 100644 --- a/src/ros2_medkit_gateway/src/http/rest_server.cpp +++ b/src/ros2_medkit_gateway/src/http/rest_server.cpp @@ -121,12 +121,17 @@ RESTServer::RESTServer(GatewayNode * node, const std::string & host, int port, c fault_handlers_ = std::make_unique(*handler_ctx_); log_handlers_ = std::make_unique(*handler_ctx_); auth_handlers_ = std::make_unique(*handler_ctx_); - auto max_sse_clients = static_cast(node_->get_parameter("sse.max_clients").as_int()); - sse_client_tracker_ = std::make_shared(max_sse_clients); + sse_client_tracker_ = node_->get_sse_client_tracker(); sse_fault_handler_ = std::make_unique(*handler_ctx_, sse_client_tracker_); bulkdata_handlers_ = std::make_unique(*handler_ctx_); + auto max_duration_sec = static_cast(node_->get_parameter("sse.max_duration_sec").as_int()); + if (max_duration_sec <= 0) { + RCLCPP_WARN(node_->get_logger(), "sse.max_duration_sec must be > 0, using default 3600"); + max_duration_sec = 3600; + } cyclic_sub_handlers_ = std::make_unique( - *handler_ctx_, *node_->get_subscription_manager(), sse_client_tracker_); + *handler_ctx_, *node_->get_subscription_manager(), *node_->get_sampler_registry(), + *node_->get_transport_registry(), max_duration_sec); if (node_->get_update_manager()) { update_handlers_ = std::make_unique(*handler_ctx_, node_->get_update_manager()); diff --git a/src/ros2_medkit_gateway/src/models/entity_capabilities.cpp b/src/ros2_medkit_gateway/src/models/entity_capabilities.cpp index 5f323f58..8e6bfd24 100644 --- a/src/ros2_medkit_gateway/src/models/entity_capabilities.cpp +++ b/src/ros2_medkit_gateway/src/models/entity_capabilities.cpp @@ -23,18 +23,10 @@ EntityCapabilities EntityCapabilities::for_type(SovdEntityType type) { case SovdEntityType::SERVER: // SERVER supports all collections caps.collections_ = { - ResourceCollection::CONFIGURATIONS, - ResourceCollection::DATA, - ResourceCollection::FAULTS, - ResourceCollection::OPERATIONS, - ResourceCollection::BULK_DATA, - ResourceCollection::DATA_LISTS, - ResourceCollection::LOCKS, - ResourceCollection::MODES, - ResourceCollection::CYCLIC_SUBSCRIPTIONS, - ResourceCollection::COMMUNICATION_LOGS, - ResourceCollection::TRIGGERS, - ResourceCollection::SCRIPTS, + ResourceCollection::CONFIGURATIONS, ResourceCollection::DATA, ResourceCollection::FAULTS, + ResourceCollection::OPERATIONS, ResourceCollection::BULK_DATA, ResourceCollection::DATA_LISTS, + ResourceCollection::LOCKS, ResourceCollection::MODES, ResourceCollection::CYCLIC_SUBSCRIPTIONS, + ResourceCollection::LOGS, ResourceCollection::TRIGGERS, ResourceCollection::SCRIPTS, ResourceCollection::UPDATES, }; // SERVER resources @@ -51,18 +43,10 @@ EntityCapabilities EntityCapabilities::for_type(SovdEntityType type) { case SovdEntityType::COMPONENT: // COMPONENT supports most collections caps.collections_ = { - ResourceCollection::CONFIGURATIONS, - ResourceCollection::DATA, - ResourceCollection::FAULTS, - ResourceCollection::OPERATIONS, - ResourceCollection::BULK_DATA, - ResourceCollection::DATA_LISTS, - ResourceCollection::LOCKS, - ResourceCollection::MODES, - ResourceCollection::CYCLIC_SUBSCRIPTIONS, - ResourceCollection::COMMUNICATION_LOGS, - ResourceCollection::TRIGGERS, - ResourceCollection::SCRIPTS, + ResourceCollection::CONFIGURATIONS, ResourceCollection::DATA, ResourceCollection::FAULTS, + ResourceCollection::OPERATIONS, ResourceCollection::BULK_DATA, ResourceCollection::DATA_LISTS, + ResourceCollection::LOCKS, ResourceCollection::MODES, ResourceCollection::CYCLIC_SUBSCRIPTIONS, + ResourceCollection::LOGS, ResourceCollection::TRIGGERS, ResourceCollection::SCRIPTS, ResourceCollection::UPDATES, }; caps.resources_ = {"docs", "logs", "hosts", "belongs-to", @@ -72,18 +56,10 @@ EntityCapabilities EntityCapabilities::for_type(SovdEntityType type) { case SovdEntityType::APP: // APP supports most collections caps.collections_ = { - ResourceCollection::CONFIGURATIONS, - ResourceCollection::DATA, - ResourceCollection::FAULTS, - ResourceCollection::OPERATIONS, - ResourceCollection::BULK_DATA, - ResourceCollection::DATA_LISTS, - ResourceCollection::LOCKS, - ResourceCollection::MODES, - ResourceCollection::CYCLIC_SUBSCRIPTIONS, - ResourceCollection::COMMUNICATION_LOGS, - ResourceCollection::TRIGGERS, - ResourceCollection::SCRIPTS, + ResourceCollection::CONFIGURATIONS, ResourceCollection::DATA, ResourceCollection::FAULTS, + ResourceCollection::OPERATIONS, ResourceCollection::BULK_DATA, ResourceCollection::DATA_LISTS, + ResourceCollection::LOCKS, ResourceCollection::MODES, ResourceCollection::CYCLIC_SUBSCRIPTIONS, + ResourceCollection::LOGS, ResourceCollection::TRIGGERS, ResourceCollection::SCRIPTS, ResourceCollection::UPDATES, }; caps.resources_ = {"docs", "logs", "is-located-on", "belongs-to", "depends-on", "data-categories", "data-groups"}; diff --git a/src/ros2_medkit_gateway/src/models/entity_types.cpp b/src/ros2_medkit_gateway/src/models/entity_types.cpp index 7217078c..9c476eab 100644 --- a/src/ros2_medkit_gateway/src/models/entity_types.cpp +++ b/src/ros2_medkit_gateway/src/models/entity_types.cpp @@ -58,6 +58,8 @@ std::string to_string(ResourceCollection col) { return "modes"; case ResourceCollection::CYCLIC_SUBSCRIPTIONS: return "cyclic-subscriptions"; + case ResourceCollection::LOGS: + return "logs"; case ResourceCollection::COMMUNICATION_LOGS: return "communication-logs"; case ResourceCollection::TRIGGERS: @@ -87,6 +89,7 @@ std::optional parse_resource_collection(const std::string & {"locks", ResourceCollection::LOCKS}, {"modes", ResourceCollection::MODES}, {"cyclic-subscriptions", ResourceCollection::CYCLIC_SUBSCRIPTIONS}, + {"logs", ResourceCollection::LOGS}, {"communication-logs", ResourceCollection::COMMUNICATION_LOGS}, {"triggers", ResourceCollection::TRIGGERS}, {"scripts", ResourceCollection::SCRIPTS}, diff --git a/src/ros2_medkit_gateway/src/plugins/plugin_manager.cpp b/src/ros2_medkit_gateway/src/plugins/plugin_manager.cpp index e0cb5a32..2e489078 100644 --- a/src/ros2_medkit_gateway/src/plugins/plugin_manager.cpp +++ b/src/ros2_medkit_gateway/src/plugins/plugin_manager.cpp @@ -207,6 +207,25 @@ void PluginManager::set_context(PluginContext & context) { } } +void PluginManager::set_registries(ResourceSamplerRegistry & samplers, TransportRegistry & transports) { + sampler_registry_ = &samplers; + transport_registry_ = &transports; +} + +void PluginManager::register_resource_sampler(const std::string & collection, ResourceSamplerFn fn) { + if (!sampler_registry_) { + throw std::runtime_error("Sampler registry not initialized"); + } + sampler_registry_->register_sampler(collection, std::move(fn)); +} + +void PluginManager::register_transport(std::unique_ptr provider) { + if (!transport_registry_) { + throw std::runtime_error("Transport registry not initialized"); + } + transport_registry_->register_transport(std::move(provider)); +} + void PluginManager::register_routes(httplib::Server & server, const std::string & api_prefix) { std::unique_lock lock(plugins_mutex_); for (auto & lp : plugins_) { diff --git a/src/ros2_medkit_gateway/src/resource_sampler.cpp b/src/ros2_medkit_gateway/src/resource_sampler.cpp new file mode 100644 index 00000000..2e7ede66 --- /dev/null +++ b/src/ros2_medkit_gateway/src/resource_sampler.cpp @@ -0,0 +1,50 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ros2_medkit_gateway/resource_sampler.hpp" + +#include + +namespace ros2_medkit_gateway { + +void ResourceSamplerRegistry::register_sampler(const std::string & collection, ResourceSamplerFn fn, bool is_builtin) { + std::unique_lock lock(mutex_); + + if (!is_builtin) { + if (collection.size() < 2 || collection.substr(0, 2) != "x-") { + throw std::runtime_error("Plugin sampler collection must use 'x-' prefix: " + collection); + } + if (samplers_.count(collection) > 0) { + throw std::runtime_error("Sampler already registered for collection: " + collection); + } + } + + samplers_[collection] = std::move(fn); +} + +std::optional ResourceSamplerRegistry::get_sampler(const std::string & collection) const { + std::shared_lock lock(mutex_); + auto it = samplers_.find(collection); + if (it == samplers_.end()) { + return std::nullopt; + } + return it->second; +} + +bool ResourceSamplerRegistry::has_sampler(const std::string & collection) const { + std::shared_lock lock(mutex_); + return samplers_.count(collection) > 0; +} + +} // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/src/subscription_manager.cpp b/src/ros2_medkit_gateway/src/subscription_manager.cpp index d0d5d480..a8941820 100644 --- a/src/ros2_medkit_gateway/src/subscription_manager.cpp +++ b/src/ros2_medkit_gateway/src/subscription_manager.cpp @@ -15,6 +15,7 @@ #include "ros2_medkit_gateway/subscription_manager.hpp" #include +#include #include #include #include @@ -74,8 +75,9 @@ std::string SubscriptionManager::generate_id() { tl::expected SubscriptionManager::create(const std::string & entity_id, const std::string & entity_type, - const std::string & resource_uri, const std::string & topic_name, - const std::string & protocol, CyclicInterval interval, int duration_sec) { + const std::string & resource_uri, const std::string & collection, + const std::string & resource_path, const std::string & protocol, CyclicInterval interval, + int duration_sec) { std::lock_guard lock(map_mutex_); if (subscriptions_.size() >= max_subscriptions_) { @@ -88,7 +90,8 @@ SubscriptionManager::create(const std::string & entity_id, const std::string & e state->info.entity_id = entity_id; state->info.entity_type = entity_type; state->info.resource_uri = resource_uri; - state->info.topic_name = topic_name; + state->info.collection = collection; + state->info.resource_path = resource_path; state->info.protocol = protocol; state->info.interval = interval; state->info.duration_sec = duration_sec; @@ -160,6 +163,7 @@ SubscriptionManager::update(const std::string & sub_id, std::optional state; + CyclicSubscriptionInfo info_copy; { std::lock_guard lock(map_mutex_); auto it = subscriptions_.find(sub_id); @@ -167,25 +171,32 @@ bool SubscriptionManager::remove(const std::string & sub_id) { return false; } state = it->second; + { + std::lock_guard sub_lock(state->mtx); + info_copy = state->info; + } subscriptions_.erase(it); } // Mark inactive and wake up any waiting SSE stream state->active.store(false); state->cv.notify_all(); + if (on_removed_) { + on_removed_(info_copy); + } return true; } size_t SubscriptionManager::cleanup_expired() { auto now = std::chrono::steady_clock::now(); - std::vector> expired_states; + std::vector, CyclicSubscriptionInfo>> expired; { std::lock_guard lock(map_mutex_); for (auto it = subscriptions_.begin(); it != subscriptions_.end();) { std::lock_guard sub_lock(it->second->mtx); if (it->second->info.expires_at <= now) { - expired_states.push_back(it->second); + expired.emplace_back(it->second, it->second->info); it = subscriptions_.erase(it); } else { ++it; @@ -194,12 +205,15 @@ size_t SubscriptionManager::cleanup_expired() { } // Notify expired streams outside the map lock - for (auto & state : expired_states) { + for (auto & [state, info] : expired) { state->active.store(false); state->cv.notify_all(); + if (on_removed_) { + on_removed_(info); + } } - return expired_states.size(); + return expired.size(); } size_t SubscriptionManager::active_count() const { @@ -213,11 +227,23 @@ size_t SubscriptionManager::max_subscriptions() const { void SubscriptionManager::shutdown() { shutdown_flag_.store(true); - - std::lock_guard lock(map_mutex_); - for (auto & [id, state] : subscriptions_) { - state->active.store(false); - state->cv.notify_all(); + std::vector all_infos; + { + std::lock_guard lock(map_mutex_); + for (auto it = subscriptions_.begin(); it != subscriptions_.end();) { + it->second->active.store(false); + it->second->cv.notify_all(); + if (on_removed_) { + std::lock_guard sub_lock(it->second->mtx); + all_infos.push_back(it->second->info); + } + it = subscriptions_.erase(it); + } + } + if (on_removed_) { + for (const auto & info : all_infos) { + on_removed_(info); + } } } @@ -273,4 +299,9 @@ void SubscriptionManager::notify(const std::string & sub_id) { state->cv.notify_all(); } +void SubscriptionManager::set_on_removed(std::function callback) { + assert(subscriptions_.empty() && "set_on_removed must be called before any subscriptions are created"); + on_removed_ = std::move(callback); +} + } // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/src/subscription_transport.cpp b/src/ros2_medkit_gateway/src/subscription_transport.cpp new file mode 100644 index 00000000..7e4df0cc --- /dev/null +++ b/src/ros2_medkit_gateway/src/subscription_transport.cpp @@ -0,0 +1,50 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ros2_medkit_gateway/subscription_transport.hpp" + +#include + +namespace ros2_medkit_gateway { + +void TransportRegistry::register_transport(std::unique_ptr provider) { + std::unique_lock lock(mutex_); + auto proto = provider->protocol(); + if (transports_.count(proto) > 0) { + throw std::runtime_error("Transport already registered for protocol: " + proto); + } + transports_[proto] = std::move(provider); +} + +SubscriptionTransportProvider * TransportRegistry::get_transport(const std::string & protocol) const { + std::shared_lock lock(mutex_); + auto it = transports_.find(protocol); + if (it == transports_.end()) { + return nullptr; + } + return it->second.get(); +} + +bool TransportRegistry::has_transport(const std::string & protocol) const { + std::shared_lock lock(mutex_); + return transports_.count(protocol) > 0; +} + +void TransportRegistry::shutdown_all(SubscriptionManager & sub_mgr) { + // Delegate to sub_mgr.shutdown() which triggers on_removed callbacks + // for each active subscription, causing transport->stop() for each. + sub_mgr.shutdown(); +} + +} // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp b/src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp index 909b6ca4..efb1c659 100644 --- a/src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp +++ b/src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp @@ -25,8 +25,138 @@ using namespace ros2_medkit_gateway; using namespace ros2_medkit_gateway::handlers; using json = nlohmann::json; +// --- parse_resource_uri tests --- + +// @verifies REQ_INTEROP_089 +TEST(ParseResourceUriTest, DataCollectionWithTopic) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1/data/temperature"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->entity_type, "apps"); + EXPECT_EQ(result->entity_id, "node1"); + EXPECT_EQ(result->collection, "data"); + EXPECT_EQ(result->resource_path, "/temperature"); +} + +TEST(ParseResourceUriTest, FaultsCollectionNoPath) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/components/ecu1/faults"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->entity_type, "components"); + EXPECT_EQ(result->entity_id, "ecu1"); + EXPECT_EQ(result->collection, "faults"); + EXPECT_EQ(result->resource_path, ""); +} + +TEST(ParseResourceUriTest, FaultsCollectionWithId) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1/faults/fault_001"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->collection, "faults"); + EXPECT_EQ(result->resource_path, "/fault_001"); +} + +TEST(ParseResourceUriTest, ConfigurationsCollection) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/components/ecu1/configurations/param1"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->collection, "configurations"); + EXPECT_EQ(result->resource_path, "/param1"); +} + +TEST(ParseResourceUriTest, VendorExtensionCollection) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1/x-medkit-metrics/cpu_usage"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->collection, "x-medkit-metrics"); + EXPECT_EQ(result->resource_path, "/cpu_usage"); +} + +TEST(ParseResourceUriTest, MultiSegmentResourcePath) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1/data/parent/child/value"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->resource_path, "/parent/child/value"); +} + +TEST(ParseResourceUriTest, InvalidMissingCollection) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1"); + EXPECT_FALSE(result.has_value()); +} + +TEST(ParseResourceUriTest, InvalidMalformedUri) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/not/a/valid/uri"); + EXPECT_FALSE(result.has_value()); +} + +TEST(ParseResourceUriTest, InvalidFunctionEntityType) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/functions/func1/data/topic"); + EXPECT_FALSE(result.has_value()); +} + +TEST(ParseResourceUriTest, PathTraversalRejected) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1/data/../../../etc/passwd"); + EXPECT_FALSE(result.has_value()); +} + +TEST(ParseResourceUriTest, PathTraversalInMiddleRejected) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1/data/a/../b"); + EXPECT_FALSE(result.has_value()); +} + +TEST(ParseResourceUriTest, BenignDoubleDotInSegmentAllowed) { + // "/..foo" is not a traversal - '..' is part of a larger segment name + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1/data/..foo"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->resource_path, "/..foo"); +} + +TEST(ParseResourceUriTest, PathTraversalAtEndRejected) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1/data/a/.."); + EXPECT_FALSE(result.has_value()); +} + +TEST(ParseResourceUriTest, DataCollectionEmptyResourcePath) { + // data collection without a topic path - still parses, but handler rejects it + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/apps/node1/data"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->collection, "data"); + EXPECT_EQ(result->resource_path, ""); +} + +// --- parse_resource_uri: server-level update status --- + +// @verifies REQ_INTEROP_089 +TEST(ParseResourceUriTest, UpdateStatusUri) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/updates/my-package/status"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->entity_type, ""); + EXPECT_EQ(result->entity_id, ""); + EXPECT_EQ(result->collection, "updates"); + EXPECT_EQ(result->resource_path, "my-package"); +} + +TEST(ParseResourceUriTest, UpdateStatusUriWithHyphenatedId) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/updates/ADAS-v2-03-2154/status"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->collection, "updates"); + EXPECT_EQ(result->resource_path, "ADAS-v2-03-2154"); +} + +TEST(ParseResourceUriTest, UpdateStatusUriMissingStatus) { + // /api/v1/updates/{id} without /status is not a subscribable resource + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/updates/my-package"); + EXPECT_FALSE(result.has_value()); +} + +TEST(ParseResourceUriTest, UpdateStatusUriMissingId) { + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/updates//status"); + EXPECT_FALSE(result.has_value()); +} + +TEST(ParseResourceUriTest, UpdatesListNotSubscribable) { + // /api/v1/updates (list endpoint) is not subscribable + auto result = CyclicSubscriptionHandlers::parse_resource_uri("/api/v1/updates"); + EXPECT_FALSE(result.has_value()); +} + // --- subscription_to_json --- +// @verifies REQ_INTEROP_089 TEST(CyclicSubscriptionJsonTest, ContainsAllRequiredFields) { CyclicSubscriptionInfo info; info.id = "sub_001"; @@ -60,6 +190,28 @@ TEST(CyclicSubscriptionJsonTest, AllIntervalValuesSerialize) { } } +// @verifies REQ_INTEROP_089 +TEST(CyclicSubscriptionJsonTest, ServerLevelUpdateResource) { + CyclicSubscriptionInfo info; + info.id = "sub_updates_001"; + info.entity_id = "temp_sensor"; + info.entity_type = "apps"; + info.resource_uri = "/api/v1/updates/ADAS-v2/status"; + info.collection = "updates"; + info.resource_path = "ADAS-v2"; + info.protocol = "sse"; + info.interval = CyclicInterval::SLOW; + + std::string event_source = "/api/v1/apps/temp_sensor/cyclic-subscriptions/sub_updates_001/events"; + auto j = CyclicSubscriptionHandlers::subscription_to_json(info, event_source); + + EXPECT_EQ(j["id"], "sub_updates_001"); + EXPECT_EQ(j["observed_resource"], "/api/v1/updates/ADAS-v2/status"); + EXPECT_EQ(j["event_source"], event_source); + EXPECT_EQ(j["protocol"], "sse"); + EXPECT_EQ(j["interval"], "slow"); +} + // --- Error response format (via HandlerContext static helpers) --- TEST(CyclicSubscriptionErrorTest, InvalidParameterErrorFormat) { diff --git a/src/ros2_medkit_gateway/test/test_resource_sampler_registry.cpp b/src/ros2_medkit_gateway/test/test_resource_sampler_registry.cpp new file mode 100644 index 00000000..d99c75ea --- /dev/null +++ b/src/ros2_medkit_gateway/test/test_resource_sampler_registry.cpp @@ -0,0 +1,152 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include "ros2_medkit_gateway/resource_sampler.hpp" + +using namespace ros2_medkit_gateway; + +TEST(ResourceSamplerRegistryTest, RegisterAndLookupBuiltinSampler) { + ResourceSamplerRegistry registry; + registry.register_sampler( + "data", + [](const std::string &, const std::string &) -> tl::expected { + return nlohmann::json{{"test", true}}; + }, + true); + + EXPECT_TRUE(registry.has_sampler("data")); + auto sampler = registry.get_sampler("data"); + ASSERT_TRUE(sampler.has_value()); + auto result = (*sampler)("entity1", "/topic"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ((*result)["test"], true); +} + +TEST(ResourceSamplerRegistryTest, RegisterAndLookupVendorExtensionSampler) { + ResourceSamplerRegistry registry; + registry.register_sampler("x-medkit-metrics", + [](const std::string &, const std::string &) -> tl::expected { + return nlohmann::json{{"cpu", 42}}; + }); + + EXPECT_TRUE(registry.has_sampler("x-medkit-metrics")); +} + +TEST(ResourceSamplerRegistryTest, HasSamplerReturnsFalseForUnregistered) { + ResourceSamplerRegistry registry; + EXPECT_FALSE(registry.has_sampler("nonexistent")); + EXPECT_FALSE(registry.get_sampler("nonexistent").has_value()); +} + +TEST(ResourceSamplerRegistryTest, BuiltinOverwritesExisting) { + ResourceSamplerRegistry registry; + registry.register_sampler( + "data", + [](const std::string &, const std::string &) -> tl::expected { + return nlohmann::json{{"version", 1}}; + }, + true); + registry.register_sampler( + "data", + [](const std::string &, const std::string &) -> tl::expected { + return nlohmann::json{{"version", 2}}; + }, + true); + + auto sampler = registry.get_sampler("data"); + ASSERT_TRUE(sampler.has_value()); + auto result = (*sampler)("e", ""); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ((*result)["version"], 2); +} + +TEST(ResourceSamplerRegistryTest, RejectDuplicatePluginRegistration) { + ResourceSamplerRegistry registry; + registry.register_sampler("x-medkit-metrics", + [](const std::string &, const std::string &) -> tl::expected { + return nlohmann::json{}; + }); + + EXPECT_THROW(registry.register_sampler( + "x-medkit-metrics", + [](const std::string &, const std::string &) -> tl::expected { + return nlohmann::json{}; + }), + std::runtime_error); +} + +TEST(ResourceSamplerRegistryTest, RejectPluginRegistrationWithoutXPrefix) { + ResourceSamplerRegistry registry; + EXPECT_THROW(registry.register_sampler( + "data", + [](const std::string &, const std::string &) -> tl::expected { + return nlohmann::json{}; + }), + std::runtime_error); +} + +TEST(ResourceSamplerRegistryTest, ConcurrentRegisterAndLookup) { + ResourceSamplerRegistry registry; + std::vector threads; + threads.reserve(10); + + for (int i = 0; i < 10; ++i) { + threads.emplace_back([®istry, i]() { + registry.register_sampler( + "collection_" + std::to_string(i), + [i](const std::string &, const std::string &) -> tl::expected { + return nlohmann::json{{"i", i}}; + }, + true); + }); + } + for (auto & t : threads) { + t.join(); + } + + for (int i = 0; i < 10; ++i) { + EXPECT_TRUE(registry.has_sampler("collection_" + std::to_string(i))); + } +} + +TEST(ResourceSamplerRegistryTest, UpdatesSamplerRegisteredAsBuiltin) { + ResourceSamplerRegistry registry; + registry.register_sampler( + "updates", + [](const std::string & /*entity_id*/, + const std::string & resource_path) -> tl::expected { + if (resource_path == "known-pkg") { + return nlohmann::json{{"status", "inProgress"}, {"progress", 42}}; + } + return tl::make_unexpected(std::string("Update not found: " + resource_path)); + }, + true); + + EXPECT_TRUE(registry.has_sampler("updates")); + auto sampler = registry.get_sampler("updates"); + ASSERT_TRUE(sampler.has_value()); + + auto result = (*sampler)("any-entity", "known-pkg"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ((*result)["status"], "inProgress"); + EXPECT_EQ((*result)["progress"], 42); + + auto err = (*sampler)("any-entity", "unknown-pkg"); + ASSERT_FALSE(err.has_value()); +} diff --git a/src/ros2_medkit_gateway/test/test_sse_transport_provider.cpp b/src/ros2_medkit_gateway/test/test_sse_transport_provider.cpp new file mode 100644 index 00000000..38b14ea2 --- /dev/null +++ b/src/ros2_medkit_gateway/test/test_sse_transport_provider.cpp @@ -0,0 +1,77 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp" +#include "ros2_medkit_gateway/subscription_manager.hpp" + +using namespace ros2_medkit_gateway; + +class SseTransportProviderTest : public ::testing::Test { + protected: + SubscriptionManager mgr_{10}; + std::shared_ptr tracker_ = std::make_shared(5); + SseTransportProvider provider_{mgr_, tracker_}; +}; + +// @verifies REQ_INTEROP_090 +TEST_F(SseTransportProviderTest, ProtocolReturnsSse) { + EXPECT_EQ(provider_.protocol(), "sse"); +} + +// @verifies REQ_INTEROP_090 +TEST_F(SseTransportProviderTest, StartReturnsEventsUrl) { + CyclicSubscriptionInfo info; + info.id = "sub_001"; + info.entity_type = "apps"; + info.entity_id = "node1"; + + ResourceSamplerFn sampler = [](const std::string &, const std::string &) { + return nlohmann::json{{"value", 42}}; + }; + + auto result = provider_.start(info, sampler, nullptr); + ASSERT_TRUE(result.has_value()); + EXPECT_NE(result->find("/apps/node1/cyclic-subscriptions/sub_001/events"), std::string::npos); +} + +// @verifies REQ_INTEROP_090 +TEST_F(SseTransportProviderTest, StopRemovesStream) { + CyclicSubscriptionInfo info; + info.id = "sub_002"; + info.entity_type = "components"; + info.entity_id = "ecu1"; + + ResourceSamplerFn sampler = [](const std::string &, const std::string &) { + return nlohmann::json{}; + }; + + auto result = provider_.start(info, sampler, nullptr); + ASSERT_TRUE(result.has_value()); + + // stop should not throw even if subscription doesn't exist in mgr + provider_.stop("sub_002"); + + // Calling stop again on already-removed stream should be a no-op + provider_.stop("sub_002"); +} + +TEST_F(SseTransportProviderTest, NotifyUpdateIsNoOp) { + // SSE transport re-reads each iteration, so notify_update is a no-op + // Just verify it doesn't crash + provider_.notify_update("nonexistent"); +} diff --git a/src/ros2_medkit_gateway/test/test_subscription_manager.cpp b/src/ros2_medkit_gateway/test/test_subscription_manager.cpp index 51ff7655..dc4f1220 100644 --- a/src/ros2_medkit_gateway/test/test_subscription_manager.cpp +++ b/src/ros2_medkit_gateway/test/test_subscription_manager.cpp @@ -24,6 +24,7 @@ using namespace ros2_medkit_gateway; // --- Interval parsing edge cases --- +// @verifies REQ_INTEROP_089 TEST(IntervalParsingTest, ParsesValidIntervals) { EXPECT_EQ(parse_interval("fast"), CyclicInterval::FAST); EXPECT_EQ(parse_interval("normal"), CyclicInterval::NORMAL); @@ -51,6 +52,38 @@ TEST(IntervalParsingTest, DurationsArePositive) { EXPECT_LT(interval_to_duration(CyclicInterval::NORMAL), interval_to_duration(CyclicInterval::SLOW)); } +// --- Get/list --- + +TEST(SubscriptionManagerGetTest, GetReturnsCreatedSubscription) { + SubscriptionManager mgr(10); + auto created = mgr.create("e1", "apps", "/api/v1/apps/e1/data/t", "data", "/t", "sse", CyclicInterval::NORMAL, 300); + ASSERT_TRUE(created.has_value()); + + auto fetched = mgr.get(created->id); + ASSERT_TRUE(fetched.has_value()); + EXPECT_EQ(fetched->id, created->id); + EXPECT_EQ(fetched->entity_id, "e1"); + EXPECT_EQ(fetched->entity_type, "apps"); + EXPECT_EQ(fetched->collection, "data"); + EXPECT_EQ(fetched->resource_path, "/t"); + EXPECT_EQ(fetched->protocol, "sse"); + EXPECT_EQ(fetched->interval, CyclicInterval::NORMAL); + EXPECT_EQ(fetched->duration_sec, 300); +} + +TEST(SubscriptionManagerGetTest, GetReturnsNulloptForNonexistent) { + SubscriptionManager mgr(10); + EXPECT_FALSE(mgr.get("nonexistent").has_value()); +} + +TEST(SubscriptionManagerGetTest, GetReturnsNulloptAfterRemove) { + SubscriptionManager mgr(10); + auto created = mgr.create("e1", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 60); + ASSERT_TRUE(created.has_value()); + mgr.remove(created->id); + EXPECT_FALSE(mgr.get(created->id).has_value()); +} + // --- Capacity enforcement --- class SubscriptionManagerTest : public ::testing::Test { @@ -60,31 +93,31 @@ class SubscriptionManagerTest : public ::testing::Test { TEST_F(SubscriptionManagerTest, CreateFailsAtCapacity) { for (int i = 0; i < 5; ++i) { - auto r = mgr_.create("e", "apps", "/r", "/t", "sse", CyclicInterval::FAST, 60); + auto r = mgr_.create("e", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 60); ASSERT_TRUE(r.has_value()) << "Subscription " << i << " should succeed"; } - auto r = mgr_.create("e", "apps", "/r", "/t", "sse", CyclicInterval::FAST, 60); + auto r = mgr_.create("e", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 60); ASSERT_FALSE(r.has_value()); EXPECT_NE(r.error().find("capacity"), std::string::npos); } TEST_F(SubscriptionManagerTest, CapacityFreedAfterRemove) { for (int i = 0; i < 5; ++i) { - (void)mgr_.create("e", "apps", "/r", "/t", "sse", CyclicInterval::FAST, 60); + (void)mgr_.create("e", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 60); } auto list = mgr_.list("e"); ASSERT_FALSE(list.empty()); mgr_.remove(list[0].id); - auto r = mgr_.create("e", "apps", "/r", "/t", "sse", CyclicInterval::FAST, 60); + auto r = mgr_.create("e", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 60); EXPECT_TRUE(r.has_value()) << "Should succeed after freeing a slot"; } // --- ID uniqueness --- TEST_F(SubscriptionManagerTest, GeneratesUniqueIds) { - auto r1 = mgr_.create("a", "apps", "/r1", "/t1", "sse", CyclicInterval::FAST, 60); - auto r2 = mgr_.create("a", "apps", "/r2", "/t2", "sse", CyclicInterval::FAST, 60); + auto r1 = mgr_.create("a", "apps", "/r1", "data", "/t1", "sse", CyclicInterval::FAST, 60); + auto r2 = mgr_.create("a", "apps", "/r2", "data", "/t2", "sse", CyclicInterval::FAST, 60); ASSERT_TRUE(r1.has_value()); ASSERT_TRUE(r2.has_value()); EXPECT_NE(r1->id, r2->id); @@ -93,8 +126,8 @@ TEST_F(SubscriptionManagerTest, GeneratesUniqueIds) { // --- Expiry edge cases --- TEST_F(SubscriptionManagerTest, CleanupExpiredRemovesOnlyExpired) { - (void)mgr_.create("a", "apps", "/r", "/t", "sse", CyclicInterval::FAST, 0); // expired immediately - (void)mgr_.create("b", "apps", "/r", "/t", "sse", CyclicInterval::FAST, 300); // still active + (void)mgr_.create("a", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 0); // expired immediately + (void)mgr_.create("b", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 300); // still active std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -105,7 +138,7 @@ TEST_F(SubscriptionManagerTest, CleanupExpiredRemovesOnlyExpired) { } TEST_F(SubscriptionManagerTest, IsActiveReturnsFalseAfterExpiry) { - auto r = mgr_.create("a", "apps", "/r", "/t", "sse", CyclicInterval::FAST, 0); + auto r = mgr_.create("a", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 0); ASSERT_TRUE(r.has_value()); std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -119,7 +152,7 @@ TEST_F(SubscriptionManagerTest, IsActiveReturnsFalseForNonexistent) { // --- Update edge cases --- TEST_F(SubscriptionManagerTest, UpdateOnlyIntervalLeavesOtherFieldsUnchanged) { - auto created = mgr_.create("a", "apps", "/r", "/t", "sse", CyclicInterval::NORMAL, 300); + auto created = mgr_.create("a", "apps", "/r", "data", "/t", "sse", CyclicInterval::NORMAL, 300); ASSERT_TRUE(created.has_value()); auto updated = mgr_.update(created->id, CyclicInterval::FAST, std::nullopt); @@ -131,7 +164,7 @@ TEST_F(SubscriptionManagerTest, UpdateOnlyIntervalLeavesOtherFieldsUnchanged) { } TEST_F(SubscriptionManagerTest, UpdateDurationExtendsExpiry) { - auto created = mgr_.create("a", "apps", "/r", "/t", "sse", CyclicInterval::NORMAL, 10); + auto created = mgr_.create("a", "apps", "/r", "data", "/t", "sse", CyclicInterval::NORMAL, 10); ASSERT_TRUE(created.has_value()); auto updated = mgr_.update(created->id, std::nullopt, 600); @@ -153,7 +186,7 @@ TEST(SubscriptionManagerConcurrencyTest, ConcurrentCreateIsThreadSafe) { for (int i = 0; i < 20; ++i) { threads.emplace_back([&mgr, &success_count, i]() { - auto r = mgr.create("entity_" + std::to_string(i), "apps", "/r", "/t", "sse", CyclicInterval::FAST, 60); + auto r = mgr.create("entity_" + std::to_string(i), "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 60); if (r.has_value()) { success_count.fetch_add(1); } @@ -173,7 +206,7 @@ TEST(SubscriptionManagerConcurrencyTest, ConcurrentCreateRespectsCapacity) { for (int i = 0; i < 20; ++i) { threads.emplace_back([&mgr, &success_count, i]() { - auto r = mgr.create("e_" + std::to_string(i), "apps", "/r", "/t", "sse", CyclicInterval::FAST, 60); + auto r = mgr.create("e_" + std::to_string(i), "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 60); if (r.has_value()) { success_count.fetch_add(1); } @@ -189,7 +222,7 @@ TEST(SubscriptionManagerConcurrencyTest, ConcurrentCreateRespectsCapacity) { TEST(SubscriptionManagerSyncTest, WaitForUpdateTimesOut) { SubscriptionManager mgr(10); - auto created = mgr.create("a", "apps", "/r", "/t", "sse", CyclicInterval::FAST, 300); + auto created = mgr.create("a", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 300); ASSERT_TRUE(created.has_value()); auto start = std::chrono::steady_clock::now(); @@ -202,7 +235,7 @@ TEST(SubscriptionManagerSyncTest, WaitForUpdateTimesOut) { TEST(SubscriptionManagerSyncTest, WaitForUpdateWokenByNotify) { SubscriptionManager mgr(10); - auto created = mgr.create("a", "apps", "/r", "/t", "sse", CyclicInterval::FAST, 300); + auto created = mgr.create("a", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 300); ASSERT_TRUE(created.has_value()); std::thread notifier([&mgr, id = created->id]() { @@ -217,7 +250,7 @@ TEST(SubscriptionManagerSyncTest, WaitForUpdateWokenByNotify) { TEST(SubscriptionManagerSyncTest, WaitForUpdateReturnsTrueIfRemoved) { SubscriptionManager mgr(10); - auto created = mgr.create("a", "apps", "/r", "/t", "sse", CyclicInterval::FAST, 300); + auto created = mgr.create("a", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 300); ASSERT_TRUE(created.has_value()); std::thread remover([&mgr, id = created->id]() { @@ -229,3 +262,46 @@ TEST(SubscriptionManagerSyncTest, WaitForUpdateReturnsTrueIfRemoved) { EXPECT_TRUE(notified); remover.join(); } + +// --- on_removed callback --- + +TEST(SubscriptionManagerCallbackTest, OnRemovedCalledOnRemove) { + SubscriptionManager mgr(10); + std::vector removed_ids; + mgr.set_on_removed([&](const CyclicSubscriptionInfo & info) { + removed_ids.push_back(info.id); + }); + auto created = mgr.create("a", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 300); + ASSERT_TRUE(created.has_value()); + mgr.remove(created->id); + ASSERT_EQ(removed_ids.size(), 1u); + EXPECT_EQ(removed_ids[0], created->id); +} + +TEST(SubscriptionManagerCallbackTest, OnRemovedCalledOnCleanupExpired) { + SubscriptionManager mgr(10); + std::vector removed_ids; + mgr.set_on_removed([&](const CyclicSubscriptionInfo & info) { + removed_ids.push_back(info.id); + }); + auto created = mgr.create("a", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 0); + ASSERT_TRUE(created.has_value()); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + mgr.cleanup_expired(); + ASSERT_EQ(removed_ids.size(), 1u); + EXPECT_EQ(removed_ids[0], created->id); +} + +TEST(SubscriptionManagerCallbackTest, OnRemovedCalledOnShutdown) { + SubscriptionManager mgr(10); + std::vector removed_ids; + mgr.set_on_removed([&](const CyclicSubscriptionInfo & info) { + removed_ids.push_back(info.id); + }); + auto r1 = mgr.create("a", "apps", "/r", "data", "/t", "sse", CyclicInterval::FAST, 300); + auto r2 = mgr.create("b", "apps", "/r", "faults", "", "sse", CyclicInterval::NORMAL, 300); + ASSERT_TRUE(r1.has_value()); + ASSERT_TRUE(r2.has_value()); + mgr.shutdown(); + EXPECT_EQ(removed_ids.size(), 2u); +} diff --git a/src/ros2_medkit_gateway/test/test_transport_registry.cpp b/src/ros2_medkit_gateway/test/test_transport_registry.cpp new file mode 100644 index 00000000..4701ae1a --- /dev/null +++ b/src/ros2_medkit_gateway/test/test_transport_registry.cpp @@ -0,0 +1,99 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "ros2_medkit_gateway/subscription_transport.hpp" + +using namespace ros2_medkit_gateway; + +/// Mock transport for testing +class MockTransport : public SubscriptionTransportProvider { + public: + explicit MockTransport(std::string proto) : proto_(std::move(proto)) { + } + + std::string protocol() const override { + return proto_; + } + + tl::expected start(const CyclicSubscriptionInfo & /*info*/, ResourceSamplerFn /*sampler*/, + GatewayNode * /*node*/) override { + started_ids_.push_back("started"); + return "mock://event-source"; + } + + void notify_update(const std::string & /*sub_id*/) override { + } + + void stop(const std::string & sub_id) override { + stopped_ids_.push_back(sub_id); + } + + std::vector started_ids_; + std::vector stopped_ids_; + + private: + std::string proto_; +}; + +// @verifies REQ_INTEROP_089 +TEST(TransportRegistryTest, RegisterAndLookup) { + TransportRegistry registry; + auto mock = std::make_unique("mqtt"); + auto * raw = mock.get(); + registry.register_transport(std::move(mock)); + + EXPECT_TRUE(registry.has_transport("mqtt")); + EXPECT_EQ(registry.get_transport("mqtt"), raw); +} + +TEST(TransportRegistryTest, HasTransportReturnsFalseForUnregistered) { + TransportRegistry registry; + EXPECT_FALSE(registry.has_transport("nonexistent")); + EXPECT_EQ(registry.get_transport("nonexistent"), nullptr); +} + +TEST(TransportRegistryTest, RejectDuplicateRegistration) { + TransportRegistry registry; + registry.register_transport(std::make_unique("mqtt")); + EXPECT_THROW(registry.register_transport(std::make_unique("mqtt")), std::runtime_error); +} + +TEST(TransportRegistryTest, ShutdownAllCallsStopForActiveSubscriptions) { + TransportRegistry registry; + auto mock = std::make_unique("sse"); + auto * raw = mock.get(); + registry.register_transport(std::move(mock)); + + // Create some subscriptions and wire on_removed to transport + SubscriptionManager mgr(10); + mgr.set_on_removed([raw](const CyclicSubscriptionInfo & info) { + raw->stop(info.id); + }); + + auto r1 = mgr.create("e1", "apps", "/r1", "data", "/topic", "sse", CyclicInterval::FAST, 300); + auto r2 = mgr.create("e2", "apps", "/r2", "faults", "", "sse", CyclicInterval::NORMAL, 300); + ASSERT_TRUE(r1.has_value()); + ASSERT_TRUE(r2.has_value()); + + registry.shutdown_all(mgr); + + // stop() should have been called for both subscriptions + EXPECT_EQ(raw->stopped_ids_.size(), 2u); +} diff --git a/src/ros2_medkit_integration_tests/test/features/test_multi_collection_subscriptions.test.py b/src/ros2_medkit_integration_tests/test/features/test_multi_collection_subscriptions.test.py new file mode 100644 index 00000000..b0ad0de0 --- /dev/null +++ b/src/ros2_medkit_integration_tests/test/features/test_multi_collection_subscriptions.test.py @@ -0,0 +1,423 @@ +#!/usr/bin/env python3 +# Copyright 2026 bburda +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Integration tests for multi-collection cyclic subscriptions. + +Validates that cyclic subscriptions can be created for data, faults, +and configurations collections, and that error cases (unsupported +collection, invalid URI, entity mismatch, unsupported protocol, +path traversal) return 400. + +""" + +import json +import threading +import time +import unittest + +import launch_testing +import launch_testing.actions +import requests + +from ros2_medkit_test_utils.constants import ALLOWED_EXIT_CODES, API_BASE_PATH +from ros2_medkit_test_utils.gateway_test_case import GatewayTestCase +from ros2_medkit_test_utils.launch_helpers import create_test_launch + + +def generate_test_description(): + """Launch gateway with demo nodes for multi-collection subscription testing.""" + return create_test_launch( + demo_nodes=['temp_sensor'], + fault_manager=True, + gateway_params={ + 'sse.max_clients': 10, + 'sse.max_subscriptions': 50, + }, + ) + + +class TestMultiCollectionSubscriptions(GatewayTestCase): + """Test cyclic subscriptions across different resource collections.""" + + MIN_EXPECTED_APPS = 1 + REQUIRED_APPS = {'temp_sensor'} + + app_id = None + topic_id = None + data_resource_uri = None + + @classmethod + def setUpClass(cls): + """Wait for gateway, discover temp sensor, and find a data topic.""" + super().setUpClass() + + # Find the temp_sensor app + try: + r = requests.get(f'{cls.BASE_URL}/apps', timeout=5) + if r.status_code == 200: + apps = r.json().get('items', []) + for app in apps: + if 'temp_sensor' in app.get('id', '').lower() or \ + 'engine' in app.get('id', '').lower(): + cls.app_id = app['id'] + break + except requests.exceptions.RequestException: + pass + + if not cls.app_id: + raise AssertionError('Demo temp sensor app not discovered') + + # Wait for data availability on the discovered app + # Skip ROS 2 system topics that don't have continuous data flow + system_topics = {'/parameter_events', '/rosout'} + deadline = time.monotonic() + 15.0 + while time.monotonic() < deadline: + try: + r = requests.get( + f'{cls.BASE_URL}/apps/{cls.app_id}/data', timeout=5, + ) + if r.status_code == 200: + items = r.json().get('items', []) + for item in items: + if item['id'] not in system_topics: + cls.topic_id = item['id'] + cls.data_resource_uri = ( + f'/api/v1/apps/{cls.app_id}/data{cls.topic_id}' + ) + break + if cls.topic_id: + break + except requests.exceptions.RequestException: + pass + time.sleep(1.0) + + if not cls.topic_id: + raise AssertionError( + f'No data items available for app {cls.app_id}' + ) + + def _create_subscription(self, resource, interval='normal', duration=60, + protocol=None): + """Create a subscription and return the raw response.""" + body = { + 'resource': resource, + 'interval': interval, + 'duration': duration, + } + if protocol is not None: + body['protocol'] = protocol + return requests.post( + f'{self.BASE_URL}/apps/{self.app_id}/cyclic-subscriptions', + json=body, + timeout=5, + ) + + def _delete_subscription(self, sub_id): + """Delete a subscription, ignoring errors for cleanup.""" + try: + requests.delete( + f'{self.BASE_URL}/apps/{self.app_id}' + f'/cyclic-subscriptions/{sub_id}', + timeout=5, + ) + except requests.exceptions.RequestException: + pass + + # =================================================================== + # Happy path: data, faults, configurations subscriptions + # =================================================================== + + def test_data_subscription_create(self): + """Create a subscription on data collection (regression test). + + @verifies REQ_INTEROP_089 + """ + r = self._create_subscription(self.data_resource_uri) + self.assertEqual(r.status_code, 201, f'Create failed: {r.text}') + + data = r.json() + self.addCleanup(self._delete_subscription, data['id']) + + self.assertIn('id', data) + self.assertTrue(data['id'].startswith('sub_')) + self.assertEqual(data['protocol'], 'sse') + self.assertEqual(data['interval'], 'normal') + self.assertIn('observed_resource', data) + self.assertIn('event_source', data) + self.assertTrue(data['event_source'].endswith('/events')) + + def test_faults_subscription_create(self): + """Create a subscription on faults collection. + + @verifies REQ_INTEROP_089 + """ + resource = f'/api/v1/apps/{self.app_id}/faults' + r = self._create_subscription(resource, interval='fast') + self.assertEqual(r.status_code, 201, f'Create failed: {r.text}') + + data = r.json() + self.addCleanup(self._delete_subscription, data['id']) + + self.assertEqual(data['protocol'], 'sse') + self.assertEqual(data['interval'], 'fast') + self.assertIn('observed_resource', data) + self.assertIn('event_source', data) + + def test_configurations_subscription_create(self): + """Create a subscription on configurations collection. + + @verifies REQ_INTEROP_089 + """ + resource = f'/api/v1/apps/{self.app_id}/configurations' + r = self._create_subscription(resource, interval='slow') + self.assertEqual(r.status_code, 201, f'Create failed: {r.text}') + + data = r.json() + self.addCleanup(self._delete_subscription, data['id']) + + self.assertEqual(data['protocol'], 'sse') + self.assertEqual(data['interval'], 'slow') + self.assertIn('observed_resource', data) + self.assertIn('event_source', data) + + def test_logs_subscription_create(self): + """Create a subscription on logs collection. + + @verifies REQ_INTEROP_089 + """ + resource = f'/api/v1/apps/{self.app_id}/logs' + r = self._create_subscription(resource, interval='slow') + self.assertEqual(r.status_code, 201, f'Create failed: {r.text}') + + data = r.json() + self.addCleanup(self._delete_subscription, data['id']) + + self.assertEqual(data['protocol'], 'sse') + self.assertEqual(data['interval'], 'slow') + self.assertIn('observed_resource', data) + self.assertIn('event_source', data) + + def test_logs_subscription_sse_stream(self): + """SSE stream on logs collection delivers EventEnvelope with log items. + + Creates a cyclic subscription on /apps/{id}/logs, connects to the SSE + events endpoint, and verifies that EventEnvelope payloads arrive with + log entry items containing id, timestamp, severity, and message fields. + + @verifies REQ_INTEROP_089 + @verifies REQ_INTEROP_090 + """ + resource = f'/api/v1/apps/{self.app_id}/logs' + r = self._create_subscription(resource, interval='slow', duration=30) + self.assertEqual(r.status_code, 201, f'Create failed: {r.text}') + + data = r.json() + self.addCleanup(self._delete_subscription, data['id']) + + events_url = ( + f'{self.BASE_URL}{data["event_source"].removeprefix(API_BASE_PATH)}' + ) + + received_events = [] + stop_event = threading.Event() + + def collect_events(): + try: + with requests.get(events_url, stream=True, timeout=15) as resp: + for line in resp.iter_lines(decode_unicode=True): + if stop_event.is_set(): + break + if line and line.startswith('data: '): + event = json.loads(line[6:]) + received_events.append(event) + if len(received_events) >= 2: + stop_event.set() + break + except requests.exceptions.RequestException: + pass + + thread = threading.Thread(target=collect_events, daemon=True) + thread.start() + + # slow interval = 500ms, so 2 events should arrive within ~5s + stop_event.wait(timeout=15) + thread.join(timeout=5) + + self.assertGreaterEqual( + len(received_events), 1, + f'Expected at least 1 SSE log event, got {len(received_events)}', + ) + + for event in received_events: + self.assertIn('timestamp', event, 'EventEnvelope must have timestamp') + self.assertIn('payload', event, 'EventEnvelope must have payload') + payload = event['payload'] + self.assertIn('items', payload, 'Logs payload must have items array') + # Log entries from /rosout should be present after demo node startup + if payload['items']: + entry = payload['items'][0] + self.assertIn('id', entry) + self.assertIn('timestamp', entry) + self.assertIn('severity', entry) + self.assertIn('message', entry) + + # =================================================================== + # CRUD operations: list, get, update, delete + # =================================================================== + + def test_list_subscriptions(self): + """List subscriptions returns items array. + + @verifies REQ_INTEROP_025 + """ + # Create a subscription first + r = self._create_subscription(self.data_resource_uri, duration=60) + self.assertEqual(r.status_code, 201) + sub_id = r.json()['id'] + self.addCleanup(self._delete_subscription, sub_id) + + # List + r = requests.get( + f'{self.BASE_URL}/apps/{self.app_id}/cyclic-subscriptions', + timeout=5, + ) + self.assertEqual(r.status_code, 200) + data = r.json() + self.assertIn('items', data) + ids = [s['id'] for s in data['items']] + self.assertIn(sub_id, ids) + + def test_get_subscription(self): + """Get single subscription returns SOVD fields. + + @verifies REQ_INTEROP_026 + """ + r = self._create_subscription(self.data_resource_uri, duration=60) + self.assertEqual(r.status_code, 201) + sub_id = r.json()['id'] + self.addCleanup(self._delete_subscription, sub_id) + + r = requests.get( + f'{self.BASE_URL}/apps/{self.app_id}' + f'/cyclic-subscriptions/{sub_id}', + timeout=5, + ) + self.assertEqual(r.status_code, 200) + data = r.json() + self.assertEqual(data['id'], sub_id) + self.assertIn('observed_resource', data) + self.assertIn('event_source', data) + self.assertIn('protocol', data) + self.assertIn('interval', data) + + def test_update_subscription_interval(self): + """Update subscription interval. + + @verifies REQ_INTEROP_027 + """ + r = self._create_subscription( + self.data_resource_uri, interval='normal', duration=60, + ) + self.assertEqual(r.status_code, 201) + sub_id = r.json()['id'] + self.addCleanup(self._delete_subscription, sub_id) + + r = requests.put( + f'{self.BASE_URL}/apps/{self.app_id}' + f'/cyclic-subscriptions/{sub_id}', + json={'interval': 'fast'}, + timeout=5, + ) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.json()['interval'], 'fast') + + def test_delete_subscription(self): + """Delete subscription returns 204. + + @verifies REQ_INTEROP_028 + """ + r = self._create_subscription(self.data_resource_uri, duration=60) + self.assertEqual(r.status_code, 201) + sub_id = r.json()['id'] + + r = requests.delete( + f'{self.BASE_URL}/apps/{self.app_id}' + f'/cyclic-subscriptions/{sub_id}', + timeout=5, + ) + self.assertEqual(r.status_code, 204) + + # Verify it's gone + r = requests.get( + f'{self.BASE_URL}/apps/{self.app_id}' + f'/cyclic-subscriptions/{sub_id}', + timeout=5, + ) + self.assertEqual(r.status_code, 404) + + def test_get_nonexistent_subscription_returns_404(self): + """Get nonexistent subscription returns 404.""" + r = requests.get( + f'{self.BASE_URL}/apps/{self.app_id}' + f'/cyclic-subscriptions/nonexistent_sub', + timeout=5, + ) + self.assertEqual(r.status_code, 404) + + # =================================================================== + # Error cases + # =================================================================== + + def test_unsupported_collection_returns_400(self): + """Subscription on unknown collection returns 400.""" + resource = f'/api/v1/apps/{self.app_id}/unknown_collection' + r = self._create_subscription(resource) + self.assertEqual(r.status_code, 400) + + def test_invalid_resource_uri_returns_400(self): + """Invalid resource URI format returns 400.""" + r = self._create_subscription('/invalid/uri') + self.assertEqual(r.status_code, 400) + + def test_entity_mismatch_returns_400(self): + """Resource URI referencing different entity returns 400.""" + resource = '/api/v1/apps/OTHER_nonexistent_entity/data/topic' + r = self._create_subscription(resource) + self.assertEqual(r.status_code, 400) + + def test_unsupported_protocol_returns_400(self): + """Unsupported protocol returns 400.""" + r = self._create_subscription( + self.data_resource_uri, protocol='mqtt', + ) + self.assertEqual(r.status_code, 400) + + def test_path_traversal_returns_400(self): + """Resource path with '..' returns 400.""" + resource = f'/api/v1/apps/{self.app_id}/data/../../../etc/passwd' + r = self._create_subscription(resource) + self.assertEqual(r.status_code, 400) + + +@launch_testing.post_shutdown_test() +class TestShutdown(unittest.TestCase): + + def test_exit_codes(self, proc_info): + """Check all processes exited cleanly (SIGTERM allowed).""" + for info in proc_info: + self.assertIn( + info.returncode, ALLOWED_EXIT_CODES, + f'{info.process_name} exited with code {info.returncode}' + ) diff --git a/src/ros2_medkit_integration_tests/test/features/test_update_status_subscription.test.py b/src/ros2_medkit_integration_tests/test/features/test_update_status_subscription.test.py new file mode 100644 index 00000000..082426f1 --- /dev/null +++ b/src/ros2_medkit_integration_tests/test/features/test_update_status_subscription.test.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python3 +# Copyright 2026 bburda +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Integration test: cyclic subscription on /updates/{id}/status. + +Validates that a client can create a cyclic subscription to monitor +update progress via SSE, receiving periodic status snapshots as the +update transitions through pending -> inProgress -> completed. +""" + +import json +import os +import threading +import time +import unittest + +from launch import LaunchDescription +from launch.actions import TimerAction +import launch_ros.actions +import launch_testing +import launch_testing.actions +import requests + +from ros2_medkit_test_utils.constants import ( + ALLOWED_EXIT_CODES, + API_BASE_PATH, + get_test_port, +) +from ros2_medkit_test_utils.gateway_test_case import GatewayTestCase +from ros2_medkit_test_utils.launch_helpers import get_coverage_env + + +PORT = get_test_port(0) + + +def _get_test_plugin_path(): + """Get path to test_update_backend.so demo plugin.""" + from ament_index_python.packages import get_package_prefix + + pkg_prefix = get_package_prefix('ros2_medkit_gateway') + return os.path.join( + pkg_prefix, 'lib', 'ros2_medkit_gateway', 'libtest_update_backend.so' + ) + + +def generate_test_description(): + """Launch gateway with update plugin and a demo node.""" + coverage_env = get_coverage_env() + plugin_path = _get_test_plugin_path() + + gateway = launch_ros.actions.Node( + package='ros2_medkit_gateway', + executable='gateway_node', + name='gateway_update_sub', + output='screen', + parameters=[{ + 'server.host': '127.0.0.1', + 'server.port': PORT, + 'refresh_interval_ms': 1000, + 'updates.enabled': True, + 'plugins': ['test_update_backend'], + 'plugins.test_update_backend.path': plugin_path, + 'sse.max_clients': 10, + 'sse.max_subscriptions': 50, + }], + additional_env=coverage_env, + ) + + # Need at least one demo node so we have an entity for subscriptions + demo_node = launch_ros.actions.Node( + package='ros2_medkit_integration_tests', + executable='demo_engine_temp_sensor', + name='temp_sensor', + namespace='/powertrain/engine', + output='screen', + additional_env=coverage_env, + ) + + return LaunchDescription([ + gateway, + TimerAction( + period=2.0, + actions=[demo_node], + ), + launch_testing.actions.ReadyToTest(), + ]) + + +class TestUpdateStatusSubscription(GatewayTestCase): + """Test cyclic subscription on /updates/{id}/status.""" + + BASE_URL = f'http://127.0.0.1:{PORT}{API_BASE_PATH}' + MIN_EXPECTED_APPS = 1 + REQUIRED_APPS = {'temp_sensor'} + + app_id = None + + @classmethod + def setUpClass(cls): + """Wait for gateway and discover an app for subscriptions.""" + super().setUpClass() + + # Find the temp_sensor app + r = requests.get(f'{cls.BASE_URL}/apps', timeout=10) + apps = r.json().get('items', []) + for app in apps: + if 'temp_sensor' in app.get('id', '').lower(): + cls.app_id = app['id'] + break + + if not cls.app_id: + raise AssertionError('No app discovered for subscription host') + + def _register_package(self, pkg_id, automated=False): + """Register update package and schedule cleanup.""" + pkg = { + 'id': pkg_id, + 'update_name': f'Test {pkg_id}', + 'automated': automated, + 'origins': ['proximity'], + } + r = requests.post(f'{self.BASE_URL}/updates', json=pkg, timeout=5) + self.assertEqual(r.status_code, 201) + self.addCleanup(self._cleanup_package, pkg_id) + + def _cleanup_package(self, pkg_id): + """Best-effort cleanup.""" + deadline = time.monotonic() + 10.0 + while time.monotonic() < deadline: + r = requests.get( + f'{self.BASE_URL}/updates/{pkg_id}/status', timeout=5 + ) + if r.status_code == 404: + break + if r.status_code == 200 and r.json().get('status') in ( + 'completed', 'failed', + ): + break + time.sleep(0.2) + requests.delete(f'{self.BASE_URL}/updates/{pkg_id}', timeout=5) + + def _cleanup_subscription(self, sub_id): + """Best-effort subscription cleanup.""" + requests.delete( + f'{self.BASE_URL}/apps/{self.app_id}' + f'/cyclic-subscriptions/{sub_id}', + timeout=5, + ) + + def test_01_create_subscription_on_update_status(self): + """POST cyclic-subscriptions with /updates/{id}/status resource.""" + self._register_package('sub-test-pkg') + + r = requests.post( + f'{self.BASE_URL}/apps/{self.app_id}/cyclic-subscriptions', + json={ + 'resource': '/api/v1/updates/sub-test-pkg/status', + 'interval': 'fast', + 'duration': 60, + }, + timeout=5, + ) + self.assertEqual(r.status_code, 201, f'Create failed: {r.text}') + data = r.json() + self.assertIn('id', data) + self.assertEqual( + data['observed_resource'], + '/api/v1/updates/sub-test-pkg/status', + ) + self.assertIn('event_source', data) + self.assertEqual(data['protocol'], 'sse') + self.addCleanup(self._cleanup_subscription, data['id']) + + def test_02_subscription_listed_for_entity(self): + """Subscription on update status appears in entity's subscription list.""" + self._register_package('sub-list-pkg') + + r = requests.post( + f'{self.BASE_URL}/apps/{self.app_id}/cyclic-subscriptions', + json={ + 'resource': '/api/v1/updates/sub-list-pkg/status', + 'interval': 'normal', + 'duration': 60, + }, + timeout=5, + ) + self.assertEqual(r.status_code, 201) + sub_id = r.json()['id'] + self.addCleanup(self._cleanup_subscription, sub_id) + + r = requests.get( + f'{self.BASE_URL}/apps/{self.app_id}/cyclic-subscriptions', + timeout=5, + ) + self.assertEqual(r.status_code, 200) + items = r.json().get('items', []) + sub_ids = [s['id'] for s in items] + self.assertIn(sub_id, sub_ids) + + def test_03_sse_stream_delivers_update_progress(self): + """SSE stream delivers status snapshots during update prepare.""" + self._register_package('sub-sse-pkg') + + # Create subscription + r = requests.post( + f'{self.BASE_URL}/apps/{self.app_id}/cyclic-subscriptions', + json={ + 'resource': '/api/v1/updates/sub-sse-pkg/status', + 'interval': 'fast', + 'duration': 120, + }, + timeout=5, + ) + self.assertEqual(r.status_code, 201) + sub_data = r.json() + sub_id = sub_data['id'] + event_source = sub_data['event_source'] + self.addCleanup(self._cleanup_subscription, sub_id) + + # Construct full URL for SSE events endpoint + # event_source is like /api/v1/apps/{id}/cyclic-subscriptions/{sub}/events + events_url = f'http://127.0.0.1:{PORT}{event_source}' + + # Collect SSE events in background + events = [] + stop_event = threading.Event() + + def collect_sse(): + try: + with requests.get( + events_url, + stream=True, + timeout=30, + headers={'Accept': 'text/event-stream'}, + ) as resp: + for line in resp.iter_lines(decode_unicode=True): + if stop_event.is_set(): + break + if line and line.startswith('data: '): + try: + payload = json.loads(line[6:]) + events.append(payload) + except json.JSONDecodeError: + pass + except Exception: + pass + + sse_thread = threading.Thread(target=collect_sse, daemon=True) + sse_thread.start() + + # Give SSE connection time to establish + time.sleep(1.0) + + # Trigger update prepare to generate status changes + requests.put( + f'{self.BASE_URL}/updates/sub-sse-pkg/prepare', timeout=5 + ) + + # Wait for prepare to complete and SSE events to arrive + deadline = time.monotonic() + 30.0 + while time.monotonic() < deadline: + r = requests.get( + f'{self.BASE_URL}/updates/sub-sse-pkg/status', timeout=5 + ) + if r.status_code == 200 and r.json().get('status') == 'completed': + break + time.sleep(0.5) + + # Give a bit more time for final SSE events + time.sleep(2.0) + stop_event.set() + sse_thread.join(timeout=5) + + # Validate we received SSE events with EventEnvelope format + self.assertGreater(len(events), 0, 'No SSE events received') + + # Each event should be an EventEnvelope with timestamp + payload + statuses_seen = set() + for event in events: + self.assertIn('timestamp', event, 'EventEnvelope must have timestamp') + if 'payload' in event: + payload = event['payload'] + if 'status' in payload: + statuses_seen.add(payload['status']) + + # We should see at least one update status + self.assertTrue( + statuses_seen & {'pending', 'inProgress', 'completed'}, + f'Expected update status in events, saw: {statuses_seen}', + ) + + def test_04_subscription_with_nonexistent_package(self): + """Subscription on nonexistent update package - accepts 201 or 503.""" + # Package not registered - sampler will return error + r = requests.post( + f'{self.BASE_URL}/apps/{self.app_id}/cyclic-subscriptions', + json={ + 'resource': '/api/v1/updates/nonexistent-pkg/status', + 'interval': 'normal', + 'duration': 60, + }, + timeout=5, + ) + # Subscription creation should succeed (sampler check passes - updates + # sampler is registered). But the SSE stream will deliver error frames. + # OR: if the sampler does an initial check, it returns 503. + # Accept either 201 (lazy) or 503 (eager validation). + self.assertIn(r.status_code, [201, 503]) + if r.status_code == 201: + self.addCleanup(self._cleanup_subscription, r.json()['id']) + + def test_05_delete_subscription(self): + """DELETE removes the update status subscription.""" + self._register_package('sub-del-pkg') + + r = requests.post( + f'{self.BASE_URL}/apps/{self.app_id}/cyclic-subscriptions', + json={ + 'resource': '/api/v1/updates/sub-del-pkg/status', + 'interval': 'normal', + 'duration': 60, + }, + timeout=5, + ) + self.assertEqual(r.status_code, 201) + sub_id = r.json()['id'] + + r = requests.delete( + f'{self.BASE_URL}/apps/{self.app_id}' + f'/cyclic-subscriptions/{sub_id}', + timeout=5, + ) + self.assertEqual(r.status_code, 204) + + # Verify gone + r = requests.get( + f'{self.BASE_URL}/apps/{self.app_id}' + f'/cyclic-subscriptions/{sub_id}', + timeout=5, + ) + self.assertEqual(r.status_code, 404) + + +@launch_testing.post_shutdown_test() +class TestShutdown(unittest.TestCase): + """Verify clean shutdown.""" + + def test_exit_codes(self, proc_info): + for info in proc_info: + self.assertIn( + info.returncode, + ALLOWED_EXIT_CODES, + f'Process {info.process_name} exited with {info.returncode}', + )