You have config in NATS JetStream: routing tables, TLS certs, WASM configs. Edge nodes need a local copy, kept in sync, that survives restarts without replaying the full stream.
Slipstream materializes a NATS JetStream KV bucket into a local fold on each consumer. A watch cursor (a stream sequence number) tracks position in the change stream; on restart, only the delta since the last checkpoint arrives from NATS.
NATS is a bounded log. Entries are evicted past max_bytes and max_age. Once retention compacts past a cursor, there is no replay path from NATS. The local fold is the durable state; folds across the fleet are the only full replicas.
NATS JetStream KV
┌──────────────────────────────────────────────────────┐
│ [evicted] ◄──── seq 998 seq 999 seq 1000 ──────► │ max_bytes / max_age
└───────────────────────────┬──────────────────────────┘
│ KvUpdate stream
▼
watch_applied()
┌─────────────────────┐
│ parse │
│ apply() │ ← your domain logic
│ cursor = seq 1000 │ advances after apply() returns
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ local fold │ folds are the only full replicas
│ cursor = 1000 │ once NATS evicts past cursor
└──────────┬──────────┘
┌───────────────┴──────────────────────┐
│ │
▼ ▼
restart new node / cursor expired
resume from cursor = 1000 export fold to object storage
NATS delivers seq 1001+ only import + resume from embedded cursor
The cursor advances after apply() returns, not on receipt. A crash between delivery and application re-delivers on the next start instead of silently skipping. watch_applied enforces this invariant.
For folds that outgrow RAM, fjall (pure Rust) and rocksdb backends hold state on disk.
[dependencies]
beyond-slipstream = "0.5"On-disk snapshot backends are opt-in cargo features:
beyond-slipstream = { version = "0.5", features = ["fjall"] } # pure-Rust LSM, no C toolchain
beyond-slipstream = { version = "0.5", features = ["rocksdb"] } # RocksDB (requires C++ toolchain + libclang)
beyond-slipstream = { version = "0.5", features = ["transport"] } # export/import via object_store (S3, GCS, local)| Term | What It Is |
|---|---|
Connection |
NATS connection lifecycle + store factory |
KvStore |
Named bucket. Vends reader, watcher, writer |
KvReader |
Point-in-time reads: get, entry, keys, scan |
KvWatcher |
Live update stream via channel |
KvWriter |
Write, soft-delete, CAS (create, update, delete_with_version) |
WatchCursor |
Opaque position in a watch stream. Save it; pass it back on reconnect |
VersionToken |
Opaque version — NATS: u64 revision; FDB: 10-byte versionstamp |
KvEntry |
One key + value + version from a read |
KvUpdate |
One watch event: Put, Delete, or Purge |
Snapshot |
Deduplicated KV state + cursor at a point in time. Disk cache, not source of truth |
SnapshotWriter |
Append-only log of KvUpdates; survives restarts without a full NATS scan |
SnapshotStore |
Trait: the durable-fold contract — apply (data + cursor, atomically), load, get, range |
AppendLogSnapshot |
Default SnapshotStore: the append-only log + an in-RAM fold (pure-Rust, small state) |
FjallSnapshot |
On-disk SnapshotStore for folds too large for RAM; queryable (feature = "fjall") |
RocksDbSnapshot |
Same contract on RocksDB, for consumers who prefer the C++ LSM (feature = "rocksdb") |
watch_applied |
Watch loop that advances the cursor only after your apply returns, folding into any SnapshotStore |
ConnectionCapabilities |
Feature flags for runtime branching (CAS, streaming watch, global ordering) |
use slipstream::{Connection, NatsConnection, NatsConnectionConfig};
let conn = NatsConnection::new(NatsConnectionConfig {
url: "nats://localhost:4222".into(),
creds: None,
creds_file: None,
});
conn.connect().await?;use slipstream::{StoreConfig, StorageType};
use std::time::Duration;
let store = conn.store_with_config(StoreConfig {
name: "nodes".into(),
storage: StorageType::Persistent,
max_bytes: Some(512 * 1024 * 1024), // required by Synadia Cloud
max_history: Some(1),
max_age: Some(Duration::from_secs(30 * 24 * 3600)),
num_replicas: Some(3), // HA clusters
..Default::default()
}).await?;max_bytes is required on Synadia Cloud. Omit only for self-hosted NATS.
use slipstream::KvReader;
let reader = store.reader();
// Single key — filters tombstones; use entry() to include them for CAS
if let Some(entry) = reader.get("node.us-east-1").await? {
println!("{}: {:?}", entry.key, entry.version);
}
// All entries under prefix
// Uses DeliverPolicy::LastPerSubject: one NATS consumer, not N round-trips.
let entries = reader.scan("node.").await?;
// Key names only (no value transfer)
let keys = reader.keys("node.").await?;use slipstream::KvWriter;
let writer = store.writer().expect("store is writable");
// Unconditional write
let version = writer.put("node.us-east-1", &payload).await?;
// Create only. Returns AlreadyExists if key has a live value.
let version = writer.create("lock.migration", &payload).await?;
// CAS update. Returns RevisionMismatch if version doesn't match.
let new_version = writer.update("node.us-east-1", &payload, &version).await?;
// CAS delete. Returns RevisionMismatch on conflict.
writer.delete_with_version("node.us-east-1", &version).await?;
// Best-effort delete — returns Ok(true) even if key didn't exist.
writer.delete("node.us-east-1").await?;use slipstream::{KvUpdate, KvWatcher};
let watcher = store.watcher().expect("store supports streaming");
let (tx, mut rx) = tokio::sync::mpsc::channel(128);
// Watches are state-sync streams: the current value of every matching key is
// delivered first (as puts), then live updates. No separate scan needed — and
// no scan-to-watch race window.
//
// watch_all blocks until the stream ends — run it in a separate task
tokio::spawn(async move {
watcher.watch_all(tx).await.unwrap();
});
while let Some(update) = rx.recv().await {
match update {
KvUpdate::Put(entry) => { /* ... */ }
KvUpdate::Delete { key, version } => { /* ... */ }
KvUpdate::Purge { key, version } => { /* ... */ }
}
}Dropping rx cancels the watch. The watcher task exits and unsubscribes automatically.
The cursor is a sequence number. Persist it; pass it back on reconnect. NATS delivers only the delta since that position.
let cursor = load_cursor().unwrap_or(WatchCursor::none());
match watcher.watch_all_from(&cursor, tx.clone()).await {
Ok(()) => {}
Err(KvError::CursorExpired) => {
// NATS compacted past the cursor. Full replay required.
watcher.watch_all(tx).await?;
}
Err(e) => return Err(e.into()),
}watch_prefix_from() works the same way for prefix-filtered streams, and
watch_prefixes_from() resumes the union of several prefixes on one
multi-filter consumer.
For services that cache KV state locally, the snapshot persists both state and cursor to disk. On restart, load the snapshot and resume the watch from its cursor — only the delta since the last checkpoint arrives from NATS.
use slipstream::snapshot;
if let Some(snap) = snapshot::load(Path::new("/var/lib/svc/state.snap"))? {
for (key, entry) in snap.entries {
cache.insert(key, entry.value);
}
watcher.watch_all_from(&snap.cursor, tx).await?;
} else {
watcher.watch_all(tx).await?;
}use slipstream::snapshot::SnapshotWriter;
let mut snap = SnapshotWriter::open(
Path::new("/var/lib/svc/state.snap"),
10 * 1024 * 1024, // compact after 10MB of appended records
)?;
while let Some(update) = rx.recv().await {
cache.apply(&update);
snap.write_update(&update); // buffered, no I/O
// checkpoint() flushes + syncs to disk; returns true when compaction is due
if snap.checkpoint(¤t_cursor)? {
// compact() is blocking I/O; run via spawn_blocking in async contexts
tokio::task::spawn_blocking(move || snap.compact()).await??;
}
}This loop has a trap: current_cursor must track what cache.apply() has consumed, not what rx.recv() delivered. Get it wrong and a crash skips updates on resume. watch_applied runs this loop for you with that invariant enforced.
The snapshot is a cache. Delete it and the service falls back to full replay on next start.
Header: b"PGSS" ++ version:u16le
Record: crc32:u32le ++ type:u8 ++ payload
| Record type | Byte | Payload |
|---|---|---|
REC_PUT |
0x01 | key_len:u16le ++ key ++ value_len:u32le ++ value ++ ver_len:u8 ++ version_bytes |
REC_DELETE |
0x02 | key_len:u16le ++ key ++ ver_len:u8 ++ version_bytes |
REC_CURSOR |
0x03 | cursor_len:u8 ++ cursor bytes |
version_bytes is the raw [VersionToken] bytes (≤10), not a fixed u64, so NATS revisions (8 bytes) and FDB versionstamps (10 bytes) both round-trip intact.
A truncated final record (crash mid-write) is discarded; earlier records are intact. A CRC failure mid-file returns SnapshotError::Corrupted.
The durable fold is a trait, [SnapshotStore], so a consumer picks where its fold lives. The contract is small — apply a batch and advance the cursor atomically, resume from the cursor on restart, and query the result:
pub trait SnapshotStore: Sized + Send {
fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError>;
fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError>;
fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError>;
fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError>;
}Every backend keeps the same invariants: the fold is a pure function of the log (delete the store, replay from the cursor, get identical state), the cursor never names a revision whose data isn't durable (cursor-after-apply), and the store is a cache — a tail lost to power loss is rebuilt by resuming the watch.
| Backend | When | Notes |
|---|---|---|
AppendLogSnapshot |
Default. Fold fits in RAM (edge/tunnel-style services) | Pure-Rust, the append-only log above plus an in-RAM map serving get/range. No extra dependencies. |
FjallSnapshot |
Fold too large for RAM (e.g. routing at ~1B keys) | On-disk fjall LSM, feature = "fjall". Pure-Rust. Each apply is one atomic batch (data and cursor); durability (NO_SYNC vs fsync) is configurable. |
RocksDbSnapshot |
Same as FjallSnapshot, preferring the battle-tested C++ LSM and its tooling (ldb, sst_dump) |
On-disk RocksDB, feature = "rocksdb". Each apply is one atomic WriteBatch (data and cursor); WAL always on, per-commit fsync configurable. Tuned for billion-key route folds (hit-optimized ribbon filters, partitioned index, zstd bottommost, batched multi_get). Builds C++ (needs a toolchain + libclang). |
Pick a backend, then hand it to watch_applied — load returns the resume cursor alongside the store:
use slipstream::{AppendLogSnapshot, SnapshotStore};
// Default in-RAM backend:
let (resume, store) = AppendLogSnapshot::load(Path::new("/var/lib/svc/state.snap"))?;
// Or, behind `feature = "fjall"`, an on-disk fold for a large consumer:
// let (resume, store) = FjallSnapshot::open(dir, FjallConfig { sync: false, ..Default::default() })?;
// Or the same on RocksDB, behind `feature = "rocksdb"`:
// let (resume, store) = RocksDbSnapshot::open(dir, RocksDbConfig { sync: false, ..Default::default() })?;
let final_cursor = watch_applied(
watcher, WatchScope::All, Some(resume),
Some(reader), // arms the cursor-expired stale-key resync; None to skip
Some(store), None, // store; export-request channel
BatchConfig::default(),
parse, apply, on_applied, shutdown,
).await?;The trait stops at durable fold + cursor + query. Serving structures built from the fold (routing rings, hashrings, indexes) live in the consumer — query them out of the store with get/range. A consumer with a different engine can implement SnapshotStore itself; the rest of slipstream is unchanged.
watch_applied drives the watch-batch-apply-checkpoint loop and enforces one rule the hand-rolled version can't: the cursor advances only after your apply returns, never on receipt. It is generic over the SnapshotStore backend, so the consumer chooses where the durable fold lives (or None to run without persistence).
use slipstream::{watch_applied, AppendLogSnapshot, BatchConfig, KvUpdate, WatchCursor, WatchScope};
let final_cursor = watch_applied(
watcher,
WatchScope::All, // or Prefix("node.".into()) / Prefixes(vec![...])
Some(resume), // Option<WatchCursor> — resume here, or None
Some(reader), // Option<Arc<dyn KvReader>> — arms the
// cursor-expired stale-key resync, or None
Some(store), // any SnapshotStore (e.g. AppendLogSnapshot), or None
None, // Option<mpsc::Receiver<ExportRequest>> — live exports
BatchConfig::default(), // 10ms window, 100 updates per batch
|update: &KvUpdate| parse(update), // KvUpdate -> Option<U>; None just drops it
|batch: Vec<U>| cache.apply_batch(batch), // your only domain logic
|cursor: WatchCursor| persist(cursor), // fires after apply returns
shutdown, // tokio::sync::watch::Receiver<bool>
).await?;A batch closes when window elapses or it hits max updates, whichever comes first. Then, in order: apply(batch) runs to completion, the cursor advances to the batch's highest revision, the batch + cursor are folded into the store atomically (on a blocking task), and on_applied fires.
Persist the cursor on receipt instead and a crash between receive and apply loses data: the cursor reads "caught up to rev N" while rev N sits in an unapplied buffer, and the next resume starts past it. watch_applied checkpoints at the applied cursor, so a persisted cursor always means every update up to it has been applied.
parsereturningNone(corrupt bytes, irrelevant key) still advances the cursor — nothing to apply means nothing to skip.- On
CursorExpired, it falls back to a full watch automatically. With areaderwired, it first diffs the fold against the bucket's live keys and applies synthetic deletes for keys that vanished during the gap (their delete markers were evicted with the cursor) — the one case the fallback re-list can't cover. - It returns the final applied cursor on shutdown or stream close.
apply runs inline. If it panics, the panic aborts the watch.
| Concept | NATS primitive |
|---|---|
| Store | JetStream KV bucket (KV_{name} stream) |
VersionToken |
Per-key revision (u64, big-endian) |
WatchCursor |
NATS revision at last checkpoint |
delete() |
Writes empty value (soft delete). Always returns Ok(true) |
KvUpdate::Purge |
Hard delete: all history removed from stream |
scan() |
DeliverPolicy::LastPerSubject: one entry per key, one consumer |
watch_*() |
DeliverPolicy::LastPerSubject: current state, then live updates |
watch_prefix() |
Native NATS subject filter ({prefix}> wildcard) |
let caps = conn.capabilities();
if caps.cas { /* safe to call create/update/delete_with_version */ }
if caps.streaming_watch { /* watcher() is Some */ }
if caps.prefix_watch { /* watch_prefix() uses a server-side filter */ }
if caps.global_ordering { /* VersionToken is globally ordered across keys */ }| Error | Cause | Recovery |
|---|---|---|
NotConnected |
Operation before connect() |
Call connect() |
AlreadyExists |
create() on a live key |
Read current state, decide |
RevisionMismatch |
CAS conflict on update() / delete_with_version() |
Re-read, retry |
CursorExpired |
watch_*_from() cursor compacted by NATS |
Fall back to watch_all() |
WatchError |
NATS stream dropped | Re-subscribe |
Priority order, first match wins:
creds: base64-encoded.credscontent (containers, ECS)creds_file: path to.credson disk (bare-metal, local dev)- URL-embedded
user:pass@host - No auth