diff --git a/src/borg/archive.py b/src/borg/archive.py index a6d1f325fc..317ffb04e9 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -23,7 +23,7 @@ from . import xattr from .chunkers import get_chunker, Chunk -from .cache import ChunkListEntry, build_chunkindex_from_repo, delete_chunkindex_cache +from .cache import ChunkListEntry, build_chunkindex_from_repo, delete_chunkindex_from_repo from .crypto.key import key_factory, UnsupportedPayloadError from .constants import * # NOQA from .crypto.low_level import IntegrityError as IntegrityErrorBase @@ -2164,10 +2164,10 @@ def valid_item(obj): def finish(self): if self.repair: - # we may have deleted chunks. delete_chunkindex_cache() removes the on-disk cache and + # we may have deleted chunks. delete_chunkindex_from_repo() removes the on-disk index and # drops the stale in-memory index, so the next repository access rebuilds it from the repo. - logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.") - delete_chunkindex_cache(self.repository) + logger.info("Deleting chunk indexes in repository - next repository access will cause a rebuild.") + delete_chunkindex_from_repo(self.repository) logger.info("Writing Manifest.") self.manifest.write() diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py index 335826d26b..0820d803a9 100644 --- a/src/borg/archiver/compact_cmd.py +++ b/src/borg/archiver/compact_cmd.py @@ -2,7 +2,7 @@ from ._common import with_repository from ..archive import Archive -from ..cache import write_chunkindex_to_repo_cache, build_chunkindex_from_repo, delete_chunkindex_cache +from ..cache import write_chunkindex_to_repo, build_chunkindex_from_repo, delete_chunkindex_from_repo from ..cache import files_cache_name, discover_files_cache_names from ..helpers import get_cache_dir from ..helpers.argparsing import ArgumentParser @@ -70,7 +70,7 @@ def save_chunk_index(self): # and also remove all older cached chunk indexes. 
# write_chunkindex_to_repo now removes all flags and size infos. # we need this, as we put the wrong size in there to support --stats computations. - write_chunkindex_to_repo_cache( + write_chunkindex_to_repo( self.repository, self.chunks, incremental=False, clear=True, force_write=True, delete_other=True ) self.chunks = None # nothing there (cleared!) @@ -179,13 +179,13 @@ def report_and_delete(self): logger.info(f"Deleting {len(unused)} unused objects...") if unused: # Before deleting any repository object, invalidate all centrally cached chunk indexes. - # Otherwise, if we get interrupted within the deletion loop, the still-existing cache/chunks.* + # Otherwise, if we get interrupted within the deletion loop, the still-existing index/* # would claim that already-deleted objects are still present. A later "borg create" would then # trust that stale index, not re-upload the affected chunks and silently create an archive with # dangling object references (see issue #9748). By removing the cached indexes first, an # interruption is conservative: clients must rebuild the index from actual repository contents # and will re-upload any deleted data. save_chunk_index() writes a fresh, valid index afterwards. 
- delete_chunkindex_cache(self.repository) + delete_chunkindex_from_repo(self.repository) pi = ProgressIndicatorPercent( total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete" ) diff --git a/src/borg/cache.py b/src/borg/cache.py index 1353856bf9..b06ef4cdff 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -516,34 +516,30 @@ def memorize_file(self, hashed_path, path_hash, st, chunks): def list_chunkindex_hashes(repository): hashes = [] - for info in repository.store_list("cache"): + for info in repository.store_list("index"): info = ItemInfo(*info) # RPC does not give namedtuple - if info.name.startswith("chunks."): - hash = info.name.removeprefix("chunks.") - hashes.append(hash) + # in the index/ namespace, each object's name is the sha256 hash of its content. + hashes.append(info.name) hashes = sorted(hashes) - logger.debug(f"cached chunk indexes: {hashes}") + logger.debug(f"chunk indexes: {hashes}") return hashes -def delete_chunkindex_cache(repository): +def delete_chunkindex_from_repo(repository): hashes = list_chunkindex_hashes(repository) for hash in hashes: - cache_name = f"cache/chunks.{hash}" + index_name = f"index/{hash}" try: - repository.store_delete(cache_name) + repository.store_delete(index_name) except StoreObjectNotFound: pass - logger.debug(f"cached chunk indexes deleted: {hashes}") + logger.debug(f"chunk indexes deleted: {hashes}") # the in-memory index is now stale; drop it so close() does not write it back into the - # cache we just deleted. the next .chunks access rebuilds it from actual repo contents. + # index we just deleted. the next .chunks access rebuilds it from actual repo contents. 
repository.invalidate_chunk_index() -CHUNKINDEX_HASH_SEED = b"0001" # increment seed to invalidate old chunk indexes - - -def write_chunkindex_to_repo_cache( +def write_chunkindex_to_repo( repository, chunks, *, incremental=True, clear=False, force_write=False, delete_other=False, delete_these=None ): # for now, we don't want to serialize the flags or the size: @@ -565,60 +561,62 @@ def write_chunkindex_to_repo_cache( if clear: # if we don't need the in-memory chunks index anymore: chunks.clear() # free memory, immediately - new_hash = hashlib.sha256(data + CHUNKINDEX_HASH_SEED).hexdigest() + # the index object's name in the repo is the pure sha256 of its content, so borgstore can verify + # it the same way as any other object. an incompatible index format from a different borg version + # is rejected by borghash's own versioned header (MAGIC + VERSION) when it is read back. + new_hash = hashlib.sha256(data).hexdigest() if num_to_write == 0 and not force_write: - # don't persist an empty incremental index: if it became the only cache/chunks.* (e.g. right - # after delete_chunkindex_cache()), build_chunkindex_from_repo() would return it as-is instead - # of rebuilding from the repo. with nothing new, the existing cache is already up to date. - logger.debug("no new chunks to cache; not writing an empty incremental chunk index.") + # don't persist an empty incremental index: if it became the only index/* (e.g. right + # after delete_chunkindex_from_repo()), build_chunkindex_from_repo() would return it as-is + # instead of rebuilding from the repo. with nothing new, the existing index is already up to date. + logger.debug("no new chunks to persist; not writing an empty incremental chunk index.") return new_hash - cached_hashes = list_chunkindex_hashes(repository) - if force_write or new_hash not in cached_hashes: - # when an updated chunks index is stored into the cache, we also store its hash as part of the name. 
- # when a client is loading the chunks index from a cache, it has to compare its content - # hash against the hash in its name. if it is the same, the cache is valid. - # if it is different, the cache is either corrupted or out of date and has to be discarded. - # when some functionality is DELETING chunks from the repository, it has to delete - # all existing cache/chunks.* and maybe write a new, valid cache/chunks.<hash>, + stored_hashes = list_chunkindex_hashes(repository) + if force_write or new_hash not in stored_hashes: + # an index object is stored as index/<hash>, where <hash> is the sha256 of its content. + # when a client loads an index object, it compares the content hash against the hash in its + # name. if it is the same, the object is valid. if it is different, it is either corrupted or + # out of date and has to be discarded. when some functionality is DELETING chunks from the + # repository, it has to delete all existing index/* and maybe write a new, valid index/<hash>, # so that all clients will discard any client-local chunks index caches. 
- cache_name = f"cache/chunks.{new_hash}" - logger.debug(f"caching chunks index as {cache_name} in repository...") - repository.store_store(cache_name, data) + index_name = f"index/{new_hash}" + logger.debug(f"storing chunks index as {index_name} in repository...") + repository.store_store(index_name, data) # we have successfully stored to the repository, so we can clear all F_NEW flags now: chunks.clear_new() - # delete some not needed cached chunk indexes, but never the one we just wrote: + # delete some no longer needed index objects, but never the one we just wrote: if delete_other: - delete_these = set(cached_hashes) - {new_hash} + delete_these = set(stored_hashes) - {new_hash} elif delete_these: delete_these = set(delete_these) - {new_hash} else: delete_these = set() for hash in delete_these: - cache_name = f"cache/chunks.{hash}" + index_name = f"index/{hash}" try: - repository.store_delete(cache_name) + repository.store_delete(index_name) except StoreObjectNotFound: pass if delete_these: - logger.debug(f"cached chunk indexes deleted: {delete_these}") + logger.debug(f"chunk indexes deleted: {delete_these}") return new_hash -def read_chunkindex_from_repo_cache(repository, hash): - cache_name = f"cache/chunks.{hash}" - logger.debug(f"trying to load {cache_name} from the repo...") +def read_chunkindex_from_repo(repository, hash): + index_name = f"index/{hash}" + logger.debug(f"trying to load {index_name} from the repo...") try: - chunks_data = repository.store_load(cache_name) + chunks_data = repository.store_load(index_name) except StoreObjectNotFound: - logger.debug(f"{cache_name} not found in the repository.") + logger.debug(f"{index_name} not found in the repository.") else: - if hashlib.sha256(chunks_data + CHUNKINDEX_HASH_SEED).digest() == hex_to_bin(hash): - logger.debug(f"{cache_name} is valid.") + if hashlib.sha256(chunks_data).digest() == hex_to_bin(hash): + logger.debug(f"{index_name} is valid.") with io.BytesIO(chunks_data) as f: chunks = 
ChunkIndex.read(f) return chunks else: - logger.debug(f"{cache_name} is invalid.") + logger.debug(f"{index_name} is invalid.") def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False): @@ -629,7 +627,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi merged = 0 chunks = ChunkIndex() # we'll merge all we find into this for hash in hashes: - chunks_to_merge = read_chunkindex_from_repo_cache(repository, hash) + chunks_to_merge = read_chunkindex_from_repo(repository, hash) if chunks_to_merge is not None: logger.debug(f"cached chunk index {hash} gets merged...") for k, v in chunks_to_merge.items(): @@ -638,10 +636,8 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi chunks_to_merge.clear() if merged > 0: if merged > 1 and cache_immediately: - # immediately update cache/chunks, so we don't have to merge these again: - write_chunkindex_to_repo_cache( - repository, chunks, clear=False, force_write=True, delete_these=hashes - ) + # immediately update the index, so we don't have to merge these again: + write_chunkindex_to_repo(repository, chunks, clear=False, force_write=True, delete_these=hashes) else: chunks.clear_new() return chunks @@ -675,8 +671,8 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi speed = format_file_size(num_chunks * 34 / duration) logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s") if cache_immediately: - # immediately update cache/chunks, so we only rarely have to do it the slow way: - write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True, delete_other=True) + # immediately update the index, so we only rarely have to do it the slow way: + write_chunkindex_to_repo(repository, chunks, clear=False, force_write=True, delete_other=True) return chunks @@ -699,7 +695,7 @@ def chunks(self): # building a second one and pushing it back into the repository. 
self._chunks = self.repository.chunks # note: we deliberately do NOT consolidate the cached chunk index fragments here. - # each backup writes a small incremental cache/chunks.* fragment (only its new chunks), + # each backup writes a small incremental index/* fragment (only its new chunks), # which is cheap. collapsing them all into one big fragment on every run would re-upload # the whole index and, with delete_other, invalidate every other client's fragments -- # a multi-GB churn per run on a shared repo. fragment count is reclaimed by `borg compact` @@ -766,7 +762,7 @@ def add_chunk( def _maybe_write_chunks_cache(self, now, force=False, clear=False): if force or now > self.chunks_cache_last_write + self.chunks_cache_write_td: if self._chunks is not None: - write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=clear) + write_chunkindex_to_repo(self.repository, self._chunks, clear=clear) self.chunks_cache_last_write = now def refresh_lock(self, now): @@ -867,7 +863,7 @@ def close(self): for key, value in sorted(self._chunks.stats.items()): logger.debug(f"Chunks index stats: {key}: {value}") pi.output("Saving chunks cache") - # note: cache/chunks.* in repo has a different integrity mechanism + # note: index/* in repo has a different integrity mechanism now = datetime.now(UTC) self._maybe_write_chunks_cache(now, force=True, clear=True) self._chunks = None # nothing there (cleared!) diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index cc280528e7..bfa4d164ad 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -53,7 +53,7 @@ class ChunkIndex(HTProxyMixin, MutableMapping): F_USED = 2 ** 0 # chunk is used/referenced F_COMPRESS = 2 ** 1 # chunk shall get (re-)compressed # system flags (internal use, always 0 to user, not changeable by user): - F_NEW = 2 ** 24 # a new chunk that is not present in repo/cache/chunks.* yet. + F_NEW = 2 ** 24 # a new chunk that is not present in repo index/* yet. 
def __init__(self, capacity=1000, path=None, usable=None): if path: diff --git a/src/borg/repository.py b/src/borg/repository.py index e30421436a..1592ff2314 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -48,8 +48,9 @@ def borg_permissions(permissions): return { "": "lr", "archives": "lrw", - "cache": "lrwWD", # WD for chunks.<HASH>, last-key-checked, ... + "cache": "lrwWD", # WD for last-key-checked, ... "config": "lrW", # W for manifest + "index": "lrwWD", # WD for index/ (merge/compaction of incremental indexes) "keys": "lr", "locks": "lrwD", # borg needs to create/delete a shared lock here "packs": "lrw", @@ -58,8 +59,9 @@ def borg_permissions(permissions): return { "": "l", "archives": "lw", - "cache": "lrwWD", # read allowed, e.g. for chunks.<HASH> cache + "cache": "lrwWD", # TODO: check more restrictive permissions "config": "lrW", # W for manifest + "index": "lrwWD", # read allowed so that borg create can check chunk presence for deduplication "keys": "lr", "locks": "lrwD", # borg needs to create/delete a shared lock here "packs": "lw", # no r! @@ -318,6 +320,7 @@ def __init__( "archives/": {"levels": [0]}, "cache/": {"levels": [0]}, "config/": {"levels": [0]}, + "index/": {"levels": [0]}, "keys/": {"levels": [0]}, "locks/": {"levels": [0]}, "packs/": {"levels": [1]}, @@ -395,9 +398,9 @@ def create(self): # listing them all might be rather slow, so we better cache an empty # ChunkIndex from here so that the first repo operation does not have # to build the ChunkIndex the slow way by listing all the directories. - from borg.cache import write_chunkindex_to_repo_cache + from borg.cache import write_chunkindex_to_repo - write_chunkindex_to_repo_cache(self, ChunkIndex(), clear=True, force_write=True) + write_chunkindex_to_repo(self, ChunkIndex(), clear=True, force_write=True) finally: self.store.close() @@ -543,9 +546,9 @@ def close(self): # this session (only F_NEW entries are serialized, and an empty incremental write is skipped). 
# guard on is_chunk_index_loaded so we never trigger a lazy rebuild just to persist on close. if self.store_opened and self.is_chunk_index_loaded: - from .cache import write_chunkindex_to_repo_cache + from .cache import write_chunkindex_to_repo - write_chunkindex_to_repo_cache(self, self.chunks, incremental=True) + write_chunkindex_to_repo(self, self.chunks, incremental=True) if self.lock: self.lock.release() self.lock = None @@ -679,9 +682,9 @@ def check_object(obj): pass if not partial: # if we did a full pass in one go, we built a complete, up-to-date ChunkIndex, cache it! - from .cache import write_chunkindex_to_repo_cache + from .cache import write_chunkindex_to_repo - write_chunkindex_to_repo_cache( + write_chunkindex_to_repo( self, chunks, incremental=False, clear=True, force_write=True, delete_other=True ) except StoreObjectNotFound: diff --git a/src/borg/testsuite/archiver/compact_cmd_test.py b/src/borg/testsuite/archiver/compact_cmd_test.py index ef87785a6d..ffcc802c05 100644 --- a/src/borg/testsuite/archiver/compact_cmd_test.py +++ b/src/borg/testsuite/archiver/compact_cmd_test.py @@ -93,7 +93,7 @@ def test_compact_interrupted_does_not_poison_chunk_index(archivers, request, mon """Regression test for issue #9748. If a compact is interrupted after it deleted repository objects but before it wrote the - updated chunk index, the still-existing cache/chunks.* must not claim that the deleted + updated chunk index, the still-existing index/* must not claim that the deleted objects are still present. Otherwise a later "borg create" trusts the stale index, does not re-upload the affected chunks and silently produces an archive with dangling object references (which extracts to zero bytes). 
diff --git a/src/borg/testsuite/cache_test.py b/src/borg/testsuite/cache_test.py index 06b2300424..dee76f8f03 100644 --- a/src/borg/testsuite/cache_test.py +++ b/src/borg/testsuite/cache_test.py @@ -6,7 +6,7 @@ from .hashindex_test import H from .crypto.key_test import TestKey from ..archive import Statistics -from ..cache import AdHocWithFilesCache, FileCacheEntry, delete_chunkindex_cache, read_chunkindex_from_repo_cache +from ..cache import AdHocWithFilesCache, FileCacheEntry, delete_chunkindex_from_repo, read_chunkindex_from_repo from ..crypto.key import AESOCBKey from ..helpers import safe_ns from ..helpers.msgpack import int_to_timestamp @@ -83,14 +83,14 @@ def test_no_change_backup_keeps_files_cache(self, repository, key, manifest): assert path_hash in loaded -def test_delete_chunkindex_cache_missing(tmp_path): - """delete_chunkindex_cache handles StoreObjectNotFound when cache entries do not exist.""" +def test_delete_chunkindex_from_repo_missing(tmp_path): + """delete_chunkindex_from_repo handles StoreObjectNotFound when index entries do not exist.""" from borgstore.store import ObjectNotFound as StoreObjectNotFound repository_location = os.fspath(tmp_path / "repository") with Repository(repository_location, exclusive=True, create=True) as repository: - # Create a cache entry so list_chunkindex_hashes finds it. - repository.store_store(f"cache/chunks.{'a' * 64}", b"data") + # Create an index entry so list_chunkindex_hashes finds it. + repository.store_store(f"index/{'a' * 64}", b"data") # Patch store_delete to raise StoreObjectNotFound (simulates a race or already-deleted entry). original_store_delete = repository.store_delete @@ -99,27 +99,27 @@ def failing_store_delete(name): repository.store_delete = failing_store_delete # Should not raise — the except StoreObjectNotFound catches it. 
- delete_chunkindex_cache(repository) + delete_chunkindex_from_repo(repository) repository.store_delete = original_store_delete -def test_read_chunkindex_from_repo_cache_missing(tmp_path): - """read_chunkindex_from_repo_cache handles StoreObjectNotFound when cache does not exist.""" +def test_read_chunkindex_from_repo_missing(tmp_path): + """read_chunkindex_from_repo handles StoreObjectNotFound when the index object does not exist.""" repository_location = os.fspath(tmp_path / "repository") with Repository(repository_location, exclusive=True, create=True) as repository: - # Try to load a non-existent cache entry — should return None, not raise. - result = read_chunkindex_from_repo_cache(repository, "f" * 64) + # Try to load a non-existent index entry — should return None, not raise. + result = read_chunkindex_from_repo(repository, "f" * 64) assert result is None def test_chunkindex_cache_not_consolidated_on_access(tmp_path): """ChunksMixin.chunks binds the repository index without collapsing the cached fragments. - Each backup leaves a small incremental cache/chunks.* fragment; collapsing them all into one + Each backup leaves a small incremental index/* fragment; collapsing them all into one on every access would re-upload the whole index and, with delete_other, invalidate every other client's fragments. Fragment count is reclaimed by `borg compact`, not on every read here. 
""" - from ..cache import ChunksMixin, write_chunkindex_to_repo_cache, list_chunkindex_hashes + from ..cache import ChunksMixin, write_chunkindex_to_repo, list_chunkindex_hashes from ..hashindex import ChunkIndex, ChunkIndexEntry repository_location = os.fspath(tmp_path / "repository") @@ -128,7 +128,7 @@ def test_chunkindex_cache_not_consolidated_on_access(tmp_path): for h in (H(1), H(2)): ci = ChunkIndex() ci[h] = ChunkIndexEntry(ChunkIndex.F_NEW, 0, h, 0, 4) - write_chunkindex_to_repo_cache(repository, ci, incremental=False, force_write=True) + write_chunkindex_to_repo(repository, ci, incremental=False, force_write=True) before = len(list_chunkindex_hashes(repository)) assert before > 1 diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 1192e9f12a..5f5c8bdad9 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -101,7 +101,7 @@ def test_chunk_index_persisted_on_close(tmp_path): # repo can resolve pack locations without any manual hand-off. This proves the round-trip # by reading the persisted index back directly (not via a repo rescan, which at N=1 would # reconstruct the same entries and so could mask a broken persist step). 
- from ..cache import list_chunkindex_hashes, read_chunkindex_from_repo_cache + from ..cache import list_chunkindex_hashes, read_chunkindex_from_repo location = os.fspath(tmp_path / "repo") with Repository(location, exclusive=True, create=True) as repository: @@ -111,7 +111,7 @@ def test_chunk_index_persisted_on_close(tmp_path): with Repository(location, exclusive=True) as repository: persisted = ChunkIndex() for hash in list_chunkindex_hashes(repository): - fragment = read_chunkindex_from_repo_cache(repository, hash) + fragment = read_chunkindex_from_repo(repository, hash) if fragment is not None: for k, v in fragment.items(): persisted[k] = v @@ -191,7 +191,7 @@ class FailingPackStore: """Wraps a store but fails packs/* writes; every other call passes through to the inner store. Models the realistic failure where only a pack write broke while the rest of the repo (e.g. the - cache/chunks.* index) stays writable. In production PackWriter and the chunk index cache share + index/* objects) stay writable. In production PackWriter and the chunk index share one store, so a single object has to fail the pack write yet still let the index persist. """ @@ -262,11 +262,11 @@ def test_pack_writer_rolls_back_index_on_failed_store(): def test_failed_store_phantom_not_persisted(tmp_path): # The phantom must not survive into the persisted repo cache either: close() can write the # in-memory index on context exit, so the rollback has to happen before anything is serialized. 
- from ..cache import write_chunkindex_to_repo_cache, build_chunkindex_from_repo + from ..cache import write_chunkindex_to_repo, build_chunkindex_from_repo chunk_id = H(60) with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: - # fail only the pack write on the repository's own store; cache/chunks.* writes still work, + # fail only the pack write on the repository's own store; index/* writes still work, # so one store models "just the pack write broke" (PackWriter and the index cache share a # store in production). the failing store is thus load-bearing for every assertion below. repository.store = FailingPackStore(repository.store) @@ -275,7 +275,7 @@ def test_failed_store_phantom_not_persisted(tmp_path): pw.add(chunk_id, fchunk(b"DATA")) assert repository.chunks.get(chunk_id) is None # rolled back from the in-memory index ... # ... and persisting + reloading the cache (through that same store) does not bring it back: - write_chunkindex_to_repo_cache(repository, repository.chunks, incremental=True) + write_chunkindex_to_repo(repository, repository.chunks, incremental=True) reloaded = build_chunkindex_from_repo(repository) assert reloaded.get(chunk_id) is None