From 8044220239e8bc72d62ddd0f719c7bd16fb15765 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 22 May 2026 17:32:19 +0200 Subject: [PATCH 1/6] add cache plan --- add-optional-cache.md | 308 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 308 insertions(+) create mode 100644 add-optional-cache.md diff --git a/add-optional-cache.md b/add-optional-cache.md new file mode 100644 index 0000000..f9b19d1 --- /dev/null +++ b/add-optional-cache.md @@ -0,0 +1,308 @@ +--- +sessionId: session-260522-134315-4jqi +isActive: true +--- + +# Requirements + +### Overview & Goals +Implement an **optional cache** for `borgstore` so that operations against remote backends (`sftp`, `rclone`, `s3`, `rest`) can be served from a fast local backend (typically `posixfs`) when the data is safe to cache. This addresses issue #166. + +The cache is implemented at the **Store** level (not inside the `REST` backend), so it benefits every remote backend uniformly and reuses the existing `BackendBase` interface for cache storage. + +### Scope + +**In Scope (v1):** +- Optional cache, enabled via new arguments to `Store.__init__`. +- Cache is restricted to namespaces declared as content-hash addressed (i.e. `name == hash(content)`). +- Per-namespace mode declared via a three-valued enum `CacheMode` with members `C_OFF`, `C_CACHE`, `C_MIRROR`: + - `C_OFF` — bypass the cache entirely (default for unlisted namespaces). + - `C_CACHE` — full read-through + write-through. Reads check the cache first; on miss, fetch from primary and populate the cache. Writes go to primary and then to cache. + - `C_MIRROR` — **always** read from the primary backend, but mirror the bytes into the cache on every read (in addition to write-through on `store`). Reads never short-circuit on a cache hit; the cache is purely a side-effect copy that tracks the primary. +- The cache uses the **same nested name as the primary backend**, including the `DEL_SUFFIX` (`.del`) for soft-deleted items. 
There is **no separate "live" cache key**: soft-delete/undelete `move` calls also rename the cache entry, just like the primary backend. +- The `cache` dict accepts either `CacheMode` members or their string aliases (`"off"`, `"cache"`, `"mirror"`, case-insensitive). String values are remapped to the enum at construction time. +- Cache backend may be specified by URL (`cache_url=`) or as a pre-built `BackendBase` instance (`cache_backend=`). +- Stats: extend `Store.stats()` with cache counters. +- Cache lifecycle (`open`/`close`/`create`/`destroy`) is tied to the Store's lifecycle. + +**Out of Scope (v1):** caching of non-hash namespaces, list/info/hash/find caching, LRU eviction, background revalidation, per-call no-cache override, REST-backend-specific HTTP cache. + +### User Stories +- *As a borg user* with a slow remote backend, I want repeated reads of the same hashed object to come from a local cache. +- *As an integrator*, I want to opt into caching per-namespace via `CacheMode`, passing either enum members or simple string aliases. +- *As an operator*, I want `Store.stats()` to expose cache hits/misses. + +### Functional Requirements +- Caching is **off by default**; backwards compatible. +- New `cache: dict[str, CacheMode | str]` argument; missing namespaces default to `C_OFF`. +- String values in the `cache` dict are normalized at `Store.__init__` time: `"off" → C_OFF`, `"cache" → C_CACHE`, `"mirror" → C_MIRROR` (case-insensitive). Unknown strings raise `ValueError`. +- New public `CacheMode` enum is exposed at module level (importable from `borgstore.store`). +- Any namespace with mode ≠ `C_OFF` must exist in `levels` (validated at construction). +- `load`: + - `C_CACHE` — read cache first using the same nested name the primary backend would see (including `DEL_SUFFIX` for soft-deleted items). On miss, fetch from primary, populate cache, return slice. 
+ - `C_MIRROR` — always fetch from primary; after a successful fetch, mirror the bytes into the cache (under the same nested name as primary, including `DEL_SUFFIX`). + - `C_OFF` — no cache interaction. +- `store`: for `C_CACHE` and `C_MIRROR`, populate cache after primary write succeeds (cache key = primary nested name). +- `delete`: for `C_CACHE` and `C_MIRROR`, invalidate the cache entry under the exact same nested name that was deleted in the primary (so `delete(..., deleted=True)` invalidates the `.del` cache entry). +- `move`: for `C_CACHE` and `C_MIRROR`, mirror the move on the cache backend. Soft-delete (`move(delete=True)`) and soft-undelete (`move(undelete=True)`) therefore also touch the cache, renaming `foo`↔`foo.del` to keep cache and primary in lockstep. +- `find`, `info`, `hash`, `list`: unchanged. +- Stale-cache policy: passive (no proactive validation). + +### Non-Functional Requirements +- No regression for callers that do not enable the cache. +- Atomicity via backends' atomic `store`. +- Concurrency-safe at the cache-directory level (relies on backend atomicity). +- Cache failures never break the main operation (caught, logged at WARNING). + +# Technical Design + +### Current Implementation +- `Store` (`src/borgstore/store.py`) wraps a single `BackendBase` and adds nesting, soft-delete, stats, and latency/bandwidth emulation. Constructor accepts `url`/`backend`, `levels`, `permissions`. +- `BackendBase` (`src/borgstore/backends/_base.py`) defines the minimal contract: `create/destroy/open/close/mkdir/rmdir/info/load/store/delete/move/list/hash/defrag/quota`. +- `get_backend(url, ...)` returns a backend instance for any supported URL scheme; reused for the cache backend. +- `Store.find(name, deleted=...)` probes nesting levels (and appends `DEL_SUFFIX` when `deleted=True`) to find an item. Stats are tracked via `_stats_updater`/`_stats_update_volume`. + +### Key Decisions +1. 
**Cache lives in `Store`, not in the REST backend.** Composes naturally because the cache is itself just another `BackendBase`. *(Confirmed)* +2. **Per-namespace mode via a `CacheMode` enum** (`C_OFF`, `C_CACHE`, `C_MIRROR`) in a new `cache` dict. *(Confirmed)* +3. **Cache backend specified by URL or instance** (`cache_url=` or `cache_backend=`), symmetric to the primary backend. +4. **`C_CACHE` = read-through + write-through; `C_MIRROR` = always read primary AND populate cache on every read; `C_OFF` = bypass.** Delete/move invalidate/rename the cache entry in lockstep. +5. **Cache fully mirrors the primary's nested names — including `DEL_SUFFIX`.** Soft-deleted items in the cache live under `.del`, exactly as in the primary. `move(delete=True)`/`move(undelete=True)` therefore also rename the cache entry. *(Confirmed)* +6. **`cache` dict accepts strings.** `"off" / "cache" / "mirror"` are remapped to `CacheMode` members at construction. Keeps caller configs ergonomic. *(Confirmed)* +7. **`C_MIRROR` populates the cache from every primary read too.** Every byte that flows out of the primary also flows into the cache, but the cache is never trusted as an authoritative source for reads. *(Confirmed)* +8. **Cache failures are non-fatal**, caught and logged. +9. **No eviction in v1.** + +### Proposed Changes + +**`src/borgstore/store.py`** +- Add a `CacheMode` enum with members `C_OFF`, `C_CACHE`, `C_MIRROR`, and a `CacheMode.from_str(value)` classmethod for string-alias normalization. +- Extend `Store.__init__` with kwargs `cache: Optional[dict[str, CacheMode | str]]`, `cache_url: Optional[str]`, `cache_backend: Optional[BackendBase]`. +- Normalize the `cache` dict at construction via `CacheMode.from_str`; reject unknown strings with `ValueError`. +- Validate: if any non-`C_OFF` cache mode is given, require exactly one of `cache_url`/`cache_backend`; all non-`C_OFF` namespaces must be in `levels`. 
+- Materialize `self.cache_backend` and store `self.cache_namespaces` as `(namespace, mode)` tuples sorted longest-prefix-first. +- Helpers: + - `_cache_mode_for(name) -> CacheMode` (returns `C_OFF` if no match). + - `_cache_get(nested_name)`, `_cache_put(nested_name, value)`, `_cache_invalidate(nested_name)`, `_cache_move(old_nested, new_nested)` — all wrap backend calls in try/except, log WARNING, update stats. The **cache key is always the same nested name that the primary backend sees**, including any `DEL_SUFFIX`. +- Modify `load`: + - `C_CACHE`: try cache first using `nested_name = self.find(name, deleted=deleted)`; on miss, fetch full from primary, populate cache, then slice. + - `C_MIRROR`: fetch from primary; on success, `_cache_put(nested_name, full_value)`; return slice. + - `C_OFF`: unchanged. +- Modify `store`: after successful primary store, `_cache_put(nested_name, value)` for `C_CACHE` and `C_MIRROR`. +- Modify `delete`: after successful primary delete, `_cache_invalidate(nested_name)` for `C_CACHE`/`C_MIRROR`. +- Modify `move`: after successful primary move, mirror it on the cache via `_cache_move(old_nested, new_nested)` — applies uniformly to soft-delete, soft-undelete, generic rename, and `change_level`. +- Lifecycle: `create/destroy/open/close/create_levels` are also applied to the cache backend. +- Extend `Store.stats()` with cache counters; update `__repr__`. + +**No changes** to: `BackendBase` interface, individual backend implementations, REST backend. + +### Data Models / Contracts +```python +class Store: + def __init__( + self, + url: Optional[str] = None, + backend: Optional[BackendBase] = None, + levels: Optional[dict] = None, + permissions: Optional[dict] = None, + *, + cache: Optional[dict[str, "CacheMode | str"]] = None, + cache_url: Optional[str] = None, + cache_backend: Optional[BackendBase] = None, + ): ... 
+ + +class CacheMode(enum.Enum): + C_OFF = "off" # bypass cache (default for unlisted namespaces) + C_CACHE = "cache" # read-through + write-through + C_MIRROR = "mirror" # always read from primary, but mirror every read AND every write into the cache + + @classmethod + def from_str(cls, value): + if isinstance(value, cls): + return value + if isinstance(value, str): + try: + return cls(value.lower()) + except ValueError: + raise ValueError(f"unknown CacheMode: {value!r}") + raise TypeError(f"CacheMode value must be CacheMode or str, got {type(value).__name__}") +``` + +New stats fields: `cache_hits`, `cache_misses`, `cache_errors`, `cache_bytes_read`, `cache_bytes_written`, `cache_hit_ratio`. + +Usage example: +```python +from borgstore.store import Store, CacheMode + +store = Store( + url="sftp://user@host/repo", + levels={"data": [2], "meta": [1], "config": [0]}, + cache={ + "data": CacheMode.C_CACHE, # read-through + write-through + "meta": "mirror", # string alias -> CacheMode.C_MIRROR + "config": "off", # string alias -> CacheMode.C_OFF + }, + cache_url="file:///home/u/.cache/borgstore/", +) +``` + +### Components +- **`Store` (modified)** — owns cache lifecycle and policy. +- **Cache backend** (any `BackendBase`, unmodified, reused as-is). +- **`get_backend()`** (reused) — parses `cache_url`. +- **`utils.nesting.nest`** (reused) — same nested names. + +### File Structure +Modified: +- `src/borgstore/store.py` +- `docs/index.rst` (TOC), `docs/backends.rst` (cross-ref), `docs/changes.rst` (changelog). + +Added: +- `docs/store_caching.rst` — dedicated docs page. +- `tests/test_cache.py` — new test module dedicated to cache behavior. + +### Architecture Diagram +```mermaid +graph LR + Caller -->|load/store/delete/move| Store + Store -->|read miss / write-through / invalidate / rename| MainBackend[Main backend
<br/>sftp/s3/rclone/rest/posixfs] + Store -->|C_CACHE: hit / populate-on-miss<br/>C_MIRROR: populate-on-every-read<br/>invalidate on delete<br/>rename on move| CacheBackend[Cache backend<br/>
BackendBase, typ. posixfs] + CacheBackend -.->|same nested names incl. .del| MainBackend +``` + +### Risks +- **Cache divergence under concurrent multi-client deletes.** Stale-present is still correct content (content-addressed). +- **Unbounded cache growth.** Documented; LRU deferred. +- **Cache backend failure mid-op.** Wrapped; degrades gracefully. + +# Testing + +### Validation Approach +Add a new `tests/test_cache.py` module that constructs a `Store` with a `tmp_path`-backed POSIX main backend and a separate `tmp_path` POSIX cache backend. Cover all three `CacheMode` values and the string-alias config explicitly. + +### Key Scenarios +- **Cache disabled (default)**: existing `tests/test_store.py` tests remain green; no cache directory is created. +- **`C_CACHE` read-through populate**: second `load` does not touch the main backend (spy). +- **`C_CACHE` write-through populate**: identical bytes in both backends under the same nested name. +- **`C_MIRROR` always reads from primary**: two consecutive `load` calls both hit the primary backend (spy), even though both also populate the cache. +- **`C_MIRROR` populates on read**: `load` of an entry present only in the primary DOES create a cache entry afterward. +- **`C_MIRROR` write-through**: after `store(name, value)`, the cache backend has identical bytes. +- **`C_OFF` bypass**: explicit `C_OFF` and unlisted namespaces never touch the cache backend. +- **Invalidate on delete**: cache entry is gone after `delete` for both `C_CACHE` and `C_MIRROR`. +- **Soft-delete renames cache entry**: after `move(name, delete=True)`, the cache entry has moved from `<name>` to `<name>.del`; after `move(name, undelete=True)`, it moves back. +- **`deleted=True` reads use `.del` cache key**: `load(name, deleted=True)` for a soft-deleted item hits the cache entry at `.del` (under `C_CACHE`) or mirrors the primary read into `.del` (under `C_MIRROR`). +- **Generic rename / change_level**: cache entry is renamed in lockstep with the primary.
+- **Partial load from cache** (`C_CACHE`): correct slice returned. +- **String aliases**: `cache={"data": "cache", "meta": "MIRROR", "config": "off"}` behaves identically to passing `CacheMode` members; unknown strings raise `ValueError`. +- **Stats**: counters match expected values for `C_CACHE` and `C_MIRROR` workloads (notably, `C_MIRROR` always increments `cache_bytes_written` on read, never `cache_hits`). +- **Lifecycle**: `create()` builds the cache dir tree; `destroy()` removes it. + +### Edge Cases +- Cache backend `load` raising `ObjectNotFound` is a normal miss — not a `cache_error`. +- Cache backend `store`/`move` raising other errors is logged + counted; main op still succeeds. +- `cache` with a non-`C_OFF` entry but no `cache_url`/`cache_backend` → `ValueError`. +- `cache` namespace with mode ≠ `C_OFF` not in `levels` → `ValueError`. +- `cache` containing only `C_OFF` is accepted with no cache backend; behaves like no cache. +- `cache` value of an unknown string (e.g. `"on"`) → `ValueError` from `CacheMode.from_str`. +- `_cache_move` failure on a soft-delete/undelete: log WARNING, count as `cache_error`, primary move still succeeds. + +### Test Changes +- **Add**: `tests/test_cache.py`. +- **Update**: nothing structural in `tests/test_store.py`. +- **Skip**: no real-network tests for cache behavior. + +# Delivery Steps + +### Step 1: Introduce CacheMode enum (with string aliases) and add cache configuration plumbing to Store.__init__ +Store accepts `cache`, `cache_url`, and `cache_backend` kwargs and validates them; a new `CacheMode` enum with string-alias support is exported. No I/O method behavior changes yet. + +- In `src/borgstore/store.py`, define a new `CacheMode` enum with members `C_OFF`, `C_CACHE`, `C_MIRROR` (values `"off"`, `"cache"`, `"mirror"`) and export it at module level. 
+- Add a `CacheMode.from_str(value)` classmethod that accepts either a `CacheMode` instance or a case-insensitive string alias and raises `ValueError` on unknown strings. +- Add three new keyword-only constructor parameters: `cache: Optional[dict[str, CacheMode | str]]`, `cache_url: Optional[str]`, `cache_backend: Optional[BackendBase]`. +- Normalize the `cache` dict at construction by running each value through `CacheMode.from_str`, producing an internal dict of `CacheMode` members. +- Validate that at most one of `cache_url`/`cache_backend` is given; if the normalized `cache` contains any non-`C_OFF` entry, require one of them. +- Validate that every namespace in `cache` with mode ≠ `C_OFF` is also present in `levels` (raise `ValueError` otherwise). +- Materialize `self.cache_backend` (via `get_backend(cache_url)` or the passed instance, or `None` if no caching) and store `self.cache_namespaces` as a list of `(namespace, CacheMode)` tuples sorted longest-prefix-first. +- Add a `_cache_mode_for(name) -> CacheMode` helper returning `C_OFF` if no match. +- Update `__repr__` to include cache info when configured. +- No existing methods change yet — this step is purely additive and inert. + +### Step 2: Wire cache lifecycle into Store.open/close/create/destroy/create_levels +The cache backend is opened/closed/created/destroyed together with the main backend; pre-creation of nesting levels also applies to the cache. + +- In `Store.create()`, call `self.cache_backend.create()` after the main backend, if a cache backend is configured. +- In `Store.destroy()`, call `self.cache_backend.destroy()` after the main backend. +- In `Store.open()`, call `self.cache_backend.open()`; on failure, log WARNING and disable the cache for the rest of the session (`self._cache_disabled = True`). +- In `Store.close()`, call `self.cache_backend.close()` and swallow errors with a WARNING. 
+- In `Store.create_levels()`, also pre-create the same level directories on the cache backend for namespaces whose mode is `C_CACHE` or `C_MIRROR`. +- Ensure all of this is a no-op when no cache backend is configured — zero overhead in the default path. + +### Step 3: Implement read-through (C_CACHE) and read-mirroring (C_MIRROR) in Store.load +`Store.load` consults the cache first for `C_CACHE` namespaces and populates the cache on a miss. For `C_MIRROR` namespaces it always reads from the primary but mirrors the bytes into the cache. The cache key is the exact nested name returned by `find(name, deleted=deleted)` — including `DEL_SUFFIX` for soft-deleted items. + +- Add `_cache_get(nested_name) -> Optional[bytes]` and `_cache_put(nested_name, value)` private helpers. Both must catch all backend exceptions, log WARNING, and update the stats counters. +- `_cache_get` returns `None` on miss (treating `ObjectNotFound` as a normal miss, not an error). +- Modify `Store.load(name, *, size=None, offset=0, deleted=False)`: + - Compute `mode = self._cache_mode_for(name)` and `nested_name = self.find(name, deleted=deleted)`. + - If `mode == C_CACHE`: + - Try `_cache_get(nested_name)`; on hit, return `value[offset:offset+size]` (mirroring `backend.load` slicing semantics). + - On miss, call `self.backend.load(nested_name, size=None, offset=0)`, then `_cache_put(nested_name, full_value)`, and return the sliced bytes. + - If `mode == C_MIRROR`: + - Always call `self.backend.load(nested_name, size=None, offset=0)`. + - On success, `_cache_put(nested_name, full_value)` (mirror every read into the cache). + - Return the sliced bytes. + - If `mode == C_OFF`: behave exactly like today's `load`. + - Keep `_stats_update_volume("load", ...)` consistent (count bytes the caller receives). 
+ +### Step 4: Implement write-through, invalidation and move-mirroring in Store.store/delete/move for C_CACHE and C_MIRROR +Cache stays in sync with successful local mutations for both `C_CACHE` and `C_MIRROR` namespaces. The cache key is always the exact nested name used against the primary backend, so soft-deleted items live under `.del` in the cache (mirroring the primary). + +- Add `_cache_invalidate(nested_name)` and `_cache_move(old_nested, new_nested)` private helpers. Both must: + - Call the corresponding `cache_backend` method. + - Swallow `ObjectNotFound`. + - Log other errors at WARNING and increment `cache_errors`. +- Modify `Store.store(name, value)`: after the primary backend's `store` returns successfully, call `_cache_put(nested_name, value)` if mode ∈ {`C_CACHE`, `C_MIRROR`} (where `nested_name = self.find(name)`). +- Modify `Store.delete(name, *, deleted=False)`: after the primary backend's `delete` returns successfully, call `_cache_invalidate(nested_name)` if mode ∈ {`C_CACHE`, `C_MIRROR`}, where `nested_name = self.find(name, deleted=deleted)` (so the deleted-state of the entry matches between primary and cache). +- Modify `Store.move(...)`: + - For **soft-delete** (`move(delete=True)`) and **soft-undelete** (`move(undelete=True)`): after the primary move succeeds, call `_cache_move(old_nested, new_nested)` so the cache renames `foo`↔`foo.del` in lockstep. + - For **generic rename**: same — `_cache_move(old_nested, new_nested)`. + - For **change_level**: same — `_cache_move(old_nested, new_nested)`. +- Ensure all paths skip cache work cleanly when the namespace is `C_OFF` or the cache is disabled. + +### Step 5: Expose cache stats and document the feature in docs/store_caching.rst +`Store.stats()` reports cache effectiveness, and a dedicated docs page describes how to enable and use the cache. 
+ +- Extend `Store.stats()` to surface `cache_hits`, `cache_misses`, `cache_errors`, `cache_bytes_read`, `cache_bytes_written`, and a derived `cache_hit_ratio` (guarded against zero division). Ensure default values appear (0) even when the cache is not configured. +- Add a new `docs/store_caching.rst` page that explains: + - When to enable the cache and the three `CacheMode` values: `C_OFF` (bypass), `C_CACHE` (read-through + write-through), `C_MIRROR` (always read primary, mirror every read AND write into cache). + - That `cache` dict values may be passed as strings (`"off"`, `"cache"`, `"mirror"`, case-insensitive) and are normalized to `CacheMode` members. + - That v1 is restricted to content-hash addressed namespaces. + - That soft-deleted items use the same nested name in the cache as in the primary (cache entry renamed to `.del` on soft-delete and back on undelete). + - No eviction in v1, and the stale-presence caveat under concurrent multi-client deletes. + - The stats counters surfaced by `Store.stats()`. +- Register `store_caching` in the docs TOC (`docs/index.rst`). +- Cross-reference from `docs/backends.rst` (a one-liner). +- Mention in the changelog (`docs/changes.rst`). + +### Step 6: Add tests for cache behavior in tests/test_cache.py +A new dedicated test module verifies all functional requirements end-to-end using two posixfs backends (main + cache) under `tmp_path`, covering all three `CacheMode` values and the string-alias config. + +- Create `tests/test_cache.py` with a fixture building a `Store` with a posixfs main backend and `cache_url="file:///cache"`; parametrize over `CacheMode` values where useful. +- Test: cache disabled by default (no `cache` arg) — no cache directory or files appear. +- Test (`C_CACHE`): `store → load → load` causes exactly one main-backend `load` and one cache `load` (spy on the main backend). +- Test (`C_CACHE`): write-through — after `store(name, value)`, the cache backend has identical bytes at the same nested name. 
+- Test (`C_CACHE`): partial load served from cache returns the exact slice the backend would have returned. +- Test (`C_MIRROR`): two consecutive `load` calls both hit the main backend (spy), and the cache contains the bytes after either call. +- Test (`C_MIRROR`): `load` of an entry present only in the main backend DOES create a cache entry (mirror-on-read). +- Test (`C_MIRROR`): `store` populates the cache with identical bytes. +- Test (`C_OFF`): explicit `C_OFF` and unlisted namespaces never touch the cache backend on any op. +- Test: string aliases — `cache={"data": "cache", "meta": "MIRROR", "config": "off"}` is normalized to enum members and behaves identically; an unknown string raises `ValueError`. +- Test: invalidation on `delete(name)` for both `C_CACHE` and `C_MIRROR`. +- Test: soft-delete (`move(delete=True)`) renames the cache entry from `<name>` to `<name>.del`; soft-undelete reverses this — both verified by direct inspection of the cache backend. +- Test: generic rename and `change_level` rename the cache entry in lockstep with the primary. +- Test: `load(name, deleted=True)` for a soft-deleted item uses the `.del` cache key (verified via spy that primary's `load` is/isn't called per mode). +- Test: `stats()` shows `cache_hits`, `cache_misses`, `cache_bytes_read`, `cache_bytes_written` with expected values for both modes (`C_MIRROR` increments `cache_bytes_written` on every read, never `cache_hits`). +- Test: misconfiguration — non-`C_OFF` `cache` entry without `cache_url`/`cache_backend` raises `ValueError`; cache namespace ≠ `C_OFF` missing from `levels` raises `ValueError`. +- Test: `cache` dict containing only `C_OFF` entries (including string `"off"`) is accepted without `cache_url`/`cache_backend` and behaves identically to no cache. +- Test: cache-backend error during `_cache_put`/`_cache_move` does NOT fail the surrounding `Store.store`/`Store.move` call (monkeypatch the cache backend's method to raise).
\ No newline at end of file From 12f9d002dd2c8dd1ec4d1fe952094ff54e89c6b2 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 22 May 2026 18:01:15 +0200 Subject: [PATCH 2/6] cache implementation --- docs/backends.rst | 2 + docs/index.rst | 1 + docs/store.rst | 2 + docs/store_caching.rst | 49 +++++++++ src/borgstore/store.py | 170 ++++++++++++++++++++++++++++- tests/test_cache.py | 241 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 461 insertions(+), 4 deletions(-) create mode 100644 docs/store_caching.rst create mode 100644 tests/test_cache.py diff --git a/docs/backends.rst b/docs/backends.rst index 4e06264..9fc9f1b 100644 --- a/docs/backends.rst +++ b/docs/backends.rst @@ -6,6 +6,8 @@ basic operations. Existing backends are listed below; more might come in the future. +See also :doc:`store_caching` for optional Store-level caching with a secondary backend. + posixfs ------- diff --git a/docs/index.rst b/docs/index.rst index c1e7848..81f51cf 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -7,6 +7,7 @@ installation store + store_caching backends servers changes diff --git a/docs/store.rst b/docs/store.rst index 657df53..cf5e37e 100644 --- a/docs/store.rst +++ b/docs/store.rst @@ -28,6 +28,8 @@ API can be much simpler: Store operations (and per-op timing and volume) are logged at DEBUG log level. +See also :doc:`store_caching` for optional Store-level caching with a secondary backend. + Keys ---- diff --git a/docs/store_caching.rst b/docs/store_caching.rst new file mode 100644 index 0000000..37cfb83 --- /dev/null +++ b/docs/store_caching.rst @@ -0,0 +1,49 @@ +Store caching +============= + +The ``Store`` can optionally use a second backend as a local cache for selected +namespaces, which is especially useful when the primary backend is remote, +slower or otherwise more "expensive" than the cache.
+ +Configuration +------------- + +- ``cache_url`` or ``cache_backend``: where cached data is stored +- ``cache``: mapping of namespace to cache mode + +Cache modes are configured with ``CacheMode`` or string aliases: + +- ``CacheMode.C_OFF`` or ``"off"``: bypass cache completely. +- ``CacheMode.C_MIRROR`` or ``"mirror"``: always read from primary backend, + but update the cache after successful primary backend reads and writes. +- ``CacheMode.C_CACHE`` or ``"cache"``: read-through + write-through. + For now, only content-hash addressed namespaces should use this mode. + +Behavior +-------- + +- Cache keys are identical to primary backend keys (same nesting). +- Soft-deleted items are cached under the same ``.del`` name as primary. +- Soft delete/undelete (``move(delete=True|undelete=True)``) renames cache + entries in lockstep with primary backend names. +- Cache failures are non-fatal and logged as warnings. + +Limitations +----------- + +- No cache eviction. +- No proactive cache validation/revalidation. +- If an object is deleted in the primary backend by another client, the local + cache will still have a stale object. 
+ +Statistics +---------- + +``Store.stats`` includes cache counters: + +- ``cache_hits`` +- ``cache_misses`` +- ``cache_errors`` +- ``cache_bytes_read`` +- ``cache_bytes_written`` +- ``cache_hit_ratio`` diff --git a/src/borgstore/store.py b/src/borgstore/store.py index f66b8c1..06dc107 100644 --- a/src/borgstore/store.py +++ b/src/borgstore/store.py @@ -12,6 +12,7 @@ from binascii import hexlify from collections import Counter from contextlib import contextmanager +import enum import logging import os import time @@ -30,6 +31,23 @@ logger = logging.getLogger(__name__) +class CacheMode(enum.Enum): + C_OFF = "off" + C_CACHE = "cache" + C_MIRROR = "mirror" + + @classmethod + def from_str(cls, value): + if isinstance(value, cls): + return value + if isinstance(value, str): + try: + return cls(value.lower()) + except ValueError as err: + raise ValueError(f"unknown CacheMode: {value!r}") from err + raise ValueError(f"unknown CacheMode: {value!r}") + + def get_backend(url, permissions=None, quota=None): """Parse backend URL and return a backend instance (or None).""" backend = get_file_backend(url, permissions=permissions, quota=quota) @@ -66,6 +84,10 @@ def __init__( backend: Optional[BackendBase] = None, levels: Optional[dict] = None, permissions: Optional[dict] = None, + *, + cache: Optional[dict[str, CacheMode | str]] = None, + cache_url: Optional[str] = None, + cache_backend: Optional[BackendBase] = None, ): self.url = url if backend is None and url is not None: @@ -76,6 +98,32 @@ def __init__( raise NoBackendGiven("You need to give a backend instance or a backend url.") self.backend = backend self.set_levels(levels) + if cache_url is not None and cache_backend is not None: + raise ValueError("Only one of cache_url and cache_backend can be given.") + cache = cache or {} + normalized_cache = {namespace: CacheMode.from_str(mode) for namespace, mode in cache.items()} + has_enabled_namespaces = any(mode != CacheMode.C_OFF for mode in normalized_cache.values()) + 
configured_namespaces = {namespace for namespace, _ in self.levels} + for namespace, mode in normalized_cache.items(): + if mode != CacheMode.C_OFF and namespace not in configured_namespaces: + raise ValueError(f"Invalid cache namespace configuration: {namespace!r} not in levels.") + if has_enabled_namespaces and cache_url is None and cache_backend is None: + raise ValueError("cache_url or cache_backend is required for cache modes other than C_OFF.") + self.cache_backend = cache_backend if cache_backend is not None else None + if self.cache_backend is None and cache_url is not None: + self.cache_backend = get_backend(cache_url) + if self.cache_backend is None: + raise BackendURLInvalid(f"Invalid or unsupported Cache Backend URL: {cache_url}") + self._cache_disabled = False + self.cache = normalized_cache + self.cache_namespaces = [ + entry + for entry in sorted( + ((namespace, mode) for namespace, mode in normalized_cache.items() if mode != CacheMode.C_OFF), + key=lambda item: len(item[0]), + reverse=True, + ) + ] self._stats: Counter = Counter() # this is to emulate additional latency to what the backend actually offers: self.latency = float(os.environ.get("BORGSTORE_LATENCY", "0")) / 1e6 # [us] -> [s] @@ -83,8 +131,20 @@ def __init__( self.bandwidth = float(os.environ.get("BORGSTORE_BANDWIDTH", "0")) / 8 # [bits/s] -> [bytes/s] def __repr__(self): + if self.cache_backend is not None or self.cache_namespaces: + cache_backend = self.cache_backend.__class__.__name__ if self.cache_backend is not None else None + return ( + f"" + ) return f"" + def _cache_mode_for(self, name: str) -> CacheMode: + for namespace, mode in self.cache_namespaces: + if name.startswith(namespace): + return mode + return CacheMode.C_OFF + def set_levels(self, levels: dict, create: bool = False) -> None: if not levels or not isinstance(levels, dict): raise ValueError("No or invalid levels configuration given.") @@ -101,9 +161,16 @@ def create_levels(self): for namespace, levels in self.levels: 
namespace = namespace.rstrip("/") level = max(levels) + cache_enabled = ( + self.cache_backend is not None + and not self._cache_disabled + and self._cache_mode_for(f"{namespace}/") in {CacheMode.C_CACHE, CacheMode.C_MIRROR} + ) if level == 0: # flat, we just need to create the namespace directory: self.backend.mkdir(namespace) + if cache_enabled: + self.cache_backend.mkdir(namespace) elif level > 0: # nested, we only need to create the deepest nesting dir layer, # any missing parent dirs will be created as needed by backend.mkdir. @@ -113,16 +180,22 @@ def create_levels(self): name = f"{namespace}/{dir}" if namespace else dir nested_name = nest(name, level) self.backend.mkdir(nested_name[: -2 * level - 1]) + if cache_enabled: + self.cache_backend.mkdir(nested_name[: -2 * level - 1]) else: raise ValueError(f"Invalid levels: {namespace}: {levels}") def create(self) -> None: self.backend.create() + if self.cache_backend is not None and not self._cache_disabled: + self.cache_backend.create() if self.backend.precreate_dirs: self.create_levels() def destroy(self) -> None: self.backend.destroy() + if self.cache_backend is not None: + self.cache_backend.destroy() def __enter__(self): self.open() @@ -134,9 +207,20 @@ def __exit__(self, exc_type, exc_val, exc_tb): def open(self) -> None: self.backend.open() + if self.cache_backend is not None and not self._cache_disabled: + try: + self.cache_backend.open() + except Exception as err: + logger.warning(f"borgstore: cache open failed, disabling cache: {err!r}") + self._cache_disabled = True def close(self) -> None: self.backend.close() + if self.cache_backend is not None: + try: + self.cache_backend.close() + except Exception as err: + logger.warning(f"borgstore: cache close failed: {err!r}") def quota(self) -> dict: return self.backend.quota() @@ -190,9 +274,65 @@ def stats(self): for key in "load", "store": v = st.get(f"{key}_volume", 0) t = st.get(f"{key}_time", 0) - st[f"{key}_throughput"] = v / t + st[f"{key}_throughput"] 
= v / t if t else 0 + st["cache_hits"] = st.get("cache_hits", 0) + st["cache_misses"] = st.get("cache_misses", 0) + st["cache_errors"] = st.get("cache_errors", 0) + st["cache_bytes_read"] = st.get("cache_bytes_read", 0) + st["cache_bytes_written"] = st.get("cache_bytes_written", 0) + st["cache_disabled"] = self._cache_disabled + cache_total = st["cache_hits"] + st["cache_misses"] + st["cache_hit_ratio"] = st["cache_hits"] / cache_total if cache_total else 0 return st + def _cache_get(self, nested_name: str) -> Optional[bytes]: + if self.cache_backend is None or self._cache_disabled: + return None + try: + value = self.cache_backend.load(nested_name) + except ObjectNotFound: + self._stats["cache_misses"] += 1 + return None + except Exception as err: + logger.warning(f"borgstore: cache load failed for {nested_name!r}: {err!r}") + self._stats["cache_errors"] += 1 + return None + self._stats["cache_hits"] += 1 + self._stats["cache_bytes_read"] += len(value) + return value + + def _cache_put(self, nested_name: str, value: bytes) -> None: + if self.cache_backend is None or self._cache_disabled: + return + try: + self.cache_backend.store(nested_name, value) + self._stats["cache_bytes_written"] += len(value) + except Exception as err: + logger.warning(f"borgstore: cache store failed for {nested_name!r}: {err!r}") + self._stats["cache_errors"] += 1 + + def _cache_invalidate(self, nested_name: str) -> None: + if self.cache_backend is None or self._cache_disabled: + return + try: + self.cache_backend.delete(nested_name) + except ObjectNotFound: + pass + except Exception as err: + logger.warning(f"borgstore: cache delete failed for {nested_name!r}: {err!r}") + self._stats["cache_errors"] += 1 + + def _cache_move(self, old_nested: str, new_nested: str) -> None: + if self.cache_backend is None or self._cache_disabled: + return + try: + self.cache_backend.move(old_nested, new_nested) + except ObjectNotFound: + pass + except Exception as err: + logger.warning(f"borgstore: cache 
move failed for {old_nested!r}->{new_nested!r}: {err!r}") + self._stats["cache_errors"] += 1 + def _get_levels(self, name): """Get levels from the configuration depending on the namespace.""" for namespace, levels in self.levels: @@ -227,7 +367,21 @@ def info(self, name: str, *, deleted=False) -> ItemInfo: def load(self, name: str, *, size=None, offset=0, deleted=False) -> bytes: with self._stats_updater("load", f"load({name!r}, offset={offset}, size={size}, deleted={deleted})"): - result = self.backend.load(self.find(name, deleted=deleted), size=size, offset=offset) + mode = self._cache_mode_for(name) + nested_name = self.find(name, deleted=deleted) + if mode == CacheMode.C_CACHE: + full_value = self._cache_get(nested_name) + if full_value is None: + full_value = self.backend.load(nested_name, size=None, offset=0) + self._cache_put(nested_name, full_value) + elif mode == CacheMode.C_MIRROR: + full_value = self.backend.load(nested_name, size=None, offset=0) + self._cache_put(nested_name, full_value) + else: + result = self.backend.load(nested_name, size=size, offset=offset) + self._stats_update_volume("load", len(result)) + return result + result = full_value[offset : (None if size is None else offset + size)] self._stats_update_volume("load", len(result)) return result @@ -236,7 +390,10 @@ def store(self, name: str, value: bytes) -> None: # - overwrite an existing item (level stays same) # - write to the last level if no existing item is found. 
with self._stats_updater("store", f"store({name!r})"): - self.backend.store(self.find(name), value) + nested_name = self.find(name) + self.backend.store(nested_name, value) + if self._cache_mode_for(name) in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: + self._cache_put(nested_name, value) self._stats_update_volume("store", len(value)) def hash(self, name: str, algorithm: str = "sha256", *, deleted: bool = False) -> str: @@ -250,7 +407,10 @@ def delete(self, name: str, *, deleted=False) -> None: See also .move(name, delete=True) for "soft" deletion. """ with self._stats_updater("delete", f"delete({name!r}, deleted={deleted})"): - self.backend.delete(self.find(name, deleted=deleted)) + nested_name = self.find(name, deleted=deleted) + self.backend.delete(nested_name) + if self._cache_mode_for(name) in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: + self._cache_invalidate(nested_name) def move( self, @@ -287,6 +447,8 @@ def move( msg = f"rename({name!r}, {new_name!r}, deleted={deleted})" with self._stats_updater("move", msg + f" [{nested_name!r}, {nested_new_name!r}]"): self.backend.move(nested_name, nested_new_name) + if self._cache_mode_for(name) in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: + self._cache_move(nested_name, nested_new_name) def defrag(self, sources, *, target=None, algorithm=None, namespace=None, deleted=False) -> str: """ diff --git a/tests/test_cache.py b/tests/test_cache.py new file mode 100644 index 0000000..7a92a7c --- /dev/null +++ b/tests/test_cache.py @@ -0,0 +1,241 @@ +"""Tests for Store optional cache behavior.""" + +import pytest + +from borgstore.backends.errors import ObjectNotFound +from borgstore.constants import DEL_SUFFIX +from borgstore.store import CacheMode, Store + +LEVELS = {"data/": [2], "meta/": [1], "config/": [0]} + + +def make_store(tmp_path, *, cache=None, with_cache_backend=True): + primary = (tmp_path / "primary").resolve() + cache_root = (tmp_path / "cache").resolve() + kwargs = {"url": primary.as_uri(), "levels": LEVELS} + if 
cache is not None: + kwargs["cache"] = cache + if with_cache_backend: + kwargs["cache_url"] = cache_root.as_uri() + return Store(**kwargs), cache_root + + +def test_cache_disabled_by_default(tmp_path): + store, cache_root = make_store(tmp_path, cache=None, with_cache_backend=False) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + store.store(name, value) + assert store.load(name) == value + finally: + store.destroy() + assert not cache_root.exists() + + +def test_cache_aliases_and_invalid_value(tmp_path): + store, _ = make_store(tmp_path, cache={"data/": "cache", "meta/": "MIRROR", "config/": "off"}) + assert store.cache["data/"] == CacheMode.C_CACHE + assert store.cache["meta/"] == CacheMode.C_MIRROR + assert store.cache["config/"] == CacheMode.C_OFF + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": "on"}) + + +def test_cache_misconfiguration(tmp_path): + primary_url = (tmp_path / "primary").resolve().as_uri() + cache_url = (tmp_path / "cache").resolve().as_uri() + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": CacheMode.C_CACHE}, with_cache_backend=False) + with pytest.raises(ValueError): + Store(url=primary_url, levels=LEVELS, cache={"missing/": CacheMode.C_CACHE}, cache_url=cache_url) + + +def test_cache_off_only_without_backend_is_ok(tmp_path): + store, _ = make_store(tmp_path, cache={"data/": "off"}, with_cache_backend=False) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + store.store(name, value) + assert store.load(name) == value + finally: + store.destroy() + + +def test_c_cache_read_through_and_partial_load(tmp_path): + store, _ = make_store(tmp_path, cache={"data/": CacheMode.C_CACHE}) + store.create() + try: + with store: + name, value = "data/00000000", b"0123456789" + store.store(name, value) + store._cache_invalidate(store.find(name)) + calls = {"load": 0} + original_load = store.backend.load + + def wrapped(name, size=None, offset=0): + 
calls["load"] += 1 + return original_load(name, size=size, offset=offset) + + store.backend.load = wrapped + try: + assert store.load(name) == value + assert store.load(name, size=4, offset=2) == value[2:6] + finally: + store.backend.load = original_load + assert calls["load"] == 1 + finally: + store.destroy() + + +def test_c_mirror_reads_always_from_primary_and_populates_cache(tmp_path): + store, _ = make_store(tmp_path, cache={"data/": CacheMode.C_MIRROR}) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + nested_name = store.find(name) + store.backend.store(nested_name, value) + calls = {"load": 0} + original_load = store.backend.load + + def wrapped(name, size=None, offset=0): + calls["load"] += 1 + return original_load(name, size=size, offset=offset) + + store.backend.load = wrapped + try: + assert store.load(name) == value + assert store.load(name) == value + finally: + store.backend.load = original_load + assert calls["load"] == 2 + assert store.cache_backend.load(nested_name) == value + finally: + store.destroy() + + +@pytest.mark.parametrize("mode", [CacheMode.C_CACHE, CacheMode.C_MIRROR]) +def test_write_delete_and_soft_delete_mirror_cache_entries(tmp_path, mode): + store, _ = make_store(tmp_path, cache={"data/": mode}) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + nested = store.find(name) + store.store(name, value) + assert store.cache_backend.load(nested) == value + + store.move(name, delete=True) + with pytest.raises(ObjectNotFound): + store.cache_backend.load(nested) + assert store.cache_backend.load(nested + DEL_SUFFIX) == value + + store.move(name, undelete=True) + assert store.cache_backend.load(nested) == value + + store.delete(name) + with pytest.raises(ObjectNotFound): + store.cache_backend.load(nested) + finally: + store.destroy() + + +def test_generic_rename_and_change_level_move_cache(tmp_path): + levels = {"data/": [0, 1]} + primary_url = (tmp_path / 
"primary").resolve().as_uri() + cache_url = (tmp_path / "cache").resolve().as_uri() + store = Store(url=primary_url, levels=levels, cache={"data/": CacheMode.C_CACHE}, cache_url=cache_url) + store.create() + try: + with store: + old_name, new_name, value = "data/00000000", "data/00000001", b"x" + store.store(old_name, value) + old_nested = store.find(old_name) + store.move(old_name, new_name=new_name) + new_nested = store.find(new_name) + with pytest.raises(ObjectNotFound): + store.cache_backend.load(old_nested) + assert store.cache_backend.load(new_nested) == value + + store.move(new_name, change_level=True) + changed_nested = store.find(new_name) + assert store.cache_backend.load(changed_nested) == value + finally: + store.destroy() + + +def test_deleted_reads_use_del_cache_key(tmp_path): + store, _ = make_store(tmp_path, cache={"data/": CacheMode.C_CACHE}) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + nested = store.find(name) + store.store(name, value) + store.move(name, delete=True) + store._cache_invalidate(nested + DEL_SUFFIX) + calls = {"load": 0} + original_load = store.backend.load + + def wrapped(name, size=None, offset=0): + calls["load"] += 1 + return original_load(name, size=size, offset=offset) + + store.backend.load = wrapped + try: + assert store.load(name, deleted=True) == value + assert store.load(name, deleted=True) == value + finally: + store.backend.load = original_load + assert calls["load"] == 1 + assert store.cache_backend.load(nested + DEL_SUFFIX) == value + finally: + store.destroy() + + +def test_cache_errors_do_not_fail_main_operations(tmp_path): + store, _ = make_store(tmp_path, cache={"data/": CacheMode.C_CACHE}) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + store.cache_backend.store = lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("boom")) + store.store(name, value) + + original_move = store.cache_backend.move + store.cache_backend.move = lambda *_args, 
**_kwargs: (_ for _ in ()).throw(RuntimeError("boom")) + try: + store.move(name, delete=True) + finally: + store.cache_backend.move = original_move + + assert store.info(name, deleted=True).exists + assert store.stats["cache_errors"] >= 1 + finally: + store.destroy() + + +def test_cache_stats(tmp_path): + store, _ = make_store(tmp_path, cache={"data/": CacheMode.C_CACHE, "meta/": CacheMode.C_MIRROR}) + store.create() + try: + with store: + data_name, data_value = "data/00000000", b"abc" + meta_name, meta_value = "meta/00", b"xyz" + store.store(data_name, data_value) + store.load(data_name) + store.load(data_name) + store.store(meta_name, meta_value) + store.load(meta_name) + + stats = store.stats + assert stats["cache_hits"] == 2 + assert stats["cache_misses"] == 0 + assert stats["cache_bytes_read"] == 6 + assert stats["cache_bytes_written"] == 9 + assert stats["cache_disabled"] is False + assert stats["cache_hit_ratio"] == 1.0 + finally: + store.destroy() From 408e890bf2088b8209a38ab38d3d3d1373b4cdc7 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 25 May 2026 18:31:22 +0200 Subject: [PATCH 3/6] cache: implement policy and max_age expiry --- docs/store_caching.rst | 34 +++- src/borgstore/backends/_base.py | 3 +- src/borgstore/backends/posixfs.py | 4 +- src/borgstore/store.py | 117 +++++++++++--- tests/test_backends.py | 11 +- tests/test_cache.py | 254 ++++++++++++++++++++++++++++-- tests/test_store.py | 24 +-- 7 files changed, 394 insertions(+), 53 deletions(-) diff --git a/docs/store_caching.rst b/docs/store_caching.rst index 37cfb83..ef0d706 100644 --- a/docs/store_caching.rst +++ b/docs/store_caching.rst @@ -9,9 +9,14 @@ Configuration ------------- - ``cache_url`` or ``cache_backend``: where cached data is stored -- ``cache``: mapping of namespace to cache mode +- ``cache``: mapping of namespace to cache policy -Cache modes are configured with ``CacheMode`` or string aliases: +Each cache policy can be provided either as: + +- ``CachePolicy(mode=..., 
max_age=...)`` +- ``{"mode": ..., "max_age": ...}`` + +``mode`` accepts ``CacheMode`` values or string aliases: - ``CacheMode.C_OFF`` or ``"off"``: bypass cache completely. - ``CacheMode.C_MIRROR`` or ``"mirror"``: always read from primary backend, @@ -19,6 +24,23 @@ Cache modes are configured with ``CacheMode`` or string aliases: - ``CacheMode.C_CACHE`` or ``"cache"``: read-through + write-through. For now, only content-hash addressed namespaces should use this mode. +``max_age`` is optional and expressed in seconds since last access. The default +is ``None`` (no age limit). + +Example:: + + from borgstore.store import Store, CacheMode + + store = Store( + url="sftp://user@host/repo", + levels={"data": [2], "meta": [1]}, + cache={ + "data": {"mode": "cache", "max_age": 3600}, + "meta": {"mode": CacheMode.C_MIRROR}, + }, + cache_url="file:///home/user/.cache/borgstore/repo", + ) + Behavior -------- @@ -26,6 +48,11 @@ Behavior - Soft-deleted items are cached under the same ``.del`` name as primary. - Soft delete/undelete (``move(delete=True|undelete=True)``) renames cache entries in lockstep with primary backend names. +- If ``max_age`` is configured and a cache item is expired, it is deleted from + the cache and treated as a cache miss. +- On ``Store.close()``, cache-enabled namespaces with ``max_age`` configured are + scanned and expired cache objects are removed before closing the cache + backend. - Cache failures are non-fatal and logged as warnings. Limitations @@ -35,6 +62,8 @@ Limitations - No proactive cache validation/revalidation. - If an object is deleted in the primary backend by another client, the local cache will still have a stale object. +- ``max_age`` depends on backend ``ItemInfo.atime`` support. If ``atime`` is 0 + (not implemented), age-based caching behaves as immediate expiry. 
Statistics ---------- @@ -47,3 +76,4 @@ Statistics - ``cache_bytes_read`` - ``cache_bytes_written`` - ``cache_hit_ratio`` +- ``cache_disabled`` diff --git a/src/borgstore/backends/_base.py b/src/borgstore/backends/_base.py index b3b02a7..c333a19 100644 --- a/src/borgstore/backends/_base.py +++ b/src/borgstore/backends/_base.py @@ -11,7 +11,8 @@ from ..constants import MAX_NAME_LENGTH, TMP_SUFFIX, HID_SUFFIX -ItemInfo = namedtuple("ItemInfo", "name exists size directory") +# atime is the last read access UNIX timestamp [s] or 0 if not implemented +ItemInfo = namedtuple("ItemInfo", "name exists size directory atime", defaults=(0,)) def validate_name(name): diff --git a/src/borgstore/backends/posixfs.py b/src/borgstore/backends/posixfs.py index 12c0c13..5732496 100644 --- a/src/borgstore/backends/posixfs.py +++ b/src/borgstore/backends/posixfs.py @@ -205,7 +205,7 @@ def info(self, name): return ItemInfo(name=path.name, exists=False, directory=False, size=0) else: is_dir = stat.S_ISDIR(st.st_mode) - return ItemInfo(name=path.name, exists=True, directory=is_dir, size=st.st_size) + return ItemInfo(name=path.name, exists=True, directory=is_dir, size=st.st_size, atime=st.st_atime) def load(self, name, *, size=None, offset=0): if not self.opened: @@ -361,7 +361,7 @@ def list(self, name): pass else: is_dir = stat.S_ISDIR(st.st_mode) - yield ItemInfo(name=p.name, exists=True, size=st.st_size, directory=is_dir) + yield ItemInfo(name=p.name, exists=True, size=st.st_size, directory=is_dir, atime=st.st_atime) def quota(self) -> dict: """Return quota information: limit and usage in bytes. 
-1 means not set / not tracked.""" diff --git a/src/borgstore/store.py b/src/borgstore/store.py index 06dc107..5336fc0 100644 --- a/src/borgstore/store.py +++ b/src/borgstore/store.py @@ -16,7 +16,7 @@ import logging import os import time -from typing import Iterator, Optional +from typing import Iterator, NamedTuple, Optional from .utils.nesting import nest, unnest from .backends._base import ItemInfo, BackendBase @@ -48,6 +48,11 @@ def from_str(cls, value): raise ValueError(f"unknown CacheMode: {value!r}") +class CachePolicy(NamedTuple): + mode: CacheMode + max_age: Optional[float] + + def get_backend(url, permissions=None, quota=None): """Parse backend URL and return a backend instance (or None).""" backend = get_file_backend(url, permissions=permissions, quota=quota) @@ -85,7 +90,7 @@ def __init__( levels: Optional[dict] = None, permissions: Optional[dict] = None, *, - cache: Optional[dict[str, CacheMode | str]] = None, + cache: Optional[dict[str, CachePolicy | dict]] = None, cache_url: Optional[str] = None, cache_backend: Optional[BackendBase] = None, ): @@ -101,13 +106,15 @@ def __init__( if cache_url is not None and cache_backend is not None: raise ValueError("Only one of cache_url and cache_backend can be given.") cache = cache or {} - normalized_cache = {namespace: CacheMode.from_str(mode) for namespace, mode in cache.items()} - has_enabled_namespaces = any(mode != CacheMode.C_OFF for mode in normalized_cache.values()) + if not isinstance(cache, dict): + raise ValueError("Invalid cache configuration: expected a dict mapping namespace to policy.") + cache_policies = {namespace: self._normalize_cache_policy(policy) for namespace, policy in cache.items()} configured_namespaces = {namespace for namespace, _ in self.levels} - for namespace, mode in normalized_cache.items(): - if mode != CacheMode.C_OFF and namespace not in configured_namespaces: + for namespace, policy in cache_policies.items(): + if policy.mode != CacheMode.C_OFF and namespace not in 
configured_namespaces: raise ValueError(f"Invalid cache namespace configuration: {namespace!r} not in levels.") - if has_enabled_namespaces and cache_url is None and cache_backend is None: + have_cache_enabled_namespaces = any(policy.mode != CacheMode.C_OFF for policy in cache_policies.values()) + if have_cache_enabled_namespaces and cache_url is None and cache_backend is None: raise ValueError("cache_url or cache_backend is required for cache modes other than C_OFF.") self.cache_backend = cache_backend if cache_backend is not None else None if self.cache_backend is None and cache_url is not None: @@ -115,11 +122,10 @@ def __init__( if self.cache_backend is None: raise BackendURLInvalid(f"Invalid or unsupported Cache Backend URL: {cache_url}") self._cache_disabled = False - self.cache = normalized_cache self.cache_namespaces = [ entry for entry in sorted( - ((namespace, mode) for namespace, mode in normalized_cache.items() if mode != CacheMode.C_OFF), + ((namespace, policy) for namespace, policy in cache_policies.items() if policy.mode != CacheMode.C_OFF), key=lambda item: len(item[0]), reverse=True, ) @@ -139,11 +145,47 @@ def __repr__(self): ) return f"" - def _cache_mode_for(self, name: str) -> CacheMode: - for namespace, mode in self.cache_namespaces: + @staticmethod + def _normalize_cache_policy(policy: CachePolicy | dict) -> CachePolicy: + if isinstance(policy, CachePolicy): + return policy + if isinstance(policy, dict): + unknown_keys = set(policy) - {"mode", "max_age"} + if unknown_keys: + raise ValueError(f"Invalid cache policy keys: {sorted(unknown_keys)!r}") + if "mode" not in policy: + raise ValueError("Invalid cache policy: 'mode' is required.") + mode = CacheMode.from_str(policy["mode"]) + max_age = policy.get("max_age") + if max_age is None: + return CachePolicy(mode=mode, max_age=None) + if not isinstance(max_age, (int, float)) or max_age < 0: + raise ValueError(f"Invalid cache max_age value: {max_age!r}") + return CachePolicy(mode=mode, 
max_age=float(max_age)) + raise ValueError("Invalid cache policy: expected dict or CachePolicy.") + + def _cache_policy_for(self, name: str) -> CachePolicy: + for namespace, policy in self.cache_namespaces: if name.startswith(namespace): - return mode - return CacheMode.C_OFF + return policy + return CachePolicy(mode=CacheMode.C_OFF, max_age=None) + + def _cache_is_expired(self, nested_name: str, max_age: Optional[float]) -> bool: + if max_age is None: + return False + if self.cache_backend is None or self._cache_disabled: + return True + try: + info = self.cache_backend.info(nested_name) + except ObjectNotFound: + return True + except Exception as err: + logger.warning(f"borgstore: cache info failed for {nested_name!r}: {err!r}") + self._stats["cache_errors"] += 1 + return True + if not info.atime: + return True + return (time.time() - info.atime) > max_age def set_levels(self, levels: dict, create: bool = False) -> None: if not levels or not isinstance(levels, dict): @@ -164,7 +206,7 @@ def create_levels(self): cache_enabled = ( self.cache_backend is not None and not self._cache_disabled - and self._cache_mode_for(f"{namespace}/") in {CacheMode.C_CACHE, CacheMode.C_MIRROR} + and self._cache_policy_for(f"{namespace}/").mode in {CacheMode.C_CACHE, CacheMode.C_MIRROR} ) if level == 0: # flat, we just need to create the namespace directory: @@ -214,9 +256,36 @@ def open(self) -> None: logger.warning(f"borgstore: cache open failed, disabling cache: {err!r}") self._cache_disabled = True + def _cache_list(self, name: str) -> Iterator[ItemInfo]: + if self.cache_backend is None: + return + for info in self.cache_backend.list(name): + if info.directory: + subdir_name = (name + "/" + info.name) if name else info.name + yield from self._cache_list(subdir_name) + else: + full_name = (name + "/" + info.name) if name else info.name + yield info._replace(name=full_name) + + def _cache_cleanup_expired(self) -> None: + now = time.time() + for namespace, policy in 
self.cache_namespaces: + if policy.max_age is None: + continue + try: + for info in self._cache_list(namespace.rstrip("/")): + if info.directory: + continue + if not info.atime or (now - info.atime) > policy.max_age: + self._cache_invalidate(info.name) + except Exception as err: + logger.warning(f"borgstore: cache cleanup failed for namespace {namespace!r}: {err!r}") + self._stats["cache_errors"] += 1 + def close(self) -> None: self.backend.close() if self.cache_backend is not None: + self._cache_cleanup_expired() try: self.cache_backend.close() except Exception as err: @@ -285,9 +354,13 @@ def stats(self): st["cache_hit_ratio"] = st["cache_hits"] / cache_total if cache_total else 0 return st - def _cache_get(self, nested_name: str) -> Optional[bytes]: + def _cache_get(self, nested_name: str, *, max_age: Optional[float] = None) -> Optional[bytes]: if self.cache_backend is None or self._cache_disabled: return None + if self._cache_is_expired(nested_name, max_age): + self._cache_invalidate(nested_name) + self._stats["cache_misses"] += 1 + return None try: value = self.cache_backend.load(nested_name) except ObjectNotFound: @@ -367,14 +440,14 @@ def info(self, name: str, *, deleted=False) -> ItemInfo: def load(self, name: str, *, size=None, offset=0, deleted=False) -> bytes: with self._stats_updater("load", f"load({name!r}, offset={offset}, size={size}, deleted={deleted})"): - mode = self._cache_mode_for(name) + cache_policy = self._cache_policy_for(name) nested_name = self.find(name, deleted=deleted) - if mode == CacheMode.C_CACHE: - full_value = self._cache_get(nested_name) + if cache_policy.mode == CacheMode.C_CACHE: + full_value = self._cache_get(nested_name, max_age=cache_policy.max_age) if full_value is None: full_value = self.backend.load(nested_name, size=None, offset=0) self._cache_put(nested_name, full_value) - elif mode == CacheMode.C_MIRROR: + elif cache_policy.mode == CacheMode.C_MIRROR: full_value = self.backend.load(nested_name, size=None, offset=0) 
self._cache_put(nested_name, full_value) else: @@ -392,7 +465,7 @@ def store(self, name: str, value: bytes) -> None: with self._stats_updater("store", f"store({name!r})"): nested_name = self.find(name) self.backend.store(nested_name, value) - if self._cache_mode_for(name) in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: + if self._cache_policy_for(name).mode in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: self._cache_put(nested_name, value) self._stats_update_volume("store", len(value)) @@ -409,7 +482,7 @@ def delete(self, name: str, *, deleted=False) -> None: with self._stats_updater("delete", f"delete({name!r}, deleted={deleted})"): nested_name = self.find(name, deleted=deleted) self.backend.delete(nested_name) - if self._cache_mode_for(name) in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: + if self._cache_policy_for(name).mode in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: self._cache_invalidate(nested_name) def move( @@ -447,7 +520,7 @@ def move( msg = f"rename({name!r}, {new_name!r}, deleted={deleted})" with self._stats_updater("move", msg + f" [{nested_name!r}, {nested_new_name!r}]"): self.backend.move(nested_name, nested_new_name) - if self._cache_mode_for(name) in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: + if self._cache_policy_for(name).mode in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: self._cache_move(nested_name, nested_new_name) def defrag(self, sources, *, target=None, algorithm=None, namespace=None, deleted=False) -> str: diff --git a/tests/test_backends.py b/tests/test_backends.py index f440853..ad5e115 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -501,8 +501,14 @@ def test_list(tested_backends, request): backend.mkdir("dir") items = list(backend.list(ROOTNS)) assert len(items) == 3 - assert ItemInfo(name=k0, exists=True, size=len(v0), directory=False) in items - assert ItemInfo(name=k1, exists=True, size=len(v1), directory=False) in items + matching_k0 = [item for item in items if item.name == k0] + matching_k1 = [item for item in items if 
item.name == k1] + assert len(matching_k0) == 1 + assert len(matching_k1) == 1 + assert matching_k0[0].exists and not matching_k0[0].directory and matching_k0[0].size == len(v0) + assert matching_k1[0].exists and not matching_k1[0].directory and matching_k1[0].size == len(v1) + assert matching_k0[0].atime >= 0 + assert matching_k1[0].atime >= 0 # for "dir", we do not know what size the backend has returned. # that is rather OS / fs / backend specific. matching_items = [item for item in items if item.name == "dir"] @@ -510,6 +516,7 @@ def test_list(tested_backends, request): dir_item = matching_items[0] assert dir_item.exists assert dir_item.directory + assert dir_item.atime >= 0 items = list(backend.list("dir")) assert items == [] diff --git a/tests/test_cache.py b/tests/test_cache.py index 7a92a7c..f48bc68 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -4,7 +4,7 @@ from borgstore.backends.errors import ObjectNotFound from borgstore.constants import DEL_SUFFIX -from borgstore.store import CacheMode, Store +from borgstore.store import CacheMode, CachePolicy, Store LEVELS = {"data/": [2], "meta/": [1], "config/": [0]} @@ -34,25 +34,97 @@ def test_cache_disabled_by_default(tmp_path): def test_cache_aliases_and_invalid_value(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": "cache", "meta/": "MIRROR", "config/": "off"}) - assert store.cache["data/"] == CacheMode.C_CACHE - assert store.cache["meta/"] == CacheMode.C_MIRROR - assert store.cache["config/"] == CacheMode.C_OFF + store, _ = make_store( + tmp_path, + cache={ + "data/": {"mode": "cache"}, + "meta/": {"mode": "MIRROR"}, + "config/": {"mode": "off"}, + }, + ) + store.create() + try: + with store: + data_name, data_value = "data/00000000", b"abc" + meta_name, meta_value = "meta/abcde", b"meta" + config_name, config_value = "config/item", b"cfg" + store.store(data_name, data_value) + store.store(meta_name, meta_value) + store.store(config_name, config_value) + assert store.load(data_name) 
== data_value + assert store.load(meta_name) == meta_value + assert store.load(config_name) == config_value + finally: + store.destroy() with pytest.raises(ValueError): make_store(tmp_path, cache={"data/": "on"}) +def test_cache_policy_dict_and_max_age_validation(tmp_path): + store, _ = make_store( + tmp_path, + cache={ + "data/": {"mode": "cache", "max_age": 60}, + "meta/": {"mode": "mirror"}, + "config/": {"mode": "off", "max_age": 0}, + }, + ) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + store.store(name, value) + store._cache_invalidate(store.find(name)) + assert store.load(name) == value + assert store.load(name) == value + stats = store.stats + assert stats["cache_misses"] == 1 + assert stats["cache_hits"] == 1 + finally: + store.destroy() + + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": {"max_age": 1}}) + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": {"mode": "cache", "max_age": -1}}) + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": {"mode": "cache", "max_age": "1"}}) + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": {"mode": "cache", "unexpected": 1}}) + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": "cache"}) + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": CacheMode.C_CACHE}) + + +def test_cache_policy_namedtuple_input(tmp_path): + store, _ = make_store(tmp_path, cache={"data/": CachePolicy(mode=CacheMode.C_CACHE, max_age=60.0)}) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + store.store(name, value) + store._cache_invalidate(store.find(name)) + assert store.load(name) == value + assert store.load(name) == value + stats = store.stats + assert stats["cache_misses"] == 1 + assert stats["cache_hits"] == 1 + finally: + store.destroy() + + def test_cache_misconfiguration(tmp_path): primary_url = (tmp_path / "primary").resolve().as_uri() cache_url = 
(tmp_path / "cache").resolve().as_uri() with pytest.raises(ValueError): - make_store(tmp_path, cache={"data/": CacheMode.C_CACHE}, with_cache_backend=False) + make_store(tmp_path, cache={"data/": {"mode": "cache"}}, with_cache_backend=False) with pytest.raises(ValueError): - Store(url=primary_url, levels=LEVELS, cache={"missing/": CacheMode.C_CACHE}, cache_url=cache_url) + Store(url=primary_url, levels=LEVELS, cache={"missing/": {"mode": "cache"}}, cache_url=cache_url) def test_cache_off_only_without_backend_is_ok(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": "off"}, with_cache_backend=False) + store, _ = make_store(tmp_path, cache={"data/": {"mode": "off"}}, with_cache_backend=False) store.create() try: with store: @@ -64,7 +136,7 @@ def test_cache_off_only_without_backend_is_ok(tmp_path): def test_c_cache_read_through_and_partial_load(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": CacheMode.C_CACHE}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE}}) store.create() try: with store: @@ -90,7 +162,7 @@ def wrapped(name, size=None, offset=0): def test_c_mirror_reads_always_from_primary_and_populates_cache(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": CacheMode.C_MIRROR}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_MIRROR}}) store.create() try: with store: @@ -118,7 +190,7 @@ def wrapped(name, size=None, offset=0): @pytest.mark.parametrize("mode", [CacheMode.C_CACHE, CacheMode.C_MIRROR]) def test_write_delete_and_soft_delete_mirror_cache_entries(tmp_path, mode): - store, _ = make_store(tmp_path, cache={"data/": mode}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": mode}}) store.create() try: with store: @@ -146,7 +218,7 @@ def test_generic_rename_and_change_level_move_cache(tmp_path): levels = {"data/": [0, 1]} primary_url = (tmp_path / "primary").resolve().as_uri() cache_url = (tmp_path / "cache").resolve().as_uri() - store = Store(url=primary_url, 
levels=levels, cache={"data/": CacheMode.C_CACHE}, cache_url=cache_url) + store = Store(url=primary_url, levels=levels, cache={"data/": {"mode": CacheMode.C_CACHE}}, cache_url=cache_url) store.create() try: with store: @@ -167,7 +239,7 @@ def test_generic_rename_and_change_level_move_cache(tmp_path): def test_deleted_reads_use_del_cache_key(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": CacheMode.C_CACHE}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE}}) store.create() try: with store: @@ -195,8 +267,74 @@ def wrapped(name, size=None, offset=0): store.destroy() +def test_c_cache_respects_max_age_since_last_use(tmp_path, monkeypatch): + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE, "max_age": 5}}) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + store.store(name, value) + nested_name = store.find(name) + store._cache_invalidate(nested_name) + + now = 1000.0 + atime = 0.0 + + def fake_time(): + return now + + monkeypatch.setattr("borgstore.store.time.time", fake_time) + original_info = store.cache_backend.info + cache_deletes = {"count": 0} + original_cache_delete = store.cache_backend.delete + + def wrapped_info(backend_name): + info = original_info(backend_name) + if backend_name == nested_name and info.exists: + return info._replace(atime=atime) + return info + + store.cache_backend.info = wrapped_info + + def wrapped_cache_delete(backend_name): + if backend_name == nested_name: + cache_deletes["count"] += 1 + return original_cache_delete(backend_name) + + store.cache_backend.delete = wrapped_cache_delete + + calls = {"load": 0} + original_load = store.backend.load + + def wrapped(backend_name, size=None, offset=0): + calls["load"] += 1 + return original_load(backend_name, size=size, offset=offset) + + store.backend.load = wrapped + try: + assert store.load(name) == value # miss, populate cache at t=1000 + atime = 1000.0 + now = 1004.0 + assert 
store.load(name) == value # hit, refresh last-used to t=1004 + atime = 1004.0 + now = 1010.0 + assert store.load(name) == value # expired, miss again + finally: + store.backend.load = original_load + store.cache_backend.info = original_info + store.cache_backend.delete = original_cache_delete + + assert calls["load"] == 2 + assert cache_deletes["count"] == 2 + stats = store.stats + assert stats["cache_misses"] == 2 + assert stats["cache_hits"] == 1 + finally: + store.destroy() + + def test_cache_errors_do_not_fail_main_operations(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": CacheMode.C_CACHE}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE}}) store.create() try: with store: @@ -218,7 +356,10 @@ def test_cache_errors_do_not_fail_main_operations(tmp_path): def test_cache_stats(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": CacheMode.C_CACHE, "meta/": CacheMode.C_MIRROR}) + store, _ = make_store( + tmp_path, + cache={"data/": {"mode": CacheMode.C_CACHE}, "meta/": {"mode": CacheMode.C_MIRROR}}, + ) store.create() try: with store: @@ -239,3 +380,86 @@ def test_cache_stats(tmp_path): assert stats["cache_hit_ratio"] == 1.0 finally: store.destroy() + + +def test_close_cleans_up_expired_cache_items(tmp_path, monkeypatch): + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE, "max_age": 5}}) + store.create() + name, value = "data/00000000", b"abc" + with store: + store.store(name, value) + nested_name = store.find(name) + + now = 2000.0 + atime = 1990.0 + + def fake_time(): + return now + + monkeypatch.setattr("borgstore.store.time.time", fake_time) + original_list = store.cache_backend.list + delete_calls = {"count": 0} + original_delete = store.cache_backend.delete + + def wrapped_list(backend_name): + for info in original_list(backend_name): + full_name = (backend_name + "/" + info.name) if backend_name else info.name + if full_name == nested_name and info.exists: + yield 
info._replace(atime=atime) + else: + yield info + + store.cache_backend.list = wrapped_list + + def wrapped_delete(backend_name): + if backend_name == nested_name: + delete_calls["count"] += 1 + return original_delete(backend_name) + + store.cache_backend.delete = wrapped_delete + try: + store.open() + store.close() + assert delete_calls["count"] == 1 + + atime = 1999.0 + store.open() + store.store(name, value) + store.close() + assert delete_calls["count"] == 1 + finally: + store.cache_backend.list = original_list + store.cache_backend.delete = original_delete + store.destroy() + + +def test_close_cleanup_errors_are_best_effort(tmp_path): + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE, "max_age": 5}}) + store.create() + name, value = "data/00000000", b"abc" + with store: + store.store(name, value) + + original_list = store.cache_backend.list + close_calls = {"count": 0} + original_close = store.cache_backend.close + + def failing_list(_backend_name): + raise RuntimeError("boom") + yield # pragma: no cover + + def wrapped_close(): + close_calls["count"] += 1 + return original_close() + + store.cache_backend.list = failing_list + store.cache_backend.close = wrapped_close + try: + store.open() + store.close() + assert close_calls["count"] == 1 + assert store.stats["cache_errors"] >= 1 + finally: + store.cache_backend.list = original_list + store.cache_backend.close = original_close + store.destroy() diff --git a/tests/test_store.py b/tests/test_store.py index f42d8cb..cfad04e 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -81,7 +81,13 @@ def test_basics(posixfs_store_created): assert store.backend.info("two/00/00/00000000").size == len(v0) assert not store.backend.info("two/00/00/00000000").directory - assert list(store.list(ns)) == [ItemInfo(name=k0, exists=True, size=len(v0), directory=False)] + items = list(store.list(ns)) + assert len(items) == 1 + assert items[0].name == k0 + assert items[0].exists + assert 
items[0].size == len(v0) + assert not items[0].directory + assert items[0].atime >= 0 store.delete(nsk0) @@ -198,7 +204,7 @@ def test_upgrade_levels(posixfs_store_created): # Store k0 on level 0: store.store(k0, v0) assert store.find(k0) == "" + k0 # found on level 0 - assert store.info(k0) == ii0 + assert store.info(k0)[:4] == ii0[:4] assert list_store_names(store, ROOTNS) == [k0] # Now upgrade to nesting level 1 (while keeping support for level 0), using the same backend storage: @@ -206,12 +212,12 @@ def test_upgrade_levels(posixfs_store_created): with posixfs_store_created as store: # Does k0 still work? assert store.find(k0) == "" + k0 # found on level 0 - assert store.info(k0) == ii0 + assert store.info(k0)[:4] == ii0[:4] assert list_store_names(store, ROOTNS) == [k0] # Store k1 on level 1: store.store(k1, v1) assert store.find(k1) == "00/" + k1 # found on level 1 - assert store.info(k1) == ii1 + assert store.info(k1)[:4] == ii1[:4] assert list_store_names_sorted(store, ROOTNS) == [k0, k1] store.delete(k1) # just to have it out of the way @@ -220,7 +226,7 @@ def test_upgrade_levels(posixfs_store_created): ii0new = ItemInfo(k0, True, 9, False) store.store(k0, v0new) assert store.find(k0) == "" + k0 # still found on level 0 - assert store.info(k0) == ii0new + assert store.info(k0)[:4] == ii0new[:4] # k0 should show up only once, as we overwrote the level 0 item: assert list_store_names(store, ROOTNS) == [k0] assert store.load(k0) == v0new @@ -238,7 +244,7 @@ def test_downgrade_levels(posixfs_store_created): # Store k1 on level 1: store.store(k1, v1) assert store.find(k1) == "00/" + k1 # found on level 1 - assert store.info(k1) == ii1 + assert store.info(k1)[:4] == ii1[:4] assert list_store_names(store, ROOTNS) == [k1] # Now downgrade to nesting level 0 (while keeping support for level 1), using the same backend storage: @@ -246,12 +252,12 @@ def test_downgrade_levels(posixfs_store_created): with posixfs_store_created as store: # Does k1 still work? 
assert store.find(k1) == "00/" + k1 # found on level 1 - assert store.info(k1) == ii1 + assert store.info(k1)[:4] == ii1[:4] assert list_store_names(store, ROOTNS) == [k1] # Store k0 on level 0: store.store(k0, v0) assert store.find(k0) == "" + k0 # found on level 0 - assert store.info(k0) == ii0 + assert store.info(k0)[:4] == ii0[:4] assert list_store_names_sorted(store, ROOTNS) == [k0, k1] store.delete(k0) # just to have it out of the way @@ -260,7 +266,7 @@ def test_downgrade_levels(posixfs_store_created): ii1new = ItemInfo(k1, True, 9, False) store.store(k1, v1new) assert store.find(k1) == "00/" + k1 # still found on level 1 - assert store.info(k1) == ii1new + assert store.info(k1)[:4] == ii1new[:4] # k1 should show up only once, as we overwrote the level 1 item: assert list_store_names(store, ROOTNS) == [k1] assert store.load(k1) == v1new From 9e21900066afc6532163b94bc3fa509c20e05718 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 25 May 2026 18:50:40 +0200 Subject: [PATCH 4/6] cache: rename "cache" mode to "writethrough" --- add-optional-cache.md | 17 +++++++++-------- docs/store_caching.rst | 4 ++-- src/borgstore/store.py | 12 ++++++------ tests/test_cache.py | 43 ++++++++++++++++++++---------------------- 4 files changed, 37 insertions(+), 39 deletions(-) diff --git a/add-optional-cache.md b/add-optional-cache.md index f9b19d1..fd5b1d4 100644 --- a/add-optional-cache.md +++ b/add-optional-cache.md @@ -132,18 +132,19 @@ class CacheMode(enum.Enum): New stats fields: `cache_hits`, `cache_misses`, `cache_errors`, `cache_bytes_read`, `cache_bytes_written`, `cache_hit_ratio`. 
Usage example: + ```python from borgstore.store import Store, CacheMode store = Store( - url="sftp://user@host/repo", - levels={"data": [2], "meta": [1], "config": [0]}, - cache={ - "data": CacheMode.C_CACHE, # read-through + write-through - "meta": "mirror", # string alias -> CacheMode.C_MIRROR - "config": "off", # string alias -> CacheMode.C_OFF - }, - cache_url="file:///home/u/.cache/borgstore/", + url="sftp://user@host/repo", + levels={"data": [2], "meta": [1], "config": [0]}, + cache={ + "data": CacheMode.C_WRITETHROUGH, # read-through + write-through + "meta": "mirror", # string alias -> CacheMode.C_MIRROR + "config": "off", # string alias -> CacheMode.C_OFF + }, + cache_url="file:///home/u/.cache/borgstore/", ) ``` diff --git a/docs/store_caching.rst b/docs/store_caching.rst index ef0d706..d9a02e1 100644 --- a/docs/store_caching.rst +++ b/docs/store_caching.rst @@ -21,7 +21,7 @@ Each cache policy can be provided either as: - ``CacheMode.C_OFF`` or ``"off"``: bypass cache completely. - ``CacheMode.C_MIRROR`` or ``"mirror"``: always read from primary backend, but update the cache after successful primary backend reads and writes. -- ``CacheMode.C_CACHE`` or ``"cache"``: read-through + write-through. +- ``CacheMode.C_WRITETHROUGH`` or ``"writethrough"``: read-through + write-through. For now, only content-hash addressed namespaces should use this mode. ``max_age`` is optional and expressed in seconds since last access. 
The default @@ -35,7 +35,7 @@ Example:: url="sftp://user@host/repo", levels={"data": [2], "meta": [1]}, cache={ - "data": {"mode": "cache", "max_age": 3600}, + "data": {"mode": "writethrough", "max_age": 3600}, "meta": {"mode": CacheMode.C_MIRROR}, }, cache_url="file:///home/user/.cache/borgstore/repo", diff --git a/src/borgstore/store.py b/src/borgstore/store.py index 5336fc0..ecd4f7b 100644 --- a/src/borgstore/store.py +++ b/src/borgstore/store.py @@ -33,8 +33,8 @@ class CacheMode(enum.Enum): C_OFF = "off" - C_CACHE = "cache" C_MIRROR = "mirror" + C_WRITETHROUGH = "writethrough" @classmethod def from_str(cls, value): @@ -206,7 +206,7 @@ def create_levels(self): cache_enabled = ( self.cache_backend is not None and not self._cache_disabled - and self._cache_policy_for(f"{namespace}/").mode in {CacheMode.C_CACHE, CacheMode.C_MIRROR} + and self._cache_policy_for(f"{namespace}/").mode in {CacheMode.C_WRITETHROUGH, CacheMode.C_MIRROR} ) if level == 0: # flat, we just need to create the namespace directory: @@ -442,7 +442,7 @@ def load(self, name: str, *, size=None, offset=0, deleted=False) -> bytes: with self._stats_updater("load", f"load({name!r}, offset={offset}, size={size}, deleted={deleted})"): cache_policy = self._cache_policy_for(name) nested_name = self.find(name, deleted=deleted) - if cache_policy.mode == CacheMode.C_CACHE: + if cache_policy.mode == CacheMode.C_WRITETHROUGH: full_value = self._cache_get(nested_name, max_age=cache_policy.max_age) if full_value is None: full_value = self.backend.load(nested_name, size=None, offset=0) @@ -465,7 +465,7 @@ def store(self, name: str, value: bytes) -> None: with self._stats_updater("store", f"store({name!r})"): nested_name = self.find(name) self.backend.store(nested_name, value) - if self._cache_policy_for(name).mode in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: + if self._cache_policy_for(name).mode in {CacheMode.C_WRITETHROUGH, CacheMode.C_MIRROR}: self._cache_put(nested_name, value) 
self._stats_update_volume("store", len(value)) @@ -482,7 +482,7 @@ def delete(self, name: str, *, deleted=False) -> None: with self._stats_updater("delete", f"delete({name!r}, deleted={deleted})"): nested_name = self.find(name, deleted=deleted) self.backend.delete(nested_name) - if self._cache_policy_for(name).mode in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: + if self._cache_policy_for(name).mode in {CacheMode.C_WRITETHROUGH, CacheMode.C_MIRROR}: self._cache_invalidate(nested_name) def move( @@ -520,7 +520,7 @@ def move( msg = f"rename({name!r}, {new_name!r}, deleted={deleted})" with self._stats_updater("move", msg + f" [{nested_name!r}, {nested_new_name!r}]"): self.backend.move(nested_name, nested_new_name) - if self._cache_policy_for(name).mode in {CacheMode.C_CACHE, CacheMode.C_MIRROR}: + if self._cache_policy_for(name).mode in {CacheMode.C_WRITETHROUGH, CacheMode.C_MIRROR}: self._cache_move(nested_name, nested_new_name) def defrag(self, sources, *, target=None, algorithm=None, namespace=None, deleted=False) -> str: diff --git a/tests/test_cache.py b/tests/test_cache.py index f48bc68..708df76 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -36,11 +36,7 @@ def test_cache_disabled_by_default(tmp_path): def test_cache_aliases_and_invalid_value(tmp_path): store, _ = make_store( tmp_path, - cache={ - "data/": {"mode": "cache"}, - "meta/": {"mode": "MIRROR"}, - "config/": {"mode": "off"}, - }, + cache={"data/": {"mode": "writethrough"}, "meta/": {"mode": "MIRROR"}, "config/": {"mode": "off"}}, ) store.create() try: @@ -64,7 +60,7 @@ def test_cache_policy_dict_and_max_age_validation(tmp_path): store, _ = make_store( tmp_path, cache={ - "data/": {"mode": "cache", "max_age": 60}, + "data/": {"mode": "writethrough", "max_age": 60}, "meta/": {"mode": "mirror"}, "config/": {"mode": "off", "max_age": 0}, }, @@ -86,19 +82,19 @@ def test_cache_policy_dict_and_max_age_validation(tmp_path): with pytest.raises(ValueError): make_store(tmp_path, cache={"data/": 
{"max_age": 1}}) with pytest.raises(ValueError): - make_store(tmp_path, cache={"data/": {"mode": "cache", "max_age": -1}}) + make_store(tmp_path, cache={"data/": {"mode": "writethrough", "max_age": -1}}) with pytest.raises(ValueError): - make_store(tmp_path, cache={"data/": {"mode": "cache", "max_age": "1"}}) + make_store(tmp_path, cache={"data/": {"mode": "writethrough", "max_age": "1"}}) with pytest.raises(ValueError): - make_store(tmp_path, cache={"data/": {"mode": "cache", "unexpected": 1}}) + make_store(tmp_path, cache={"data/": {"mode": "writethrough", "unexpected": 1}}) with pytest.raises(ValueError): make_store(tmp_path, cache={"data/": "cache"}) with pytest.raises(ValueError): - make_store(tmp_path, cache={"data/": CacheMode.C_CACHE}) + make_store(tmp_path, cache={"data/": CacheMode.C_WRITETHROUGH}) def test_cache_policy_namedtuple_input(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": CachePolicy(mode=CacheMode.C_CACHE, max_age=60.0)}) + store, _ = make_store(tmp_path, cache={"data/": CachePolicy(mode=CacheMode.C_WRITETHROUGH, max_age=60.0)}) store.create() try: with store: @@ -118,9 +114,9 @@ def test_cache_misconfiguration(tmp_path): primary_url = (tmp_path / "primary").resolve().as_uri() cache_url = (tmp_path / "cache").resolve().as_uri() with pytest.raises(ValueError): - make_store(tmp_path, cache={"data/": {"mode": "cache"}}, with_cache_backend=False) + make_store(tmp_path, cache={"data/": {"mode": "writethrough"}}, with_cache_backend=False) with pytest.raises(ValueError): - Store(url=primary_url, levels=LEVELS, cache={"missing/": {"mode": "cache"}}, cache_url=cache_url) + Store(url=primary_url, levels=LEVELS, cache={"missing/": {"mode": "writethrough"}}, cache_url=cache_url) def test_cache_off_only_without_backend_is_ok(tmp_path): @@ -136,7 +132,7 @@ def test_cache_off_only_without_backend_is_ok(tmp_path): def test_c_cache_read_through_and_partial_load(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": {"mode": 
CacheMode.C_CACHE}}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH}}) store.create() try: with store: @@ -188,7 +184,7 @@ def wrapped(name, size=None, offset=0): store.destroy() -@pytest.mark.parametrize("mode", [CacheMode.C_CACHE, CacheMode.C_MIRROR]) +@pytest.mark.parametrize("mode", [CacheMode.C_WRITETHROUGH, CacheMode.C_MIRROR]) def test_write_delete_and_soft_delete_mirror_cache_entries(tmp_path, mode): store, _ = make_store(tmp_path, cache={"data/": {"mode": mode}}) store.create() @@ -218,7 +214,9 @@ def test_generic_rename_and_change_level_move_cache(tmp_path): levels = {"data/": [0, 1]} primary_url = (tmp_path / "primary").resolve().as_uri() cache_url = (tmp_path / "cache").resolve().as_uri() - store = Store(url=primary_url, levels=levels, cache={"data/": {"mode": CacheMode.C_CACHE}}, cache_url=cache_url) + store = Store( + url=primary_url, levels=levels, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH}}, cache_url=cache_url + ) store.create() try: with store: @@ -239,7 +237,7 @@ def test_generic_rename_and_change_level_move_cache(tmp_path): def test_deleted_reads_use_del_cache_key(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE}}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH}}) store.create() try: with store: @@ -268,7 +266,7 @@ def wrapped(name, size=None, offset=0): def test_c_cache_respects_max_age_since_last_use(tmp_path, monkeypatch): - store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE, "max_age": 5}}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH, "max_age": 5}}) store.create() try: with store: @@ -334,7 +332,7 @@ def wrapped(backend_name, size=None, offset=0): def test_cache_errors_do_not_fail_main_operations(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE}}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": 
CacheMode.C_WRITETHROUGH}}) store.create() try: with store: @@ -357,8 +355,7 @@ def test_cache_errors_do_not_fail_main_operations(tmp_path): def test_cache_stats(tmp_path): store, _ = make_store( - tmp_path, - cache={"data/": {"mode": CacheMode.C_CACHE}, "meta/": {"mode": CacheMode.C_MIRROR}}, + tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH}, "meta/": {"mode": CacheMode.C_MIRROR}} ) store.create() try: @@ -383,7 +380,7 @@ def test_cache_stats(tmp_path): def test_close_cleans_up_expired_cache_items(tmp_path, monkeypatch): - store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE, "max_age": 5}}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH, "max_age": 5}}) store.create() name, value = "data/00000000", b"abc" with store: @@ -434,7 +431,7 @@ def wrapped_delete(backend_name): def test_close_cleanup_errors_are_best_effort(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_CACHE, "max_age": 5}}) + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH, "max_age": 5}}) store.create() name, value = "data/00000000", b"abc" with store: From 2a9065bed4160bef4dda217bf24853bcdfa9924a Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 25 May 2026 19:42:52 +0200 Subject: [PATCH 5/6] cache: add "size" to policy, close-time size-based LRU eviction --- docs/store_caching.rst | 32 ++++++++++---- src/borgstore/store.py | 39 +++++++++++------ tests/test_cache.py | 98 +++++++++++++++++++++++++++++++++++++++--- 3 files changed, 143 insertions(+), 26 deletions(-) diff --git a/docs/store_caching.rst b/docs/store_caching.rst index d9a02e1..b227b23 100644 --- a/docs/store_caching.rst +++ b/docs/store_caching.rst @@ -13,8 +13,8 @@ Configuration Each cache policy can be provided either as: -- ``CachePolicy(mode=..., max_age=...)`` -- ``{"mode": ..., "max_age": ...}`` +- ``CachePolicy(mode=..., max_age=..., size=...)`` +- ``{"mode": ..., "max_age": ..., 
"size": ...}`` ``mode`` accepts ``CacheMode`` values or string aliases: @@ -27,6 +27,10 @@ Each cache policy can be provided either as: ``max_age`` is optional and expressed in seconds since last access. The default is ``None`` (no age limit). +``size`` is optional and expressed in bytes. It sets a per-namespace cache size +budget enforced during ``Store.close()`` by evicting least-recently-used items +until the namespace total size is within the configured budget. + Example:: from borgstore.store import Store, CacheMode @@ -35,7 +39,7 @@ Example:: url="sftp://user@host/repo", levels={"data": [2], "meta": [1]}, cache={ - "data": {"mode": "writethrough", "max_age": 3600}, + "data": {"mode": "writethrough", "max_age": 3600, "size": 4 * 1024**3}, "meta": {"mode": CacheMode.C_MIRROR}, }, cache_url="file:///home/user/.cache/borgstore/repo", @@ -50,20 +54,30 @@ Behavior entries in lockstep with primary backend names. - If ``max_age`` is configured and a cache item is expired, it is deleted from the cache and treated as a cache miss. -- On ``Store.close()``, cache-enabled namespaces with ``max_age`` configured are - scanned and expired cache objects are removed before closing the cache - backend. +- On ``Store.close()``, cache-enabled namespaces are scanned before closing the + cache backend. Cleanup order per namespace is: + + 1. remove expired cache objects when ``max_age`` is configured, + 2. if ``size`` is configured, evict the least-recently-used remaining items + until the namespace total size is ``<= size``. + + Expired entries are always removed first, even if total size is already below + the ``size`` limit. - Cache failures are non-fatal and logged as warnings. Limitations ----------- -- No cache eviction. +- Eviction is close-time only (on ``Store.close()``), not continuous during + ``store()``/``load()`` operations. - No proactive cache validation/revalidation. 
- If an object is deleted in the primary backend by another client, the local cache will still have a stale object. -- ``max_age`` depends on backend ``ItemInfo.atime`` support. If ``atime`` is 0 - (not implemented), age-based caching behaves as immediate expiry. +- ``max_age`` and LRU-by-``size`` depend on backend ``ItemInfo.atime`` support. + If ``atime`` is 0 (not implemented): + + - using ``max_age`` would empty the cache on ``Store.close()`` + - using ``size`` would not work in LRU order, because order can't be determined Statistics ---------- diff --git a/src/borgstore/store.py b/src/borgstore/store.py index ecd4f7b..5eeb8e9 100644 --- a/src/borgstore/store.py +++ b/src/borgstore/store.py @@ -51,6 +51,7 @@ def from_str(cls, value): class CachePolicy(NamedTuple): mode: CacheMode max_age: Optional[float] + size: Optional[int] def get_backend(url, permissions=None, quota=None): @@ -150,25 +151,28 @@ def _normalize_cache_policy(policy: CachePolicy | dict) -> CachePolicy: if isinstance(policy, CachePolicy): return policy if isinstance(policy, dict): - unknown_keys = set(policy) - {"mode", "max_age"} + unknown_keys = set(policy) - {"mode", "max_age", "size"} if unknown_keys: raise ValueError(f"Invalid cache policy keys: {sorted(unknown_keys)!r}") if "mode" not in policy: raise ValueError("Invalid cache policy: 'mode' is required.") mode = CacheMode.from_str(policy["mode"]) max_age = policy.get("max_age") - if max_age is None: - return CachePolicy(mode=mode, max_age=None) - if not isinstance(max_age, (int, float)) or max_age < 0: - raise ValueError(f"Invalid cache max_age value: {max_age!r}") - return CachePolicy(mode=mode, max_age=float(max_age)) + if max_age is not None: + if not isinstance(max_age, (int, float)) or max_age < 0: + raise ValueError(f"Invalid cache max_age value: {max_age!r}") + max_age = float(max_age) + size = policy.get("size") + if size is not None and (not isinstance(size, int) or size < 0): + raise ValueError(f"Invalid cache size value: 
{size!r}") + return CachePolicy(mode=mode, max_age=max_age, size=size) raise ValueError("Invalid cache policy: expected dict or CachePolicy.") def _cache_policy_for(self, name: str) -> CachePolicy: for namespace, policy in self.cache_namespaces: if name.startswith(namespace): return policy - return CachePolicy(mode=CacheMode.C_OFF, max_age=None) + return CachePolicy(mode=CacheMode.C_OFF, max_age=None, size=None) def _cache_is_expired(self, nested_name: str, max_age: Optional[float]) -> bool: if max_age is None: @@ -270,14 +274,25 @@ def _cache_list(self, name: str) -> Iterator[ItemInfo]: def _cache_cleanup_expired(self) -> None: now = time.time() for namespace, policy in self.cache_namespaces: - if policy.max_age is None: + if policy.max_age is None and policy.size is None: continue try: - for info in self._cache_list(namespace.rstrip("/")): - if info.directory: - continue - if not info.atime or (now - info.atime) > policy.max_age: + items = [info for info in self._cache_list(namespace.rstrip("/")) if not info.directory] + if policy.max_age is not None: + remaining_items = [] + for info in items: + if not info.atime or (now - info.atime) > policy.max_age: + self._cache_invalidate(info.name) + else: + remaining_items.append(info) + items = remaining_items + if policy.size is not None: + total_size = sum(info.size for info in items) + for info in sorted(items, key=lambda entry: (entry.atime, entry.name)): + if total_size <= policy.size: + break self._cache_invalidate(info.name) + total_size -= info.size except Exception as err: logger.warning(f"borgstore: cache cleanup failed for namespace {namespace!r}: {err!r}") self._stats["cache_errors"] += 1 diff --git a/tests/test_cache.py b/tests/test_cache.py index 708df76..8011fec 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -35,8 +35,7 @@ def test_cache_disabled_by_default(tmp_path): def test_cache_aliases_and_invalid_value(tmp_path): store, _ = make_store( - tmp_path, - cache={"data/": {"mode": 
"writethrough"}, "meta/": {"mode": "MIRROR"}, "config/": {"mode": "off"}}, + tmp_path, cache={"data/": {"mode": "writethrough"}, "meta/": {"mode": "MIRROR"}, "config/": {"mode": "off"}} ) store.create() try: @@ -60,9 +59,9 @@ def test_cache_policy_dict_and_max_age_validation(tmp_path): store, _ = make_store( tmp_path, cache={ - "data/": {"mode": "writethrough", "max_age": 60}, + "data/": {"mode": "writethrough", "max_age": 60, "size": 1024}, "meta/": {"mode": "mirror"}, - "config/": {"mode": "off", "max_age": 0}, + "config/": {"mode": "off", "max_age": 0, "size": 0}, }, ) store.create() @@ -85,6 +84,12 @@ def test_cache_policy_dict_and_max_age_validation(tmp_path): make_store(tmp_path, cache={"data/": {"mode": "writethrough", "max_age": -1}}) with pytest.raises(ValueError): make_store(tmp_path, cache={"data/": {"mode": "writethrough", "max_age": "1"}}) + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": {"mode": "writethrough", "size": -1}}) + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": {"mode": "writethrough", "size": 1.5}}) + with pytest.raises(ValueError): + make_store(tmp_path, cache={"data/": {"mode": "writethrough", "size": "1"}}) with pytest.raises(ValueError): make_store(tmp_path, cache={"data/": {"mode": "writethrough", "unexpected": 1}}) with pytest.raises(ValueError): @@ -94,7 +99,7 @@ def test_cache_policy_dict_and_max_age_validation(tmp_path): def test_cache_policy_namedtuple_input(tmp_path): - store, _ = make_store(tmp_path, cache={"data/": CachePolicy(mode=CacheMode.C_WRITETHROUGH, max_age=60.0)}) + store, _ = make_store(tmp_path, cache={"data/": CachePolicy(mode=CacheMode.C_WRITETHROUGH, max_age=60.0, size=512)}) store.create() try: with store: @@ -430,6 +435,89 @@ def wrapped_delete(backend_name): store.destroy() +def test_close_cleans_up_lru_cache_items_by_size(tmp_path, monkeypatch): + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH, "size": 7}}) + store.create() + 
names_values = [("data/00000000", b"aaaa"), ("data/00000001", b"bbbb"), ("data/00000002", b"cccc")] + store.open() + for name, value in names_values: + store.store(name, value) + nested_names = [store.find(name) for name, _value in names_values] + + atimes = {nested_names[0]: 100.0, nested_names[1]: 200.0, nested_names[2]: 300.0} + sizes = {nested_name: 4 for nested_name in nested_names} + + monkeypatch.setattr("borgstore.store.time.time", lambda: 500.0) + original_list = store.cache_backend.list + deleted_names = [] + original_delete = store.cache_backend.delete + + def wrapped_list(backend_name): + for info in original_list(backend_name): + full_name = (backend_name + "/" + info.name) if backend_name else info.name + if full_name in atimes and info.exists: + yield info._replace(atime=atimes[full_name], size=sizes[full_name]) + else: + yield info + + def wrapped_delete(backend_name): + if backend_name in atimes: + deleted_names.append(backend_name) + return original_delete(backend_name) + + store.cache_backend.list = wrapped_list + store.cache_backend.delete = wrapped_delete + try: + store.close() + assert deleted_names == [nested_names[0], nested_names[1]] + finally: + store.cache_backend.list = original_list + store.cache_backend.delete = original_delete + store.destroy() + + +def test_close_cleans_up_expired_before_lru_size_eviction(tmp_path, monkeypatch): + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH, "max_age": 50, "size": 7}}) + store.create() + names_values = [("data/00000000", b"aaaa"), ("data/00000001", b"bbbb"), ("data/00000002", b"cccc")] + store.open() + for name, value in names_values: + store.store(name, value) + nested_names = [store.find(name) for name, _value in names_values] + + now = 1000.0 + atimes = {nested_names[0]: 900.0, nested_names[1]: 990.0, nested_names[2]: 995.0} + sizes = {nested_name: 4 for nested_name in nested_names} + + monkeypatch.setattr("borgstore.store.time.time", lambda: now) + 
original_list = store.cache_backend.list + deleted_names = [] + original_delete = store.cache_backend.delete + + def wrapped_list(backend_name): + for info in original_list(backend_name): + full_name = (backend_name + "/" + info.name) if backend_name else info.name + if full_name in atimes and info.exists: + yield info._replace(atime=atimes[full_name], size=sizes[full_name]) + else: + yield info + + def wrapped_delete(backend_name): + if backend_name in atimes: + deleted_names.append(backend_name) + return original_delete(backend_name) + + store.cache_backend.list = wrapped_list + store.cache_backend.delete = wrapped_delete + try: + store.close() + assert deleted_names == [nested_names[0], nested_names[1]] + finally: + store.cache_backend.list = original_list + store.cache_backend.delete = original_delete + store.destroy() + + def test_close_cleanup_errors_are_best_effort(tmp_path): store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH, "max_age": 5}}) store.create() From 106eba697cc70d1e53c6de4e6271158643d0af70 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 25 May 2026 23:38:19 +0200 Subject: [PATCH 6/6] Apply latency/bandwidth emulation only to primary backend calls ... not to cache backend calls. --- docs/store.rst | 31 +++++++++++++-- src/borgstore/store.py | 63 ++++++++++++++++++------------ tests/test_cache.py | 88 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 28 deletions(-) diff --git a/docs/store.rst b/docs/store.rst index cf5e37e..de6404e 100644 --- a/docs/store.rst +++ b/docs/store.rst @@ -22,14 +22,39 @@ API can be much simpler: - defrag: general purpose defragmentation helper (copies blocks to new items) - quota: return quota limit and usage (-1 if quotas not enabled or not supported) - stats: API call counters, time spent in API methods, data volume/throughput. 
-- latency/bandwidth emulator: can emulate higher latency (via BORGSTORE_LATENCY - [us]) and lower bandwidth (via BORGSTORE_BANDWIDTH [bit/s]) than what is - actually provided by the backend. +- latency/bandwidth emulator: see :ref:`store-latency-bandwidth-emulator`. Store operations (and per-op timing and volume) are logged at DEBUG log level. See also :doc:`store_caching` for optional Store-level caching with a secondary backend. + +.. _store-latency-bandwidth-emulator: + +Latency and bandwidth emulator +------------------------------ + +The Store can emulate slower backend behavior using environment variables: + +- ``BORGSTORE_LATENCY``: per-primary-call latency in microseconds (``[us]``). +- ``BORGSTORE_BANDWIDTH``: primary-call bandwidth limit in bits per second + (``[bit/s]``). + +Current behavior with Store caching enabled: + +- Emulation applies to **primary backend** operations. +- Emulation does **not** apply to **cache backend** operations. + +This means: + +- On cache miss paths (for example writethrough/mirror reads that load from the + primary backend), emulation affects the primary backend calls. +- On cache hit paths, cached reads avoid primary backend load operations and + therefore do not incur emulated bandwidth delay for the cache backend read. +- Name resolution for Store operations still uses primary backend lookups, + therefore configured latency can still be visible even when data comes from + cache. + Keys ---- diff --git a/src/borgstore/store.py b/src/borgstore/store.py index 5eeb8e9..b110bd3 100644 --- a/src/borgstore/store.py +++ b/src/borgstore/store.py @@ -311,23 +311,28 @@ def quota(self) -> dict: @contextmanager def _stats_updater(self, key, msg): - """update call counters and overall times, also emulate latency and bandwidth""" + """update call counters and overall times""" # do not use this in generators! 
- volume_before = self._stats_get_volume(key) start = time.perf_counter_ns() yield + end = time.perf_counter_ns() + overall_time = end - start + self._stats[f"{key}_calls"] += 1 + self._stats[f"{key}_time"] += overall_time + logger.debug(f"borgstore: {msg} in {overall_time / 1e6:0.1f}ms") + + def _backend_call(self, operation, *, volume=0): + # latency and bandwidth emulation is only applied to (primary) + # backend calls, not to (secondary) cache backend calls. + start = time.perf_counter_ns() + result = operation() be_needed_ns = time.perf_counter_ns() - start - volume_after = self._stats_get_volume(key) - volume = volume_after - volume_before + volume = volume(result) if callable(volume) else volume emulated_time = self.latency + (0 if not self.bandwidth else float(volume) / self.bandwidth) remaining_time = emulated_time - be_needed_ns / 1e9 if remaining_time > 0.0: time.sleep(remaining_time) - end = time.perf_counter_ns() - overall_time = end - start - self._stats[f"{key}_calls"] += 1 - self._stats[f"{key}_time"] += overall_time - logger.debug(f"borgstore: {msg} -> {volume}B in {overall_time / 1e6:0.1f}ms") + return result def _stats_update_volume(self, key, amount): self._stats[f"{key}_volume"] += amount @@ -451,22 +456,28 @@ def find(self, name: str, *, deleted=False) -> str: def info(self, name: str, *, deleted=False) -> ItemInfo: with self._stats_updater("info", f"info({name!r}, deleted={deleted})"): - return self.backend.info(self.find(name, deleted=deleted)) + return self._backend_call(lambda: self.backend.info(self.find(name, deleted=deleted)), volume=0) def load(self, name: str, *, size=None, offset=0, deleted=False) -> bytes: with self._stats_updater("load", f"load({name!r}, offset={offset}, size={size}, deleted={deleted})"): cache_policy = self._cache_policy_for(name) - nested_name = self.find(name, deleted=deleted) + nested_name = self._backend_call(lambda: self.find(name, deleted=deleted), volume=0) if cache_policy.mode == CacheMode.C_WRITETHROUGH: 
full_value = self._cache_get(nested_name, max_age=cache_policy.max_age) if full_value is None: - full_value = self.backend.load(nested_name, size=None, offset=0) + full_value = self._backend_call( + lambda: self.backend.load(nested_name, size=None, offset=0), volume=lambda value: len(value) + ) self._cache_put(nested_name, full_value) elif cache_policy.mode == CacheMode.C_MIRROR: - full_value = self.backend.load(nested_name, size=None, offset=0) + full_value = self._backend_call( + lambda: self.backend.load(nested_name, size=None, offset=0), volume=lambda value: len(value) + ) self._cache_put(nested_name, full_value) else: - result = self.backend.load(nested_name, size=size, offset=offset) + result = self._backend_call( + lambda: self.backend.load(nested_name, size=size, offset=offset), volume=lambda value: len(value) + ) self._stats_update_volume("load", len(result)) return result result = full_value[offset : (None if size is None else offset + size)] @@ -478,15 +489,17 @@ def store(self, name: str, value: bytes) -> None: # - overwrite an existing item (level stays same) # - write to the last level if no existing item is found. 
with self._stats_updater("store", f"store({name!r})"): - nested_name = self.find(name) - self.backend.store(nested_name, value) + nested_name = self._backend_call(lambda: self.find(name), volume=0) + self._backend_call(lambda: self.backend.store(nested_name, value), volume=len(value)) if self._cache_policy_for(name).mode in {CacheMode.C_WRITETHROUGH, CacheMode.C_MIRROR}: self._cache_put(nested_name, value) self._stats_update_volume("store", len(value)) def hash(self, name: str, algorithm: str = "sha256", *, deleted: bool = False) -> str: with self._stats_updater("hash", f"hash({name!r}, algorithm={algorithm!r}, deleted={deleted})"): - return self.backend.hash(self.find(name, deleted=deleted), algorithm=algorithm) + return self._backend_call( + lambda: self.backend.hash(self.find(name, deleted=deleted), algorithm=algorithm), volume=0 + ) def delete(self, name: str, *, deleted=False) -> None: """ @@ -495,8 +508,8 @@ def delete(self, name: str, *, deleted=False) -> None: See also .move(name, delete=True) for "soft" deletion. 
""" with self._stats_updater("delete", f"delete({name!r}, deleted={deleted})"): - nested_name = self.find(name, deleted=deleted) - self.backend.delete(nested_name) + nested_name = self._backend_call(lambda: self.find(name, deleted=deleted), volume=0) + self._backend_call(lambda: self.backend.delete(nested_name), volume=0) if self._cache_policy_for(name).mode in {CacheMode.C_WRITETHROUGH, CacheMode.C_MIRROR}: self._cache_invalidate(nested_name) @@ -512,29 +525,29 @@ def move( ) -> None: if delete: # use case: keep name, but soft "delete" the item - nested_name = self.find(name, deleted=False) + nested_name = self._backend_call(lambda: self.find(name, deleted=False), volume=0) nested_new_name = nested_name + DEL_SUFFIX msg = f"soft_delete({name!r}, deleted={deleted})" elif undelete: # use case: keep name, undelete a previously soft "deleted" item - nested_name = self.find(name, deleted=True) + nested_name = self._backend_call(lambda: self.find(name, deleted=True), volume=0) nested_new_name = nested_name.removesuffix(DEL_SUFFIX) msg = f"soft_undelete({name!r}, deleted={deleted})" elif change_level: # use case: keep name, changing to another nesting level suffix = DEL_SUFFIX if deleted else None - nested_name = self.find(name, deleted=deleted) + nested_name = self._backend_call(lambda: self.find(name, deleted=deleted), volume=0) nested_new_name = nest(name, self._get_levels(name)[-1], add_suffix=suffix) msg = f"change_level({name!r}, deleted={deleted})" else: # generic use (be careful!) 
if not new_name: raise ValueError("Generic move requires new_name to be given.") - nested_name = self.find(name, deleted=deleted) - nested_new_name = self.find(new_name, deleted=deleted) + nested_name = self._backend_call(lambda: self.find(name, deleted=deleted), volume=0) + nested_new_name = self._backend_call(lambda: self.find(new_name, deleted=deleted), volume=0) msg = f"rename({name!r}, {new_name!r}, deleted={deleted})" with self._stats_updater("move", msg + f" [{nested_name!r}, {nested_new_name!r}]"): - self.backend.move(nested_name, nested_new_name) + self._backend_call(lambda: self.backend.move(nested_name, nested_new_name), volume=0) if self._cache_policy_for(name).mode in {CacheMode.C_WRITETHROUGH, CacheMode.C_MIRROR}: self._cache_move(nested_name, nested_new_name) diff --git a/tests/test_cache.py b/tests/test_cache.py index 8011fec..83e2ead 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -1,6 +1,7 @@ """Tests for Store optional cache behavior.""" import pytest +import borgstore.store as store_module from borgstore.backends.errors import ObjectNotFound from borgstore.constants import DEL_SUFFIX @@ -548,3 +549,90 @@ def wrapped_close(): store.cache_backend.list = original_list store.cache_backend.close = original_close store.destroy() + + +def test_latency_emulation_not_applied_to_cache_backend_calls(tmp_path, monkeypatch): + monkeypatch.setenv("BORGSTORE_LATENCY", "200000") + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH}}) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + store.store(name, value) + + nested = store.find(name) + store._cache_invalidate(nested) + + original_primary_load = store.backend.load + primary_calls = {"count": 0} + + def wrapped_primary_load(backend_name, size=None, offset=0): + primary_calls["count"] += 1 + return original_primary_load(backend_name, size=size, offset=offset) + + sleep_calls = [] + original_sleep = store_module.time.sleep + + def 
wrapped_sleep(seconds): + sleep_calls.append(seconds) + + monkeypatch.setattr("borgstore.store.time.sleep", wrapped_sleep) + store.backend.load = wrapped_primary_load + try: + assert store.load(name) == value + assert primary_calls["count"] == 1 + sleeps_after_miss = len(sleep_calls) + assert sleeps_after_miss >= 1 + + assert store.load(name) == value + assert primary_calls["count"] == 1 + # second call still does primary find() checks, but must avoid a primary load() + assert len(sleep_calls) == sleeps_after_miss + 1 + finally: + store.backend.load = original_primary_load + monkeypatch.setattr("borgstore.store.time.sleep", original_sleep) + finally: + store.destroy() + + +def test_bandwidth_emulation_not_applied_to_cache_backend_calls(tmp_path, monkeypatch): + monkeypatch.setenv("BORGSTORE_LATENCY", "0") + monkeypatch.setenv("BORGSTORE_BANDWIDTH", "8") # 1 byte/s + store, _ = make_store(tmp_path, cache={"data/": {"mode": CacheMode.C_WRITETHROUGH}}) + store.create() + try: + with store: + name, value = "data/00000000", b"abc" + store.store(name, value) + nested = store.find(name) + store._cache_invalidate(nested) + + primary_calls = {"count": 0} + original_primary_load = store.backend.load + + def wrapped_primary_load(backend_name, size=None, offset=0): + primary_calls["count"] += 1 + return original_primary_load(backend_name, size=size, offset=offset) + + sleep_calls = [] + original_sleep = store_module.time.sleep + + def wrapped_sleep(seconds): + sleep_calls.append(seconds) + + monkeypatch.setattr("borgstore.store.time.sleep", wrapped_sleep) + store.backend.load = wrapped_primary_load + try: + assert store.load(name) == value + assert primary_calls["count"] == 1 + sleeps_after_miss = len(sleep_calls) + assert any(seconds >= 2.9 for seconds in sleep_calls) + + assert store.load(name) == value + assert primary_calls["count"] == 1 + assert len(sleep_calls) == sleeps_after_miss + finally: + store.backend.load = original_primary_load + 
monkeypatch.setattr("borgstore.store.time.sleep", original_sleep) + finally: + store.destroy()