11#!/usr/bin/env python
2- from gevent import monkey
3-
4- monkey .patch_all ()
52
63import datetime
74import enum
@@ -40,7 +37,7 @@ class Resolution(enum.Enum):
4037
4138
4239# https://stackoverflow.com/questions/73395864/how-do-i-wait-when-all-threadpoolexecutor-threads-are-busy
43- class RecompressThreadPoolExecutor (ThreadPoolExecutor ):
40+ class CompressThreadPoolExecutor (ThreadPoolExecutor ):
4441 """ThreadPoolExecutor that keeps track of the number of available workers.
4542
4643 Refs:
@@ -182,18 +179,34 @@ def overwrite_parallel(compressed_raw_mime_by_sha256: "dict[str, bytes]") -> Non
182179 )
183180
184181
185- def recompress_batch (
186- recompress_sha256s : "dict[str, int]" , * , dry_run = True , compression_level : int = 3
182+ def compress_batch (
183+ recompress_sha256s : "dict[str, int]" ,
184+ * ,
185+ dry_run = True ,
186+ compression_level : int = 3 ,
187+ recompress : bool = False ,
187188) -> None :
188189 if not recompress_sha256s :
189190 return
190191
191192 data_by_sha256 = {
192193 data_sha256 : data
193194 for data_sha256 , data in download_parallel (set (recompress_sha256s ))
194- if data is not None and not data . startswith ( blockstore . ZSTD_MAGIC_NUMBER_PREFIX )
195+ if data is not None
195196 }
196197
198+ if recompress :
199+ data_by_sha256 = {
200+ data_sha256 : blockstore .maybe_decompress_raw_mime (data )
201+ for data_sha256 , data in data_by_sha256 .items ()
202+ }
203+ else :
204+ data_by_sha256 = {
205+ data_sha256 : data
206+ for data_sha256 , data in data_by_sha256 .items ()
207+ if not data .startswith (blockstore .ZSTD_MAGIC_NUMBER_PREFIX )
208+ }
209+
197210 if not data_by_sha256 :
198211 return
199212
@@ -276,6 +289,7 @@ def recompress_batch(
276289 "--max-recompress-batch-bytes" , type = int , default = MAX_RECOMPRESS_BATCH_BYTES
277290)
278291@click .option ("--fraction" , type = str , default = None )
292+ @click .option ("--recompress/--no-recompress" , default = False )
279293def run (
280294 limit : "int | None" ,
281295 after : "str | None" ,
@@ -294,6 +308,7 @@ def run(
294308 min_size : "int | None" ,
295309 max_recompress_batch_bytes : int ,
296310 fraction : "str | None" ,
311+ recompress : bool ,
297312) -> int :
298313 shutting_down = False
299314
@@ -317,7 +332,7 @@ def shutdown(signum, frame):
317332 assert batch_size > 0
318333 assert recompress_batch_size > 0
319334
320- recompress_executor = RecompressThreadPoolExecutor (
335+ compress_executor = CompressThreadPoolExecutor (
321336 max_workers = recompress_executor_workers
322337 )
323338
@@ -377,24 +392,26 @@ def shutdown(signum, frame):
377392 len (recompress_sha256s ) >= recompress_batch_size
378393 or recompress_bytes > max_recompress_batch_bytes
379394 ):
380- recompress_executor .wait_for_available_worker ()
381- recompress_executor .submit (
382- recompress_batch ,
395+ compress_executor .wait_for_available_worker ()
396+ compress_executor .submit (
397+ compress_batch ,
383398 recompress_sha256s .copy (),
384399 dry_run = dry_run ,
385400 compression_level = compression_level ,
401+ recompress = recompress ,
386402 )
387403 recompress_sha256s .clear ()
388404 recompress_bytes = 0
389405
390406 if shutting_down :
391407 break
392408
393- recompress_executor .submit (
394- recompress_batch ,
409+ compress_executor .submit (
410+ compress_batch ,
395411 recompress_sha256s .copy (),
396412 dry_run = dry_run ,
397413 compression_level = compression_level ,
414+ recompress = recompress ,
398415 )
399416
400417 if shutting_down :
@@ -405,7 +422,7 @@ def shutdown(signum, frame):
405422
406423 after_id = max_id + 1
407424
408- recompress_executor .shutdown (wait = True )
425+ compress_executor .shutdown (wait = True )
409426
410427
411428if __name__ == "__main__" :
0 commit comments