Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed (breaking)

- **Design 034 Phase 2 — MQTT knobs move out of core; `ConnectorConfig` pruned (Issue #134, [review doc §3.6](docs/design/034-technical-debt-review.md)).** Core's generic link builders drop `with_qos`/`with_retain` (`with_timeout_ms` stays, de-MQTT'd); the knobs now live in `aimdb-mqtt-connector` as the `MqttLinkExt` (qos, outbound + inbound) and `MqttOutboundLinkExt` (retain, publish-side only) extension traits, pushing the **same** `("qos", …)`/`("retain", …)` option keys the MQTT clients have always read — wire behavior unchanged; importing the trait makes the MQTT intent explicit at the call site (generic escape hatch: `with_config(key, value)`). `ConnectorConfig` loses its never-read typed `qos`/`retain` fields and the speculative Kafka/HTTP/shmem interpretation docs — it keeps `timeout_ms` + `protocol_options`, and core now documents no protocol that lacks an in-tree connector. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-mqtt-connector](aimdb-mqtt-connector/CHANGELOG.md))

- **Design 034 Phase 2 — panic-free builder validation: `build()` reports every configuration mistake at once (Issue #133, [review doc §3.4](docs/design/034-technical-debt-review.md)).** Builder methods never panic on user mistakes anymore. Conflicting `.source()`/`.transform()`/`.link_from()` registrations, missing serializers/deserializers, invalid connector URLs, unregistered schemes, and key-reused-with-different-type are *recorded* (the conflicting registration is skipped) and `build()` returns one `DbError::InvalidConfiguration { errors: Vec<ConfigError> }` carrying **all** findings — each with the record key and, where applicable, the connector URL. The worst panic — "requires a buffer" firing at spawn time inside a connector factory closure — is now a build()-time check, which also makes `.buffer()` after `.link_to()`/`.link_from()` legal (order-independent). Duplicate keys and dependency-graph cycles fold into the same collected report (previously distinct `DuplicateRecordKey`/`CyclicDependency` returns from `build()`). Remaining `panic!`/`expect`s in the builder path are internal invariants and say "this is a bug in aimdb-core". ([aimdb-core](aimdb-core/CHANGELOG.md))

- **Design 034 Phase 2 — registrar lifetime fix + de-erased builder internals (Issue #130, [review doc](docs/design/034-technical-debt-review.md)).** `RecordRegistrar`'s fluent methods now take fresh borrows (`&mut self -> &mut Self`) instead of borrowing the registrar for its entire lifetime — a `configure` closure can finally use separate statements (`reg.source_raw(…); reg.tap_raw(…);`) and reuse the registrar after a chain. `configure`'s closure bound drops its HRTB; `OutboundConnectorBuilder`/`InboundConnectorBuilder` gain a second lifetime parameter (`<'r, 'a, T, R>`); `RecordT::register` and the adapter/persistence extension traits follow. Internally, `AimDbBuilder` stores its spawn/start functions typed (`SpawnFnType<R>`/`StartFnType<R>`) instead of `Box<dyn Any>` — the panicking downcasts in `build()` are gone, and `AimDb<R>`'s struct-level bound moved to its impls. Closure-based user code compiles unchanged. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-persistence](aimdb-persistence/CHANGELOG.md))
Expand Down
2 changes: 2 additions & 0 deletions aimdb-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed (breaking)

- **Phase 2 — MQTT knobs deleted from the generic link builders; `ConnectorConfig` pruned (Issue #134, [design doc §3.6](../docs/design/034-technical-debt-review.md)).** `OutboundConnectorBuilder::with_qos`/`with_retain` and `InboundConnectorBuilder::with_qos` are gone — use `aimdb-mqtt-connector`'s `MqttLinkExt`/`MqttOutboundLinkExt` (same option keys, wire-identical) or the generic `with_config(key, value)`. `with_timeout_ms` survives with protocol-neutral docs. `ConnectorConfig` drops its typed `qos: u8`/`retain: bool` fields (verified unread by every connector — MQTT reads `protocol_options`) and the Kafka/HTTP/shmem interpretation docs; it keeps `timeout_ms` + `protocol_options`. `OutboundConnectorBuilder`/`InboundConnectorBuilder` are now re-exported from the crate root (for the extension-trait impls).

- **Phase 2 — panic-free builder validation; `build()` collects every configuration mistake (Issue #133, [design doc §3.4](../docs/design/034-technical-debt-review.md)).** One failure model instead of two: builder methods stay infallible and never panic on user mistakes; `build()` performs all validation and returns one `DbError::InvalidConfiguration { errors: Vec<ConfigError> }` (new variant + new public `ConfigError { record_key, url, message }` type, error code `0x4003`) carrying **every** finding from the run. Specifics:
- `TypedRecord::set_producer`/`set_transform`/`add_inbound_connector` mutual-exclusion violations and duplicate producers/transforms: recorded, conflicting registration skipped (observable: `has_producer()` stays `false` after a conflicting `.source()`).
- `OutboundConnectorBuilder::finish()`/`InboundConnectorBuilder::finish()`: invalid URL, missing serializer/deserializer, unregistered scheme, and the transform/source conflicts are recorded with record key + URL; the link is not registered and the registrar is returned as usual.
Expand Down
5 changes: 4 additions & 1 deletion aimdb-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ pub use builder::OutboundRoute;
pub use builder::{AimDb, AimDbBuilder};
pub use connector::ConnectorBuilder;
pub use transport::{Connector, ConnectorConfig, PublishError};
pub use typed_api::{Consumer, Producer, RecordRegistrar, RecordT, StageKind};
pub use typed_api::{
Consumer, InboundConnectorBuilder, OutboundConnectorBuilder, Producer, RecordRegistrar,
RecordT, StageKind,
};
pub use typed_record::{AnyRecord, AnyRecordExt, TypedRecord};

// JSON codec (feature `json-serialize`, no_std + alloc compatible)
Expand Down
94 changes: 31 additions & 63 deletions aimdb-core/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,33 @@
//! Transport connector traits for MQTT, Kafka, HTTP, shmem, and other protocols
//! Transport connector traits for protocol-agnostic publishing
//!
//! Provides a generic `Connector` trait that enables scheme-based routing
//! to different transport protocols. Each connector manages a single connection
//! to a specific endpoint (e.g., one MQTT broker, one shared memory segment, etc.).
//! to a specific endpoint (e.g., one MQTT broker).
//!
//! # Design Philosophy
//!
//! - **Scheme-based routing**: URL scheme (mqtt://, shmem://, kafka://) determines which connector handles requests
//! - **Scheme-based routing**: the URL scheme (e.g. `mqtt://`, `knx://`) determines which connector handles requests
//! - **Single endpoint per connector**: Each connector connects to ONE broker/resource
//! - **Multi-transport publishing**: Same data can be published to multiple protocols
//! - **Protocol-agnostic core**: Core doesn't know about MQTT, Kafka, etc. - just routes by scheme
//! - **Protocol-agnostic core**: Core knows schemes and key/value options, never protocol semantics

use alloc::{boxed::Box, string::String, vec::Vec};
use core::future::Future;
use core::pin::Pin;

/// Protocol-agnostic connector configuration
///
/// Provides common configuration options that apply across multiple protocols.
/// Each protocol interprets these fields according to its semantics.
///
/// # Protocol Interpretation
///
/// - **MQTT**: qos=QoS level, retain=retain flag, timeout_ms=publish timeout
/// - **Kafka**: qos=acks setting (0=none, 1=leader, 2=all), timeout_ms=send timeout
/// - **HTTP**: qos=retry count, timeout_ms=request timeout
/// - **Shmem**: qos=priority, retain=pin in memory
/// Carries the route's key/value options to [`Connector::publish`]. Only the
/// genuinely protocol-agnostic `timeout_ms` is a typed field; every
/// protocol-specific knob (e.g. MQTT's `qos`/`retain`) travels in
/// [`protocol_options`](ConnectorConfig::protocol_options) and is interpreted
/// by the connector with its own defaults. Connector crates expose typed
/// setters as extension traits over the link builders (e.g. the MQTT
/// connector's `MqttLinkExt`).
#[derive(Debug, Clone)]
pub struct ConnectorConfig {
/// Quality of Service / reliability level (0, 1, or 2)
pub qos: u8,

/// Whether to retain/persist the message
pub retain: bool,

/// Optional timeout in milliseconds
/// Optional timeout in milliseconds for the publish/operation, as
/// interpreted by the connector
pub timeout_ms: Option<u32>,

/// Protocol-specific options as key-value pairs
Expand All @@ -45,8 +38,6 @@ pub struct ConnectorConfig {
impl Default for ConnectorConfig {
fn default() -> Self {
Self {
qos: 0,
retain: false,
timeout_ms: Some(5000),
protocol_options: Vec::new(),
}
Expand All @@ -60,16 +51,10 @@ impl ConnectorConfig {
/// per-route configuration through to [`Connector::publish`] without changing
/// the `publish` signature.
///
/// Only the protocol-agnostic `timeout_ms` is lifted into a typed field. The
/// `qos`/`retain` *meaning* differs per protocol (an MQTT QoS level vs. a
/// Kafka `acks` setting vs. an HTTP retry count — see the type docs), and a
/// `u8`/`bool` field cannot represent "unspecified", so these — and every
/// other key — are passed through verbatim in [`protocol_options`] for the
/// connector to interpret with its own defaults. The typed `qos`/`retain`
/// fields therefore keep their [`Default`] values here; they remain available
/// for callers that construct a [`ConnectorConfig`] directly.
///
/// [`protocol_options`]: ConnectorConfig::protocol_options
/// Only the protocol-agnostic `timeout_ms` is lifted into the typed field;
/// every other key is passed through verbatim in
/// [`protocol_options`](ConnectorConfig::protocol_options) for the
/// connector to interpret with its own defaults.
pub fn from_query(query: &[(String, String)]) -> ConnectorConfig {
let mut cfg = ConnectorConfig::default();
for (k, v) in query {
Expand Down Expand Up @@ -139,12 +124,8 @@ impl std::error::Error for PublishError {}

/// Generic transport connector trait for protocol-agnostic publishing
///
/// This trait enables multi-protocol publishing via scheme-based routing:
/// - `mqtt://topic` → MQTT broker
/// - `shmem://segment` → Shared memory
/// - `kafka://topic` → Kafka cluster
/// - `http://endpoint` → HTTP POST
/// - `dds://topic` → DDS topic
/// This trait enables multi-protocol publishing via scheme-based routing
/// (e.g. `mqtt://topic` → MQTT broker, `knx://1/0/6` → KNX group address).
///
/// Each connector manages ONE connection/endpoint. For multiple brokers/endpoints,
/// create multiple connectors and register them with different schemes.
Expand All @@ -159,30 +140,22 @@ impl std::error::Error for PublishError {}
/// config: &ConnectorConfig,
/// payload: &[u8],
/// ) -> Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + '_>> {
/// // Protocol knobs come from the route's key/value options,
/// // with connector-chosen defaults.
/// let qos = config
/// .protocol_options
/// .iter()
/// .find(|(k, _)| k == "qos")
/// .and_then(|(_, v)| v.parse::<u8>().ok())
/// .unwrap_or(1);
/// Box::pin(async move {
/// self.client.publish(destination, config.qos, config.retain, payload).await
/// self.client.publish(destination, qos, payload).await
/// .map_err(|_| PublishError::ConnectionFailed)
/// })
/// }
/// }
/// ```
///
/// # Usage
///
/// ```rust,ignore
/// let mqtt_connector = MqttConnector::new("mqtt://broker.local:1883").await?;
///
/// let db = AimDbBuilder::new()
/// .runtime(runtime)
/// .with_connector("mqtt", Arc::new(mqtt_connector))
/// .configure::<Temperature>(|reg| {
/// reg.link_to("mqtt://sensors/temp")
/// .with_qos(1)
/// .finish()
/// })
/// .build()?;
/// ```
///
/// # Thread Safety
///
/// Requires Send + Sync for Tokio compatibility. For Embassy (single-threaded),
Expand All @@ -191,12 +164,9 @@ pub trait Connector: Send + Sync {
/// Publish data to a protocol-specific destination
///
/// # Arguments
/// * `destination` - Protocol-specific path (no broker/host info):
/// - MQTT: "sensors/temperature"
/// - Shmem: "temp_readings"
/// - Kafka: "production/events"
/// - HTTP: "api/v1/sensors"
/// * `config` - Publishing configuration (QoS, retain, timeout, protocol options)
/// * `destination` - Protocol-specific path, no broker/host info
/// (e.g. an MQTT topic like "sensors/temperature")
/// * `config` - Publishing configuration (timeout + protocol options)
/// * `payload` - Message payload as byte slice
///
/// # Returns
Expand Down Expand Up @@ -239,8 +209,6 @@ mod tests {
#[test]
fn test_connector_config_default() {
let config = ConnectorConfig::default();
assert_eq!(config.qos, 0);
assert!(!config.retain);
assert_eq!(config.timeout_ms, Some(5000));
assert_eq!(config.protocol_options.len(), 0);
}
Expand Down
34 changes: 14 additions & 20 deletions aimdb-core/src/typed_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,19 +703,13 @@ where
self
}

/// Sets the MQTT Quality of Service level
pub fn with_qos(mut self, qos: u8) -> Self {
self.config.push(("qos".to_string(), qos.to_string()));
self
}

/// Sets the MQTT retain flag
pub fn with_retain(mut self, retain: bool) -> Self {
self.config.push(("retain".to_string(), retain.to_string()));
self
}

/// Sets the publish timeout in milliseconds
/// Sets the operation timeout in milliseconds (the connector interprets
/// it; passed as the `timeout_ms` option — see
/// `ConnectorConfig::from_query`)
///
/// Protocol-specific knobs (e.g. MQTT QoS/retain) are provided by the
/// connector crates as extension traits over this builder, or generically
/// via [`with_config`](Self::with_config).
pub fn with_timeout_ms(mut self, timeout_ms: u32) -> Self {
self.config
.push(("timeout_ms".to_string(), timeout_ms.to_string()));
Expand Down Expand Up @@ -992,13 +986,13 @@ where
self
}

/// Sets the MQTT Quality of Service level
pub fn with_qos(mut self, qos: u8) -> Self {
self.config.push(("qos".to_string(), qos.to_string()));
self
}

/// Sets the publish timeout in milliseconds
/// Sets the operation timeout in milliseconds (the connector interprets
/// it; passed as the `timeout_ms` option — see
/// `ConnectorConfig::from_query`)
///
/// Protocol-specific knobs (e.g. MQTT subscribe QoS) are provided by the
/// connector crates as extension traits over this builder, or generically
/// via [`with_config`](Self::with_config).
pub fn with_timeout_ms(mut self, timeout_ms: u32) -> Self {
self.config
.push(("timeout_ms".to_string(), timeout_ms.to_string()));
Expand Down
4 changes: 4 additions & 0 deletions aimdb-mqtt-connector/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- **`MqttLinkExt` / `MqttOutboundLinkExt` — the MQTT knobs, now where the protocol lives (Issue #134, design 034 §3.6).** New `link_ext` module (compiled on every feature leg, `alloc`-only) with extension traits over core's generic link builders: `MqttLinkExt::with_qos(u8)` on outbound *and* inbound links (publish / subscribe QoS), and `MqttOutboundLinkExt::with_retain(bool)` on outbound links only (retain is a publish-side flag). They push the exact `("qos", …)` / `("retain", …)` option keys both clients have always read from `protocol_options` — wire behavior identical to the deleted core methods; only an extra `use aimdb_mqtt_connector::{MqttLinkExt, MqttOutboundLinkExt};` is needed. The crate now declares `extern crate alloc` unconditionally.

### Changed

- **Connector-build errors carry their message on `no_std` too (Issue #129).** With `DbError` unified on `alloc::String`, the dual `#[cfg]` error-construction branches in both clients collapse to one `DbError::runtime_error(...)` expression; the Embassy client's "Failed to build MQTT connector" detail is no longer dropped on embedded targets. No API change.
Expand Down
11 changes: 8 additions & 3 deletions aimdb-mqtt-connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! ```rust,ignore
//! use aimdb_core::AimDbBuilder;
//! use aimdb_tokio_adapter::TokioAdapter;
//! use aimdb_mqtt_connector::MqttConnector;
//! use aimdb_mqtt_connector::{MqttConnector, MqttLinkExt, MqttOutboundLinkExt};
//! use std::sync::Arc;
//!
//! let runtime = Arc::new(TokioAdapter::new()?);
Expand All @@ -26,8 +26,10 @@
//! .with_connector(MqttConnector::new("mqtt://localhost:1883"))
//! .configure::<Temperature>(|reg| {
//! reg.source(temperature_producer)
//! // Outbound: Publish to MQTT
//! // Outbound: Publish to MQTT (QoS/retain via MqttLinkExt traits)
//! .link_to("mqtt://sensors/temperature")
//! .with_qos(1)
//! .with_retain(false)
//! .with_serializer_raw(|t| {
//! serde_json::to_vec(t)
//! .map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
Expand Down Expand Up @@ -71,9 +73,12 @@

#![cfg_attr(not(feature = "std"), no_std)]

#[cfg(not(feature = "std"))]
extern crate alloc;

// MQTT knobs over core's generic link builders (works on every feature leg)
pub mod link_ext;
pub use link_ext::{MqttLinkExt, MqttOutboundLinkExt};

// Platform-specific implementations
#[cfg(feature = "tokio-runtime")]
pub mod tokio_client;
Expand Down
68 changes: 68 additions & 0 deletions aimdb-mqtt-connector/src/link_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//! MQTT-specific knobs for the generic link builders
//!
//! Core's `OutboundConnectorBuilder`/`InboundConnectorBuilder` know schemes
//! and key/value options, never protocol semantics (design 034 §3.6). The
//! MQTT knobs live here as extension traits: importing them makes the MQTT
//! intent explicit at the call site, and the impls push the exact same
//! `("qos", …)` / `("retain", …)` option keys the MQTT clients have always
//! read from `protocol_options` — wire behavior is unchanged.
//!
//! ```rust,ignore
//! use aimdb_mqtt_connector::{MqttLinkExt, MqttOutboundLinkExt};
//!
//! reg.link_to("mqtt://sensors/temp")
//! .with_qos(1)
//! .with_retain(true)
//! .with_serializer_raw(serialize)
//! .finish();
//! ```

use aimdb_core::{InboundConnectorBuilder, OutboundConnectorBuilder};
use alloc::string::ToString;
use core::fmt::Debug;

/// MQTT knobs shared by outbound and inbound links.
pub trait MqttLinkExt: Sized {
/// Sets the MQTT Quality of Service level (0, 1, or 2).
///
/// Outbound: the publish QoS. Inbound: the subscribe QoS. Defaults to
/// QoS 1 when unset (the connectors' own default).
fn with_qos(self, qos: u8) -> Self;
}

/// MQTT knobs that only make sense when publishing.
pub trait MqttOutboundLinkExt: MqttLinkExt {
/// Sets the MQTT retain flag (broker keeps the last message for
/// late-joining subscribers). Defaults to `false` when unset.
fn with_retain(self, retain: bool) -> Self;
}

impl<'r, 'a, T, R> MqttLinkExt for OutboundConnectorBuilder<'r, 'a, T, R>
where
T: Send + Sync + 'static + Debug + Clone,
R: aimdb_executor::RuntimeAdapter + 'static,
{
fn with_qos(self, qos: u8) -> Self {
self.with_config("qos", &qos.to_string())
}
}

impl<'r, 'a, T, R> MqttOutboundLinkExt for OutboundConnectorBuilder<'r, 'a, T, R>
where
T: Send + Sync + 'static + Debug + Clone,
R: aimdb_executor::RuntimeAdapter + 'static,
{
fn with_retain(self, retain: bool) -> Self {
self.with_config("retain", if retain { "true" } else { "false" })
}
}

impl<'r, 'a, T, R> MqttLinkExt for InboundConnectorBuilder<'r, 'a, T, R>
where
T: Send + Sync + 'static + Debug + Clone,
R: aimdb_executor::RuntimeAdapter + 'static,
{
fn with_qos(self, qos: u8) -> Self {
self.with_config("qos", &qos.to_string())
}
}
Loading