diff --git a/CHANGELOG.md b/CHANGELOG.md index 974ee89..f354f08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed (breaking) +- **Design 034 Phase 2 — MQTT knobs move out of core; `ConnectorConfig` pruned (Issue #134, [review doc §3.6](docs/design/034-technical-debt-review.md)).** Core's generic link builders drop `with_qos`/`with_retain` (`with_timeout_ms` stays, de-MQTT'd); the knobs now live in `aimdb-mqtt-connector` as the `MqttLinkExt` (qos, outbound + inbound) and `MqttOutboundLinkExt` (retain, publish-side only) extension traits, pushing the **same** `("qos", …)`/`("retain", …)` option keys the MQTT clients have always read — wire behavior unchanged; importing the trait makes the MQTT intent explicit at the call site (generic escape hatch: `with_config(key, value)`). `ConnectorConfig` loses its never-read typed `qos`/`retain` fields and the speculative Kafka/HTTP/shmem interpretation docs — it keeps `timeout_ms` + `protocol_options`, and core now documents no protocol that lacks an in-tree connector. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-mqtt-connector](aimdb-mqtt-connector/CHANGELOG.md)) + - **Design 034 Phase 2 — panic-free builder validation: `build()` reports every configuration mistake at once (Issue #133, [review doc §3.4](docs/design/034-technical-debt-review.md)).** Builder methods never panic on user mistakes anymore. Conflicting `.source()`/`.transform()`/`.link_from()` registrations, missing serializers/deserializers, invalid connector URLs, unregistered schemes, and key-reused-with-different-type are *recorded* (the conflicting registration is skipped) and `build()` returns one `DbError::InvalidConfiguration { errors: Vec }` carrying **all** findings — each with the record key and, where applicable, the connector URL. The worst panic — "requires a buffer" firing at spawn time inside a connector factory closure — is now a build()-time check, which also makes `.buffer()` after `.link_to()`/`.link_from()` legal (order-independent). Duplicate keys and dependency-graph cycles fold into the same collected report (previously distinct `DuplicateRecordKey`/`CyclicDependency` returns from `build()`). Remaining `panic!`/`expect`s in the builder path are internal invariants and say "this is a bug in aimdb-core". ([aimdb-core](aimdb-core/CHANGELOG.md)) - **Design 034 Phase 2 — registrar lifetime fix + de-erased builder internals (Issue #130, [review doc](docs/design/034-technical-debt-review.md)).** `RecordRegistrar`'s fluent methods now take fresh borrows (`&mut self -> &mut Self`) instead of borrowing the registrar for its entire lifetime — a `configure` closure can finally use separate statements (`reg.source_raw(…); reg.tap_raw(…);`) and reuse the registrar after a chain. `configure`'s closure bound drops its HRTB; `OutboundConnectorBuilder`/`InboundConnectorBuilder` gain a second lifetime parameter (`<'r, 'a, T, R>`); `RecordT::register` and the adapter/persistence extension traits follow. Internally, `AimDbBuilder` stores its spawn/start functions typed (`SpawnFnType`/`StartFnType`) instead of `Box` — the panicking downcasts in `build()` are gone, and `AimDb`'s struct-level bound moved to its impls. Closure-based user code compiles unchanged. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-persistence](aimdb-persistence/CHANGELOG.md)) diff --git a/aimdb-core/CHANGELOG.md b/aimdb-core/CHANGELOG.md index 6796104..692ccab 100644 --- a/aimdb-core/CHANGELOG.md +++ b/aimdb-core/CHANGELOG.md @@ -43,6 +43,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed (breaking) +- **Phase 2 — MQTT knobs deleted from the generic link builders; `ConnectorConfig` pruned (Issue #134, [design doc §3.6](../docs/design/034-technical-debt-review.md)).** `OutboundConnectorBuilder::with_qos`/`with_retain` and `InboundConnectorBuilder::with_qos` are gone — use `aimdb-mqtt-connector`'s `MqttLinkExt`/`MqttOutboundLinkExt` (same option keys, wire-identical) or the generic `with_config(key, value)`. `with_timeout_ms` survives with protocol-neutral docs. `ConnectorConfig` drops its typed `qos: u8`/`retain: bool` fields (verified unread by every connector — MQTT reads `protocol_options`) and the Kafka/HTTP/shmem interpretation docs; it keeps `timeout_ms` + `protocol_options`. `OutboundConnectorBuilder`/`InboundConnectorBuilder` are now re-exported from the crate root (for the extension-trait impls). + - **Phase 2 — panic-free builder validation; `build()` collects every configuration mistake (Issue #133, [design doc §3.4](../docs/design/034-technical-debt-review.md)).** One failure model instead of two: builder methods stay infallible and never panic on user mistakes; `build()` performs all validation and returns one `DbError::InvalidConfiguration { errors: Vec }` (new variant + new public `ConfigError { record_key, url, message }` type, error code `0x4003`) carrying **every** finding from the run. Specifics: - `TypedRecord::set_producer`/`set_transform`/`add_inbound_connector` mutual-exclusion violations and duplicate producers/transforms: recorded, conflicting registration skipped (observable: `has_producer()` stays `false` after a conflicting `.source()`). - `OutboundConnectorBuilder::finish()`/`InboundConnectorBuilder::finish()`: invalid URL, missing serializer/deserializer, unregistered scheme, and the transform/source conflicts are recorded with record key + URL; the link is not registered and the registrar is returned as usual. diff --git a/aimdb-core/src/lib.rs b/aimdb-core/src/lib.rs index fb899ae..97fc896 100644 --- a/aimdb-core/src/lib.rs +++ b/aimdb-core/src/lib.rs @@ -81,7 +81,10 @@ pub use builder::OutboundRoute; pub use builder::{AimDb, AimDbBuilder}; pub use connector::ConnectorBuilder; pub use transport::{Connector, ConnectorConfig, PublishError}; -pub use typed_api::{Consumer, Producer, RecordRegistrar, RecordT, StageKind}; +pub use typed_api::{ + Consumer, InboundConnectorBuilder, OutboundConnectorBuilder, Producer, RecordRegistrar, + RecordT, StageKind, +}; pub use typed_record::{AnyRecord, AnyRecordExt, TypedRecord}; // JSON codec (feature `json-serialize`, no_std + alloc compatible) diff --git a/aimdb-core/src/transport.rs b/aimdb-core/src/transport.rs index 99813ae..dfab5b4 100644 --- a/aimdb-core/src/transport.rs +++ b/aimdb-core/src/transport.rs @@ -1,15 +1,15 @@ -//! Transport connector traits for MQTT, Kafka, HTTP, shmem, and other protocols +//! Transport connector traits for protocol-agnostic publishing //! //! Provides a generic `Connector` trait that enables scheme-based routing //! to different transport protocols. Each connector manages a single connection -//! to a specific endpoint (e.g., one MQTT broker, one shared memory segment, etc.). +//! to a specific endpoint (e.g., one MQTT broker). //! //! # Design Philosophy //! -//! - **Scheme-based routing**: URL scheme (mqtt://, shmem://, kafka://) determines which connector handles requests +//! - **Scheme-based routing**: the URL scheme (e.g. `mqtt://`, `knx://`) determines which connector handles requests //! - **Single endpoint per connector**: Each connector connects to ONE broker/resource //! - **Multi-transport publishing**: Same data can be published to multiple protocols -//! - **Protocol-agnostic core**: Core doesn't know about MQTT, Kafka, etc. - just routes by scheme +//! - **Protocol-agnostic core**: Core knows schemes and key/value options, never protocol semantics use alloc::{boxed::Box, string::String, vec::Vec}; use core::future::Future; @@ -17,24 +17,17 @@ use core::pin::Pin; /// Protocol-agnostic connector configuration /// -/// Provides common configuration options that apply across multiple protocols. -/// Each protocol interprets these fields according to its semantics. -/// -/// # Protocol Interpretation -/// -/// - **MQTT**: qos=QoS level, retain=retain flag, timeout_ms=publish timeout -/// - **Kafka**: qos=acks setting (0=none, 1=leader, 2=all), timeout_ms=send timeout -/// - **HTTP**: qos=retry count, timeout_ms=request timeout -/// - **Shmem**: qos=priority, retain=pin in memory +/// Carries the route's key/value options to [`Connector::publish`]. Only the +/// genuinely protocol-agnostic `timeout_ms` is a typed field; every +/// protocol-specific knob (e.g. MQTT's `qos`/`retain`) travels in +/// [`protocol_options`](ConnectorConfig::protocol_options) and is interpreted +/// by the connector with its own defaults. Connector crates expose typed +/// setters as extension traits over the link builders (e.g. the MQTT +/// connector's `MqttLinkExt`). #[derive(Debug, Clone)] pub struct ConnectorConfig { - /// Quality of Service / reliability level (0, 1, or 2) - pub qos: u8, - - /// Whether to retain/persist the message - pub retain: bool, - - /// Optional timeout in milliseconds + /// Optional timeout in milliseconds for the publish/operation, as + /// interpreted by the connector pub timeout_ms: Option, /// Protocol-specific options as key-value pairs @@ -45,8 +38,6 @@ pub struct ConnectorConfig { impl Default for ConnectorConfig { fn default() -> Self { Self { - qos: 0, - retain: false, timeout_ms: Some(5000), protocol_options: Vec::new(), } @@ -60,16 +51,10 @@ impl ConnectorConfig { /// per-route configuration through to [`Connector::publish`] without changing /// the `publish` signature. /// - /// Only the protocol-agnostic `timeout_ms` is lifted into a typed field. The - /// `qos`/`retain` *meaning* differs per protocol (an MQTT QoS level vs. a - /// Kafka `acks` setting vs. an HTTP retry count — see the type docs), and a - /// `u8`/`bool` field cannot represent "unspecified", so these — and every - /// other key — are passed through verbatim in [`protocol_options`] for the - /// connector to interpret with its own defaults. The typed `qos`/`retain` - /// fields therefore keep their [`Default`] values here; they remain available - /// for callers that construct a [`ConnectorConfig`] directly. - /// - /// [`protocol_options`]: ConnectorConfig::protocol_options + /// Only the protocol-agnostic `timeout_ms` is lifted into the typed field; + /// every other key is passed through verbatim in + /// [`protocol_options`](ConnectorConfig::protocol_options) for the + /// connector to interpret with its own defaults. pub fn from_query(query: &[(String, String)]) -> ConnectorConfig { let mut cfg = ConnectorConfig::default(); for (k, v) in query { @@ -139,12 +124,8 @@ impl std::error::Error for PublishError {} /// Generic transport connector trait for protocol-agnostic publishing /// -/// This trait enables multi-protocol publishing via scheme-based routing: -/// - `mqtt://topic` → MQTT broker -/// - `shmem://segment` → Shared memory -/// - `kafka://topic` → Kafka cluster -/// - `http://endpoint` → HTTP POST -/// - `dds://topic` → DDS topic +/// This trait enables multi-protocol publishing via scheme-based routing +/// (e.g. `mqtt://topic` → MQTT broker, `knx://1/0/6` → KNX group address). /// /// Each connector manages ONE connection/endpoint. For multiple brokers/endpoints, /// create multiple connectors and register them with different schemes. @@ -159,30 +140,22 @@ impl std::error::Error for PublishError {} /// config: &ConnectorConfig, /// payload: &[u8], /// ) -> Pin> + Send + '_>> { +/// // Protocol knobs come from the route's key/value options, +/// // with connector-chosen defaults. +/// let qos = config +/// .protocol_options +/// .iter() +/// .find(|(k, _)| k == "qos") +/// .and_then(|(_, v)| v.parse::().ok()) +/// .unwrap_or(1); /// Box::pin(async move { -/// self.client.publish(destination, config.qos, config.retain, payload).await +/// self.client.publish(destination, qos, payload).await /// .map_err(|_| PublishError::ConnectionFailed) /// }) /// } /// } /// ``` /// -/// # Usage -/// -/// ```rust,ignore -/// let mqtt_connector = MqttConnector::new("mqtt://broker.local:1883").await?; -/// -/// let db = AimDbBuilder::new() -/// .runtime(runtime) -/// .with_connector("mqtt", Arc::new(mqtt_connector)) -/// .configure::(|reg| { -/// reg.link_to("mqtt://sensors/temp") -/// .with_qos(1) -/// .finish() -/// }) -/// .build()?; -/// ``` -/// /// # Thread Safety /// /// Requires Send + Sync for Tokio compatibility. For Embassy (single-threaded), @@ -191,12 +164,9 @@ pub trait Connector: Send + Sync { /// Publish data to a protocol-specific destination /// /// # Arguments - /// * `destination` - Protocol-specific path (no broker/host info): - /// - MQTT: "sensors/temperature" - /// - Shmem: "temp_readings" - /// - Kafka: "production/events" - /// - HTTP: "api/v1/sensors" - /// * `config` - Publishing configuration (QoS, retain, timeout, protocol options) + /// * `destination` - Protocol-specific path, no broker/host info + /// (e.g. an MQTT topic like "sensors/temperature") + /// * `config` - Publishing configuration (timeout + protocol options) /// * `payload` - Message payload as byte slice /// /// # Returns @@ -239,8 +209,6 @@ mod tests { #[test] fn test_connector_config_default() { let config = ConnectorConfig::default(); - assert_eq!(config.qos, 0); - assert!(!config.retain); assert_eq!(config.timeout_ms, Some(5000)); assert_eq!(config.protocol_options.len(), 0); } diff --git a/aimdb-core/src/typed_api.rs b/aimdb-core/src/typed_api.rs index 2302439..ca63e74 100644 --- a/aimdb-core/src/typed_api.rs +++ b/aimdb-core/src/typed_api.rs @@ -703,19 +703,13 @@ where self } - /// Sets the MQTT Quality of Service level - pub fn with_qos(mut self, qos: u8) -> Self { - self.config.push(("qos".to_string(), qos.to_string())); - self - } - - /// Sets the MQTT retain flag - pub fn with_retain(mut self, retain: bool) -> Self { - self.config.push(("retain".to_string(), retain.to_string())); - self - } - - /// Sets the publish timeout in milliseconds + /// Sets the operation timeout in milliseconds (the connector interprets + /// it; passed as the `timeout_ms` option — see + /// `ConnectorConfig::from_query`) + /// + /// Protocol-specific knobs (e.g. MQTT QoS/retain) are provided by the + /// connector crates as extension traits over this builder, or generically + /// via [`with_config`](Self::with_config). pub fn with_timeout_ms(mut self, timeout_ms: u32) -> Self { self.config .push(("timeout_ms".to_string(), timeout_ms.to_string())); @@ -992,13 +986,13 @@ where self } - /// Sets the MQTT Quality of Service level - pub fn with_qos(mut self, qos: u8) -> Self { - self.config.push(("qos".to_string(), qos.to_string())); - self - } - - /// Sets the publish timeout in milliseconds + /// Sets the operation timeout in milliseconds (the connector interprets + /// it; passed as the `timeout_ms` option — see + /// `ConnectorConfig::from_query`) + /// + /// Protocol-specific knobs (e.g. MQTT subscribe QoS) are provided by the + /// connector crates as extension traits over this builder, or generically + /// via [`with_config`](Self::with_config). pub fn with_timeout_ms(mut self, timeout_ms: u32) -> Self { self.config .push(("timeout_ms".to_string(), timeout_ms.to_string())); diff --git a/aimdb-mqtt-connector/CHANGELOG.md b/aimdb-mqtt-connector/CHANGELOG.md index 068f0a9..8507ee8 100644 --- a/aimdb-mqtt-connector/CHANGELOG.md +++ b/aimdb-mqtt-connector/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **`MqttLinkExt` / `MqttOutboundLinkExt` — the MQTT knobs, now where the protocol lives (Issue #134, design 034 §3.6).** New `link_ext` module (compiled on every feature leg, `alloc`-only) with extension traits over core's generic link builders: `MqttLinkExt::with_qos(u8)` on outbound *and* inbound links (publish / subscribe QoS), and `MqttOutboundLinkExt::with_retain(bool)` on outbound links only (retain is a publish-side flag). They push the exact `("qos", …)` / `("retain", …)` option keys both clients have always read from `protocol_options` — wire behavior identical to the deleted core methods; only an extra `use aimdb_mqtt_connector::{MqttLinkExt, MqttOutboundLinkExt};` is needed. The crate now declares `extern crate alloc` unconditionally. + ### Changed - **Connector-build errors carry their message on `no_std` too (Issue #129).** With `DbError` unified on `alloc::String`, the dual `#[cfg]` error-construction branches in both clients collapse to one `DbError::runtime_error(...)` expression; the Embassy client's "Failed to build MQTT connector" detail is no longer dropped on embedded targets. No API change. diff --git a/aimdb-mqtt-connector/src/lib.rs b/aimdb-mqtt-connector/src/lib.rs index d8ac4f9..69919ac 100644 --- a/aimdb-mqtt-connector/src/lib.rs +++ b/aimdb-mqtt-connector/src/lib.rs @@ -16,7 +16,7 @@ //! ```rust,ignore //! use aimdb_core::AimDbBuilder; //! use aimdb_tokio_adapter::TokioAdapter; -//! use aimdb_mqtt_connector::MqttConnector; +//! use aimdb_mqtt_connector::{MqttConnector, MqttLinkExt, MqttOutboundLinkExt}; //! use std::sync::Arc; //! //! let runtime = Arc::new(TokioAdapter::new()?); @@ -26,8 +26,10 @@ //! .with_connector(MqttConnector::new("mqtt://localhost:1883")) //! .configure::(|reg| { //! reg.source(temperature_producer) -//! // Outbound: Publish to MQTT +//! // Outbound: Publish to MQTT (QoS/retain via MqttLinkExt traits) //! .link_to("mqtt://sensors/temperature") +//! .with_qos(1) +//! .with_retain(false) //! .with_serializer_raw(|t| { //! serde_json::to_vec(t) //! .map_err(|_| aimdb_core::connector::SerializeError::InvalidData) @@ -71,9 +73,12 @@ #![cfg_attr(not(feature = "std"), no_std)] -#[cfg(not(feature = "std"))] extern crate alloc; +// MQTT knobs over core's generic link builders (works on every feature leg) +pub mod link_ext; +pub use link_ext::{MqttLinkExt, MqttOutboundLinkExt}; + // Platform-specific implementations #[cfg(feature = "tokio-runtime")] pub mod tokio_client; diff --git a/aimdb-mqtt-connector/src/link_ext.rs b/aimdb-mqtt-connector/src/link_ext.rs new file mode 100644 index 0000000..066b83b --- /dev/null +++ b/aimdb-mqtt-connector/src/link_ext.rs @@ -0,0 +1,68 @@ +//! MQTT-specific knobs for the generic link builders +//! +//! Core's `OutboundConnectorBuilder`/`InboundConnectorBuilder` know schemes +//! and key/value options, never protocol semantics (design 034 §3.6). The +//! MQTT knobs live here as extension traits: importing them makes the MQTT +//! intent explicit at the call site, and the impls push the exact same +//! `("qos", …)` / `("retain", …)` option keys the MQTT clients have always +//! read from `protocol_options` — wire behavior is unchanged. +//! +//! ```rust,ignore +//! use aimdb_mqtt_connector::{MqttLinkExt, MqttOutboundLinkExt}; +//! +//! reg.link_to("mqtt://sensors/temp") +//! .with_qos(1) +//! .with_retain(true) +//! .with_serializer_raw(serialize) +//! .finish(); +//! ``` + +use aimdb_core::{InboundConnectorBuilder, OutboundConnectorBuilder}; +use alloc::string::ToString; +use core::fmt::Debug; + +/// MQTT knobs shared by outbound and inbound links. +pub trait MqttLinkExt: Sized { + /// Sets the MQTT Quality of Service level (0, 1, or 2). + /// + /// Outbound: the publish QoS. Inbound: the subscribe QoS. Defaults to + /// QoS 1 when unset (the connectors' own default). + fn with_qos(self, qos: u8) -> Self; +} + +/// MQTT knobs that only make sense when publishing. +pub trait MqttOutboundLinkExt: MqttLinkExt { + /// Sets the MQTT retain flag (broker keeps the last message for + /// late-joining subscribers). Defaults to `false` when unset. + fn with_retain(self, retain: bool) -> Self; +} + +impl<'r, 'a, T, R> MqttLinkExt for OutboundConnectorBuilder<'r, 'a, T, R> +where + T: Send + Sync + 'static + Debug + Clone, + R: aimdb_executor::RuntimeAdapter + 'static, +{ + fn with_qos(self, qos: u8) -> Self { + self.with_config("qos", &qos.to_string()) + } +} + +impl<'r, 'a, T, R> MqttOutboundLinkExt for OutboundConnectorBuilder<'r, 'a, T, R> +where + T: Send + Sync + 'static + Debug + Clone, + R: aimdb_executor::RuntimeAdapter + 'static, +{ + fn with_retain(self, retain: bool) -> Self { + self.with_config("retain", if retain { "true" } else { "false" }) + } +} + +impl<'r, 'a, T, R> MqttLinkExt for InboundConnectorBuilder<'r, 'a, T, R> +where + T: Send + Sync + 'static + Debug + Clone, + R: aimdb_executor::RuntimeAdapter + 'static, +{ + fn with_qos(self, qos: u8) -> Self { + self.with_config("qos", &qos.to_string()) + } +} diff --git a/aimdb-mqtt-connector/tests/link_ext_tests.rs b/aimdb-mqtt-connector/tests/link_ext_tests.rs new file mode 100644 index 0000000..a7602ad --- /dev/null +++ b/aimdb-mqtt-connector/tests/link_ext_tests.rs @@ -0,0 +1,79 @@ +//! Tests for the MQTT link extension traits (`MqttLinkExt` / `MqttOutboundLinkExt`) +//! +//! Issue #134 moved `with_qos`/`with_retain` out of core's generic link +//! builders into this crate. These tests pin down the wire-compat contract: +//! the extension methods push exactly the `("qos", …)` / `("retain", …)` +//! option keys the MQTT clients read from `protocol_options`. + +#![cfg(feature = "tokio-runtime")] + +use aimdb_core::buffer::BufferCfg; +use aimdb_core::AimDbBuilder; +use aimdb_mqtt_connector::{MqttConnector, MqttLinkExt, MqttOutboundLinkExt}; +use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt}; +use std::sync::Arc; + +#[derive(Clone, Debug)] +struct Reading { + #[allow(dead_code)] + value: f32, +} + +#[tokio::test] +async fn outbound_knobs_push_exact_config_keys() { + let runtime = Arc::new(TokioAdapter::new().unwrap()); + let mut builder = AimDbBuilder::new() + .runtime(runtime) + .with_connector(MqttConnector::new("mqtt://localhost:1883").with_client_id("link-ext")); + + builder.configure::("test.reading.out", |reg| { + reg.buffer(BufferCfg::SingleLatest) + .link_to("mqtt://sensors/reading") + .with_qos(2) + .with_retain(true) + .with_serializer_raw(|_r: &Reading| Ok(vec![0u8])) + .finish(); + }); + + let (db, _runner) = builder.build().await.expect("build must succeed"); + + let id = db.inner().resolve_str("test.reading.out").unwrap(); + let record = db.inner().storage(id).unwrap(); + let config = &record.outbound_connectors()[0].config; + + assert!( + config.contains(&("qos".to_string(), "2".to_string())), + "expected (qos, 2) in {config:?}" + ); + assert!( + config.contains(&("retain".to_string(), "true".to_string())), + "expected (retain, true) in {config:?}" + ); +} + +#[tokio::test] +async fn inbound_qos_pushes_exact_config_key() { + let runtime = Arc::new(TokioAdapter::new().unwrap()); + let mut builder = AimDbBuilder::new() + .runtime(runtime) + .with_connector(MqttConnector::new("mqtt://localhost:1883").with_client_id("link-ext-in")); + + builder.configure::("test.reading.in", |reg| { + reg.buffer(BufferCfg::SingleLatest) + .link_from("mqtt://commands/reading") + .with_qos(0) + .with_deserializer_raw(|_b: &[u8]| Ok(Reading { value: 0.0 })) + .finish(); + }); + + let (db, _runner) = builder.build().await.expect("build must succeed"); + + let id = db.inner().resolve_str("test.reading.in").unwrap(); + let record = db.inner().storage(id).unwrap(); + let config = &record.inbound_connectors()[0].config; + + assert!( + config.contains(&("qos".to_string(), "0".to_string())), + "expected (qos, 0) in {config:?}" + ); +} diff --git a/examples/tokio-mqtt-connector-demo/src/main.rs b/examples/tokio-mqtt-connector-demo/src/main.rs index 4c00cf3..34f3258 100644 --- a/examples/tokio-mqtt-connector-demo/src/main.rs +++ b/examples/tokio-mqtt-connector-demo/src/main.rs @@ -36,6 +36,7 @@ use aimdb_core::buffer::BufferCfg; use aimdb_core::{AimDbBuilder, DbResult, Producer, RecordKey, RuntimeContext}; +use aimdb_mqtt_connector::{MqttLinkExt, MqttOutboundLinkExt}; use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt}; use std::sync::Arc; @@ -146,12 +147,17 @@ async fn main() -> DbResult<()> { .with_client_id("tokio-demo-multi-sensor"), ); - // Temperature sensors (outbound to MQTT) - using link_address() from key metadata + // Temperature sensors (outbound to MQTT) - using link_address() from key metadata. + // The MQTT knobs come from the connector crate's MqttLinkExt/ + // MqttOutboundLinkExt extension traits (design 034 §3.6); QoS 1 / no + // retain matches the connector defaults. builder.configure::(SensorKey::TempIndoor, |reg| { reg.buffer(BufferCfg::SpmcRing { capacity: 10 }) .source(indoor_temp_producer) .tap(temperature_logger) .link_to(SensorKey::TempIndoor.link_address().unwrap()) + .with_qos(1) + .with_retain(false) .with_serializer_raw(|temp: &Temperature| Ok(temp.to_json_vec())) .finish(); }); @@ -179,6 +185,7 @@ async fn main() -> DbResult<()> { reg.buffer(BufferCfg::SpmcRing { capacity: 10 }) .tap(command_consumer) .link_from(CommandKey::TempIndoor.link_address().unwrap()) + .with_qos(1) // subscribe QoS via MqttLinkExt .with_deserializer_raw(|data: &[u8]| TemperatureCommand::from_json(data)) .finish(); });