-
-
Notifications
You must be signed in to change notification settings - Fork 857
repository: make check and delete work at N=1 with sha256 pack ids #9783
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError | ||
| from .manifest import Manifest | ||
| from .platform import SaveFile | ||
| from .repoobj import RepoObj | ||
| from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister | ||
| from .security import SecurityManager, assert_secure # noqa: F401 | ||
|
|
||
|
|
@@ -619,7 +620,9 @@ def read_chunkindex_from_repo(repository, hash): | |
| logger.debug(f"{index_name} is invalid.") | ||
|
|
||
|
|
||
| def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False): | ||
| def build_chunkindex_from_repo( | ||
| repository, *, disable_caches=False, cache_immediately=False, init_flags=ChunkIndex.F_USED | ||
| ): | ||
| # first, try to build a fresh, mostly complete chunk index from centrally cached chunk indexes: | ||
| if not disable_caches: | ||
| hashes = list_chunkindex_hashes(repository) | ||
|
|
@@ -642,26 +645,32 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi | |
| chunks.clear_new() | ||
| return chunks | ||
| # if we didn't get anything from the cache, compute the ChunkIndex the slow way: | ||
| logger.debug("querying the chunk IDs list from the repo...") | ||
| logger.debug("rebuilding the chunk index from the repo the slow way...") | ||
| chunks = ChunkIndex() | ||
| t0 = perf_counter() | ||
| num_chunks = 0 | ||
| # The repo says it has these chunks, so we assume they are referenced/used chunks. | ||
| # We do not know the plaintext size (!= stored_size), thus we set size = 0. | ||
| # | ||
| # IMPORTANT (N=1 only): listing yields pack_ids, not per-chunk locations. We can only | ||
| # reconstruct the index here under the N=1 assumption -- pack_id == chunk_id, one chunk per | ||
| # pack at offset 0 spanning the whole pack. At N>1 this is wrong: a cold rebuild would have to | ||
| # open each pack and read its header to recover the per-chunk offsets and sizes. Until that | ||
| # exists, Repository.get()'s range-load is only correct while a persisted/cached chunk index | ||
| # is available; a cold rebuild from a bare repo listing silently falls back to N=1 semantics. | ||
| for pack_id, pack_size in repo_lister(repository, limit=LIST_SCAN_LIMIT): | ||
| num_chunks += 1 | ||
| chunk_id = pack_id # N=1: chunk_id == pack_id | ||
| obj_size = pack_size # true for N=1 | ||
| chunks[chunk_id] = ChunkIndexEntry( | ||
| flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size | ||
| ) | ||
| # By default we assume the repo's chunks are used; callers that compute usage themselves | ||
| # (e.g. compact) pass init_flags=F_NONE. Plaintext size is unknown here (!= stored size), so size=0. | ||
| if isinstance(repository, Repository): | ||
| # Read the pack object headers at the store level. Don't call Repository.list() here: it | ||
| # iterates this same index we are building, so it would recurse. The headers also give each | ||
| # object's real (chunk_id, offset, size), so this is not limited to one object per pack. | ||
| for info in repository.store_list("packs"): | ||
| pack_id = hex_to_bin(info.name) | ||
| pack = repository.store_load("packs/" + info.name) | ||
| for chunk_id, obj_offset, obj_size in RepoObj.iter_object_headers(pack): | ||
| num_chunks += 1 | ||
| chunks[chunk_id] = ChunkIndexEntry( | ||
| flags=init_flags, size=0, pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size | ||
| ) | ||
|
Comment on lines
+658
to
+665
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This will read all the data that is stored in all packs. There needs to be at least a TODO to improve efficiency by only loading the RepoObj headers from the packfile and not also all metadata and data.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is also a bit tricky considering how the borgstore internal caching works right now (if the Store is configured with a cache): on first access, it will load the complete pack. So even if borg would only read a little from the pack, it would fetch the whole pack anyway. |
||
| else: | ||
| # Legacy repo: list() reads its own segment index (no recursion). get() routes through that | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That "segment index" used to be called "repository index" in borg1.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is that legacy part even used? By what? |
||
| # index, so the pack_id/offset fields here are just placeholders. | ||
| for chunk_id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT): | ||
| num_chunks += 1 | ||
| chunks[chunk_id] = ChunkIndexEntry( | ||
| flags=init_flags, size=0, pack_id=chunk_id, obj_offset=0, obj_size=stored_size | ||
| ) | ||
| # Cache does not contain the manifest. | ||
| if not isinstance(repository, Repository): | ||
| del chunks[Manifest.MANIFEST_ID] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,22 @@ def extract_crypted_data(cls, data: bytes) -> bytes: | |
| raise IntegrityError(f"object size inconsistent: expected {overall_expected_size} bytes, got {len(data)}") | ||
| return data[hdr_size + hdr.meta_size :] # crypted data | ||
|
|
||
| @classmethod | ||
| def iter_object_headers(cls, pack: bytes): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Problematic: API requires giving the complete pack contents. Could do partial loads to only load the interesting parts of the pack and skip 99% of the data it does not need. |
||
| """Yield (chunk_id, obj_offset, obj_size) for every object stored in a pack. | ||
|
|
||
| Each object's identity and extent come from its on-disk header, so callers do not need to | ||
| know the pack file name. Works for one object per pack and for several. | ||
| """ | ||
| hdr_size = cls.obj_header.size | ||
| offset = 0 | ||
| total = len(pack) | ||
| while offset + hdr_size <= total: | ||
| hdr = cls.ObjHeader(*cls.obj_header.unpack(pack[offset : offset + hdr_size])) | ||
| obj_size = hdr_size + hdr.meta_size + hdr.data_size | ||
| yield hdr.chunk_id, offset, obj_size | ||
| offset += obj_size | ||
|
|
||
| def __init__(self, key): | ||
| self.key = key | ||
| # Some commands write new chunks (e.g. rename) but don't take a --compression argument. This duplicates | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -658,13 +658,15 @@ def check_object(obj): | |
| # add all existing objects to the index. | ||
| # borg check: the index may have corrupted objects (we did not delete them) | ||
| # borg check --repair: the index will only have non-corrupted objects. | ||
| # the pack file name is the pack_id (sha256(pack) at N>1 or with the | ||
| # BORG_TESTONLY_SHA256_PACK_ID switch), which is not the chunk_id, so recover | ||
| # each object's real (chunk_id, offset, size) from its on-disk header rather | ||
| # than assuming pack file name == chunk_id. | ||
| pack_id = hex_to_bin(info.name) | ||
| pack_size = info.size | ||
| chunk_id = pack_id # N=1: chunk_id == pack_id | ||
| obj_size = pack_size # correct for N=1 | ||
| chunks[chunk_id] = ChunkIndexEntry( | ||
| flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size | ||
| ) | ||
| for chunk_id, obj_offset, obj_size in RepoObj.iter_object_headers(obj): | ||
| chunks[chunk_id] = ChunkIndexEntry( | ||
| flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size | ||
| ) | ||
| now = time.monotonic() | ||
| if now > t_last_checkpoint + 300: # checkpoint every 5 mins | ||
| t_last_checkpoint = now | ||
|
|
@@ -705,28 +707,22 @@ def list(self, limit=None, marker=None): | |
| list <limit> infos starting from after id <marker>. | ||
| each info is a tuple (id, storage_size). | ||
| """ | ||
| collect = True if marker is None else False | ||
| # Yield chunk_ids from the chunk index. (Listing the packs/ dir would yield pack file names, | ||
| # i.e. pack_ids, which are not chunk_ids.) iteritems() has no marker arg, so we skip to | ||
| # <marker> ourselves; index order is stable unless the index is mutated, which is all the | ||
| # marker pagination needs. | ||
| self._lock_refresh() | ||
| collect = marker is None | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that is a better expression than the original code. |
||
| result = [] | ||
| infos = self.store.list("packs") # generator yielding ItemInfos | ||
| while True: | ||
| self._lock_refresh() | ||
| try: | ||
| info = next(infos) | ||
| except StoreObjectNotFound: | ||
| break # can happen e.g. if "packs" does not exist, pointless to continue in that case | ||
| except StopIteration: | ||
| break | ||
| else: | ||
| pack_id = hex_to_bin(info.name) | ||
| chunk_id = pack_id # N=1: chunk_id == pack_id | ||
| if collect: | ||
| chunk_size = info.size # only correct for N=1 | ||
| result.append((chunk_id, chunk_size)) | ||
| if len(result) == limit: | ||
| break | ||
| elif chunk_id == marker: | ||
| collect = True | ||
| # note: do not collect the marker id | ||
| for chunk_id, entry in self.chunks.iteritems(): | ||
| if entry.pack_id == UNKNOWN_BYTES32: | ||
| continue # buffered in PackWriter, not flushed to a pack yet | ||
| if collect: | ||
| result.append((chunk_id, entry.obj_size)) | ||
| if len(result) == limit: | ||
| break | ||
| elif chunk_id == marker: | ||
| collect = True # start collecting after the marker; do not include the marker itself | ||
| return result | ||
|
|
||
| def get(self, id, read_data=True, raise_missing=True): | ||
|
|
@@ -809,12 +805,14 @@ def delete(self, id, wait=True): | |
| deal with async results / exceptions later. | ||
| """ | ||
| self._lock_refresh() | ||
| pack_id = id # N=1: pack_id == chunk_id | ||
| key = "packs/" + bin_to_hex(pack_id) | ||
| try: | ||
| self.store.delete(key) | ||
| except StoreObjectNotFound: | ||
| raise self.ObjectNotFound(id, str(self._location)) from None | ||
| # We can not remove one object by dropping its whole pack without losing the pack's other | ||
| # objects; real removal is store_delete at the pack level (compact). For now just check the | ||
| # object exists (ObjectNotFound contract), log, and do nothing. | ||
| # TODO: delete a single object once a pack can hold more than one (N>1). | ||
| entry = self.chunks.get(id) | ||
| if entry is None: | ||
| raise self.ObjectNotFound(id, str(self._location)) | ||
| logger.warning("ignoring deletion of %s in %s", bin_to_hex(id), bin_to_hex(entry.pack_id)) | ||
|
|
||
| def async_response(self, wait=True): | ||
| """Get one async result (only applies to remote repositories). | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| from ...constants import * # NOQA | ||
| from ...helpers import Location, umount | ||
| from ...helpers import EXIT_SUCCESS | ||
| from ...helpers import bin_to_hex | ||
| from ...helpers import init_ec_warnings | ||
| from ...logger import flush_logging | ||
| from ...manifest import Manifest | ||
|
|
@@ -179,6 +180,17 @@ def open_archive(repo_path, name): | |
| return archive, repository | ||
|
|
||
|
|
||
def delete_chunk(repository, id):
    """Drop the pack holding chunk `id` (test damage helper).

    Repository.delete is a no-op now, so tests that need a chunk to really vanish drop its whole
    pack at the store level. Works at N=1 (one chunk per pack). The pack is resolved through the
    chunk index, since the pack file name is the pack_id, which need not equal the chunk_id.

    Raises KeyError if `id` is not in the chunk index, so a test that targets a missing chunk
    fails with a clear message instead of an AttributeError on None.
    """
    entry = repository.chunks.get(id)
    if entry is None:
        raise KeyError(f"chunk {bin_to_hex(id)} not found in the chunk index")
    # NOTE: this removes the complete pack. With more than one chunk per pack it would also
    # destroy the pack's other chunks, so this must stay a test-only helper.
    repository.store_delete("packs/" + bin_to_hex(entry.pack_id))
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `if entry is not None: ... else: ...` |
||
|
|
||
|
|
||
| def open_repository(archiver): | ||
| if archiver.get_kind() == "remote": | ||
| return Repository(Location(archiver.repository_location), exclusive=True) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What part of "don't do actions that are very harmful for the general case" didn't you understand?
You are killing a complete pack here because one chunk was problematic.