From e58eb5736696407ab971045679ba4abec8ea5d6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Fri, 5 Jun 2026 11:59:15 +0000 Subject: [PATCH 1/5] feat: add COBS framing tests for serial connector - Implemented roundtrip tests for COBS framing in `cobs_framing.rs` to ensure proper encoding and decoding of frames regardless of chunk sizes. - Added tests for single frame, byte-by-byte delivery, multiple frames across chunk boundaries, and empty payloads. - Verified that encoded frames do not contain the sentinel byte except as a terminator. feat: implement Embassy client-exit smoke test - Created `embassy_smoke.rs` to test the RPC engine over the real Embassy serial transport using a loopback UART. - Validated that the engine can handle requests and responses correctly, ensuring proper COBS framing and decoding. feat: add Tokio roundtrip test for AimX over serial transport - Developed `tokio_roundtrip.rs` to test end-to-end communication using a duplex stream as a mock serial connection. - Ensured that the AimX server and client can communicate correctly over the serial transport, verifying COBS framing in both directions. chore: initialize embassy-serial-connector-demo example - Created a new example project demonstrating AimX over serial on STM32H563ZI. - Configured the project with necessary dependencies and build settings for the STM32 platform. - Added README documentation for setup and usage instructions, including hardware requirements and troubleshooting tips. chore: add build script and flash script for demo - Implemented `build.rs` to configure linker arguments for the STM32 target. - Added `flash.sh` script to facilitate flashing the firmware to the STM32H563ZI using probe-rs. chore: set up .cargo/config.toml and .gitignore for demo - Configured `.cargo/config.toml` for the STM32 target and added a `.gitignore` file to exclude build artifacts. --- CHANGELOG.md | 2 + Cargo.lock | 180 +++++++- Cargo.toml | 15 +- Makefile | 20 +- aimdb-serial-connector/CHANGELOG.md | 31 ++ aimdb-serial-connector/Cargo.toml | 108 +++++ .../examples/serial_demo.rs | 124 ++++++ .../src/embassy_transport.rs | 416 ++++++++++++++++++ aimdb-serial-connector/src/framing.rs | 78 ++++ aimdb-serial-connector/src/lib.rs | 62 +++ aimdb-serial-connector/src/tokio_transport.rs | 317 +++++++++++++ aimdb-serial-connector/tests/cobs_framing.rs | 89 ++++ aimdb-serial-connector/tests/embassy_smoke.rs | 161 +++++++ .../tests/tokio_roundtrip.rs | 131 ++++++ .../.cargo/config.toml | 8 + .../embassy-serial-connector-demo/.gitignore | 5 + .../embassy-serial-connector-demo/Cargo.toml | 72 +++ .../embassy-serial-connector-demo/README.md | 105 +++++ .../embassy-serial-connector-demo/build.rs | 5 + .../embassy-serial-connector-demo/flash.sh | 13 + .../rust-toolchain.toml | 4 + .../embassy-serial-connector-demo/src/main.rs | 152 +++++++ 22 files changed, 2089 insertions(+), 9 deletions(-) create mode 100644 aimdb-serial-connector/CHANGELOG.md create mode 100644 aimdb-serial-connector/Cargo.toml create mode 100644 aimdb-serial-connector/examples/serial_demo.rs create mode 100644 aimdb-serial-connector/src/embassy_transport.rs create mode 100644 aimdb-serial-connector/src/framing.rs create mode 100644 aimdb-serial-connector/src/lib.rs create mode 100644 aimdb-serial-connector/src/tokio_transport.rs create mode 100644 aimdb-serial-connector/tests/cobs_framing.rs create mode 100644 aimdb-serial-connector/tests/embassy_smoke.rs create mode 100644 aimdb-serial-connector/tests/tokio_roundtrip.rs create mode 100644 examples/embassy-serial-connector-demo/.cargo/config.toml create mode 100644 examples/embassy-serial-connector-demo/.gitignore create mode 100644 examples/embassy-serial-connector-demo/Cargo.toml create mode 100644 examples/embassy-serial-connector-demo/README.md create mode 100644 examples/embassy-serial-connector-demo/build.rs create mode 100755 examples/embassy-serial-connector-demo/flash.sh create mode 100644 examples/embassy-serial-connector-demo/rust-toolchain.toml create mode 100644 examples/embassy-serial-connector-demo/src/main.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index f1744461..62ff14e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 > - [aimdb-knx-connector/CHANGELOG.md](aimdb-knx-connector/CHANGELOG.md) > - [aimdb-websocket-connector/CHANGELOG.md](aimdb-websocket-connector/CHANGELOG.md) > - [aimdb-uds-connector/CHANGELOG.md](aimdb-uds-connector/CHANGELOG.md) +> - [aimdb-serial-connector/CHANGELOG.md](aimdb-serial-connector/CHANGELOG.md) > - [aimdb-ws-protocol/CHANGELOG.md](aimdb-ws-protocol/CHANGELOG.md) > - [aimdb-wasm-adapter/CHANGELOG.md](aimdb-wasm-adapter/CHANGELOG.md) > - [aimdb-sync/CHANGELOG.md](aimdb-sync/CHANGELOG.md) @@ -34,6 +35,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **M16 — JSON codec extracted behind the `json-serialize` feature; `RecordValue::as_json()` now works on `no_std + alloc`, not just `std` ([Design 032](docs/design/032-M16-aimx-json-codec.md)).** New `aimdb-core::codec` module: `RemoteSerialize` (blanket-impl'd for every `serde` `Serialize + DeserializeOwned` type), the object-safe `JsonCodec`, and the zero-sized `SerdeJsonCodec`. `serde_json` runs on `alloc`, so embedded targets can opt in; `std` enables the feature transitively, so std builds are unaffected. ([aimdb-core](aimdb-core/CHANGELOG.md)) - **Embassy buffer + join-queue tests now run in CI (Issue #85).** The join-queue tests previously sat behind `embassy-runtime`, which pulls `embassy-executor`'s cortex-m assembly and can't compile under `cargo test` on x86_64 — so ordering / backpressure / clone-routing regressions were never caught. The `join_queue` module is now gated on `embassy-sync`, and `make test` runs the embassy adapter's unit tests + doctests on the host (no executor). Also adds `EmbassyBuffer::peek()` and fixes a stale `EmbassyBuffer` doc example. ([aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md)) - **`no_std` AimX server — a board can serve a host, not just dial one (Issue #120, follow-up to #39).** Cross-cutting de-std of `aimdb-core`'s central record API behind a new **`remote-access`** feature (`= ["json-serialize", "thiserror"]`, transitively enabled by `std`): the type-erased `AnyRecord` JSON + metadata methods, the `AimDb` JSON read/write/subscribe API, the `remote` module (config / protocol / security / error), and the AimX server dispatch (`AimxDispatch`/`AimxSession`) now all compile on `no_std + alloc` — swapping `std::collections` → `hashbrown`, `std::sync::Arc` → `alloc::sync::Arc`, and `thiserror` to `default-features = false`. Adds a runtime-neutral wall clock, `TimeOps::unix_time()`, implemented from the OS clock on Tokio and from an `EmbassyAdapter::set_unix_time(...)` anchor on Embassy. Verified by a new `thumbv7em-none-eabihf` dispatch cross-check in the Makefile. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-uds-connector](aimdb-uds-connector/CHANGELOG.md), [tools/aimdb-mcp](tools/aimdb-mcp/CHANGELOG.md)) +- **`aimdb-serial-connector` — COBS-framed serial/UART transport, the headline embedded scenario (Issue #122, follow-up to #39 / #120, [doc 041](docs/design/detailed/041-phase6-embedded-transports.md)).** New crate: a sensor MCU dials a gateway over UART (and, on the `no_std` `AimxDispatch` from #120, an MCU *serves* a host over UART). Contributes only the `Dialer`/`Listener`/`Connection` triple + `SerialClient`/`SerialServer` sugar over the `serial://` scheme; reuses the AimX codec/dispatch and the session engines from core. Same compact AimX JSON, framed with **COBS** + a `0x00` sentinel (self-synchronizing on a lossy serial line). Dual halves: a std `tokio-runtime` (`tokio-serial`) riding the generic `Session*Connector`, and a `no_std + alloc` `embassy-runtime` generic over `embedded-io-async` UART halves that hand-rolls `ConnectorBuilder` and force-`Send`s the single-core futures via `SendFutureWrapper`. Cross-compiles to `thumbv7em-none-eabihf` (new Makefile `test-embedded` checks). Ships a real end-to-end demo: a host `serial_demo` example and an `embassy-serial-connector-demo` STM32H563ZI firmware (serves a record over USART3 ↔ the ST-LINK Virtual COM Port, queried from the host). The workspace `embedded-io-async` dep is bumped 0.6 → 0.7 to match the Embassy STM32 HAL. ([aimdb-serial-connector](aimdb-serial-connector/CHANGELOG.md)) ### Changed (breaking) diff --git a/Cargo.lock b/Cargo.lock index 3ed6c6c1..d25b6af7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -253,6 +253,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "aimdb-serial-connector" +version = "0.1.0" +dependencies = [ + "aimdb-core", + "aimdb-embassy-adapter", + "aimdb-executor", + "aimdb-tokio-adapter", + "cobs", + "defmt 1.0.1", + "embassy-time-driver", + "embedded-io-async 0.7.0", + "futures", + "futures-util", + "serde", + "serde_json", + "thiserror 2.0.17", + "tokio", + "tokio-serial", + "tracing", +] + [[package]] name = "aimdb-sync" version = "0.5.0" @@ -627,6 +649,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chacha20" version = "0.10.0" @@ -691,6 +719,16 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" +[[package]] +name = "cobs" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd93fd2c1b27acd030440c9dbd9d14c1122aad622374fe05a670b67a4bc034be" +dependencies = [ + "heapless 0.9.1", + "thiserror 2.0.17", +] + [[package]] name = "colorchoice" version = "1.0.4" @@ -741,6 +779,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1048,7 +1096,7 @@ dependencies = [ "embedded-alloc", "embedded-hal 0.2.7", "embedded-hal-async", - "embedded-io-async 0.6.1", + "embedded-io-async 0.7.0", "embedded-storage", "heapless 0.8.0", "knx-connector-demo-common", @@ -1081,7 +1129,7 @@ dependencies = [ "embedded-alloc", "embedded-hal 0.2.7", "embedded-hal-async", - "embedded-io-async 0.6.1", + "embedded-io-async 0.7.0", "embedded-storage", "heapless 0.8.0", "micromath", @@ -1115,6 +1163,31 @@ dependencies = [ "defmt 1.0.1", ] +[[package]] +name = "embassy-serial-connector-demo" +version = "0.1.0" +dependencies = [ + "aimdb-core", + "aimdb-embassy-adapter", + "aimdb-executor", + "aimdb-serial-connector", + "cortex-m", + "cortex-m-rt", + "critical-section", + "defmt 1.0.1", + "defmt-rtt", + "embassy-executor", + "embassy-futures", + "embassy-stm32", + "embassy-sync", + "embassy-time", + "embedded-alloc", + "embedded-io-async 0.7.0", + "panic-probe", + "serde", + "static_cell", +] + [[package]] name = "embassy-stm32" version = "0.6.0" @@ -2020,6 +2093,16 @@ dependencies = [ "serde_core", ] +[[package]] +name = "io-kit-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "617ee6cf8e3f66f3b4ea67a4058564628cde41901316e19f559e14c7c72c5e7b" +dependencies = [ + "core-foundation-sys", + "mach2", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -2164,6 +2247,15 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "mach2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d640282b302c0bb0a2a8e0233ead9035e3bed871f0b7e81fe4a1ec829765db44" +dependencies = [ + "libc", +] + [[package]] name = "managed" version = "0.8.0" @@ -2220,10 +2312,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.61.2", ] +[[package]] +name = "mio-serial" +version = "5.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "029e1f407e261176a983a6599c084efd322d9301028055c87174beac71397ba3" +dependencies = [ + "log", + "mio", + "nix 0.29.0", + "serialport", + "winapi", +] + [[package]] name = "mountain-mqtt" version = "0.2.0" @@ -2289,6 +2395,29 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d5439c4ad607c3c23abf66de8c8bf57ba8adcd1f129e699851a6e43935d339d" +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", +] + +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.11.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -2839,7 +2968,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.11.0", - "core-foundation", + "core-foundation 0.9.4", "core-foundation-sys", "libc", "security-framework-sys", @@ -2975,6 +3104,24 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "serialport" +version = "4.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4d91116f97173694f1642263b2ff837f80d933aa837e2314969f6728f661df3" +dependencies = [ + "bitflags 2.11.0", + "cfg-if", + "core-foundation 0.10.1", + "core-foundation-sys", + "io-kit-sys", + "mach2", + "nix 0.26.4", + "scopeguard", + "unescaper", + "windows-sys 0.52.0", +] + [[package]] name = "sha1" version = "0.10.6" @@ -3189,7 +3336,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" dependencies = [ "bitflags 2.11.0", - "core-foundation", + "core-foundation 0.9.4", "system-configuration-sys", ] @@ -3388,6 +3535,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-serial" +version = "5.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa1d5427f11ba7c5e6384521cfd76f2d64572ff29f3f4f7aa0f496282923fdc8" +dependencies = [ + "cfg-if", + "futures", + "log", + "mio-serial", + "serialport", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.17" @@ -3676,6 +3837,15 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +[[package]] +name = "unescaper" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4064ed685c487dbc25bd3f0e9548f2e34bab9d18cefc700f9ec2dba74ba1138e" +dependencies = [ + "thiserror 2.0.17", +] + [[package]] name = "unicode-ident" version = "1.0.20" @@ -4033,7 +4203,7 @@ dependencies = [ "embedded-alloc", "embedded-hal 0.2.7", "embedded-hal-async", - "embedded-io-async 0.6.1", + "embedded-io-async 0.7.0", "embedded-storage", "heapless 0.8.0", "micromath", diff --git a/Cargo.toml b/Cargo.toml index 435d3cd1..2b7dbb45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ members = [ "aimdb-knx-connector", "aimdb-websocket-connector", "aimdb-uds-connector", + "aimdb-serial-connector", "aimdb-ws-protocol", "aimdb-wasm-adapter", "tools/aimdb-cli", @@ -25,6 +26,7 @@ members = [ "examples/tokio-knx-connector-demo", "examples/embassy-mqtt-connector-demo", "examples/embassy-knx-connector-demo", + "examples/embassy-serial-connector-demo", "examples/sync-api-demo", "examples/remote-access-demo", "examples/weather-mesh-demo/weather-mesh-common", @@ -71,12 +73,21 @@ anyhow = "1.0" # Serialization - extend with error handling serde_json = { version = "1.0", default-features = false, features = ["alloc"] } +# COBS framing for the serial connector (no_std; self-synchronizing 0x00-delimited +# frames over a lossy/unframed byte medium). +cobs = { version = "0.5.1", default-features = false } + +# Serial/UART transport for the tokio half of the serial connector (std only). +tokio-serial = "5.4.5" + # Basic observability tracing = { version = "0.1", default-features = false } # Async utilities futures = "0.3" -futures-util = { version = "0.3", default-features = false, features = ["alloc"] } +futures-util = { version = "0.3", default-features = false, features = [ + "alloc", +] } # CLI (for aimdb-cli) clap = { version = "4.0", features = ["derive"] } @@ -127,7 +138,7 @@ embedded-hal-async = "1.0" embedded-hal-nb = "1.0" embedded-hal-bus = { version = "0.2", features = ["async"] } embedded-io = { version = "0.6.0" } -embedded-io-async = { version = "0.6.1" } +embedded-io-async = { version = "0.7.0" } embedded-nal-async = "0.8.0" embedded-storage = "0.3.1" diff --git a/Makefile b/Makefile index 3a8d9908..281ba604 100644 --- a/Makefile +++ b/Makefile @@ -102,6 +102,8 @@ build: cargo build --package aimdb-websocket-connector --features "server,client" @printf "$(YELLOW) → Building UDS connector$(NC)\n" cargo build --package aimdb-uds-connector + @printf "$(YELLOW) → Building serial connector (tokio)$(NC)\n" + cargo build --package aimdb-serial-connector --no-default-features --features "tokio-runtime" @printf "$(YELLOW) → Building WASM adapter$(NC)\n" cargo build --package aimdb-wasm-adapter --target wasm32-unknown-unknown --features "wasm-runtime" @@ -165,10 +167,14 @@ test: cargo test --package aimdb-websocket-connector --no-default-features --features "client" --lib @printf "$(YELLOW) → Testing UDS connector$(NC)\n" cargo test --package aimdb-uds-connector + @printf "$(YELLOW) → Testing serial connector (tokio: COBS framing + AimX round-trip over a duplex)$(NC)\n" + cargo test --package aimdb-serial-connector --no-default-features --features "_test-tokio" + @printf "$(YELLOW) → Testing serial connector (embassy: COBS framing + client-engine smoke on the EmbassyAdapter clock)$(NC)\n" + cargo test --package aimdb-serial-connector --no-default-features --features "embassy-runtime" fmt: @printf "$(GREEN)Formatting code (workspace members only)...$(NC)\n" - @for pkg in aimdb-executor aimdb-derive aimdb-data-contracts aimdb-core aimdb-client aimdb-embassy-adapter aimdb-tokio-adapter aimdb-wasm-adapter aimdb-sync aimdb-persistence aimdb-persistence-sqlite aimdb-mqtt-connector aimdb-knx-connector aimdb-ws-protocol aimdb-websocket-connector aimdb-codegen aimdb-cli aimdb-mcp sync-api-demo tokio-mqtt-connector-demo embassy-mqtt-connector-demo tokio-knx-connector-demo embassy-knx-connector-demo weather-mesh-common weather-hub weather-station-alpha weather-station-beta hello-mailbox hello-single-latest-async; do \ + @for pkg in aimdb-executor aimdb-derive aimdb-data-contracts aimdb-core aimdb-client aimdb-embassy-adapter aimdb-tokio-adapter aimdb-wasm-adapter aimdb-sync aimdb-persistence aimdb-persistence-sqlite aimdb-mqtt-connector aimdb-knx-connector aimdb-ws-protocol aimdb-websocket-connector aimdb-uds-connector aimdb-serial-connector aimdb-codegen aimdb-cli aimdb-mcp sync-api-demo tokio-mqtt-connector-demo embassy-mqtt-connector-demo tokio-knx-connector-demo embassy-knx-connector-demo embassy-serial-connector-demo weather-mesh-common weather-hub weather-station-alpha weather-station-beta hello-mailbox hello-single-latest-async; do \ printf "$(YELLOW) → Formatting $$pkg$(NC)\n"; \ cargo fmt -p $$pkg 2>/dev/null || true; \ done @@ -177,7 +183,7 @@ fmt: fmt-check: @printf "$(GREEN)Checking code formatting (workspace members only)...$(NC)\n" @FAILED=0; \ - for pkg in aimdb-executor aimdb-derive aimdb-data-contracts aimdb-core aimdb-client aimdb-embassy-adapter aimdb-tokio-adapter aimdb-wasm-adapter aimdb-sync aimdb-persistence aimdb-persistence-sqlite aimdb-mqtt-connector aimdb-knx-connector aimdb-ws-protocol aimdb-websocket-connector aimdb-codegen aimdb-cli aimdb-mcp sync-api-demo tokio-mqtt-connector-demo embassy-mqtt-connector-demo tokio-knx-connector-demo embassy-knx-connector-demo weather-mesh-common weather-hub weather-station-alpha weather-station-beta hello-mailbox hello-single-latest-async; do \ + for pkg in aimdb-executor aimdb-derive aimdb-data-contracts aimdb-core aimdb-client aimdb-embassy-adapter aimdb-tokio-adapter aimdb-wasm-adapter aimdb-sync aimdb-persistence aimdb-persistence-sqlite aimdb-mqtt-connector aimdb-knx-connector aimdb-ws-protocol aimdb-websocket-connector aimdb-uds-connector aimdb-serial-connector aimdb-codegen aimdb-cli aimdb-mcp sync-api-demo tokio-mqtt-connector-demo embassy-mqtt-connector-demo tokio-knx-connector-demo embassy-knx-connector-demo embassy-serial-connector-demo weather-mesh-common weather-hub weather-station-alpha weather-station-beta hello-mailbox hello-single-latest-async; do \ printf "$(YELLOW) → Checking $$pkg$(NC)\n"; \ if ! cargo fmt -p $$pkg -- --check 2>&1; then \ printf "$(RED)❌ Formatting check failed for $$pkg$(NC)\n"; \ @@ -240,6 +246,12 @@ clippy: cargo clippy --package aimdb-websocket-connector --features "tokio-runtime,client" --all-targets -- -D warnings @printf "$(YELLOW) → Clippy on UDS connector$(NC)\n" cargo clippy --package aimdb-uds-connector --all-targets -- -D warnings + @printf "$(YELLOW) → Clippy on serial connector (tokio)$(NC)\n" + cargo clippy --package aimdb-serial-connector --no-default-features --features "_test-tokio" --all-targets -- -D warnings + @printf "$(YELLOW) → Clippy on serial connector (embassy)$(NC)\n" + cargo clippy --package aimdb-serial-connector --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime" -- -D warnings + @printf "$(YELLOW) → Clippy on serial connector (embassy + defmt)$(NC)\n" + cargo clippy --package aimdb-serial-connector --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,defmt" -- -D warnings @printf "$(YELLOW) → Clippy on WASM adapter$(NC)\n" cargo clippy --package aimdb-wasm-adapter --target wasm32-unknown-unknown --features "wasm-runtime" -- -D warnings @@ -326,6 +338,10 @@ test-embedded: cargo check --package aimdb-knx-connector --target thumbv7em-none-eabihf --target-dir $(EMBEDDED_CHECK_TARGET_DIR) --no-default-features --features "embassy-runtime" @printf "$(YELLOW) → Checking aimdb-knx-connector (Embassy + defmt) on thumbv7em-none-eabihf target$(NC)\n" cargo check --package aimdb-knx-connector --target thumbv7em-none-eabihf --target-dir $(EMBEDDED_CHECK_TARGET_DIR) --no-default-features --features "embassy-runtime,defmt" + @printf "$(YELLOW) → Checking aimdb-serial-connector (Embassy: full no_std AimX serial client+server) on thumbv7em-none-eabihf target$(NC)\n" + cargo check --package aimdb-serial-connector --target thumbv7em-none-eabihf --target-dir $(EMBEDDED_CHECK_TARGET_DIR) --no-default-features --features "embassy-runtime" + @printf "$(YELLOW) → Checking aimdb-serial-connector (Embassy + defmt) on thumbv7em-none-eabihf target$(NC)\n" + cargo check --package aimdb-serial-connector --target thumbv7em-none-eabihf --target-dir $(EMBEDDED_CHECK_TARGET_DIR) --no-default-features --features "embassy-runtime,defmt" ## Example projects examples: diff --git a/aimdb-serial-connector/CHANGELOG.md b/aimdb-serial-connector/CHANGELOG.md new file mode 100644 index 00000000..f8c3e3e3 --- /dev/null +++ b/aimdb-serial-connector/CHANGELOG.md @@ -0,0 +1,31 @@ +# Changelog - aimdb-serial-connector + +All notable changes to the `aimdb-serial-connector` crate will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +### Added + +- **New crate — the COBS-framed serial/UART transport for AimDB remote access (Issue #122, follow-up to #39, [doc 041](../docs/design/detailed/041-phase6-embedded-transports.md)).** The serial sibling of `aimdb-uds-connector`: it contributes only the `Dialer`/`Listener`/`Connection` triple plus thin sugar; the AimX codec + dispatch and the runtime-neutral session engines (`run_client`/`serve`) are reused from `aimdb-core`. The wire is the same compact AimX JSON, framed with **COBS** (Consistent Overhead Byte Stuffing) and a `0x00` sentinel instead of a newline — self-synchronizing on a lossy/unframed serial medium, so a receiver that joins mid-stream resynchronizes on the next sentinel. Default scheme `"serial"`. Two runtime halves: + - **`tokio-runtime`** (std, host/gateway) — `TokioSerialConnection` over any `AsyncRead + AsyncWrite` (a real `tokio_serial::SerialStream` in production, a `tokio::io::duplex()` in tests), with `SerialClient::new(path, baud)` (sugar over `SessionClientConnector`) and `SerialServer` (sugar over `SessionServerConnector` + `AimxDispatch`). The listener is one-shot (serial is point-to-point). + - **`embassy-runtime`** (`no_std + alloc`, MCU) — `EmbassySerialConnection` generic over `embedded-io-async` `Read`/`Write` halves (the common `Uart::split()` shape), with `SerialClient`/`SerialServer` that hand-roll `ConnectorBuilder` (calling `run_client`/`pump_client`/`serve` directly) and force-`Send` the single-core Embassy futures via `aimdb-embassy-adapter`'s `SendFutureWrapper`. The Embassy *server* half rides the `no_std` `AimxDispatch` landed in #120, so an MCU can answer `record.list`/`get`/`set`/`subscribe`/`drain` over a UART; the *client* half mirrors records to a gateway. Reconnect is disabled by default on Embassy (the UART peripheral is moved in and can't be re-acquired). +- **`framing` module** — the shared COBS frame codec (`encode_frame` + a chunk-tolerant `FrameAccumulator`), pure `no_std + alloc`, so the round-trip is unit-tested independent of any transport. +- **Examples — a real end-to-end serial test.** `examples/serial_demo.rs` (host, `--features _test-tokio`): an AimX client/server over a device path (a board's ST-LINK VCP at `/dev/ttyACM0`, or a `socat` PTY pair). `examples/embassy-serial-connector-demo/` (board): an STM32H563ZI Nucleo serving the `counter` record over USART3 ↔ the ST-LINK Virtual COM Port — the no_std `SerialServer` + `AimxDispatch` on real silicon, flashed via `probe-rs`, queried from the host over the wire. + +### Notes + +- **The Embassy half sends each frame in ring-sized (64-byte) chunks.** A HAL + `BufferedUart::write` is atomic-or-error (`embassy-stm32` returns `BufferTooLong` + for a single write larger than its TX ring), so a frame bigger than the buffer — + e.g. a `record.list` reply — would otherwise fail the whole send and drop the + session. Chunking sends a frame of any length given a TX buffer ≥ 64 bytes. + (Validated end-to-end on an STM32H563ZI: `record.list`/`record.get`/streaming all + round-trip over the ST-LINK VCP.) +- **`embedded-io-async` is pinned to 0.7** (the workspace dep was bumped 0.6 → 0.7) so a HAL `BufferedUart` (e.g. `embassy-stm32`, which uses 0.7) satisfies the connector's `Read`/`Write` bounds without a trait-version skew. Wire the Embassy half from a `BufferedUart::split()` — the plain async `UartRx` does **not** implement `embedded-io-async::Read`, only the buffered/ring-buffered variants do. + +- The `unsafe impl Send`/`Sync` on the Embassy transport + builder types rest on the single-core, cooperative Embassy-executor invariant documented by `SendFutureWrapper` (no preemption / thread migration). This is the first *raw-peripheral* session connector — MQTT/KNX sidestep it by pulling a `Send + Sync` `embassy_net::Stack` from the runtime adapter rather than owning a peripheral. +- The `serial://` scheme constant lands here; the `connect_url` / `--connect ` resolver that maps a `serial:///dev/ttyUSB0?baud=115200` URL to `SerialClient`/`SerialServer` is tracked separately (Issue #123). +- The internal `_test-tokio` feature gates the host tokio integration test's adapter dependency; it is kept off the public `tokio-runtime` feature (a connector shouldn't pull a concrete adapter) and out of `[dev-dependencies]` (an unconditional tokio adapter would force `aimdb-core/std` into the `no_std` embassy test build). diff --git a/aimdb-serial-connector/Cargo.toml b/aimdb-serial-connector/Cargo.toml new file mode 100644 index 00000000..c5a46378 --- /dev/null +++ b/aimdb-serial-connector/Cargo.toml @@ -0,0 +1,108 @@ +[package] +name = "aimdb-serial-connector" +version = "0.1.0" +edition = "2021" +license.workspace = true +repository.workspace = true +homepage.workspace = true +description = "COBS-framed serial/UART transport connector for AimDB: record mirroring and remote access (AimX over serial), tokio + Embassy" +keywords = ["aimdb", "connector", "serial", "uart", "embedded"] +categories = ["embedded", "network-programming", "no-std"] + +[features] +default = ["aimdb-core/alloc"] + +# std build (host). Pulls the tokio half's std bits. +std = [ + "aimdb-core/std", + "aimdb-core/alloc", + "aimdb-core/connector-session", + "thiserror", +] + +# Host / gateway half: real serial via `tokio-serial`, riding the generic +# `SessionClientConnector`/`SessionServerConnector` from core. +tokio-runtime = [ + "std", + "aimdb-core/connector-session", + "aimdb-core/remote-access", + "dep:tokio", + "dep:tokio-serial", + "dep:futures-util", +] + +# Embedded half: `no_std + alloc`, generic over `embedded-io-async` UART halves. +# Hand-rolls the `ConnectorBuilder` and force-`Send`s the engine futures via +# `aimdb-embassy-adapter`'s `SendFutureWrapper`. +embassy-runtime = [ + "aimdb-core/alloc", + "aimdb-core/connector-session", + "aimdb-core/remote-access", + "dep:aimdb-embassy-adapter", + # `SendFutureWrapper` (force-`Send`) is always available on the no_std adapter; + # `embassy-time`/`-sync` give the `EmbassyAdapter` clock the smoke test runs on. + "aimdb-embassy-adapter/embassy-time", + "aimdb-embassy-adapter/embassy-sync", + "dep:embedded-io-async", +] + +tracing = ["dep:tracing", "aimdb-core/tracing"] +defmt = ["dep:defmt", "aimdb-core/defmt"] + +# Internal: host tokio integration tests need a concrete adapter. Kept off the +# public `tokio-runtime` feature (a connector shouldn't pull an adapter in +# production) and out of `[dev-dependencies]` (an unconditional tokio adapter +# would force `aimdb-core/std` into the no_std `embassy-runtime` test build, where +# the no_std `aimdb-embassy-adapter` can't compile against it). Run the tokio +# tests with `--features _test-tokio`. +_test-tokio = ["tokio-runtime", "dep:aimdb-tokio-adapter"] + +[dependencies] +# AimX protocol (codec + dispatch) and the generic session connectors live in +# core; this crate contributes only the serial transport triple + sugar. +aimdb-core = { version = "1.1.0", path = "../aimdb-core", default-features = false } +aimdb-executor = { version = "0.2.0", path = "../aimdb-executor", default-features = false } + +# Shared framing (no_std). `alloc` enables the `encode_vec`/`decode_vec` helpers. +cobs = { workspace = true, features = ["alloc"] } + +# --- tokio half (std) --- +tokio = { workspace = true, optional = true, features = ["io-util"] } +tokio-serial = { workspace = true, optional = true } +futures-util = { version = "0.3", optional = true, default-features = false, features = [ + "alloc", +] } +thiserror = { workspace = true, optional = true } + +# --- embassy half (no_std) --- +aimdb-embassy-adapter = { version = "0.6.0", path = "../aimdb-embassy-adapter", default-features = false, optional = true } +embedded-io-async = { workspace = true, optional = true } + +# --- test-only (see the `_test-tokio` feature) --- +aimdb-tokio-adapter = { version = "0.6.0", path = "../aimdb-tokio-adapter", optional = true } + +# --- logging --- +tracing = { version = "0.1", optional = true } +defmt = { workspace = true, optional = true } + +[dev-dependencies] +# Trivial host time driver so `embassy-time` links in the embassy smoke test. +# (`aimdb-embassy-adapter` itself is the regular `embassy-runtime` optional dep — +# keeping it out of dev-deps avoids a std/no_std unification clash with the tokio +# tests, where it would otherwise compile against a std `aimdb-core`.) +embassy-time-driver = { path = "../_external/embassy/embassy-time-driver" } +tokio = { version = "1", features = [ + "rt-multi-thread", + "macros", + "time", + "io-util", +] } +futures = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } + +# The real-serial host demo needs a concrete adapter (and tokio-serial); gate it on +# the internal test feature so it doesn't pull an adapter into the embassy build. +[[example]] +name = "serial_demo" +required-features = ["_test-tokio"] diff --git a/aimdb-serial-connector/examples/serial_demo.rs b/aimdb-serial-connector/examples/serial_demo.rs new file mode 100644 index 00000000..51861c2c --- /dev/null +++ b/aimdb-serial-connector/examples/serial_demo.rs @@ -0,0 +1,124 @@ +//! Real-serial demo for `aimdb-serial-connector` — talk AimX over an actual +//! serial device (a board's ST-LINK Virtual COM Port, or a `socat` PTY pair for a +//! no-hardware smoke). +//! +//! Two modes, so either end can be the host: +//! +//! ```text +//! # board (Embassy SerialServer) ⇄ host: +//! cargo run --example serial_demo --features _test-tokio -- client /dev/ttyACM0 +//! +//! # host SerialServer ⇄ host client over a PTY (no hardware): +//! socat -d -d pty,raw,echo=0 pty,raw,echo=0 # prints two /dev/pts/N +//! cargo run ... -- server /dev/pts/3 +//! cargo run ... -- client /dev/pts/4 +//! ``` +//! +//! - `server [baud]` — serve a live `counter` record over the line. +//! - `client [baud]` — `record.list` once, then `record.get counter` +//! every second, printing what comes back across the wire. +//! +//! Built only under the internal `_test-tokio` feature (it needs a concrete +//! adapter); see the crate's `Cargo.toml`. +//! +//! On macOS the board's VCP is `/dev/cu.usbmodem…` (use the `cu.*`, not `tty.*`, +//! node). Run from the workspace root, and make sure nothing else holds the port +//! (no stray `screen`/client) — only one program may open it at a time. + +use std::sync::Arc; +use std::time::Duration; + +use aimdb_core::buffer::BufferCfg; +use aimdb_core::remote::AimxConfig; +use aimdb_core::session::aimx::AimxCodec; +use aimdb_core::session::{run_client, ClientConfig, Payload}; +use aimdb_core::AimDbBuilder; +use aimdb_serial_connector::tokio_transport::{SerialDialer, SerialServer}; +use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct Counter { + value: u64, +} + +#[tokio::main] +async fn main() { + let args: Vec = std::env::args().collect(); + let mode = args.get(1).map(String::as_str).unwrap_or("client"); + let device = args + .get(2) + .cloned() + .unwrap_or_else(|| "/dev/ttyACM0".to_string()); + let baud: u32 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(115_200); + + match mode { + "server" => run_server(device, baud).await, + "client" => run_client_mode(device, baud).await, + other => { + eprintln!("usage: serial_demo [baud] (got mode '{other}')"); + std::process::exit(2); + } + } +} + +/// Serve a `counter` record (incrementing once per second) over the serial line. +async fn run_server(device: String, baud: u32) { + println!("[server] serving AimX over {device} @ {baud} baud (Ctrl-C to stop)"); + + let mut builder = AimDbBuilder::new() + .runtime(Arc::new(TokioAdapter)) + .with_connector(SerialServer::new(device, baud).with_config(AimxConfig::uds_default())); + builder.configure::("counter", |reg| { + reg.buffer(BufferCfg::SingleLatest).with_remote_access(); + }); + let (db, runner) = builder.build().await.expect("build db"); + let db = Arc::new(db); + + // Drive a value into the record once per second. + let producer = db.producer::("counter").expect("producer"); + tokio::spawn(async move { + let mut value = 0u64; + loop { + value += 1; + producer.produce(Counter { value }); + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + + // The runner drives the serial `serve` loop; never returns. + runner.run().await; +} + +/// Connect over the serial line and poll the `counter` record. +async fn run_client_mode(device: String, baud: u32) { + println!("[client] querying AimX over {device} @ {baud} baud"); + + let (handle, engine) = run_client( + SerialDialer::new(device, baud), + AimxCodec, + ClientConfig { + sends_hello: false, + ..ClientConfig::default() + }, + Arc::new(TokioAdapter), + ); + tokio::spawn(engine); + + match handle.call("record.list", Payload::from(&b"{}"[..])).await { + Ok(list) => println!("[client] record.list = {}", String::from_utf8_lossy(&list)), + Err(e) => println!("[client] record.list failed: {e:?}"), + } + + loop { + let params: Payload = serde_json::to_vec(&json!({ "name": "counter" })) + .unwrap() + .into(); + match handle.call("record.get", params).await { + Ok(v) => println!("[client] counter = {}", String::from_utf8_lossy(&v)), + Err(e) => println!("[client] record.get failed: {e:?}"), + } + tokio::time::sleep(Duration::from_secs(1)).await; + } +} diff --git a/aimdb-serial-connector/src/embassy_transport.rs b/aimdb-serial-connector/src/embassy_transport.rs new file mode 100644 index 00000000..d5e23241 --- /dev/null +++ b/aimdb-serial-connector/src/embassy_transport.rs @@ -0,0 +1,416 @@ +//! Embassy serial transport (feature `embassy-runtime`, `no_std + alloc`) — a +//! [`Connection`] over an [`embedded_io_async`] UART with COBS framing, plus +//! [`SerialClient`]/[`SerialServer`] sugar. +//! +//! Generic over the `embedded-io-async` `Read`/`Write` halves (the common Embassy +//! HAL shape, e.g. `Uart::split()`), so it works with any chip's async UART. +//! +//! # Why this half hand-rolls `ConnectorBuilder` +//! +//! The generic [`SessionClientConnector`](aimdb_core::session::SessionClientConnector) +//! / [`SessionServerConnector`](aimdb_core::session::SessionServerConnector) demand +//! `Clone + Send + Sync` on the dialer / `Send + Sync` on the listener+dispatch +//! factories — bounds a moved-in UART peripheral can't meet. The underlying +//! engines need only the bare traits ([`run_client`]``, [`serve`]``), so we call them directly and force-`Send` the (single-core, +//! cooperative) Embassy futures with `aimdb-embassy-adapter`'s +//! [`SendFutureWrapper`] — the same pattern the MQTT/KNX Embassy connectors use. +//! +//! The `unsafe impl Send`/`Sync` on the transport + builder types rest on the same +//! invariant `SendFutureWrapper` documents: an Embassy executor runs cooperatively +//! on a single core with no preemption or thread migration, so these values are +//! never actually accessed from another thread. + +use core::cell::RefCell; +use core::future::Future; +use core::pin::Pin; + +use alloc::boxed::Box; +use alloc::string::{String, ToString}; +use alloc::sync::Arc; +use alloc::vec; +use alloc::vec::Vec; + +use embedded_io_async::{Read, Write}; + +use aimdb_embassy_adapter::SendFutureWrapper; + +use aimdb_core::connector::ConnectorBuilder; +use aimdb_core::remote::{AimxConfig, SecurityPolicy}; +use aimdb_core::session::aimx::{AimxCodec, AimxDispatch}; +use aimdb_core::session::{ + pump_client, run_client, serve, BoxFut, ClientConfig, Connection, Dialer, Dispatch, Listener, + PeerInfo, SessionConfig, SessionLimits, TransportError, TransportResult, +}; +use aimdb_core::{AimDb, DbError, DbResult, RuntimeAdapter}; +use aimdb_executor::TimeOps; + +use crate::framing::{encode_frame, FrameAccumulator}; +use crate::DEFAULT_SCHEME; + +type BoxFuture = Pin + Send + 'static>>; +type BuildFuture<'a> = Pin>> + Send + 'a>>; + +/// How many bytes a single UART `read()` pulls before re-checking for a frame. +const READ_CHUNK: usize = 64; + +/// Max bytes per `write()` call. Some HAL `BufferedUart::write` is atomic-or-error +/// (e.g. `embassy-stm32` returns `BufferTooLong` for a single write larger than its +/// TX ring), so a frame bigger than the buffer must be split. Chunking at this size +/// sends a frame of any length as long as the TX buffer is at least this big. +const WRITE_CHUNK: usize = 64; + +// =========================================================================== +// Connection +// =========================================================================== + +/// A framed bidirectional pipe over an `embedded-io-async` UART. Framing lives in +/// the transport: [`recv`](Connection::recv) returns one COBS frame (sentinel +/// stripped); [`send`](Connection::send) COBS-encodes and appends the sentinel. +pub struct EmbassySerialConnection { + rx: Rd, + tx: Wr, + acc: FrameAccumulator, + peer: PeerInfo, +} + +// SAFETY: Embassy executors run cooperatively on a single core with no preemption +// or thread migration, so the wrapped UART halves are never accessed across +// threads. Only construct this where it is driven by an Embassy executor. Same +// invariant as `aimdb_embassy_adapter::SendFutureWrapper`. +unsafe impl Send for EmbassySerialConnection {} + +impl EmbassySerialConnection { + /// Wrap the split read/write halves of an async UART. + pub fn new(rx: Rd, tx: Wr) -> Self { + Self { + rx, + tx, + acc: FrameAccumulator::new(), + peer: PeerInfo::default(), + } + } +} + +impl Connection for EmbassySerialConnection +where + Rd: Read, + Wr: Write, +{ + fn recv(&mut self) -> BoxFut<'_, TransportResult>>> { + // `SendFutureWrapper` force-`Send`s the (single-core) UART read future to + // satisfy the `Send` `BoxFut` return type. + Box::pin(SendFutureWrapper(async move { + loop { + if let Some(frame) = self.acc.next_frame() { + return Ok(Some(frame.map_err(|_| TransportError::Io)?)); + } + let mut chunk = [0u8; READ_CHUNK]; + match self.rx.read(&mut chunk).await { + Ok(0) => return Ok(None), // EOF — peer closed + Ok(n) => self.acc.push_bytes(&chunk[..n]), + Err(_) => return Err(TransportError::Io), + } + } + })) + } + + fn send<'a>(&'a mut self, frame: &'a [u8]) -> BoxFut<'a, TransportResult<()>> { + Box::pin(SendFutureWrapper(async move { + let mut out = Vec::new(); + encode_frame(frame, &mut out); + // Write in ring-sized chunks: a HAL `BufferedUart` rejects a single + // write larger than its TX buffer, so a frame bigger than the buffer + // (e.g. a `record.list` reply) must be split. + for chunk in out.chunks(WRITE_CHUNK) { + self.tx + .write_all(chunk) + .await + .map_err(|_| TransportError::Closed)?; + } + self.tx.flush().await.map_err(|_| TransportError::Closed) + })) + } + + fn peer(&self) -> &PeerInfo { + &self.peer + } +} + +// =========================================================================== +// Dialer / Listener +// =========================================================================== + +/// The initiating (client) side. Holds the UART halves and hands them to the one +/// connection it ever opens (the peripheral is moved in, so it can't redial — +/// pair with `ClientConfig { reconnect: false, .. }`, the [`SerialClient`] default). +pub struct SerialDialer { + halves: RefCell>, +} + +// SAFETY: single-core cooperative Embassy executor — see the connection above. +unsafe impl Send for SerialDialer {} + +impl SerialDialer { + /// Build a one-shot dialer over the split UART halves. + pub fn new(rx: Rd, tx: Wr) -> Self { + Self { + halves: RefCell::new(Some((rx, tx))), + } + } +} + +impl Dialer for SerialDialer +where + Rd: Read + 'static, + Wr: Write + 'static, +{ + fn connect(&self) -> BoxFut<'_, TransportResult>> { + Box::pin(SendFutureWrapper(async move { + let (rx, tx) = self.halves.borrow_mut().take().ok_or(TransportError::Io)?; + Ok(Box::new(EmbassySerialConnection::new(rx, tx)) as Box) + })) + } +} + +/// The accepting (server) side. Serial is point-to-point: the first +/// [`accept`](Listener::accept) hands out the connection; later calls park forever. +pub struct SerialListener { + halves: Option<(Rd, Wr)>, +} + +// SAFETY: single-core cooperative Embassy executor — see the connection above. +unsafe impl Send for SerialListener {} + +impl SerialListener { + /// Wrap the split UART halves as a one-shot listener. + pub fn new(rx: Rd, tx: Wr) -> Self { + Self { + halves: Some((rx, tx)), + } + } +} + +impl Listener for SerialListener +where + Rd: Read + 'static, + Wr: Write + 'static, +{ + fn accept(&mut self) -> BoxFut<'_, TransportResult>> { + Box::pin(SendFutureWrapper(async move { + match self.halves.take() { + Some((rx, tx)) => { + Ok(Box::new(EmbassySerialConnection::new(rx, tx)) as Box) + } + // Point-to-point: no second peer ever arrives. + None => core::future::pending().await, + } + })) + } +} + +// =========================================================================== +// Client sugar (hand-rolled ConnectorBuilder) +// =========================================================================== + +/// Mirrors records to/from an AimX peer over a serial UART. Register it via +/// `with_connector`; declare the routes with `link_to`/`link_from` on the +/// `serial://` scheme (or override with [`scheme`](Self::scheme)). +pub struct SerialClient { + halves: RefCell>, + config: ClientConfig, + scheme: String, +} + +// SAFETY: single-core cooperative Embassy executor — see the connection above. +// `ConnectorBuilder: Send + Sync`, so the builder must assert both. +unsafe impl Send for SerialClient {} +unsafe impl Sync for SerialClient {} + +impl SerialClient { + /// Build a client over the split UART halves (e.g. from `Uart::split()`). + /// + /// Reconnect is disabled by default: the peripheral is moved in and can't be + /// re-acquired after a drop. Override with [`with_config`](Self::with_config). + pub fn new(rx: Rd, tx: Wr) -> Self { + let config = ClientConfig { + reconnect: false, + ..ClientConfig::default() + }; + Self { + halves: RefCell::new(Some((rx, tx))), + config, + scheme: DEFAULT_SCHEME.to_string(), + } + } + + /// Override the scheme this connector registers. + pub fn scheme(mut self, scheme: impl Into) -> Self { + self.scheme = scheme.into(); + self + } + + /// Override the client engine config (keepalive, offline queue, …). Note that + /// re-enabling `reconnect` cannot re-open the moved-in UART. + pub fn with_config(mut self, config: ClientConfig) -> Self { + self.config = config; + self + } +} + +impl ConnectorBuilder for SerialClient +where + R: TimeOps + 'static, + Rd: Read + 'static, + Wr: Write + 'static, +{ + fn build<'a>(&'a self, db: &'a AimDb) -> BuildFuture<'a> { + Box::pin(SendFutureWrapper(async move { + let (rx, tx) = self + .halves + .borrow_mut() + .take() + .ok_or_else(connector_consumed)?; + let dialer = SerialDialer::new(rx, tx); + let (handle, engine) = + run_client(dialer, AimxCodec, self.config.clone(), db.runtime_arc()); + // One pump future per route; each holds a `ClientHandle` clone, so the + // engine stays alive as long as any mirror runs. + let mut futures = pump_client(db, &self.scheme, &handle); + futures.push(engine); + Ok(futures) + })) + } + + fn scheme(&self) -> &str { + &self.scheme + } +} + +// =========================================================================== +// Server sugar (hand-rolled ConnectorBuilder) +// =========================================================================== + +/// Serves the full AimX toolset over a serial UART, so a host (or another board) +/// can `record.list`/`get`/`set`/`subscribe`/`drain` this db over the wire. +pub struct SerialServer { + halves: RefCell>, + config: AimxConfig, + scheme: String, +} + +// SAFETY: single-core cooperative Embassy executor — see the connection above. +unsafe impl Send for SerialServer {} +unsafe impl Sync for SerialServer {} + +impl SerialServer { + /// Serve AimX over the split UART halves, with the default read-only policy. + pub fn new(rx: Rd, tx: Wr) -> Self { + Self { + halves: RefCell::new(Some((rx, tx))), + config: AimxConfig::uds_default(), + scheme: DEFAULT_SCHEME.to_string(), + } + } + + /// Use a prepared [`AimxConfig`] for the security policy / limits (the + /// `socket_path` / `socket_permissions` fields are unused over serial). + pub fn with_config(mut self, config: AimxConfig) -> Self { + self.config = config; + self + } + + /// Set the security policy (read-only vs. per-record-writable). + pub fn security_policy(mut self, policy: SecurityPolicy) -> Self { + self.config = self.config.security_policy(policy); + self + } + + /// Maximum live subscriptions for the connection. + pub fn max_subs_per_connection(mut self, max: usize) -> Self { + self.config = self.config.max_subs_per_connection(max); + self + } + + /// Override the scheme this connector registers. + pub fn scheme(mut self, scheme: impl Into) -> Self { + self.scheme = scheme.into(); + self + } +} + +impl ConnectorBuilder for SerialServer +where + R: RuntimeAdapter + 'static, + Rd: Read + 'static, + Wr: Write + 'static, +{ + fn build<'a>(&'a self, db: &'a AimDb) -> BuildFuture<'a> { + Box::pin(SendFutureWrapper(async move { + let (rx, tx) = self + .halves + .borrow_mut() + .take() + .ok_or_else(connector_consumed)?; + let listener = SerialListener::new(rx, tx); + + // Apply the security policy's writable marking so `record.list` reports + // the `writable` flag (the dispatch also enforces it). + apply_writable(db, &self.config); + + let session_config = SessionConfig { + limits: SessionLimits { + // A UART carries a single peer. + max_connections: 1, + max_subs_per_connection: self.config.max_subs_per_connection, + }, + reads_hello: false, + // AimX's subscribe ack stays implicit (events flow); no ack frame. + acks_subscribe: false, + }; + let dispatch: Arc = + Arc::new(AimxDispatch::new(Arc::new(db.clone()), self.config.clone())); + let fut: BoxFuture = Box::pin(serve( + listener, + Arc::new(AimxCodec), + dispatch, + session_config, + )); + Ok(vec![fut]) + })) + } + + fn scheme(&self) -> &str { + &self.scheme + } +} + +// =========================================================================== +// Helpers +// =========================================================================== + +/// The builder's UART halves were already taken — `build` ran twice. The +/// framework calls it once, so this is unreachable in practice. +fn connector_consumed() -> DbError { + DbError::MissingConfiguration { + #[cfg(feature = "std")] + parameter: String::from("serial connector already built"), + #[cfg(not(feature = "std"))] + _parameter: (), + } +} + +/// Mark each record named in the policy's writable set as writable, so +/// `record.list` advertises the `writable` flag. (Mirrors the UDS connector.) +fn apply_writable(db: &AimDb, config: &AimxConfig) +where + R: RuntimeAdapter + 'static, +{ + for key in config.security_policy.writable_records() { + if let Some(id) = db.inner().resolve_str(&key) { + if let Some(storage) = db.inner().storage(id) { + storage.set_writable_erased(true); + } + } + } +} diff --git a/aimdb-serial-connector/src/framing.rs b/aimdb-serial-connector/src/framing.rs new file mode 100644 index 00000000..58050fe3 --- /dev/null +++ b/aimdb-serial-connector/src/framing.rs @@ -0,0 +1,78 @@ +//! COBS frame boundaries over an unframed serial byte stream. +//! +//! The AimX codec emits raw JSON bytes; **framing is the transport's job** (the +//! UDS transport delimits with `\n`; serial uses COBS + a `0x00` sentinel). COBS +//! (Consistent Overhead Byte Stuffing) rewrites a payload so it never contains a +//! `0x00`, then a single `0x00` marks the frame boundary — so a receiver that +//! joins mid-stream resynchronizes on the next sentinel. AimX JSON never contains +//! a raw `0x00`, so the encoding is overhead-minimal (one byte per ~254). +//! +//! This module is shared by both runtime halves and is pure `no_std + alloc`, so +//! the round-trip is unit-tested on the host without any transport. + +use alloc::vec::Vec; + +/// A delimited chunk was not valid COBS — line noise, a truncated frame, or a +/// mid-stream join before the next sentinel. The transports map this to a +/// `TransportError`. Self-contained so the framing round-trip is testable without +/// the `connector-session` substrate. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct FrameError; + +/// Frame sentinel — the byte COBS guarantees never appears inside an encoded +/// frame, so it cleanly delimits one frame from the next. +pub const DELIM: u8 = 0x00; + +/// COBS-encode `payload` and append it (plus the trailing [`DELIM`]) to `out`. +/// +/// The dual of [`FrameAccumulator::next_frame`]: `decode(strip_delim(encode(p))) == p`. +pub fn encode_frame(payload: &[u8], out: &mut Vec) { + out.extend_from_slice(&cobs::encode_vec(payload)); + out.push(DELIM); +} + +/// Reassembles whole COBS frames from arbitrarily-chunked reads. +/// +/// A serial `read()` returns whatever bytes happen to be buffered — a partial +/// frame, exactly one frame, or several. Push raw bytes in with +/// [`push_bytes`](Self::push_bytes); pull each complete frame out with +/// [`next_frame`](Self::next_frame) until it returns `None` (need more bytes). +#[derive(Default)] +pub struct FrameAccumulator { + buf: Vec, +} + +impl FrameAccumulator { + /// A fresh, empty accumulator. + pub fn new() -> Self { + Self { buf: Vec::new() } + } + + /// Append freshly-read bytes to the pending buffer. + pub fn push_bytes(&mut self, data: &[u8]) { + self.buf.extend_from_slice(data); + } + + /// Pop the next complete frame, COBS-decoded. + /// + /// - `None` — no `DELIM` buffered yet; read more bytes and retry. + /// - `Some(Ok(frame))` — one decoded payload (the `DELIM` is consumed). + /// - `Some(Err(FrameError))` — the delimited chunk was not valid COBS (line + /// noise / a desync); the chunk is consumed so the next call resynchronizes + /// on the following sentinel. + pub fn next_frame(&mut self) -> Option, FrameError>> { + loop { + let pos = self.buf.iter().position(|&b| b == DELIM)?; + // Take the chunk up to and including the sentinel; the encoded frame is + // everything before it. + let chunk: Vec = self.buf.drain(..=pos).collect(); + let encoded = &chunk[..pos]; + // A leading/duplicate sentinel yields an empty chunk — not a frame; + // swallow it and look for the next one (resync). + if encoded.is_empty() { + continue; + } + return Some(cobs::decode_vec(encoded).map_err(|_| FrameError)); + } + } +} diff --git a/aimdb-serial-connector/src/lib.rs b/aimdb-serial-connector/src/lib.rs new file mode 100644 index 00000000..284f15b1 --- /dev/null +++ b/aimdb-serial-connector/src/lib.rs @@ -0,0 +1,62 @@ +//! COBS-framed serial/UART transport connector for AimDB — record mirroring and +//! remote access over a serial line. +//! +//! A thin, swappable transport crate (the serial sibling of `aimdb-uds-connector`): +//! it contributes only the `Dialer`/`Listener`/`Connection` triple plus thin +//! sugar; the AimX codec ([`AimxCodec`](aimdb_core::session::aimx::AimxCodec)), +//! dispatch ([`AimxDispatch`](aimdb_core::session::aimx::AimxDispatch)), and the +//! runtime-neutral session engines are reused verbatim from `aimdb-core`. +//! +//! The wire is the same compact AimX JSON as UDS, but framed with **COBS** +//! (Consistent Overhead Byte Stuffing) and a `0x00` delimiter instead of a +//! newline — self-synchronizing on a lossy/unframed serial medium. See +//! [`framing`]. +//! +//! # Two halves +//! +//! - **`tokio-runtime`** (std, host/gateway): real serial via `tokio-serial`, +//! riding the generic [`SessionClientConnector`](aimdb_core::session::SessionClientConnector) +//! / [`SessionServerConnector`](aimdb_core::session::SessionServerConnector). +//! See [`tokio_transport`]. +//! - **`embassy-runtime`** (`no_std + alloc`, MCU): generic over +//! [`embedded_io_async`] UART halves; hand-rolls the `ConnectorBuilder` and +//! force-`Send`s the engine futures via `aimdb-embassy-adapter`'s +//! `SendFutureWrapper`. See [`embassy_transport`]. +//! +//! Both speak the `serial://` scheme by default ([`DEFAULT_SCHEME`]). + +#![cfg_attr(not(feature = "std"), no_std)] + +extern crate alloc; + +pub mod framing; + +#[cfg(feature = "tokio-runtime")] +pub mod tokio_transport; + +#[cfg(feature = "embassy-runtime")] +pub mod embassy_transport; + +/// The default scheme `SerialClient`/`SerialServer` register when none is given. +/// +/// Transport-matched (like UDS's `"uds"`), so `link_to("serial://")` reads +/// at the call site. Override with `.scheme(...)` when running more than one +/// remote connector. +pub const DEFAULT_SCHEME: &str = "serial"; + +// Prefer the tokio names when both halves are compiled (e.g. host tests). +#[cfg(all(feature = "tokio-runtime", not(feature = "embassy-runtime")))] +pub use tokio_transport::{SerialClient, SerialDialer, SerialListener, SerialServer}; + +#[cfg(all(feature = "tokio-runtime", feature = "embassy-runtime"))] +pub use embassy_transport::{ + SerialClient as EmbassySerialClient, SerialServer as EmbassySerialServer, +}; +#[cfg(all(feature = "tokio-runtime", feature = "embassy-runtime"))] +pub use tokio_transport::{ + SerialClient as TokioSerialClient, SerialDialer, SerialListener, + SerialServer as TokioSerialServer, +}; + +#[cfg(all(feature = "embassy-runtime", not(feature = "tokio-runtime")))] +pub use embassy_transport::{SerialClient, SerialServer}; diff --git a/aimdb-serial-connector/src/tokio_transport.rs b/aimdb-serial-connector/src/tokio_transport.rs new file mode 100644 index 00000000..31bcda83 --- /dev/null +++ b/aimdb-serial-connector/src/tokio_transport.rs @@ -0,0 +1,317 @@ +//! tokio serial transport (feature `tokio-runtime`) — a [`Connection`] over an +//! async byte stream with COBS framing in the transport, plus +//! [`SerialClient`]/[`SerialServer`] sugar over the generic core connectors. +//! +//! The connection is generic over `AsyncRead + AsyncWrite` so it backs a real +//! `tokio_serial::SerialStream` in production and a `tokio::io::duplex()` pipe in +//! tests — no hardware needed. + +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio_serial::{SerialPortBuilderExt, SerialStream}; + +use aimdb_core::connector::ConnectorBuilder; +use aimdb_core::remote::{AimxConfig, SecurityPolicy}; +use aimdb_core::session::aimx::{AimxCodec, AimxDispatch}; +use aimdb_core::session::{ + BoxFut, Connection, Dialer, Dispatch, Listener, PeerInfo, SessionClientConnector, + SessionConfig, SessionLimits, SessionServerConnector, TransportError, TransportResult, +}; +use aimdb_core::{AimDb, DbError, DbResult, RuntimeAdapter}; + +use crate::framing::{encode_frame, FrameAccumulator}; +use crate::DEFAULT_SCHEME; + +type BoxFuture = Pin + Send + 'static>>; +type BuildFuture<'a> = Pin>> + Send + 'a>>; + +/// How many bytes a single serial `read()` pulls before re-checking for a frame. +const READ_CHUNK: usize = 256; + +// =========================================================================== +// Connection +// =========================================================================== + +/// A framed bidirectional pipe over an async serial byte stream. Framing lives in +/// the transport: [`recv`](Connection::recv) returns one COBS frame (sentinel +/// stripped); [`send`](Connection::send) COBS-encodes and appends the sentinel. +pub struct TokioSerialConnection { + stream: S, + acc: FrameAccumulator, + peer: PeerInfo, +} + +impl TokioSerialConnection { + /// Wrap an already-open async byte stream (a `SerialStream`, or a duplex pipe + /// in tests). + pub fn new(stream: S) -> Self { + Self { + stream, + acc: FrameAccumulator::new(), + peer: PeerInfo::default(), + } + } +} + +impl Connection for TokioSerialConnection +where + S: AsyncRead + AsyncWrite + Unpin + Send, +{ + fn recv(&mut self) -> BoxFut<'_, TransportResult>>> { + Box::pin(async move { + loop { + if let Some(frame) = self.acc.next_frame() { + return Ok(Some(frame.map_err(|_| TransportError::Io)?)); + } + let mut chunk = [0u8; READ_CHUNK]; + match self.stream.read(&mut chunk).await { + Ok(0) => return Ok(None), // EOF — peer closed + Ok(n) => self.acc.push_bytes(&chunk[..n]), + Err(_) => return Err(TransportError::Io), + } + } + }) + } + + fn send<'a>(&'a mut self, frame: &'a [u8]) -> BoxFut<'a, TransportResult<()>> { + Box::pin(async move { + let mut out = Vec::new(); + encode_frame(frame, &mut out); + self.stream + .write_all(&out) + .await + .map_err(|_| TransportError::Closed)?; + self.stream + .flush() + .await + .map_err(|_| TransportError::Closed) + }) + } + + fn peer(&self) -> &PeerInfo { + &self.peer + } +} + +// =========================================================================== +// Dialer / Listener +// =========================================================================== + +/// The initiating (client) side: opens the serial port on each +/// [`connect`](Dialer::connect). Cheap to clone (path + baud), so `run_client` +/// can redial and the generic `SessionClientConnector` can hold it. +#[derive(Clone)] +pub struct SerialDialer { + path: String, + baud: u32, +} + +impl SerialDialer { + /// Dial the serial device at `path` (e.g. `/dev/ttyUSB0`) at `baud`. + pub fn new(path: impl Into, baud: u32) -> Self { + Self { + path: path.into(), + baud, + } + } +} + +impl Dialer for SerialDialer { + fn connect(&self) -> BoxFut<'_, TransportResult>> { + Box::pin(async move { + let stream = tokio_serial::new(&self.path, self.baud) + .open_native_async() + .map_err(|_| TransportError::Io)?; + Ok(Box::new(TokioSerialConnection::new(stream)) as Box) + }) + } +} + +/// The accepting (server) side. Serial is point-to-point, so this is a one-shot +/// listener: the first [`accept`](Listener::accept) hands out the (already-open) +/// port; later calls park forever (there is only ever one peer on a UART). +pub struct SerialListener { + stream: Option, +} + +impl SerialListener { + /// Wrap an already-open serial port. + pub fn new(stream: SerialStream) -> Self { + Self { + stream: Some(stream), + } + } +} + +impl Listener for SerialListener { + fn accept(&mut self) -> BoxFut<'_, TransportResult>> { + Box::pin(async move { + match self.stream.take() { + Some(s) => Ok(Box::new(TokioSerialConnection::new(s)) as Box), + // Point-to-point: no second peer ever arrives. + None => core::future::pending().await, + } + }) + } +} + +// =========================================================================== +// Client sugar +// =========================================================================== + +/// Constructs a [`SessionClientConnector`] that dials an AimX peer over a serial +/// port. `SerialClient::new(path, baud)` is sugar; chain `.scheme(...)` / +/// `.with_config(...)` on the returned connector. +pub struct SerialClient; + +impl SerialClient { + /// Mirror records to/from the AimX peer reachable at serial `path` (scheme + /// defaults to [`DEFAULT_SCHEME`]). + // Sugar constructor: intentionally returns the generic connector, not `Self`. + #[allow(clippy::new_ret_no_self)] + pub fn new( + path: impl Into, + baud: u32, + ) -> SessionClientConnector { + SessionClientConnector::new(SerialDialer::new(path, baud), AimxCodec).scheme(DEFAULT_SCHEME) + } +} + +// =========================================================================== +// Server sugar +// =========================================================================== + +/// Accepts an AimX connection over a serial port and serves the full AimX +/// toolset. Register it via `with_connector` to let a host (or another board) +/// query this db over a UART. +pub struct SerialServer { + path: String, + baud: u32, + config: AimxConfig, + scheme: String, +} + +impl SerialServer { + /// Serve AimX over the serial device at `path` (e.g. `/dev/ttyUSB0`) at + /// `baud`, with the default read-only policy / limits. + pub fn new(path: impl Into, baud: u32) -> Self { + Self { + path: path.into(), + baud, + config: AimxConfig::uds_default(), + scheme: DEFAULT_SCHEME.to_string(), + } + } + + /// Use a prepared [`AimxConfig`] for the security policy / limits (the + /// `socket_path` / `socket_permissions` fields are unused over serial). + pub fn with_config(mut self, config: AimxConfig) -> Self { + self.config = config; + self + } + + /// Set the security policy (read-only vs. per-record-writable). + pub fn security_policy(mut self, policy: SecurityPolicy) -> Self { + self.config = self.config.security_policy(policy); + self + } + + /// Maximum live subscriptions per connection. + pub fn max_subs_per_connection(mut self, max: usize) -> Self { + self.config = self.config.max_subs_per_connection(max); + self + } + + /// Override the scheme this connector registers. + pub fn scheme(mut self, scheme: impl Into) -> Self { + self.scheme = scheme.into(); + self + } +} + +impl ConnectorBuilder for SerialServer +where + R: RuntimeAdapter + 'static, +{ + fn build<'a>(&'a self, db: &'a AimDb) -> BuildFuture<'a> { + let path = self.path.clone(); + let baud = self.baud; + let config = self.config.clone(); + let scheme = self.scheme.clone(); + Box::pin(async move { + let session_config = SessionConfig { + limits: SessionLimits { + // A UART carries a single peer; cap connections at 1. + max_connections: 1, + max_subs_per_connection: config.max_subs_per_connection, + }, + reads_hello: false, + // AimX's subscribe ack stays implicit (events flow); no ack frame. + acks_subscribe: false, + }; + let dispatch_config = config; + // Reuse the generic spine: open the port (errors surface synchronously) + // + AimX dispatch over the AimX codec. + let connector = SessionServerConnector::new( + move || open_serial_listener(&path, baud), + AimxCodec, + move |db: &AimDb| -> Arc { + // Apply the security policy's writable marking so `record.list` + // reports the `writable` flag (the dispatch also enforces it). + apply_writable(db, &dispatch_config); + Arc::new(AimxDispatch::new( + Arc::new(db.clone()), + dispatch_config.clone(), + )) + }, + session_config, + ) + .scheme(scheme); + connector.build(db).await + }) + } + + fn scheme(&self) -> &str { + &self.scheme + } +} + +// =========================================================================== +// Helpers +// =========================================================================== + +/// Open the serial port synchronously so an open error surfaces from `build`. +fn open_serial_listener(path: &str, baud: u32) -> DbResult { + #[cfg(feature = "tracing")] + tracing::info!( + "Initializing AimX serial server on {} @ {} baud", + path, + baud + ); + + let stream = tokio_serial::new(path, baud) + .open_native_async() + .map_err(|e| DbError::IoWithContext { + context: format!("Failed to open serial port {} @ {} baud", path, baud), + source: std::io::Error::other(e), + })?; + Ok(SerialListener::new(stream)) +} + +/// Mark each record named in the policy's writable set as writable, so +/// `record.list` advertises the `writable` flag. (Mirrors the UDS connector.) +fn apply_writable(db: &AimDb, config: &AimxConfig) +where + R: RuntimeAdapter + 'static, +{ + for key in config.security_policy.writable_records() { + if let Some(id) = db.inner().resolve_str(&key) { + if let Some(storage) = db.inner().storage(id) { + storage.set_writable_erased(true); + } + } + } +} diff --git a/aimdb-serial-connector/tests/cobs_framing.rs b/aimdb-serial-connector/tests/cobs_framing.rs new file mode 100644 index 00000000..d773d67a --- /dev/null +++ b/aimdb-serial-connector/tests/cobs_framing.rs @@ -0,0 +1,89 @@ +//! COBS framing round-trips regardless of how the byte stream is chunked — the +//! property a real UART needs (a `read()` returns an arbitrary slice of the wire). +//! +//! Transport-agnostic: exercises only [`framing`](aimdb_serial_connector::framing), +//! so it runs under the crate's default features (no runtime needed). + +use aimdb_serial_connector::framing::{encode_frame, FrameAccumulator, DELIM}; + +/// Encode several payloads into one byte stream, then feed that stream to a fresh +/// accumulator in `chunk`-sized slices and collect every frame it yields. +fn roundtrip_in_chunks(payloads: &[&[u8]], chunk: usize) -> Vec> { + let mut wire = Vec::new(); + for p in payloads { + encode_frame(p, &mut wire); + } + + let mut acc = FrameAccumulator::new(); + let mut out = Vec::new(); + for bytes in wire.chunks(chunk.max(1)) { + acc.push_bytes(bytes); + while let Some(frame) = acc.next_frame() { + out.push(frame.expect("valid COBS frame")); + } + } + out +} + +#[test] +fn single_frame_roundtrips() { + let payload = br#"{"t":"req","id":1,"method":"record.list"}"#; + let frames = roundtrip_in_chunks(&[payload], usize::MAX); + assert_eq!(frames, vec![payload.to_vec()]); +} + +#[test] +fn survives_byte_by_byte_delivery() { + // The pathological case: the receiver sees one byte at a time, so a frame is + // only complete on the read that delivers the sentinel. + let payload = br#"{"level":42,"nested":{"a":[1,2,3]}}"#; + let frames = roundtrip_in_chunks(&[payload], 1); + assert_eq!(frames, vec![payload.to_vec()]); +} + +#[test] +fn many_frames_across_arbitrary_chunk_boundaries() { + let payloads: &[&[u8]] = &[b"a", b"bb", b"ccc", br#"{"k":"v"}"#, b"\x01\x02\x03"]; + // Every chunk size must reassemble the exact same frame sequence. + for chunk in [1usize, 2, 3, 5, 7, 64, usize::MAX] { + let frames = roundtrip_in_chunks(payloads, chunk); + let expected: Vec> = payloads.iter().map(|p| p.to_vec()).collect(); + assert_eq!(frames, expected, "chunk size {chunk}"); + } +} + +#[test] +fn empty_payload_roundtrips() { + let frames = roundtrip_in_chunks(&[b""], usize::MAX); + assert_eq!(frames, vec![Vec::::new()]); +} + +#[test] +fn encoded_frame_never_contains_the_sentinel_except_the_terminator() { + let mut wire = Vec::new(); + encode_frame(br#"{"any":"json payload"}"#, &mut wire); + assert_eq!(wire.last(), Some(&DELIM), "frame ends with the sentinel"); + assert_eq!( + wire[..wire.len() - 1] + .iter() + .filter(|&&b| b == DELIM) + .count(), + 0, + "the COBS body is free of the sentinel byte" + ); +} + +#[test] +fn resynchronizes_after_a_leading_sentinel() { + // A receiver that joins mid-frame sees a stray sentinel first; it should be + // skipped and the next whole frame recovered. + let mut acc = FrameAccumulator::new(); + acc.push_bytes(&[DELIM]); // garbage tail of a frame we joined late + assert!(acc.next_frame().is_none()); + + let mut wire = Vec::new(); + encode_frame(b"hello", &mut wire); + acc.push_bytes(&wire); + assert_eq!(acc.next_frame().expect("frame").expect("valid"), b"hello"); + assert!(acc.next_frame().is_none()); +} diff --git a/aimdb-serial-connector/tests/embassy_smoke.rs b/aimdb-serial-connector/tests/embassy_smoke.rs new file mode 100644 index 00000000..23e395cc --- /dev/null +++ b/aimdb-serial-connector/tests/embassy_smoke.rs @@ -0,0 +1,161 @@ +//! Embassy client-exit smoke — the runtime-neutral `run_client` engine drives RPC +//! over the **real** Embassy serial transport ([`SerialDialer`] / +//! `EmbassySerialConnection`, COBS over `embedded-io-async`) on the +//! [`EmbassyAdapter`] clock. The `thumbv7em` monomorphization an MCU uses, driven +//! on the host by `futures::executor::block_on` (no `embassy-executor`, which does +//! not build on the host). +//! +//! Promotes Phase 5's stub-transport smoke +//! (`aimdb-embassy-adapter/tests/session_smoke.rs`) to the real serial transport: +//! a loopback UART carries the framed request back as its own reply (an +//! [`EchoCodec`], so no second node is needed), exercising COBS encode → wire → +//! decode under the engine. + +#![cfg(feature = "embassy-runtime")] + +extern crate alloc; + +use alloc::collections::VecDeque; +use alloc::rc::Rc; +use core::cell::RefCell; +use core::future::poll_fn; +use core::task::{Poll, Waker}; +use std::sync::Arc; + +use embedded_io_async::{ErrorKind, ErrorType, Read, Write}; + +use aimdb_core::session::{ + run_client, ClientConfig, CodecError, EnvelopeCodec, Inbound, Outbound, Payload, +}; +use aimdb_embassy_adapter::EmbassyAdapter; +use aimdb_serial_connector::embassy_transport::SerialDialer; + +// Trivial host time driver so `embassy_time` links (the happy path never awaits +// `clock.sleep`, so the driver is never actually exercised). +struct TestTimeDriver; +impl embassy_time_driver::Driver for TestTimeDriver { + fn now(&self) -> u64 { + 0 + } + fn schedule_wake(&self, _at: u64, _waker: &core::task::Waker) {} +} +embassy_time_driver::time_driver_impl!(static TEST_TIME_DRIVER: TestTimeDriver = TestTimeDriver); + +/// Minimal echo wire: a `Request` is `[id:8][params]`; the loopback returns those +/// bytes verbatim, which `decode_outbound` reads back as `Reply { id, Ok(params) }`. +struct EchoCodec; + +impl EnvelopeCodec for EchoCodec { + fn decode(&self, _frame: &[u8]) -> Result { + Err(CodecError::Malformed) // server direction unused by this client smoke + } + fn encode(&self, _msg: Outbound<'_>, _out: &mut Vec) -> Result<(), CodecError> { + Err(CodecError::Malformed) + } + fn encode_inbound(&self, msg: Inbound, out: &mut Vec) -> Result<(), CodecError> { + match msg { + Inbound::Request { id, params, .. } => { + out.extend_from_slice(&id.to_be_bytes()); + out.extend_from_slice(¶ms); + Ok(()) + } + _ => Err(CodecError::Malformed), + } + } + fn decode_outbound<'a>(&self, frame: &'a [u8]) -> Result, CodecError> { + if frame.len() < 8 { + return Err(CodecError::Malformed); + } + let id = u64::from_be_bytes(frame[0..8].try_into().unwrap()); + Ok(Outbound::Reply { + id, + result: Ok(Payload::from(&frame[8..])), + }) + } +} + +/// A single-threaded async byte loopback: bytes written to the shared queue become +/// readable from the same handle. Two clones (one as `rx`, one as `tx`) form the +/// UART halves of a self-replying serial port. +#[derive(Clone, Default)] +struct LoopbackUart { + shared: Rc>, +} + +#[derive(Default)] +struct Shared { + buf: VecDeque, + reader_waker: Option, +} + +impl ErrorType for LoopbackUart { + type Error = ErrorKind; +} + +impl Write for LoopbackUart { + async fn write(&mut self, data: &[u8]) -> Result { + let mut s = self.shared.borrow_mut(); + s.buf.extend(data.iter().copied()); + if let Some(w) = s.reader_waker.take() { + w.wake(); + } + Ok(data.len()) + } + + async fn flush(&mut self) -> Result<(), Self::Error> { + Ok(()) // in-memory loopback: writes are immediately visible + } +} + +impl Read for LoopbackUart { + async fn read(&mut self, out: &mut [u8]) -> Result { + poll_fn(|cx| { + let mut s = self.shared.borrow_mut(); + if s.buf.is_empty() { + s.reader_waker = Some(cx.waker().clone()); + return Poll::Pending; + } + let n = out.len().min(s.buf.len()); + for slot in out.iter_mut().take(n) { + *slot = s.buf.pop_front().unwrap(); + } + Poll::Ready(Ok(n)) + }) + .await + } +} + +#[test] +fn embassy_clock_drives_client_engine_rpc_over_serial() { + use futures::executor::block_on; + use futures::future::{select, Either}; + + // The exact `run_client, _, EmbassyAdapter>` monomorphization + // an MCU build uses — over the real COBS serial connection. + let clock = Arc::new(EmbassyAdapter::default()); + let config = ClientConfig { + reconnect: false, + sends_hello: false, + ..ClientConfig::default() + }; + + let uart = LoopbackUart::default(); + let dialer = SerialDialer::new(uart.clone(), uart); + let (handle, engine_fut) = run_client(dialer, EchoCodec, config, clock); + + block_on(async move { + futures::pin_mut!(engine_fut); + let call = handle.call("echo", Payload::from(&b"ping"[..])); + futures::pin_mut!(call); + + // Drive the engine concurrently with the call; the reply must arrive (the + // framed request, COBS round-tripped through the loopback) before the engine + // ends. + match select(call, engine_fut).await { + Either::Left((reply, _engine)) => { + assert_eq!(&*reply.expect("call should resolve"), b"ping"); + } + Either::Right(_) => panic!("engine ended before the reply arrived"), + } + }); +} diff --git a/aimdb-serial-connector/tests/tokio_roundtrip.rs b/aimdb-serial-connector/tests/tokio_roundtrip.rs new file mode 100644 index 00000000..fcc7b239 --- /dev/null +++ b/aimdb-serial-connector/tests/tokio_roundtrip.rs @@ -0,0 +1,131 @@ +//! End-to-end AimX over the tokio serial transport, without hardware: the two +//! ends of a `tokio::io::duplex()` pipe stand in for a crossover serial cable. +//! The production server (`serve` + `AimxDispatch`) answers on one end; the +//! `run_client` engine drives RPC on the other — proving `TokioSerialConnection`'s +//! COBS framing carries the real protocol both directions. + +#![cfg(feature = "_test-tokio")] + +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; + +use aimdb_core::buffer::BufferCfg; +use aimdb_core::remote::AimxConfig; +use aimdb_core::session::aimx::{AimxCodec, AimxDispatch}; +use aimdb_core::session::{ + run_client, serve, BoxFut, ClientConfig, Connection, Dialer, Dispatch, Listener, Payload, + SessionConfig, SessionLimits, TransportError, TransportResult, +}; +use aimdb_core::AimDbBuilder; +use aimdb_serial_connector::tokio_transport::TokioSerialConnection; +use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use tokio::io::DuplexStream; + +/// A writable config-style record (SingleLatest, no producer → remotely settable). +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +struct Setting { + level: u64, +} + +/// One-shot dialer over an in-memory duplex end (stands in for opening the port). +struct OnceDialer(Mutex>); + +impl Dialer for OnceDialer { + fn connect(&self) -> BoxFut<'_, TransportResult>> { + Box::pin(async move { + let end = self.0.lock().unwrap().take().ok_or(TransportError::Io)?; + Ok(Box::new(TokioSerialConnection::new(end)) as Box) + }) + } +} + +/// One-shot listener over the other duplex end (point-to-point, like a UART). +struct OnceListener(Option); + +impl Listener for OnceListener { + fn accept(&mut self) -> BoxFut<'_, TransportResult>> { + Box::pin(async move { + match self.0.take() { + Some(end) => Ok(Box::new(TokioSerialConnection::new(end)) as Box), + None => core::future::pending().await, + } + }) + } +} + +#[tokio::test] +async fn aimx_roundtrips_over_the_serial_transport() { + let (server_end, client_end) = tokio::io::duplex(8192); + + // A real server db with one remotely-readable record (no connector — we drive + // `serve` directly over the duplex below). + let mut builder = AimDbBuilder::new().runtime(Arc::new(TokioAdapter)); + builder.configure::("setting", |reg| { + reg.buffer(BufferCfg::SingleLatest).with_remote_access(); + }); + let (db, runner) = builder.build().await.expect("build db"); + let db = Arc::new(db); + tokio::spawn(runner.run()); + db.set_record_from_json("setting", json!({ "level": 42 })) + .expect("seed setting"); + + // Server: AimX dispatch over the server end of the pipe. + let dispatch: Arc = + Arc::new(AimxDispatch::new(db.clone(), AimxConfig::uds_default())); + let session_config = SessionConfig { + limits: SessionLimits { + max_connections: 1, + max_subs_per_connection: 8, + }, + reads_hello: false, + acks_subscribe: false, + }; + tokio::spawn(serve( + OnceListener(Some(server_end)), + Arc::new(AimxCodec), + dispatch, + session_config, + )); + + // Client: the proactive engine over the client end. + let client_config = ClientConfig { + sends_hello: false, + ..ClientConfig::default() + }; + let (handle, engine) = run_client( + OnceDialer(Mutex::new(Some(client_end))), + AimxCodec, + client_config, + Arc::new(TokioAdapter), + ); + tokio::spawn(engine); + + // RPC: record.get {name:"setting"} → the seeded value, carried over COBS frames. + let params: Payload = serde_json::to_vec(&json!({ "name": "setting" })) + .unwrap() + .into(); + let reply = tokio::time::timeout(Duration::from_secs(5), handle.call("record.get", params)) + .await + .expect("rpc within timeout") + .expect("rpc ok"); + let value: serde_json::Value = serde_json::from_slice(&reply).expect("json reply"); + assert_eq!(value, json!({ "level": 42 })); + + // A second RPC reuses the same framed connection (proves the stream re-syncs + // frame-to-frame, not just on the first message). + let list = tokio::time::timeout( + Duration::from_secs(5), + handle.call("record.list", Payload::from(&b"{}"[..])), + ) + .await + .expect("rpc within timeout") + .expect("rpc ok"); + let records: serde_json::Value = serde_json::from_slice(&list).expect("json reply"); + assert!( + records.as_array().is_some_and(|a| !a.is_empty()), + "record.list returns the configured records: {records}" + ); +} diff --git a/examples/embassy-serial-connector-demo/.cargo/config.toml b/examples/embassy-serial-connector-demo/.cargo/config.toml new file mode 100644 index 00000000..5aac79b7 --- /dev/null +++ b/examples/embassy-serial-connector-demo/.cargo/config.toml @@ -0,0 +1,8 @@ +[target.thumbv8m.main-none-eabihf] +runner = 'probe-rs run --chip STM32H563ZITx' + +[build] +target = "thumbv8m.main-none-eabihf" + +[env] +DEFMT_LOG = "info" diff --git a/examples/embassy-serial-connector-demo/.gitignore b/examples/embassy-serial-connector-demo/.gitignore new file mode 100644 index 00000000..8112e1e6 --- /dev/null +++ b/examples/embassy-serial-connector-demo/.gitignore @@ -0,0 +1,5 @@ +target/ +Cargo.lock +*.bin +*.elf +*.hex diff --git a/examples/embassy-serial-connector-demo/Cargo.toml b/examples/embassy-serial-connector-demo/Cargo.toml new file mode 100644 index 00000000..8690fc34 --- /dev/null +++ b/examples/embassy-serial-connector-demo/Cargo.toml @@ -0,0 +1,72 @@ +[package] +edition = "2024" +name = "embassy-serial-connector-demo" +version = "0.1.0" +license = "MIT OR Apache-2.0" +publish = false +description = "AimDB example: an STM32H563 serves AimX over a serial UART (ST-LINK VCP) via the serial connector + Embassy" + +[features] +default = ["embassy-runtime"] +embassy-runtime = [] + +[dependencies] +# AimDB +aimdb-core = { path = "../../aimdb-core", default-features = false, features = [ + "derive", +] } +aimdb-embassy-adapter = { path = "../../aimdb-embassy-adapter", default-features = false, features = [ + "embassy-runtime", +] } +aimdb-executor = { path = "../../aimdb-executor", default-features = false, features = [ + "embassy-types", +] } +aimdb-serial-connector = { path = "../../aimdb-serial-connector", default-features = false, features = [ + "embassy-runtime", + "defmt", +] } + +# Embassy ecosystem - STM32H563ZI +embassy-stm32 = { workspace = true, features = [ + "defmt", + "stm32h563zi", + "memory-x", + "time-driver-any", + "unstable-pac", +] } +embassy-sync = { workspace = true, features = ["defmt"] } +embassy-executor = { workspace = true, features = [ + "platform-cortex-m", + "executor-thread", + "defmt", +] } +embassy-time = { workspace = true, features = [ + "defmt", + "defmt-timestamp-uptime", + "tick-hz-32_768", + "generic-queue-16", +] } +embassy-futures = { workspace = true } + +# Serde for the record type (RemoteSerialize = Serialize + DeserializeOwned) +serde = { workspace = true } + +# Embedded debugging and logging +defmt = { workspace = true } +defmt-rtt = { workspace = true } +panic-probe = { workspace = true } + +# Cortex-M runtime +cortex-m = { workspace = true } +cortex-m-rt = { workspace = true } +critical-section = { workspace = true } +static_cell = { workspace = true } + +# Embedded HAL / heap +embedded-io-async = { workspace = true } +embedded-alloc = { version = "0.6", features = ["llff"] } + +[package.metadata.embassy] +build = [ + { target = "thumbv8m.main-none-eabihf", artifact-dir = "out/examples/stm32h5" }, +] diff --git a/examples/embassy-serial-connector-demo/README.md b/examples/embassy-serial-connector-demo/README.md new file mode 100644 index 00000000..2c3d1d63 --- /dev/null +++ b/examples/embassy-serial-connector-demo/README.md @@ -0,0 +1,105 @@ +# embassy-serial-connector-demo + +A real end-to-end test of [`aimdb-serial-connector`](../../aimdb-serial-connector): +an **STM32H563ZI serves the AimX toolset over a UART**, and a host queries it over +the serial line. This exercises the `no_std` AimX server dispatch (Issue #120) over +the real Embassy serial transport (COBS framing over `embedded-io-async`). + +``` +┌────────────────────────────┐ USART3 (PD8/PD9) ┌──────────────────────┐ +│ STM32H563ZI (this firmware)│ ⇄ ST-LINK Virtual COM Port ⇄ │ host: serial_demo │ +│ SerialServer + AimxDispatch│ = /dev/ttyACM0 │ AimX client (RPC) │ +│ record: `counter` (++/sec) │ │ record.list/get loop │ +└────────────────────────────┘ └──────────────────────┘ +``` + +## Hardware + +- **STM32H563ZI Nucleo** (the same board the KNX/MQTT Embassy demos run on). +- The single **USB↔ST-LINK** cable you already use for flashing. On the + Nucleo-H563ZI the ST-LINK exposes a **Virtual COM Port** bridged to **USART3 + (PD8 = TX, PD9 = RX)**. No extra wiring. The host device node is: + - **Linux:** `/dev/ttyACM0` (`ls /dev/ttyACM*`) + - **macOS:** `/dev/cu.usbmodem…` (`ls /dev/cu.usbmodem*` — use the `cu.*`, not + `tty.*`, node) +- defmt logs stream separately over **RTT (SWD)**, so they don't collide with the + data UART. + +> ⚠️ **The VCP↔USART routing is board-specific** (ST-LINK solder bridges, per +> UM3115). If `record.list` times out, your board likely routes the VCP to a +> different USART — change the `USART3 / PD9 / PD8` line in +> [`src/main.rs`](src/main.rs), or wire a USB-TTL dongle to any free USART and +> point the host at the resulting `/dev/ttyUSB0`. + +## Run it + +**1. Build the firmware** (in the dev container): + +```bash +cd examples/embassy-serial-connector-demo +cargo build # → ../../target/thumbv8m.main-none-eabihf/debug/embassy-serial-connector-demo +``` + +**2. Flash + view logs** (on the host, where probe-rs + the ST-LINK live): + +```bash +./flash.sh # = probe-rs run --chip STM32H563ZITx +# RTT shows: "serving AimX over USART3 / ST-LINK VCP (record: counter) — connect the host now" +``` + +**3. Query it from the host** over the VCP (run from the **workspace root** — the +demo dir pins a different toolchain/target). Replace the device with yours +(`/dev/ttyACM0` on Linux, `/dev/cu.usbmodem…` on macOS): + +```bash +cd ../.. # workspace root +cargo run -p aimdb-serial-connector --example serial_demo \ + --features _test-tokio -- client /dev/ttyACM0 115200 +``` + +Expected output — the `counter` values are produced **on the MCU** and read back +over serial: + +``` +[client] record.list = [{"buffer_type":"single_latest",...,"record_key":"counter","type_id":"TypeId(0x…)","writable":false}] +[client] counter = {"value":173} +[client] counter = {"value":174} +[client] counter = {"value":175} +... +``` + +## No board? Smoke it over a PTY pair + +The host example is dual-mode, so two host processes can talk over a virtual serial +pair (needs `socat`): + +```bash +socat -d -d pty,raw,echo=0 pty,raw,echo=0 # prints two /dev/pts/N device names +# terminal A — host acts as the board: +cargo run -p aimdb-serial-connector --example serial_demo --features _test-tokio -- server /dev/pts/3 +# terminal B — host client: +cargo run -p aimdb-serial-connector --example serial_demo --features _test-tokio -- client /dev/pts/4 +``` + +## Troubleshooting + +These bit us during bring-up — check them first: + +- **Replug after flashing.** After `probe-rs` resets the MCU, the ST-LINK VCP often + doesn't re-enumerate cleanly, so the `/dev/cu.usbmodem…` (or `/dev/ttyACM…`) + handle is stale and the client gets nothing. **Physically unplug/replug the + board** before running the host (the device suffix may change — re-check with + `ls`). +- **One program per port.** Only one process may hold the serial device at a time. + A stray `screen` is the usual culprit — *closing its window only detaches it*, so + it keeps the port open and silently eats the board's replies. Find and kill + leftovers: `lsof ` then `kill -9 `; quit `screen` with `Ctrl-A` `K` + (not by closing the window). +- **Wrong device** — the VCP may enumerate with a different suffix. Linux: + `ls /dev/ttyACM*` (or `dmesg | tail`). macOS: `ls /dev/cu.usbmodem*`, and use the + `cu.*` node (the `tty.*` node blocks on open waiting for carrier-detect). +- **Permission denied (Linux)** — add yourself to the `dialout` group + (`sudo usermod -aG dialout $USER`, then re-login) or `sudo chmod a+rw /dev/ttyACM0`. +- **`record.list` hangs / wrong data** — usually the VCP↔USART mapping (see the + warning above) or a baud mismatch (the firmware uses **115200**; pass the same to + the host). diff --git a/examples/embassy-serial-connector-demo/build.rs b/examples/embassy-serial-connector-demo/build.rs new file mode 100644 index 00000000..8cd32d7e --- /dev/null +++ b/examples/embassy-serial-connector-demo/build.rs @@ -0,0 +1,5 @@ +fn main() { + println!("cargo:rustc-link-arg-bins=--nmagic"); + println!("cargo:rustc-link-arg-bins=-Tlink.x"); + println!("cargo:rustc-link-arg-bins=-Tdefmt.x"); +} diff --git a/examples/embassy-serial-connector-demo/flash.sh b/examples/embassy-serial-connector-demo/flash.sh new file mode 100755 index 00000000..6937d6e3 --- /dev/null +++ b/examples/embassy-serial-connector-demo/flash.sh @@ -0,0 +1,13 @@ +#!/bin/bash +# Flash script for embassy-serial-connector-demo. +# Run on the HOST where probe-rs + the ST-LINK are accessible. +# Build first (in the dev container): cd examples/embassy-serial-connector-demo && cargo build +set -e +BINARY="../../target/thumbv8m.main-none-eabihf/debug/embassy-serial-connector-demo" +if [ ! -f "$BINARY" ]; then + echo "Error: Binary not found at $BINARY" + echo "Build it first: cd examples/embassy-serial-connector-demo && cargo build" + exit 1 +fi +echo "Flashing embassy-serial-connector-demo to STM32H563ZITx (defmt logs stream over RTT)..." +probe-rs run --chip STM32H563ZITx "$BINARY" diff --git a/examples/embassy-serial-connector-demo/rust-toolchain.toml b/examples/embassy-serial-connector-demo/rust-toolchain.toml new file mode 100644 index 00000000..750ee6f7 --- /dev/null +++ b/examples/embassy-serial-connector-demo/rust-toolchain.toml @@ -0,0 +1,4 @@ +[toolchain] +channel = "1.95" +components = ["rust-src", "rustfmt", "llvm-tools"] +targets = ["thumbv8m.main-none-eabihf"] diff --git a/examples/embassy-serial-connector-demo/src/main.rs b/examples/embassy-serial-connector-demo/src/main.rs new file mode 100644 index 00000000..9239fc75 --- /dev/null +++ b/examples/embassy-serial-connector-demo/src/main.rs @@ -0,0 +1,152 @@ +#![no_std] +#![no_main] + +//! AimX over serial on an STM32H563ZI Nucleo — the board *serves* the AimX toolset +//! over a UART, so a host can `record.list` / `record.get` it across the wire. +//! This exercises the `no_std` AimX server dispatch (Issue #120) over the real +//! `aimdb-serial-connector` Embassy transport (COBS over `embedded-io-async`). +//! +//! ## Wiring (no extra cabling on a Nucleo-H563ZI) +//! +//! The AimX server rides **USART3 (PD8 = TX, PD9 = RX)**, which on the +//! Nucleo-H563ZI is bridged to the **ST-LINK Virtual COM Port** — the same USB +//! cable you flash/debug with. On the host it appears as `/dev/ttyACM0`. defmt +//! logs stream separately over **RTT (SWD)**, so they don't collide with the data +//! UART. +//! +//! ⚠️ The VCP↔USART routing is board-specific (ST-LINK solder bridges per UM3115). +//! If your board routes the VCP elsewhere, change the `USART3 / PD8 / PD9` line +//! below — or wire a USB-TTL dongle to any free USART and point the host at the +//! resulting `/dev/ttyUSB0`. +//! +//! ## Host side +//! +//! ```bash +//! cargo run -p aimdb-serial-connector --example serial_demo \ +//! --features _test-tokio -- client /dev/ttyACM0 115200 +//! ``` +//! +//! You should see `record.list` return the `counter` record, then `counter` +//! climbing once per second — values produced on the MCU, read over serial. + +extern crate alloc; + +use aimdb_core::{AimDbBuilder, Producer}; +use aimdb_embassy_adapter::{EmbassyAdapter, EmbassyBufferType, EmbassyRecordRegistrarExtCustom}; +use aimdb_serial_connector::embassy_transport::SerialServer; +use defmt::*; +use embassy_executor::Spawner; +use embassy_stm32::usart::{BufferedUart, Config as UartConfig}; +use embassy_stm32::{Config, bind_interrupts, peripherals, usart}; +use embassy_time::{Duration, Timer}; +use serde::{Deserialize, Serialize}; +use static_cell::StaticCell; +use {defmt_rtt as _, panic_probe as _}; + +#[global_allocator] +static ALLOCATOR: embedded_alloc::LlffHeap = embedded_alloc::LlffHeap::empty(); + +bind_interrupts!(struct Irqs { + USART3 => usart::BufferedInterruptHandler; +}); + +/// The record the board exposes over serial. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct Counter { + value: u64, +} + +/// Drive a value into `counter` once per second. +#[embassy_executor::task] +async fn counter_task(producer: Producer) { + let mut value = 0u64; + loop { + value += 1; + producer.produce(Counter { value }); + Timer::after(Duration::from_secs(1)).await; + } +} + +#[embassy_executor::main] +async fn main(spawner: Spawner) { + // Initialize the heap for the allocator. + { + use core::mem::MaybeUninit; + const HEAP_SIZE: usize = 32768; // 32 KB + static mut HEAP: [MaybeUninit; HEAP_SIZE] = [MaybeUninit::uninit(); HEAP_SIZE]; + unsafe { + let heap_ptr = core::ptr::addr_of_mut!(HEAP); + ALLOCATOR.init((*heap_ptr).as_ptr() as usize, HEAP_SIZE) + } + } + + // Clock tree: HSE 8 MHz (from the ST-LINK MCO) → PLL1 → 250 MHz. Same as the + // other H563 demos in this repo. + let mut config = Config::default(); + { + use embassy_stm32::rcc::*; + use embassy_stm32::time::Hertz; + + config.rcc.hsi = None; + config.rcc.hsi48 = Some(Default::default()); + config.rcc.hse = Some(Hse { + freq: Hertz(8_000_000), + mode: HseMode::BypassDigital, + }); + config.rcc.pll1 = Some(Pll { + source: PllSource::Hse, + prediv: PllPreDiv::Div2, + mul: PllMul::Mul125, + divp: Some(PllDiv::Div2), + divq: Some(PllDiv::Div2), + divr: None, + }); + config.rcc.ahb_pre = AHBPrescaler::Div1; + config.rcc.apb1_pre = APBPrescaler::Div1; + config.rcc.apb2_pre = APBPrescaler::Div1; + config.rcc.apb3_pre = APBPrescaler::Div1; + config.rcc.sys = Sysclk::Pll1P; + config.rcc.voltage_scale = VoltageScale::Scale0; + } + let p = embassy_stm32::init(config); + info!("AimX serial server booting on STM32H563ZI"); + + // USART3 on PD8 (TX) / PD9 (RX) — the ST-LINK Virtual COM Port on the + // Nucleo-H563ZI. `BufferedUart` because both split halves implement + // `embedded-io-async` Read/Write, which the serial connector needs. + static TX_BUF: StaticCell<[u8; 256]> = StaticCell::new(); + static RX_BUF: StaticCell<[u8; 256]> = StaticCell::new(); + let mut uart_config = UartConfig::default(); + uart_config.baudrate = 115_200; + let uart = BufferedUart::new( + p.USART3, + p.PD9, // RX + p.PD8, // TX + TX_BUF.init([0; 256]), + RX_BUF.init([0; 256]), + Irqs, + uart_config, + ) + .unwrap(); + let (tx, rx) = uart.split(); + + // Build the db: one remotely-readable `counter` record, served over the UART. + let runtime = alloc::sync::Arc::new(EmbassyAdapter::default()); + let mut builder = AimDbBuilder::new() + .runtime(runtime) + .with_connector(SerialServer::new(rx, tx)); + builder.configure::("counter", |reg| { + reg.buffer_sized::<4, 2>(EmbassyBufferType::SingleLatest) + .with_remote_access(); + }); + + static DB_CELL: StaticCell> = StaticCell::new(); + let (db, db_runner) = builder.build().await.expect("build db"); + let db = DB_CELL.init(db); + + let producer = db.producer::("counter").expect("producer"); + spawner.spawn(unwrap!(counter_task(producer))); + + info!("serving AimX over USART3 / ST-LINK VCP (record: counter) — connect the host now"); + db_runner.run().await; +} From 44c89f219c58b989a096f7fcc9ccc015663af73a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Fri, 5 Jun 2026 20:26:50 +0000 Subject: [PATCH 2/5] feat: enhance serial connector with writable setting record and raw request mode --- aimdb-serial-connector/CHANGELOG.md | 10 +- .../examples/serial_demo.rs | 152 +++++++++++++++++- aimdb-serial-connector/src/tokio_transport.rs | 7 + .../embassy-serial-connector-demo/README.md | 15 ++ .../embassy-serial-connector-demo/src/main.rs | 25 ++- 5 files changed, 202 insertions(+), 7 deletions(-) diff --git a/aimdb-serial-connector/CHANGELOG.md b/aimdb-serial-connector/CHANGELOG.md index f8c3e3e3..6605f9aa 100644 --- a/aimdb-serial-connector/CHANGELOG.md +++ b/aimdb-serial-connector/CHANGELOG.md @@ -22,8 +22,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 for a single write larger than its TX ring), so a frame bigger than the buffer — e.g. a `record.list` reply — would otherwise fail the whole send and drop the session. Chunking sends a frame of any length given a TX buffer ≥ 64 bytes. - (Validated end-to-end on an STM32H563ZI: `record.list`/`record.get`/streaming all - round-trip over the ST-LINK VCP.) +- **The tokio `SerialDialer` flushes the OS input buffer on connect.** A real serial + port retains bytes across opens, so a half-read reply left by a previous (killed) + session would otherwise be read as a stale first frame — failing to decode and + desyncing the stream until the next COBS sentinel (a transient error on the first + call or two). Flushing on connect gives every session a clean start. +- **Validated end-to-end on an STM32H563ZI Nucleo** over the ST-LINK VCP: + `record.list` / `record.get` / `record.set` and streaming subscriptions all + round-trip MCU↔host. - **`embedded-io-async` is pinned to 0.7** (the workspace dep was bumped 0.6 → 0.7) so a HAL `BufferedUart` (e.g. `embassy-stm32`, which uses 0.7) satisfies the connector's `Read`/`Write` bounds without a trait-version skew. Wire the Embassy half from a `BufferedUart::split()` — the plain async `UartRx` does **not** implement `embedded-io-async::Read`, only the buffered/ring-buffered variants do. - The `unsafe impl Send`/`Sync` on the Embassy transport + builder types rest on the single-core, cooperative Embassy-executor invariant documented by `SendFutureWrapper` (no preemption / thread migration). This is the first *raw-peripheral* session connector — MQTT/KNX sidestep it by pulling a `Send + Sync` `embassy_net::Stack` from the runtime adapter rather than owning a peripheral. diff --git a/aimdb-serial-connector/examples/serial_demo.rs b/aimdb-serial-connector/examples/serial_demo.rs index 51861c2c..8d1a7a6d 100644 --- a/aimdb-serial-connector/examples/serial_demo.rs +++ b/aimdb-serial-connector/examples/serial_demo.rs @@ -17,6 +17,10 @@ //! - `server [baud]` — serve a live `counter` record over the line. //! - `client [baud]` — `record.list` once, then `record.get counter` //! every second, printing what comes back across the wire. +//! - `set [baud]` — `record.set` the writable `setting` record, then +//! `record.get` it back (exercises the write path). +//! - `raw [baud] [method] [name]` — low-level debug: send one request and +//! print the full decoded reply (no engine), handy when `client` misbehaves. //! //! Built only under the internal `_test-tokio` feature (it needs a concrete //! adapter); see the crate's `Cargo.toml`. @@ -56,13 +60,159 @@ async fn main() { match mode { "server" => run_server(device, baud).await, "client" => run_client_mode(device, baud).await, + "set" => run_set_mode(device, baud).await, + "raw" => { + let method = args + .get(4) + .cloned() + .unwrap_or_else(|| "record.list".to_string()); + let name = args + .get(5) + .cloned() + .unwrap_or_else(|| "counter".to_string()); + run_raw(device, baud, method, name).await + } other => { - eprintln!("usage: serial_demo [baud] (got mode '{other}')"); + eprintln!( + "usage: serial_demo [baud] [method] [name] (got mode '{other}')" + ); std::process::exit(2); } } } +/// Low-level debug: open the port, send one AimX request for `method`, and print +/// the full reply (accumulated until the terminating `0x00`, then COBS-decoded). +/// Every step is visible — useful when the engine-based `client` just hangs. +async fn run_raw(device: String, baud: u32, method: String, name: String) { + use aimdb_core::session::{EnvelopeCodec, Inbound}; + use aimdb_serial_connector::framing::encode_frame; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio_serial::SerialPortBuilderExt; + + println!("[raw] {method} (name={name}) over {device} @ {baud}"); + let mut port = match tokio_serial::new(&device, baud).open_native_async() { + Ok(p) => p, + Err(e) => { + eprintln!("[raw] OPEN FAILED: {e}"); + std::process::exit(1); + } + }; + tokio::time::sleep(Duration::from_millis(300)).await; + + let params = match method.as_str() { + "record.get" => json!({ "name": name }), + "record.set" => json!({ "name": name, "value": { "level": 42 } }), + _ => json!({}), + }; + let mut payload = Vec::new(); + AimxCodec + .encode_inbound( + Inbound::Request { + id: 1, + method: method.clone(), + params: serde_json::to_vec(¶ms).unwrap().into(), + }, + &mut payload, + ) + .expect("encode request"); + let mut frame = Vec::new(); + encode_frame(&payload, &mut frame); + println!("[raw] request: {}", String::from_utf8_lossy(&payload)); + port.write_all(&frame).await.expect("write"); + port.flush().await.expect("flush"); + println!("[raw] sent {} B; awaiting reply (5 s) …", frame.len()); + + let mut acc: Vec = Vec::new(); + let mut buf = [0u8; 256]; + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + loop { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + println!("[raw] TIMEOUT — got {} B, no terminating 0x00", acc.len()); + break; + } + match tokio::time::timeout(remaining, port.read(&mut buf)).await { + Ok(Ok(0)) => { + println!("[raw] EOF"); + break; + } + Ok(Ok(n)) => { + acc.extend_from_slice(&buf[..n]); + println!("[raw] +{n} B (total {})", acc.len()); + if acc.contains(&0) { + break; + } + } + Ok(Err(e)) => { + println!("[raw] read error: {e}"); + break; + } + Err(_) => { + println!("[raw] TIMEOUT"); + break; + } + } + } + if let Some(pos) = acc.iter().position(|&b| b == 0) { + match cobs::decode_vec(&acc[..pos]) { + Ok(json) => println!( + "[raw] reply ({} B): {}", + json.len(), + String::from_utf8_lossy(&json) + ), + Err(_) => println!("[raw] COBS decode FAILED on {pos} B"), + } + } +} + +/// Exercise the write path: `record.set` the `setting` record, then `record.get` +/// it back to confirm the value changed on the board. +async fn run_set_mode(device: String, baud: u32) { + println!("[set] writing `setting` over {device} @ {baud} baud"); + + let (handle, engine) = run_client( + SerialDialer::new(device, baud), + AimxCodec, + ClientConfig { + sends_hello: false, + ..ClientConfig::default() + }, + Arc::new(TokioAdapter), + ); + tokio::spawn(engine); + + let mut level = 0u64; + loop { + level += 1; + let set_params: Payload = serde_json::to_vec(&json!({ + "name": "setting", + "value": { "level": level } + })) + .unwrap() + .into(); + match handle.call("record.set", set_params).await { + Ok(r) => println!( + "[set] record.set setting={{\"level\":{level}}} -> {}", + String::from_utf8_lossy(&r) + ), + Err(e) => println!("[set] record.set failed: {e:?}"), + } + + let get_params: Payload = serde_json::to_vec(&json!({ "name": "setting" })) + .unwrap() + .into(); + match handle.call("record.get", get_params).await { + Ok(v) => println!( + "[set] record.get setting -> {}", + String::from_utf8_lossy(&v) + ), + Err(e) => println!("[set] record.get failed: {e:?}"), + } + tokio::time::sleep(Duration::from_secs(2)).await; + } +} + /// Serve a `counter` record (incrementing once per second) over the serial line. async fn run_server(device: String, baud: u32) { println!("[server] serving AimX over {device} @ {baud} baud (Ctrl-C to stop)"); diff --git a/aimdb-serial-connector/src/tokio_transport.rs b/aimdb-serial-connector/src/tokio_transport.rs index 31bcda83..3aebdc78 100644 --- a/aimdb-serial-connector/src/tokio_transport.rs +++ b/aimdb-serial-connector/src/tokio_transport.rs @@ -125,6 +125,13 @@ impl Dialer for SerialDialer { let stream = tokio_serial::new(&self.path, self.baud) .open_native_async() .map_err(|_| TransportError::Io)?; + // Discard any bytes left in the OS input buffer by a previous session + // (e.g. a half-read reply from a killed client). Otherwise the first + // frame is a stale leftover that fails to decode and desyncs the stream + // until the next COBS sentinel — a transient `Internal` on the first + // call or two. + use tokio_serial::SerialPort; + let _ = stream.clear(tokio_serial::ClearBuffer::Input); Ok(Box::new(TokioSerialConnection::new(stream)) as Box) }) } diff --git a/examples/embassy-serial-connector-demo/README.md b/examples/embassy-serial-connector-demo/README.md index 2c3d1d63..53c663cf 100644 --- a/examples/embassy-serial-connector-demo/README.md +++ b/examples/embassy-serial-connector-demo/README.md @@ -68,6 +68,21 @@ over serial: ... ``` +**4. Write a record** over the line (`record.set`). The board exposes a writable +`setting` record (no producer, ReadWrite policy); this sets it and reads it back: + +```bash +cargo run -p aimdb-serial-connector --example serial_demo \ + --features _test-tokio -- set /dev/cu.usbmodem14303 115200 +``` + +``` +[set] record.set setting={"level":1} -> {"value":{"level":1}} +[set] record.get setting -> {"level":1} +[set] record.set setting={"level":2} -> {"value":{"level":2}} +... +``` + ## No board? Smoke it over a PTY pair The host example is dual-mode, so two host processes can talk over a virtual serial diff --git a/examples/embassy-serial-connector-demo/src/main.rs b/examples/embassy-serial-connector-demo/src/main.rs index 9239fc75..1e3e06a5 100644 --- a/examples/embassy-serial-connector-demo/src/main.rs +++ b/examples/embassy-serial-connector-demo/src/main.rs @@ -31,6 +31,7 @@ extern crate alloc; +use aimdb_core::remote::SecurityPolicy; use aimdb_core::{AimDbBuilder, Producer}; use aimdb_embassy_adapter::{EmbassyAdapter, EmbassyBufferType, EmbassyRecordRegistrarExtCustom}; use aimdb_serial_connector::embassy_transport::SerialServer; @@ -50,12 +51,18 @@ bind_interrupts!(struct Irqs { USART3 => usart::BufferedInterruptHandler; }); -/// The record the board exposes over serial. +/// A read-only record the board produces (incrementing counter). #[derive(Debug, Clone, Serialize, Deserialize)] struct Counter { value: u64, } +/// A writable config-style record (no producer → remotely settable via `record.set`). +#[derive(Debug, Clone, Serialize, Deserialize)] +struct Setting { + level: u64, +} + /// Drive a value into `counter` once per second. #[embassy_executor::task] async fn counter_task(producer: Producer) { @@ -130,15 +137,23 @@ async fn main(spawner: Spawner) { .unwrap(); let (tx, rx) = uart.split(); - // Build the db: one remotely-readable `counter` record, served over the UART. + // Build the db: a read-only `counter` and a writable `setting`, served over the + // UART. The ReadWrite policy makes `setting` settable via `record.set`. + let mut policy = SecurityPolicy::read_write(); + policy.allow_write_key("setting"); let runtime = alloc::sync::Arc::new(EmbassyAdapter::default()); let mut builder = AimDbBuilder::new() .runtime(runtime) - .with_connector(SerialServer::new(rx, tx)); + .with_connector(SerialServer::new(rx, tx).security_policy(policy)); builder.configure::("counter", |reg| { reg.buffer_sized::<4, 2>(EmbassyBufferType::SingleLatest) .with_remote_access(); }); + builder.configure::("setting", |reg| { + // No producer → remotely writable. + reg.buffer_sized::<4, 2>(EmbassyBufferType::SingleLatest) + .with_remote_access(); + }); static DB_CELL: StaticCell> = StaticCell::new(); let (db, db_runner) = builder.build().await.expect("build db"); @@ -147,6 +162,8 @@ async fn main(spawner: Spawner) { let producer = db.producer::("counter").expect("producer"); spawner.spawn(unwrap!(counter_task(producer))); - info!("serving AimX over USART3 / ST-LINK VCP (record: counter) — connect the host now"); + info!( + "serving AimX over USART3 / ST-LINK VCP (records: counter, setting) — connect the host now" + ); db_runner.run().await; } From 1c9405524c6b8547feeddafa9ef5d3cbcce257cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Fri, 5 Jun 2026 20:47:40 +0000 Subject: [PATCH 3/5] feat: improve COBS frame handling and enhance writable record support --- aimdb-serial-connector/CHANGELOG.md | 11 ++++++-- .../src/embassy_transport.rs | 27 +++++++------------ aimdb-serial-connector/src/lib.rs | 17 ++++++++++++ aimdb-serial-connector/src/tokio_transport.rs | 27 +++++++------------ .../tests/tokio_roundtrip.rs | 25 +++++++++++++++++ .../embassy-serial-connector-demo/README.md | 6 ++--- 6 files changed, 72 insertions(+), 41 deletions(-) diff --git a/aimdb-serial-connector/CHANGELOG.md b/aimdb-serial-connector/CHANGELOG.md index 6605f9aa..cb972067 100644 --- a/aimdb-serial-connector/CHANGELOG.md +++ b/aimdb-serial-connector/CHANGELOG.md @@ -22,11 +22,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 for a single write larger than its TX ring), so a frame bigger than the buffer — e.g. a `record.list` reply — would otherwise fail the whole send and drop the session. Chunking sends a frame of any length given a TX buffer ≥ 64 bytes. +- **An undecodable frame is skipped, not fatal.** `recv` drops a chunk that fails + to COBS-decode (line noise, or bytes from a session joined mid-stream) and + resyncs on the next sentinel rather than returning a transport error — so + transient corruption costs one frame, not the whole session. This matters most on + Embassy, where the default `reconnect: false` over a moved-in UART means a fatal + read error could never recover. - **The tokio `SerialDialer` flushes the OS input buffer on connect.** A real serial port retains bytes across opens, so a half-read reply left by a previous (killed) session would otherwise be read as a stale first frame — failing to decode and - desyncing the stream until the next COBS sentinel (a transient error on the first - call or two). Flushing on connect gives every session a clean start. + desyncing the stream until the next COBS sentinel (a skipped frame or two of + churn, per the resync above). Flushing on connect avoids even that, giving every + session a clean start. - **Validated end-to-end on an STM32H563ZI Nucleo** over the ST-LINK VCP: `record.list` / `record.get` / `record.set` and streaming subscriptions all round-trip MCU↔host. diff --git a/aimdb-serial-connector/src/embassy_transport.rs b/aimdb-serial-connector/src/embassy_transport.rs index d5e23241..a4b3866a 100644 --- a/aimdb-serial-connector/src/embassy_transport.rs +++ b/aimdb-serial-connector/src/embassy_transport.rs @@ -102,8 +102,14 @@ where // satisfy the `Send` `BoxFut` return type. Box::pin(SendFutureWrapper(async move { loop { - if let Some(frame) = self.acc.next_frame() { - return Ok(Some(frame.map_err(|_| TransportError::Io)?)); + // COBS is self-synchronizing: a chunk that fails to decode is line + // noise or a mid-stream join, not a fatal transport error. The + // accumulator has already consumed it, so skip it and resync on the + // next sentinel rather than tearing down the session. + match self.acc.next_frame() { + Some(Ok(frame)) => return Ok(Some(frame)), + Some(Err(_)) => continue, + None => {} } let mut chunk = [0u8; READ_CHUNK]; match self.rx.read(&mut chunk).await { @@ -356,7 +362,7 @@ where // Apply the security policy's writable marking so `record.list` reports // the `writable` flag (the dispatch also enforces it). - apply_writable(db, &self.config); + crate::apply_writable(db, &self.config); let session_config = SessionConfig { limits: SessionLimits { @@ -399,18 +405,3 @@ fn connector_consumed() -> DbError { _parameter: (), } } - -/// Mark each record named in the policy's writable set as writable, so -/// `record.list` advertises the `writable` flag. (Mirrors the UDS connector.) -fn apply_writable(db: &AimDb, config: &AimxConfig) -where - R: RuntimeAdapter + 'static, -{ - for key in config.security_policy.writable_records() { - if let Some(id) = db.inner().resolve_str(&key) { - if let Some(storage) = db.inner().storage(id) { - storage.set_writable_erased(true); - } - } - } -} diff --git a/aimdb-serial-connector/src/lib.rs b/aimdb-serial-connector/src/lib.rs index 284f15b1..0c422972 100644 --- a/aimdb-serial-connector/src/lib.rs +++ b/aimdb-serial-connector/src/lib.rs @@ -44,6 +44,23 @@ pub mod embassy_transport; /// remote connector. pub const DEFAULT_SCHEME: &str = "serial"; +/// Mark each record named in the policy's writable set as writable, so +/// `record.list` advertises the `writable` flag (the dispatch also enforces it). +/// Shared by both `SerialServer` halves; mirrors the UDS connector. +#[cfg(any(feature = "tokio-runtime", feature = "embassy-runtime"))] +pub(crate) fn apply_writable(db: &aimdb_core::AimDb, config: &aimdb_core::remote::AimxConfig) +where + R: aimdb_core::RuntimeAdapter + 'static, +{ + for key in config.security_policy.writable_records() { + if let Some(id) = db.inner().resolve_str(&key) { + if let Some(storage) = db.inner().storage(id) { + storage.set_writable_erased(true); + } + } + } +} + // Prefer the tokio names when both halves are compiled (e.g. host tests). #[cfg(all(feature = "tokio-runtime", not(feature = "embassy-runtime")))] pub use tokio_transport::{SerialClient, SerialDialer, SerialListener, SerialServer}; diff --git a/aimdb-serial-connector/src/tokio_transport.rs b/aimdb-serial-connector/src/tokio_transport.rs index 3aebdc78..aaa55194 100644 --- a/aimdb-serial-connector/src/tokio_transport.rs +++ b/aimdb-serial-connector/src/tokio_transport.rs @@ -63,8 +63,14 @@ where fn recv(&mut self) -> BoxFut<'_, TransportResult>>> { Box::pin(async move { loop { - if let Some(frame) = self.acc.next_frame() { - return Ok(Some(frame.map_err(|_| TransportError::Io)?)); + // COBS is self-synchronizing: a chunk that fails to decode is line + // noise or a mid-stream join, not a fatal transport error. The + // accumulator has already consumed it, so skip it and resync on the + // next sentinel rather than tearing down the session. + match self.acc.next_frame() { + Some(Ok(frame)) => return Ok(Some(frame)), + Some(Err(_)) => continue, + None => {} } let mut chunk = [0u8; READ_CHUNK]; match self.stream.read(&mut chunk).await { @@ -268,7 +274,7 @@ where move |db: &AimDb| -> Arc { // Apply the security policy's writable marking so `record.list` // reports the `writable` flag (the dispatch also enforces it). - apply_writable(db, &dispatch_config); + crate::apply_writable(db, &dispatch_config); Arc::new(AimxDispatch::new( Arc::new(db.clone()), dispatch_config.clone(), @@ -307,18 +313,3 @@ fn open_serial_listener(path: &str, baud: u32) -> DbResult { })?; Ok(SerialListener::new(stream)) } - -/// Mark each record named in the policy's writable set as writable, so -/// `record.list` advertises the `writable` flag. (Mirrors the UDS connector.) -fn apply_writable(db: &AimDb, config: &AimxConfig) -where - R: RuntimeAdapter + 'static, -{ - for key in config.security_policy.writable_records() { - if let Some(id) = db.inner().resolve_str(&key) { - if let Some(storage) = db.inner().storage(id) { - storage.set_writable_erased(true); - } - } - } -} diff --git a/aimdb-serial-connector/tests/tokio_roundtrip.rs b/aimdb-serial-connector/tests/tokio_roundtrip.rs index fcc7b239..fbe5f03d 100644 --- a/aimdb-serial-connector/tests/tokio_roundtrip.rs +++ b/aimdb-serial-connector/tests/tokio_roundtrip.rs @@ -129,3 +129,28 @@ async fn aimx_roundtrips_over_the_serial_transport() { "record.list returns the configured records: {records}" ); } + +/// A chunk that fails to COBS-decode (line noise, or bytes from a session we +/// joined mid-stream) must not kill the connection: `recv` drops it and resyncs on +/// the next sentinel, yielding the following good frame instead of erroring. +#[tokio::test] +async fn recv_resyncs_past_a_corrupt_frame() { + use aimdb_serial_connector::framing::{encode_frame, DELIM}; + use tokio::io::AsyncWriteExt; + + let (mut peer, conn_end) = tokio::io::duplex(1024); + let mut conn = TokioSerialConnection::new(conn_end); + + // `0x05` is a COBS code byte promising four more bytes that never arrive, so the + // delimited chunk fails to decode; follow it with a valid frame. + let mut wire = vec![0x05, DELIM]; + encode_frame(b"after-noise", &mut wire); + peer.write_all(&wire).await.expect("write wire"); + + let frame = conn + .recv() + .await + .expect("recv must not error on a bad frame") + .expect("a frame after the noise"); + assert_eq!(frame, b"after-noise"); +} diff --git a/examples/embassy-serial-connector-demo/README.md b/examples/embassy-serial-connector-demo/README.md index 53c663cf..edbf1be0 100644 --- a/examples/embassy-serial-connector-demo/README.md +++ b/examples/embassy-serial-connector-demo/README.md @@ -9,7 +9,7 @@ the real Embassy serial transport (COBS framing over `embedded-io-async`). ┌────────────────────────────┐ USART3 (PD8/PD9) ┌──────────────────────┐ │ STM32H563ZI (this firmware)│ ⇄ ST-LINK Virtual COM Port ⇄ │ host: serial_demo │ │ SerialServer + AimxDispatch│ = /dev/ttyACM0 │ AimX client (RPC) │ -│ record: `counter` (++/sec) │ │ record.list/get loop │ +│ records: counter + setting │ │ record.list/get loop │ └────────────────────────────┘ └──────────────────────┘ ``` @@ -44,7 +44,7 @@ cargo build # → ../../target/thumbv8m.main-none-eabihf/debug/embass ```bash ./flash.sh # = probe-rs run --chip STM32H563ZITx -# RTT shows: "serving AimX over USART3 / ST-LINK VCP (record: counter) — connect the host now" +# RTT shows: "serving AimX over USART3 / ST-LINK VCP (records: counter, setting) — connect the host now" ``` **3. Query it from the host** over the VCP (run from the **workspace root** — the @@ -61,7 +61,7 @@ Expected output — the `counter` values are produced **on the MCU** and read ba over serial: ``` -[client] record.list = [{"buffer_type":"single_latest",...,"record_key":"counter","type_id":"TypeId(0x…)","writable":false}] +[client] record.list = [{...,"record_key":"counter","writable":false},{...,"record_key":"setting","writable":true}] [client] counter = {"value":173} [client] counter = {"value":174} [client] counter = {"value":175} From 958f4c2bf451fc6d7040c81560413b14a48412e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Fri, 5 Jun 2026 21:02:33 +0000 Subject: [PATCH 4/5] feat: implement frame overflow handling and resynchronization in FrameAccumulator --- Cargo.lock | 1 - aimdb-serial-connector/Cargo.toml | 4 - aimdb-serial-connector/src/framing.rs | 81 +++++++++++++++++--- aimdb-serial-connector/tests/cobs_framing.rs | 46 ++++++++++- 4 files changed, 116 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d25b6af7..f7b0eb68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -266,7 +266,6 @@ dependencies = [ "embassy-time-driver", "embedded-io-async 0.7.0", "futures", - "futures-util", "serde", "serde_json", "thiserror 2.0.17", diff --git a/aimdb-serial-connector/Cargo.toml b/aimdb-serial-connector/Cargo.toml index c5a46378..4b5f2f2a 100644 --- a/aimdb-serial-connector/Cargo.toml +++ b/aimdb-serial-connector/Cargo.toml @@ -28,7 +28,6 @@ tokio-runtime = [ "aimdb-core/remote-access", "dep:tokio", "dep:tokio-serial", - "dep:futures-util", ] # Embedded half: `no_std + alloc`, generic over `embedded-io-async` UART halves. @@ -69,9 +68,6 @@ cobs = { workspace = true, features = ["alloc"] } # --- tokio half (std) --- tokio = { workspace = true, optional = true, features = ["io-util"] } tokio-serial = { workspace = true, optional = true } -futures-util = { version = "0.3", optional = true, default-features = false, features = [ - "alloc", -] } thiserror = { workspace = true, optional = true } # --- embassy half (no_std) --- diff --git a/aimdb-serial-connector/src/framing.rs b/aimdb-serial-connector/src/framing.rs index 58050fe3..6efa2c65 100644 --- a/aimdb-serial-connector/src/framing.rs +++ b/aimdb-serial-connector/src/framing.rs @@ -12,9 +12,10 @@ use alloc::vec::Vec; -/// A delimited chunk was not valid COBS — line noise, a truncated frame, or a -/// mid-stream join before the next sentinel. The transports map this to a -/// `TransportError`. Self-contained so the framing round-trip is testable without +/// A frame could not be recovered — line noise, a truncated frame, a mid-stream +/// join before the next sentinel, or an un-delimited run that overflowed the frame +/// cap. The transports skip it and resync on the next sentinel rather than tearing +/// down the session. Self-contained so the framing round-trip is testable without /// the `connector-session` substrate. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct FrameError; @@ -23,6 +24,14 @@ pub struct FrameError; /// frame, so it cleanly delimits one frame from the next. pub const DELIM: u8 = 0x00; +/// Default cap on un-delimited buffered bytes ([`FrameAccumulator::new`]): once a +/// run with no sentinel exceeds this, it's treated as a desync and dropped rather +/// than buffered without bound (an OOM risk on a small embedded heap). Generous +/// versus any AimX control frame and an order of magnitude over typical embedded +/// frames; only ever approached by a pathological stream, since steady-state +/// buffering is a fraction of this. +pub const DEFAULT_MAX_FRAME: usize = 8 * 1024; + /// COBS-encode `payload` and append it (plus the trailing [`DELIM`]) to `out`. /// /// The dual of [`FrameAccumulator::next_frame`]: `decode(strip_delim(encode(p))) == p`. @@ -37,15 +46,37 @@ pub fn encode_frame(payload: &[u8], out: &mut Vec) { /// frame, exactly one frame, or several. Push raw bytes in with /// [`push_bytes`](Self::push_bytes); pull each complete frame out with /// [`next_frame`](Self::next_frame) until it returns `None` (need more bytes). -#[derive(Default)] pub struct FrameAccumulator { buf: Vec, + /// Set after an oversized (un-delimited) run is dropped: the bytes still + /// arriving are that frame's tail, so skip up to and including the next + /// sentinel before framing resumes. + resyncing: bool, + /// Cap on bytes buffered without a sentinel before the run is declared a desync + /// and dropped — bounds memory on a stream that never delimits. + max_frame: usize, +} + +impl Default for FrameAccumulator { + fn default() -> Self { + Self::new() + } } impl FrameAccumulator { - /// A fresh, empty accumulator. + /// A fresh, empty accumulator with the [`DEFAULT_MAX_FRAME`] cap. pub fn new() -> Self { - Self { buf: Vec::new() } + Self::with_max_frame(DEFAULT_MAX_FRAME) + } + + /// A fresh accumulator that drops any un-delimited run longer than `max_frame` + /// (and resyncs on the next sentinel) instead of buffering it without bound. + pub fn with_max_frame(max_frame: usize) -> Self { + Self { + buf: Vec::new(), + resyncing: false, + max_frame, + } } /// Append freshly-read bytes to the pending buffer. @@ -57,12 +88,42 @@ impl FrameAccumulator { /// /// - `None` — no `DELIM` buffered yet; read more bytes and retry. /// - `Some(Ok(frame))` — one decoded payload (the `DELIM` is consumed). - /// - `Some(Err(FrameError))` — the delimited chunk was not valid COBS (line - /// noise / a desync); the chunk is consumed so the next call resynchronizes - /// on the following sentinel. + /// - `Some(Err(FrameError))` — either the delimited chunk was not valid COBS + /// (line noise / a desync), or an un-delimited run exceeded `max_frame`; the + /// offending bytes are dropped so the next call resynchronizes on the + /// following sentinel. pub fn next_frame(&mut self) -> Option, FrameError>> { loop { - let pos = self.buf.iter().position(|&b| b == DELIM)?; + // Resync after an overflow: the buffered bytes are the tail of an + // abandoned oversized frame, so drop up to and including the next + // sentinel before resuming (don't decode that tail as a frame). + if self.resyncing { + match self.buf.iter().position(|&b| b == DELIM) { + Some(pos) => { + self.buf.drain(..=pos); + self.resyncing = false; + } + // Still no boundary — the whole buffer is tail garbage. Drop it + // (releasing capacity) and wait for more bytes. + None => { + self.buf = Vec::new(); + return None; + } + } + } + + let Some(pos) = self.buf.iter().position(|&b| b == DELIM) else { + // No frame boundary buffered. Bound memory against a stream that + // never delimits: once an un-delimited run exceeds `max_frame`, + // treat it as a desync — drop it (one `FrameError`) and resync on + // the next sentinel rather than buffering toward an OOM. + if self.buf.len() > self.max_frame { + self.buf = Vec::new(); + self.resyncing = true; + return Some(Err(FrameError)); + } + return None; + }; // Take the chunk up to and including the sentinel; the encoded frame is // everything before it. let chunk: Vec = self.buf.drain(..=pos).collect(); diff --git a/aimdb-serial-connector/tests/cobs_framing.rs b/aimdb-serial-connector/tests/cobs_framing.rs index d773d67a..bb669f3d 100644 --- a/aimdb-serial-connector/tests/cobs_framing.rs +++ b/aimdb-serial-connector/tests/cobs_framing.rs @@ -4,7 +4,7 @@ //! Transport-agnostic: exercises only [`framing`](aimdb_serial_connector::framing), //! so it runs under the crate's default features (no runtime needed). -use aimdb_serial_connector::framing::{encode_frame, FrameAccumulator, DELIM}; +use aimdb_serial_connector::framing::{encode_frame, FrameAccumulator, FrameError, DELIM}; /// Encode several payloads into one byte stream, then feed that stream to a fresh /// accumulator in `chunk`-sized slices and collect every frame it yields. @@ -73,6 +73,50 @@ fn encoded_frame_never_contains_the_sentinel_except_the_terminator() { ); } +#[test] +fn caps_buffered_bytes_and_resyncs_after_overflow() { + // A stream that never delimits must not buffer without bound (an OOM on a small + // embedded heap): once an un-delimited run exceeds the cap, the accumulator + // drops it (one `FrameError`) and resyncs on the next sentinel. + let mut acc = FrameAccumulator::with_max_frame(16); + + // 40 bytes of sentinel-free noise — no frame boundary in sight. + acc.push_bytes(&[0x42; 40]); + assert_eq!( + acc.next_frame(), + Some(Err(FrameError)), + "overflow is reported" + ); + // The oversized run was dropped, not retained for a retry. + assert!(acc.next_frame().is_none()); + + // The tail of the abandoned frame (more noise, then its terminating sentinel) is + // skipped, and the next whole frame is recovered intact. + acc.push_bytes(&[0x42; 5]); + acc.push_bytes(&[DELIM]); + let mut wire = Vec::new(); + encode_frame(b"recovered", &mut wire); + acc.push_bytes(&wire); + assert_eq!( + acc.next_frame().expect("frame").expect("valid"), + b"recovered" + ); + assert!(acc.next_frame().is_none()); +} + +#[test] +fn frames_up_to_the_cap_still_pass() { + // The cap only fires on an *un-delimited* run; a delimited frame at the cap size + // round-trips normally. + let mut acc = FrameAccumulator::with_max_frame(4096); + let payload = vec![0x41u8; 512]; + let mut wire = Vec::new(); + encode_frame(&payload, &mut wire); + acc.push_bytes(&wire); + assert_eq!(acc.next_frame().expect("frame").expect("valid"), payload); + assert!(acc.next_frame().is_none()); +} + #[test] fn resynchronizes_after_a_leading_sentinel() { // A receiver that joins mid-frame sees a stray sentinel first; it should be From 7f0628d5713eea6d0ea3602b01cf4fdaca3aa041 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Sat, 6 Jun 2026 20:34:29 +0000 Subject: [PATCH 5/5] feat: update changelog and Makefile for embassy-serial-connector crate integration --- CHANGELOG.md | 2 +- Makefile | 2 ++ aimdb-serial-connector/CHANGELOG.md | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 62ff14e8..844cf0a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,7 +35,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **M16 — JSON codec extracted behind the `json-serialize` feature; `RecordValue::as_json()` now works on `no_std + alloc`, not just `std` ([Design 032](docs/design/032-M16-aimx-json-codec.md)).** New `aimdb-core::codec` module: `RemoteSerialize` (blanket-impl'd for every `serde` `Serialize + DeserializeOwned` type), the object-safe `JsonCodec`, and the zero-sized `SerdeJsonCodec`. `serde_json` runs on `alloc`, so embedded targets can opt in; `std` enables the feature transitively, so std builds are unaffected. ([aimdb-core](aimdb-core/CHANGELOG.md)) - **Embassy buffer + join-queue tests now run in CI (Issue #85).** The join-queue tests previously sat behind `embassy-runtime`, which pulls `embassy-executor`'s cortex-m assembly and can't compile under `cargo test` on x86_64 — so ordering / backpressure / clone-routing regressions were never caught. The `join_queue` module is now gated on `embassy-sync`, and `make test` runs the embassy adapter's unit tests + doctests on the host (no executor). Also adds `EmbassyBuffer::peek()` and fixes a stale `EmbassyBuffer` doc example. ([aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md)) - **`no_std` AimX server — a board can serve a host, not just dial one (Issue #120, follow-up to #39).** Cross-cutting de-std of `aimdb-core`'s central record API behind a new **`remote-access`** feature (`= ["json-serialize", "thiserror"]`, transitively enabled by `std`): the type-erased `AnyRecord` JSON + metadata methods, the `AimDb` JSON read/write/subscribe API, the `remote` module (config / protocol / security / error), and the AimX server dispatch (`AimxDispatch`/`AimxSession`) now all compile on `no_std + alloc` — swapping `std::collections` → `hashbrown`, `std::sync::Arc` → `alloc::sync::Arc`, and `thiserror` to `default-features = false`. Adds a runtime-neutral wall clock, `TimeOps::unix_time()`, implemented from the OS clock on Tokio and from an `EmbassyAdapter::set_unix_time(...)` anchor on Embassy. Verified by a new `thumbv7em-none-eabihf` dispatch cross-check in the Makefile. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-uds-connector](aimdb-uds-connector/CHANGELOG.md), [tools/aimdb-mcp](tools/aimdb-mcp/CHANGELOG.md)) -- **`aimdb-serial-connector` — COBS-framed serial/UART transport, the headline embedded scenario (Issue #122, follow-up to #39 / #120, [doc 041](docs/design/detailed/041-phase6-embedded-transports.md)).** New crate: a sensor MCU dials a gateway over UART (and, on the `no_std` `AimxDispatch` from #120, an MCU *serves* a host over UART). Contributes only the `Dialer`/`Listener`/`Connection` triple + `SerialClient`/`SerialServer` sugar over the `serial://` scheme; reuses the AimX codec/dispatch and the session engines from core. Same compact AimX JSON, framed with **COBS** + a `0x00` sentinel (self-synchronizing on a lossy serial line). Dual halves: a std `tokio-runtime` (`tokio-serial`) riding the generic `Session*Connector`, and a `no_std + alloc` `embassy-runtime` generic over `embedded-io-async` UART halves that hand-rolls `ConnectorBuilder` and force-`Send`s the single-core futures via `SendFutureWrapper`. Cross-compiles to `thumbv7em-none-eabihf` (new Makefile `test-embedded` checks). Ships a real end-to-end demo: a host `serial_demo` example and an `embassy-serial-connector-demo` STM32H563ZI firmware (serves a record over USART3 ↔ the ST-LINK Virtual COM Port, queried from the host). The workspace `embedded-io-async` dep is bumped 0.6 → 0.7 to match the Embassy STM32 HAL. ([aimdb-serial-connector](aimdb-serial-connector/CHANGELOG.md)) +- **`aimdb-serial-connector` — COBS-framed serial/UART transport, the headline embedded scenario (Issue #122, follow-up to #39 / #120).** New crate: a sensor MCU dials a gateway over UART (and, on the `no_std` `AimxDispatch` from #120, an MCU *serves* a host over UART). Contributes only the `Dialer`/`Listener`/`Connection` triple + `SerialClient`/`SerialServer` sugar over the `serial://` scheme; reuses the AimX codec/dispatch and the session engines from core. Same compact AimX JSON, framed with **COBS** + a `0x00` sentinel (self-synchronizing on a lossy serial line). Dual halves: a std `tokio-runtime` (`tokio-serial`) riding the generic `Session*Connector`, and a `no_std + alloc` `embassy-runtime` generic over `embedded-io-async` UART halves that hand-rolls `ConnectorBuilder` and force-`Send`s the single-core futures via `SendFutureWrapper`. Cross-compiles to `thumbv7em-none-eabihf` (new Makefile `test-embedded` checks). Ships a real end-to-end demo: a host `serial_demo` example and an `embassy-serial-connector-demo` STM32H563ZI firmware (serves a record over USART3 ↔ the ST-LINK Virtual COM Port, queried from the host). The workspace `embedded-io-async` dep is bumped 0.6 → 0.7 to match the Embassy STM32 HAL. ([aimdb-serial-connector](aimdb-serial-connector/CHANGELOG.md)) ### Changed (breaking) diff --git a/Makefile b/Makefile index 281ba604..49fae724 100644 --- a/Makefile +++ b/Makefile @@ -360,6 +360,8 @@ examples: cargo build --package tokio-knx-connector-demo @printf "$(YELLOW) → Building embassy-knx-connector-demo (embedded, embassy runtime)$(NC)\n" cargo build --package embassy-knx-connector-demo --target thumbv7em-none-eabihf + @printf "$(YELLOW) → Building embassy-serial-connector-demo (embedded, embassy runtime)$(NC)\n" + cargo build --package embassy-serial-connector-demo --target thumbv7em-none-eabihf @printf "$(YELLOW) → Building weather-mesh-demo: weather-mesh-common$(NC)\n" cargo build --package weather-mesh-common @printf "$(YELLOW) → Building weather-mesh-demo: weather-hub (cloud aggregator)$(NC)\n" diff --git a/aimdb-serial-connector/CHANGELOG.md b/aimdb-serial-connector/CHANGELOG.md index cb972067..8cd677d7 100644 --- a/aimdb-serial-connector/CHANGELOG.md +++ b/aimdb-serial-connector/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- **New crate — the COBS-framed serial/UART transport for AimDB remote access (Issue #122, follow-up to #39, [doc 041](../docs/design/detailed/041-phase6-embedded-transports.md)).** The serial sibling of `aimdb-uds-connector`: it contributes only the `Dialer`/`Listener`/`Connection` triple plus thin sugar; the AimX codec + dispatch and the runtime-neutral session engines (`run_client`/`serve`) are reused from `aimdb-core`. The wire is the same compact AimX JSON, framed with **COBS** (Consistent Overhead Byte Stuffing) and a `0x00` sentinel instead of a newline — self-synchronizing on a lossy/unframed serial medium, so a receiver that joins mid-stream resynchronizes on the next sentinel. Default scheme `"serial"`. Two runtime halves: +- **New crate — the COBS-framed serial/UART transport for AimDB remote access (Issue #122, follow-up to #39).** The serial sibling of `aimdb-uds-connector`: it contributes only the `Dialer`/`Listener`/`Connection` triple plus thin sugar; the AimX codec + dispatch and the runtime-neutral session engines (`run_client`/`serve`) are reused from `aimdb-core`. The wire is the same compact AimX JSON, framed with **COBS** (Consistent Overhead Byte Stuffing) and a `0x00` sentinel instead of a newline — self-synchronizing on a lossy/unframed serial medium, so a receiver that joins mid-stream resynchronizes on the next sentinel. Default scheme `"serial"`. Two runtime halves: - **`tokio-runtime`** (std, host/gateway) — `TokioSerialConnection` over any `AsyncRead + AsyncWrite` (a real `tokio_serial::SerialStream` in production, a `tokio::io::duplex()` in tests), with `SerialClient::new(path, baud)` (sugar over `SessionClientConnector`) and `SerialServer` (sugar over `SessionServerConnector` + `AimxDispatch`). The listener is one-shot (serial is point-to-point). - **`embassy-runtime`** (`no_std + alloc`, MCU) — `EmbassySerialConnection` generic over `embedded-io-async` `Read`/`Write` halves (the common `Uart::split()` shape), with `SerialClient`/`SerialServer` that hand-roll `ConnectorBuilder` (calling `run_client`/`pump_client`/`serve` directly) and force-`Send` the single-core Embassy futures via `aimdb-embassy-adapter`'s `SendFutureWrapper`. The Embassy *server* half rides the `no_std` `AimxDispatch` landed in #120, so an MCU can answer `record.list`/`get`/`set`/`subscribe`/`drain` over a UART; the *client* half mirrors records to a gateway. Reconnect is disabled by default on Embassy (the UART peripheral is moved in and can't be re-acquired). - **`framing` module** — the shared COBS frame codec (`encode_frame` + a chunk-tolerant `FrameAccumulator`), pure `no_std + alloc`, so the round-trip is unit-tested independent of any transport.