Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,8 @@ def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
self.stats = stats

def write_chunk(self, chunk):
id_, _ = self.cache.add_chunk(
self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False, ro_type=ROBJ_ARCHIVE_STREAM
)
id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), {}, chunk, stats=self.stats, ro_type=ROBJ_ARCHIVE_STREAM)
logger.debug(f"writing item metadata stream chunk {bin_to_hex(id_)}")
self.cache.repository.async_response(wait=False)
return id_


Expand Down Expand Up @@ -688,8 +685,6 @@ def save(self, name=None, comment=None, timestamp=None, stats=None, additional_m
raise Error("%s - archive too big (issue #1473)!" % err_msg)
else:
raise
while self.repository.async_response(wait=True) is not None:
pass
self.manifest.archives.create(name, self.id, metadata.time)
self.manifest.write()
return metadata
Expand Down Expand Up @@ -1176,8 +1171,7 @@ def chunk_processor(chunk):
started_hashing = time.monotonic()
chunk_id, data = cached_hash(chunk, self.key.id_hash)
stats.hashing_time += time.monotonic() - started_hashing
chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False, ro_type=ROBJ_FILE_STREAM)
self.cache.repository.async_response(wait=False)
chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, ro_type=ROBJ_FILE_STREAM)
return chunk_entry

item.chunks = []
Expand Down Expand Up @@ -2271,8 +2265,7 @@ def chunk_processor(self, target, chunk):
size = len(data)
if chunk_id in self.seen_chunks:
return self.cache.reuse_chunk(chunk_id, size, target.stats)
chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
self.cache.repository.async_response(wait=False)
chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, ro_type=ROBJ_FILE_STREAM)
self.seen_chunks.add(chunk_entry.id)
return chunk_entry

Expand Down
9 changes: 2 additions & 7 deletions src/borg/archiver/transfer_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ def transfer_chunks(
present += size
else:
# Add the new chunk to the repository
chunk_entry = cache.add_chunk(
chunk_id, {}, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
)
cache.repository.async_response(wait=False)
chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=archive.stats, ro_type=ROBJ_FILE_STREAM)
transfer += size
chunks.append(chunk_entry)
else:
Expand Down Expand Up @@ -101,7 +98,6 @@ def transfer_chunks(
meta,
data,
stats=archive.stats,
wait=False,
compress=False,
size=size,
ctype=meta["ctype"],
Expand All @@ -112,11 +108,10 @@ def transfer_chunks(
# always decompress and re-compress file data chunks
meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_FILE_STREAM)
chunk_entry = cache.add_chunk(
chunk_id, meta, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
chunk_id, meta, data, stats=archive.stats, ro_type=ROBJ_FILE_STREAM
)
else:
raise ValueError(f"unsupported recompress mode: {recompress}")
cache.repository.async_response(wait=False)
chunks.append(chunk_entry)
transfer += size
else:
Expand Down
15 changes: 2 additions & 13 deletions src/borg/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,18 +722,7 @@ def reuse_chunk(self, id, size, stats):
return ChunkListEntry(id, size)

def add_chunk(
self,
id,
meta,
data,
*,
stats,
wait=True,
compress=True,
size=None,
ctype=None,
clevel=None,
ro_type=ROBJ_FILE_STREAM,
self, id, meta, data, *, stats, compress=True, size=None, ctype=None, clevel=None, ro_type=ROBJ_FILE_STREAM
):
assert ro_type is not None
if size is None:
Expand All @@ -752,7 +741,7 @@ def add_chunk(
cdata = self.repo_objs.format(
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
)
pack_results = self.repository.put(id, cdata, wait=wait)
pack_results = self.repository.put(id, cdata)
self.last_refresh_dt = now # .put also refreshed the lock
self.chunks.add(id, size)
self.chunks.update_pack_info(pack_results)
Expand Down
24 changes: 3 additions & 21 deletions src/borg/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,12 +785,9 @@ def get_many(self, ids, read_data=True, raise_missing=True):
for id_ in ids:
yield self.get(id_, read_data=read_data, raise_missing=raise_missing)

def put(self, id, data, wait=True):
def put(self, id, data):
"""put a repo object

Note: when doing calls with wait=False this gets async and caller must
deal with async results / exceptions later.

Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for
every chunk written to disk this call. At max_count=1 this is always
one entry.
Expand All @@ -802,12 +799,8 @@ def put(self, id, data, wait=True):
# PackWriter shares this repository's index, so add() triggers the lazy build itself.
return self._pack_writer.add(id, data)

def delete(self, id, wait=True):
"""delete a repo object

Note: when doing calls with wait=False this gets async and caller must
deal with async results / exceptions later.
"""
def delete(self, id):
"""delete a repo object"""
self._lock_refresh()
pack_id = id # N=1: pack_id == chunk_id
key = "packs/" + bin_to_hex(pack_id)
Expand All @@ -816,17 +809,6 @@ def delete(self, id, wait=True):
except StoreObjectNotFound:
raise self.ObjectNotFound(id, str(self._location)) from None

def async_response(self, wait=True):
"""Get one async result (only applies to remote repositories).

async commands (== calls with wait=False, e.g. delete and put) have no results,
but may raise exceptions. These async exceptions must get collected later via
async_response() calls. Repeat the call until it returns None.
The previous calls might either return one (non-None) result or raise an exception.
If wait=True is given and there are outstanding responses, it will wait for them
to arrive. With wait=False, it will only return already received responses.
"""

def break_lock(self):
Lock(self.store).break_lock()

Expand Down
5 changes: 2 additions & 3 deletions src/borg/testsuite/archive_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,13 @@ def test_timestamp_parsing(monkeypatch, isoformat, expected):

class MockCache:
class MockRepo:
def async_response(self, wait=True):
pass
pass

def __init__(self):
self.objects = {}
self.repository = self.MockRepo()

def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None):
def add_chunk(self, id, meta, data, stats=None, ro_type=None):
assert ro_type is not None
self.objects[id] = data
return id, len(data)
Expand Down
Loading