@run.command(help='Debug a failed workflow execution', no_args_is_help=True)
@click.option('--project', default=None, help='Name or ID of the project')
@click.option('--dataset', default=None, help='Name or ID of the dataset')
@click.option('-i', '--interactive', is_flag=True, default=False,
              help='Gather arguments interactively')
def debug(**kwargs):
    # CLI entry point for the ``debug`` sub-command: the collected options are
    # checked by ``check_required_args`` and then handed, unchanged, to
    # ``run_debug`` together with the value of the ``--interactive`` flag.
    check_required_args(kwargs)
    interactive = kwargs.get('interactive')
    run_debug(kwargs, interactive=interactive)
def _offer_full_log(execution_log: str, log_lines: List[str], interactive: bool) -> None:
    """Print the complete execution log if an interactive user asks for it."""
    # The prompt is only shown in interactive mode and only when the log has
    # at least one line; in every other case this is a no-op.
    if interactive and log_lines and ask_yes_no(_SHOW_FULL_LOG):
        print(execution_log)


def run_debug(input_params: DebugArguments, interactive=False):  # NOSONAR
    """
    Debug a failed workflow execution.

    Prints the last 50 lines of the execution log. When the dataset status is
    ``FAILED`` it then looks up the primary failed task and either opens the
    interactive task menu or, in non-interactive mode, prints the task and
    the chain of tasks that produced its inputs.

    Args:
        input_params: Argument mapping. ``project`` and ``dataset`` are read
            and overwritten in place with the values resolved by
            ``get_id_from_name`` / ``ask_dataset``. In non-interactive mode
            the optional keys ``max_depth``, ``max_tasks``, ``show_script``,
            ``show_log`` and ``show_files`` are also read.
        interactive: When true, project and dataset are gathered through
            prompts and the result is explored through menus.

    Raises:
        InputError: In non-interactive mode, when the selected dataset is
            currently ``RUNNING``.
    """
    # Imported here rather than at module level (as in the original code),
    # presumably to avoid a circular import with the controller module.
    from cirro.cli.controller import _init_cirro_client, _get_projects

    cirro = _init_cirro_client()
    projects = _get_projects(cirro)

    if interactive:
        project_name = ask_project(projects, input_params.get('project'))
        input_params['project'] = get_id_from_name(projects, project_name)
        datasets = list_all_datasets(project_id=input_params['project'], client=cirro)
        # Running datasets are not offered for selection.
        datasets = [d for d in datasets if d.status != Status.RUNNING]
        input_params['dataset'] = ask_dataset(datasets, input_params.get('dataset'), msg_action='debug')
    else:
        input_params['project'] = get_id_from_name(projects, input_params['project'])
        datasets = cirro.datasets.list(input_params['project'])
        original_dataset = input_params['dataset']
        input_params['dataset'] = get_id_from_name(datasets, input_params['dataset'])
        dataset_obj = get_item_from_name_or_id(datasets, original_dataset)
        if dataset_obj and dataset_obj.status == Status.RUNNING:
            raise InputError(
                f"Dataset '{dataset_obj.name}' ({dataset_obj.id}) is currently RUNNING. "
                "The debug command is only available for completed or failed datasets."
            )

    project_id = input_params['project']
    dataset_id = input_params['dataset']

    dataset_detail = cirro.datasets.get(project_id=project_id, dataset_id=dataset_id)
    sdk_dataset = DataPortalDataset(dataset=dataset_detail, client=cirro)

    # --- Execution log ---
    execution_log = sdk_dataset.logs
    log_lines = execution_log.splitlines()

    print("\n=== Execution Log (last 50 lines) ===")
    print('\n'.join(log_lines[-50:]))

    # Only search for a failed task when the dataset actually failed.
    if sdk_dataset.status != Status.FAILED:
        _offer_full_log(execution_log, log_lines, interactive)
        return

    # --- Primary failed task ---
    try:
        if interactive:
            print("\nSearching for the primary failed task (this may take a moment)...")
        failed_task = sdk_dataset.primary_failed_task
    except Exception as e:  # NOSONAR
        # Best effort: a missing task trace must not hide the execution log.
        print(f"\nCould not load task trace: {e}")
        _offer_full_log(execution_log, log_lines, interactive)
        return

    if failed_task is None:
        print("\nNo failed tasks found in this execution.")
        _offer_full_log(execution_log, log_lines, interactive)
        return

    if not interactive:
        _print_task_debug_recursive(
            failed_task,
            max_depth=input_params.get('max_depth'),
            max_tasks=input_params.get('max_tasks'),
            show_script=input_params.get('show_script', True),
            show_log=input_params.get('show_log', True),
            show_files=input_params.get('show_files', True),
        )
        return

    choices = [
        f"Show task info: {failed_task.name}",
        "Show full execution log",
        _DONE,
    ]
    # Loop until the user picks the "Done" entry (any unmatched choice exits).
    while True:
        choice = ask('select', 'Primary failed task found. What would you like to do?', choices=choices)
        if choice.startswith("Show task info"):
            _task_menu(failed_task, depth=0)
        elif choice == "Show full execution log":
            print(execution_log)
        else:
            break
def _print_task_debug_recursive(
    task,
    max_depth: Optional[int],
    max_tasks: Optional[int],
    show_script: bool = True,
    show_log: bool = True,
    show_files: bool = True,
    _depth: int = 0,
    _seen: Optional[Set[str]] = None,
    _counter: Optional[List[int]] = None
) -> None:
    """
    Print debug info for a task, then for the tasks that created each of its
    input files, depth-first in input order.

    Tasks are deduplicated by name (a task that produced multiple inputs is
    only printed once). Output is capped by ``max_depth`` and ``max_tasks``;
    when a cap is hit a notice is printed so the user knows output was cut.
    The ``max_tasks`` notice is printed exactly once and the walk stops
    there, instead of being repeated for every remaining source task.

    Args:
        task: Task to start from; must expose ``name`` and ``inputs`` (each
            input exposing ``source_task``).
        max_depth: Deepest level to expand, or ``None`` for no limit.
        max_tasks: Maximum number of tasks to print, or ``None`` for no limit.
        show_script: Forwarded to ``_print_task_debug``.
        show_log: Forwarded to ``_print_task_debug``.
        show_files: Forwarded to ``_print_task_debug``.
        _depth: Depth assigned to ``task`` (used for indentation).
        _seen: Names of tasks already printed; updated in place.
        _counter: Single-element list holding the number of tasks printed so
            far; updated in place.
    """
    if _seen is None:
        _seen = set()
    if _counter is None:
        _counter = [0]

    # Explicit stack instead of recursion: the visiting order is the same
    # pre-order walk, but a long input chain cannot hit the interpreter's
    # recursion limit and the walk can be stopped from a single place.
    pending = [(task, _depth)]
    while pending:
        current, depth = pending.pop()

        # A task may have been queued by one input and printed via another
        # before its turn came up.
        if current.name in _seen:
            continue

        if max_tasks is not None and _counter[0] >= max_tasks:
            indent = " " * depth
            print(f"\n{indent}[max-tasks limit reached — stopping recursion]")
            return

        _seen.add(current.name)
        _counter[0] += 1

        _print_task_debug(current, depth=depth,
                          show_script=show_script,
                          show_log=show_log,
                          show_files=show_files)

        source_tasks = [
            f.source_task for f in current.inputs
            if f.source_task and f.source_task.name not in _seen
        ]
        if not source_tasks:
            continue

        if max_depth is not None and depth >= max_depth:
            indent = " " * (depth + 1)
            names = ', '.join(t.name for t in source_tasks)
            print(f"\n{indent}[max-depth limit reached — not expanding: {names}]")
            continue

        # Reversed so that the first input's source task is popped first.
        pending.extend((src, depth + 1) for src in reversed(source_tasks))
+ """ + indent = " " * depth + label = "Primary Failed Task" if depth == 0 else "Source Task" + _print_task_header(task, indent, label) + + inputs = task.inputs + outputs = task.outputs + + while True: + choices = [ + "Show task script", + "Show task log", + f"Browse inputs ({len(inputs)})", + f"Browse outputs ({len(outputs)})", + _DONE if depth == 0 else _BACK, + ] + choice = ask('select', 'What would you like to do?', choices=choices) + + if choice == "Show task script": + content = task.script + print(f"\n{indent}--- Task Script ---") + print(content if content else _EMPTY_LABEL) + + elif choice == "Show task log": + content = task.logs + print(f"\n{indent}--- Task Log ---") + print(content if content else _EMPTY_LABEL) + + elif choice.startswith("Browse inputs"): + _browse_files_menu(inputs, "input", depth) + + elif choice.startswith("Browse outputs"): + _browse_files_menu(outputs, "output", depth) + + else: # Done / Back + break + + +def _browse_files_menu(files, kind: str, depth: int) -> None: + """ + Let the user pick a file from a list, then enter its file menu. + + ``kind`` is ``'input'`` or ``'output'``, used only for the prompt label. + When there is only one file the selection step is skipped and the file + menu opens immediately. 
def _file_read_options(name: str) -> List[str]:
    """
    Return the list of read-action strings appropriate for a given filename.

    The underlying type is taken from the suffix that remains after removing
    one compression suffix (``.gz``, ``.bz2``, ``.xz`` or ``.zst``).

    For compressed files only the CSV action is offered. ``read_csv`` infers
    the compression from the file name, whereas the JSON and text actions are
    invoked by the file menu without any compression argument and would try
    to decode the compressed bytes directly, which cannot succeed.

    Args:
        name: File name (case-insensitive).

    Returns:
        Menu labels in display order; an empty list when the file is not a
        recognised text format or no action can read it.
    """
    lower = name.lower()
    compressed = False
    # Strip compression suffix to check underlying type
    for ext in ('.gz', '.bz2', '.xz', '.zst'):
        if lower.endswith(ext):
            lower = lower[:-len(ext)]
            compressed = True
            break

    suffix = Path(lower).suffix

    if suffix not in _TEXT_EXTENSIONS:
        return []

    options = []
    if suffix in _CSV_EXTENSIONS:
        options.append("Read as CSV (first 10 rows)")
    if compressed:
        # Only offer actions that can actually decompress the payload.
        return options
    if suffix in _JSON_EXTENSIONS:
        options.append("Read as JSON")
    options.append("Read as text (first 100 lines)")
    return options
_file_read_options(wf.name) + if not read_options and not wf.source_task: + print(f"{indent}(binary file — no readable options)") + return + + choices = list(read_options) + if wf.source_task: + choices.append(f"Drill into source task: {wf.source_task.name}") + choices.append(_BACK) + + while True: + choice = ask('select', f'What would you like to do with {wf.name!r}?', + choices=choices) + + if choice == _BACK: + break + + elif choice.startswith("Read as CSV"): + try: + df = wf.read_csv() + print(df.head(10).to_string()) + except Exception as e: # NOSONAR + print(f"Could not read as CSV: {e}") + + elif choice.startswith("Read as JSON"): + try: + data = wf.read_json() + output = json.dumps(data, indent=2) + # Cap output at ~200 lines so the terminal isn't flooded + lines = output.splitlines() + if len(lines) > 200: + print('\n'.join(lines[:200])) + print(f"... ({len(lines) - 200} more lines)") + else: + print(output) + except Exception as e: # NOSONAR + print(f"Could not read as JSON: {e}") + + elif choice.startswith("Read as text"): + try: + lines = wf.readlines() + if len(lines) > 100: + print('\n'.join(lines[:100])) + print(f"... 
def find_primary_failed_task(  # NOSONAR
    tasks: List['DataPortalTask'],
    execution_log: str
) -> Optional['DataPortalTask']:
    """
    Pick the task most likely to be the root cause of a failed execution.

    Selection order:
      1. Candidates are the tasks whose status is "FAILED" and whose exit
         code is set and non-zero; if there are none, every "FAILED" task is
         a candidate.
      2. No candidates -> ``None``; exactly one candidate -> that task.
      3. If the log contains "Error executing process > 'NAME'", return the
         first candidate whose name equals NAME, otherwise the first
         candidate whose name contains NAME or is contained in it.
      4. Otherwise return the candidate with the smallest ``task_id``.

    Args:
        tasks: Tasks of the execution; each must expose ``status``,
            ``exit_code``, ``name`` and ``task_id``.
        execution_log: Text of the top-level execution log.

    Returns:
        The selected task, or ``None`` when no task has status "FAILED".
    """
    failed = [t for t in tasks if t.status == "FAILED"]
    nonzero_exit = [
        t for t in failed
        if t.exit_code is not None and t.exit_code != 0
    ]
    # Prefer tasks that really exited with an error code.
    candidates = nonzero_exit or failed

    if not candidates:
        return None
    if len(candidates) == 1:
        return candidates[0]

    reported = re.search(r"Error executing process > '([^']+)'", execution_log)
    if reported is not None:
        reported_name = reported.group(1)

        exact = next((t for t in candidates if t.name == reported_name), None)
        if exact is not None:
            return exact

        partial = next(
            (t for t in candidates
             if reported_name in t.name or t.name in reported_name),
            None,
        )
        if partial is not None:
            return partial

    # Nothing in the log pointed at a candidate: take the lowest task_id.
    return min(candidates, key=lambda t: t.task_id)
file_access_request=ProjectFileAccessRequest( + access_type=ProjectAccessType.READ_SCRATCH, + token_lifetime_hours=token_lifetime_override + ), + base_url=base_url, + project_id=project_id + ) + @classmethod def upload_reference(cls, project_id: str, base_url: str): return cls( diff --git a/cirro/sdk/dataset.py b/cirro/sdk/dataset.py index 34aea03..8320a81 100644 --- a/cirro/sdk/dataset.py +++ b/cirro/sdk/dataset.py @@ -1,17 +1,20 @@ import datetime +from functools import cached_property import re from pathlib import Path from typing import Union, List, Optional, Any from cirro_api_client.v1.api.processes import validate_file_requirements -from cirro_api_client.v1.models import Dataset, DatasetDetail, RunAnalysisRequest, ProcessDetail, Status, \ - RunAnalysisRequestParams, Tag, ArtifactType, NamedItem, ValidateFileRequirementsRequest +from cirro_api_client.v1.errors import CirroException, UnexpectedStatus +from cirro_api_client.v1.models import Dataset, DatasetDetail, RunAnalysisRequest, ProcessDetail, \ + Status, RunAnalysisRequestParams, Tag, ArtifactType, NamedItem, ValidateFileRequirementsRequest from cirro.cirro_client import CirroApi from cirro.file_utils import bytes_to_human_readable, filter_files_by_pattern from cirro.models.assets import DatasetAssets from cirro.models.file import PathLike from cirro.sdk.asset import DataPortalAssets, DataPortalAsset +from cirro.sdk.task import DataPortalTask from cirro.sdk.exceptions import DataPortalAssetNotFound from cirro.sdk.exceptions import DataPortalInputError from cirro.sdk.file import DataPortalFile, DataPortalFiles @@ -241,6 +244,79 @@ def created_at(self) -> datetime.datetime: """Timestamp of dataset creation""" return self._data.created_at + @cached_property + def logs(self) -> str: + """ + Return the top-level execution log for this dataset. + + Returns an empty string if no log events are available (e.g. the job has not started yet). + + Returns: + str: Execution log text, or an empty string if unavailable. 
+ """ + try: + return self._client.execution.get_execution_logs( + project_id=self.project_id, + dataset_id=self.id + ) + except (CirroException, UnexpectedStatus): + return '' + + @cached_property + def tasks(self) -> List[DataPortalTask]: + """ + List of tasks from the workflow execution, fetched via the execution API. + + Returns: + `List[DataPortalTask]` + """ + return self._load_tasks_from_api() + + def _load_tasks_from_api(self) -> List[DataPortalTask]: + """Load tasks via the execution API.""" + api_tasks = self._client.execution.get_tasks_for_execution( + project_id=self.project_id, + dataset_id=self.id + ) + if not api_tasks: + return [] + + all_tasks_ref: List[DataPortalTask] = [] + tasks = [ + DataPortalTask( + task=t, + client=self._client, + project_id=self.project_id, + dataset_id=self.id, + all_tasks_ref=all_tasks_ref, + task_id=i, + ) + for i, t in enumerate(api_tasks) + ] + all_tasks_ref.extend(tasks) + return tasks + + @property + def primary_failed_task(self) -> Optional[DataPortalTask]: + """ + Find the root-cause failed task in this workflow execution. + + Returns ``None`` gracefully when no tasks are available or none have + a ``FAILED`` status. + + Returns: + `cirro.sdk.task.DataPortalTask`, or ``None`` if no failed task is found. 
+ """ + from cirro.helpers.nextflow_utils import find_primary_failed_task + + tasks = self.tasks + + if not tasks: + return None + + execution_log = self.logs + return find_primary_failed_task(tasks, execution_log) + def _get_detail(self): if not isinstance(self._data, DatasetDetail): self._data = self._client.datasets.get(project_id=self.project_id, dataset_id=self.id) diff --git a/cirro/sdk/file.py b/cirro/sdk/file.py index 3c6850e..3cdbb4d 100644 --- a/cirro/sdk/file.py +++ b/cirro/sdk/file.py @@ -1,23 +1,15 @@ -import gzip -import json -import pickle -from io import BytesIO, StringIO from pathlib import Path from typing import List -from typing import TYPE_CHECKING -if TYPE_CHECKING: - import anndata - from pandas import DataFrame - from cirro.cirro_client import CirroApi from cirro.models.file import File, PathLike from cirro.sdk.asset import DataPortalAssets, DataPortalAsset from cirro.sdk.exceptions import DataPortalInputError +from cirro.sdk.file_mixins import FileReadMixin from cirro.utils import convert_size -class DataPortalFile(DataPortalAsset): +class DataPortalFile(DataPortalAsset, FileReadMixin): """ Datasets are made up of a collection of File objects in the Data Portal. """ @@ -90,135 +82,6 @@ def _get(self) -> bytes: return self._client.file.get_file(self._file) - def read_csv(self, compression='infer', encoding='utf-8', **kwargs) -> 'DataFrame': - """ - Parse the file as a Pandas DataFrame. - - The default field separator is a comma (for CSV), use sep='\\t' for TSV. - - File compression is inferred from the extension, but can be set - explicitly with the compression= flag. 
- - All other keyword arguments are passed to pandas.read_csv - https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html - """ - import pandas - - if compression == 'infer': - # If the file appears to be compressed - if self.relative_path.endswith('.gz'): - compression = dict(method='gzip') - elif self.relative_path.endswith('.bz2'): - compression = dict(method='bz2') - elif self.relative_path.endswith('.xz'): - compression = dict(method='xz') - elif self.relative_path.endswith('.zst'): - compression = dict(method='zstd') - else: - compression = None - - if compression is not None: - handle = BytesIO(self._get()) - else: - handle = StringIO(self._get().decode(encoding)) - - df = pandas.read_csv( - handle, - compression=compression, - encoding=encoding, - **kwargs - ) - handle.close() - return df - - def read_h5ad(self) -> 'anndata.AnnData': - """Read an AnnData object from a file.""" - # Import the anndata library, and raise an error if it is not available - try: - import anndata as ad # noqa - except ImportError: - raise ImportError("The anndata library is required to read AnnData files. " - "Please install it using 'pip install anndata'.") - - # Download the file to a temporary file handle and parse the contents - with BytesIO(self._get()) as handle: - return ad.read_h5ad(handle) - - def read_json(self, **kwargs): - """Read the file contents as a parsed JSON object (dict, list, etc.).""" - return json.loads(self._get(), **kwargs) - - def read_parquet(self, **kwargs) -> 'DataFrame': - """ - Read a Parquet file as a Pandas DataFrame. - - Requires ``pyarrow`` or ``fastparquet`` to be installed. - All keyword arguments are passed to :func:`pandas.read_parquet`. - """ - import pandas - return pandas.read_parquet(BytesIO(self._get()), **kwargs) - - def read_feather(self, **kwargs) -> 'DataFrame': - """ - Read a Feather file as a Pandas DataFrame. - - Requires ``pyarrow`` to be installed. - All keyword arguments are passed to :func:`pandas.read_feather`. 
- """ - import pandas - return pandas.read_feather(BytesIO(self._get()), **kwargs) - - def read_pickle(self, **kwargs): - """Read the file contents as a Python pickle object.""" - return pickle.loads(self._get(), **kwargs) - - def read_excel(self, **kwargs) -> 'DataFrame': - """ - Read an Excel file (``.xlsx`` / ``.xls``) as a Pandas DataFrame. - - Requires ``openpyxl`` (for ``.xlsx``) or ``xlrd`` (for ``.xls``). - All keyword arguments are passed to :func:`pandas.read_excel`. - """ - import pandas - return pandas.read_excel(BytesIO(self._get()), **kwargs) - - def readlines(self, encoding='utf-8', compression=None) -> List[str]: - """Read the file contents as a list of lines.""" - - return self.read( - encoding=encoding, - compression=compression - ).splitlines() - - def read(self, encoding='utf-8', compression=None) -> str: - """Read the file contents as text.""" - - # Get the raw file contents - cont = self._get() - - # If the file is uncompressed - if compression is None: - return cont.decode(encoding) - # If the file is compressed - else: - - # Only gzip-compression is supported currently - if compression != "gzip": - raise DataPortalInputError("compression may be 'gzip' or None") - - with gzip.open( - BytesIO( - cont - ), - 'rt', - encoding=encoding - ) as handle: - return handle.read() - - def read_bytes(self) -> BytesIO: - """Get a generic BytesIO object representing the Data Portal File, to be passed into readers.""" - return BytesIO(self._get()) - def download(self, download_location: str = None) -> Path: """ Download the file to a local directory. 
class FileReadMixin(ABC):
    """
    Mixin that adds file-reading methods to any class that provides
    ``_get() -> bytes`` and a ``name`` property.

    Every reader calls ``_get()`` to obtain the complete file contents in
    memory and then parses them; nothing is streamed.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Filename."""

    @abstractmethod
    def _get(self) -> bytes:
        """Return the raw file bytes."""

    def read(self, encoding='utf-8', compression=None) -> str:
        """
        Read the file contents as text.

        Args:
            encoding: Text encoding used to decode the bytes.
            compression: ``None`` for plain files or ``'gzip'``.

        Raises:
            DataPortalInputError: If ``compression`` is any other value. The
                check runs before the file is fetched, so an invalid argument
                does not cost a download.
        """
        if compression is not None and compression != 'gzip':
            raise DataPortalInputError("compression may be 'gzip' or None")

        cont = self._get()
        if compression is None:
            return cont.decode(encoding)
        with gzip.open(BytesIO(cont), 'rt', encoding=encoding) as handle:
            return handle.read()

    def readlines(self, encoding='utf-8', compression=None) -> List[str]:
        """Read the file contents as a list of lines (without line endings)."""
        return self.read(encoding=encoding, compression=compression).splitlines()

    def read_bytes(self) -> BytesIO:
        """Get a BytesIO object for the file contents, to pass into arbitrary readers."""
        return BytesIO(self._get())

    def read_csv(self, compression='infer', encoding='utf-8', **kwargs) -> 'DataFrame':
        """
        Parse the file as a Pandas DataFrame.

        The default field separator is a comma (for CSV), use sep='\\t' for TSV.

        File compression is inferred from the extension, but can be set
        explicitly with the compression= flag.

        All other keyword arguments are passed to pandas.read_csv
        https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
        """
        import pandas

        if compression == 'infer':
            compression = None
            for suffix, method in (('.gz', 'gzip'), ('.bz2', 'bz2'),
                                   ('.xz', 'xz'), ('.zst', 'zstd')):
                if self.name.endswith(suffix):
                    compression = {'method': method}
                    break

        if compression is not None:
            # Compressed payloads are handed to pandas as bytes so that it
            # can decompress and decode them itself.
            with BytesIO(self._get()) as handle:
                return pandas.read_csv(handle, compression=compression, encoding=encoding, **kwargs)

        # Plain payloads are decoded here, so pandas receives text.
        with StringIO(self._get().decode(encoding)) as handle:
            return pandas.read_csv(handle, **kwargs)

    def read_json(self, **kwargs):
        """Read the file contents as a parsed JSON object (dict, list, etc.)."""
        return json.loads(self._get(), **kwargs)

    def read_h5ad(self) -> 'anndata.AnnData':
        """
        Read an AnnData object from a file.

        Raises:
            ImportError: If the optional ``anndata`` package is not installed.
        """
        try:
            import anndata as ad  # noqa
        except ImportError as err:
            # Chain the original error so the real import failure stays visible.
            raise ImportError("The anndata library is required to read AnnData files. "
                              "Please install it using 'pip install anndata'.") from err
        with BytesIO(self._get()) as handle:
            return ad.read_h5ad(handle)

    def read_parquet(self, **kwargs) -> 'DataFrame':
        """
        Read a Parquet file as a Pandas DataFrame.

        Requires ``pyarrow`` or ``fastparquet`` to be installed.
        All keyword arguments are passed to :func:`pandas.read_parquet`.
        """
        import pandas
        return pandas.read_parquet(BytesIO(self._get()), **kwargs)

    def read_feather(self, **kwargs) -> 'DataFrame':
        """
        Read a Feather file as a Pandas DataFrame.

        Requires ``pyarrow`` to be installed.
        All keyword arguments are passed to :func:`pandas.read_feather`.
        """
        import pandas
        return pandas.read_feather(BytesIO(self._get()), **kwargs)

    def read_pickle(self, **kwargs):
        """
        Read the file contents as a Python pickle object.

        Warning:
            Unpickling can execute arbitrary code. Only call this on files
            whose origin you trust.
        """
        # SECURITY: pickle.loads on downloaded data; see the warning above.
        return pickle.loads(self._get(), **kwargs)

    def read_excel(self, **kwargs) -> 'DataFrame':
        """
        Read an Excel file (``.xlsx`` / ``.xls``) as a Pandas DataFrame.

        Requires ``openpyxl`` (for ``.xlsx``) or ``xlrd`` (for ``.xls``).
        All keyword arguments are passed to :func:`pandas.read_excel`.
        """
        import pandas
        return pandas.read_excel(BytesIO(self._get()), **kwargs)
class WorkDirFile(FileReadMixin):
    """
    Handle to one file stored in a Nextflow work directory or a dataset
    staging area.

    ``source_task`` is the task whose work directory the file came from, or
    ``None`` when the file was a primary/staged input to the workflow.
    """

    def __init__(
        self,
        s3_uri: str,
        client: 'CirroApi',
        project_id: str,
        size: Optional[int] = None,
        source_task: Optional['DataPortalTask'] = None,
        dataset_id: str = ''
    ):
        """
        Instances are normally obtained from a task's ``inputs`` or ``outputs`` property.

        ```python
        for task in dataset.tasks:
            for f in task.inputs:
                print(f.name, f.source_task)
        ```
        """
        self._client = client
        self._project_id = project_id
        # NOTE(review): stored but not read by any method of this class —
        # confirm whether the access context should receive it.
        self._dataset_id = dataset_id
        self._source_task = source_task
        self._size = size
        self._s3_uri = s3_uri
        self._s3_path = S3Path(s3_uri)

    @property
    def source_task(self) -> Optional['DataPortalTask']:
        """Producing task, or ``None`` when the file is a staged/primary input."""
        return self._source_task

    @property
    def name(self) -> str:
        """Final path component of the S3 URI, i.e. the filename."""
        uri_path = PurePath(self._s3_uri)
        return uri_path.name

    @property
    def size(self) -> int:
        """Size in bytes; looked up with ``head_object`` on first access when not supplied."""
        if self._size is not None:
            return self._size
        try:
            head = self._get_s3_client().head_object(
                Bucket=self._s3_path.bucket,
                Key=self._s3_path.key
            )
            self._size = head['ContentLength']
        except Exception as err:  # NOSONAR
            raise DataPortalAssetNotFound(
                f"Could not determine size of {self.name!r} — "
                f"the work directory may have been cleaned up: {err}"
            ) from err
        return self._size

    def _access_context(self) -> FileAccessContext:
        """Build the scratch-download access context rooted at this file's S3 base."""
        base = self._s3_path.base
        return FileAccessContext.scratch_download(project_id=self._project_id, base_url=base)

    def _get(self) -> bytes:
        """Fetch the file's raw bytes; any failure is re-raised as DataPortalAssetNotFound."""
        try:
            context = self._access_context()
            return self._client.file.get_file_from_path(context, self._s3_path.key)
        except Exception as err:  # NOSONAR
            raise DataPortalAssetNotFound(
                f"Could not read {self.name!r} — "
                f"the work directory may have been cleaned up: {err}"
            ) from err

    def _get_s3_client(self):
        """Return an S3 client obtained from the file service for this file's access context."""
        context = self._access_context()
        return self._client.file.get_aws_s3_client(context)

    def __str__(self):
        return self.name

    def __repr__(self):
        return 'WorkDirFile(name={!r})'.format(self.name)
+ + Task metadata (name, status, exit code, work directory, etc.) is read + from the workflow trace artifact. Log contents and input/output files are + fetched from the task's S3 work directory on demand. + """ + + def __init__( + self, + task: Task, + client: 'CirroApi', + project_id: str, + dataset_id: str = '', + all_tasks_ref: Optional[list] = None, + task_id: int = 0 + ): + """ + Obtained from a dataset's ``tasks`` property. + + ```python + for task in dataset.tasks: + print(task.name, task.status) + print(task.logs) + ``` + + Args: + task (Task): Task object returned by the execution API. + client (CirroApi): Authenticated CirroApi client. + project_id (str): ID of the project that owns this dataset. + dataset_id (str): ID of the dataset (execution) that owns this task. + all_tasks_ref (list): A shared list that will contain all tasks once they + are all built. Used by ``inputs`` to resolve ``source_task``. + task_id (int): Numeric index of this task in the execution's task list. + """ + self._task = task + self._client = client + self._project_id = project_id + self._dataset_id = dataset_id + self._all_tasks_ref: list = all_tasks_ref if all_tasks_ref is not None else [] + self._task_id = task_id + + # ------------------------------------------------------------------ # + # Task properties # + # ------------------------------------------------------------------ # + + @property + def task_id(self) -> int: + """Sequential task identifier — the 0-based index of this task in the execution's task list.""" + return self._task_id + + @property + def name(self) -> str: + """Full task name, e.g. ``NFCORE_RNASEQ:RNASEQ:TRIMGALORE (sample1)``.""" + return self._task.name + + @property + def status(self) -> str: + """Task status string, e.g. ``COMPLETED``, ``FAILED``, ``ABORTED``.""" + return self._task.status + + @property + def hash(self) -> str: + """Short hash prefix used by Nextflow, e.g. ``99/b42c07``. 
+ + Note: this field is not available in cirro_api_client >= 1.5.0 and always returns ''. + """ + return '' + + @property + def work_dir(self) -> str: + """S3 URI of the task's work directory.""" + val = self._task.work_dir + if isinstance(val, Unset): + val = self._task_details.work_dir + if isinstance(val, Unset) or val is None: + return '' + return val + + @property + def native_id(self) -> str: + """Native job ID on the underlying executor (e.g. AWS Batch job ID).""" + val = self._task.native_job_id + if isinstance(val, Unset) or val is None: + return '' + return val + + @property + def command_line(self) -> str: + """The shell command that was executed for this task.""" + val = self._task.command_line + if isinstance(val, Unset) or val is None: + return '' + return val + + @property + def log_location(self) -> str: + """S3 URI or path to the task log file.""" + val = self._task.log_location + if isinstance(val, Unset) or val is None: + return '' + return val + + @cached_property + def exit_code(self) -> Optional[int]: + """Process exit code.""" + val = self._task.exit_code + if isinstance(val, Unset): + val = self._task_details.exit_code + if isinstance(val, Unset) or val is None: + return None + return val + + @cached_property + def _task_details(self) -> Task: + """Fetch full task details from the API (lazy, cached).""" + if not self._dataset_id or not self.native_id: + return self._task + detail = self._client.execution.get_task( + project_id=self._project_id, + dataset_id=self._dataset_id, + task_id=self.native_id + ) + return detail if detail is not None else self._task + + # ------------------------------------------------------------------ # + # Work-directory file access # + # ------------------------------------------------------------------ # + + def _get_access_context(self) -> FileAccessContext: + if not self.work_dir: + raise DataPortalAssetNotFound( + f"Task {self.name!r} has no work directory recorded in the trace" + ) + s3_path = 
S3Path(self.work_dir) + if self._dataset_id: + return FileAccessContext.scratch_download( + project_id=self._project_id, + dataset_id=self._dataset_id, + base_url=s3_path.base + ) + return FileAccessContext.download( + project_id=self._project_id, + base_url=s3_path.base + ) + + def _read_work_file(self, filename: str) -> str: + """ + Read a file from the task's work directory. + + Returns an empty string if the work directory has been cleaned up or + the file does not exist. + """ + if not self.work_dir: + return '' + try: + s3_path = S3Path(self.work_dir) + key = f'{s3_path.key}/{filename}' + access_context = self._get_access_context() + return self._client.file.get_file_from_path( + access_context, key + ).decode('utf-8', errors='replace') + except Exception: # NOSONAR + return '' + + @cached_property + def logs(self) -> str: + """ + Return the task log (combined stdout/stderr of the task process). + + Fetches via the Cirro execution API when a native job ID is available, + which works even when the S3 scratch bucket is not directly accessible. + Falls back to reading ``.command.log`` from the S3 work directory. + Returns an empty string if neither source can be read. + """ + if self._dataset_id and self.native_id: + try: + return self._client.execution.get_task_logs( + project_id=self._project_id, + dataset_id=self._dataset_id, + task_id=self.native_id + ) + except (CirroException, UnexpectedStatus): + pass + return self._read_work_file('.command.log') + + @cached_property + def script(self) -> str: + """ + Return the contents of ``.command.sh`` from the task's work directory. + + This is the actual shell script that Nextflow executed — the user's + pipeline code for this task. Falls back to parsing the script from the + ``WORKFLOW_LOGS`` artifact when the work directory is not accessible + (scratch bucket requires elevated permissions). + Returns an empty string if the script cannot be obtained. 
+ """ + content = self._read_work_file('.command.sh') + if content: + return content + return self._script_from_workflow_log() + + def _script_from_workflow_log(self) -> str: + """ + Parse this task's shell script from the WORKFLOW_LOGS artifact. + + When a Nextflow task fails the head-node log includes a block: + + Error executing process > 'TASK_NAME' + ... + Command executed: +