diff --git a/CHANGELOG.md b/CHANGELOG.md index e658a28..6e4ff63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Design 034 Phase 2 — dyn-safe `RuntimeOps` capability trait (Issue #130, [review doc](docs/design/034-technical-debt-review.md)).** New object-safe trait in `aimdb-executor` (`name` / `now_nanos` / `unix_time` / boxed `sleep` / `log(LogLevel, …)`) so a runtime adapter can travel as `Arc` instead of a generic parameter — the groundwork for removing `R` from the record object graph (#131). Implemented by `TokioAdapter`, `EmbassyAdapter`, and `WasmAdapter`, each covered by a shared behavioral contract test. `BoxFuture`'s canonical definition moves to `aimdb-executor` (re-exported unchanged from `aimdb-core`). ([aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md)) + - **M17 — centralized Embassy connector spine: one audited home for the single-core `unsafe` ([Design 033](docs/design/033-M17-unify-connectors-drop-send.md)).** New `aimdb-embassy-adapter::connectors` module (features `connectors` / `connector-io`) collects the force-`Send` plumbing every Embassy connector used to hand-roll: session transports get `EmbassySessionClient`/`EmbassySessionServer`, `OneShotDialer`/`OneShotListener`/`OneShotCell`, and the framed `EmbassyConnection` + `Framer`; data-plane transports get the `EmbassySink`/`EmbassySource` bridges (over `EmbassySinkRaw`/`EmbassySourceRaw`) that ride core's existing `pump_sink`/`pump_source`, plus `into_box_future` for protocol tasks. The serial Embassy half is now thin sugar (just a COBS `Framer`) with **zero `unsafe`** (down from a 407-line hand-roll with 7 `unsafe impl`s); the MQTT and KNX Embassy halves dropped their hand-rolled publisher/router loops and `SendFutureWrapper` use to ride core's pumps (KNX inbound telegrams now flow through `pump_source`). All connector-crate `unsafe`/`SendFutureWrapper` is gone — confined to the adapter. The std/Tokio side, `aimdb-client`, the WebSocket server, examples, and tests are unchanged. (Chosen over Design 033's original "drop `Send` from the contract", which would have pushed `!Send` onto the std side; see the doc's Implementation Decision.) ([aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-serial-connector](aimdb-serial-connector/CHANGELOG.md), [aimdb-mqtt-connector](aimdb-mqtt-connector/CHANGELOG.md), [aimdb-knx-connector](aimdb-knx-connector/CHANGELOG.md)) - **Remote access via connectors — Phases 0–6: converge four hand-rolled networking stacks onto two shared engines (Issue #39, [design doc](docs/design/remote-access-via-connectors.md)).** AimX remote access (and any future transport) now rides the connector layer instead of a bespoke I/O abstraction. New, runtime-neutral `aimdb-core::session` module (feature `connector-session`, `no_std + alloc`): the three-layer substrate (`Connection`/`Listener`/`Dialer` + `EnvelopeCodec` + `Dispatch`/`Session`), the reactive **server** engine (`serve`/`run_session`) and proactive **client** engine (`run_client`/`pump_client`), the `pump_sink`/`pump_source` data-plane toolkit, and the transport-agnostic `SessionClientConnector`/`SessionServerConnector`. The AimX-v2 NDJSON protocol (`session::aimx`: `AimxCodec` + `AimxDispatch`) and the WebSocket connector are ports onto this substrate, so the AimX server/client and WS server/client stacks collapse onto the two engines. New **`aimdb-uds-connector`** crate carries the UDS transport (`UdsClient`/`UdsServer`). ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-uds-connector](aimdb-uds-connector/CHANGELOG.md), [aimdb-websocket-connector](aimdb-websocket-connector/CHANGELOG.md), [aimdb-client](aimdb-client/CHANGELOG.md), [aimdb-mqtt-connector](aimdb-mqtt-connector/CHANGELOG.md), [aimdb-knx-connector](aimdb-knx-connector/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md)) @@ -42,6 +44,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed (breaking) +- **Design 034 Phase 2 — registrar lifetime fix + de-erased builder internals (Issue #130, [review doc](docs/design/034-technical-debt-review.md)).** `RecordRegistrar`'s fluent methods now take fresh borrows (`&mut self -> &mut Self`) instead of borrowing the registrar for its entire lifetime — a `configure` closure can finally use separate statements (`reg.source_raw(…); reg.tap_raw(…);`) and reuse the registrar after a chain. `configure`'s closure bound drops its HRTB; `OutboundConnectorBuilder`/`InboundConnectorBuilder` gain a second lifetime parameter (`<'r, 'a, T, R>`); `RecordT::register` and the adapter/persistence extension traits follow. Internally, `AimDbBuilder` stores its spawn/start functions typed (`SpawnFnType`/`StartFnType`) instead of `Box` — the panicking downcasts in `build()` are gone, and `AimDb`'s struct-level bound moved to its impls. Closure-based user code compiles unchanged. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-persistence](aimdb-persistence/CHANGELOG.md)) + - **Design 034 Phase 1 — mechanical debt cleanup (Issues #129, #132, [review doc](docs/design/034-technical-debt-review.md)).** `DbError` is unified on `alloc::string::String`: every variant has one shape on all targets, `thiserror` derives `Display`/`Error` unconditionally (now a mandatory no_std dependency of `aimdb-core`), and no_std builds produce the same error messages as std builds instead of `Error 0xNNNN` codes — on no_std the `_field: ()` placeholders become the real `String` fields. The dead `Database` wrapper, the `TokioDatabase`/`EmbassyDatabase` aliases, and the deprecated `RecordRegistrar::link()` are removed. `ConsumerTrait::subscribe_any` is infallible (returns `Box`), `OutboundRoute` is a struct with named fields, and `aimdb-core`'s `std` feature no longer pulls a `tokio` dependency (tests cover it via dev-deps). Internally: all dual std/alloc import pairs and the duplicated `RuntimeContext` impl blocks are collapsed, and crate-private `log_*!` macros replace the 62 per-call-site `#[cfg(feature = "tracing")]` gates. The `aimdb-wasm-adapter` ring-lag error now carries a `buffer_name` like the other adapters. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-mqtt-connector](aimdb-mqtt-connector/CHANGELOG.md), [aimdb-knx-connector](aimdb-knx-connector/CHANGELOG.md)) - **CLI/MCP endpoint surface reworked to `--connect ` (Issue #123).** `aimdb`'s per-command `--socket ` is replaced by a global `--connect ` (`AIMDB_CONNECT` env; bare paths still work). `aimdb-mcp` renames the tools' `socket_path` param to `endpoint`, the startup `--socket` to `--connect`, and `AIMDB_SOCKET` to `AIMDB_CONNECT`; the pool is keyed by endpoint URL. `aimdb-client`'s `AimxConnection::connect` takes a `&str` endpoint (was a path) and `ClientError::ConnectionFailed.socket` is renamed `endpoint`. Serial endpoints (`serial://…`) require building the CLI/MCP with `--features transport-serial`. ([aimdb-client](aimdb-client/CHANGELOG.md), [tools/aimdb-cli](tools/aimdb-cli/CHANGELOG.md), [tools/aimdb-mcp](tools/aimdb-mcp/CHANGELOG.md)) diff --git a/aimdb-core/CHANGELOG.md b/aimdb-core/CHANGELOG.md index b24bfba..3786360 100644 --- a/aimdb-core/CHANGELOG.md +++ b/aimdb-core/CHANGELOG.md @@ -25,6 +25,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Internal refactors +- **Phase 2 — builder internals de-erased (Issue #130, [design doc §3.2](../docs/design/034-technical-debt-review.md)).** No behavior change: `AimDbBuilder` stores its per-record future collectors and `on_start` tasks as their typed forms (`Vec<(StringKey, SpawnFnType)>` / `Vec>`) instead of `Box` — the builder is already generic over `R`, so the erasure bought nothing. The panicking downcasts in `build()` (`"spawn function type mismatch"`, `"on_start fn type mismatch"`) are deleted. To keep the field types well-formed on the `NoRuntime` typestate, `AimDb`'s struct-level `R: RuntimeAdapter + 'static` bound moved to its impl blocks (strictly more permissive). `BoxFuture` is now a re-export of the canonical alias in `aimdb-executor` (same type). + - **Phase 1 mechanical cleanup (Issue #132, [design doc](../docs/design/034-technical-debt-review.md)).** No behavior change: - All dual `#[cfg(feature = "std")] use std::… / #[cfg(not(…))] use alloc::…` import pairs replaced by single unconditional `use alloc::…` imports; redundant per-module `extern crate alloc;` declarations dropped (the crate root has one). The duplicated std/no_std `RuntimeContext` impl blocks in `context.rs` (character-identical except for the `Arc` path) are merged into one. - New crate-private `log_debug!`/`log_info!`/`log_warn!`/`log_error!` macros (`src/log.rs`) forward to `tracing` when the feature is on and expand to an argument-borrowing no-op otherwise — deleting all 62 per-call-site `#[cfg(feature = "tracing")]` gates. `defmt` is deliberately not folded in (most sites use `{:?}` with non-`defmt::Format` types); router.rs keeps its paired explicit defmt gates. @@ -41,6 +43,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed (breaking) +- **Phase 2 — `RecordRegistrar` fluent methods take fresh borrows (Issue #130, [design doc §3.5](../docs/design/034-technical-debt-review.md)).** The methods previously took `&'a mut self` and returned `&'a mut Self` with `'a` = the struct's own lifetime parameter, so the first call borrowed the registrar for its entire remaining lifetime and only one unbroken chain per `configure` closure was possible. Now: + - All fluent methods are `&mut self -> &mut Self`; separate statements (`reg.source_raw(…); reg.tap_raw(…);`) and registrar reuse after a chain work. + - `configure`'s closure bound is `FnOnce(&mut RecordRegistrar<'_, T, R>)` (HRTB dropped) — existing closures compile unchanged. + - `OutboundConnectorBuilder` / `InboundConnectorBuilder` are now `<'r, 'a, T, R>` (`'r` borrows the registrar, `'a` is the registrar's record borrow) instead of the double-`'a`; `finish()` returns `&'r mut RecordRegistrar<'a, T, R>`, so `.finish().link_from(…)` chains keep working. + - `RecordT::register` is `fn register(reg: &mut RecordRegistrar<'_, Self, R>, cfg: &Self::Config)` (was `fn register<'a>(reg: &'a mut RecordRegistrar<'a, …>)`); the `impl_record_registrar_ext!` macro and the adapter/persistence extension traits follow the same shape. Code that *names* the old signatures (custom `RecordT` impls, custom extension traits) must update; closure-based configuration is source-compatible. + - **`DbError` unified on `alloc::string::String` — one shape per variant on every target (Issue #129, [design doc §3.1.1](../docs/design/034-technical-debt-review.md)).** The std/no_std dual-field design (`key: String` under `std`, `_key: ()` otherwise) is gone: every context field is a `String` unconditionally (the crate already requires `alloc` everywhere), and `Display`/`Error` derive from `thiserror` on every target (thiserror 2.x is no_std-capable, now a mandatory `default-features = false` dependency; the `remote-access` feature no longer lists it). Consequences: - **no_std only:** field renames (`_endpoint`/`_reason`/`_buffer_name`/`_key`/… `: ()` → the real `String` fields); `Display` now prints the same full messages as std builds instead of the `Error 0xNNNN` numeric-code table (which is deleted); `DbError` now implements `core::error::Error`. `error_code()` / `error_category()` are unchanged. - **std:** no change to variant shapes or messages. diff --git a/aimdb-core/src/builder.rs b/aimdb-core/src/builder.rs index 34a2093..bd298cb 100644 --- a/aimdb-core/src/builder.rs +++ b/aimdb-core/src/builder.rs @@ -5,7 +5,6 @@ use core::any::TypeId; use core::fmt::Debug; -use core::marker::PhantomData; use alloc::{ boxed::Box, @@ -19,17 +18,13 @@ use crate::extensions::Extensions; use crate::graph::DependencyGraph; /// Shorthand for a heap-pinned, `Send`, `'static` future — the unit of work -/// the `AimDbRunner` drives. -pub type BoxFuture = core::pin::Pin + Send + 'static>>; +/// the `AimDbRunner` drives. Canonical definition lives in `aimdb-executor`. +pub type BoxFuture = aimdb_executor::BoxFuture; -/// Type-erased on_start function stored in `AimDbBuilder::start_fns`. -/// -/// Defined once here so `on_start()` (which stores) and `build()` (which -/// downcasts) share the *exact same* type and a silent type mismatch cannot -/// cause a runtime panic. Single alias regardless of `std`/`no_std`. +/// `on_start` task stored in `AimDbBuilder::start_fns`, invoked at `build()`. type StartFnType = Box) -> BoxFuture + Send>; -/// Type-erased per-record future collector stored in `AimDbBuilder::spawn_fns`. +/// Per-record future collector stored in `AimDbBuilder::spawn_fns`. /// /// At `build()` time each is invoked in topological order; the returned /// `Vec` is appended to the runner's accumulator. @@ -294,20 +289,18 @@ pub struct AimDbBuilder { /// Connector builders that will be invoked during build() connector_builders: Vec>>, - /// Spawn functions with their keys - spawn_fns: Vec<(StringKey, Box)>, + /// Per-record future collectors with their keys. Always empty on the + /// `NoRuntime` typestate — `configure()` only exists once `R` is fixed. + spawn_fns: Vec<(StringKey, SpawnFnType)>, /// Startup tasks registered via on_start() — spawned after build() completes. - /// Stored type-erased (`Box) -> BoxFuture<…>>>`) to allow - /// the field to exist on the unparameterised NoRuntime builder too. - start_fns: Vec>, + /// Always empty on the `NoRuntime` typestate — `on_start()` only exists + /// once `R` is fixed. + start_fns: Vec>, /// Generic extension storage for external crates (e.g., persistence, metrics). /// Moved into AimDbInner during build() so it can be read on the live AimDb handle. extensions: Extensions, - - /// PhantomData to track the runtime type parameter - _phantom: PhantomData, } impl AimDbBuilder { @@ -322,7 +315,6 @@ impl AimDbBuilder { spawn_fns: Vec::new(), start_fns: Vec::new(), extensions: Extensions::new(), - _phantom: PhantomData, } } @@ -332,15 +324,14 @@ impl AimDbBuilder { /// /// # Type Safety Note /// - /// The `connector_builders` field is intentionally reset to `Vec::new()` during this - /// transition because connectors are parameterized by the runtime type: - /// - /// - Before: `Vec>>` - /// - After: `Vec>>` - /// - /// These types are incompatible and cannot be transferred. However, this is not a bug - /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined - /// in the `impl where R: RuntimeAdapter` block, not in `impl AimDbBuilder`). + /// The `connector_builders`, `spawn_fns` and `start_fns` fields are + /// intentionally reset to `Vec::new()` during this transition: all three + /// are parameterized by the runtime type (`ConnectorBuilder` → + /// `ConnectorBuilder`, etc.), and the `NoRuntime` instantiations are + /// incompatible with — and provably empty before — the typed ones, because + /// `.with_connector()`, `.configure()` and `.on_start()` are only available + /// AFTER calling `.runtime()` (they're defined in the + /// `impl where R: RuntimeAdapter` block, not in `impl AimDbBuilder`). /// /// This means the type system **enforces** the correct call order: /// ```rust,ignore @@ -360,9 +351,8 @@ impl AimDbBuilder { runtime: Some(rt), connector_builders: Vec::new(), spawn_fns: Vec::new(), - start_fns: self.start_fns, + start_fns: Vec::new(), extensions: self.extensions, - _phantom: PhantomData, } } } @@ -407,11 +397,8 @@ where F: FnOnce(Arc) -> Fut + Send + 'static, Fut: core::future::Future + Send + 'static, { - // Type-erase so the field can be shared with the `NoRuntime` builder struct. - // Uses the module-level `StartFnType` alias — must stay in sync with - // the downcast in `build()`. - let boxed: StartFnType = Box::new(move |runtime| Box::pin(f(runtime))); - self.start_fns.push(Box::new(boxed)); + self.start_fns + .push(Box::new(move |runtime| Box::pin(f(runtime)))); self } @@ -494,7 +481,7 @@ where pub fn configure( &mut self, key: impl RecordKey, - f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>), + f: impl FnOnce(&mut RecordRegistrar<'_, T, R>), ) -> &mut Self where T: Send + Sync + 'static + Debug + Clone, @@ -566,8 +553,7 @@ where ) }); - // Store the spawn function (type-erased in Box) - self.spawn_fns.push((spawn_key, Box::new(spawn_fn))); + self.spawn_fns.push((spawn_key, spawn_fn)); } self @@ -776,7 +762,7 @@ where log_info!("Collecting futures for {} records", self.spawn_fns.len()); // Build a lookup map from spawn_fns for topological ordering - let mut spawn_fn_map: HashMap> = + let mut spawn_fn_map: HashMap> = self.spawn_fns.into_iter().collect(); // Execute collectors in topological order — transforms collect after their inputs. @@ -787,15 +773,11 @@ where continue; }; - let Some(spawn_fn_any) = spawn_fn_map.remove(&key) else { + let Some(spawn_fn) = spawn_fn_map.remove(&key) else { continue; }; - let spawn_fn = spawn_fn_any - .downcast::>() - .expect("spawn function type mismatch"); - - futures_acc.extend((*spawn_fn)(&runtime, &db, id)?); + futures_acc.extend(spawn_fn(&runtime, &db, id)?); } log_info!("Record future collection complete"); @@ -827,13 +809,8 @@ where if !self.start_fns.is_empty() { log_debug!("Collecting {} on_start future(s)", self.start_fns.len()); - for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() { - let start_fn = start_fn_any - .downcast::>() - .unwrap_or_else(|_| { - panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core") - }); - futures_acc.push((*start_fn)(runtime.clone())); + for start_fn in self.start_fns { + futures_acc.push(start_fn(runtime.clone())); } } @@ -868,7 +845,10 @@ impl Default for AimDbBuilder { /// .register_record::(&TemperatureConfig) /// .build()?; /// ``` -pub struct AimDb { +// No struct-level bound: `SpawnFnType` must be a well-formed type even for +// the builder's `NoRuntime` typestate (where it is never instantiated). All +// functionality lives on `R: RuntimeAdapter` impls. +pub struct AimDb { /// Internal state inner: Arc, @@ -880,7 +860,7 @@ pub struct AimDb { profiling_clock: crate::profiling::Clock, } -impl Clone for AimDb { +impl Clone for AimDb { fn clone(&self) -> Self { Self { inner: self.inner.clone(), diff --git a/aimdb-core/src/ext_macros.rs b/aimdb-core/src/ext_macros.rs index 2956294..6baae48 100644 --- a/aimdb-core/src/ext_macros.rs +++ b/aimdb-core/src/ext_macros.rs @@ -61,15 +61,15 @@ macro_rules! impl_record_registrar_ext { { /// Configures a buffer using inline configuration fn buffer( - &'a mut self, + &mut self, cfg: $crate::buffer::BufferCfg, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>; + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime>; /// Registers a producer with automatic runtime context injection fn source( - &'a mut self, + &mut self, f: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Producer) -> Fut + Send @@ -79,9 +79,9 @@ macro_rules! impl_record_registrar_ext { /// Registers a consumer with automatic runtime context injection fn tap( - &'a mut self, + &mut self, f: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Consumer) -> Fut + Send @@ -93,10 +93,10 @@ macro_rules! impl_record_registrar_ext { /// Derives this record from an input record. Panics if a `.source()` or /// another `.transform()` is already registered. fn transform( - &'a mut self, + &mut self, input_key: impl $crate::RecordKey, build_fn: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where I: Send + Sync + Clone + core::fmt::Debug + 'static, F: FnOnce( @@ -111,9 +111,9 @@ macro_rules! impl_record_registrar_ext { T: Send + Sync + Clone + core::fmt::Debug + 'static, { fn buffer( - &'a mut self, + &mut self, cfg: $crate::buffer::BufferCfg, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> { + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> { use $crate::buffer::Buffer; extern crate alloc; @@ -124,9 +124,9 @@ macro_rules! impl_record_registrar_ext { } fn source( - &'a mut self, + &mut self, f: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Producer) -> Fut + Send @@ -141,9 +141,9 @@ macro_rules! impl_record_registrar_ext { } fn tap( - &'a mut self, + &mut self, f: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Consumer) -> Fut + Send @@ -157,10 +157,10 @@ macro_rules! impl_record_registrar_ext { } fn transform( - &'a mut self, + &mut self, input_key: impl $crate::RecordKey, build_fn: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where I: Send + Sync + Clone + core::fmt::Debug + 'static, F: FnOnce( @@ -191,15 +191,15 @@ macro_rules! impl_record_registrar_ext { { /// Configures a buffer using inline configuration fn buffer( - &'a mut self, + &mut self, cfg: $crate::buffer::BufferCfg, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>; + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime>; /// Registers a producer with automatic runtime context injection fn source( - &'a mut self, + &mut self, f: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Producer) -> Fut + Send @@ -209,9 +209,9 @@ macro_rules! impl_record_registrar_ext { /// Registers a consumer with automatic runtime context injection fn tap( - &'a mut self, + &mut self, f: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Consumer) -> Fut + Send @@ -220,10 +220,10 @@ macro_rules! impl_record_registrar_ext { /// Single-input reactive transform. fn transform( - &'a mut self, + &mut self, input_key: impl $crate::RecordKey, build_fn: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where I: Send + Sync + Clone + core::fmt::Debug + 'static, F: FnOnce( @@ -238,9 +238,9 @@ macro_rules! impl_record_registrar_ext { T: Send + Sync + Clone + core::fmt::Debug + 'static, { fn buffer( - &'a mut self, + &mut self, cfg: $crate::buffer::BufferCfg, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> { + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> { use $crate::buffer::Buffer; extern crate alloc; @@ -251,9 +251,9 @@ macro_rules! impl_record_registrar_ext { } fn source( - &'a mut self, + &mut self, f: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Producer) -> Fut + Send @@ -268,9 +268,9 @@ macro_rules! impl_record_registrar_ext { } fn tap( - &'a mut self, + &mut self, f: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Consumer) -> Fut + Send @@ -284,10 +284,10 @@ macro_rules! impl_record_registrar_ext { } fn transform( - &'a mut self, + &mut self, input_key: impl $crate::RecordKey, build_fn: F, - ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> + ) -> &mut $crate::RecordRegistrar<'a, T, $runtime> where I: Send + Sync + Clone + core::fmt::Debug + 'static, F: FnOnce( diff --git a/aimdb-core/src/typed_api.rs b/aimdb-core/src/typed_api.rs index 2970c40..3d08a17 100644 --- a/aimdb-core/src/typed_api.rs +++ b/aimdb-core/src/typed_api.rs @@ -374,7 +374,7 @@ where /// reg.source(|ctx, producer| async move { /* ... */ }) /// .with_name("sensor_reader"); /// ``` - pub fn with_name(&'a mut self, name: &str) -> &'a mut Self { + pub fn with_name(&mut self, name: &str) -> &mut Self { #[cfg(feature = "profiling")] if let Some((kind, idx)) = self.last_stage { self.rec.profiling_mut().set_stage_name(kind, idx, name); @@ -395,7 +395,7 @@ where /// - Runtime adapter implementations to provide convenient wrappers /// - Internal connector implementations /// - Advanced use cases requiring direct control - pub fn source_raw(&'a mut self, f: F) -> &'a mut Self + pub fn source_raw(&mut self, f: F) -> &mut Self where F: FnOnce(crate::Producer, Arc) -> Fut + Send @@ -426,7 +426,7 @@ where /// - Runtime adapter implementations to provide convenient wrappers /// - Internal connector implementations (e.g., `.link_to()` creates consumers via this method) /// - Advanced use cases requiring direct control - pub fn tap_raw(&'a mut self, f: F) -> &'a mut Self + pub fn tap_raw(&mut self, f: F) -> &mut Self where F: FnOnce(crate::Consumer, Arc) -> Fut + Send @@ -460,24 +460,24 @@ where /// /// **Note:** For metadata tracking in std mode, call `buffer_with_cfg()` instead, /// or call `buffer_cfg()` separately to set the configuration. - pub fn buffer_raw(&'a mut self, buffer: Box>) -> &'a mut Self { + pub fn buffer_raw(&mut self, buffer: Box>) -> &mut Self { self.rec.set_buffer(buffer); self } /// Configures a buffer with metadata tracking pub fn buffer_with_cfg( - &'a mut self, + &mut self, buffer: Box>, cfg: crate::buffer::BufferCfg, - ) -> &'a mut Self { + ) -> &mut Self { self.rec.set_buffer(buffer); self.rec.set_buffer_cfg(cfg); self } /// Sets the buffer configuration for metadata tracking - pub fn buffer_cfg(&'a mut self, cfg: crate::buffer::BufferCfg) -> &'a mut Self { + pub fn buffer_cfg(&mut self, cfg: crate::buffer::BufferCfg) -> &mut Self { self.rec.set_buffer_cfg(cfg); self } @@ -497,7 +497,7 @@ where /// }); /// ``` #[cfg(feature = "json-serialize")] - pub fn with_remote_access(&'a mut self) -> &'a mut Self + pub fn with_remote_access(&mut self) -> &mut Self where T: crate::codec::RemoteSerialize + 'static, { @@ -519,10 +519,10 @@ where /// * `input_key` - The record key to subscribe to as input /// * `build_fn` - Closure that configures the transform pipeline via `TransformBuilder` pub fn transform_raw( - &'a mut self, + &mut self, input_key: impl crate::RecordKey, build_fn: F, - ) -> &'a mut Self + ) -> &mut Self where I: Send + Sync + Clone + Debug + 'static, F: FnOnce( @@ -543,7 +543,7 @@ where /// Register a multi-input join transform (low-level API). /// /// Panics if a `.source()` or another `.transform()` is already registered. - pub fn transform_join_raw(&'a mut self, build_fn: F) -> &'a mut Self + pub fn transform_join_raw(&mut self, build_fn: F) -> &mut Self where R: aimdb_executor::JoinFanInRuntime, F: FnOnce(crate::transform::JoinBuilder) -> crate::transform::JoinPipeline, @@ -561,7 +561,7 @@ where /// Derives this record from multiple input records. Available on any runtime /// that implements `JoinFanInRuntime`. Panics if a `.source()` or another /// `.transform()` is already registered. - pub fn transform_join(&'a mut self, build_fn: F) -> &'a mut Self + pub fn transform_join(&mut self, build_fn: F) -> &mut Self where R: aimdb_executor::JoinFanInRuntime, F: FnOnce(crate::transform::JoinBuilder) -> crate::transform::JoinPipeline, @@ -583,7 +583,7 @@ where /// .finish() /// }); /// ``` - pub fn link_to(&'a mut self, url: &str) -> OutboundConnectorBuilder<'a, T, R> { + pub fn link_to(&mut self, url: &str) -> OutboundConnectorBuilder<'_, 'a, T, R> { OutboundConnectorBuilder { registrar: self, url: url.to_string(), @@ -608,7 +608,7 @@ where /// .finish() /// }); /// ``` - pub fn link_from(&'a mut self, url: &str) -> InboundConnectorBuilder<'a, T, R> { + pub fn link_from(&mut self, url: &str) -> InboundConnectorBuilder<'_, 'a, T, R> { InboundConnectorBuilder { registrar: self, url: url.to_string(), @@ -625,12 +625,16 @@ where // ============================================================================ /// Builder for configuring outbound connector links (AimDB → External) +/// +/// `'r` is the borrow of the registrar taken by `link_to()`; `'a` is the +/// registrar's own borrow of the record being configured. pub struct OutboundConnectorBuilder< + 'r, 'a, T: Send + Sync + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'static, > { - registrar: &'a mut RecordRegistrar<'a, T, R>, + registrar: &'r mut RecordRegistrar<'a, T, R>, url: String, config: Vec<(String, String)>, serializer: Option>, @@ -638,7 +642,7 @@ pub struct OutboundConnectorBuilder< topic_provider: Option, } -impl<'a, T, R> OutboundConnectorBuilder<'a, T, R> +impl<'r, 'a, T, R> OutboundConnectorBuilder<'r, 'a, T, R> where T: Send + Sync + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'static, @@ -759,7 +763,7 @@ where } /// Finalizes the connector registration - pub fn finish(self) -> &'a mut RecordRegistrar<'a, T, R> { + pub fn finish(self) -> &'r mut RecordRegistrar<'a, T, R> { use crate::connector::{ConnectorLink, ConnectorUrl}; let url = ConnectorUrl::parse(&self.url) @@ -877,12 +881,16 @@ where type TypedDeserializerFn = Arc Result + Send + Sync + 'static>; /// Builder for configuring inbound connector links (External → AimDB) +/// +/// `'r` is the borrow of the registrar taken by `link_from()`; `'a` is the +/// registrar's own borrow of the record being configured. pub struct InboundConnectorBuilder< + 'r, 'a, T: Send + Sync + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'static, > { - registrar: &'a mut RecordRegistrar<'a, T, R>, + registrar: &'r mut RecordRegistrar<'a, T, R>, url: String, config: Vec<(String, String)>, deserializer: Option>, @@ -890,7 +898,7 @@ pub struct InboundConnectorBuilder< topic_resolver: Option, } -impl<'a, T, R> InboundConnectorBuilder<'a, T, R> +impl<'r, 'a, T, R> InboundConnectorBuilder<'r, 'a, T, R> where T: Send + Sync + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'static, @@ -1009,7 +1017,7 @@ where /// - If the URL is invalid /// - If the record already has a `.source()` or `.transform()` /// (local producer + inbound connector would race as last-writer-wins) - pub fn finish(self) -> &'a mut RecordRegistrar<'a, T, R> { + pub fn finish(self) -> &'r mut RecordRegistrar<'a, T, R> { use crate::connector::{ConnectorUrl, DeserializerKind, InboundConnectorLink}; let url = ConnectorUrl::parse(&self.url) @@ -1121,7 +1129,7 @@ pub trait RecordT: type Config; /// Registers producer and consumer functions - fn register<'a>(reg: &'a mut RecordRegistrar<'a, Self, R>, cfg: &Self::Config); + fn register(reg: &mut RecordRegistrar<'_, Self, R>, cfg: &Self::Config); } #[cfg(test)] @@ -1690,8 +1698,7 @@ mod tests { let extensions = crate::extensions::Extensions::new(); let mut reg = make_registrar(&mut rec, &builders, &extensions); - // Chain via finish() → &mut RecordRegistrar — the registrar's - // lifetime only permits one borrow chain at a time. + // Chained via finish() → &mut RecordRegistrar … reg.link_from("mqtt://broker/topic-a") .with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 })) .finish() @@ -1699,6 +1706,31 @@ mod tests { .with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 })) .finish(); - assert_eq!(rec.inbound_connectors().len(), 2); + // … and as separate statements: each call takes a fresh borrow, so + // the registrar is reusable after a chain ends (issue #130). + reg.link_from("mqtt://broker/topic-c") + .with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 })) + .finish(); + reg.with_name("third-link"); + + assert_eq!(rec.inbound_connectors().len(), 3); + } + + /// Registrar methods take fresh borrows (issue #130): separate + /// statements in a configure-style closure must compile. + #[test] + fn registrar_allows_separate_statements() { + let mut rec = crate::typed_record::TypedRecord::::new(); + rec.set_buffer(Box::new(MockBuffer)); + + let builders: Vec>> = vec![]; + let extensions = crate::extensions::Extensions::new(); + let mut reg = make_registrar(&mut rec, &builders, &extensions); + + reg.source_raw(|_p, _ctx| async move {}); + reg.tap_raw(|_c, _ctx| async move {}); + + assert!(rec.has_producer()); + assert_eq!(rec.consumer_count(), 1); } } diff --git a/aimdb-embassy-adapter/CHANGELOG.md b/aimdb-embassy-adapter/CHANGELOG.md index a38f153..0ad85ac 100644 --- a/aimdb-embassy-adapter/CHANGELOG.md +++ b/aimdb-embassy-adapter/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **`RuntimeOps` implemented for `EmbassyAdapter` (Issue #130, design 034 Phase 2).** The dyn-safe capability surface from `aimdb-executor`, gated on `embassy-time` like `TimeOps`: `now_nanos()` is boot-anchored uptime at microsecond granularity (the portable lower bound), `sleep` boxes `embassy_time::Timer::after`, `unix_time` rides the `set_unix_time` anchor, `log` forwards to the defmt-backed `Logger`. Covered by the shared contract test on the host (the test time driver now wakes immediately on `schedule_wake`, so already-expired timers complete; non-zero sleeps remain unusable on the pinned-at-0 host clock). - **M17 — centralized Embassy connector spines: the one audited home for the single-core `unsafe` ([Design 033](../docs/design/033-M17-unify-connectors-drop-send.md)).** New `connectors` module (features `connectors` / `connector-io`) collecting the force-`Send` plumbing every Embassy connector used to hand-roll, so a connector crate carries **no `unsafe` and no `SendFutureWrapper`**: - **Session spine** — `EmbassySessionClient` / `EmbassySessionServer` (the Embassy duals of core's `SessionClientConnector` / `SessionServerConnector`), the one-shot `OneShotDialer` / `OneShotListener` over a moved-in peripheral connection (the listener parks forever after the first accept — point-to-point), and the force-`Send + Sync` `OneShotCell` for builders holding a moved-in value. `EmbassySessionClient::new` defaults to `reconnect: false` (unlike `ClientConfig::default`): a one-shot dialer can't redial, so the engine would otherwise spin on `TransportError::Io` forever; a re-dialable transport opts back in via `with_config`. - **Framed connection** (feature `connector-io`) — `EmbassyConnection` over `embedded-io-async` `Read`/`Write` halves with a pluggable `Framer` (COBS, length-prefix, …), chunking writes for atomic-or-error HAL `BufferedUart`s and skip-and-resync on undecodable runs. diff --git a/aimdb-embassy-adapter/src/buffer.rs b/aimdb-embassy-adapter/src/buffer.rs index 6390b20..a45e747 100644 --- a/aimdb-embassy-adapter/src/buffer.rs +++ b/aimdb-embassy-adapter/src/buffer.rs @@ -592,14 +592,19 @@ mod tests { core::panic!("defmt panic in host test") } - // Trivial time driver so `_embassy_time_now` resolves on the host. peek() - // never reads the clock; the driver only needs to exist for linking. + // Trivial time driver so `_embassy_time_now` resolves on the host. The + // clock is pinned at 0; `schedule_wake` wakes immediately so an + // already-expired `Timer` (e.g. `sleep(Duration::ZERO)` in the RuntimeOps + // contract test) completes on its next poll. Non-zero sleeps would spin + // forever — host tests must not use them. struct TestTimeDriver; impl embassy_time_driver::Driver for TestTimeDriver { fn now(&self) -> u64 { 0 } - fn schedule_wake(&self, _at: u64, _waker: &core::task::Waker) {} + fn schedule_wake(&self, _at: u64, waker: &core::task::Waker) { + waker.wake_by_ref(); + } } embassy_time_driver::time_driver_impl!(static TEST_TIME_DRIVER: TestTimeDriver = TestTimeDriver); diff --git a/aimdb-embassy-adapter/src/lib.rs b/aimdb-embassy-adapter/src/lib.rs index 239e91c..8074940 100644 --- a/aimdb-embassy-adapter/src/lib.rs +++ b/aimdb-embassy-adapter/src/lib.rs @@ -253,9 +253,9 @@ where /// immediately, producing no output). A `defmt::error!` is emitted, but set /// `CONSUMERS` high enough to avoid it altogether. fn buffer_sized( - &'a mut self, + &mut self, buffer_type: EmbassyBufferType, - ) -> &'a mut aimdb_core::RecordRegistrar<'a, T, EmbassyAdapter>; + ) -> &mut aimdb_core::RecordRegistrar<'a, T, EmbassyAdapter>; /// Registers a producer with additional context (e.g., hardware peripherals) /// @@ -289,10 +289,10 @@ where /// } /// ``` fn source_with_context( - &'a mut self, + &mut self, context: Ctx, f: F, - ) -> &'a mut aimdb_core::RecordRegistrar<'a, T, EmbassyAdapter> + ) -> &mut aimdb_core::RecordRegistrar<'a, T, EmbassyAdapter> where Ctx: Send + Sync + 'static, F: FnOnce(aimdb_core::RuntimeContext, aimdb_core::Producer, Ctx) -> Fut @@ -309,9 +309,9 @@ where T: Send + Sync + Clone + core::fmt::Debug + 'static, { fn buffer_sized( - &'a mut self, + &mut self, buffer_type: EmbassyBufferType, - ) -> &'a mut aimdb_core::RecordRegistrar<'a, T, EmbassyAdapter> { + ) -> &mut aimdb_core::RecordRegistrar<'a, T, EmbassyAdapter> { use aimdb_core::buffer::{Buffer, BufferCfg}; use alloc::boxed::Box; @@ -340,10 +340,10 @@ where } fn source_with_context( - &'a mut self, + &mut self, context: Ctx, f: F, - ) -> &'a mut aimdb_core::RecordRegistrar<'a, T, EmbassyAdapter> + ) -> &mut aimdb_core::RecordRegistrar<'a, T, EmbassyAdapter> where Ctx: Send + Sync + 'static, F: FnOnce(aimdb_core::RuntimeContext, aimdb_core::Producer, Ctx) -> Fut diff --git a/aimdb-embassy-adapter/src/runtime.rs b/aimdb-embassy-adapter/src/runtime.rs index 1f3e04c..4cd9da3 100644 --- a/aimdb-embassy-adapter/src/runtime.rs +++ b/aimdb-embassy-adapter/src/runtime.rs @@ -222,6 +222,41 @@ impl aimdb_executor::TimeOps for EmbassyAdapter { } } +#[cfg(feature = "embassy-time")] +impl aimdb_executor::RuntimeOps for EmbassyAdapter { + fn name(&self) -> &'static str { + ::runtime_name() + } + + fn now_nanos(&self) -> u64 { + // Boot-anchored monotonic uptime; microsecond granularity is the + // portable lower bound (see `duration_as_nanos`). + embassy_time::Instant::now() + .as_micros() + .saturating_mul(1_000) + } + + fn unix_time(&self) -> Option<(u64, u32)> { + ::unix_time(self) + } + + fn sleep(&self, d: core::time::Duration) -> aimdb_executor::BoxFuture { + let duration = + embassy_time::Duration::from_micros(d.as_micros().min(u64::MAX as u128) as u64); + alloc::boxed::Box::pin(embassy_time::Timer::after(duration)) + } + + fn log(&self, level: aimdb_executor::LogLevel, msg: &str) { + use aimdb_executor::{LogLevel, Logger}; + match level { + LogLevel::Debug => Logger::debug(self, msg), + LogLevel::Info => Logger::info(self, msg), + LogLevel::Warn => Logger::warn(self, msg), + LogLevel::Error => Logger::error(self, msg), + } + } +} + // Implement Logger trait impl aimdb_executor::Logger for EmbassyAdapter { fn info(&self, message: &str) { @@ -243,6 +278,29 @@ impl aimdb_executor::Logger for EmbassyAdapter { // Runtime trait is auto-implemented when RuntimeAdapter + TimeOps + Logger are all implemented +// Host-run contract test for the dyn-safe RuntimeOps surface. Gated like the +// join-queue tests: `embassy-sync` (not `embassy-runtime`) so the cortex-m +// executor never enters the host build; the defmt/time-driver stubs shared by +// the lib test binary live in `buffer.rs`'s test module. The stub clock is +// pinned at 0, so only `Duration::ZERO` sleeps complete — see +// `assert_runtime_ops_contract`'s docs. +#[cfg(all(test, feature = "embassy-sync", feature = "embassy-time"))] +mod runtime_ops_tests { + use super::*; + use aimdb_executor::RuntimeOps; + use alloc::sync::Arc; + use futures::executor::block_on; + + #[test] + fn runtime_ops_contract() { + // `Arc` must be constructible from the adapter. + let ops: Arc = Arc::new(EmbassyAdapter::new().unwrap()); + block_on(aimdb_executor::test_support::assert_runtime_ops_contract( + ops.as_ref(), + )); + } +} + // Implement EmbassyNetwork trait for accessing network stack #[cfg(feature = "embassy-net-support")] impl EmbassyNetwork for EmbassyAdapter { diff --git a/aimdb-executor/CHANGELOG.md b/aimdb-executor/CHANGELOG.md index 87629a3..7087a2e 100644 --- a/aimdb-executor/CHANGELOG.md +++ b/aimdb-executor/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **`RuntimeOps` — dyn-safe runtime capability trait (Issue #130, [design doc 034](../docs/design/034-technical-debt-review.md) §3.2/§3.3).** Object-safe counterpart to `RuntimeAdapter + TimeOps + Logger`: `name()`, `now_nanos()` (monotonic, arbitrary epoch), `unix_time()`, `sleep(core::time::Duration) -> BoxFuture` (boxed — one allocation per call, consistent with the existing cost model), and `log(LogLevel, &str)`. Lets a runtime travel as `Arc` where threading `R` through a signature isn't worth it; per-adapter impls are intentional (a blanket impl over `TimeOps` is impossible — `now_nanos` needs an epoch anchor and `TimeOps::Instant` is opaque). Groundwork for #131; core does not consume it yet. Ships with `LogLevel`, the canonical `BoxFuture` alias (moved here from `aimdb-core`, which re-exports it), and a `#[doc(hidden)] test_support::assert_runtime_ops_contract` shared behavioral test used by all three adapters. The crate now declares `extern crate alloc`. - **`TimeOps::unix_time(&self) -> Option<(u64, u32)>` (Issue #120).** Absolute wall-clock time — `(seconds, nanoseconds)` since the Unix epoch — distinct from the monotonic `now()` used to measure durations. Intended for human / remote display (e.g. AimX `record.list` metadata). The default impl returns `None` for platforms without a real-time clock (a bare MCU); runtimes backed by an OS clock (or an MCU with a configured RTC) override it. Default-method, so existing `TimeOps` impls need no change. - `futures-util` (alloc-only) as a regular dependency — provides `FuturesUnordered` used by `aimdb-core`'s `AimDbRunner`. diff --git a/aimdb-executor/src/lib.rs b/aimdb-executor/src/lib.rs index cc965ab..ea2c9a7 100644 --- a/aimdb-executor/src/lib.rs +++ b/aimdb-executor/src/lib.rs @@ -16,6 +16,8 @@ //! 1. **`RuntimeAdapter`** - Platform identity and metadata //! 2. **`TimeOps`** - Time operations (now, sleep, duration helpers) //! 3. **`Logger`** - Structured logging (info, debug, warn, error) +//! 4. **`RuntimeOps`** - Object-safe bundle of the above (`Arc`) +//! for code that holds the runtime as a value instead of a type parameter //! //! Task execution is driven by the `AimDbRunner` returned from //! `AimDbBuilder::build()`, which collects every future the database needs @@ -23,10 +25,17 @@ #![cfg_attr(not(feature = "std"), no_std)] +extern crate alloc; + use core::future::Future; pub mod join; +pub mod ops; +#[doc(hidden)] +pub mod test_support; + pub use join::{JoinFanInRuntime, JoinQueue, JoinReceiver, JoinSender}; +pub use ops::{BoxFuture, LogLevel, RuntimeOps}; // ============================================================================ // Error Types diff --git a/aimdb-executor/src/ops.rs b/aimdb-executor/src/ops.rs new file mode 100644 index 0000000..01eb7cf --- /dev/null +++ b/aimdb-executor/src/ops.rs @@ -0,0 +1,74 @@ +//! Dyn-safe runtime capability surface +//! +//! [`RuntimeOps`] is the object-safe counterpart to the generic trait family +//! ([`RuntimeAdapter`](crate::RuntimeAdapter) + [`TimeOps`](crate::TimeOps) + +//! [`Logger`](crate::Logger)). Those traits use associated types and +//! `impl Future` returns, so a runtime can only be held as a type parameter +//! `R`. `RuntimeOps` flattens the same capabilities onto concrete types +//! (`u64` nanoseconds, `core::time::Duration`, a boxed sleep future) so the +//! runtime can be passed as a *value* — `Arc` — wherever +//! threading `R` through a signature is not worth it. +//! +//! Design notes (issue #130, design doc 034 §3.2/§3.3): +//! +//! - **Concrete time types instead of associated types.** Adapters convert +//! internally; `aimdb-core`'s profiling clock already flattens instants to +//! nanoseconds the same way, proving the shape works. +//! - **Boxed `sleep` future**: one allocation per call, consistent with the +//! existing cost model — every service future is already boxed, and `alloc` +//! is mandatory on all supported targets. +//! - **Per-adapter implementations are intentional.** A blanket impl over +//! [`TimeOps`](crate::TimeOps) is impossible: `now_nanos` needs an epoch +//! anchor, and `TimeOps::Instant` is opaque with no place to store one. +//! +//! This trait is groundwork: `aimdb-core` does not consume it yet (that is +//! the follow-up de-genericization, issue #131). + +/// Heap-pinned, `Send`, `'static` future — the unit of work AimDB drives. +/// +/// Canonical definition; `aimdb_core::BoxFuture` re-exports this alias. +pub type BoxFuture = + core::pin::Pin + Send + 'static>>; + +/// Log severity for [`RuntimeOps::log`]. +/// +/// Mirrors the four discrete [`Logger`](crate::Logger) methods so adapters +/// can forward to their existing logging backends. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LogLevel { + Debug, + Info, + Warn, + Error, +} + +/// Object-safe runtime capabilities: identity, time, sleep, and logging. +/// +/// Implemented by every runtime adapter (`TokioAdapter`, `EmbassyAdapter`, +/// `WasmAdapter`) so a runtime can travel as `Arc` instead of +/// a generic parameter. +pub trait RuntimeOps: Send + Sync { + /// Short adapter identity, e.g. `"tokio"` — matches + /// [`RuntimeAdapter::runtime_name`](crate::RuntimeAdapter::runtime_name). + fn name(&self) -> &'static str; + + /// Monotonic clock reading in nanoseconds from an arbitrary epoch. + /// + /// Only differences between two readings are meaningful. Never decreases; + /// saturates at `u64::MAX` rather than wrapping. + fn now_nanos(&self) -> u64; + + /// Wall-clock time as `(seconds, nanoseconds)` since the Unix epoch, if + /// the runtime has a real-time clock. + /// + /// Returns `None` on platforms without a wall clock (e.g. a bare MCU with + /// no RTC anchor) — same contract as + /// [`TimeOps::unix_time`](crate::TimeOps::unix_time). + fn unix_time(&self) -> Option<(u64, u32)>; + + /// Completes after at least `d` has elapsed. + fn sleep(&self, d: core::time::Duration) -> BoxFuture; + + /// Emits `msg` at `level` through the adapter's logging backend. + fn log(&self, level: LogLevel, msg: &str); +} diff --git a/aimdb-executor/src/test_support.rs b/aimdb-executor/src/test_support.rs new file mode 100644 index 0000000..8ff6ef1 --- /dev/null +++ b/aimdb-executor/src/test_support.rs @@ -0,0 +1,41 @@ +//! Shared behavioral contract for [`RuntimeOps`](crate::RuntimeOps) implementations. +//! +//! Each adapter crate calls [`assert_runtime_ops_contract`] from a test running +//! under its own executor (`#[tokio::test]`, `block_on`, `#[wasm_bindgen_test]`), +//! mirroring how the join-queue behavioral tests are duplicated per adapter. + +use crate::{LogLevel, RuntimeOps}; + +/// Asserts the behavioral contract every `RuntimeOps` implementation must hold. +/// +/// Uses `Duration::ZERO` for the sleep check: the Embassy host-test time +/// driver reports `now() == 0` with a no-op `schedule_wake`, so any non-zero +/// sleep would hang forever on the host. Adapters with a real clock should +/// additionally assert that a non-zero sleep advances `now_nanos` in their +/// own tests. +pub async fn assert_runtime_ops_contract(ops: &dyn RuntimeOps) { + assert!(!ops.name().is_empty(), "name() must be non-empty"); + + let t0 = ops.now_nanos(); + let t1 = ops.now_nanos(); + assert!(t1 >= t0, "now_nanos() must be monotonic ({t1} < {t0})"); + + ops.log(LogLevel::Debug, "runtime-ops contract: debug"); + ops.log(LogLevel::Info, "runtime-ops contract: info"); + ops.log(LogLevel::Warn, "runtime-ops contract: warn"); + ops.log(LogLevel::Error, "runtime-ops contract: error"); + + ops.sleep(core::time::Duration::ZERO).await; + + if let Some((secs, nanos)) = ops.unix_time() { + assert!( + nanos < 1_000_000_000, + "unix_time nanos out of range: {nanos}" + ); + // Sanity: any real wall clock reads after 2020-09-13 (1.6e9). + assert!( + secs > 1_600_000_000, + "unix_time seconds implausible: {secs}" + ); + } +} diff --git a/aimdb-persistence/CHANGELOG.md b/aimdb-persistence/CHANGELOG.md index 181aa5c..869fa36 100644 --- a/aimdb-persistence/CHANGELOG.md +++ b/aimdb-persistence/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed (breaking) +- **`RecordRegistrarPersistExt::persist` takes a fresh borrow (Issue #130, design 034 Phase 2).** `fn persist(&mut self, …) -> &mut RecordRegistrar<'a, T, R>` (was `&'a mut self -> &'a mut …`), following core's registrar lifetime fix — `persist()` no longer borrows the registrar for its entire remaining lifetime. Call sites compile unchanged. - All `R: Spawn` bounds replaced with `R: RuntimeAdapter` on public traits (`AimDbBuilderPersistExt`, `RecordRegistrarPersistExt`, `AimDbQueryExt`) and internal helpers (Issue #88). No behavioural change — `aimdb-persistence` never called `runtime.spawn` directly, the bound was just propagated. ## [0.1.1] - 2026-05-22 diff --git a/aimdb-persistence/src/ext.rs b/aimdb-persistence/src/ext.rs index 221f0a6..71f479e 100644 --- a/aimdb-persistence/src/ext.rs +++ b/aimdb-persistence/src/ext.rs @@ -23,7 +23,7 @@ where /// Spawns a background subscriber (via `tap_raw`) that serializes each /// value to JSON and writes it to the configured backend. Retention is /// managed by the cleanup task registered during `with_persistence()`. - fn persist(&'a mut self, record_name: impl Into) -> &'a mut RecordRegistrar<'a, T, R>; + fn persist(&mut self, record_name: impl Into) -> &mut RecordRegistrar<'a, T, R>; } impl<'a, T, R> RecordRegistrarPersistExt<'a, T, R> for RecordRegistrar<'a, T, R> @@ -31,7 +31,7 @@ where T: serde::Serialize + Send + Sync + Clone + core::fmt::Debug + 'static, R: RuntimeAdapter + 'static, { - fn persist(&'a mut self, record_name: impl Into) -> &'a mut RecordRegistrar<'a, T, R> { + fn persist(&mut self, record_name: impl Into) -> &mut RecordRegistrar<'a, T, R> { let record_name: String = record_name.into(); // Retrieve the backend from the builder's Extensions TypeMap, if configured. let backend: Option> = self diff --git a/aimdb-tokio-adapter/CHANGELOG.md b/aimdb-tokio-adapter/CHANGELOG.md index b862de0..4e61faa 100644 --- a/aimdb-tokio-adapter/CHANGELOG.md +++ b/aimdb-tokio-adapter/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **`RuntimeOps` implemented for `TokioAdapter` (Issue #130, design 034 Phase 2).** The dyn-safe capability surface from `aimdb-executor`: `now_nanos()` reports nanoseconds since a process-global `OnceLock` anchor (std `Instant` has no public epoch), `sleep` boxes `tokio::time::sleep`, `unix_time`/`log` forward to the existing `TimeOps`/`Logger` impls. Covered by the shared contract test plus a real-sleep monotonicity test. - **`TimeOps::unix_time()` implemented from the OS wall clock (Issue #120).** Returns `SystemTime::now()` since the Unix epoch as `(secs, subsec_nanos)`; `now()` stays monotonic for duration measurement. Supplies absolute timestamps to the runtime-neutral AimX server / remote-display paths. - **`TokioBuffer::peek()` (M15, Design 031).** Non-destructive buffer-native read backing AimX `record.get` / `TypedRecord::latest()`: `SingleLatest` (`Watch`) reads via `watch::Sender::borrow()`, `Mailbox` (`Notify`) clones the slot mutex, `SpmcRing` (`Broadcast`) returns `None` (no canonical latest). Unit tests cover all three buffer types (empty, populated, non-destructive, overwrite, drained). - **`tests/remote_access_validation.rs` integration test.** Asserts that a `.with_remote_access()` record with no buffer fails `build()`, and that the same record with a `SingleLatest` buffer builds — locking in the new build-time guard from `aimdb-core` (M15). diff --git a/aimdb-tokio-adapter/src/runtime.rs b/aimdb-tokio-adapter/src/runtime.rs index e3524d6..8248471 100644 --- a/aimdb-tokio-adapter/src/runtime.rs +++ b/aimdb-tokio-adapter/src/runtime.rs @@ -189,6 +189,42 @@ impl TimeOps for TokioAdapter { } } +#[cfg(feature = "tokio-runtime")] +impl aimdb_executor::RuntimeOps for TokioAdapter { + fn name(&self) -> &'static str { + ::runtime_name() + } + + fn now_nanos(&self) -> u64 { + // std `Instant` has no public epoch, so anchor the first reading and + // report nanoseconds elapsed since it (monotonic, arbitrary epoch). + static ANCHOR: std::sync::OnceLock = std::sync::OnceLock::new(); + ANCHOR + .get_or_init(Instant::now) + .elapsed() + .as_nanos() + .min(u64::MAX as u128) as u64 + } + + fn unix_time(&self) -> Option<(u64, u32)> { + ::unix_time(self) + } + + fn sleep(&self, d: core::time::Duration) -> aimdb_executor::BoxFuture { + Box::pin(tokio::time::sleep(d)) + } + + fn log(&self, level: aimdb_executor::LogLevel, msg: &str) { + use aimdb_executor::LogLevel; + match level { + LogLevel::Debug => Logger::debug(self, msg), + LogLevel::Info => Logger::info(self, msg), + LogLevel::Warn => Logger::warn(self, msg), + LogLevel::Error => Logger::error(self, msg), + } + } +} + #[cfg(feature = "tokio-runtime")] impl Logger for TokioAdapter { fn info(&self, message: &str) { @@ -212,3 +248,30 @@ impl Logger for TokioAdapter { } // Runtime trait is auto-implemented when RuntimeAdapter + TimeOps + Logger are implemented + +#[cfg(all(test, feature = "tokio-runtime"))] +mod runtime_ops_tests { + use super::*; + use aimdb_executor::RuntimeOps; + use std::sync::Arc; + + #[tokio::test] + async fn runtime_ops_contract() { + // `Arc` must be constructible from the adapter. + let ops: Arc = Arc::new(TokioAdapter); + aimdb_executor::test_support::assert_runtime_ops_contract(ops.as_ref()).await; + } + + #[tokio::test] + async fn runtime_ops_sleep_advances_clock() { + let ops: Arc = Arc::new(TokioAdapter); + let before = ops.now_nanos(); + ops.sleep(Duration::from_millis(10)).await; + let after = ops.now_nanos(); + assert!( + after - before >= 10_000_000, + "10ms sleep advanced now_nanos by only {}ns", + after - before + ); + } +} diff --git a/aimdb-wasm-adapter/CHANGELOG.md b/aimdb-wasm-adapter/CHANGELOG.md index b4a4caf..a80fdee 100644 --- a/aimdb-wasm-adapter/CHANGELOG.md +++ b/aimdb-wasm-adapter/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **`RuntimeOps` implemented for `WasmAdapter` (Issue #130, design 034 Phase 2).** The dyn-safe capability surface from `aimdb-executor`: `now_nanos()` from `Performance.now()` (monotonic ms since page load), `unix_time()` from `Date.now()` (wall clock — the generic `TimeOps::unix_time` default still returns `None`), `sleep` boxes the setTimeout-Promise future, `log` forwards to the console-backed `Logger`. Browser contract test via `wasm_bindgen_test`; native fallback covered by a sync-surface test. + ### Changed - **`record.set` write path routes through `Producer` (M15, Design 031).** `bindings.rs` (`set`) and `ws_bridge.rs` now call `db.producer::(key)?.produce(val)` instead of the removed `TypedRecord::produce`. Internal only — no `#[wasm_bindgen]` / JS API change. diff --git a/aimdb-wasm-adapter/src/time.rs b/aimdb-wasm-adapter/src/time.rs index 70dc2ba..719f49d 100644 --- a/aimdb-wasm-adapter/src/time.rs +++ b/aimdb-wasm-adapter/src/time.rs @@ -185,3 +185,108 @@ impl TimeOps for WasmAdapter { } } } + +// ─── RuntimeOps (dyn-safe capability surface) ───────────────────────────── + +impl aimdb_executor::RuntimeOps for WasmAdapter { + fn name(&self) -> &'static str { + ::runtime_name() + } + + fn now_nanos(&self) -> u64 { + // `Performance.now()` is monotonic milliseconds since page load. + let now_ms = self.now(); + self.duration_as_nanos(WasmDuration(now_ms.0)) + } + + fn unix_time(&self) -> Option<(u64, u32)> { + #[cfg(all(feature = "wasm-runtime", target_arch = "wasm32"))] + { + // `Date.now()` is wall-clock milliseconds since the Unix epoch. + let ms = js_sys::Date::now(); + if ms <= 0.0 { + return None; + } + let secs = (ms / 1000.0) as u64; + let sub_nanos = ((ms % 1000.0) * 1_000_000.0) as u32; + Some((secs, sub_nanos)) + } + + #[cfg(not(all(feature = "wasm-runtime", target_arch = "wasm32")))] + { + None + } + } + + fn sleep(&self, d: core::time::Duration) -> aimdb_executor::BoxFuture { + extern crate alloc; + // `TimeOps::sleep`'s opaque return captures `&self`, so it cannot be + // boxed as `'static`; build the same future directly instead. + let ms = d.as_secs_f64() * 1000.0; + + #[cfg(all(feature = "wasm-runtime", target_arch = "wasm32"))] + { + use futures_util::FutureExt; + let fut = wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new( + &mut |resolve, _reject| { + global_set_timeout(&resolve, ms as i32); + }, + )) + .map(|_result| ()); + // SAFETY rationale matches `TimeOps::sleep` above: wasm32 without + // atomics is single-threaded. + alloc::boxed::Box::pin(SendFuture(fut)) + } + + #[cfg(not(all(feature = "wasm-runtime", target_arch = "wasm32")))] + { + let _ = ms; + alloc::boxed::Box::pin(core::future::ready(())) + } + } + + fn log(&self, level: aimdb_executor::LogLevel, msg: &str) { + use aimdb_executor::{LogLevel, Logger}; + match level { + LogLevel::Debug => Logger::debug(self, msg), + LogLevel::Info => Logger::info(self, msg), + LogLevel::Warn => Logger::warn(self, msg), + LogLevel::Error => Logger::error(self, msg), + } + } +} + +#[cfg(test)] +mod runtime_ops_tests { + use super::*; + use aimdb_executor::RuntimeOps; + use alloc::sync::Arc; + + // Full async contract needs the browser event loop (setTimeout sleep). + #[cfg(target_arch = "wasm32")] + mod wasm { + use super::*; + use wasm_bindgen_test::wasm_bindgen_test; + + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + #[wasm_bindgen_test] + async fn runtime_ops_contract() { + let ops: Arc = Arc::new(WasmAdapter); + aimdb_executor::test_support::assert_runtime_ops_contract(ops.as_ref()).await; + } + } + + // Native fallback build: cover the sync surface and the dyn coercion. + // `log` is excluded — the wasm Logger forwards to web_sys console, which + // panics off-target (pre-existing); the browser test covers it. + #[cfg(not(target_arch = "wasm32"))] + #[test] + fn runtime_ops_sync_surface() { + let ops: Arc = Arc::new(WasmAdapter); + assert_eq!(ops.name(), "wasm"); + let t0 = ops.now_nanos(); + assert!(ops.now_nanos() >= t0); + assert_eq!(ops.unix_time(), None); + } +}