Skip to content

Support all SOVD resource collections in cyclic subscriptions#254

Draft
bburda wants to merge 10 commits intomainfrom
feature/253-cyclic-subscriptions-all-collections
Draft

Support all SOVD resource collections in cyclic subscriptions#254
bburda wants to merge 10 commits intomainfrom
feature/253-cyclic-subscriptions-all-collections

Conversation

@bburda
Copy link
Collaborator

@bburda bburda commented Mar 5, 2026

Summary

Make cyclic subscriptions work with any SOVD resource collection (data, faults, configurations, communication-logs) and vendor extensions (x-* prefix), instead of being hard-coded to /data/ only.

Key changes:

  • ResourceSamplerRegistry - thread-safe registry mapping collection names to sampler functions. Built-in samplers (data, faults, configurations, communication-logs) use last-write-wins; plugins must use x- prefix
  • SubscriptionTransportProvider / TransportRegistry - pluggable transport interface (SSE built-in, extensible to MQTT/WebSocket/Zenoh) with protocol-keyed registry
  • SseTransportProvider - SSE streaming logic extracted from handler into a standalone transport provider with exception-safe sampler invocation
  • Generalized parse_resource_uri() - extracts entity type, entity ID, collection, and resource path from any valid SOVD resource URI with path traversal protection
  • CyclicSubscriptionInfo - replaced topic_name with collection + resource_path fields
  • on_removed callback - SubscriptionManager lifecycle hook for transport cleanup on remove/expire/shutdown
  • Proper shutdown ordering - REST stop -> transport stop -> sub_mgr shutdown -> plugin shutdown -> destruction
  • PluginManager extensions - register_resource_sampler() and register_transport() for plugin-provided collections and transports

Issue


Type

  • Bug fix
  • New feature or tests
  • Breaking change
  • Documentation only

Testing

  • 4 new unit test files (22 new test cases):
    • test_resource_sampler_registry.cpp - 7 tests (register, lookup, builtin overwrite, plugin reject, concurrent access)
    • test_transport_registry.cpp - 3 tests (register/lookup, missing transport, shutdown_all)
    • test_cyclic_subscription_handlers.cpp - 11 new ParseResourceUri tests (all collections, vendor extensions, entity types, path traversal, invalid URIs)
    • test_subscription_manager.cpp - 3 new on_removed callback tests (remove, expire, shutdown)
  • 1 new integration test file:
    • test_multi_collection_subscriptions.test.py - 8 tests (data/faults/configurations create, unsupported collection, invalid URI, entity mismatch, unsupported protocol, path traversal)
  • All 870 unit tests pass, all linters pass (clang-format, clang-tidy, ament)

Checklist

  • Breaking changes are clearly described (and announced in docs / changelog if needed)
  • Tests were added or updated if needed
  • Docs were updated if behavior or public API changed

bburda added 10 commits March 5, 2026 19:51
…ation

Add error codes for invalid resource URI, unsupported collection,
missing data provider, entity mismatch, and unsupported protocol.

Ref: #253
Return ParsedResourceUri struct with entity_type, entity_id, collection,
resource_path. Supports data, faults, configurations, vendor extensions.
Add path traversal protection. Make method public for testing.

Ref: #253
Introduces ResourceSamplerFn type and thread-safe ResourceSamplerRegistry
for mapping SOVD resource collections to sampling functions. Built-in
collections use last-write-wins; plugin collections require x- prefix
and reject duplicates.

Ref: #253
Validate collection via supports_collection (known) or x- prefix (vendor).
Check sampler registry for data provider. Use transport registry for
protocol lookup. Delegate handle_events to transport. Add collection
field to JSON response.

Ref: #253
…ortRegistry

Virtual interface for pluggable transport protocols (SSE, MQTT, WebSocket,
Zenoh). TransportRegistry manages provider lifecycle and shutdown.
SSE is built-in default; non-SSE protocols are vendor extensions.

Also fix build break by wiring ResourceSamplerRegistry and TransportRegistry
into RESTServer so CyclicSubscriptionHandlers' updated constructor compiles.

Ref: #253
Refactor SSE streaming loop into SseTransportProvider implementing
SubscriptionTransportProvider interface. Uses ResourceSamplerFn instead
of direct NativeTopicSampler calls. handle_events() now delegates to
transport registry. Sampler exceptions caught and sent as error events.

Ref: #253
…dering

Initialize ResourceSamplerRegistry and TransportRegistry in GatewayNode.
Register built-in samplers (data, faults, configurations, communication-logs).
Register SseTransportProvider as default transport. Wire on_removed callback
for transport cleanup. Move SSEClientTracker from RESTServer to GatewayNode.
Fix shutdown: REST stop -> transport stop -> sub_mgr shutdown -> plugin
shutdown -> destruction.

Ref: #253
Add register_resource_sampler() and register_transport() methods to
PluginManager. Plugins can register vendor collection samplers (x- prefix)
and custom transport protocols during configure() phase.

Ref: #253
Integration tests for data, faults, and configurations subscriptions.
Error case coverage: unsupported collection, invalid URI, entity mismatch,
unsupported protocol, path traversal.

Ref: #253
Name unused parameters, add const to non-modified copy, reserve vector
capacity before loop.

Ref: #253
Copilot AI review requested due to automatic review settings March 5, 2026 20:31
@bburda bburda self-assigned this Mar 5, 2026
@bburda bburda added the enhancement New feature or request label Mar 5, 2026
@bburda bburda marked this pull request as draft March 5, 2026 20:31
@bburda bburda changed the title feat(gateway): support all SOVD resource collections in cyclic subscriptions Support all SOVD resource collections in cyclic subscriptions Mar 5, 2026
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds generalized cyclic subscription support across all SOVD resource collections (plus x-* vendor extensions) by introducing registries for resource samplers and transport providers, and refactoring SSE streaming into a dedicated transport provider.

Changes:

  • Introduces ResourceSamplerRegistry (thread-safe) and TransportRegistry (protocol-keyed) with plugin registration hooks.
  • Refactors cyclic subscription creation to accept {collection, resource_path} from generalized parse_resource_uri() (with traversal protection) and to start delivery via transport providers.
  • Adds unit + integration tests covering registries, URI parsing, on-removed lifecycle hook, and multi-collection subscription flows.

Reviewed changes

Copilot reviewed 22 out of 22 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
src/ros2_medkit_integration_tests/test/features/test_multi_collection_subscriptions.test.py New integration tests validating create/error cases across multiple collections and protocols.
src/ros2_medkit_gateway/test/test_transport_registry.cpp New unit tests for transport registry lookup and shutdown behavior.
src/ros2_medkit_gateway/test/test_subscription_manager.cpp Updates tests for new create() signature; adds on_removed callback tests.
src/ros2_medkit_gateway/test/test_resource_sampler_registry.cpp New unit tests for sampler registration rules and concurrency.
src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp Adds parse_resource_uri test coverage for multiple collections and traversal rejection.
src/ros2_medkit_gateway/src/subscription_transport.cpp Implements basic transport registry and shutdown_all delegating to SubscriptionManager.
src/ros2_medkit_gateway/src/subscription_manager.cpp Extends subscription info fields; adds on_removed hook; updates remove/expiry/shutdown lifecycle.
src/ros2_medkit_gateway/src/resource_sampler.cpp Implements thread-safe sampler registry with builtin/plugin registration policies.
src/ros2_medkit_gateway/src/plugins/plugin_manager.cpp Adds plugin APIs to register samplers/transports and wires registries into PluginManager.
src/ros2_medkit_gateway/src/http/rest_server.cpp Injects sampler/transport registries into cyclic subscription handlers; uses node-provided SSE tracker.
src/ros2_medkit_gateway/src/http/handlers/sse_transport_provider.cpp New SSE transport provider extracting former handler streaming logic and invoking samplers safely.
src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp Generalizes resource URI parsing; validates collection/protocol; starts delivery via transport provider.
src/ros2_medkit_gateway/src/gateway_node.cpp Initializes registries, built-in samplers/transports, lifecycle wiring, and shutdown ordering.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_transport.hpp New transport provider interface and registry.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_manager.hpp Updates CyclicSubscriptionInfo; new create() signature; adds on_removed callback API.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/resource_sampler.hpp New sampler function type + thread-safe sampler registry interface.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/plugin_manager.hpp Exposes plugin APIs for sampler/transport registration + registry wiring.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp Declares SSE transport provider implementation.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/cyclic_subscription_handlers.hpp Adds ParsedResourceUri and parse_resource_uri() API; injects registries.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/error_codes.hpp Adds vendor-specific error codes for URI/collection/protocol validation.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp Exposes registries and shared SSE client tracker from GatewayNode.
src/ros2_medkit_gateway/CMakeLists.txt Adds new sources and unit tests to build/test targets.

std::unique_lock lock(mutex_);

if (!is_builtin) {
if (collection.substr(0, 2) != "x-") {
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collection.substr(0, 2) will throw std::out_of_range when collection.size() < 2 (e.g., empty string), causing an unexpected exception type and bypassing your intended error message. Guard on collection.size() >= 2 (or use starts_with("x-") if available) before calling substr.

Suggested change
if (collection.substr(0, 2) != "x-") {
if (collection.size() < 2 || collection.substr(0, 2) != "x-") {

Copilot uses AI. Check for mistakes.
Comment on lines +405 to +407
// Security: reject path traversal in resource_path
if (parsed.resource_path.find("/..") != std::string::npos || parsed.resource_path.find("../") != std::string::npos) {
return tl::make_unexpected("Resource path must not contain '..'");
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The traversal check is substring-based and can reject benign paths containing .. as part of a larger segment (e.g., /..foo), while still not precisely enforcing “segment equals '..'”. Prefer checking path segments (reject .. when it appears as a full segment, e.g., (^|/)\\.\\.(?=/|$)), which is both stricter and avoids false positives.

Suggested change
// Security: reject path traversal in resource_path
if (parsed.resource_path.find("/..") != std::string::npos || parsed.resource_path.find("../") != std::string::npos) {
return tl::make_unexpected("Resource path must not contain '..'");
// Security: reject path traversal in resource_path (only when '..' is a full path segment)
static const std::regex traversal_pattern(R"((^|/)\\.\\.(?=/|$))");
if (std::regex_search(parsed.resource_path, traversal_pattern)) {
return tl::make_unexpected("Resource path must not contain '..' as a path segment");

Copilot uses AI. Check for mistakes.
Comment on lines +118 to 123
/// Set callback invoked when a subscription is removed (remove, cleanup_expired, shutdown)
void set_on_removed(std::function<void(const CyclicSubscriptionInfo &)> callback);

/**
* @brief Wait for an update or removal on the given subscription.
*
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on_removed_ is read/invoked concurrently from remove(), cleanup_expired(), and shutdown(), but it can also be reassigned via set_on_removed() with no synchronization. This is a data race on std::function. Consider either (a) making set_on_removed() only callable during initialization (enforced/documented + not callable after threads start), or (b) protecting reads/writes with a mutex, or (c) storing it as an atomic std::shared_ptr<const Callback> and copying the pointer before invocation.

Copilot uses AI. Check for mistakes.
Comment on lines 142 to +143
std::atomic<bool> shutdown_flag_{false};
std::function<void(const CyclicSubscriptionInfo &)> on_removed_;
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on_removed_ is read/invoked concurrently from remove(), cleanup_expired(), and shutdown(), but it can also be reassigned via set_on_removed() with no synchronization. This is a data race on std::function. Consider either (a) making set_on_removed() only callable during initialization (enforced/documented + not callable after threads start), or (b) protecting reads/writes with a mutex, or (c) storing it as an atomic std::shared_ptr<const Callback> and copying the pointer before invocation.

Copilot uses AI. Check for mistakes.
Comment on lines +55 to +59
void SseTransportProvider::stop(const std::string & sub_id) {
std::lock_guard<std::mutex> lock(mutex_);
streams_.erase(sub_id);
// The SSE loop detects inactive via sub_mgr_.is_active() and exits.
}
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stop() does not actually stop an in-flight SSE stream if the subscription remains active: the streaming loop uses sub_mgr_.is_active() and a captured sampler (captured_sampler), so erasing streams_[sub_id] won’t affect existing connections. To make stop() effective, add a transport-local cancellation signal that the loop checks (e.g., a StreamState flag keyed by sub_id, or re-check streams_.contains(sub_id) each iteration) and have stop() flip that signal before returning.

Copilot uses AI. Check for mistakes.
Comment on lines +16 to +20

namespace ros2_medkit_gateway {

void TransportRegistry::register_transport(std::unique_ptr<SubscriptionTransportProvider> provider) {
auto proto = provider->protocol();
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

register_transport() silently overwrites an existing provider for the same protocol. That can lead to hard-to-debug behavior (e.g., a plugin accidentally replacing the built-in sse provider). Consider rejecting duplicates (throw/return error), or at least logging a warning and requiring an explicit “replace” API to intentionally override.

Suggested change
namespace ros2_medkit_gateway {
void TransportRegistry::register_transport(std::unique_ptr<SubscriptionTransportProvider> provider) {
auto proto = provider->protocol();
#include <iostream>
namespace ros2_medkit_gateway {
void TransportRegistry::register_transport(std::unique_ptr<SubscriptionTransportProvider> provider) {
auto proto = provider->protocol();
if (transports_.count(proto) > 0) {
std::cerr << "[ros2_medkit_gateway] Transport for protocol '" << proto
<< "' is already registered; ignoring duplicate registration." << std::endl;
return;
}

Copilot uses AI. Check for mistakes.
@bburda bburda requested a review from Copilot March 5, 2026 20:36
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 22 out of 22 changed files in this pull request and generated 5 comments.

Comment on lines +405 to +408
// Security: reject path traversal in resource_path
if (parsed.resource_path.find("/..") != std::string::npos || parsed.resource_path.find("../") != std::string::npos) {
return tl::make_unexpected("Resource path must not contain '..'");
}
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The path traversal check is overly broad and will reject legitimate paths like /... (contains \"/..\"), and it doesn't validate traversal by segment boundaries. Consider splitting resource_path into path segments and rejecting only segments that equal \"..\" (and ideally also rejecting URL-encoded dot-dot like %2e%2e if inputs can be encoded).

Copilot uses AI. Check for mistakes.
Comment on lines 157 to +159
// Create subscription
auto result = sub_mgr_.create(entity_id, entity_type, resource, *topic_result, protocol, interval, duration);
auto result = sub_mgr_.create(entity_id, entity_type, resource, parsed->collection, parsed->resource_path, protocol,
interval, duration);
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subscriptions for the data collection appear to require a concrete ROS topic path, but parse_resource_uri() allows /api/v1/.../data with an empty resource_path. This can lead to creating a subscription that will always stream sampler errors at runtime. Suggest rejecting collection == \"data\" && resource_path.empty() (400 invalid resource URI), while still allowing empty paths for collections like faults that support collection-level sampling.

Copilot uses AI. Check for mistakes.
SubscriptionTransportProvider * get_transport(const std::string & protocol) const;
bool has_transport(const std::string & protocol) const;

/// Stop all active subscriptions across all transports.
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransportRegistry::shutdown_all() is documented as stopping subscriptions across transports, but the current implementation delegates only to sub_mgr.shutdown() and doesn't interact with transports_ at all. Either (a) update the docstring to state that shutdown relies on SubscriptionManager::on_removed to stop transports, or (b) change shutdown_all() to actively stop each subscription via the appropriate provider (e.g., iterate active subscriptions, look up info.protocol, call provider->stop(info.id)), then shut down the manager.

Suggested change
/// Stop all active subscriptions across all transports.
/// Trigger shutdown of all managed subscriptions.
///
/// This function delegates to the given SubscriptionManager instance
/// (typically via SubscriptionManager::shutdown()). It relies on the
/// manager's on_removed / teardown callbacks to invoke the appropriate
/// SubscriptionTransportProvider::stop() for each active subscription and
/// protocol, rather than directly iterating transports_ here.

Copilot uses AI. Check for mistakes.
Comment on lines +19 to +30
void TransportRegistry::register_transport(std::unique_ptr<SubscriptionTransportProvider> provider) {
auto proto = provider->protocol();
transports_[proto] = std::move(provider);
}

SubscriptionTransportProvider * TransportRegistry::get_transport(const std::string & protocol) const {
auto it = transports_.find(protocol);
if (it == transports_.end()) {
return nullptr;
}
return it->second.get();
}
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransportRegistry isn't thread-safe: register_transport() mutates transports_ without synchronization while get_transport() reads it. Since transports can be registered via plugins and are queried on request paths, protect the map with a mutex/shared_mutex (similar to ResourceSamplerRegistry) or clearly constrain registration to single-threaded startup only and enforce that contract.

Copilot uses AI. Check for mistakes.
Comment on lines +623 to +635
// 2. Stop all transport providers for active subscriptions
if (transport_registry_) {
transport_registry_->shutdown_all(*subscription_mgr_);
}
// 3. Shutdown subscription manager (marks all inactive, triggers on_removed for stragglers)
if (subscription_mgr_) {
subscription_mgr_->shutdown();
}
// 4. Shutdown plugins
if (plugin_mgr_) {
plugin_mgr_->shutdown_all();
}
// 5. Normal member destruction (managers safe - all transports stopped)
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transport_registry_->shutdown_all(*subscription_mgr_) currently calls sub_mgr.shutdown(), and the destructor then calls subscription_mgr_->shutdown() again. This is redundant and makes shutdown sequencing harder to reason about (and could double-trigger removal callbacks if shutdown_all changes later). Consider doing shutdown in exactly one place (either make shutdown_all() actually stop transports without shutting down the manager, or remove the extra subscription_mgr_->shutdown() call).

Suggested change
// 2. Stop all transport providers for active subscriptions
if (transport_registry_) {
transport_registry_->shutdown_all(*subscription_mgr_);
}
// 3. Shutdown subscription manager (marks all inactive, triggers on_removed for stragglers)
if (subscription_mgr_) {
subscription_mgr_->shutdown();
}
// 4. Shutdown plugins
if (plugin_mgr_) {
plugin_mgr_->shutdown_all();
}
// 5. Normal member destruction (managers safe - all transports stopped)
// 2. Stop all transport providers for active subscriptions (also shuts down subscription manager)
if (transport_registry_) {
transport_registry_->shutdown_all(*subscription_mgr_);
}
// 3. Shutdown plugins
if (plugin_mgr_) {
plugin_mgr_->shutdown_all();
}
// 4. Normal member destruction (managers safe - all transports stopped)

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

cyclic-subscriptions/ should support all SOVD resource collections and vendor extensions

2 participants