diff --git a/.cargo/audit.toml b/.cargo/audit.toml index aa46f986..29e76b0f 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -3,9 +3,4 @@ [advisories] # Advisories to ignore (transitive dependencies we can't fix) -ignore = [ - # rustls-webpki 0.102.x CRL issue - no patch in 0.102.x line - # Transitive via rumqttc, waiting for upstream fix - # Low impact: requires compromised CA to exploit - "RUSTSEC-2026-0049", -] +ignore = [] diff --git a/CHANGELOG.md b/CHANGELOG.md index 50669c00..82cf5693 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **M17 — centralized Embassy connector spine: one audited home for the single-core `unsafe` ([Design 033](docs/design/033-M17-unify-connectors-drop-send.md)).** New `aimdb-embassy-adapter::connectors` module (features `connectors` / `connector-io`) collects the force-`Send` plumbing every Embassy connector used to hand-roll: session transports get `EmbassySessionClient`/`EmbassySessionServer`, `OneShotDialer`/`OneShotListener`/`OneShotCell`, and the framed `EmbassyConnection` + `Framer`; data-plane transports get the `EmbassySink`/`EmbassySource` bridges (over `EmbassySinkRaw`/`EmbassySourceRaw`) that ride core's existing `pump_sink`/`pump_source`, plus `into_box_future` for protocol tasks. The serial Embassy half is now thin sugar (just a COBS `Framer`) with **zero `unsafe`** (down from a 407-line hand-roll with 7 `unsafe impl`s); the MQTT and KNX Embassy halves dropped their hand-rolled publisher/router loops and `SendFutureWrapper` use to ride core's pumps (KNX inbound telegrams now flow through `pump_source`). All connector-crate `unsafe`/`SendFutureWrapper` is gone — confined to the adapter. The std/Tokio side, `aimdb-client`, the WebSocket server, examples, and tests are unchanged. (Chosen over Design 033's original "drop `Send` from the contract", which would have pushed `!Send` onto the std side; see the doc's Implementation Decision.) ([aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-serial-connector](aimdb-serial-connector/CHANGELOG.md), [aimdb-mqtt-connector](aimdb-mqtt-connector/CHANGELOG.md), [aimdb-knx-connector](aimdb-knx-connector/CHANGELOG.md)) + - **Remote access via connectors — Phases 0–6: converge four hand-rolled networking stacks onto two shared engines (Issue #39, [design doc](docs/design/remote-access-via-connectors.md)).** AimX remote access (and any future transport) now rides the connector layer instead of a bespoke I/O abstraction. New, runtime-neutral `aimdb-core::session` module (feature `connector-session`, `no_std + alloc`): the three-layer substrate (`Connection`/`Listener`/`Dialer` + `EnvelopeCodec` + `Dispatch`/`Session`), the reactive **server** engine (`serve`/`run_session`) and proactive **client** engine (`run_client`/`pump_client`), the `pump_sink`/`pump_source` data-plane toolkit, and the transport-agnostic `SessionClientConnector`/`SessionServerConnector`. The AimX-v2 NDJSON protocol (`session::aimx`: `AimxCodec` + `AimxDispatch`) and the WebSocket connector are ports onto this substrate, so the AimX server/client and WS server/client stacks collapse onto the two engines. New **`aimdb-uds-connector`** crate carries the UDS transport (`UdsClient`/`UdsServer`). ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-uds-connector](aimdb-uds-connector/CHANGELOG.md), [aimdb-websocket-connector](aimdb-websocket-connector/CHANGELOG.md), [aimdb-client](aimdb-client/CHANGELOG.md), [aimdb-mqtt-connector](aimdb-mqtt-connector/CHANGELOG.md), [aimdb-knx-connector](aimdb-knx-connector/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md)) - **M16 — JSON codec extracted behind the `json-serialize` feature; `RecordValue::as_json()` now works on `no_std + alloc`, not just `std` ([Design 032](docs/design/032-M16-aimx-json-codec.md)).** New `aimdb-core::codec` module: `RemoteSerialize` (blanket-impl'd for every `serde` `Serialize + DeserializeOwned` type), the object-safe `JsonCodec`, and the zero-sized `SerdeJsonCodec`. `serde_json` runs on `alloc`, so embedded targets can opt in; `std` enables the feature transitively, so std builds are unaffected. ([aimdb-core](aimdb-core/CHANGELOG.md)) - **Embassy buffer + join-queue tests now run in CI (Issue #85).** The join-queue tests previously sat behind `embassy-runtime`, which pulls `embassy-executor`'s cortex-m assembly and can't compile under `cargo test` on x86_64 — so ordering / backpressure / clone-routing regressions were never caught. The `join_queue` module is now gated on `embassy-sync`, and `make test` runs the embassy adapter's unit tests + doctests on the host (no executor). Also adds `EmbassyBuffer::peek()` and fixes a stale `EmbassyBuffer` doc example. ([aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md)) diff --git a/Cargo.lock b/Cargo.lock index c542c3a2..5aa914d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,6 +135,7 @@ dependencies = [ "embedded-hal 0.2.7", "embedded-hal-async", "embedded-hal-nb", + "embedded-io-async 0.7.0", "futures", "futures-core", "heapless 0.9.1", @@ -3246,7 +3247,7 @@ dependencies = [ [[package]] name = "stm32-metapac" version = "21.0.0" -source = "git+https://github.com/embassy-rs/stm32-data-generated?tag=stm32-data-fb9d6e5e432ef51eaa686940d34b3487a50537a4#1a73224a4aaecff9b5ba9906595e12d21f34111d" +source = "git+https://github.com/embassy-rs/stm32-data-generated?tag=stm32-data-efbc4aab23acca680b52fda1f70c82cdca01a43f#76ef43967717719cf5b2f01b9a148ef5bac970b2" dependencies = [ "cortex-m", "cortex-m-rt", diff --git a/Makefile b/Makefile index a9b2cfc3..3ea61cef 100644 --- a/Makefile +++ b/Makefile @@ -143,8 +143,8 @@ test: cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,metrics" @printf "$(YELLOW) → Testing tokio adapter (with profiling)$(NC)\n" cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,profiling" - @printf "$(YELLOW) → Testing embassy adapter (host, no executor: buffers, join-queue, doctests)$(NC)\n" - cargo test --package aimdb-embassy-adapter --no-default-features --features "alloc,embassy-sync,embassy-time" + @printf "$(YELLOW) → Testing embassy adapter (host, no executor: buffers, join-queue, connector spine, doctests)$(NC)\n" + cargo test --package aimdb-embassy-adapter --no-default-features --features "alloc,embassy-sync,embassy-time,connectors" @printf "$(YELLOW) → Testing sync wrapper$(NC)\n" cargo test --package aimdb-sync @printf "$(YELLOW) → Testing codegen library$(NC)\n" @@ -338,6 +338,8 @@ test-embedded: cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --target-dir $(EMBEDDED_CHECK_TARGET_DIR) --no-default-features --features "embassy-runtime,profiling" @printf "$(YELLOW) → Checking aimdb-embassy-adapter with metrics on thumbv7em-none-eabihf target$(NC)\n" cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --target-dir $(EMBEDDED_CHECK_TARGET_DIR) --no-default-features --features "embassy-runtime,metrics" + @printf "$(YELLOW) → Checking aimdb-embassy-adapter connector spine (connector-io) on thumbv7em-none-eabihf target$(NC)\n" + cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --target-dir $(EMBEDDED_CHECK_TARGET_DIR) --no-default-features --features "embassy-runtime,connector-io" @printf "$(YELLOW) → Checking aimdb-mqtt-connector (Embassy) on thumbv7em-none-eabihf target$(NC)\n" cargo check --package aimdb-mqtt-connector --target thumbv7em-none-eabihf --target-dir $(EMBEDDED_CHECK_TARGET_DIR) --no-default-features --features "embassy-runtime" @printf "$(YELLOW) → Checking aimdb-mqtt-connector (Embassy + defmt) on thumbv7em-none-eabihf target$(NC)\n" diff --git a/_external/embassy b/_external/embassy index 6c284434..b047838a 160000 --- a/_external/embassy +++ b/_external/embassy @@ -1 +1 @@ -Subproject commit 6c28443489ad5940ab8c1824c090b1f8a7233bf6 +Subproject commit b047838a34c2a8cf4a5129cde5c118205b1628e8 diff --git a/aimdb-embassy-adapter/CHANGELOG.md b/aimdb-embassy-adapter/CHANGELOG.md index d739070a..061b4198 100644 --- a/aimdb-embassy-adapter/CHANGELOG.md +++ b/aimdb-embassy-adapter/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **M17 — centralized Embassy connector spines: the one audited home for the single-core `unsafe` ([Design 033](../docs/design/033-M17-unify-connectors-drop-send.md)).** New `connectors` module (features `connectors` / `connector-io`) collecting the force-`Send` plumbing every Embassy connector used to hand-roll, so a connector crate carries **no `unsafe` and no `SendFutureWrapper`**: + - **Session spine** — `EmbassySessionClient` / `EmbassySessionServer` (the Embassy duals of core's `SessionClientConnector` / `SessionServerConnector`), the one-shot `OneShotDialer` / `OneShotListener` over a moved-in peripheral connection (the listener parks forever after the first accept — point-to-point), and the force-`Send + Sync` `OneShotCell` for builders holding a moved-in value. `EmbassySessionClient::new` defaults to `reconnect: false` (unlike `ClientConfig::default`): a one-shot dialer can't redial, so the engine would otherwise spin on `TransportError::Io` forever; a re-dialable transport opts back in via `with_config`. + - **Framed connection** (feature `connector-io`) — `EmbassyConnection` over `embedded-io-async` `Read`/`Write` halves with a pluggable `Framer` (COBS, length-prefix, …), chunking writes for atomic-or-error HAL `BufferedUart`s and skip-and-resync on undecodable runs. + - **Data-plane bridges** — `EmbassySinkRaw` / `EmbassySourceRaw` (the `!Send` duals of core's `Connector` / `Source`) with the force-`Send` `EmbassySink` / `EmbassySource` wrappers so an Embassy connector rides core's `pump_sink` / `pump_source` unchanged, plus `into_box_future` for long-lived `!Send` protocol tasks. + - Every `unsafe impl` cites one module-level invariant (single-core, cooperative Embassy executor — no preemption, no thread migration); the module is gated to `no_std` builds so the force-`Send` types can't leak into std/Tokio code. Host smoke tests (`tests/connectors_smoke.rs`) cover the one-shot cell/listener semantics and `serve` over the one-shot listener. - **Wall-clock anchor — `EmbassyAdapter::set_unix_time(now_unix_secs)` + `TimeOps::unix_time()` (Issue #120).** Embassy has no real-time clock, so `unix_time()` returns `None` until the device learns the real time (NTP / GPS / host handshake) and calls `set_unix_time` once (Unix **seconds**); thereafter it derives absolute `(secs, nanos)` from Embassy's monotonic uptime plus the anchor. The anchor is a process-global `u32` (Unix seconds at boot) — a natively-atomic word on Cortex-M, no `portable-atomic` / critical-section needed — with sub-second precision taken from uptime. Gated on `embassy-time`. - **`SendFutureWrapper` — shared force-`Send` wrapper for Embassy data-plane connectors (Issue #39).** New `pub` type (`no_std` only): asserts `Send` for a future driven exclusively by a single-core, cooperative Embassy executor, so an Embassy connector's `!Send` data-plane futures (over `NoopRawMutex` channels) satisfy the connector spine's `Send` `BoxFuture` bound. Consolidates the identical wrappers that the KNX and MQTT Embassy clients previously each hand-rolled. - **Session-engine smoke test on the Embassy clock (Issue #39, Phase 5, [design doc](../docs/design/remote-access-via-connectors.md)).** New `tests/session_smoke.rs` drives `aimdb-core`'s runtime-neutral `run_client` engine using the `EmbassyAdapter`'s `TimeOps` clock for reconnect backoff / keepalive — proving the shared session engines run on Embassy, not just Tokio. Dev-only: pulls in `aimdb-core` with the `connector-session` feature, so the normal `no_std` lib build and the `thumbv7em` cross-checks stay `alloc`-only. diff --git a/aimdb-embassy-adapter/Cargo.toml b/aimdb-embassy-adapter/Cargo.toml index 023db4cf..18d8a093 100644 --- a/aimdb-embassy-adapter/Cargo.toml +++ b/aimdb-embassy-adapter/Cargo.toml @@ -20,6 +20,9 @@ alloc = [] embassy-runtime = ["embassy-executor", "embassy-time", "embassy-sync"] embassy-net-support = ["embassy-net"] # Network stack support for connectors +connectors = ["aimdb-core/connector-session"] +connector-io = ["connectors", "dep:embedded-io-async"] + # Observability features (no_std compatible) tracing = ["aimdb-core/tracing", "dep:tracing"] @@ -51,6 +54,9 @@ aimdb-core = { version = "1.1.0", path = "../aimdb-core", default-features = fal # Stream trait for bidirectional connectors (minimal, no_std compatible) futures-core = { version = "0.3", default-features = false } +# Generic framed `Connection` over async UART halves (feature `connector-io`). +embedded-io-async = { workspace = true, optional = true } + # Embassy ecosystem for embedded async embassy-executor = { workspace = true, optional = true } embassy-time = { workspace = true, optional = true } diff --git a/aimdb-embassy-adapter/src/connectors.rs b/aimdb-embassy-adapter/src/connectors.rs new file mode 100644 index 00000000..f03bacef --- /dev/null +++ b/aimdb-embassy-adapter/src/connectors.rs @@ -0,0 +1,479 @@ +//! Centralized Embassy connector spines — the one audited home for the +//! single-core `unsafe` + [`SendFutureWrapper`](crate::SendFutureWrapper) that +//! every Embassy connector used to hand-roll. +//! +//! AimDB's connector contract is `Send`-everywhere (so a Tokio app can +//! `tokio::spawn(runner.run())`). Embassy's primitives (UART halves over +//! `embedded-io-async`, channels over `NoopRawMutex`, …) are `!Send` *by design* +//! — single-core, cooperative, no preemption or thread migration. Bridging the +//! two requires force-`Send`ing the Embassy futures; this module does that +//! **once**, so a connector crate carries **no `unsafe` and no wrapper**: +//! +//! - **Session transports** (serial, TCP, …) contribute a [`Framer`] (or a +//! [`Connection`]) and wrap it in [`EmbassySessionClient`] / +//! [`EmbassySessionServer`] — the Embassy duals of core's +//! `SessionClientConnector` / `SessionServerConnector`. +//! - **Data-plane transports** (MQTT, KNX) contribute an [`EmbassySinkRaw`] +//! (outbound) and/or [`EmbassySourceRaw`] (inbound) and ride core's existing +//! [`pump_sink`](aimdb_core::session::pump_sink) / +//! [`pump_source`](aimdb_core::session::pump_source) via the force-`Send` +//! bridges [`EmbassySink`] / [`EmbassySource`]. +//! +//! # Safety invariant (shared by every `unsafe impl` below) +//! +//! An Embassy executor runs cooperatively on a single core with no preemption or +//! thread migration, so the wrapped `!Send` values are never actually accessed +//! from another thread. Only use these spines under an Embassy executor. + +use core::cell::RefCell; +use core::future::Future; +use core::pin::Pin; + +use alloc::boxed::Box; +use alloc::string::{String, ToString}; +use alloc::sync::Arc; +use alloc::vec; +use alloc::vec::Vec; + +use aimdb_core::connector::ConnectorBuilder; +use aimdb_core::session::{ + pump_client, run_client, serve, BoxFut, ClientConfig, Connection, Dialer, Dispatch, + EnvelopeCodec, Listener, Payload, SessionConfig, Source, TransportError, TransportResult, +}; +use aimdb_core::transport::{Connector, ConnectorConfig, PublishError}; +use aimdb_core::{AimDb, DbError, DbResult, RuntimeAdapter}; +use aimdb_executor::TimeOps; + +use crate::SendFutureWrapper; + +/// The scheme a spine registers when the connector gives none (matches core's +/// `SessionClientConnector` default). +pub const DEFAULT_SCHEME: &str = "remote"; + +/// The runner's collected future type (`Send`, as the std contract requires). +type BoxFuture = Pin + Send + 'static>>; +/// The `ConnectorBuilder::build` return shape. +type BuildFuture<'a> = Pin>> + Send + 'a>>; + +/// The spine's one-shot peripheral was already consumed — `build` ran twice. The +/// framework calls it once, so this is unreachable in practice. +fn connector_consumed() -> DbError { + DbError::MissingConfiguration { + #[cfg(feature = "std")] + parameter: String::from("embassy connector already built"), + #[cfg(not(feature = "std"))] + _parameter: (), + } +} + +// =========================================================================== +// Data-plane bridges — let a `!Send` sink/source ride core's pumps. +// =========================================================================== + +/// The pure outbound I/O a data-plane connector contributes: publish one payload. +/// The `!Send` dual of [`Connector`]; [`EmbassySink`] force-`Send`s it so it can +/// drive core's [`pump_sink`](aimdb_core::session::pump_sink). +/// +/// Args are owned (a data-plane sink enqueues owned data onto its channel anyway), +/// so the returned future borrows only `&self` — matching [`Connector::publish`]'s +/// `'_` return shape. +pub trait EmbassySinkRaw { + /// Publish `payload` to `destination` (e.g. enqueue onto an Embassy channel). + fn publish( + &self, + destination: String, + config: ConnectorConfig, + payload: Vec, + ) -> impl Future>; +} + +/// Force-`Send` bridge turning an [`EmbassySinkRaw`] into a [`Connector`], so an +/// Embassy outbound sink rides core's [`pump_sink`](aimdb_core::session::pump_sink) +/// unchanged. +pub struct EmbassySink(pub C); + +// SAFETY: single-core cooperative Embassy executor — see the module-level invariant. +unsafe impl Send for EmbassySink {} +// SAFETY: same invariant; `Connector` is shared behind `Arc`. +unsafe impl Sync for EmbassySink {} + +impl Connector for EmbassySink { + fn publish( + &self, + destination: &str, + config: &ConnectorConfig, + payload: &[u8], + ) -> Pin> + Send + '_>> { + // Own the args so the inner future borrows only `&self` (see trait doc). + Box::pin(SendFutureWrapper(self.0.publish( + destination.to_string(), + config.clone(), + payload.to_vec(), + ))) + } +} + +/// The pure inbound I/O a data-plane connector contributes: yield the next +/// `(topic, payload)`. The `!Send` dual of [`Source`]; [`EmbassySource`] +/// force-`Send`s it so it can drive core's +/// [`pump_source`](aimdb_core::session::pump_source). +pub trait EmbassySourceRaw { + /// Yield the next `(topic, payload)`, or `None` when the source is done. + fn next(&mut self) -> impl Future>; +} + +/// Force-`Send` bridge turning an [`EmbassySourceRaw`] into a [`Source`], so an +/// Embassy inbound stream rides core's +/// [`pump_source`](aimdb_core::session::pump_source) unchanged. +pub struct EmbassySource(pub S); + +// SAFETY: single-core cooperative Embassy executor — see the module-level invariant. +unsafe impl Send for EmbassySource {} + +impl Source for EmbassySource { + fn next(&mut self) -> BoxFut<'_, Option<(String, Payload)>> { + Box::pin(SendFutureWrapper(self.0.next())) + } +} + +/// Force-`Send` + box a connector's long-lived **protocol task** (an MQTT broker +/// manager, a KNX tunnelling state machine, …) so it can join the runner's +/// `Send` future set without the connector touching [`SendFutureWrapper`]. +pub fn into_box_future(fut: F) -> BoxFuture +where + F: Future + 'static, +{ + Box::pin(SendFutureWrapper(fut)) +} + +// =========================================================================== +// Session spine — the Embassy duals of `SessionClientConnector` / `…Server`. +// =========================================================================== + +/// A force-`Send + Sync` one-shot cell. Holds a moved-in value behind interior +/// mutability so a [`ConnectorBuilder`] (which is `Send + Sync`) can take it once +/// from `&self` in `build` — without the connector crate writing any `unsafe`. +/// +/// Use it when a session-server connector holds a moved-in peripheral/connection +/// it hands to a [`OneShotListener`] at build time (the moved-in dual of the +/// Tokio server's re-bindable listener factory). +pub struct OneShotCell { + inner: RefCell>, +} + +// SAFETY: single-core cooperative Embassy executor — see the module-level invariant. +unsafe impl Send for OneShotCell {} +// SAFETY: same invariant; the `RefCell` is never borrowed from another thread. +unsafe impl Sync for OneShotCell {} + +impl OneShotCell { + /// Hold `value` for a single [`take`](Self::take). + pub fn new(value: C) -> Self { + Self { + inner: RefCell::new(Some(value)), + } + } + + /// Take the value, or `None` if already taken (i.e. `build` ran twice). + pub fn take(&self) -> Option { + self.inner.borrow_mut().take() + } + + /// Take the value, or the canonical "already built" [`DbError`] — the shared + /// error every spine returns when `build` is (impossibly) called twice. + pub fn take_required(&self) -> DbResult { + self.take().ok_or_else(connector_consumed) + } +} + +/// One-shot [`Dialer`] over a pre-built, moved-in `Connection` (an Embassy +/// peripheral can't be re-acquired, so it dials exactly once; keep reconnect +/// disabled — [`EmbassySessionClient::new`]'s default). +pub struct OneShotDialer { + conn: OneShotCell, +} + +impl OneShotDialer { + /// Wrap a pre-built connection to be handed out on the first `connect`. + pub fn new(conn: C) -> Self { + Self { + conn: OneShotCell::new(conn), + } + } +} + +impl Dialer for OneShotDialer { + fn connect(&self) -> BoxFut<'_, TransportResult>> { + Box::pin(SendFutureWrapper(async move { + self.conn + .take() + .map(|c| Box::new(c) as Box) + .ok_or(TransportError::Io) + })) + } +} + +/// One-shot [`Listener`] over a pre-built, moved-in `Connection`: the first +/// `accept` hands it out; later calls park forever (point-to-point peripheral). +pub struct OneShotListener { + conn: Option, +} + +// SAFETY: single-core cooperative Embassy executor — see the module-level invariant. +unsafe impl Send for OneShotListener {} + +impl OneShotListener { + /// Wrap a pre-built connection to be handed out on the first `accept`. + pub fn new(conn: C) -> Self { + Self { conn: Some(conn) } + } +} + +impl Listener for OneShotListener { + fn accept(&mut self) -> BoxFut<'_, TransportResult>> { + Box::pin(SendFutureWrapper(async move { + match self.conn.take() { + Some(c) => Ok(Box::new(c) as Box), + // Point-to-point: no second peer ever arrives. + None => core::future::pending().await, + } + })) + } +} + +/// Embassy dual of `SessionClientConnector`: dials a peer with `D`, speaks codec +/// `C`, mirrors records under a [`scheme`](ConnectorBuilder::scheme). A transport +/// crate wraps it in a one-line sugar constructor (e.g. `SerialClient`). +pub struct EmbassySessionClient { + scheme: String, + // The moved-in dialer behind the force-`Send + Sync` cell, so the builder is + // auto `Send + Sync` (with a `Send + Sync` codec) — no `unsafe` here. + dialer: OneShotCell, + codec: C, + config: ClientConfig, +} + +impl EmbassySessionClient { + /// Mirror records over `dialer`, framing with `codec`. Scheme defaults to + /// [`DEFAULT_SCHEME`]. + /// + /// Reconnect is **disabled** by default (unlike [`ClientConfig::default`]): + /// an Embassy dialer typically wraps a moved-in peripheral ([`OneShotDialer`]) + /// that can't be re-acquired, so redialing would spin on [`TransportError::Io`] + /// forever. A transport whose dialer really can redial opts back in via + /// [`with_config`](Self::with_config). + pub fn new(dialer: D, codec: C) -> Self { + Self { + scheme: DEFAULT_SCHEME.to_string(), + dialer: OneShotCell::new(dialer), + codec, + config: ClientConfig { + reconnect: false, + ..ClientConfig::default() + }, + } + } + + /// Override the scheme this connector registers. + pub fn scheme(mut self, scheme: impl Into) -> Self { + self.scheme = scheme.into(); + self + } + + /// Override the client engine config (reconnect, keepalive, …). Only enable + /// `reconnect` if the dialer can actually redial (a [`OneShotDialer`] can't). + pub fn with_config(mut self, config: ClientConfig) -> Self { + self.config = config; + self + } +} + +impl ConnectorBuilder for EmbassySessionClient +where + R: TimeOps + 'static, + D: Dialer + 'static, + C: EnvelopeCodec + Clone + 'static, +{ + fn build<'a>(&'a self, db: &'a AimDb) -> BuildFuture<'a> { + Box::pin(SendFutureWrapper(async move { + let dialer = self.dialer.take_required()?; + let (handle, engine) = run_client( + dialer, + self.codec.clone(), + self.config.clone(), + db.runtime_arc(), + ); + // One pump future per route; each holds a `ClientHandle` clone, so the + // engine stays alive as long as any mirror runs. + let mut futures = pump_client(db, &self.scheme, &handle); + futures.push(engine); + Ok(futures) + })) + } + + fn scheme(&self) -> &str { + &self.scheme + } +} + +/// Embassy dual of `SessionServerConnector`: serves a moved-in [`Listener`] with +/// a dispatch produced from the live db, speaking codec `C`, under a +/// [`scheme`](ConnectorBuilder::scheme). +pub struct EmbassySessionServer { + scheme: String, + // Moved-in listener behind the force-`Send + Sync` cell, so the builder is + // auto `Send + Sync` (with a `Send + Sync` codec + factory) — no `unsafe` here. + listener: OneShotCell, + codec: C, + dispatch_factory: DF, + config: SessionConfig, +} + +impl EmbassySessionServer { + /// Serve `listener` with the dispatch built by `dispatch_factory` from the + /// live db, framing with `codec`. Scheme defaults to [`DEFAULT_SCHEME`]. + pub fn new(listener: L, codec: C, dispatch_factory: DF, config: SessionConfig) -> Self { + Self { + scheme: DEFAULT_SCHEME.to_string(), + listener: OneShotCell::new(listener), + codec, + dispatch_factory, + config, + } + } + + /// Override the scheme this connector registers. + pub fn scheme(mut self, scheme: impl Into) -> Self { + self.scheme = scheme.into(); + self + } +} + +impl ConnectorBuilder for EmbassySessionServer +where + R: RuntimeAdapter + 'static, + L: Listener + 'static, + C: EnvelopeCodec + Clone + 'static, + DF: Fn(&AimDb) -> Arc + Send + Sync, +{ + fn build<'a>(&'a self, db: &'a AimDb) -> BuildFuture<'a> { + Box::pin(SendFutureWrapper(async move { + let listener = self.listener.take_required()?; + let dispatch = (self.dispatch_factory)(db); + let codec = Arc::new(self.codec.clone()); + // `serve` is `Send` here because the listener/connections force-`Send` + // their futures; no extra wrapper needed. + let fut: BoxFuture = Box::pin(serve(listener, codec, dispatch, self.config.clone())); + Ok(vec![fut]) + })) + } + + fn scheme(&self) -> &str { + &self.scheme + } +} + +// =========================================================================== +// Framed connection over `embedded-io-async` — lets a session transport ship +// just a `Framer` and carry zero `unsafe`. +// =========================================================================== + +#[cfg(feature = "connector-io")] +mod framed { + use super::*; + use embedded_io_async::{Read, Write}; + + use aimdb_core::session::PeerInfo; + + /// Frames the byte stream a [`EmbassyConnection`] carries: a transport + /// contributes only this (e.g. COBS, length-prefix), inheriting the + /// force-`Send` plumbing. + pub trait Framer { + /// Encode one logical frame, appending its wire bytes to `out`. + fn encode(&self, frame: &[u8], out: &mut Vec); + /// Feed received bytes into the accumulator. + fn push_bytes(&mut self, bytes: &[u8]); + /// Pull the next complete frame: `Some(Ok(frame))`, `Some(Err(()))` for a + /// malformed/unsynced run (skipped, the stream resyncs), or `None`. + fn next_frame(&mut self) -> Option, ()>>; + } + + /// A framed bidirectional [`Connection`] over an `embedded-io-async` UART (or + /// any `Read`/`Write` halves), force-`Send`ing its `recv`/`send` futures. + /// `RC`/`WC` cap the per-`read`/`write` chunk (UART ring sizes). + pub struct EmbassyConnection { + rx: Rd, + tx: Wr, + framer: F, + peer: PeerInfo, + } + + // SAFETY: single-core cooperative Embassy executor — see the module-level invariant. + unsafe impl Send + for EmbassyConnection + { + } + + impl EmbassyConnection { + /// Wrap the split read/write halves of an async byte stream with `framer`. + pub fn new(rx: Rd, tx: Wr, framer: F) -> Self { + Self { + rx, + tx, + framer, + peer: PeerInfo::default(), + } + } + } + + impl Connection + for EmbassyConnection + where + Rd: Read, + Wr: Write, + F: Framer, + { + fn recv(&mut self) -> BoxFut<'_, TransportResult>>> { + Box::pin(SendFutureWrapper(async move { + loop { + // A run that fails to decode is line noise or a mid-stream + // join, not fatal: skip it and resync on the next frame. + match self.framer.next_frame() { + Some(Ok(frame)) => return Ok(Some(frame)), + Some(Err(())) => continue, + None => {} + } + let mut chunk = [0u8; RC]; + match self.rx.read(&mut chunk).await { + Ok(0) => return Ok(None), // EOF — peer closed + Ok(n) => self.framer.push_bytes(&chunk[..n]), + Err(_) => return Err(TransportError::Io), + } + } + })) + } + + fn send<'a>(&'a mut self, frame: &'a [u8]) -> BoxFut<'a, TransportResult<()>> { + Box::pin(SendFutureWrapper(async move { + let mut out = Vec::new(); + self.framer.encode(frame, &mut out); + // Some HAL `BufferedUart::write` rejects a single write larger than + // its TX ring, so split into `WC`-sized chunks. + for chunk in out.chunks(WC) { + self.tx + .write_all(chunk) + .await + .map_err(|_| TransportError::Closed)?; + } + self.tx.flush().await.map_err(|_| TransportError::Closed) + })) + } + + fn peer(&self) -> &PeerInfo { + &self.peer + } + } +} + +#[cfg(feature = "connector-io")] +pub use framed::{EmbassyConnection, Framer}; diff --git a/aimdb-embassy-adapter/src/lib.rs b/aimdb-embassy-adapter/src/lib.rs index 2906afe7..cba08284 100644 --- a/aimdb-embassy-adapter/src/lib.rs +++ b/aimdb-embassy-adapter/src/lib.rs @@ -82,6 +82,11 @@ pub mod time; #[cfg(not(feature = "std"))] pub mod send_wrapper; +// Centralized Embassy connector spines (session + data-plane) — the one audited +// home for the single-core `unsafe` + `SendFutureWrapper`. +#[cfg(all(not(feature = "std"), feature = "connectors"))] +pub mod connectors; + // Error handling exports #[cfg(not(feature = "std"))] pub use error::EmbassyErrorSupport; diff --git a/aimdb-embassy-adapter/tests/connectors_smoke.rs b/aimdb-embassy-adapter/tests/connectors_smoke.rs new file mode 100644 index 00000000..f3ade9f1 --- /dev/null +++ b/aimdb-embassy-adapter/tests/connectors_smoke.rs @@ -0,0 +1,224 @@ +//! Host smoke for the centralized Embassy connector spine — the **server** side +//! the serial smoke test doesn't reach: [`OneShotCell`]'s take-once semantics, +//! [`OneShotListener`]'s park-forever second `accept`, and core's `serve` over +//! the one-shot listener — the exact path `EmbassySessionServer::build` (and the +//! serial `SerialServer`) drives on an MCU. +//! +//! Runs under the adapter's host test feature set (`alloc,…,connectors`); the +//! futures are driven by `futures::executor::block_on`, no executor needed. + +#![cfg(all(not(feature = "std"), feature = "connectors"))] + +use std::sync::{Arc, Mutex}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use futures::executor::block_on; +use futures::future::{select, Either}; +use futures::pin_mut; + +use aimdb_core::session::{ + serve, AuthError, BoxFut, CodecError, Connection, Dispatch, EnvelopeCodec, Inbound, Listener, + Outbound, Payload, PeerInfo, RpcError, Session, SessionConfig, SessionCtx, SessionLimits, + TransportResult, +}; +use aimdb_embassy_adapter::connectors::{OneShotCell, OneShotListener}; + +/// Yields (self-waking) `n` times, then completes — bounds how long we drive a +/// never-returning future like `serve` under `block_on`. +struct YieldN(usize); + +impl Future for YieldN { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if self.0 == 0 { + Poll::Ready(()) + } else { + self.0 -= 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +/// A scripted connection: `recv` yields the queued frames then `None` (EOF); +/// `send` records every frame into the shared log. +struct ScriptedConn { + /// Frames to yield, in reverse order (popped from the back). + inbox: Vec>, + sent: Arc>>>, + peer: PeerInfo, +} + +impl ScriptedConn { + fn new(inbox: Vec>, sent: Arc>>>) -> Self { + Self { + inbox, + sent, + peer: PeerInfo::default(), + } + } +} + +impl Connection for ScriptedConn { + fn recv(&mut self) -> BoxFut<'_, TransportResult>>> { + Box::pin(async move { Ok(self.inbox.pop()) }) + } + fn send<'a>(&'a mut self, frame: &'a [u8]) -> BoxFut<'a, TransportResult<()>> { + self.sent.lock().unwrap().push(frame.to_vec()); + Box::pin(async { Ok(()) }) + } + fn peer(&self) -> &PeerInfo { + &self.peer + } +} + +/// Minimal server wire: a request frame is `[id:8][params]`; the reply echoes +/// the same shape back. Client-direction methods are unused here. +struct EchoCodec; + +impl EnvelopeCodec for EchoCodec { + fn decode(&self, frame: &[u8]) -> Result { + if frame.len() < 8 { + return Err(CodecError::Malformed); + } + Ok(Inbound::Request { + id: u64::from_be_bytes(frame[0..8].try_into().unwrap()), + method: "echo".to_string(), + params: Payload::from(&frame[8..]), + }) + } + fn encode(&self, msg: Outbound<'_>, out: &mut Vec) -> Result<(), CodecError> { + match msg { + Outbound::Reply { + id, + result: Ok(payload), + } => { + out.extend_from_slice(&id.to_be_bytes()); + out.extend_from_slice(&payload); + Ok(()) + } + _ => Err(CodecError::Malformed), + } + } + fn encode_inbound(&self, _msg: Inbound, _out: &mut Vec) -> Result<(), CodecError> { + Err(CodecError::Malformed) + } + fn decode_outbound<'a>(&self, _frame: &'a [u8]) -> Result, CodecError> { + Err(CodecError::Malformed) + } +} + +/// Accepts every peer; each session echoes a call's params back as the reply. +struct EchoDispatch; + +impl Dispatch for EchoDispatch { + fn authenticate<'a>( + &'a self, + _peer: &'a PeerInfo, + _first: Option<&'a [u8]>, + ) -> BoxFut<'a, Result> { + Box::pin(async { Ok(SessionCtx::default()) }) + } + fn open(&self, _ctx: &SessionCtx) -> Box { + Box::new(EchoSession) + } +} + +struct EchoSession; + +impl Session for EchoSession { + fn call<'a>( + &'a mut self, + _method: &'a str, + params: Payload, + ) -> BoxFut<'a, Result> { + Box::pin(async move { Ok(params) }) + } + fn write<'a>( + &'a mut self, + _topic: &'a str, + _payload: Payload, + ) -> BoxFut<'a, Result<(), RpcError>> { + Box::pin(async { Ok(()) }) + } +} + +fn session_config() -> SessionConfig { + SessionConfig { + limits: SessionLimits { + // The one-shot spine serves a single point-to-point peer. + max_connections: 1, + max_subs_per_connection: 4, + }, + reads_hello: false, + acks_subscribe: false, + } +} + +#[test] +fn one_shot_cell_hands_out_exactly_once() { + let cell = OneShotCell::new(42u32); + assert_eq!(cell.take(), Some(42)); + assert_eq!(cell.take(), None); + + let cell = OneShotCell::new("conn"); + assert_eq!(cell.take_required().unwrap(), "conn"); + // Second take: the canonical "already built" error, not a panic. + assert!(cell.take_required().is_err()); +} + +#[test] +fn one_shot_listener_parks_after_first_accept() { + let sent = Arc::new(Mutex::new(Vec::new())); + let mut listener = OneShotListener::new(ScriptedConn::new(vec![], sent)); + + block_on(async move { + listener + .accept() + .await + .expect("first accept hands out the connection"); + + // Point-to-point: the second accept must park forever, not error — an + // erroring accept would tear `serve` down. + let second = listener.accept(); + pin_mut!(second); + match select(second, YieldN(8)).await { + Either::Left(_) => panic!("second accept must park forever"), + Either::Right(((), _)) => {} + } + }); +} + +#[test] +fn serve_dispatches_over_one_shot_listener_then_keeps_waiting() { + let sent = Arc::new(Mutex::new(Vec::new())); + + // One scripted request (`id=7`, params `ping`), then EOF. + let mut request = 7u64.to_be_bytes().to_vec(); + request.extend_from_slice(b"ping"); + let conn = ScriptedConn::new(vec![request.clone()], sent.clone()); + + let serve_fut = serve( + OneShotListener::new(conn), + Arc::new(EchoCodec), + Arc::new(EchoDispatch), + session_config(), + ); + + block_on(async move { + pin_mut!(serve_fut); + // `serve` must process the session but never return: after the peer's + // EOF it loops back to the parked one-shot accept. + match select(serve_fut, YieldN(8)).await { + Either::Left(_) => panic!("serve must keep waiting on the parked accept"), + Either::Right(((), _)) => {} + } + }); + + // The echoed reply went out before EOF: same `[id:8][params]` bytes back. + let sent = sent.lock().unwrap(); + assert_eq!(sent.as_slice(), &[request]); +} diff --git a/aimdb-knx-connector/CHANGELOG.md b/aimdb-knx-connector/CHANGELOG.md index 5e909676..2294ff16 100644 --- a/aimdb-knx-connector/CHANGELOG.md +++ b/aimdb-knx-connector/CHANGELOG.md @@ -11,14 +11,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Tokio client rebuilt on the shared data-plane toolkit (Issue #39, [design doc](../docs/design/remote-access-via-connectors.md)).** The hand-rolled consume-serialize-publish and telegram read-route loops are replaced by `aimdb-core`'s `pump_sink` / `pump_source` helpers: the connector now writes only a `KnxSink` (`Connector`, parses the destination group address and forwards a fire-and-forget `GroupValueWrite`) and a `KnxSource` (`Source`, yields each inbound `(group_address, payload)`) and composes the pumps in `build()`. The routing `Router` is (re)built inside `pump_source`. `std` enables `aimdb-core/connector-session` (where the pump helpers live; `std` implies it transitively). No public API change. - **Outbound publishers survive a consumer lag (Tokio + Embassy).** A `BufferLagged` (SPMC-ring overflow) on the outbound reader now skips the gap and keeps publishing instead of terminating the publisher; only a closed buffer stops it. -- **`SendFutureWrapper` moved to `aimdb-embassy-adapter`.** The Embassy client's local force-`Send` wrapper is gone in favour of the shared `aimdb_embassy_adapter::SendFutureWrapper` (single definition, no behavior change). +- **M17 — Embassy client rebuilt on core's pumps via the adapter spine ([Design 033](../docs/design/033-M17-unify-connectors-drop-send.md)).** The hand-rolled outbound publisher loops are gone; this crate now contributes only the KNX/IP **protocol** (UDP socket + tunnelling state machine), force-`Send`ed once via `aimdb_embassy_adapter::connectors::into_box_future` — **no `unsafe`, no `SendFutureWrapper`** remain in this crate. Data-flow changes: + - **Outbound** rides core's `pump_sink` through the existing `Connector` impl (commands onto the `CriticalSectionRawMutex` command channel, which is already `Send` — no force-`Send` bridge needed). Behavior change: when a topic provider returns an **invalid dynamic group address**, the value is now dropped (`PublishError::InvalidDestination`, logged by the pump) instead of falling back to the URL's default address — silently writing to a different group address than the provider asked for was misrouting, and this matches the Tokio half. + - **Inbound** rides core's `pump_source` via a new static 32-deep telegram channel (`KnxSource` drains it). The protocol loop forwards each parsed telegram with `try_send` — **drop + log when the channel is full** rather than awaiting, so a slow consumer can never stall the loop that answers `TUNNELING_ACK`s and heartbeats (a stalled loop would time out the gateway connection, losing far more than the dropped telegram). ### Changed (breaking) - **`ConnectorBuilder::build()` now returns `Vec>` instead of `Arc` (Issue #88).** Both Tokio and Embassy implementations updated. - `spawn_connection_task()` → `build_connection_future()`; the `mpsc::channel` for outbound commands is created up front, the receiver captured by the connection future, and the sender cloned into each outbound publisher future. `spawn_outbound_publishers()` → `collect_outbound_futures()`. - `R: Spawn` bounds dropped in favour of `R: RuntimeAdapter`. -- The `transport::Connector` impl on `KnxConnectorImpl` was removed alongside the discarded `Arc` return path. +- The `transport::Connector` impl on `KnxConnectorImpl` was removed alongside the discarded `Arc` return path (Issue #88) — then reinstated as the pure outbound I/O adapter that `pump_sink` drives (Issue #39 / M17): it is no longer a programmatic-publish surface but the route through which every outbound record reaches the command channel. ## [0.4.0] - 2026-05-22 diff --git a/aimdb-knx-connector/Cargo.toml b/aimdb-knx-connector/Cargo.toml index 8fa9e8f4..2c50a0d9 100644 --- a/aimdb-knx-connector/Cargo.toml +++ b/aimdb-knx-connector/Cargo.toml @@ -16,8 +16,10 @@ std = ["aimdb-core/std", "knx-pico/std", "thiserror"] tokio-runtime = ["std", "tokio", "uuid", "async-stream", "futures-util"] embassy-runtime = [ "aimdb-core/alloc", # Need alloc for collect_inbound_routes + "aimdb-core/connector-session", # `pump_sink`/`pump_source`/`Source`/`Payload` "dep:aimdb-embassy-adapter", # Enable the optional dependency "aimdb-embassy-adapter/embassy-net-support", # Enable EmbassyNetwork trait for network stack access + "aimdb-embassy-adapter/connectors", # `into_box_future` spine helper "embassy-executor", "embassy-time", "embassy-sync", diff --git a/aimdb-knx-connector/src/embassy_client.rs b/aimdb-knx-connector/src/embassy_client.rs index 38effa09..ef6f4615 100644 --- a/aimdb-knx-connector/src/embassy_client.rs +++ b/aimdb-knx-connector/src/embassy_client.rs @@ -4,10 +4,19 @@ //! //! # Architecture //! -//! - Manual UDP socket management with `embassy-net` -//! - Manual connection state machine (CONNECT_REQUEST/RESPONSE, TUNNELING_ACK) -//! - Manual telegram parsing and routing -//! - Integration with AimDB's ConnectorBuilder pattern +//! This crate contributes only the KNX/IP **protocol**: UDP socket management with +//! `embassy-net` and the tunnelling state machine (CONNECT_REQUEST/RESPONSE, +//! TUNNELING_ACK, heartbeat, telegram parsing). The data-flow is shared: +//! +//! - **Outbound** (records → telegrams) rides core's `pump_sink` via the existing +//! [`Connector`](aimdb_core::transport::Connector) impl (commands go onto a +//! `CriticalSectionRawMutex` channel the connection task drains). +//! - **Inbound** (telegrams → records) rides core's `pump_source`: the connection +//! task parses each telegram and pushes `(group-address, payload)` onto an inbound +//! channel that [`KnxSource`] drains. +//! - The connection task is force-`Send`ed once via +//! [`into_box_future`](aimdb_embassy_adapter::connectors::into_box_future); the +//! crate carries no `unsafe`. //! //! # Usage //! @@ -32,11 +41,11 @@ use crate::GroupAddress; use aimdb_core::connector::ConnectorUrl; -use aimdb_core::router::{Router, RouterBuilder}; +use aimdb_core::session::{pump_sink, pump_source, Payload}; use aimdb_core::ConnectorBuilder; -use aimdb_embassy_adapter::SendFutureWrapper; +use aimdb_embassy_adapter::connectors::into_box_future; use alloc::boxed::Box; -use alloc::string::ToString; +use alloc::string::{String, ToString}; use alloc::sync::Arc; use alloc::vec; use alloc::vec::Vec; @@ -68,9 +77,21 @@ pub struct GroupWriteData { } use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; -use embassy_sync::channel::Channel; +use embassy_sync::channel::{Channel, Receiver, Sender}; use static_cell::StaticCell; +/// Inbound telegram item: `(group-address string, payload)` — pushed by the +/// connection task, drained by [`KnxSource`] into core's `pump_source`. +type InboundItem = (String, Payload); + +/// `'static` reference to the outbound command channel. +type CommandChannelRef = + &'static Channel; +/// Sender / receiver halves of the inbound telegram channel. +type InboundSender = Sender<'static, CriticalSectionRawMutex, InboundItem, KNX_INBOUND_QUEUE_SIZE>; +type InboundReceiver = + Receiver<'static, CriticalSectionRawMutex, InboundItem, KNX_INBOUND_QUEUE_SIZE>; + /// Capacity of the static KNX command channel. /// /// Embassy requires a compile-time const generic — runtime configurability is @@ -89,6 +110,35 @@ fn get_command_channel( KNX_COMMAND_CHANNEL.init(Channel::new()) } +/// Capacity of the static inbound telegram channel. +const KNX_INBOUND_QUEUE_SIZE: usize = 32; + +/// Static channel for inbound telegrams (capacity: [`KNX_INBOUND_QUEUE_SIZE`]). +static KNX_INBOUND_CHANNEL: StaticCell< + Channel, +> = StaticCell::new(); + +/// Get or initialize the inbound telegram channel. +fn get_inbound_channel( +) -> &'static Channel { + KNX_INBOUND_CHANNEL.init(Channel::new()) +} + +/// Inbound [`Source`](aimdb_core::session::Source): drains the connection task's +/// telegram channel, yielding each `(group-address, payload)` for core's +/// `pump_source` to fan out to the matching record producers. The KNX command/ +/// inbound channels use `CriticalSectionRawMutex` (`Send`), so this is a plain +/// `Source` — no force-`Send` wrapper needed. +struct KnxSource { + receiver: Receiver<'static, CriticalSectionRawMutex, InboundItem, KNX_INBOUND_QUEUE_SIZE>, +} + +impl aimdb_core::session::Source for KnxSource { + fn next(&mut self) -> aimdb_core::session::BoxFut<'_, Option<(String, Payload)>> { + Box::pin(async move { Some(self.receiver.receive().await) }) + } +} + /// KNX connector builder for Embassy runtime pub struct KnxConnectorBuilder { gateway_url: heapless::String<128>, @@ -118,70 +168,43 @@ where &'a self, db: &'a aimdb_core::builder::AimDb, ) -> Pin>> + Send + 'a>> { - // Wrap in SendFutureWrapper since Embassy types aren't Send but we're single-threaded - Box::pin(SendFutureWrapper(async move { - // Collect inbound routes from database - let routes = db.collect_inbound_routes("knx"); - - #[cfg(feature = "defmt")] - defmt::trace!( - "Collected {} inbound routes for KNX connector", - routes.len() - ); - - // Convert routes to Router - let router = RouterBuilder::from_routes(routes).build(); - - #[cfg(feature = "defmt")] - defmt::trace!( - "KNX router has {} unique group addresses", - router.resource_ids().len() - ); - - // Build connection future and channel - let runtime_ctx = db.runtime_any(); - let (command_channel, connection_future) = KnxConnectorImpl::build_internal( - self.gateway_url.as_str(), - router, - db.runtime(), - Some(runtime_ctx.clone()), - ) - .await - .map_err(|_e| { - #[cfg(feature = "defmt")] - defmt::error!("Failed to build KNX connector"); - - #[cfg(feature = "std")] - { - aimdb_core::DbError::RuntimeError { - message: format!("Failed to build KNX connector: {}", _e), + // No `.await` here, so the build future is `Send` without a wrapper: the + // tunnelling connection task (which holds the `!Send` UDP socket) is + // force-`Send`ed once via `into_box_future`; the data-flow rides core's + // pumps with `Send` `Connector`/`Source` (KNX channels are + // `CriticalSectionRawMutex`, i.e. `Send`). + Box::pin(async move { + let (command_channel, inbound_rx, connection_task) = + KnxConnectorImpl::setup(self.gateway_url.as_str(), db.runtime()).map_err(|_e| { + #[cfg(feature = "defmt")] + defmt::error!("Failed to build KNX connector"); + #[cfg(feature = "std")] + { + aimdb_core::DbError::RuntimeError { + message: format!("Failed to build KNX connector: {}", _e), + } } - } - #[cfg(not(feature = "std"))] - { - aimdb_core::DbError::RuntimeError { _message: () } - } - })?; - - // Collect outbound publisher futures - let outbound_routes = db.collect_outbound_routes("knx"); - - #[cfg(feature = "defmt")] - defmt::trace!( - "Collected {} outbound routes for KNX connector", - outbound_routes.len() - ); - - let mut futures: Vec = Vec::with_capacity(1 + outbound_routes.len()); - futures.push(connection_future); - futures.extend(KnxConnectorImpl::collect_outbound_futures( - command_channel, - runtime_ctx, - outbound_routes, + #[cfg(not(feature = "std"))] + { + aimdb_core::DbError::RuntimeError { _message: () } + } + })?; + + // Outbound: records → KNX telegrams via the existing `Connector` impl. + let mut futures = pump_sink(db, "knx", Arc::new(KnxConnectorImpl { command_channel })); + // Inbound: KNX telegrams → records via the connection task's channel. + futures.extend(pump_source( + db, + "knx", + KnxSource { + receiver: inbound_rx, + }, )); + // The KNX/IP tunnelling state machine (force-`Send` protocol task). + futures.push(connection_task); Ok(futures) - })) + }) } fn scheme(&self) -> &str { @@ -279,19 +302,15 @@ pub struct KnxConnectorImpl { } impl KnxConnectorImpl { - /// Create a new KNX connector with pre-configured router (internal) - async fn build_internal( + /// Set up the command + inbound channels and the tunnelling connection task. + /// + /// Synchronous (no `.await`) so the caller's `build` future stays `Send`. + /// Returns the command channel (for outbound `pump_sink`), the inbound + /// receiver (for `pump_source`), and the force-`Send` connection task. + fn setup( gateway_url: &str, - router: Router, runtime: &R, - runtime_ctx: Option>, - ) -> Result< - ( - &'static Channel, - BoxFuture, - ), - &'static str, - > + ) -> Result<(CommandChannelRef, InboundReceiver, BoxFuture), &'static str> where R: aimdb_executor::RuntimeAdapter + aimdb_embassy_adapter::EmbassyNetwork + 'static, { @@ -307,40 +326,33 @@ impl KnxConnectorImpl { // Parse gateway IP address let gateway_ip = Ipv4Address::from_str(&host).map_err(|_| "Invalid gateway IP address")?; - // Clone router for background task - let router_arc = Arc::new(router); - let router_for_task = router_arc.clone(); - // Get network stack for background task let network = runtime.network_stack(); - // Initialize command channel + // Channels: outbound commands (publish → task) and inbound telegrams (task → pump_source). let command_channel = get_command_channel(); + let inbound_channel = get_inbound_channel(); + let inbound_tx = inbound_channel.sender(); + let inbound_rx = inbound_channel.receiver(); - // Build KNX connection future (returned instead of spawned) - let knx_task_future: BoxFuture = Box::pin(SendFutureWrapper(async move { + // The KNX/IP tunnelling state machine (holds the `!Send` UDP socket — force-`Send`). + let knx_task_future = into_box_future(async move { #[cfg(feature = "defmt")] defmt::trace!("KNX background task starting for {}:{}", gateway_ip, port); // Run the connection listener (this never returns under normal conditions) #[allow(unreachable_code)] { - let _: () = Self::connection_task( - network, - gateway_ip, - port, - router_for_task, - command_channel, - runtime_ctx, - ) - .await; + let _: () = + Self::connection_task(network, gateway_ip, port, command_channel, inbound_tx) + .await; } - })); + }); #[cfg(feature = "defmt")] defmt::trace!("KNX connector initialized"); - Ok((command_channel, knx_task_future)) + Ok((command_channel, inbound_rx, knx_task_future)) } /// Background task that maintains KNX connection and receives telegrams @@ -348,13 +360,8 @@ impl KnxConnectorImpl { stack: &'static Stack<'static>, gateway_addr: Ipv4Address, gateway_port: u16, - router: Arc, - command_channel: &'static Channel< - CriticalSectionRawMutex, - KnxCommand, - KNX_COMMAND_QUEUE_SIZE, - >, - runtime_ctx: Option>, + command_channel: CommandChannelRef, + inbound_tx: InboundSender, ) { loop { #[cfg(feature = "defmt")] @@ -368,9 +375,8 @@ impl KnxConnectorImpl { stack, gateway_addr, gateway_port, - &router, command_channel, - runtime_ctx.as_ref(), + inbound_tx, ) .await { @@ -397,13 +403,8 @@ impl KnxConnectorImpl { stack: &'static Stack<'static>, gateway_addr: Ipv4Address, gateway_port: u16, - router: &Router, - command_channel: &'static Channel< - CriticalSectionRawMutex, - KnxCommand, - KNX_COMMAND_QUEUE_SIZE, - >, - runtime_ctx: Option<&Arc>, + command_channel: CommandChannelRef, + inbound_tx: InboundSender, ) -> Result<(), &'static str> { // Create UDP socket with static buffers let mut rx_meta = [PacketMetadata::EMPTY; 4]; @@ -585,7 +586,10 @@ impl KnxConnectorImpl { #[cfg(feature = "defmt")] defmt::trace!("Sent TUNNELING_ACK with seq={}", received_seq); - // Parse and route telegram + // Parse and forward telegram to `pump_source` (via the + // inbound channel). Non-blocking: drop + log on a full + // channel rather than stalling the protocol loop, matching + // the router's drop-on-overflow behaviour. if let Some((addr, data)) = Self::parse_telegram(&recv_buf[..len]) { let resource_id = addr.to_string(); @@ -596,14 +600,12 @@ impl KnxConnectorImpl { data.len() ); - if let Err(_e) = - router.route(&resource_id, &data, runtime_ctx).await + if inbound_tx + .try_send((resource_id, Payload::from(data))) + .is_err() { #[cfg(feature = "defmt")] - defmt::warn!( - "Failed to route telegram to {}", - resource_id.as_str() - ); + defmt::warn!("KNX inbound channel full; dropped telegram"); } } else { #[cfg(feature = "defmt")] @@ -1013,168 +1015,6 @@ impl KnxConnectorImpl { Some((dest, payload)) } - - /// Spawn outbound publishers for records that link_to() KNX group addresses - /// - /// If a topic provider is configured, it will be called for each value to - /// dynamically determine the KNX group address. Otherwise, the static default is used. - fn collect_outbound_futures( - command_channel: &'static Channel< - CriticalSectionRawMutex, - KnxCommand, - KNX_COMMAND_QUEUE_SIZE, - >, - runtime_ctx: Arc, - outbound_routes: Vec, - ) -> Vec { - let mut futures = Vec::with_capacity(outbound_routes.len()); - - for (default_group_addr_str, consumer, serializer, _config, topic_provider) in - outbound_routes - { - let default_group_addr_clone = default_group_addr_str.clone(); - let runtime_ctx = runtime_ctx.clone(); - - futures.push(Box::pin(SendFutureWrapper(async move { - // Parse default group address using knx-pico's type-safe parser - let default_group_addr = match default_group_addr_clone.parse::() { - Ok(addr) => Some(addr), - Err(_e) => { - // If no topic provider, this is an error - if topic_provider.is_none() { - #[cfg(feature = "defmt")] - defmt::error!( - "Invalid group address for outbound: '{}'", - default_group_addr_clone.as_str() - ); - return; - } - // With topic provider, the default can be invalid (will be overridden) - None - } - }; - - // Subscribe to typed values (type-erased) - let mut reader = match consumer.subscribe_any().await { - Ok(r) => r, - Err(_e) => { - #[cfg(feature = "defmt")] - defmt::error!( - "Failed to subscribe for outbound: '{}'", - default_group_addr_clone.as_str() - ); - return; - } - }; - - #[cfg(feature = "defmt")] - defmt::info!( - "KNX outbound publisher started for: {}", - default_group_addr_clone.as_str() - ); - - loop { - let value_any = match reader.recv_any().await { - Ok(v) => v, - // SPMC-ring overflow — skip the gap, keep publishing. - Err(aimdb_core::DbError::BufferLagged { .. }) => { - #[cfg(feature = "defmt")] - defmt::warn!( - "KNX outbound: consumer lagged for '{}'", - default_group_addr_clone.as_str() - ); - continue; - } - // Buffer closed — stop the publisher. - Err(_) => break, - }; - // Determine group address: dynamic (from provider) or default (from URL) - let group_addr_str = topic_provider - .as_ref() - .and_then(|provider| provider.topic_any(&*value_any)) - .unwrap_or_else(|| default_group_addr_clone.clone()); - - // Parse group address (may be dynamic) - let group_addr = match group_addr_str.parse::() { - Ok(addr) => addr, - Err(_e) => { - // Try to use cached default if available - if let Some(addr) = default_group_addr { - addr - } else { - #[cfg(feature = "defmt")] - defmt::error!( - "Invalid dynamic group address: '{}'", - group_addr_str.as_str() - ); - continue; - } - } - }; - - // Serialize the type-erased value - let bytes = match &serializer { - aimdb_core::connector::SerializerKind::Raw(ser) => match ser(&*value_any) { - Ok(b) => b, - Err(_e) => { - #[cfg(feature = "defmt")] - defmt::error!( - "Failed to serialize for group address '{}'", - group_addr_str.as_str() - ); - continue; - } - }, - aimdb_core::connector::SerializerKind::Context(ser) => { - match ser(runtime_ctx.clone(), &*value_any) { - Ok(b) => b, - Err(_e) => { - #[cfg(feature = "defmt")] - defmt::error!( - "Failed to serialize for group address '{}'", - group_addr_str.as_str() - ); - continue; - } - } - } - }; - - // Convert to heapless::Vec - let mut vec_data = heapless::Vec::::new(); - if vec_data.extend_from_slice(&bytes).is_err() { - #[cfg(feature = "defmt")] - defmt::error!( - "Data too large for group address '{}'", - group_addr_str.as_str() - ); - continue; - } - - // Send command to connection task - let cmd = KnxCommand { - kind: KnxCommandKind::GroupWrite(Box::new(GroupWriteData { - group_addr, - data: vec_data, - })), - }; - - command_channel.send(cmd).await; - - #[cfg(feature = "defmt")] - defmt::debug!("Published to KNX: {}", group_addr_str.as_str()); - } - - #[cfg(feature = "defmt")] - defmt::info!( - "KNX outbound publisher stopped for: {}", - default_group_addr_clone.as_str() - ); - })) as BoxFuture); - } - - futures - } } // Implement the Connector trait diff --git a/aimdb-mqtt-connector/CHANGELOG.md b/aimdb-mqtt-connector/CHANGELOG.md index 06441db9..648ebf0a 100644 --- a/aimdb-mqtt-connector/CHANGELOG.md +++ b/aimdb-mqtt-connector/CHANGELOG.md @@ -10,13 +10,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - **Tokio client rebuilt on the shared data-plane toolkit (Issue #39, [design doc](../docs/design/remote-access-via-connectors.md)).** The hand-rolled consume-serialize-publish and read-route loops are replaced by `aimdb-core`'s `pump_sink` / `pump_source` helpers (the connector now writes only its `Connector`/`Source` I/O adapters and composes the pumps in `build()`). Per-route configuration (`qos` / `retain` / `timeout_ms` / …) is threaded from each link URL's query via `ConnectorConfig::from_query`. `std` now enables `aimdb-core/connector-session` (where the pump helpers live; `std` implies it transitively). No public API change. -- **Outbound publisher survives a consumer lag; `SendFutureWrapper` relocated (Embassy client, Issue #39).** A `BufferLagged` (SPMC-ring overflow) on the outbound reader now skips the gap and keeps publishing instead of terminating the publisher; only a closed buffer stops it. The Embassy client's local force-`Send` wrapper is gone in favour of the shared `aimdb_embassy_adapter::SendFutureWrapper` (single definition, no behavior change). +- **Outbound publisher survives a consumer lag (Embassy client, Issue #39).** A `BufferLagged` (SPMC-ring overflow) on the outbound reader now skips the gap and keeps publishing instead of terminating the publisher; only a closed buffer stops it. +- **M17 — Embassy client rebuilt on core's pumps via the adapter spine ([Design 033](../docs/design/033-M17-unify-connectors-drop-send.md)).** The hand-rolled outbound publisher and inbound event-router loops are gone: the Embassy half now rides core's `pump_sink` / `pump_source` through the force-`Send` `EmbassySink` / `EmbassySource` bridges in `aimdb-embassy-adapter::connectors`, exactly like the Tokio half rides them — this crate contributes only the broker **manager task** (mountain-mqtt's `run`, force-`Send`ed once via `into_box_future`) and the `MqttSink` / `MqttSource` over its action/event channels. **No `unsafe`, no `SendFutureWrapper`** remain in this crate. Per-route `qos` / `retain` still arrive from each link URL's query (now via `ConnectorConfig::protocol_options`, parsed per publish). Note: per-message inbound routing logs moved from this crate's `defmt` calls into core's `pump_source` (`tracing` feature), so defmt-only MCU builds no longer log per-message routing failures. ### Changed (breaking) - **`ConnectorBuilder::build()` now returns `Vec>` instead of `Arc` (Issue #88).** Both Tokio and Embassy implementations updated. The MQTT event-loop, the Embassy event-router, and every outbound publisher are returned as futures that the `AimDbRunner` drives — no more `runtime.spawn` / `tokio::spawn` inside the connector. `R: Spawn` bounds dropped throughout in favour of `R: RuntimeAdapter`. - `spawn_event_loop()` → `build_event_loop_future()` (Tokio side). `spawn_outbound_publishers()` → `collect_outbound_futures()` on both Tokio and Embassy. - The `transport::Connector` impl on `MqttConnectorImpl` was removed alongside the discarded `Arc` return path; direct programmatic publish was already unreachable through the `AimDbBuilder` public API. +- **`MqttConnectorImpl` (Embassy) removed entirely (M17).** It was a build-time aggregation holder; its logic collapsed into the private `setup_manager` + the pump composition in `build()`. Register via `MqttConnectorBuilder` as before — the builder's public API is unchanged. ## [0.6.0] - 2026-05-22 diff --git a/aimdb-mqtt-connector/Cargo.toml b/aimdb-mqtt-connector/Cargo.toml index fa2e8e35..e5caf897 100644 --- a/aimdb-mqtt-connector/Cargo.toml +++ b/aimdb-mqtt-connector/Cargo.toml @@ -25,8 +25,10 @@ tokio-runtime = [ ] embassy-runtime = [ "aimdb-core/alloc", # Need alloc for collect_inbound_routes + "aimdb-core/connector-session", # `pump_sink`/`pump_source`/`Source`/`Payload` "dep:aimdb-embassy-adapter", # Enable the optional dependency "aimdb-embassy-adapter/embassy-net-support", # Enable EmbassyNetwork trait for network stack access + "aimdb-embassy-adapter/connectors", # `EmbassySink`/`EmbassySource`/`into_box_future` spine "embassy-executor", "embassy-time", "embassy-sync", diff --git a/aimdb-mqtt-connector/src/embassy_client.rs b/aimdb-mqtt-connector/src/embassy_client.rs index 7d405d04..2bc9d7fa 100644 --- a/aimdb-mqtt-connector/src/embassy_client.rs +++ b/aimdb-mqtt-connector/src/embassy_client.rs @@ -5,11 +5,15 @@ //! //! # Architecture //! -//! Uses the ConnectorBuilder pattern integrated with AimDB: -//! - `MqttConnectorBuilder` is used with `.with_connector()` during database setup -//! - Background tasks are spawned automatically during connector initialization -//! - MQTT manager handles connection, reconnection, and message delivery -//! - Event router forwards incoming MQTT messages to appropriate record producers +//! The data-flow (outbound publish, inbound routing) rides core's +//! [`pump_sink`](aimdb_core::session::pump_sink) / +//! [`pump_source`](aimdb_core::session::pump_source) via the force-`Send` +//! [`EmbassySink`]/[`EmbassySource`] bridges in `aimdb-embassy-adapter`, exactly +//! like the Tokio half rides them. This crate contributes only the +//! transport-specific bits: the broker **manager task** (mountain-mqtt's `run`), +//! the [`MqttSink`]/[`MqttSource`] over its action/event channels, and the +//! `MqttOperations`/`FromApplicationMessage` glue. No `unsafe`, no +//! `SendFutureWrapper` — both live in the adapter spine. //! //! # Usage //! @@ -17,23 +21,15 @@ //! use aimdb_mqtt_connector::embassy_client::MqttConnectorBuilder; //! use aimdb_core::AimDbBuilder; //! -//! // Configure database with MQTT connector //! let db = AimDbBuilder::new() //! .runtime(embassy_adapter) //! .with_connector( //! MqttConnectorBuilder::new("mqtt://192.168.1.100:1883") -//! .with_client_id("my-unique-device-id") +//! .with_client_id("my-unique-device-id"), //! ) //! .configure::(|reg| { -//! // Outbound: Publish temperature readings to MQTT -//! reg.link_to("mqtt://sensors/temperature") -//! .with_serializer(|temp| Ok(temp.to_json())) -//! .finish(); -//! -//! // Inbound: Subscribe to commands from MQTT -//! reg.link_from("mqtt://commands/temperature") -//! .with_deserializer(|data| TemperatureCommand::from_json(data)) -//! .finish(); +//! reg.link_to("mqtt://sensors/temperature").finish(); +//! reg.link_from("mqtt://commands/temperature").finish(); //! }) //! .build().await?; //! ``` @@ -41,7 +37,9 @@ extern crate alloc; use aimdb_core::connector::ConnectorUrl; -use aimdb_core::router::{Router, RouterBuilder}; +use aimdb_core::router::RouterBuilder; +use aimdb_core::session::{pump_sink, pump_source, Payload}; +use aimdb_core::transport::{ConnectorConfig, PublishError}; use aimdb_core::ConnectorBuilder; use alloc::boxed::Box; use alloc::format; @@ -53,10 +51,12 @@ use core::net::Ipv4Addr; use core::pin::Pin; use core::str::FromStr; -use aimdb_embassy_adapter::SendFutureWrapper; +use aimdb_embassy_adapter::connectors::{ + into_box_future, EmbassySink, EmbassySinkRaw, EmbassySource, EmbassySourceRaw, +}; use embassy_net::Ipv4Address; use embassy_sync::blocking_mutex::raw::NoopRawMutex; -use embassy_sync::channel::{Channel, Sender}; +use embassy_sync::channel::{Channel, Receiver, Sender}; use embassy_sync::once_lock::OnceLock; use static_cell::StaticCell; @@ -74,6 +74,14 @@ const BUFFER_SIZE: usize = 4096; /// Maximum properties in MQTT packets const MAX_PROPERTIES: usize = 16; +/// The runner's collected future type. +type EmbassyBoxFuture = Pin + Send + 'static>>; + +/// Sender half of the action channel (outbound publishes + subscriptions). +type ActionSender = Sender<'static, NoopRawMutex, AimdbMqttAction, CHANNEL_SIZE>; +/// Receiver half of the event channel (inbound messages from the broker). +type EventReceiver = Receiver<'static, NoopRawMutex, MqttEvent, CHANNEL_SIZE>; + /// MQTT actions that can be performed /// /// Implements the `MqttOperations` trait required by mountain-mqtt-embassy. @@ -163,7 +171,7 @@ impl MqttOperations for AimdbMqttAction { /// MQTT events for received messages /// /// Handles incoming MQTT messages that will be routed to the appropriate -/// record producers via the router. +/// record producers via core's `pump_source`. #[derive(Clone)] pub enum AimdbMqttEvent { /// A message was received from a subscribed topic @@ -195,51 +203,80 @@ impl mountain_mqtt_embassy::mqtt_manager::FromApplicationMessage } } -/// MQTT connector builder for Embassy with router-based dispatch -/// -/// Similar to the Tokio version, this builder creates an MQTT connector -/// that integrates with AimDB's routing system. The builder collects routes -/// from the database during build() and automatically subscribes to topics. -/// -/// # Usage Pattern -/// -/// ```rust,ignore -/// use aimdb_mqtt_connector::embassy::MqttConnectorBuilder; +// =========================================================================== +// Data-plane bridges — ride core's pumps via the adapter's force-`Send` wrappers. +// =========================================================================== + +/// Outbound sink: turns a `pump_sink` publish into an `AimdbMqttAction::Publish` +/// enqueued onto the manager's action channel. Wrapped in +/// [`EmbassySink`] so it drives core's `pump_sink` despite the `!Send` channel. +struct MqttSink { + sender: ActionSender, +} + +impl EmbassySinkRaw for MqttSink { + async fn publish( + &self, + destination: String, + config: ConnectorConfig, + payload: Vec, + ) -> Result<(), PublishError> { + // `qos`/`retain` arrive via the URL query (passed through in + // `protocol_options`); default to QoS 1 (legacy behaviour), no retain. + let qos = opt_u8(&config, "qos") + .map(map_qos) + .unwrap_or(QualityOfService::Qos1); + let retain = opt_bool(&config, "retain").unwrap_or(false); + + self.sender + .send(AimdbMqttAction::Publish { + topic: destination, + payload, + qos, + retain, + }) + .await; + Ok(()) + } +} + +/// Inbound source: drains the manager's event channel, yielding each received +/// message as `(topic, payload)`. Wrapped in [`EmbassySource`] so it drives +/// core's `pump_source` (which fans out to the matching record producers). +struct MqttSource { + receiver: EventReceiver, +} + +impl EmbassySourceRaw for MqttSource { + async fn next(&mut self) -> Option<(String, Payload)> { + loop { + match self.receiver.receive().await { + MqttEvent::ApplicationEvent { + event: AimdbMqttEvent::MessageReceived { topic, payload }, + .. + } => return Some((topic, Payload::from(payload))), + // Connection lifecycle events (Connected/Disconnected/…) carry no + // record data; skip and keep draining. + _ => continue, + } + } + } +} + +/// MQTT connector builder for Embassy with router-based dispatch. /// -/// // Configure database with MQTT links -/// let db = AimDbBuilder::new() -/// .runtime(embassy_adapter) -/// .with_connector( -/// MqttConnectorBuilder::new("mqtt://192.168.1.100:1883") -/// .with_client_id("my-device-001") -/// ) -/// .configure::(|reg| { -/// reg.link_from("mqtt://commands/temp") -/// .with_deserializer(deserialize_temp) -/// .with_buffer(BufferCfg::SingleLatest); -/// }) -/// .build().await?; -/// ``` +/// Collects routes from the database during `build()` and wires the broker +/// manager + the outbound/inbound pumps. pub struct MqttConnectorBuilder { broker_url: String, client_id: String, } impl MqttConnectorBuilder { - /// Create a new MQTT connector builder for Embassy + /// Create a new MQTT connector builder for Embassy. /// /// # Arguments /// * `broker_url` - Broker URL in format `mqtt://host:port` - /// - Scheme: `mqtt://` (plain TCP) - /// - Host: IP address or hostname (e.g., "192.168.1.100") - /// - Port: MQTT broker port (default: 1883) - /// - /// # Example - /// - /// ```rust,ignore - /// let builder = MqttConnectorBuilder::new("mqtt://192.168.1.100:1883") - /// .with_client_id("sensor-node-42"); - /// ``` pub fn new(broker_url: impl Into) -> Self { Self { broker_url: broker_url.into(), @@ -247,28 +284,13 @@ impl MqttConnectorBuilder { } } - /// Set the MQTT client ID - /// - /// The client ID should be unique for each device connecting to the broker. - /// It's used for session persistence and message delivery guarantees. - /// - /// # Arguments - /// * `client_id` - Unique identifier for this client (max 32 chars recommended) - /// - /// # Example - /// - /// ```rust,ignore - /// let builder = MqttConnectorBuilder::new("mqtt://192.168.1.100:1883") - /// .with_client_id("my-unique-device-id"); - /// ``` + /// Set the MQTT client ID (should be unique per device). pub fn with_client_id(mut self, client_id: impl Into) -> Self { self.client_id = client_id.into(); self } } -type EmbassyBoxFuture = Pin + Send + 'static>>; - /// Implement ConnectorBuilder trait for Embassy runtime with network stack access. /// /// Requires the runtime to provide the `EmbassyNetwork` trait so the connector @@ -282,71 +304,46 @@ where db: &'a aimdb_core::builder::AimDb, ) -> Pin>> + Send + 'a>> { - // Wrap in SendFutureWrapper since Embassy types aren't Send but we're single-threaded - Box::pin(SendFutureWrapper(async move { - // Collect inbound routes from database - let routes = db.collect_inbound_routes("mqtt"); - - #[cfg(feature = "defmt")] - defmt::info!( - "Collected {} inbound routes for MQTT connector", - routes.len() - ); - - // Convert routes to Router - let router = RouterBuilder::from_routes(routes).build(); - - #[cfg(feature = "defmt")] - defmt::info!("MQTT router has {} topics", router.resource_ids().len()); - - // Build the action sender + infrastructure futures (mqtt manager + event router). - let runtime_ctx = db.runtime_any(); - let (action_sender, infra_futures) = MqttConnectorImpl::build_internal( - &self.broker_url, - &self.client_id, - router, - db.runtime(), - Some(runtime_ctx), - ) - .await - .map_err(|_e| { - #[cfg(feature = "defmt")] - defmt::error!("Failed to build MQTT connector"); - - #[cfg(feature = "std")] - { - aimdb_core::DbError::RuntimeError { - message: format!("Failed to build MQTT connector: {}", _e), - } - } - #[cfg(not(feature = "std"))] - { - aimdb_core::DbError::RuntimeError { _message: () } - } - })?; - - // Collect outbound publisher futures. - let outbound_routes = db.collect_outbound_routes("mqtt"); + // No `.await` in this body, so the future is `Send` without a wrapper: the + // `!Send` channel ends are immediately moved into the force-`Send` + // `EmbassySink`/`EmbassySource`/manager-task and never held across a suspend. + Box::pin(async move { + // Inbound topics to subscribe to (the manager sends `Subscribe` for each). + let inbound_routes = db.collect_inbound_routes("mqtt"); + let topics: Vec = RouterBuilder::from_routes(inbound_routes) + .build() + .resource_ids() + .iter() + .map(|t| t.to_string()) + .collect(); #[cfg(feature = "defmt")] - defmt::info!( - "Collected {} outbound routes for MQTT connector", - outbound_routes.len() + defmt::info!("MQTT: subscribing to {} inbound topics", topics.len()); + + // Broker manager task + the channel ends for the pumps. + let (action_sender, event_receiver, manager_task) = + setup_manager(&self.broker_url, &self.client_id, db.runtime(), topics)?; + + // Outbound publishes + inbound routing ride core's pumps. + let mut futures = pump_sink( + db, + "mqtt", + Arc::new(EmbassySink(MqttSink { + sender: action_sender, + })), ); - - let runtime_ctx: Arc = db.runtime_any(); - let outbound_futures = MqttConnectorImpl::collect_outbound_futures( - action_sender, - runtime_ctx, - outbound_routes, - ); - - let mut all: Vec = - Vec::with_capacity(infra_futures.len() + outbound_futures.len()); - all.extend(infra_futures); - all.extend(outbound_futures); - Ok(all) - })) + futures.extend(pump_source( + db, + "mqtt", + EmbassySource(MqttSource { + receiver: event_receiver, + }), + )); + // The broker manager protocol loop (force-`Send` via the adapter). + futures.push(manager_task); + + Ok(futures) + }) } fn scheme(&self) -> &str { @@ -354,330 +351,133 @@ where } } -/// Build-time helper aggregating Embassy MQTT construction logic. -pub struct MqttConnectorImpl; - -impl MqttConnectorImpl { - /// Build the MQTT manager + event router futures and return the action - /// channel sender for use by outbound publishers. - /// - /// # Arguments - /// * `broker_url` - Broker URL (mqtt://host:port) - /// * `client_id` - MQTT client identifier - /// * `router` - Pre-configured router with all routes - /// * `runtime` - Embassy runtime adapter for network access - /// * `runtime_ctx` - Optional type-erased runtime for context-aware deserializers - async fn build_internal( - broker_url: &str, - client_id: &str, - router: Router, - runtime: &R, - runtime_ctx: Option>, - ) -> Result< - ( - Sender<'static, NoopRawMutex, AimdbMqttAction, CHANNEL_SIZE>, - Vec, - ), - &'static str, - > - where - R: aimdb_executor::RuntimeAdapter + aimdb_embassy_adapter::EmbassyNetwork + 'static, - { - // Parse the broker URL - let mut url = broker_url.to_string(); - - // If no topic is provided, add a dummy one for parsing - if !url.contains('/') || url.matches('/').count() < 3 { - url = format!("{}/dummy", url.trim_end_matches('/')); - } - - let connector_url = ConnectorUrl::parse(&url).map_err(|_| "Invalid MQTT URL")?; - - let host = connector_url.host.clone(); - let port = connector_url.port.unwrap_or(1883); - - #[cfg(feature = "defmt")] - defmt::info!("Creating MQTT connector for {}:{}", host.as_str(), port); - - // Parse broker IP address - let broker_ip = Ipv4Addr::from_str(&host).map_err(|_| "Invalid broker IP address")?; - - // Convert to embassy Ipv4Address - let octets = broker_ip.octets(); - let broker_addr = Ipv4Address::new(octets[0], octets[1], octets[2], octets[3]); - - // Store client_id in static memory for 'static lifetime requirement - // Uses OnceLock to gracefully handle multiple initialization attempts - static CLIENT_ID_STORAGE: OnceLock = OnceLock::new(); - let client_id_static: &'static str = - CLIENT_ID_STORAGE.get_or_init(|| client_id.to_string()); - - #[cfg(feature = "defmt")] - defmt::info!("MQTT client ID: {}", client_id_static); - - // Create static channels for MQTT communication - static ACTION_CHANNEL: StaticCell> = - StaticCell::new(); - static EVENT_CHANNEL: StaticCell< - Channel, CHANNEL_SIZE>, - > = StaticCell::new(); - - // Try to initialize channels (will panic if called twice - expected in embedded) - let action_channel = ACTION_CHANNEL.init(Channel::new()); - let event_channel = EVENT_CHANNEL.init(Channel::new()); - - let action_sender = action_channel.sender(); - let action_receiver = action_channel.receiver(); - let event_sender = event_channel.sender(); - let event_receiver = event_channel.receiver(); - - // Create connection settings - let connection_settings = ConnectionSettings::unauthenticated(client_id_static); - - // Create mqtt_manager settings - let settings = Settings::new(broker_addr, port); - - // Clone router for background task - let router_arc = Arc::new(router); - let router_for_task = router_arc.clone(); - - // Get topics to subscribe to from router - let topics = router_arc.resource_ids(); - +/// Set up the static channels + the mountain-mqtt broker manager task, returning +/// the action sender (outbound), the event receiver (inbound), and the manager +/// task future (which first queues the topic subscriptions, then runs the broker +/// loop). Synchronous — no `.await` — so the caller's `build` future stays `Send`. +fn setup_manager( + broker_url: &str, + client_id: &str, + runtime: &R, + topics: Vec, +) -> Result<(ActionSender, EventReceiver, EmbassyBoxFuture), aimdb_core::DbError> +where + R: aimdb_executor::RuntimeAdapter + aimdb_embassy_adapter::EmbassyNetwork + 'static, +{ + let build_err = |_msg: &str| { #[cfg(feature = "defmt")] - defmt::info!("Will subscribe to {} MQTT topics", topics.len()); - - // Get network stack for background task - let network = runtime.network_stack(); - - // Build the MQTT manager future (returned to the runner — design 028 §"Connector futures"). - let mqtt_task_future: EmbassyBoxFuture = Box::pin(SendFutureWrapper(async move { - #[cfg(feature = "defmt")] - defmt::info!("MQTT background task starting"); - - // Run the MQTT manager (this never returns) - #[allow(unreachable_code)] - { - let _: () = mqtt_manager::run::< - AimdbMqttAction, - AimdbMqttEvent, - MAX_PROPERTIES, - BUFFER_SIZE, - CHANNEL_SIZE, - >( - *network, // Dereference the network stack reference - connection_settings, - settings, - event_sender, - action_receiver, - ) - .await; + defmt::error!("Failed to build MQTT connector: {}", _msg); + #[cfg(feature = "std")] + { + aimdb_core::DbError::RuntimeError { + message: format!("Failed to build MQTT connector: {}", _msg), } - })); - - // Build the event router future for inbound message routing. - let event_router_future: EmbassyBoxFuture = Box::pin(SendFutureWrapper(async move { - #[cfg(feature = "defmt")] - defmt::info!("MQTT event router task starting"); - - loop { - let event = event_receiver.receive().await; - - match event { - MqttEvent::ApplicationEvent { - event: AimdbMqttEvent::MessageReceived { topic, payload }, - .. - } => { - #[cfg(feature = "defmt")] - defmt::debug!( - "Routing MQTT message from topic '{}', {} bytes", - topic.as_str(), - payload.len() - ); - - // Route the message through the router to the appropriate producer - if let Err(_e) = router_for_task - .route(&topic, &payload, runtime_ctx.as_ref()) - .await - { - #[cfg(feature = "defmt")] - defmt::warn!("Failed to route MQTT message from '{}'", topic.as_str()); - } - } - _ => { - // Ignore other events (Connected, Disconnected, etc.) - #[cfg(feature = "defmt")] - defmt::trace!("Ignoring MQTT event (not a message)"); - } - } - } - })); + } + #[cfg(not(feature = "std"))] + { + aimdb_core::DbError::RuntimeError { _message: () } + } + }; - // Subscribe to all topics from the router. The subscribe actions are - // queued in the action channel; the MQTT manager future (not yet - // driven) will drain them once `AimDbRunner::run()` polls it. + // Parse the broker URL (add a dummy topic if none, so parsing succeeds). + let mut url = broker_url.to_string(); + if !url.contains('/') || url.matches('/').count() < 3 { + url = format!("{}/dummy", url.trim_end_matches('/')); + } + let connector_url = ConnectorUrl::parse(&url).map_err(|_| build_err("Invalid MQTT URL"))?; + let host = connector_url.host.clone(); + let port = connector_url.port.unwrap_or(1883); + + let broker_ip = + Ipv4Addr::from_str(&host).map_err(|_| build_err("Invalid broker IP address"))?; + let octets = broker_ip.octets(); + let broker_addr = Ipv4Address::new(octets[0], octets[1], octets[2], octets[3]); + + // Store client_id in static memory for the 'static lifetime requirement. + static CLIENT_ID_STORAGE: OnceLock = OnceLock::new(); + let client_id_static: &'static str = CLIENT_ID_STORAGE.get_or_init(|| client_id.to_string()); + + // Static channels for MQTT communication. + static ACTION_CHANNEL: StaticCell> = + StaticCell::new(); + static EVENT_CHANNEL: StaticCell< + Channel, CHANNEL_SIZE>, + > = StaticCell::new(); + let action_channel = ACTION_CHANNEL.init(Channel::new()); + let event_channel = EVENT_CHANNEL.init(Channel::new()); + + let action_sender = action_channel.sender(); + let action_receiver = action_channel.receiver(); + let event_sender = event_channel.sender(); + let event_receiver = event_channel.receiver(); + + let connection_settings = ConnectionSettings::unauthenticated(client_id_static); + let settings = Settings::new(broker_addr, port); + let network = runtime.network_stack(); + + // Manager task: queue subscriptions, then run the broker loop (never returns). + let sub_sender = action_sender; + let manager_task = into_box_future(async move { for topic in &topics { - #[cfg(feature = "defmt")] - defmt::info!("Subscribing to MQTT topic: {}", &**topic); - - let subscribe_action = AimdbMqttAction::Subscribe { - topic: topic.to_string(), - qos: QualityOfService::Qos1, // Use QoS 1 for reliable delivery - }; - - action_sender.send(subscribe_action).await; + sub_sender + .send(AimdbMqttAction::Subscribe { + topic: topic.clone(), + qos: QualityOfService::Qos1, + }) + .await; } #[cfg(feature = "defmt")] - defmt::info!("Queued subscriptions for {} topics", topics.len()); + defmt::info!("MQTT background task starting"); + + #[allow(unreachable_code)] + { + let _: () = mqtt_manager::run::< + AimdbMqttAction, + AimdbMqttEvent, + MAX_PROPERTIES, + BUFFER_SIZE, + CHANNEL_SIZE, + >( + *network, + connection_settings, + settings, + event_sender, + action_receiver, + ) + .await; + } + }); - let _ = router_arc; // router_arc lives inside event_router_future via clone + Ok((action_sender, event_receiver, manager_task)) +} - Ok(( - action_sender, - alloc::vec![mqtt_task_future, event_router_future], - )) +/// Map a QoS level (0/1/2) to mountain-mqtt's `QualityOfService` (2 downgrades to 1). +fn map_qos(qos: u8) -> QualityOfService { + match qos { + 0 => QualityOfService::Qos0, + 1 => QualityOfService::Qos1, + 2 => QualityOfService::Qos1, // Downgrade to QoS 1 + _ => QualityOfService::Qos0, // Default to QoS 0 } +} - /// Collects outbound publisher futures for all configured routes (internal). - /// - /// Each future subscribes to a typed record, serializes values, and sends - /// them to the MQTT action channel. Returned futures are appended to the - /// `AimDbRunner` accumulator. - fn collect_outbound_futures( - action_sender: Sender<'static, NoopRawMutex, AimdbMqttAction, CHANNEL_SIZE>, - runtime_ctx: Arc, - routes: Vec, - ) -> Vec { - let mut futures: Vec = Vec::with_capacity(routes.len()); - - for (default_topic, consumer, serializer, config, topic_provider) in routes { - let default_topic_clone = default_topic.clone(); - let runtime_ctx = runtime_ctx.clone(); - - // Parse config options - let mut qos = QualityOfService::Qos1; // Default - let mut retain = false; - - for (key, value) in &config { - match key.as_str() { - "qos" => { - if let Ok(qos_val) = value.parse::() { - qos = Self::map_qos(qos_val); - } - } - "retain" => { - if let Ok(retain_val) = value.parse::() { - retain = retain_val; - } - } - _ => {} - } - } - - futures.push(Box::pin(SendFutureWrapper(async move { - // Subscribe to typed values (type-erased) - let mut reader = match consumer.subscribe_any().await { - Ok(r) => r, - Err(_e) => { - #[cfg(feature = "defmt")] - defmt::error!( - "Failed to subscribe for outbound topic '{}'", - default_topic_clone.as_str() - ); - return; - } - }; - - #[cfg(feature = "defmt")] - defmt::info!( - "MQTT outbound publisher started for topic: {}", - default_topic_clone.as_str() - ); - - loop { - let value_any = match reader.recv_any().await { - Ok(v) => v, - // Lagged (ring overflow) — skip the gap, keep publishing - // rather than letting a transient overrun kill the publisher. - Err(aimdb_core::DbError::BufferLagged { .. }) => { - #[cfg(feature = "defmt")] - defmt::warn!( - "MQTT outbound: consumer lagged for '{}'", - default_topic_clone.as_str() - ); - continue; - } - // Buffer closed — stop the publisher. - Err(_e) => break, - }; - // Determine topic: dynamic (from provider) or default (from URL) - let topic = topic_provider - .as_ref() - .and_then(|provider| provider.topic_any(&*value_any)) - .unwrap_or_else(|| default_topic_clone.clone()); - - // Serialize the type-erased value - let bytes = match &serializer { - aimdb_core::connector::SerializerKind::Raw(ser) => match ser(&*value_any) { - Ok(b) => b, - Err(_e) => { - #[cfg(feature = "defmt")] - defmt::error!("Failed to serialize for topic '{}'", topic.as_str()); - continue; - } - }, - aimdb_core::connector::SerializerKind::Context(ser) => { - match ser(runtime_ctx.clone(), &*value_any) { - Ok(b) => b, - Err(_e) => { - #[cfg(feature = "defmt")] - defmt::error!( - "Failed to serialize for topic '{}'", - topic.as_str() - ); - continue; - } - } - } - }; - - // Publish to MQTT via action channel - let action = AimdbMqttAction::Publish { - topic: topic.clone(), - payload: bytes, - qos, - retain, - }; - - action_sender.send(action).await; - - #[cfg(feature = "defmt")] - defmt::debug!("Published to MQTT topic: {}", topic.as_str()); - } - - #[cfg(feature = "defmt")] - defmt::info!( - "MQTT outbound publisher stopped for topic: {}", - default_topic_clone.as_str() - ); - }))); - } - - futures - } +/// Read a `u8` option from the per-route `protocol_options` (URL query). +fn opt_u8(config: &ConnectorConfig, key: &str) -> Option { + config + .protocol_options + .iter() + .find(|(k, _)| k == key) + .and_then(|(_, v)| v.parse::().ok()) } -impl MqttConnectorImpl { - /// Map QoS level to mountain-mqtt QualityOfService - fn map_qos(qos: u8) -> QualityOfService { - match qos { - 0 => QualityOfService::Qos0, - 1 => QualityOfService::Qos1, - 2 => QualityOfService::Qos1, // Downgrade to QoS 1 - _ => QualityOfService::Qos0, // Default to QoS 0 - } - } +/// Read a `bool` option from the per-route `protocol_options` (URL query). +fn opt_bool(config: &ConnectorConfig, key: &str) -> Option { + config + .protocol_options + .iter() + .find(|(k, _)| k == key) + .and_then(|(_, v)| v.parse::().ok()) } #[cfg(test)] @@ -686,21 +486,9 @@ mod tests { #[test] fn test_qos_mapping() { - assert!(matches!( - MqttConnectorImpl::map_qos(0), - QualityOfService::Qos0 - )); - assert!(matches!( - MqttConnectorImpl::map_qos(1), - QualityOfService::Qos1 - )); - assert!(matches!( - MqttConnectorImpl::map_qos(2), - QualityOfService::Qos1 - )); // Downgrades to QoS 1 - assert!(matches!( - MqttConnectorImpl::map_qos(99), - QualityOfService::Qos0 - )); // Defaults to QoS 0 + assert!(matches!(map_qos(0), QualityOfService::Qos0)); + assert!(matches!(map_qos(1), QualityOfService::Qos1)); + assert!(matches!(map_qos(2), QualityOfService::Qos1)); // Downgrades to QoS 1 + assert!(matches!(map_qos(99), QualityOfService::Qos0)); // Defaults to QoS 0 } } diff --git a/aimdb-serial-connector/CHANGELOG.md b/aimdb-serial-connector/CHANGELOG.md index f2b6244d..9e501b72 100644 --- a/aimdb-serial-connector/CHANGELOG.md +++ b/aimdb-serial-connector/CHANGELOG.md @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **New crate — the COBS-framed serial/UART transport for AimDB remote access (Issue #122, follow-up to #39).** The serial sibling of `aimdb-uds-connector`: it contributes only the `Dialer`/`Listener`/`Connection` triple plus thin sugar; the AimX codec + dispatch and the runtime-neutral session engines (`run_client`/`serve`) are reused from `aimdb-core`. The wire is the same compact AimX JSON, framed with **COBS** (Consistent Overhead Byte Stuffing) and a `0x00` sentinel instead of a newline — self-synchronizing on a lossy/unframed serial medium, so a receiver that joins mid-stream resynchronizes on the next sentinel. Default scheme `"serial"`. Two runtime halves: - **`tokio-runtime`** (std, host/gateway) — `TokioSerialConnection` over any `AsyncRead + AsyncWrite` (a real `tokio_serial::SerialStream` in production, a `tokio::io::duplex()` in tests), with `SerialClient::new(path, baud)` (sugar over `SessionClientConnector`) and `SerialServer` (sugar over `SessionServerConnector` + `AimxDispatch`). The listener is one-shot (serial is point-to-point). - - **`embassy-runtime`** (`no_std + alloc`, MCU) — `EmbassySerialConnection` generic over `embedded-io-async` `Read`/`Write` halves (the common `Uart::split()` shape), with `SerialClient`/`SerialServer` that hand-roll `ConnectorBuilder` (calling `run_client`/`pump_client`/`serve` directly) and force-`Send` the single-core Embassy futures via `aimdb-embassy-adapter`'s `SendFutureWrapper`. The Embassy *server* half rides the `no_std` `AimxDispatch` landed in #120, so an MCU can answer `record.list`/`get`/`set`/`subscribe`/`drain` over a UART; the *client* half mirrors records to a gateway. Reconnect is disabled by default on Embassy (the UART peripheral is moved in and can't be re-acquired). + - **`embassy-runtime`** (`no_std + alloc`, MCU) — thin sugar over the centralized Embassy session spine in `aimdb-embassy-adapter::connectors` (M17, [Design 033](../docs/design/033-M17-unify-connectors-drop-send.md)): this half contributes only the COBS `CobsFramer` (implementing the spine's `Framer`); the framed `EmbassyConnection` over `embedded-io-async` `Read`/`Write` halves (the common `BufferedUart::split()` shape), the one-shot dialer/listener/cell, and **all** the force-`Send` plumbing live in the adapter — this crate carries **no `unsafe`**. `SerialClient::new(rx, tx)` returns the spine's `EmbassySessionClient` (chain `.scheme(...)`/`.with_config(...)` on it); `SerialServer` stores the moved-in framed connection in the spine's `OneShotCell` and drives `serve`. The Embassy *server* half rides the `no_std` `AimxDispatch` landed in #120, so an MCU can answer `record.list`/`get`/`set`/`subscribe`/`drain` over a UART; the *client* half mirrors records to a gateway. Reconnect is disabled by default on Embassy (the spine's default — the UART peripheral is moved in and can't be re-acquired). - **`framing` module** — the shared COBS frame codec (`encode_frame` + a chunk-tolerant `FrameAccumulator`), pure `no_std + alloc`, so the round-trip is unit-tested independent of any transport. - **Examples — a real end-to-end serial test.** `examples/serial_demo.rs` (host, `--features _test-tokio`): an AimX client/server over a device path (a board's ST-LINK VCP at `/dev/ttyACM0`, or a `socat` PTY pair). `examples/embassy-serial-connector-demo/` (board): an STM32H563ZI Nucleo serving the `counter` record over USART3 ↔ the ST-LINK Virtual COM Port — the no_std `SerialServer` + `AimxDispatch` on real silicon, flashed via `probe-rs`, queried from the host over the wire. @@ -39,6 +39,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 round-trip MCU↔host. - **`embedded-io-async` is pinned to 0.7** (the workspace dep was bumped 0.6 → 0.7) so a HAL `BufferedUart` (e.g. `embassy-stm32`, which uses 0.7) satisfies the connector's `Read`/`Write` bounds without a trait-version skew. Wire the Embassy half from a `BufferedUart::split()` — the plain async `UartRx` does **not** implement `embedded-io-async::Read`, only the buffered/ring-buffered variants do. -- The `unsafe impl Send`/`Sync` on the Embassy transport + builder types rest on the single-core, cooperative Embassy-executor invariant documented by `SendFutureWrapper` (no preemption / thread migration). This is the first *raw-peripheral* session connector — MQTT/KNX sidestep it by pulling a `Send + Sync` `embassy_net::Stack` from the runtime adapter rather than owning a peripheral. +- This is the first *raw-peripheral* session connector — MQTT/KNX sidestep peripheral ownership by pulling a `Send + Sync` `embassy_net::Stack` from the runtime adapter. The single-core force-`Send` bridge a moved-in peripheral requires lives **once, audited** in `aimdb-embassy-adapter::connectors` (M17); this crate has no `unsafe` and no `SendFutureWrapper` use. - The `serial://` scheme constant lands here; the host-side `--connect serial:///dev/ttyUSB0?baud=115200` resolver that maps that URL to a `SerialDialer` landed in `aimdb-client`'s `endpoint` module (Issue #123), wired into `aimdb-cli`/`aimdb-mcp` behind their `transport-serial` feature. - The internal `_test-tokio` feature gates the host tokio integration test's adapter dependency; it is kept off the public `tokio-runtime` feature (a connector shouldn't pull a concrete adapter) and out of `[dev-dependencies]` (an unconditional tokio adapter would force `aimdb-core/std` into the `no_std` embassy test build). diff --git a/aimdb-serial-connector/Cargo.toml b/aimdb-serial-connector/Cargo.toml index 4b5f2f2a..4fb5ee84 100644 --- a/aimdb-serial-connector/Cargo.toml +++ b/aimdb-serial-connector/Cargo.toml @@ -31,14 +31,15 @@ tokio-runtime = [ ] # Embedded half: `no_std + alloc`, generic over `embedded-io-async` UART halves. -# Hand-rolls the `ConnectorBuilder` and force-`Send`s the engine futures via -# `aimdb-embassy-adapter`'s `SendFutureWrapper`. +# Thin sugar over the centralized Embassy session spine in `aimdb-embassy-adapter` +# (`connector-io`), which owns the single-core `unsafe` + force-`Send`; this crate +# contributes only the COBS `Framer`. embassy-runtime = [ "aimdb-core/alloc", "aimdb-core/connector-session", "aimdb-core/remote-access", "dep:aimdb-embassy-adapter", - # `SendFutureWrapper` (force-`Send`) is always available on the no_std adapter; + "aimdb-embassy-adapter/connector-io", # `embassy-time`/`-sync` give the `EmbassyAdapter` clock the smoke test runs on. "aimdb-embassy-adapter/embassy-time", "aimdb-embassy-adapter/embassy-sync", diff --git a/aimdb-serial-connector/src/embassy_transport.rs b/aimdb-serial-connector/src/embassy_transport.rs index a4b3866a..99fea11e 100644 --- a/aimdb-serial-connector/src/embassy_transport.rs +++ b/aimdb-serial-connector/src/embassy_transport.rs @@ -1,27 +1,17 @@ -//! Embassy serial transport (feature `embassy-runtime`, `no_std + alloc`) — a -//! [`Connection`] over an [`embedded_io_async`] UART with COBS framing, plus -//! [`SerialClient`]/[`SerialServer`] sugar. +//! Embassy serial transport (feature `embassy-runtime`, `no_std + alloc`) — thin +//! sugar over the centralized Embassy session spine in `aimdb-embassy-adapter`. +//! +//! This half contributes **only** the COBS [`Framer`] plus thin sugar; the framed +//! [`Connection`](aimdb_core::session::Connection), the one-shot +//! dialer/listener/cell, and the force-`Send` plumbing all live in +//! [`aimdb_embassy_adapter::connectors`]. So this module carries **no `unsafe`** +//! (down from the seven `unsafe impl`s this half used to hand-roll) — the Embassy +//! half is now structurally a sibling of the [Tokio half](crate::tokio_transport), +//! both thin sugar over a shared spine. //! //! Generic over the `embedded-io-async` `Read`/`Write` halves (the common Embassy //! HAL shape, e.g. `Uart::split()`), so it works with any chip's async UART. -//! -//! # Why this half hand-rolls `ConnectorBuilder` -//! -//! The generic [`SessionClientConnector`](aimdb_core::session::SessionClientConnector) -//! / [`SessionServerConnector`](aimdb_core::session::SessionServerConnector) demand -//! `Clone + Send + Sync` on the dialer / `Send + Sync` on the listener+dispatch -//! factories — bounds a moved-in UART peripheral can't meet. The underlying -//! engines need only the bare traits ([`run_client`]``, [`serve`]``), so we call them directly and force-`Send` the (single-core, -//! cooperative) Embassy futures with `aimdb-embassy-adapter`'s -//! [`SendFutureWrapper`] — the same pattern the MQTT/KNX Embassy connectors use. -//! -//! The `unsafe impl Send`/`Sync` on the transport + builder types rest on the same -//! invariant `SendFutureWrapper` documents: an Embassy executor runs cooperatively -//! on a single core with no preemption or thread migration, so these values are -//! never actually accessed from another thread. -use core::cell::RefCell; use core::future::Future; use core::pin::Pin; @@ -33,17 +23,15 @@ use alloc::vec::Vec; use embedded_io_async::{Read, Write}; -use aimdb_embassy_adapter::SendFutureWrapper; +use aimdb_embassy_adapter::connectors::{ + EmbassyConnection, EmbassySessionClient, Framer, OneShotCell, OneShotDialer, OneShotListener, +}; use aimdb_core::connector::ConnectorBuilder; use aimdb_core::remote::{AimxConfig, SecurityPolicy}; use aimdb_core::session::aimx::{AimxCodec, AimxDispatch}; -use aimdb_core::session::{ - pump_client, run_client, serve, BoxFut, ClientConfig, Connection, Dialer, Dispatch, Listener, - PeerInfo, SessionConfig, SessionLimits, TransportError, TransportResult, -}; -use aimdb_core::{AimDb, DbError, DbResult, RuntimeAdapter}; -use aimdb_executor::TimeOps; +use aimdb_core::session::{serve, Dispatch, SessionConfig, SessionLimits}; +use aimdb_core::{AimDb, DbResult, RuntimeAdapter}; use crate::framing::{encode_frame, FrameAccumulator}; use crate::DEFAULT_SCHEME; @@ -51,274 +39,132 @@ use crate::DEFAULT_SCHEME; type BoxFuture = Pin + Send + 'static>>; type BuildFuture<'a> = Pin>> + Send + 'a>>; -/// How many bytes a single UART `read()` pulls before re-checking for a frame. +/// How many bytes a single UART `read` pulls before re-checking for a frame. const READ_CHUNK: usize = 64; - -/// Max bytes per `write()` call. Some HAL `BufferedUart::write` is atomic-or-error -/// (e.g. `embassy-stm32` returns `BufferTooLong` for a single write larger than its -/// TX ring), so a frame bigger than the buffer must be split. Chunking at this size -/// sends a frame of any length as long as the TX buffer is at least this big. +/// Max bytes per `write` call. Some HAL `BufferedUart::write` is atomic-or-error +/// (e.g. `embassy-stm32` returns `BufferTooLong` for a single write larger than +/// its TX ring), so a frame bigger than the buffer must be split. const WRITE_CHUNK: usize = 64; +/// The framed connection type a serial peripheral produces: COBS over the UART. +type SerialConnection = EmbassyConnection; + // =========================================================================== -// Connection +// COBS framer — the only serial-specific transport bit. // =========================================================================== -/// A framed bidirectional pipe over an `embedded-io-async` UART. Framing lives in -/// the transport: [`recv`](Connection::recv) returns one COBS frame (sentinel -/// stripped); [`send`](Connection::send) COBS-encodes and appends the sentinel. -pub struct EmbassySerialConnection { - rx: Rd, - tx: Wr, +/// COBS framing for the Embassy [`EmbassyConnection`]: `encode` COBS-encodes a +/// frame and appends the `0x00` sentinel; the accumulator yields one frame per +/// sentinel (a malformed run is skipped — COBS is self-synchronizing). +pub struct CobsFramer { acc: FrameAccumulator, - peer: PeerInfo, } -// SAFETY: Embassy executors run cooperatively on a single core with no preemption -// or thread migration, so the wrapped UART halves are never accessed across -// threads. Only construct this where it is driven by an Embassy executor. Same -// invariant as `aimdb_embassy_adapter::SendFutureWrapper`. -unsafe impl Send for EmbassySerialConnection {} - -impl EmbassySerialConnection { - /// Wrap the split read/write halves of an async UART. - pub fn new(rx: Rd, tx: Wr) -> Self { +impl CobsFramer { + /// A fresh COBS framer. + pub fn new() -> Self { Self { - rx, - tx, acc: FrameAccumulator::new(), - peer: PeerInfo::default(), } } } -impl Connection for EmbassySerialConnection -where - Rd: Read, - Wr: Write, -{ - fn recv(&mut self) -> BoxFut<'_, TransportResult>>> { - // `SendFutureWrapper` force-`Send`s the (single-core) UART read future to - // satisfy the `Send` `BoxFut` return type. - Box::pin(SendFutureWrapper(async move { - loop { - // COBS is self-synchronizing: a chunk that fails to decode is line - // noise or a mid-stream join, not a fatal transport error. The - // accumulator has already consumed it, so skip it and resync on the - // next sentinel rather than tearing down the session. - match self.acc.next_frame() { - Some(Ok(frame)) => return Ok(Some(frame)), - Some(Err(_)) => continue, - None => {} - } - let mut chunk = [0u8; READ_CHUNK]; - match self.rx.read(&mut chunk).await { - Ok(0) => return Ok(None), // EOF — peer closed - Ok(n) => self.acc.push_bytes(&chunk[..n]), - Err(_) => return Err(TransportError::Io), - } - } - })) - } - - fn send<'a>(&'a mut self, frame: &'a [u8]) -> BoxFut<'a, TransportResult<()>> { - Box::pin(SendFutureWrapper(async move { - let mut out = Vec::new(); - encode_frame(frame, &mut out); - // Write in ring-sized chunks: a HAL `BufferedUart` rejects a single - // write larger than its TX buffer, so a frame bigger than the buffer - // (e.g. a `record.list` reply) must be split. - for chunk in out.chunks(WRITE_CHUNK) { - self.tx - .write_all(chunk) - .await - .map_err(|_| TransportError::Closed)?; - } - self.tx.flush().await.map_err(|_| TransportError::Closed) - })) - } - - fn peer(&self) -> &PeerInfo { - &self.peer +impl Default for CobsFramer { + fn default() -> Self { + Self::new() } } -// =========================================================================== -// Dialer / Listener -// =========================================================================== - -/// The initiating (client) side. Holds the UART halves and hands them to the one -/// connection it ever opens (the peripheral is moved in, so it can't redial — -/// pair with `ClientConfig { reconnect: false, .. }`, the [`SerialClient`] default). -pub struct SerialDialer { - halves: RefCell>, -} - -// SAFETY: single-core cooperative Embassy executor — see the connection above. -unsafe impl Send for SerialDialer {} - -impl SerialDialer { - /// Build a one-shot dialer over the split UART halves. - pub fn new(rx: Rd, tx: Wr) -> Self { - Self { - halves: RefCell::new(Some((rx, tx))), - } +impl Framer for CobsFramer { + fn encode(&self, frame: &[u8], out: &mut Vec) { + encode_frame(frame, out); } -} - -impl Dialer for SerialDialer -where - Rd: Read + 'static, - Wr: Write + 'static, -{ - fn connect(&self) -> BoxFut<'_, TransportResult>> { - Box::pin(SendFutureWrapper(async move { - let (rx, tx) = self.halves.borrow_mut().take().ok_or(TransportError::Io)?; - Ok(Box::new(EmbassySerialConnection::new(rx, tx)) as Box) - })) - } -} - -/// The accepting (server) side. Serial is point-to-point: the first -/// [`accept`](Listener::accept) hands out the connection; later calls park forever. -pub struct SerialListener { - halves: Option<(Rd, Wr)>, -} - -// SAFETY: single-core cooperative Embassy executor — see the connection above. -unsafe impl Send for SerialListener {} -impl SerialListener { - /// Wrap the split UART halves as a one-shot listener. - pub fn new(rx: Rd, tx: Wr) -> Self { - Self { - halves: Some((rx, tx)), - } + fn push_bytes(&mut self, bytes: &[u8]) { + self.acc.push_bytes(bytes); } -} -impl Listener for SerialListener -where - Rd: Read + 'static, - Wr: Write + 'static, -{ - fn accept(&mut self) -> BoxFut<'_, TransportResult>> { - Box::pin(SendFutureWrapper(async move { - match self.halves.take() { - Some((rx, tx)) => { - Ok(Box::new(EmbassySerialConnection::new(rx, tx)) as Box) - } - // Point-to-point: no second peer ever arrives. - None => core::future::pending().await, - } - })) + fn next_frame(&mut self) -> Option, ()>> { + // The accumulator's `FrameError` collapses to `()`: the connection only + // distinguishes "got a frame" from "skip and resync". + self.acc.next_frame().map(|r| r.map_err(|_| ())) } } // =========================================================================== -// Client sugar (hand-rolled ConnectorBuilder) +// Client sugar — one-shot dial over the moved-in UART. // =========================================================================== -/// Mirrors records to/from an AimX peer over a serial UART. Register it via -/// `with_connector`; declare the routes with `link_to`/`link_from` on the -/// `serial://` scheme (or override with [`scheme`](Self::scheme)). -pub struct SerialClient { - halves: RefCell>, - config: ClientConfig, - scheme: String, -} - -// SAFETY: single-core cooperative Embassy executor — see the connection above. -// `ConnectorBuilder: Send + Sync`, so the builder must assert both. -unsafe impl Send for SerialClient {} -unsafe impl Sync for SerialClient {} - -impl SerialClient { - /// Build a client over the split UART halves (e.g. from `Uart::split()`). - /// - /// Reconnect is disabled by default: the peripheral is moved in and can't be - /// re-acquired after a drop. Override with [`with_config`](Self::with_config). - pub fn new(rx: Rd, tx: Wr) -> Self { - let config = ClientConfig { - reconnect: false, - ..ClientConfig::default() - }; - Self { - halves: RefCell::new(Some((rx, tx))), - config, - scheme: DEFAULT_SCHEME.to_string(), - } - } - - /// Override the scheme this connector registers. - pub fn scheme(mut self, scheme: impl Into) -> Self { - self.scheme = scheme.into(); - self - } - - /// Override the client engine config (keepalive, offline queue, …). Note that - /// re-enabling `reconnect` cannot re-open the moved-in UART. - pub fn with_config(mut self, config: ClientConfig) -> Self { - self.config = config; - self - } -} - -impl ConnectorBuilder for SerialClient -where - R: TimeOps + 'static, - Rd: Read + 'static, - Wr: Write + 'static, -{ - fn build<'a>(&'a self, db: &'a AimDb) -> BuildFuture<'a> { - Box::pin(SendFutureWrapper(async move { - let (rx, tx) = self - .halves - .borrow_mut() - .take() - .ok_or_else(connector_consumed)?; - let dialer = SerialDialer::new(rx, tx); - let (handle, engine) = - run_client(dialer, AimxCodec, self.config.clone(), db.runtime_arc()); - // One pump future per route; each holds a `ClientHandle` clone, so the - // engine stays alive as long as any mirror runs. - let mut futures = pump_client(db, &self.scheme, &handle); - futures.push(engine); - Ok(futures) - })) - } - - fn scheme(&self) -> &str { - &self.scheme +/// Constructs an [`EmbassySessionClient`] that mirrors records to/from an AimX +/// peer over a serial UART. `SerialClient::new(rx, tx)` is sugar; chain +/// `.scheme(...)` / `.with_config(...)` on the returned connector and register it +/// with `with_connector`. +/// +/// Reconnect is disabled by default: the peripheral is moved in and can't be +/// re-acquired after a drop. +pub struct SerialClient; + +impl SerialClient { + /// Mirror records to/from the AimX peer over the split UART halves (e.g. from + /// `Uart::split()`). Scheme defaults to [`DEFAULT_SCHEME`]. + // Sugar constructor: intentionally returns the spine connector, not `Self`. + #[allow(clippy::new_ret_no_self)] + pub fn new( + rx: Rd, + tx: Wr, + ) -> EmbassySessionClient>, AimxCodec> + where + Rd: Read + 'static, + Wr: Write + 'static, + { + let conn = EmbassyConnection::new(rx, tx, CobsFramer::new()); + // Reconnect stays disabled (the spine's default): the UART peripheral is + // moved in and can't be re-acquired. + EmbassySessionClient::new(OneShotDialer::new(conn), AimxCodec).scheme(DEFAULT_SCHEME) } } // =========================================================================== -// Server sugar (hand-rolled ConnectorBuilder) +// Server sugar — serve the full AimX toolset over the moved-in UART. // =========================================================================== /// Serves the full AimX toolset over a serial UART, so a host (or another board) /// can `record.list`/`get`/`set`/`subscribe`/`drain` this db over the wire. +/// Register it directly with `with_connector`: +/// +/// ```ignore +/// builder.with_connector( +/// SerialServer::new(rx, tx).security_policy(SecurityPolicy::read_only()), +/// ); +/// ``` +/// +/// Holds the moved-in framed UART connection (built up front from the halves) in +/// the adapter's force-`Send + Sync` [`OneShotCell`]; `build` takes it, hands it +/// to a [`OneShotListener`], and drives `serve`. Storing it in the cell (rather +/// than a bare `RefCell`) keeps **all** the `unsafe` in the adapter — this crate +/// has none. pub struct SerialServer { - halves: RefCell>, + conn: OneShotCell>, config: AimxConfig, scheme: String, } -// SAFETY: single-core cooperative Embassy executor — see the connection above. -unsafe impl Send for SerialServer {} -unsafe impl Sync for SerialServer {} - -impl SerialServer { +impl SerialServer +where + Rd: Read + 'static, + Wr: Write + 'static, +{ /// Serve AimX over the split UART halves, with the default read-only policy. pub fn new(rx: Rd, tx: Wr) -> Self { Self { - halves: RefCell::new(Some((rx, tx))), + conn: OneShotCell::new(EmbassyConnection::new(rx, tx, CobsFramer::new())), config: AimxConfig::uds_default(), scheme: DEFAULT_SCHEME.to_string(), } } +} +impl SerialServer { /// Use a prepared [`AimxConfig`] for the security policy / limits (the /// `socket_path` / `socket_permissions` fields are unused over serial). pub fn with_config(mut self, config: AimxConfig) -> Self { @@ -352,56 +198,40 @@ where Wr: Write + 'static, { fn build<'a>(&'a self, db: &'a AimDb) -> BuildFuture<'a> { - Box::pin(SendFutureWrapper(async move { - let (rx, tx) = self - .halves - .borrow_mut() - .take() - .ok_or_else(connector_consumed)?; - let listener = SerialListener::new(rx, tx); - + // Take the moved-in connection out of `&self` (build runs once); the + // canonical "already built" error lives on the adapter's cell. + let conn = self.conn.take_required(); + let config = self.config.clone(); + Box::pin(async move { + let conn = conn?; // Apply the security policy's writable marking so `record.list` reports // the `writable` flag (the dispatch also enforces it). - crate::apply_writable(db, &self.config); - + crate::apply_writable(db, &config); let session_config = SessionConfig { limits: SessionLimits { // A UART carries a single peer. max_connections: 1, - max_subs_per_connection: self.config.max_subs_per_connection, + max_subs_per_connection: config.max_subs_per_connection, }, reads_hello: false, // AimX's subscribe ack stays implicit (events flow); no ack frame. acks_subscribe: false, }; let dispatch: Arc = - Arc::new(AimxDispatch::new(Arc::new(db.clone()), self.config.clone())); + Arc::new(AimxDispatch::new(Arc::new(db.clone()), config)); + // `serve` is `Send` here: the one-shot listener + framed connection + // force-`Send` their futures inside the adapter. let fut: BoxFuture = Box::pin(serve( - listener, + OneShotListener::new(conn), Arc::new(AimxCodec), dispatch, session_config, )); Ok(vec![fut]) - })) + }) } fn scheme(&self) -> &str { &self.scheme } } - -// =========================================================================== -// Helpers -// =========================================================================== - -/// The builder's UART halves were already taken — `build` ran twice. The -/// framework calls it once, so this is unreachable in practice. -fn connector_consumed() -> DbError { - DbError::MissingConfiguration { - #[cfg(feature = "std")] - parameter: String::from("serial connector already built"), - #[cfg(not(feature = "std"))] - _parameter: (), - } -} diff --git a/aimdb-serial-connector/src/lib.rs b/aimdb-serial-connector/src/lib.rs index 0c422972..e83fadeb 100644 --- a/aimdb-serial-connector/src/lib.rs +++ b/aimdb-serial-connector/src/lib.rs @@ -19,9 +19,10 @@ //! / [`SessionServerConnector`](aimdb_core::session::SessionServerConnector). //! See [`tokio_transport`]. //! - **`embassy-runtime`** (`no_std + alloc`, MCU): generic over -//! [`embedded_io_async`] UART halves; hand-rolls the `ConnectorBuilder` and -//! force-`Send`s the engine futures via `aimdb-embassy-adapter`'s -//! `SendFutureWrapper`. See [`embassy_transport`]. +//! [`embedded_io_async`] UART halves; the COBS `Framer` plus thin sugar over the +//! centralized Embassy session spine in `aimdb-embassy-adapter`, which owns the +//! force-`Send` plumbing, the framed connection, and all the `unsafe` — this +//! crate carries none. See [`embassy_transport`]. //! //! Both speak the `serial://` scheme by default ([`DEFAULT_SCHEME`]). diff --git a/aimdb-serial-connector/tests/embassy_smoke.rs b/aimdb-serial-connector/tests/embassy_smoke.rs index 23e395cc..be18e28b 100644 --- a/aimdb-serial-connector/tests/embassy_smoke.rs +++ b/aimdb-serial-connector/tests/embassy_smoke.rs @@ -27,8 +27,9 @@ use embedded_io_async::{ErrorKind, ErrorType, Read, Write}; use aimdb_core::session::{ run_client, ClientConfig, CodecError, EnvelopeCodec, Inbound, Outbound, Payload, }; +use aimdb_embassy_adapter::connectors::{EmbassyConnection, OneShotDialer}; use aimdb_embassy_adapter::EmbassyAdapter; -use aimdb_serial_connector::embassy_transport::SerialDialer; +use aimdb_serial_connector::embassy_transport::CobsFramer; // Trivial host time driver so `embassy_time` links (the happy path never awaits // `clock.sleep`, so the driver is never actually exercised). @@ -140,7 +141,10 @@ fn embassy_clock_drives_client_engine_rpc_over_serial() { }; let uart = LoopbackUart::default(); - let dialer = SerialDialer::new(uart.clone(), uart); + // The one-shot dialer over the real COBS framed connection — the exact spine + // an MCU build uses (`OneShotDialer>`). + let conn = EmbassyConnection::<_, _, _>::new(uart.clone(), uart, CobsFramer::new()); + let dialer = OneShotDialer::new(conn); let (handle, engine_fut) = run_client(dialer, EchoCodec, config, clock); block_on(async move { diff --git a/deny.toml b/deny.toml index a60fa7dc..111663f3 100644 --- a/deny.toml +++ b/deny.toml @@ -26,6 +26,7 @@ yanked = "warn" # embedded stack; there is currently no drop-in replacement at this depth. ignore = [ "RUSTSEC-2026-0110", # bare-metal deprecated via cortex-m (embedded-only transitive) + "RUSTSEC-2026-0173", # TEMP: proc-macro-error2 unmaintained (transitive via defmt-macros/tabled_derive) ] [sources] diff --git a/docs/design/012-M5-connector-development-guide.md b/docs/design/012-M5-connector-development-guide.md index dcb11a80..841cfdfd 100644 --- a/docs/design/012-M5-connector-development-guide.md +++ b/docs/design/012-M5-connector-development-guide.md @@ -175,41 +175,43 @@ tokio = { workspace = true, optional = true } ## Embassy Implementation Pattern +Embassy's primitives are `!Send` (single-core, cooperative), but AimDB's connector +contract is `Send`-everywhere (so a Tokio app can `tokio::spawn(runner.run())`). **Do not +hand-roll the `unsafe`/force-`Send` bridge** — it lives, audited and once, in +`aimdb_embassy_adapter::connectors` (Design 033). A connector crate contributes only its +transport-specific logic and carries **no `unsafe`**. + **Dependencies:** ```toml [features] -embassy-runtime = ["aimdb-core/alloc", "embassy-net", "embassy-sync"] - -[dependencies] -embassy-net = { workspace = true, optional = true } -embassy-sync = { workspace = true, optional = true } -static-cell = "2.0" -``` - -**Key patterns:** -- Use `alloc` types: `alloc::sync::Arc`, `alloc::string::String` -- Wrap futures: `SendFutureWrapper(async move { ... })` -- Static allocation: `StaticCell` -- Logging: `defmt::info!()` (behind `#[cfg(feature = "defmt")]`) -- Network access: `R: EmbassyNetwork` trait bound -- Unsafe `Send + Sync` impls for single-threaded safety - -**SendFutureWrapper helper:** -```rust -struct SendFutureWrapper(F); -unsafe impl Send for SendFutureWrapper {} - -impl Future for SendFutureWrapper { - type Output = F::Output; - fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) - -> core::task::Poll - { - unsafe { self.map_unchecked_mut(|s| &mut s.0).poll(cx) } - } -} -``` - -**See:** `examples/embassy-mqtt-connector-demo/` for Embassy patterns +# Session transport (serial/TCP): needs the framed-connection spine. +embassy-runtime = ["aimdb-core/connector-session", "aimdb-embassy-adapter/connector-io", …] +# Data-plane transport (MQTT/KNX): needs the sink/source bridges + pumps. +embassy-runtime = ["aimdb-core/connector-session", "aimdb-embassy-adapter/connectors", …] +``` + +**Session transport** (a framed byte stream — serial, TCP): +- Implement `aimdb_embassy_adapter::connectors::Framer` (encode/accumulate/next-frame). +- Client sugar → `EmbassySessionClient::new(OneShotDialer::new(EmbassyConnection::new(rx, tx, MyFramer)), Codec)`. +- Server sugar → `EmbassySessionServer::new(OneShotListener::new(conn), Codec, dispatch_factory, cfg)`, + or a thin `ConnectorBuilder` that stores the moved-in connection in a `OneShotCell` and + drives `serve` (see `aimdb-serial-connector`). + +**Data-plane transport** (a pub/sub channel — MQTT, KNX): +- Implement `EmbassySinkRaw` (outbound publish) and/or `EmbassySourceRaw` (inbound next), + then ride core's pumps: + `pump_sink(db, scheme, Arc::new(EmbassySink(my_sink)))` / + `pump_source(db, scheme, EmbassySource(my_source))`. + (If your channels are already `Send` — e.g. `CriticalSectionRawMutex` — implement core's + `Connector`/`Source` directly and skip the bridges; see `aimdb-knx-connector`.) +- Force-`Send` the long-lived protocol task with `into_box_future(async move { … })`. + +**Other:** `alloc` types (`alloc::sync::Arc`, `alloc::string::String`), `StaticCell` for +channels, `defmt` logging behind `#[cfg(feature = "defmt")]`, `R: EmbassyNetwork` for the +network stack. + +**See:** `aimdb-serial-connector` (session), `aimdb-mqtt-connector` / `aimdb-knx-connector` +(data-plane), and `examples/embassy-mqtt-connector-demo/`. --- @@ -244,13 +246,14 @@ static CH: StaticCell> = StaticCell::new(); let ch = CH.init(Channel::new()); ``` -**Missing Send wrapper (Embassy):** +**Force-`Send` a protocol task (Embassy):** ```rust -// ❌ Not Send -Box::pin(async move { ... }) - -// ✅ Send-wrapped +// ❌ Don't hand-roll the unsafe wrapper in your connector crate Box::pin(SendFutureWrapper(async move { ... })) + +// ✅ Use the adapter spine's helper (the unsafe lives there, audited once) +use aimdb_embassy_adapter::connectors::into_box_future; +into_box_future(async move { ... }) ``` --- diff --git a/docs/design/033-M17-unify-connectors-drop-send.md b/docs/design/033-M17-unify-connectors-drop-send.md new file mode 100644 index 00000000..dd1a4ea8 --- /dev/null +++ b/docs/design/033-M17-unify-connectors-drop-send.md @@ -0,0 +1,490 @@ +# Unify Embassy Connectors — Centralized Adapter Spine (revised from "Drop `Send`") + +**Version:** 0.2 +**Status:** ✅ Implemented — **via the centralized-spine approach, not the `Send`-drop** (see Implementation Decision) +**Predecessor:** [Design 028 — Remove `Spawn` Trait](028-M13-remove-spawn-trait.md) (✅ Implemented) +**Motivating beneficiary:** [#121 — `aimdb-tcp-connector` (tokio + embassy-net)](https://github.com/aimdb-dev/aimdb/issues/121) +**Last Updated:** June 9, 2026 +**Milestone:** M17 — Architectural clean-up + +--- + +## Implementation Decision (supersedes the proposal below) + +The original proposal — *drop `Send` from the connector/future contract* — was **not** +implemented. Investigation found it forces the `!Send`-ness onto the std/Tokio side +(the proposal's own goal of "keep `Send` on data") and that its §5 mechanism is unsound: + +- `run_client`'s engine drives a `Box` (from a runtime-resolved + `Box`) across `.await`, so it becomes **unconditionally `!Send`** once + `Connection: Send` is dropped — the "auto-`Send` named engine" cannot recover it, and + the associated-`Conn`-type fix breaks `Box` object-safety. +- `run_session` is **inherently `!Send`** (its subscription-pump `FuturesUnordered`), + so the WebSocket server would lose axum's multi-thread per-connection parallelism, and + `tokio::spawn(runner.run())` would break across ~37 app/test sites. + +**Chosen instead (the "centralized Embassy wrapper" alternative this doc had rejected):** +keep the std contract exactly as-is and confine all the Embassy single-core `unsafe` + +`SendFutureWrapper` into **one audited place**, `aimdb-embassy-adapter::connectors`: + +- **Session transports** (serial, later TCP) contribute a `Framer` (or a `Connection`) + and wrap it in `EmbassySessionClient` / `EmbassySessionServer` (+ `OneShotDialer` / + `OneShotListener` / `OneShotCell` / the framed `EmbassyConnection`). +- **Data-plane transports** (MQTT, KNX) contribute an `EmbassySinkRaw` / `EmbassySourceRaw` + (or, where their channels are already `Send`, a plain `Connector`/`Source`) and ride + core's existing `pump_sink` / `pump_source` via the `EmbassySink` / `EmbassySource` + bridges; their protocol task is force-`Send`ed once via `into_box_future`. + +**Outcome:** the serial Embassy half dropped from a 407-line hand-roll (7 `unsafe impl`s) to +thin sugar with **zero `unsafe`**; the MQTT/KNX Embassy halves deleted their hand-rolled +pump loops and `SendFutureWrapper` use, riding core's pumps; **all** connector-crate +`unsafe`/`SendFutureWrapper` is gone (confined to the adapter). The std/Tokio side, +`aimdb-client`, the WS server, examples, tests, and `aimdb-pro` are **unchanged**. + +> The sections below are the original (rejected) proposal, kept for the motivation and +> architecture audit, which remain accurate. + +--- + +## Table of Contents + +- [Summary](#summary) +- [Motivation](#motivation) + - [Every Embassy connector hand-rolls what Tokio reuses](#every-embassy-connector-hand-rolls-what-tokio-reuses) + - [The cost scales with every new transport](#the-cost-scales-with-every-new-transport) +- [Current Architecture — Audit](#current-architecture--audit) + - [The runner is already single-task cooperative](#the-runner-is-already-single-task-cooperative) + - [Where `Send` propagates](#where-send-propagates) + - [Where `Send` is actually exercised](#where-send-is-actually-exercised) + - [The Embassy workaround today](#the-embassy-workaround-today) +- [Key Insight — `Send` is a boxing-boundary decision](#key-insight--send-is-a-boxing-boundary-decision) +- [Proposed Design](#proposed-design) + - [1. Drop `Send` from the future/transport contract](#1-drop-send-from-the-futuretransport-contract) + - [2. Keep `Send` on identity and data](#2-keep-send-on-identity-and-data) + - [3. The runner becomes `!Send`](#3-the-runner-becomes-send) + - [4. Unify the generic session connectors](#4-unify-the-generic-session-connectors) + - [5. Preserve direct-spawn for `aimdb-client`](#5-preserve-direct-spawn-for-aimdb-client) +- [Type-System Changes](#type-system-changes) +- [Platform-Specific Concerns](#platform-specific-concerns) +- [Affected Crates](#affected-crates) +- [Breaking Changes & Migration](#breaking-changes--migration) +- [Alternatives Considered](#alternatives-considered) +- [Implementation Plan](#implementation-plan) +- [Decisions](#decisions) +- [Out of Scope](#out-of-scope) + +--- + +## Summary + +The generic, transport-agnostic session connectors (`SessionClientConnector` / +`SessionServerConnector` in `aimdb-core/src/session/connector.rs`) let a Tokio +transport crate contribute just a `Dialer`/`Listener`/`Connection` triple and a +codec, and inherit all the engine wiring (reconnect, pumps, accept loop, +fan-out). The UDS and serial **Tokio** halves are one-line sugar over them. + +The **Embassy** half of every connector cannot use that spine. It re-implements +`ConnectorBuilder` by hand — calling the bare engines (`run_client`, `serve`, +`pump_client`) directly and force-`Send`ing every future with +`aimdb-embassy-adapter`'s `SendFutureWrapper` plus a sprinkle of +`unsafe impl Send`/`Sync` on the transport, dialer, listener, and builder types +(see `aimdb-serial-connector/src/embassy_transport.rs`). + +The root cause is a single, deliberate choice: **the connector contract is +`Send`-everywhere.** `ConnectorBuilder::build` returns +`Vec>>`, and `Connection`/`Listener`/`Dialer` and the +`BoxFut`/`BoxStream` aliases all carry `Send`. That bound was inherited from the +Tokio-shaped, multi-threaded execution model. Embassy is single-core +cooperative; its primitives and HAL peripherals are `!Send` *by design*. + +This design **drops `Send` from the future/transport contract** so that one +connector implementation per transport serves both runtimes — exactly as the +Tokio halves already do — while keeping `Send` on the *data and identity* types +(`RuntimeAdapter`, the type-erased `Arc` payloads, +`JoinQueue`). + +The change is sound because **AimDB already drives every future on a single +cooperative task** ([`AimDbRunner::run()`](../../aimdb-core/src/builder.rs) +collects everything into one `FuturesUnordered`). The `Send` bound is never +exercised by AimDB's own machinery — it is consumed only when an application +hands the *whole* aggregate task to a multi-threaded runtime via +`tokio::spawn(runner.run())`. That single ergonomic — `tokio::spawn` of the +runner — is the only thing we give up, and it has a one-line migration. + +--- + +## Motivation + +### Every Embassy connector hand-rolls what Tokio reuses + +Today the same transport ships two structurally different implementations: + +| Transport | Tokio half | Embassy half | +|---|---|---| +| UDS / serial | ~1-line sugar over `SessionClientConnector` / `SessionServerConnector` | full hand-rolled `ConnectorBuilder` + `SendFutureWrapper` + `unsafe impl Send/Sync` × 4 | +| MQTT / KNX | thin wrapper | hand-rolled pump loops (can't ride `pump_sink`/`pump_source`) | + +The Embassy `embassy_transport.rs` for serial even carries a dedicated +doc comment titled *"Why this half hand-rolls `ConnectorBuilder`"*. The hand-roll is +a mechanical translation of the Tokio sugar: + +- `D: Clone` (clone the dialer out of `&self`) → `RefCell>::take()` +- `Send + Sync` bounds → `unsafe impl Send/Sync` justified by the single-core + invariant +- `Send` futures → wrap every `recv`/`send`/engine future in `SendFutureWrapper` + +None of this is transport logic. It is per-crate boilerplate that re-asserts the +same single-core safety argument, each copy an independent place for that +`unsafe` to drift. + +### The cost scales with every new transport + +[#121 (`aimdb-tcp-connector`, tokio + embassy-net)](https://github.com/aimdb-dev/aimdb/issues/121) +is the next connector in the queue. Under the current design it will need a fully +hand-rolled Embassy half — a fourth copy of the `SendFutureWrapper` + `unsafe` +pattern — for what is, on Tokio, a few lines over the existing spine. Every +future transport (CoAP, BLE, CAN, …) pays the same tax. This design pays it down +once. + +This is also the **unfinished half of [Design 028](028-M13-remove-spawn-trait.md)**. +M13 removed `Spawn` and made `build()` *collect* futures into a `Vec` +driven by `run()`. It deliberately left the `Send` bound on that `BoxFuture` in +place. With `Spawn` gone, that residual `Send` is now the *only* thing forcing +Embassy connectors to diverge. + +--- + +## Current Architecture — Audit + +### The runner is already single-task cooperative + +[`AimDbRunner::run()`](../../aimdb-core/src/builder.rs) collects **every** future +the database needs — connectors, producer/consumer loops, transforms, the remote +supervisor, `on_start` tasks — into one `FuturesUnordered` and polls them on one +task: + +```rust +let mut set: FuturesUnordered = self.futures.into_iter().collect(); +while set.next().await.is_some() {} +``` + +Nothing in `aimdb-core` ever `tokio::spawn`s these futures individually. The +session engines themselves fan out per-connection and per-subscription with +*nested* `FuturesUnordered` + `select_biased!` +(`aimdb-core/src/session/server.rs`), never a spawn. So: + +> The entire connector / producer / consumer surface is **already** cooperative +> on a single task on every runtime. `Send` buys *relocation of that one +> aggregate task onto a worker thread* — it does **not** buy intra-database +> parallelism, because two connectors are never polled simultaneously. + +### Where `Send` propagates + +| Location | Bound | Role | +|---|---|---| +| `aimdb-core/src/builder.rs` | `type BoxFuture = Pin + Send + 'static>>` | the runner's collected future type | +| `aimdb-core/src/connector.rs` | `trait ConnectorBuilder: Send + Sync` | connector entry point | +| `aimdb-core/src/connector.rs` | `build(..) -> Pin>>>> + Send>>` | build returns a `Send` Vec of `Send` futures | +| `aimdb-core/src/session/mod.rs` | `type BoxFut<'a,T> = Pin>` | every transport/dispatch async return | +| `aimdb-core/src/session/mod.rs` | `type BoxStream<'a,T> = Pin>` | subscription stream | +| `aimdb-core/src/session/mod.rs` | `trait Connection: Send`, `Listener: Send`, `Dialer: Send`, `Source: Send` | Layer-1 transport | +| `aimdb-core/src/session/mod.rs` | `trait Dispatch: Send + Sync`, `Session: Send`, `EnvelopeCodec: Send + Sync` | Layer-3 dispatch | +| `aimdb-core/src/session/connector.rs` | `D: Dialer + Clone + Send + Sync`, `LF/DF: Fn(..) + Send + Sync` | generic connector bounds | + +### Where `Send` is actually exercised + +Across the whole workspace, the `Send` bound on the connector/runner contract is +*consumed* (i.e. a future is actually moved to another thread) in exactly one +shape — `tokio::spawn` of the runner or a directly-driven engine: + +| Site | What is spawned | +|---|---| +| `examples/remote-access-demo/src/server.rs` | `tokio::spawn(runner.run())` | +| `examples/weather-mesh-demo/weather-station-{alpha,beta}/src/main.rs` | `tokio::spawn(runner.run())` | +| `aimdb-websocket-connector/examples/ws_{client,server}.rs` | `tokio::spawn(runner.run())` | +| `aimdb-client/src/engine.rs` | `tokio::spawn(engine_fut)` (the `run_client` engine) | + +Every other Tokio example already drives the runner with `builder.run().await` +(`examples/tokio-{mqtt,knx}-connector-demo`), which needs **no** `Send` — +`block_on` never requires it. + +### The Embassy workaround today + +`aimdb-embassy-adapter::SendFutureWrapper` is a `struct SendFutureWrapper(F)` +with `unsafe impl Send for SendFutureWrapper {}`, justified by the +single-core, no-preemption, no-thread-migration Embassy invariant. Each Embassy +connector wraps every `recv`/`send`/`connect`/`accept`/engine future in it and +adds `unsafe impl Send`/`Sync` to the connection, dialer, listener, and builder +structs. This is the entire delta between the Tokio and Embassy halves. + +--- + +## Key Insight — `Send` is a boxing-boundary decision + +A future is `Send` iff every value held across an `.await` is `Send`. On Tokio, +all transport values are `Send`, so the **concrete** engine future *is* `Send`. +It only loses that property when it is **boxed into a `dyn Future` whose type +does not advertise `Send`**. + +Therefore `Send` is not a property we must thread through the engines — it is a +single decision made at each `dyn`-boxing boundary: + +1. **`ConnectorBuilder::build` → the runner.** The returned `Vec` is a + `dyn` box. This is the *one* uniform boundary every connector funnels + through, so its `Send`-ness is what unifies (or splits) the connectors. This + box **must drop `Send`** to admit Embassy. +2. **Direct engine consumers** (`aimdb-client`). These don't go through the + runner; they hold the concrete engine future. If we keep that future's type + `Send`-transparent (generic / un-pre-boxed), std callers keep `tokio::spawn`. + +Once boundary (1) drops `Send`, the runner is `!Send` and nothing downstream +needs `Send` — a `FuturesUnordered` polls `F: !Send` perfectly well on one +thread. + +--- + +## Proposed Design + +### 1. Drop `Send` from the future/transport contract + +Remove `+ Send` / `Send` supertrait from: + +- `BoxFuture` (`builder.rs`) and the `ConnectorBuilder::build` return type + (both the outer build future and the inner `Vec` element). +- `ConnectorBuilder` supertrait (`: Send + Sync` → no auto-trait bound). +- `BoxFut`, `BoxStream` (`session/mod.rs`). +- `Connection`, `Listener`, `Dialer`, `Source`, `Session` (drop `: Send`). +- `Dispatch`, `EnvelopeCodec` (drop `: Send + Sync`). + +### 2. Keep `Send` on identity and data + +Explicitly **unchanged**, because they describe data at rest, not futures in +flight, and removing them would ripple needlessly: + +- `RuntimeAdapter: Send + Sync`, `TimeOps` associated `Instant`/`Duration: Send + Sync`. +- The type-erased payloads: `Arc` (runtime ctx, + `PeerInfo::ext`, `SessionCtx::ext`), `Payload = Arc<[u8]>`. +- `JoinQueue` and the transform fan-in channels. + +A `!Send` future may freely hold `Send` data; these two facts do not conflict. + +### 3. The runner becomes `!Send` + +`AimDbRunner` and its `run()` future are now `!Send`. Applications drive it one +of three ways (all already valid on a multi-threaded Tokio runtime): + +```rust +// (a) Block on it — simplest, multi-thread runtime is fine (block_on ≠ Send). +runner.run().await; + +// (b) Concurrent with other main-task work, single-threaded scope: +let local = tokio::task::LocalSet::new(); +local.run_until(async { tokio::task::spawn_local(runner.run()); /* … */ }).await; + +// (c) Dedicated current-thread runtime on its own std thread. +std::thread::spawn(|| { + tokio::runtime::Builder::new_current_thread().enable_all().build() + .unwrap().block_on(runner.run()); +}); +``` + +### 4. Unify the generic session connectors + +With the bounds relaxed, `SessionClientConnector` / `SessionServerConnector` +host an Embassy peripheral directly. Two mechanical adjustments replace what the +hand-roll did manually: + +- **Client:** store the dialer behind interior mutability and move it out once in + `build`, dropping the `D: Clone` requirement (the engine already owns the + dialer for the connection's life and re-dials via `&self`): + + ```rust + pub struct SessionClientConnector { + scheme: String, + dialer: RefCell>, // moved out once in build() + codec: C, + config: ClientConfig, + } + + impl ConnectorBuilder for SessionClientConnector + where R: TimeOps + 'static, D: Dialer + 'static, C: EnvelopeCodec + Clone + 'static + { /* build(): take dialer, run_client(..), collect pumps + engine */ } + ``` + +- **Server:** keep the listener/dispatch factories, drop their `Send + Sync` + bound. A once-only listener (a moved-in UART) is captured the same way the + hand-roll does — `RefCell>` inside the factory closure. + +Embassy connectors then collapse to *only* the `Dialer`/`Listener`/`Connection` +triple plus the one-line sugar — identical in shape to the Tokio halves. +`SendFutureWrapper` and every `unsafe impl Send/Sync` in the connector crates are +deleted. + +### 5. Preserve direct-spawn for `aimdb-client` + +`aimdb-client` calls `run_client` directly and `tokio::spawn`s the returned +engine future (`engine.rs`). `run_client` currently returns +`(ClientHandle, BoxFut<'static, ()>)` — a pre-`Send`-boxed future. If we simply +drop `Send` from `BoxFut`, that `tokio::spawn` stops compiling. + +To keep std direct-spawn working *without* reintroducing `unsafe`, give the +engine a **named, generic future type** so its `Send`-ness is inferred from the +inputs rather than erased by the box: + +```rust +pub fn run_client(dialer: D, codec: C, config: ClientConfig, clock: Arc) + -> (ClientHandle, ClientEngine) // was: (ClientHandle, BoxFut<'static, ()>) +where D: Dialer + 'static, C: EnvelopeCodec + 'static, R: TimeOps + 'static; + +// ClientEngine: Future; auto-Send iff D, C, R are Send. +``` + +- **std** (`aimdb-client`, Tokio transports): `ClientEngine<…>` is `Send`, + so `tokio::spawn(engine)` keeps compiling — no migration. +- **connector spine** (`SessionClientConnector::build`): boxes `ClientEngine` + into the runner's now-`!Send` `Vec` — the box drops the `Send` + capability the single-task runner never needed. +- **Embassy**: `ClientEngine` is `!Send`, driven on the cooperative runner. No + wrapper. + +`serve` is already `pub async fn serve(..)` (returns an anonymous `impl Future`), +so its `Send`-ness is inferred at each call site for free — no newtype needed. + +--- + +## Type-System Changes + +| Item | Before | After | +|---|---|---| +| `BoxFuture` (builder) | `… + Send + 'static` | `… + 'static` | +| `ConnectorBuilder` | `: Send + Sync` | (no auto-trait supertrait) | +| `build()` return | `Vec>`, outer `+ Send` | drop both `Send` | +| `BoxFut` / `BoxStream` | `… + Send + 'a` | `… + 'a` | +| `Connection`/`Listener`/`Dialer`/`Source`/`Session` | `: Send` | (none) | +| `Dispatch`/`EnvelopeCodec` | `: Send + Sync` | (none) | +| `SessionClientConnector` `D` | `Dialer + Clone + Send + Sync` | `Dialer` (+ interior-mutability move) | +| `SessionServerConnector` `LF`/`DF` | `Fn(..) + Send + Sync` | `Fn(..)` | +| `run_client` engine return | `BoxFut<'static,()>` | named `ClientEngine` | +| `RuntimeAdapter`, `Arc`, `JoinQueue` | `Send`/`Sync` | **unchanged** | + +The dyn-object-safety assertions in `session/mod.rs` (`_assert_object_safe`) +stay valid — removing an auto-trait supertrait only widens the set of impls. + +--- + +## Platform-Specific Concerns + +**Tokio (std).** The runner future becomes `!Send`. Apps switch +`tokio::spawn(runner.run())` → `runner.run().await` or a `LocalSet`. Connectors +that internally rely on Tokio's own machinery (e.g. the WS connector's +`axum::serve`, which `tokio::spawn`s its own per-connection tasks) are +unaffected: those are the connector's own `Send` tasks on the ambient runtime, +independent of AimDB's `BoxFuture` type. `aimdb-client` keeps `tokio::spawn` via +the named-engine return (§5). + +**Embassy (no_std + alloc).** The target outcome. `SendFutureWrapper` and all +`unsafe impl Send/Sync` in the connector crates are removed. Embassy connectors +reuse `SessionClientConnector` / `SessionServerConnector` and the bare engines +directly. The single-core cooperative invariant that previously *justified* the +`unsafe` now simply *is* the model the type system expresses. + +**WASM (wasm32).** Already drives `!Send` futures via `spawn_local` +(`aimdb-wasm-adapter`). This change aligns the connector contract with the model +WASM already uses; the WASM adapter's existing `unsafe impl Send/Sync` on its +adapter becomes reviewable for removal as a follow-up (out of scope here). + +--- + +## Affected Crates + +| Crate | Impact | +|---|---| +| `aimdb-core` | bound removals; `SessionClientConnector` interior-mutability; `run_client` named return | +| `aimdb-embassy-adapter` | `SendFutureWrapper` deprecated/removed (kept one release as deprecated shim) | +| `aimdb-serial-connector` | delete `embassy_transport.rs` hand-roll; Embassy half = sugar over the spine | +| `aimdb-uds-connector` | (no Embassy half yet) gains one for free | +| `aimdb-mqtt-connector`, `aimdb-knx-connector` | data-plane Embassy pumps can ride `pump_sink`/`pump_source` once those drop `Send` (follow-up; see Out of Scope) | +| `aimdb-client` | **no source change** if §5 lands; otherwise migrate spawn → `LocalSet` | +| `aimdb-websocket-connector` | unaffected (rides `run_session`; axum owns its spawns) | +| examples (`remote-access-demo`, `weather-mesh-demo`, ws examples) | `tokio::spawn(runner.run())` → `runner.run().await` / `LocalSet` | + +--- + +## Breaking Changes & Migration + +1. **`tokio::spawn(runner.run())` no longer compiles.** Migrate to + `runner.run().await` (most apps) or `LocalSet::spawn_local` (concurrent + case). One-line change; documented in the runner rustdoc and `CHANGELOG`. +2. **`run_client` return type** changes from `BoxFut` to `ClientEngine<…>`. The + future is still driven the same way; only the type name changes. Internal to + the session layer + `aimdb-client`. +3. **Custom connectors** that named `Box` explicitly in their + `ConnectorBuilder::build` must drop the `+ Send`. The `BoxFuture` alias path + is unaffected. + +No wire-format, record-API, or `link_to`/`link_from` changes. This is purely a +type-system / execution-model change. + +--- + +## Alternatives Considered + +- **Keep `Send`; factor the Embassy wrapper once.** Provide + `EmbassySessionClient`/`Server` in `aimdb-embassy-adapter` that encapsulate + `SendFutureWrapper` + `unsafe impl` once. *Rejected:* still two code paths and + retains `unsafe`; doesn't unify, only de-duplicates the workaround. +- **Conditional `Send` via a feature flag** (two `BoxFuture` aliases gated by + `send-futures`). *Rejected:* combinatorial trait surface, `cfg`-split bounds + everywhere, and connectors compiled for the "wrong" flag silently fail to + compose. Worse than the status quo. +- **`?Send` / maybe-`Send` generics.** Not expressible on stable Rust. +- **De-`Send` the entire executor stack** (`RuntimeAdapter`, channels, payloads). + *Rejected:* unnecessary — a `!Send` future may hold `Send` data; this would be + a far larger, riskier blast radius for no additional benefit. + +--- + +## Implementation Plan + +1. Land the `run_client` named-engine return (`ClientEngine`) with the + `Send`-transparent type; verify `aimdb-client` + CLI + MCP still build and + `tokio::spawn`. +2. Drop `Send` from `BoxFut`/`BoxStream` and the `Connection`/`Listener`/ + `Dialer`/`Dispatch`/`Session`/`EnvelopeCodec` traits in `session/mod.rs`. +3. Drop `Send` from `BoxFuture` + `ConnectorBuilder` in `builder.rs`/ + `connector.rs`; fix the runner. +4. Relax `SessionClientConnector`/`SessionServerConnector` bounds; switch the + client to interior-mutability dialer move. +5. Migrate the `tokio::spawn(runner.run())` examples to `.await` / `LocalSet`. +6. Replace the serial Embassy `embassy_transport.rs` hand-roll with sugar over + the spine; delete its `unsafe impl`s; deprecate `SendFutureWrapper`. +7. CI gate: `cargo build` the full workspace on **both** the `tokio-runtime` and + `embassy-runtime` feature sets; confirm the serial connector's Tokio and + Embassy halves now share the spine. +8. Update [Design 028](028-M13-remove-spawn-trait.md) with a forward link and + `012-M5-connector-development-guide.md` with the unified one-implementation + guidance. + +--- + +## Decisions + +- **Drop `Send` at the boxing boundary, infer it everywhere else.** The only + authoritative `Send` decision is the `ConnectorBuilder`/runner box; engines + stay `Send`-transparent so std direct-spawn survives. +- **Keep data/identity `Send`.** Minimizes blast radius; the futures-vs-data + distinction is the whole reason this is safe and small. +- **No feature flag.** A single contract for all runtimes; the unification is the + point. + +--- + +## Out of Scope + +- **Data-plane pump unification** (`pump_sink`/`pump_source` for Embassy + MQTT/KNX). Same root cause (`Source: Send`, `Connector: Send + Sync`); a + natural follow-up once the session-plane change lands. +- **Removing the WASM adapter's `unsafe impl Send/Sync`.** Enabled by this + change but tracked separately. +- **`aimdb-sync` no_std** ([#46](https://github.com/aimdb-dev/aimdb/issues/46)). + Unrelated.