From f9cf67954e93ae77837ee196463efcce8971ddef Mon Sep 17 00:00:00 2001 From: Zhaoyu Yang Date: Fri, 15 May 2026 16:04:35 -0400 Subject: [PATCH] check distributed files availability after transfer --- doc/changes/DM-54502.misc.rst | 1 + python/lsst/ctrl/bps/panda/utils.py | 27 ++++++++++----------- tests/test_utils.py | 37 ++++++++++++++++++++++++++++- 3 files changed, 50 insertions(+), 15 deletions(-) create mode 100644 doc/changes/DM-54502.misc.rst diff --git a/doc/changes/DM-54502.misc.rst b/doc/changes/DM-54502.misc.rst new file mode 100644 index 00000000..afd19ae4 --- /dev/null +++ b/doc/changes/DM-54502.misc.rst @@ -0,0 +1 @@ +Use ``ResourcePath.mtransfer`` in ``copy_files_for_distribution`` so that transfer failures are raised, and verify that each copied file exists at the distribution point. diff --git a/python/lsst/ctrl/bps/panda/utils.py b/python/lsst/ctrl/bps/panda/utils.py index 6d64a1fa..f598943a 100644 --- a/python/lsst/ctrl/bps/panda/utils.py +++ b/python/lsst/ctrl/bps/panda/utils.py @@ -39,7 +39,6 @@ ] import binascii -import concurrent.futures import json import logging import os @@ -183,12 +182,17 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_ Path on the edge node accessed storage, including access protocol, bucket name to place files. max_copy_workers : `int` - Maximum number of workers for copying files. + Maximum number of workers for copying files. Present for API + compatibility; worker selection is handled internally by + `ResourcePath.mtransfer`. Raises ------ + ExceptionGroup + Raised by `ResourcePath.mtransfer` when one or more transfers fail. RuntimeError - Raised when error copying files to the distribution point. + Raised if a copied file is not found at the distribution point after + transfer completes. 
""" files_to_copy = {} @@ -205,18 +209,13 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_ folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False) files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri - copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers) - future_file_copy = [] for src, trgt in files_to_copy.items(): - _LOG.debug("Staging %s to %s", src, trgt) - # S3 clients explicitly instantiate here to overpass this - # https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe - trgt.exists() - future_file_copy.append(copy_executor.submit(trgt.transfer_from, src, transfer="copy")) - - for future in concurrent.futures.as_completed(future_file_copy): - if future.result() is not None: - raise RuntimeError("Error of placing files to the distribution point") + _LOG.info("Staging %s to %s", src, trgt) + results = ResourcePath.mtransfer("copy", files_to_copy.items()) + + for trgt in results: + if not trgt.exists(): + raise RuntimeError(f"File was not copied to the distribution point: {trgt}") def get_idds_client(config): diff --git a/tests/test_utils.py b/tests/test_utils.py index fa3d6533..9c5d3445 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -27,10 +27,13 @@ """Unit tests for ctrl_bps_panda utilities.""" +import os +import tempfile import unittest from lsst.ctrl.bps import GenericWorkflowExec, GenericWorkflowJob -from lsst.ctrl.bps.panda.utils import _make_pseudo_filename +from lsst.ctrl.bps.panda.utils import _make_pseudo_filename, copy_files_for_distribution +from lsst.resources import ResourcePath class TestPandaUtils(unittest.TestCase): @@ -46,6 +49,38 @@ def testOKPseudoFilename(self): name = _make_pseudo_filename({}, gwjob) self.assertIn("j" * 15, name) + def testCopyFilesForDistribution(self): + with tempfile.TemporaryDirectory() as src_tmpdir, tempfile.TemporaryDirectory() as dest_tmpdir: + single_file = 
os.path.join(src_tmpdir, "payload.txt") + with open(single_file, "w") as handle: + handle.write("payload") + + directory = os.path.join(src_tmpdir, "inputs") + os.mkdir(directory) + nested_file = os.path.join(directory, "nested.txt") + with open(nested_file, "w") as handle: + handle.write("nested") + + copy_files_for_distribution( + {"single": single_file, "directory": directory}, + ResourcePath(dest_tmpdir, forceDirectory=True), + max_copy_workers=2, + ) + + self.assertTrue(os.path.exists(os.path.join(dest_tmpdir, "payload.txt"))) + self.assertTrue(os.path.exists(os.path.join(dest_tmpdir, "inputs", "nested.txt"))) + + def testCopyFilesForDistributionRaisesExceptionGroup(self): + with tempfile.TemporaryDirectory() as src_tmpdir, tempfile.TemporaryDirectory() as dest_tmpdir: + missing_file = os.path.join(src_tmpdir, "missing.txt") + + with self.assertRaises(ExceptionGroup): + copy_files_for_distribution( + {"missing": missing_file}, + ResourcePath(dest_tmpdir, forceDirectory=True), + max_copy_workers=2, + ) + if __name__ == "__main__": unittest.main()