Support all SOVD resource collections in cyclic subscriptions#254
Support all SOVD resource collections in cyclic subscriptions#254
Conversation
…ation Add error codes for invalid resource URI, unsupported collection, missing data provider, entity mismatch, and unsupported protocol. Ref: #253
Return ParsedResourceUri struct with entity_type, entity_id, collection, resource_path. Supports data, faults, configurations, vendor extensions. Add path traversal protection. Make method public for testing. Ref: #253
Introduces ResourceSamplerFn type and thread-safe ResourceSamplerRegistry for mapping SOVD resource collections to sampling functions. Built-in collections use last-write-wins; plugin collections require x- prefix and reject duplicates. Ref: #253
Validate collection via supports_collection (known) or x- prefix (vendor). Check sampler registry for data provider. Use transport registry for protocol lookup. Delegate handle_events to transport. Add collection field to JSON response. Ref: #253
…ortRegistry Virtual interface for pluggable transport protocols (SSE, MQTT, WebSocket, Zenoh). TransportRegistry manages provider lifecycle and shutdown. SSE is built-in default; non-SSE protocols are vendor extensions. Also fix build break by wiring ResourceSamplerRegistry and TransportRegistry into RESTServer so CyclicSubscriptionHandlers' updated constructor compiles. Ref: #253
Refactor SSE streaming loop into SseTransportProvider implementing SubscriptionTransportProvider interface. Uses ResourceSamplerFn instead of direct NativeTopicSampler calls. handle_events() now delegates to transport registry. Sampler exceptions caught and sent as error events. Ref: #253
…dering Initialize ResourceSamplerRegistry and TransportRegistry in GatewayNode. Register built-in samplers (data, faults, configurations, communication-logs). Register SseTransportProvider as default transport. Wire on_removed callback for transport cleanup. Move SSEClientTracker from RESTServer to GatewayNode. Fix shutdown: REST stop -> transport stop -> sub_mgr shutdown -> plugin shutdown -> destruction. Ref: #253
Add register_resource_sampler() and register_transport() methods to PluginManager. Plugins can register vendor collection samplers (x- prefix) and custom transport protocols during configure() phase. Ref: #253
Integration tests for data, faults, and configurations subscriptions. Error case coverage: unsupported collection, invalid URI, entity mismatch, unsupported protocol, path traversal. Ref: #253
Name unused parameters, add const to non-modified copy, reserve vector capacity before loop. Ref: #253
There was a problem hiding this comment.
Pull request overview
Adds generalized cyclic subscription support across all SOVD resource collections (plus x-* vendor extensions) by introducing registries for resource samplers and transport providers, and refactoring SSE streaming into a dedicated transport provider.
Changes:
- Introduces
ResourceSamplerRegistry(thread-safe) andTransportRegistry(protocol-keyed) with plugin registration hooks. - Refactors cyclic subscription creation to accept
{collection, resource_path}from generalizedparse_resource_uri()(with traversal protection) and to start delivery via transport providers. - Adds unit + integration tests covering registries, URI parsing, on-removed lifecycle hook, and multi-collection subscription flows.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| src/ros2_medkit_integration_tests/test/features/test_multi_collection_subscriptions.test.py | New integration tests validating create/error cases across multiple collections and protocols. |
| src/ros2_medkit_gateway/test/test_transport_registry.cpp | New unit tests for transport registry lookup and shutdown behavior. |
| src/ros2_medkit_gateway/test/test_subscription_manager.cpp | Updates tests for new create() signature; adds on_removed callback tests. |
| src/ros2_medkit_gateway/test/test_resource_sampler_registry.cpp | New unit tests for sampler registration rules and concurrency. |
| src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp | Adds parse_resource_uri test coverage for multiple collections and traversal rejection. |
| src/ros2_medkit_gateway/src/subscription_transport.cpp | Implements basic transport registry and shutdown_all delegating to SubscriptionManager. |
| src/ros2_medkit_gateway/src/subscription_manager.cpp | Extends subscription info fields; adds on_removed hook; updates remove/expiry/shutdown lifecycle. |
| src/ros2_medkit_gateway/src/resource_sampler.cpp | Implements thread-safe sampler registry with builtin/plugin registration policies. |
| src/ros2_medkit_gateway/src/plugins/plugin_manager.cpp | Adds plugin APIs to register samplers/transports and wires registries into PluginManager. |
| src/ros2_medkit_gateway/src/http/rest_server.cpp | Injects sampler/transport registries into cyclic subscription handlers; uses node-provided SSE tracker. |
| src/ros2_medkit_gateway/src/http/handlers/sse_transport_provider.cpp | New SSE transport provider extracting former handler streaming logic and invoking samplers safely. |
| src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp | Generalizes resource URI parsing; validates collection/protocol; starts delivery via transport provider. |
| src/ros2_medkit_gateway/src/gateway_node.cpp | Initializes registries, built-in samplers/transports, lifecycle wiring, and shutdown ordering. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_transport.hpp | New transport provider interface and registry. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_manager.hpp | Updates CyclicSubscriptionInfo; new create() signature; adds on_removed callback API. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/resource_sampler.hpp | New sampler function type + thread-safe sampler registry interface. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/plugin_manager.hpp | Exposes plugin APIs for sampler/transport registration + registry wiring. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp | Declares SSE transport provider implementation. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/cyclic_subscription_handlers.hpp | Adds ParsedResourceUri and parse_resource_uri() API; injects registries. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/error_codes.hpp | Adds vendor-specific error codes for URI/collection/protocol validation. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp | Exposes registries and shared SSE client tracker from GatewayNode. |
| src/ros2_medkit_gateway/CMakeLists.txt | Adds new sources and unit tests to build/test targets. |
| std::unique_lock lock(mutex_); | ||
|
|
||
| if (!is_builtin) { | ||
| if (collection.substr(0, 2) != "x-") { |
There was a problem hiding this comment.
collection.substr(0, 2) will throw std::out_of_range when collection.size() < 2 (e.g., empty string), causing an unexpected exception type and bypassing your intended error message. Guard on collection.size() >= 2 (or use starts_with("x-") if available) before calling substr.
| if (collection.substr(0, 2) != "x-") { | |
| if (collection.size() < 2 || collection.substr(0, 2) != "x-") { |
| // Security: reject path traversal in resource_path | ||
| if (parsed.resource_path.find("/..") != std::string::npos || parsed.resource_path.find("../") != std::string::npos) { | ||
| return tl::make_unexpected("Resource path must not contain '..'"); |
There was a problem hiding this comment.
The traversal check is substring-based and can reject benign paths containing .. as part of a larger segment (e.g., /..foo), while still not precisely enforcing “segment equals '..'”. Prefer checking path segments (reject .. when it appears as a full segment, e.g., (^|/)\\.\\.(?=/|$)), which is both stricter and avoids false positives.
| // Security: reject path traversal in resource_path | |
| if (parsed.resource_path.find("/..") != std::string::npos || parsed.resource_path.find("../") != std::string::npos) { | |
| return tl::make_unexpected("Resource path must not contain '..'"); | |
| // Security: reject path traversal in resource_path (only when '..' is a full path segment) | |
| static const std::regex traversal_pattern(R"((^|/)\\.\\.(?=/|$))"); | |
| if (std::regex_search(parsed.resource_path, traversal_pattern)) { | |
| return tl::make_unexpected("Resource path must not contain '..' as a path segment"); |
| /// Set callback invoked when a subscription is removed (remove, cleanup_expired, shutdown) | ||
| void set_on_removed(std::function<void(const CyclicSubscriptionInfo &)> callback); | ||
|
|
||
| /** | ||
| * @brief Wait for an update or removal on the given subscription. | ||
| * |
There was a problem hiding this comment.
on_removed_ is read/invoked concurrently from remove(), cleanup_expired(), and shutdown(), but it can also be reassigned via set_on_removed() with no synchronization. This is a data race on std::function. Consider either (a) making set_on_removed() only callable during initialization (enforced/documented + not callable after threads start), or (b) protecting reads/writes with a mutex, or (c) storing it as an atomic std::shared_ptr<const Callback> and copying the pointer before invocation.
| std::atomic<bool> shutdown_flag_{false}; | ||
| std::function<void(const CyclicSubscriptionInfo &)> on_removed_; |
There was a problem hiding this comment.
on_removed_ is read/invoked concurrently from remove(), cleanup_expired(), and shutdown(), but it can also be reassigned via set_on_removed() with no synchronization. This is a data race on std::function. Consider either (a) making set_on_removed() only callable during initialization (enforced/documented + not callable after threads start), or (b) protecting reads/writes with a mutex, or (c) storing it as an atomic std::shared_ptr<const Callback> and copying the pointer before invocation.
| void SseTransportProvider::stop(const std::string & sub_id) { | ||
| std::lock_guard<std::mutex> lock(mutex_); | ||
| streams_.erase(sub_id); | ||
| // The SSE loop detects inactive via sub_mgr_.is_active() and exits. | ||
| } |
There was a problem hiding this comment.
stop() does not actually stop an in-flight SSE stream if the subscription remains active: the streaming loop uses sub_mgr_.is_active() and a captured sampler (captured_sampler), so erasing streams_[sub_id] won’t affect existing connections. To make stop() effective, add a transport-local cancellation signal that the loop checks (e.g., a StreamState flag keyed by sub_id, or re-check streams_.contains(sub_id) each iteration) and have stop() flip that signal before returning.
|
|
||
| namespace ros2_medkit_gateway { | ||
|
|
||
| void TransportRegistry::register_transport(std::unique_ptr<SubscriptionTransportProvider> provider) { | ||
| auto proto = provider->protocol(); |
There was a problem hiding this comment.
register_transport() silently overwrites an existing provider for the same protocol. That can lead to hard-to-debug behavior (e.g., a plugin accidentally replacing the built-in sse provider). Consider rejecting duplicates (throw/return error), or at least logging a warning and requiring an explicit “replace” API to intentionally override.
| namespace ros2_medkit_gateway { | |
| void TransportRegistry::register_transport(std::unique_ptr<SubscriptionTransportProvider> provider) { | |
| auto proto = provider->protocol(); | |
| #include <iostream> | |
| namespace ros2_medkit_gateway { | |
| void TransportRegistry::register_transport(std::unique_ptr<SubscriptionTransportProvider> provider) { | |
| auto proto = provider->protocol(); | |
| if (transports_.count(proto) > 0) { | |
| std::cerr << "[ros2_medkit_gateway] Transport for protocol '" << proto | |
| << "' is already registered; ignoring duplicate registration." << std::endl; | |
| return; | |
| } |
| // Security: reject path traversal in resource_path | ||
| if (parsed.resource_path.find("/..") != std::string::npos || parsed.resource_path.find("../") != std::string::npos) { | ||
| return tl::make_unexpected("Resource path must not contain '..'"); | ||
| } |
There was a problem hiding this comment.
The path traversal check is overly broad and will reject legitimate paths like /... (contains \"/..\"), and it doesn't validate traversal by segment boundaries. Consider splitting resource_path into path segments and rejecting only segments that equal \"..\" (and ideally also rejecting URL-encoded dot-dot like %2e%2e if inputs can be encoded).
| // Create subscription | ||
| auto result = sub_mgr_.create(entity_id, entity_type, resource, *topic_result, protocol, interval, duration); | ||
| auto result = sub_mgr_.create(entity_id, entity_type, resource, parsed->collection, parsed->resource_path, protocol, | ||
| interval, duration); |
There was a problem hiding this comment.
Subscriptions for the data collection appear to require a concrete ROS topic path, but parse_resource_uri() allows /api/v1/.../data with an empty resource_path. This can lead to creating a subscription that will always stream sampler errors at runtime. Suggest rejecting collection == \"data\" && resource_path.empty() (400 invalid resource URI), while still allowing empty paths for collections like faults that support collection-level sampling.
| SubscriptionTransportProvider * get_transport(const std::string & protocol) const; | ||
| bool has_transport(const std::string & protocol) const; | ||
|
|
||
| /// Stop all active subscriptions across all transports. |
There was a problem hiding this comment.
TransportRegistry::shutdown_all() is documented as stopping subscriptions across transports, but the current implementation delegates only to sub_mgr.shutdown() and doesn't interact with transports_ at all. Either (a) update the docstring to state that shutdown relies on SubscriptionManager::on_removed to stop transports, or (b) change shutdown_all() to actively stop each subscription via the appropriate provider (e.g., iterate active subscriptions, look up info.protocol, call provider->stop(info.id)), then shut down the manager.
| /// Stop all active subscriptions across all transports. | |
| /// Trigger shutdown of all managed subscriptions. | |
| /// | |
| /// This function delegates to the given SubscriptionManager instance | |
| /// (typically via SubscriptionManager::shutdown()). It relies on the | |
| /// manager's on_removed / teardown callbacks to invoke the appropriate | |
| /// SubscriptionTransportProvider::stop() for each active subscription and | |
| /// protocol, rather than directly iterating transports_ here. |
| void TransportRegistry::register_transport(std::unique_ptr<SubscriptionTransportProvider> provider) { | ||
| auto proto = provider->protocol(); | ||
| transports_[proto] = std::move(provider); | ||
| } | ||
|
|
||
| SubscriptionTransportProvider * TransportRegistry::get_transport(const std::string & protocol) const { | ||
| auto it = transports_.find(protocol); | ||
| if (it == transports_.end()) { | ||
| return nullptr; | ||
| } | ||
| return it->second.get(); | ||
| } |
There was a problem hiding this comment.
TransportRegistry isn't thread-safe: register_transport() mutates transports_ without synchronization while get_transport() reads it. Since transports can be registered via plugins and are queried on request paths, protect the map with a mutex/shared_mutex (similar to ResourceSamplerRegistry) or clearly constrain registration to single-threaded startup only and enforce that contract.
| // 2. Stop all transport providers for active subscriptions | ||
| if (transport_registry_) { | ||
| transport_registry_->shutdown_all(*subscription_mgr_); | ||
| } | ||
| // 3. Shutdown subscription manager (marks all inactive, triggers on_removed for stragglers) | ||
| if (subscription_mgr_) { | ||
| subscription_mgr_->shutdown(); | ||
| } | ||
| // 4. Shutdown plugins | ||
| if (plugin_mgr_) { | ||
| plugin_mgr_->shutdown_all(); | ||
| } | ||
| // 5. Normal member destruction (managers safe - all transports stopped) |
There was a problem hiding this comment.
transport_registry_->shutdown_all(*subscription_mgr_) currently calls sub_mgr.shutdown(), and the destructor then calls subscription_mgr_->shutdown() again. This is redundant and makes shutdown sequencing harder to reason about (and could double-trigger removal callbacks if shutdown_all changes later). Consider doing shutdown in exactly one place (either make shutdown_all() actually stop transports without shutting down the manager, or remove the extra subscription_mgr_->shutdown() call).
| // 2. Stop all transport providers for active subscriptions | |
| if (transport_registry_) { | |
| transport_registry_->shutdown_all(*subscription_mgr_); | |
| } | |
| // 3. Shutdown subscription manager (marks all inactive, triggers on_removed for stragglers) | |
| if (subscription_mgr_) { | |
| subscription_mgr_->shutdown(); | |
| } | |
| // 4. Shutdown plugins | |
| if (plugin_mgr_) { | |
| plugin_mgr_->shutdown_all(); | |
| } | |
| // 5. Normal member destruction (managers safe - all transports stopped) | |
| // 2. Stop all transport providers for active subscriptions (also shuts down subscription manager) | |
| if (transport_registry_) { | |
| transport_registry_->shutdown_all(*subscription_mgr_); | |
| } | |
| // 3. Shutdown plugins | |
| if (plugin_mgr_) { | |
| plugin_mgr_->shutdown_all(); | |
| } | |
| // 4. Normal member destruction (managers safe - all transports stopped) |
Summary
Make cyclic subscriptions work with any SOVD resource collection (data, faults, configurations, communication-logs) and vendor extensions (
x-*prefix), instead of being hard-coded to/data/only.Key changes:
x-prefixparse_resource_uri()- extracts entity type, entity ID, collection, and resource path from any valid SOVD resource URI with path traversal protectionCyclicSubscriptionInfo- replacedtopic_namewithcollection+resource_pathfieldson_removedcallback - SubscriptionManager lifecycle hook for transport cleanup on remove/expire/shutdownregister_resource_sampler()andregister_transport()for plugin-provided collections and transportsIssue
Type
Testing
test_resource_sampler_registry.cpp- 7 tests (register, lookup, builtin overwrite, plugin reject, concurrent access)test_transport_registry.cpp- 3 tests (register/lookup, missing transport, shutdown_all)test_cyclic_subscription_handlers.cpp- 11 new ParseResourceUri tests (all collections, vendor extensions, entity types, path traversal, invalid URIs)test_subscription_manager.cpp- 3 new on_removed callback tests (remove, expire, shutdown)test_multi_collection_subscriptions.test.py- 8 tests (data/faults/configurations create, unsupported collection, invalid URI, entity mismatch, unsupported protocol, path traversal)Checklist