Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-54502.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Capture ResourcePath transfer_from exception in copy_files_for_distribution
27 changes: 13 additions & 14 deletions python/lsst/ctrl/bps/panda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
]

import binascii
import concurrent.futures
import json
import logging
import os
Expand Down Expand Up @@ -183,12 +182,17 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
Path on the edge node accessed storage,
including access protocol, bucket name to place files.
max_copy_workers : `int`
Maximum number of workers for copying files.
Maximum number of workers for copying files. Present for API
compatibility; worker selection is handled internally by
`ResourcePath.mtransfer`.

Raises
------
ExceptionGroup
Raised by `ResourcePath.mtransfer` when one or more transfers fail.
RuntimeError
Raised when error copying files to the distribution point.
Raised if a copied file is not found at the distribution point after
transfer completes.
"""
files_to_copy = {}

Expand All @@ -205,18 +209,13 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False)
files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri

copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers)
future_file_copy = []
for src, trgt in files_to_copy.items():
_LOG.debug("Staging %s to %s", src, trgt)
# S3 clients explicitly instantiate here to overpass this
# https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe
trgt.exists()
future_file_copy.append(copy_executor.submit(trgt.transfer_from, src, transfer="copy"))

for future in concurrent.futures.as_completed(future_file_copy):
if future.result() is not None:
raise RuntimeError("Error of placing files to the distribution point")
_LOG.info("Staging %s to %s", src, trgt)
results = ResourcePath.mtransfer("copy", files_to_copy.items())

for trgt in results:
if not trgt.exists():
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's possible for the copy to succeed but the file to not be there.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was motivated by the initial "silent" failure we suspected. In w12 when the copy didn't show any exception, but pipetaskInit couldn't find the QG. If we don't need check the file exists, then we can turn ResourcePath.mtransfer raise to be true.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the copy succeeds the file will be there (unless something very strange has happened) so I don't think you need to explicitly need to check. Even if you want to explicitly check you should still let mtransfer raise.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Now let ResourcePath.mtransfer raise the grouped exceptions. Only two files to be transferred, so I leave the existence check there.

raise RuntimeError(f"File was not copied to the distribution point: {trgt}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This raise was left in the code, but it was removed from the docstring. If leaving this existence check, please add back the information to the docstring.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks to both of you for the review! I've added the RuntimeError in the doctoring.



def get_idds_client(config):
Expand Down
37 changes: 36 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@

"""Unit tests for ctrl_bps_panda utilities."""

import os
import tempfile
import unittest

from lsst.ctrl.bps import GenericWorkflowExec, GenericWorkflowJob
from lsst.ctrl.bps.panda.utils import _make_pseudo_filename
from lsst.ctrl.bps.panda.utils import _make_pseudo_filename, copy_files_for_distribution
from lsst.resources import ResourcePath


class TestPandaUtils(unittest.TestCase):
Expand All @@ -46,6 +49,38 @@ def testOKPseudoFilename(self):
name = _make_pseudo_filename({}, gwjob)
self.assertIn("j" * 15, name)

def testCopyFilesForDistribution(self):
with tempfile.TemporaryDirectory() as src_tmpdir, tempfile.TemporaryDirectory() as dest_tmpdir:
single_file = os.path.join(src_tmpdir, "payload.txt")
with open(single_file, "w") as handle:
handle.write("payload")

directory = os.path.join(src_tmpdir, "inputs")
os.mkdir(directory)
nested_file = os.path.join(directory, "nested.txt")
with open(nested_file, "w") as handle:
handle.write("nested")

copy_files_for_distribution(
{"single": single_file, "directory": directory},
ResourcePath(dest_tmpdir, forceDirectory=True),
max_copy_workers=2,
)

self.assertTrue(os.path.exists(os.path.join(dest_tmpdir, "payload.txt")))
self.assertTrue(os.path.exists(os.path.join(dest_tmpdir, "inputs", "nested.txt")))

def testCopyFilesForDistributionRaisesExceptionGroup(self):
with tempfile.TemporaryDirectory() as src_tmpdir, tempfile.TemporaryDirectory() as dest_tmpdir:
missing_file = os.path.join(src_tmpdir, "missing.txt")

with self.assertRaises(ExceptionGroup):
copy_files_for_distribution(
{"missing": missing_file},
ResourcePath(dest_tmpdir, forceDirectory=True),
max_copy_workers=2,
)


if __name__ == "__main__":
unittest.main()
Loading