Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 64 additions & 8 deletions docs/api/rest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -933,11 +933,32 @@ Without such a plugin, all endpoints return ``501 Not Implemented``.
Cyclic Subscriptions
--------------------

Cyclic subscriptions provide periodic push-based data delivery via Server-Sent Events (SSE).
A client creates a subscription specifying which data resource to observe and at what interval.
The server then pushes the latest value of that resource at the requested frequency.
Cyclic subscriptions provide periodic push-based delivery of any SOVD resource collection
via Server-Sent Events (SSE). A client creates a subscription specifying the resource URI
(data, faults, configurations, communication-logs, or ``x-`` vendor extensions) and a
delivery interval. The server then pushes the latest value at the requested frequency.

Subscriptions are temporary — they do not survive server restart.
Subscriptions are temporary - they do not survive server restart.

**Supported collections:**

- ``data`` - Topic data (requires a resource path, e.g. ``/data/temperature``)
- ``faults`` - Fault list (resource path optional, e.g. ``/faults`` or ``/faults/fault_001``)
- ``configurations`` - Parameter values (resource path optional)
- ``communication-logs`` - Network protocol logs (SOVD "comlogs", not ``/logs``)
- ``x-*`` - Vendor extensions (e.g. ``x-medkit-metrics``)

.. note::

SOVD "Communication Logs" (``communication-logs``) are network protocol logs per
ISO 17978-3, **not** the gateway's ``/logs`` endpoints which serve application-level
log entries from ``/rosout``.

**Interval values:**

- ``fast`` - 50ms sampling period
- ``normal`` - 200ms sampling period (default)
- ``slow`` - 500ms sampling period

``POST /api/v1/{entity_type}/{entity_id}/cyclic-subscriptions``
Create a new cyclic subscription.
Expand All @@ -957,10 +978,22 @@ Subscriptions are temporary — they do not survive server restart.

**Fields:**

- ``resource`` (string, required): Full URI of the data resource to observe
- ``resource`` (string, required): Full SOVD resource URI to observe
(e.g. ``/api/v1/apps/{id}/data/{topic}``, ``/api/v1/apps/{id}/faults``)
- ``protocol`` (string, optional): Transport protocol. Only ``"sse"`` supported. Default: ``"sse"``
- ``interval`` (string, required): One of ``fast`` (<100ms), ``normal`` (100-250ms), ``slow`` (250-500ms)
- ``duration`` (integer, required): Subscription lifetime in seconds (must be > 0)
- ``interval`` (string, required): One of ``fast``, ``normal``, ``slow``
- ``duration`` (integer, required): Subscription lifetime in seconds.
Must be > 0 and <= ``sse.max_duration_sec`` (default: 3600)

**Error responses:**

- **400** ``invalid-parameter`` - Invalid interval, duration <= 0, or duration exceeds max
- **400** ``x-medkit-invalid-resource-uri`` - Malformed resource URI or path traversal
- **400** ``x-medkit-entity-mismatch`` - Resource URI references different entity than route
- **400** ``x-medkit-collection-not-supported`` - Entity doesn't support the collection
- **400** ``x-medkit-collection-not-available`` - No data provider registered for collection
- **400** ``x-medkit-unsupported-protocol`` - Requested protocol not available
- **503** ``service-unavailable`` - Max subscription capacity reached

**Response 201 Created:**

Expand All @@ -982,7 +1015,8 @@ Subscriptions are temporary — they do not survive server restart.

``PUT /api/v1/{entity_type}/{entity_id}/cyclic-subscriptions/{id}``
Update ``interval`` and/or ``duration`` of an existing subscription.
Only provided fields are updated.
Only provided fields are updated. Updating ``duration`` resets the
expiry timer from the current time (not from the original creation time).

**Request Body:**

Expand Down Expand Up @@ -1016,6 +1050,28 @@ Subscriptions are temporary — they do not survive server restart.
The stream auto-closes when the duration expires, the client disconnects,
or the subscription is deleted.

**Multi-collection examples:**

Subscribe to faults on a component:

.. code-block:: json

{
"resource": "/api/v1/components/ecu1/faults",
"interval": "slow",
"duration": 600
}

Subscribe to a specific configuration parameter:

.. code-block:: json

{
"resource": "/api/v1/apps/temp_sensor/configurations/calibration_offset",
"interval": "normal",
"duration": 120
}

Rate Limiting
-------------

Expand Down
6 changes: 6 additions & 0 deletions docs/config/server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ Configure limits for SSE-based streaming (fault events and cyclic subscriptions)
- int
- ``100``
- Maximum number of active cyclic subscriptions across all entities. Returns HTTP 503 when this limit is reached.
* - ``sse.max_duration_sec``
- int
- ``3600``
- Maximum allowed subscription duration in seconds. Requests exceeding this are rejected with HTTP 400.

Example:

Expand All @@ -254,6 +258,7 @@ Example:
sse:
max_clients: 10
max_subscriptions: 100
max_duration_sec: 3600

Plugin Framework
----------------
Expand Down Expand Up @@ -364,6 +369,7 @@ Complete Example
sse:
max_clients: 10
max_subscriptions: 100
max_duration_sec: 3600

See Also
--------
Expand Down
35 changes: 35 additions & 0 deletions docs/tutorials/plugin-system.rst
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,41 @@ For entity-scoped endpoints, register a matching capability via ``register_capab
or ``register_entity_capability()`` in ``set_context()`` so the endpoint appears in the
entity's capabilities array in discovery responses.

Cyclic Subscription Extensions
-------------------------------

Plugins can extend cyclic subscriptions by registering custom resource samplers
and transport providers during ``set_context()``.

**Resource Samplers** provide the data for a collection when sampled by a subscription.
Register a sampler via ``ResourceSamplerRegistry``:

.. code-block:: cpp

// In set_context():
auto& sampler_registry = ctx_->get_sampler_registry();
sampler_registry.register_sampler("x-medkit-metrics",
[this](const std::string& entity_id, const std::string& resource_path)
-> tl::expected<nlohmann::json, std::string> {
return get_metrics(entity_id, resource_path);
});

Once registered, clients can create cyclic subscriptions on the ``x-medkit-metrics``
collection for any entity.

**Transport Providers** deliver subscription data via alternative protocols (beyond
the built-in SSE transport). Register via ``TransportRegistry``:

.. code-block:: cpp

auto& transport_registry = ctx_->get_transport_registry();
transport_registry.register_transport(
std::make_unique<MqttTransportProvider>(mqtt_client_));

The transport must implement ``SubscriptionTransportProvider`` (``start``, ``stop``,
``notify_update``, ``protocol()``). Clients specify the protocol in the subscription
creation request.

Multiple Plugins
----------------

Expand Down
18 changes: 18 additions & 0 deletions src/ros2_medkit_gateway/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ add_library(gateway_lib STATIC
src/fault_manager.cpp
src/log_manager.cpp
src/subscription_manager.cpp
src/resource_sampler.cpp
src/subscription_transport.cpp
# Entity resource model
src/models/entity_types.cpp
src/models/entity_capabilities.cpp
Expand Down Expand Up @@ -124,6 +126,7 @@ add_library(gateway_lib STATIC
src/http/handlers/bulkdata_handlers.cpp
src/http/handlers/sse_fault_handler.cpp
src/http/handlers/cyclic_subscription_handlers.cpp
src/http/handlers/sse_transport_provider.cpp
src/http/handlers/auth_handlers.cpp
# Bulk data storage
src/bulk_data_store.cpp
Expand Down Expand Up @@ -383,10 +386,22 @@ if(BUILD_TESTING)
ament_add_gtest(test_subscription_manager test/test_subscription_manager.cpp)
target_link_libraries(test_subscription_manager gateway_lib)

# Add resource sampler registry tests
ament_add_gtest(test_resource_sampler_registry test/test_resource_sampler_registry.cpp)
target_link_libraries(test_resource_sampler_registry gateway_lib)

# Add transport registry tests
ament_add_gtest(test_transport_registry test/test_transport_registry.cpp)
target_link_libraries(test_transport_registry gateway_lib)

# Add cyclic subscription handler tests
ament_add_gtest(test_cyclic_subscription_handlers test/test_cyclic_subscription_handlers.cpp)
target_link_libraries(test_cyclic_subscription_handlers gateway_lib)

# Add SSE transport provider tests
ament_add_gtest(test_sse_transport_provider test/test_sse_transport_provider.cpp)
target_link_libraries(test_sse_transport_provider gateway_lib)

# Add update manager tests
ament_add_gtest(test_update_manager test/test_update_manager.cpp)
target_link_libraries(test_update_manager gateway_lib)
Expand Down Expand Up @@ -488,7 +503,10 @@ if(BUILD_TESTING)
test_bulk_data_store
test_bulkdata_handlers
test_subscription_manager
test_resource_sampler_registry
test_transport_registry
test_cyclic_subscription_handlers
test_sse_transport_provider
test_update_manager
test_data_handlers
test_auth_handlers
Expand Down
5 changes: 5 additions & 0 deletions src/ros2_medkit_gateway/config/gateway_params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ ros2_medkit_gateway:
# Default: 100
max_subscriptions: 100

# Maximum allowed subscription duration in seconds
# Subscriptions requesting a longer duration are rejected with HTTP 400
# Default: 3600 (1 hour)
max_duration_sec: 3600

# Logging Configuration
logs:
# Ring buffer capacity per node name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@
#include "ros2_medkit_gateway/fault_manager.hpp"
#include "ros2_medkit_gateway/http/rate_limiter.hpp"
#include "ros2_medkit_gateway/http/rest_server.hpp"
#include "ros2_medkit_gateway/http/sse_client_tracker.hpp"
#include "ros2_medkit_gateway/log_manager.hpp"
#include "ros2_medkit_gateway/models/thread_safe_entity_cache.hpp"
#include "ros2_medkit_gateway/operation_manager.hpp"
#include "ros2_medkit_gateway/plugins/plugin_manager.hpp"
#include "ros2_medkit_gateway/resource_sampler.hpp"
#include "ros2_medkit_gateway/subscription_manager.hpp"
#include "ros2_medkit_gateway/subscription_transport.hpp"
#include "ros2_medkit_gateway/updates/update_manager.hpp"

namespace ros2_medkit_gateway {
Expand Down Expand Up @@ -114,6 +117,24 @@ class GatewayNode : public rclcpp::Node {
*/
PluginManager * get_plugin_manager() const;

/**
* @brief Get the ResourceSamplerRegistry instance
* @return Raw pointer to ResourceSamplerRegistry (valid for lifetime of GatewayNode)
*/
ResourceSamplerRegistry * get_sampler_registry() const;

/**
* @brief Get the TransportRegistry instance
* @return Raw pointer to TransportRegistry (valid for lifetime of GatewayNode)
*/
TransportRegistry * get_transport_registry() const;

/**
* @brief Get the SSEClientTracker instance
* @return Shared pointer to SSEClientTracker
*/
std::shared_ptr<SSEClientTracker> get_sse_client_tracker() const;

private:
void refresh_cache();
void start_rest_server();
Expand All @@ -137,6 +158,9 @@ class GatewayNode : public rclcpp::Node {
std::unique_ptr<LogManager> log_mgr_;
std::unique_ptr<BulkDataStore> bulk_data_store_;
std::unique_ptr<SubscriptionManager> subscription_mgr_;
std::unique_ptr<ResourceSamplerRegistry> sampler_registry_;
std::unique_ptr<TransportRegistry> transport_registry_;
std::shared_ptr<SSEClientTracker> sse_client_tracker_;
// IMPORTANT: plugin_mgr_ BEFORE update_mgr_ - C++ destroys in reverse order,
// so update_mgr_ waits for async tasks before plugin_mgr_ destroys the plugin.
// plugin_ctx_ is owned here (outlives plugins); plugin_mgr_ holds a non-owning ref.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ constexpr const char * ERR_X_MEDKIT_UPDATE_NOT_PREPARED = "x-medkit-update-not-p
/// Automated mode not supported for this package
constexpr const char * ERR_X_MEDKIT_UPDATE_NOT_AUTOMATED = "x-medkit-update-not-automated";

/// Vendor-specific: invalid resource URI format for subscriptions
constexpr const char * ERR_X_MEDKIT_INVALID_RESOURCE_URI = "x-medkit-invalid-resource-uri";

/// Vendor-specific: collection not supported for entity type
constexpr const char * ERR_X_MEDKIT_COLLECTION_NOT_SUPPORTED = "x-medkit-collection-not-supported";

/// Vendor-specific: no data provider registered for collection
constexpr const char * ERR_X_MEDKIT_COLLECTION_NOT_AVAILABLE = "x-medkit-collection-not-available";

/// Vendor-specific: resource URI entity doesn't match route entity
constexpr const char * ERR_X_MEDKIT_ENTITY_MISMATCH = "x-medkit-entity-mismatch";

/// Vendor-specific: unsupported subscription protocol
constexpr const char * ERR_X_MEDKIT_UNSUPPORTED_PROTOCOL = "x-medkit-unsupported-protocol";

/**
* @brief Check if an error code is a vendor-specific code
* @param error_code Error code to check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@

#include "ros2_medkit_gateway/http/handlers/handler_context.hpp"
#include "ros2_medkit_gateway/http/sse_client_tracker.hpp"
#include "ros2_medkit_gateway/resource_sampler.hpp"
#include "ros2_medkit_gateway/subscription_manager.hpp"
#include "ros2_medkit_gateway/subscription_transport.hpp"

namespace ros2_medkit_gateway {
namespace handlers {

/// Result of parsing a SOVD resource URI for subscription
struct ParsedResourceUri {
std::string entity_type; // "apps" or "components"
std::string entity_id;
std::string collection; // "data", "faults", "x-medkit-metrics", etc.
std::string resource_path; // path within collection (may be empty)
};

/**
* @brief HTTP handlers for cyclic subscription CRUD and SSE streaming.
*
Expand All @@ -41,7 +51,9 @@ namespace handlers {
class CyclicSubscriptionHandlers {
public:
CyclicSubscriptionHandlers(HandlerContext & ctx, SubscriptionManager & sub_mgr,
std::shared_ptr<SSEClientTracker> client_tracker);
std::shared_ptr<SSEClientTracker> client_tracker,
ResourceSamplerRegistry & sampler_registry, TransportRegistry & transport_registry,
int max_duration_sec);

/// POST /{entity}/cyclic-subscriptions — create subscription
void handle_create(const httplib::Request & req, httplib::Response & res);
Expand All @@ -64,19 +76,22 @@ class CyclicSubscriptionHandlers {
/// Convert subscription info to JSON response
static nlohmann::json subscription_to_json(const CyclicSubscriptionInfo & info, const std::string & event_source);

/// Parse resource URI to extract entity type, entity id, collection, and resource path.
static tl::expected<ParsedResourceUri, std::string> parse_resource_uri(const std::string & resource);

private:
/// Build event_source URI from subscription info
static std::string build_event_source(const CyclicSubscriptionInfo & info);

/// Extract entity type string ("apps" or "components") from request path
static std::string extract_entity_type(const httplib::Request & req);

/// Parse resource URI to extract topic name. Returns topic or error.
static tl::expected<std::string, std::string> parse_resource_uri(const std::string & resource);

HandlerContext & ctx_;
SubscriptionManager & sub_mgr_;
std::shared_ptr<SSEClientTracker> client_tracker_;
ResourceSamplerRegistry & sampler_registry_;
TransportRegistry & transport_registry_;
int max_duration_sec_;

/// Keepalive interval for SSE streams
static constexpr int kKeepaliveIntervalSec = 15;
Expand Down
Loading