Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from . import xattr
from .chunkers import get_chunker, Chunk
from .cache import ChunkListEntry, build_chunkindex_from_repo, delete_chunkindex_cache
from .cache import ChunkListEntry, build_chunkindex_from_repo, delete_chunkindex_from_repo
from .crypto.key import key_factory, UnsupportedPayloadError
from .constants import * # NOQA
from .crypto.low_level import IntegrityError as IntegrityErrorBase
Expand Down Expand Up @@ -2164,10 +2164,10 @@ def valid_item(obj):

def finish(self):
if self.repair:
# we may have deleted chunks. delete_chunkindex_cache() removes the on-disk cache and
# we may have deleted chunks. delete_chunkindex_from_repo() removes the on-disk index and
# drops the stale in-memory index, so the next repository access rebuilds it from the repo.
logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
delete_chunkindex_cache(self.repository)
logger.info("Deleting chunk indexes in repository - next repository access will cause a rebuild.")
delete_chunkindex_from_repo(self.repository)
logger.info("Writing Manifest.")
self.manifest.write()

Expand Down
8 changes: 4 additions & 4 deletions src/borg/archiver/compact_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from ._common import with_repository
from ..archive import Archive
from ..cache import write_chunkindex_to_repo_cache, build_chunkindex_from_repo, delete_chunkindex_cache
from ..cache import write_chunkindex_to_repo, build_chunkindex_from_repo, delete_chunkindex_from_repo
from ..cache import files_cache_name, discover_files_cache_names
from ..helpers import get_cache_dir
from ..helpers.argparsing import ArgumentParser
Expand Down Expand Up @@ -70,7 +70,7 @@ def save_chunk_index(self):
# and also remove all older cached chunk indexes.
# write_chunkindex_to_repo now removes all flags and size infos.
# we need this, as we put the wrong size in there to support --stats computations.
write_chunkindex_to_repo_cache(
write_chunkindex_to_repo(
self.repository, self.chunks, incremental=False, clear=True, force_write=True, delete_other=True
)
self.chunks = None # nothing there (cleared!)
Expand Down Expand Up @@ -179,13 +179,13 @@ def report_and_delete(self):
logger.info(f"Deleting {len(unused)} unused objects...")
if unused:
# Before deleting any repository object, invalidate all centrally cached chunk indexes.
# Otherwise, if we get interrupted within the deletion loop, the still-existing cache/chunks.*
# Otherwise, if we get interrupted within the deletion loop, the still-existing index/*
# would claim that already-deleted objects are still present. A later "borg create" would then
# trust that stale index, not re-upload the affected chunks and silently create an archive with
# dangling object references (see issue #9748). By removing the cached indexes first, an
# interruption is conservative: clients must rebuild the index from actual repository contents
# and will re-upload any deleted data. save_chunk_index() writes a fresh, valid index afterwards.
delete_chunkindex_cache(self.repository)
delete_chunkindex_from_repo(self.repository)
pi = ProgressIndicatorPercent(
total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
)
Expand Down
102 changes: 49 additions & 53 deletions src/borg/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,34 +516,30 @@ def memorize_file(self, hashed_path, path_hash, st, chunks):

def list_chunkindex_hashes(repository):
hashes = []
for info in repository.store_list("cache"):
for info in repository.store_list("index"):
info = ItemInfo(*info) # RPC does not give namedtuple
if info.name.startswith("chunks."):
hash = info.name.removeprefix("chunks.")
hashes.append(hash)
# in the index/ namespace, each object's name is the sha256 hash of its content.
hashes.append(info.name)
hashes = sorted(hashes)
logger.debug(f"cached chunk indexes: {hashes}")
logger.debug(f"chunk indexes: {hashes}")
return hashes


def delete_chunkindex_cache(repository):
def delete_chunkindex_from_repo(repository):
hashes = list_chunkindex_hashes(repository)
for hash in hashes:
cache_name = f"cache/chunks.{hash}"
index_name = f"index/{hash}"
try:
repository.store_delete(cache_name)
repository.store_delete(index_name)
except StoreObjectNotFound:
pass
logger.debug(f"cached chunk indexes deleted: {hashes}")
logger.debug(f"chunk indexes deleted: {hashes}")
# the in-memory index is now stale; drop it so close() does not write it back into the
# cache we just deleted. the next .chunks access rebuilds it from actual repo contents.
# index we just deleted. the next .chunks access rebuilds it from actual repo contents.
repository.invalidate_chunk_index()


CHUNKINDEX_HASH_SEED = b"0001" # increment seed to invalidate old chunk indexes


def write_chunkindex_to_repo_cache(
def write_chunkindex_to_repo(
repository, chunks, *, incremental=True, clear=False, force_write=False, delete_other=False, delete_these=None
):
# for now, we don't want to serialize the flags or the size:
Expand All @@ -565,60 +561,62 @@ def write_chunkindex_to_repo_cache(
if clear:
# if we don't need the in-memory chunks index anymore:
chunks.clear() # free memory, immediately
new_hash = hashlib.sha256(data + CHUNKINDEX_HASH_SEED).hexdigest()
# the index object's name in the repo is the pure sha256 of its content, so borgstore can verify
# it the same way as any other object. an incompatible index format from a different borg version
# is rejected by borghash's own versioned header (MAGIC + VERSION) when it is read back.
new_hash = hashlib.sha256(data).hexdigest()
if num_to_write == 0 and not force_write:
# don't persist an empty incremental index: if it became the only cache/chunks.* (e.g. right
# after delete_chunkindex_cache()), build_chunkindex_from_repo() would return it as-is instead
# of rebuilding from the repo. with nothing new, the existing cache is already up to date.
logger.debug("no new chunks to cache; not writing an empty incremental chunk index.")
# don't persist an empty incremental index: if it became the only index/* (e.g. right
# after delete_chunkindex_from_repo()), build_chunkindex_from_repo() would return it as-is
# instead of rebuilding from the repo. with nothing new, the existing index is already up to date.
logger.debug("no new chunks to persist; not writing an empty incremental chunk index.")
return new_hash
cached_hashes = list_chunkindex_hashes(repository)
if force_write or new_hash not in cached_hashes:
# when an updated chunks index is stored into the cache, we also store its hash as part of the name.
# when a client is loading the chunks index from a cache, it has to compare its content
# hash against the hash in its name. if it is the same, the cache is valid.
# if it is different, the cache is either corrupted or out of date and has to be discarded.
# when some functionality is DELETING chunks from the repository, it has to delete
# all existing cache/chunks.* and maybe write a new, valid cache/chunks.<hash>,
stored_hashes = list_chunkindex_hashes(repository)
if force_write or new_hash not in stored_hashes:
# an index object is stored as index/<hash>, where <hash> is the sha256 of its content.
# when a client loads an index object, it compares the content hash against the hash in its
# name. if it is the same, the object is valid. if it is different, it is either corrupted or
# out of date and has to be discarded. when some functionality is DELETING chunks from the
# repository, it has to delete all existing index/* and maybe write a new, valid index/<hash>,
# so that all clients will discard any client-local chunks index caches.
cache_name = f"cache/chunks.{new_hash}"
logger.debug(f"caching chunks index as {cache_name} in repository...")
repository.store_store(cache_name, data)
index_name = f"index/{new_hash}"
logger.debug(f"storing chunks index as {index_name} in repository...")
repository.store_store(index_name, data)
# we have successfully stored to the repository, so we can clear all F_NEW flags now:
chunks.clear_new()
# delete some not needed cached chunk indexes, but never the one we just wrote:
# delete some no longer needed index objects, but never the one we just wrote:
if delete_other:
delete_these = set(cached_hashes) - {new_hash}
delete_these = set(stored_hashes) - {new_hash}
elif delete_these:
delete_these = set(delete_these) - {new_hash}
else:
delete_these = set()
for hash in delete_these:
cache_name = f"cache/chunks.{hash}"
index_name = f"index/{hash}"
try:
repository.store_delete(cache_name)
repository.store_delete(index_name)
except StoreObjectNotFound:
pass
if delete_these:
logger.debug(f"cached chunk indexes deleted: {delete_these}")
logger.debug(f"chunk indexes deleted: {delete_these}")
return new_hash


def read_chunkindex_from_repo_cache(repository, hash):
cache_name = f"cache/chunks.{hash}"
logger.debug(f"trying to load {cache_name} from the repo...")
def read_chunkindex_from_repo(repository, hash):
index_name = f"index/{hash}"
logger.debug(f"trying to load {index_name} from the repo...")
try:
chunks_data = repository.store_load(cache_name)
chunks_data = repository.store_load(index_name)
except StoreObjectNotFound:
logger.debug(f"{cache_name} not found in the repository.")
logger.debug(f"{index_name} not found in the repository.")
else:
if hashlib.sha256(chunks_data + CHUNKINDEX_HASH_SEED).digest() == hex_to_bin(hash):
logger.debug(f"{cache_name} is valid.")
if hashlib.sha256(chunks_data).digest() == hex_to_bin(hash):
logger.debug(f"{index_name} is valid.")
with io.BytesIO(chunks_data) as f:
chunks = ChunkIndex.read(f)
return chunks
else:
logger.debug(f"{cache_name} is invalid.")
logger.debug(f"{index_name} is invalid.")


def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
Expand All @@ -629,7 +627,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
merged = 0
chunks = ChunkIndex() # we'll merge all we find into this
for hash in hashes:
chunks_to_merge = read_chunkindex_from_repo_cache(repository, hash)
chunks_to_merge = read_chunkindex_from_repo(repository, hash)
if chunks_to_merge is not None:
logger.debug(f"cached chunk index {hash} gets merged...")
for k, v in chunks_to_merge.items():
Expand All @@ -638,10 +636,8 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
chunks_to_merge.clear()
if merged > 0:
if merged > 1 and cache_immediately:
# immediately update cache/chunks, so we don't have to merge these again:
write_chunkindex_to_repo_cache(
repository, chunks, clear=False, force_write=True, delete_these=hashes
)
# immediately update the index, so we don't have to merge these again:
write_chunkindex_to_repo(repository, chunks, clear=False, force_write=True, delete_these=hashes)
else:
chunks.clear_new()
return chunks
Expand Down Expand Up @@ -675,8 +671,8 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
speed = format_file_size(num_chunks * 34 / duration)
logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
if cache_immediately:
# immediately update cache/chunks, so we only rarely have to do it the slow way:
write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True, delete_other=True)
# immediately update the index, so we only rarely have to do it the slow way:
write_chunkindex_to_repo(repository, chunks, clear=False, force_write=True, delete_other=True)
return chunks


Expand All @@ -699,7 +695,7 @@ def chunks(self):
# building a second one and pushing it back into the repository.
self._chunks = self.repository.chunks
# note: we deliberately do NOT consolidate the cached chunk index fragments here.
# each backup writes a small incremental cache/chunks.* fragment (only its new chunks),
# each backup writes a small incremental index/* fragment (only its new chunks),
# which is cheap. collapsing them all into one big fragment on every run would re-upload
# the whole index and, with delete_other, invalidate every other client's fragments --
# a multi-GB churn per run on a shared repo. fragment count is reclaimed by `borg compact`
Expand Down Expand Up @@ -766,7 +762,7 @@ def add_chunk(
def _maybe_write_chunks_cache(self, now, force=False, clear=False):
if force or now > self.chunks_cache_last_write + self.chunks_cache_write_td:
if self._chunks is not None:
write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=clear)
write_chunkindex_to_repo(self.repository, self._chunks, clear=clear)
self.chunks_cache_last_write = now

def refresh_lock(self, now):
Expand Down Expand Up @@ -867,7 +863,7 @@ def close(self):
for key, value in sorted(self._chunks.stats.items()):
logger.debug(f"Chunks index stats: {key}: {value}")
pi.output("Saving chunks cache")
# note: cache/chunks.* in repo has a different integrity mechanism
# note: index/* in repo has a different integrity mechanism
now = datetime.now(UTC)
self._maybe_write_chunks_cache(now, force=True, clear=True)
self._chunks = None # nothing there (cleared!)
Expand Down
2 changes: 1 addition & 1 deletion src/borg/hashindex.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
F_USED = 2 ** 0 # chunk is used/referenced
F_COMPRESS = 2 ** 1 # chunk shall get (re-)compressed
# system flags (internal use, always 0 to user, not changeable by user):
F_NEW = 2 ** 24 # a new chunk that is not present in repo/cache/chunks.* yet.
F_NEW = 2 ** 24 # a new chunk that is not present in repo index/* yet.

def __init__(self, capacity=1000, path=None, usable=None):
if path:
Expand Down
19 changes: 11 additions & 8 deletions src/borg/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ def borg_permissions(permissions):
return {
"": "lr",
"archives": "lrw",
"cache": "lrwWD", # WD for chunks.<HASH>, last-key-checked, ...
"cache": "lrwWD", # WD for last-key-checked, ...
"config": "lrW", # W for manifest
"index": "lrwWD", # WD for index/<HASH> (merge/compaction of incremental indexes)
"keys": "lr",
"locks": "lrwD", # borg needs to create/delete a shared lock here
"packs": "lrw",
Expand All @@ -58,8 +59,9 @@ def borg_permissions(permissions):
return {
"": "l",
"archives": "lw",
"cache": "lrwWD", # read allowed, e.g. for chunks.<HASH> cache
"cache": "lrwWD", # TODO: check more restrictive permissions
"config": "lrW", # W for manifest
"index": "lrwWD", # read allowed so that borg create can check chunk presence for deduplication
"keys": "lr",
"locks": "lrwD", # borg needs to create/delete a shared lock here
"packs": "lw", # no r!
Expand Down Expand Up @@ -318,6 +320,7 @@ def __init__(
"archives/": {"levels": [0]},
"cache/": {"levels": [0]},
"config/": {"levels": [0]},
"index/": {"levels": [0]},
"keys/": {"levels": [0]},
"locks/": {"levels": [0]},
"packs/": {"levels": [1]},
Expand Down Expand Up @@ -395,9 +398,9 @@ def create(self):
# listing them all might be rather slow, so we better cache an empty
# ChunkIndex from here so that the first repo operation does not have
# to build the ChunkIndex the slow way by listing all the directories.
from borg.cache import write_chunkindex_to_repo_cache
from borg.cache import write_chunkindex_to_repo

write_chunkindex_to_repo_cache(self, ChunkIndex(), clear=True, force_write=True)
write_chunkindex_to_repo(self, ChunkIndex(), clear=True, force_write=True)
finally:
self.store.close()

Expand Down Expand Up @@ -543,9 +546,9 @@ def close(self):
# this session (only F_NEW entries are serialized, and an empty incremental write is skipped).
# guard on is_chunk_index_loaded so we never trigger a lazy rebuild just to persist on close.
if self.store_opened and self.is_chunk_index_loaded:
from .cache import write_chunkindex_to_repo_cache
from .cache import write_chunkindex_to_repo

write_chunkindex_to_repo_cache(self, self.chunks, incremental=True)
write_chunkindex_to_repo(self, self.chunks, incremental=True)
if self.lock:
self.lock.release()
self.lock = None
Expand Down Expand Up @@ -679,9 +682,9 @@ def check_object(obj):
pass
if not partial:
# if we did a full pass in one go, we built a complete, up-to-date ChunkIndex, cache it!
from .cache import write_chunkindex_to_repo_cache
from .cache import write_chunkindex_to_repo

write_chunkindex_to_repo_cache(
write_chunkindex_to_repo(
self, chunks, incremental=False, clear=True, force_write=True, delete_other=True
)
except StoreObjectNotFound:
Expand Down
2 changes: 1 addition & 1 deletion src/borg/testsuite/archiver/compact_cmd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_compact_interrupted_does_not_poison_chunk_index(archivers, request, mon
"""Regression test for issue #9748.

If a compact is interrupted after it deleted repository objects but before it wrote the
updated chunk index, the still-existing cache/chunks.* must not claim that the deleted
updated chunk index, the still-existing index/* must not claim that the deleted
objects are still present. Otherwise a later "borg create" trusts the stale index, does
not re-upload the affected chunks and silently produces an archive with dangling object
references (which extracts to zero bytes).
Expand Down
Loading
Loading