-
Notifications
You must be signed in to change notification settings - Fork 1
Capture ResourcePath transfer_from exception in copy_files_for_distribution #108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Capture ResourcePath transfer_from exception in copy_files_for_distribution |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,7 +39,6 @@ | |
| ] | ||
|
|
||
| import binascii | ||
| import concurrent.futures | ||
| import json | ||
| import logging | ||
| import os | ||
|
|
@@ -183,12 +182,17 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_ | |
| Path on the edge node accessed storage, | ||
| including access protocol, bucket name to place files. | ||
| max_copy_workers : `int` | ||
| Maximum number of workers for copying files. | ||
| Maximum number of workers for copying files. Present for API | ||
| compatibility; worker selection is handled internally by | ||
| `ResourcePath.mtransfer`. | ||
|
|
||
| Raises | ||
| ------ | ||
| ExceptionGroup | ||
| Raised by `ResourcePath.mtransfer` when one or more transfers fail. | ||
| RuntimeError | ||
| Raised when error copying files to the distribution point. | ||
| Raised if a copied file is not found at the distribution point after | ||
| transfer completes. | ||
| """ | ||
| files_to_copy = {} | ||
|
|
||
|
|
@@ -205,18 +209,13 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_ | |
| folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False) | ||
| files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri | ||
|
|
||
| copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers) | ||
| future_file_copy = [] | ||
| for src, trgt in files_to_copy.items(): | ||
| _LOG.debug("Staging %s to %s", src, trgt) | ||
| # S3 clients explicitly instantiate here to overpass this | ||
| # https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe | ||
| trgt.exists() | ||
| future_file_copy.append(copy_executor.submit(trgt.transfer_from, src, transfer="copy")) | ||
|
|
||
| for future in concurrent.futures.as_completed(future_file_copy): | ||
| if future.result() is not None: | ||
| raise RuntimeError("Error of placing files to the distribution point") | ||
| _LOG.info("Staging %s to %s", src, trgt) | ||
| results = ResourcePath.mtransfer("copy", files_to_copy.items()) | ||
|
|
||
| for trgt in results: | ||
| if not trgt.exists(): | ||
| raise RuntimeError(f"File was not copied to the distribution point: {trgt}") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This raise was left in the code, but it was removed from the docstring. If leaving this existence check, please add back the information to the docstring.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks to both of you for the review! I've added the RuntimeError in the doctoring. |
||
|
|
||
|
|
||
| def get_idds_client(config): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure it's possible for the copy to succeed but the file to not be there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was motivated by the initial "silent" failure we suspected. In w12 when the copy didn't show any exception, but pipetaskInit couldn't find the QG. If we don't need check the file exists, then we can turn ResourcePath.mtransfer raise to be true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the copy succeeds the file will be there (unless something very strange has happened) so I don't think you need to explicitly need to check. Even if you want to explicitly check you should still let mtransfer raise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Now let
ResourcePath.mtransferraise the grouped exceptions. Only two files to be transferred, so I leave the existence check there.