-
Notifications
You must be signed in to change notification settings - Fork 2
Sync metadata #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Sync metadata #14
Changes from all commits
9170850
b205f2e
3b8820a
50c2194
20da277
b83542d
e59d773
80bed1f
e25bd15
e59af6f
22521e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,10 +21,18 @@ def __init__(self, run_dir, configuration): | |
| ) | ||
| self.final_file = "" | ||
| self.transfer_details = self.configuration.get("transfer_details", {}) | ||
| self.metadata_rsync_exitcode_file = os.path.join( | ||
| self.run_dir, ".metadata_rsync_exitcode" | ||
| ) | ||
| self.metadata_destination = os.path.join( | ||
| self.configuration.get("metadata_archive"), | ||
| getattr(self, "run_type", None), | ||
| self.run_id, | ||
| ) | ||
| self.final_rsync_exitcode_file = os.path.join( | ||
| self.run_dir, ".final_rsync_exitcode" | ||
| ) | ||
| self.miarka_destination = self.sequencer_config.get("miarka_destination") | ||
| self.remote_destination = self.sequencer_config.get("remote_destination") | ||
| self.db = StatusdbSession(self.configuration.get("statusdb")) | ||
|
|
||
| def confirm_run_type(self): | ||
|
|
@@ -42,50 +50,95 @@ def sequencing_ongoing(self): | |
| return False | ||
| return True | ||
|
|
||
| def generate_rsync_command(self, is_final_sync=False): | ||
| """Generate an rsync command string.""" | ||
| destination = ( | ||
| self.transfer_details.get("user") | ||
| + "@" | ||
| + self.transfer_details.get("host") | ||
| + ":" | ||
| + self.miarka_destination | ||
| @property | ||
| def metadata_synced(self): | ||
| """Check if the metadata rsync was successful by reading the exit code file.""" | ||
| return fs.check_exit_status(self.metadata_rsync_exitcode_file) | ||
|
|
||
| def sync_metadata(self): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Am I missing something or Is this not called anywhere yet?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's called in dataflow_transfer.py, by
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder how I missed that. But anyhow, so the sync is triggered once the sequencing is finished but after that we don't really ensure that it finished properly right? Not sure how to change it, but we have had occassional errors on the rsync to the nas-ns. But maybe I misinterpret and its actually attempted again if failed?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, if the exit code of the rsync is not 0 it will retry. The exception is if the first case,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added some additional logic to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think it looks ok? |
||
| """Start background rsync transfer for metadata files.""" | ||
| metadata_rsync_command = self.generate_rsync_command( | ||
| metadata_only=True, with_exit_code_file=True | ||
| ) | ||
|
|
||
| if fs.rsync_is_running(src=self.run_dir, dst=self.metadata_destination): | ||
| logger.info( | ||
| f"Metadata rsync is already running for {self.run_dir} to destination {self.metadata_destination}. Skipping background metadata sync initiation." | ||
| ) | ||
| return | ||
| try: | ||
| fs.submit_background_process(metadata_rsync_command) | ||
| logger.info( | ||
| f"{self.run_id}: Started metadata rsync to {self.metadata_destination}" | ||
| + f" with the following command: '{metadata_rsync_command}'" | ||
| ) | ||
| except Exception as e: | ||
| logger.error(f"Failed to start metadata rsync for {self.run_id}: {e}") | ||
| raise e | ||
|
|
||
| def generate_rsync_command(self, metadata_only=False, with_exit_code_file=False): | ||
| """Generate an rsync command string.""" | ||
ssjunnebo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if metadata_only: | ||
| source = self.run_dir + "/" | ||
| destination = self.metadata_destination + "/" | ||
| log_file_option = "--log-file=" + os.path.join( | ||
| self.run_dir, "rsync_metadata_log.txt" | ||
| ) | ||
| rsync_options = self.sequencer_config.get("metadata_rsync_options", []) | ||
| exit_code_file = self.metadata_rsync_exitcode_file | ||
| else: | ||
| source = self.run_dir | ||
| destination = ( | ||
| self.transfer_details.get("user") | ||
| + "@" | ||
| + self.transfer_details.get("host") | ||
| + ":" | ||
| + self.remote_destination | ||
| ) | ||
| log_file_option = "--log-file=" + os.path.join( | ||
| self.run_dir, "rsync_remote_log.txt" | ||
| ) | ||
| rsync_options = self.sequencer_config.get("remote_rsync_options", []) | ||
| exit_code_file = self.final_rsync_exitcode_file | ||
|
|
||
| run_one_bin = self.configuration.get("run_one_path", "run-one") | ||
| command = [ | ||
| run_one_bin, | ||
| "rsync", | ||
| "-au", | ||
| "--log-file=" + os.path.join(self.run_dir, "rsync_remote_log.txt"), | ||
| *(self.sequencer_config.get("rsync_options", [])), | ||
| self.run_dir, | ||
| log_file_option, | ||
| *(rsync_options), | ||
| "--exclude='*'" if metadata_only else "", | ||
| source, | ||
| destination, | ||
| ] | ||
| command_str = " ".join(command) | ||
| if is_final_sync: | ||
| command_str += f"; echo $? > {self.final_rsync_exitcode_file}" | ||
| if with_exit_code_file: | ||
| command_str += f"; echo $? > {exit_code_file}" | ||
| return command_str | ||
|
|
||
| def start_transfer(self, final=False): | ||
| """Start background rsync transfer to storage.""" | ||
| transfer_command = self.generate_rsync_command(is_final_sync=final) | ||
| if fs.rsync_is_running(src=self.run_dir): | ||
| transfer_command = self.generate_rsync_command( | ||
| metadata_only=False, with_exit_code_file=final | ||
| ) | ||
| if fs.rsync_is_running(src=self.run_dir, dst=self.remote_destination): | ||
| logger.info( | ||
| f"Rsync is already running for {self.run_dir}. Skipping background transfer initiation." | ||
| f"Rsync is already running for {self.run_dir} to destination {self.remote_destination}. Skipping background transfer initiation." | ||
| ) | ||
| return | ||
| try: | ||
| fs.submit_background_process(transfer_command) | ||
| logger.info( | ||
| f"{self.run_id}: Started rsync to {self.miarka_destination}" | ||
| f"{self.run_id}: Started rsync to {self.remote_destination}" | ||
| + f" with the following command: '{transfer_command}'" | ||
| ) | ||
| except Exception as e: | ||
| logger.error(f"Failed to start rsync for {self.run_id}: {e}") | ||
| raise e | ||
| rsync_info = { | ||
| "command": transfer_command, | ||
| "destination_path": self.miarka_destination, | ||
| "destination_path": self.remote_destination, | ||
| } | ||
| if final: | ||
| self.update_statusdb( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it really be
nothere? If it should, I need to think harder.