diff --git a/.env_sample b/.env_sample index 4dab205b6..0100ffc37 100644 --- a/.env_sample +++ b/.env_sample @@ -69,7 +69,7 @@ AWS_STORAGE_PRIVATE_BUCKET_NAME=private AWS_S3_ENDPOINT_URL=http://minio:9000/ AWS_QUERYSTRING_AUTH=False # Optional URL rewriting in compute worker, format: FROM | TO -#WORKER_BUNDLE_URL_REWRITE=http://localhost:9000|http://minio:9000 +#WORKER_BUNDLE_URL_REWRITE=http://localhost:9000/|http://minio:9000/ # ----------------------------------------------------------------------------- diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 573e72a94..623728cec 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -10,76 +10,137 @@ import tempfile import time import uuid +import requests +import websockets +import yaml +import docker +import logging +import sys # This is only needed for the pytests to pass from shutil import make_archive from urllib.error import HTTPError from urllib.parse import urlparse from urllib.request import urlretrieve from zipfile import ZipFile, BadZipFile -import docker -from rich.progress import Progress +from urllib3 import Retry + from rich.pretty import pprint -import requests -import websockets -import yaml -from billiard.exceptions import SoftTimeLimitExceeded -from celery import Celery, shared_task, utils +from rich.progress import Progress from kombu import Queue, Exchange -from urllib3 import Retry +from celery import Celery, shared_task, utils, signals +from billiard.exceptions import SoftTimeLimitExceeded + +from logs_loguru import configure_logging, colorize_run_args -# This is only needed for the pytests to pass -import sys +logger = logging.getLogger(__name__) sys.path.append("/app/src/settings/") -from celery import signals -import logging -logger = logging.getLogger(__name__) -from logs_loguru import configure_logging, colorize_run_args -import json +# ----------------------------------------------- +# Env Settings +# 
# -----------------------------------------------
# Env Settings
# -----------------------------------------------
def _env(key, default=None):
    """Return the value of environment variable `key`.

    Falls back to `default` when the variable is unset; logs a warning and
    returns None when the variable is unset and no default was provided.

    NOTE: a variable explicitly set to the empty string is considered "set"
    and "" is returned.
    """
    val = os.getenv(key)
    if val is not None:
        return val
    if default is not None:
        return default
    logging.getLogger(__name__).warning(
        f"Environment variable '{key}' not found and no default provided."
    )
    return None


class Settings:
    """Central, read-once view of the worker's environment configuration."""

    # Kept as a staticmethod so external callers can keep using Settings.get(...).
    # The attribute defaults below call the module-level helper instead of the
    # staticmethod, because calling a `staticmethod` object inside the class
    # body raises TypeError on Python < 3.10.
    get = staticmethod(_env)

    # Default engine sockets, keyed by engine executable name
    DEFAULT_SOCKETS = {
        "docker": "unix:///var/run/docker.sock",
        "podman": "unix:///run/user/1000/podman/podman.sock",
    }

    # Settings variables
    LOG_LEVEL = _env("LOG_LEVEL", "INFO")
    SERIALIZED = _env("SERIALIZED", "false")

    USE_GPU = _env("USE_GPU", "false")
    CONTAINER_ENGINE_EXECUTABLE = _env("CONTAINER_ENGINE_EXECUTABLE", "docker")
    GPU_DEVICE = _env("GPU_DEVICE", "nvidia.com/gpu=all")

    # Case-insensitive lookup with a docker fallback so that an unknown engine
    # name never yields CONTAINER_SOCKET=None (docker.APIClient(base_url=None)
    # would fail much later and less clearly).
    CONTAINER_SOCKET = _env(
        "CONTAINER_SOCKET",
        DEFAULT_SOCKETS.get(CONTAINER_ENGINE_EXECUTABLE.lower(), DEFAULT_SOCKETS["docker"]),
    )

    HOST_DIRECTORY = _env("HOST_DIRECTORY", "/tmp/codabench/")
    MAX_CACHE_DIR_SIZE_GB = _env("MAX_CACHE_DIR_SIZE_GB", 10)

    COMPETITION_CONTAINER_NETWORK_DISABLED = _env("COMPETITION_CONTAINER_NETWORK_DISABLED", "False")
    COMPETITION_CONTAINER_HTTP_PROXY = _env("COMPETITION_CONTAINER_HTTP_PROXY", "")
    COMPETITION_CONTAINER_HTTPS_PROXY = _env("COMPETITION_CONTAINER_HTTPS_PROXY", "")

    # Unset by default; only its truthiness is checked (in clean_up())
    CODALAB_IGNORE_CLEANUP_STEP = _env("CODALAB_IGNORE_CLEANUP_STEP")

    # Format: FROM|TO (see .env_sample), consumed by rewrite_bundle_url_if_needed
    WORKER_BUNDLE_URL_REWRITE = _env("WORKER_BUNDLE_URL_REWRITE", "")


# -----------------------------------------------
# Program Kind
# -----------------------------------------------
class ProgramKind:
    """Kinds of programs a run can execute inside a container."""
    INGESTION_PROGRAM = "ingestion_program"
    SCORING_PROGRAM = "scoring_program"
    SUBMISSION = "submission"


# -----------------------------------------------
# Submission status
# -----------------------------------------------
class SubmissionStatus:
    """Status values reported back to the Codabench instance."""
    NONE = "None"
    SUBMITTING = "Submitting"
    SUBMITTED = "Submitted"
    PREPARING = "Preparing"
    RUNNING = "Running"
    SCORING = "Scoring"
    FINISHED = "Finished"
    FAILED = "Failed"

    # Used by _update_status to validate a status before sending it
    AVAILABLE_STATUSES = (
        NONE,
        SUBMITTING,
        SUBMITTED,
        PREPARING,
        RUNNING,
        SCORING,
        FINISHED,
        FAILED,
    )
FINISHED = "Finished" + FAILED = "Failed" + + AVAILABLE_STATUSES = ( + NONE, + SUBMITTING, + SUBMITTED, + PREPARING, + RUNNING, + SCORING, + FINISHED, + FAILED, + ) # ----------------------------------------------- # Logging # ----------------------------------------------- configure_logging( - os.environ.get("LOG_LEVEL", "INFO"), os.environ.get("SERIALIZED", "false") + Settings.LOG_LEVEL, Settings.SERIALIZED ) # ----------------------------------------------- # Initialize Docker or Podman depending on .env # ----------------------------------------------- -if os.environ.get("USE_GPU", "false").lower() == "true": - logger.info( - "Using " - + os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").upper() - + "with GPU capabilites : " - + os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all") - + " network_disabled for the competition container is set to " - + os.environ.get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False") - ) -else: - logger.info( - "Using " - + os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").upper() - + " without GPU capabilities. " - + "network_disabled for the competition container is set to " - + os.environ.get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False") - ) - -if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker": - client = docker.APIClient( - base_url=os.environ.get("CONTAINER_SOCKET", "unix:///var/run/docker.sock"), - version="auto", - ) -elif os.environ.get("CONTAINER_ENGINE_EXECUTABLE").lower() == "podman": - client = docker.APIClient( - base_url=os.environ.get( - "CONTAINER_SOCKET", "unix:///run/user/1000/podman/podman.sock" - ), - version="auto", - ) +logger.info( + f"Using {Settings.CONTAINER_ENGINE_EXECUTABLE.upper()} " + f"{'with GPU capabilities: ' + Settings.GPU_DEVICE if Settings.USE_GPU.lower() == 'true' else 'without GPU capabilities'}. 
" + f"Network disabled for the competition container is set to {Settings.COMPETITION_CONTAINER_NETWORK_DISABLED}" +) +# Intializing client +# NOTE: CONTAINER_SOCKET is set in Settings based on CONTAINER_ENGINE_EXECUTABLE which must has either podman or docker +client = docker.APIClient( + base_url=Settings.CONTAINER_SOCKET, + version="auto", +) # ----------------------------------------------- # Show Progress bar on downloading images @@ -147,8 +208,8 @@ def show_progress(line, progress): total=total, ) except Exception as e: - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": - logger.exception("There was an error showing the progress bar") + if Settings.LOG_LEVEL.lower() == "debug": + logger.exception(f"There was an error showing the progress bar: {e}") # ----------------------------------------------- @@ -175,35 +236,11 @@ def setup_celery_logging(**kwargs): # Directories # ----------------------------------------------- # Setup base directories used by all submissions -# note: we need to pass this directory to docker/podman so it knows where to store things! -HOST_DIRECTORY = os.environ.get("HOST_DIRECTORY", "/tmp/codabench/") +# NOTE: we need to pass this directory to docker/podman so it knows where to store things! 
+HOST_DIRECTORY = Settings.HOST_DIRECTORY BASE_DIR = "/codabench/" # base directory inside the container CACHE_DIR = os.path.join(BASE_DIR, "cache") -MAX_CACHE_DIR_SIZE_GB = float(os.environ.get("MAX_CACHE_DIR_SIZE_GB", 10)) - - -# ----------------------------------------------- -# Submission status -# ----------------------------------------------- -# Status options for submissions -STATUS_NONE = "None" -STATUS_SUBMITTING = "Submitting" -STATUS_SUBMITTED = "Submitted" -STATUS_PREPARING = "Preparing" -STATUS_RUNNING = "Running" -STATUS_SCORING = "Scoring" -STATUS_FINISHED = "Finished" -STATUS_FAILED = "Failed" -AVAILABLE_STATUSES = ( - STATUS_NONE, - STATUS_SUBMITTING, - STATUS_SUBMITTED, - STATUS_PREPARING, - STATUS_RUNNING, - STATUS_SCORING, - STATUS_FINISHED, - STATUS_FAILED, -) +MAX_CACHE_DIR_SIZE_GB = float(Settings.MAX_CACHE_DIR_SIZE_GB) # ----------------------------------------------- @@ -232,7 +269,7 @@ def rewrite_bundle_url_if_needed(url): Example: http://localhost:9000|http://minio:9000 """ - rule = os.getenv("WORKER_BUNDLE_URL_REWRITE", "").strip() + rule = Settings.WORKER_BUNDLE_URL_REWRITE.strip() if not rule or "|" not in rule: return url src, dst = rule.split("|", 1) @@ -265,11 +302,11 @@ def run_wrapper(run_args): msg = f"Docker image pull failed: {msg}" else: msg = "Docker image pull failed." - run._update_status(STATUS_FAILED, extra_information=msg) + run._update_status(SubmissionStatus.FAILED, extra_information=msg) raise except SoftTimeLimitExceeded: run._update_status( - STATUS_FAILED, + SubmissionStatus.FAILED, extra_information="Execution time limit exceeded.", ) raise @@ -279,11 +316,11 @@ def run_wrapper(run_args): msg = f"Submission failed: {msg}. See logs for more details." else: msg = "Submission failed. See logs for more details." 
- run._update_status(STATUS_FAILED, extra_information=msg) + run._update_status(SubmissionStatus.FAILED, extra_information=msg) raise except Exception as e: # Catch any exception to avoid getting stuck in Running status - run._update_status(STATUS_FAILED, extra_information=traceback.format_exc()) + run._update_status(SubmissionStatus.FAILED, extra_information=traceback.format_exc()) raise finally: try: @@ -420,9 +457,11 @@ def __init__(self, run_args): self._get_stdout_stderr_file_names(run_args) ) self.ingestion_container_name = f"ingestion_{self.run_related_name}" - self.program_container_name = f"scoring_{self.run_related_name}" - self.program_data = run_args.get("program_data") - self.ingestion_program_data = run_args.get("ingestion_program") + self.scoring_program_container_name = f"scoring_{self.run_related_name}" + # self.program_data = run_args.get("program_data") + self.scoring_program_data = run_args.get("scoring_program_data") + self.submission_data = run_args.get("submission_data") + self.ingestion_program_data = run_args.get("ingestion_program_data") self.input_data = run_args.get("input_data") self.reference_data = run_args.get("reference_data") self.ingestion_only_during_scoring = run_args.get( @@ -465,7 +504,7 @@ async def watch_detailed_results(self): start = time.time() expiration_seconds = 60 - while self.watch and self.completed_program_counter < 2: + while self.watch and self.completed_program_counter < 1: if file_path: new_time = os.path.getmtime(file_path) if new_time != last_modified_time: @@ -573,9 +612,9 @@ def _update_submission(self, data): def _update_status(self, status, extra_information=None): # Update submission status - if status not in AVAILABLE_STATUSES: + if status not in SubmissionStatus.AVAILABLE_STATUSES: raise SubmissionException( - f"Status '{status}' is not in available statuses: {AVAILABLE_STATUSES}" + f"Status '{status}' is not in available statuses: {SubmissionStatus.AVAILABLE_STATUSES}" ) data = {"status": status, 
"status_details": extra_information} try: @@ -717,6 +756,90 @@ def _get_bundle(self, url, destination, cache=True): # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it return bundle_file + def _create_container( + self, + container_name: str, + command: str, + volumes_host: list, + volumes_config: dict + ): + """ + Helper to create and configure a container for ingestion, scoring, or submission. + Returns the container object. + """ + logger.info("Creating container with multiple configurations") + + cap_drop_list = [ + "AUDIT_WRITE", + "CHOWN", + "DAC_OVERRIDE", + "FOWNER", + "FSETID", + "KILL", + "MKNOD", + "NET_BIND_SERVICE", + "NET_RAW", + "SETFCAP", + "SETGID", + "SETPCAP", + "SETUID", + "SYS_CHROOT", + ] + + # Configure whether or not we use the GPU. Also setting auto_remove to False because + if Settings.CONTAINER_ENGINE_EXECUTABLE.lower() == "docker": + security_options = ["no-new-privileges"] + else: + security_options = ["label=disable"] + + # Setting the device ID like this allows users to specify which gpu to use in the .env file, with all being the default if no value is given + device_id = [Settings.GPU_DEVICE] + if Settings.USE_GPU.lower() == "true": + logger.info("Container configured with GPU capabilities") + host_config = client.create_host_config( + auto_remove=False, + cap_drop=cap_drop_list, + binds=volumes_config, + userns_mode="host", + security_opt=security_options, + device_requests=[ + { + "Driver": "cdi", + "DeviceIDs": device_id, + }, + ], + ) + else: + logger.info("Container configured with CPU capabilities") + host_config = client.create_host_config( + auto_remove=False, + cap_drop=cap_drop_list, + binds=volumes_config, + userns_mode="host", + security_opt=security_options, + ) + + # Creating container + # COMPETITION_CONTAINER_NETWORK_DISABLED: Disable or not the competition container access to Internet (False by default) + # HTTP and HTTPS proxy for the competition container if needed + container = 
client.create_container( + self.container_image, + name=container_name, + host_config=host_config, + detach=False, + volumes=volumes_host, + command=command, + working_dir="/app/program", + environment=[ + "PYTHONUNBUFFERED=1", + "http_proxy=" + Settings.COMPETITION_CONTAINER_HTTP_PROXY, + "https_proxy=" + Settings.COMPETITION_CONTAINER_HTTPS_PROXY, + ], + network_disabled=Settings.COMPETITION_CONTAINER_NETWORK_DISABLED.lower() == "true", + ) + + return container + async def _run_container_engine_cmd(self, container, kind): """This runs a command and asynchronously writes the data to both a storage file and a socket @@ -739,25 +862,25 @@ async def _run_container_engine_cmd(self, container, kind): try: websocket_url = f"{self.websocket_url}?kind={kind}" logger.debug( - "Connecting to " - + websocket_url - + "for container " - + str(container.get("Id")) + "Connecting to " + + websocket_url + + "for container " + + str(container.get("Id")) ) websocket = await asyncio.wait_for( websockets.connect(websocket_url), timeout=10.0 ) logger.debug( - "connected to " - + str(websocket_url) - + "for container " - + str(container.get("Id")) + "connected to " + + str(websocket_url) + + "for container " + + str(container.get("Id")) ) except Exception as e: logger.error( f"There was an error trying to connect to the websocket on the codabench instance: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL.lower() == "debug": logger.exception(e) start = time.time() @@ -775,8 +898,7 @@ async def _run_container_engine_cmd(self, container, kind): # If we enter the for loop after the container exited, the program will get stuck if ( - client.inspect_container(container)["State"]["Status"].lower() - == "running" + client.inspect_container(container)["State"]["Status"].lower() == "running" ): logger.debug( "Show the logs and stream them to codabench " + container.get("Id") @@ -793,7 +915,7 @@ async def _run_container_engine_cmd(self, container, kind): 
) except Exception as e: logger.error(e) - + # Errors elif log[1] is not None: stderr_chunks.append(log[1]) @@ -812,7 +934,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error( f"There was an error while starting the container and getting the logs: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL.lower() == "debug": logger.exception(e) # Get the return code of the competition container once done @@ -832,10 +954,10 @@ async def _run_container_engine_cmd(self, container, kind): client.remove_container(container, force=True) logger.debug( - "Container " - + container.get("Id") - + "exited with status code : " - + str(return_Code["StatusCode"]) + "Container " + + container.get("Id") + + "exited with status code : " + + str(return_Code["StatusCode"]) ) except ( @@ -849,7 +971,7 @@ async def _run_container_engine_cmd(self, container, kind): finally: try: # Last chance of removing container - client.remove_container(container_id, force=True) + client.remove_container(container.get("Id"), force=True) except Exception: pass @@ -861,13 +983,13 @@ async def _run_container_engine_cmd(self, container, kind): "data": logs_Unified[0], "stream": logs_Unified[0], "continue": True, - "location": self.stdout if kind == "program" else self.ingestion_stdout, + "location": self.stdout if kind == ProgramKind.SCORING_PROGRAM else self.ingestion_stdout, }, "stderr": { "data": logs_Unified[1], "stream": logs_Unified[1], "continue": True, - "location": self.stderr if kind == "program" else self.ingestion_stderr, + "location": self.stderr if kind == ProgramKind.SCORING_PROGRAM else self.ingestion_stderr, }, } @@ -885,7 +1007,7 @@ def _get_host_path(self, *paths): path = os.path.join(*paths) # pull front of path, which points to the location inside the container - path = path[len(BASE_DIR) :] + path = path[len(BASE_DIR):] # add host to front, so when we run commands in the container on the host they # can be seen properly @@ 
-896,129 +1018,90 @@ def _get_host_path(self, *paths): return path - async def _run_program_directory(self, program_dir, kind): + async def _run_program_directory(self, kind, program_dir): """ - Function responsible for running program directory + Function responsible for running + - ingestion program + - scoring program + - submission Args: - - program_dir : can be either ingestion program or program/submission - - kind : either `program` or `ingestion` + kind: `ingestion_program` or `scoring_program` or `submission` + program_dir: path to the program to run """ - # If the directory doesn't even exist, move on + # Return if directory does not exist if not os.path.exists(program_dir): - logger.warning(f"{program_dir} not found, no program to execute") - + logger.warning(f"{program_dir} for {kind} not found, no program to execute") # Communicate that the program is closing self.completed_program_counter += 1 return + # Find metadata file. + # Raise error if metadata is not found for ingestion or scoring if os.path.exists(os.path.join(program_dir, "metadata.yaml")): metadata_path = "metadata.yaml" elif os.path.exists(os.path.join(program_dir, "metadata")): metadata_path = "metadata" else: - # Display a warning in logs when there is no metadata file in submission/program dir - if kind == "program": - logger.warning( - "Program directory missing metadata, assuming it's going to be handled by ingestion" - ) - # Copy submission files into prediction output - # This is useful for results submissions but wrongly uses storage - shutil.copytree(program_dir, self.output_dir) - return + if kind in [ProgramKind.INGESTION_PROGRAM, ProgramKind.SCORING_PROGRAM]: + error_message = f"{program_dir} for {kind} missing 'metadata.yaml/metadata' file." 
+ logger.error(error_message) + raise SubmissionException(error_message) else: - raise SubmissionException( - "Program directory missing 'metadata.yaml/metadata'" - ) + logger.warning(f"{program_dir} for {kind} missing 'metadata.yaml/metadata' file. Assuming it is going to be handled by ingestion or scoring") + # Metadata file is found logger.info(f"Metadata path is {os.path.join(program_dir, metadata_path)}") + + # Reading metadata file to find command. + # Raise error if command is not found for ingestion or scoring with open(os.path.join(program_dir, metadata_path), "r") as metadata_file: - try: # try to find a command in the metadata, in other cases set metadata to None + command = None + try: metadata = yaml.load(metadata_file.read(), Loader=yaml.FullLoader) logger.info(f"Metadata contains:\n {metadata}") - if isinstance(metadata, dict): # command found + if isinstance(metadata, dict): command = metadata.get("command") - else: - command = None except yaml.YAMLError as e: logger.error("Error parsing YAML file: ", e) - print("Error parsing YAML file: ", e) - command = None - if not command and kind == "ingestion": + + if not command and kind in [ProgramKind.INGESTION_PROGRAM, ProgramKind.SCORING_PROGRAM]: raise SubmissionException( - "Program directory missing 'command' in metadata" + "Missing 'command' in metadata or metadata format is not correct!" ) - elif not command: + else: logger.warning( - f"Warning: {program_dir} has no command in metadata, continuing anyway " - f"(may be meant to be consumed by an ingestion program)" + "Missing 'command' in metadata or metadata format is not correct! Continuing anyway assuming it is going to be handled by ingestion or scoring" ) - return + + # Setting volume host and volumes config. 
+ # To be used by `_create_container` function volumes_host = [ self._get_host_path(program_dir), self._get_host_path(self.output_dir), self.data_dir, + self._get_host_path(self.root_dir, "submission") ] volumes_config = { - volumes_host[0]: { - "bind": "/app/program", - "mode": "z", - }, - volumes_host[1]: { - "bind": "/app/output", - "mode": "z", - }, - volumes_host[2]: { - "bind": "/app/data", - "mode": "ro", - }, + volumes_host[0]: {"bind": "/app/program", "mode": "z"}, + volumes_host[1]: {"bind": "/app/output", "mode": "z"}, + volumes_host[2]: {"bind": "/app/data", "mode": "ro"}, + volumes_host[3]: {"bind": "/app/ingested_program", "mode": "ro"}, } - if kind == "ingestion": - # program here is either scoring program or submission, depends on if this ran during Prediction or Scoring - if self.ingestion_only_during_scoring and self.is_scoring: - # submission program moved to 'input/res' with shutil.move() above - ingested_program_location = "input/res" - else: - ingested_program_location = "program" - volumes_host.extend( - [self._get_host_path(self.root_dir, ingested_program_location)] - ) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/ingested_program", - } - } - volumes_config.update(tempvolumeConfig) - - if self.is_scoring: - # For scoring programs, we want to have a shared directory just in case we have an ingestion program. - # This will add the share dir regardless of ingestion or scoring, as long as we're `is_scoring` + if kind == ProgramKind.SCORING_PROGRAM: + # For scoring program, we want to have a shared directory just in case we have an ingestion program. 
volumes_host.extend([self._get_host_path(self.root_dir, "shared")]) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/shared", - } - } - volumes_config.update(tempvolumeConfig) + volumes_config.update({volumes_host[-1]: {"bind": "/app/shared"}}) - # Input from submission (or submission + ingestion combo) + # Input dir for scoring program volumes_host.extend([self._get_host_path(self.input_dir)]) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/input", - } - } - volumes_config.update(tempvolumeConfig) + volumes_config.update({volumes_host[-1]: {"bind": "/app/input"}}) + # NOTE: self.input_data is valid when running an ingestion program and competition task has input data if self.input_data: - volumes_host.extend([self._get_host_path(self.root_dir, "input_data")]) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/input_data", - } - } - volumes_config.update(tempvolumeConfig) + volumes_host.append(self._get_host_path(self.root_dir, "input_data")) + volumes_config.update({volumes_host[-1]: {"bind": "/app/input_data"}}) # Handle Legacy competitions by replacing anything in the run command command = replace_legacy_metadata_command( @@ -1027,96 +1110,16 @@ async def _run_program_directory(self, program_dir, kind): is_scoring=self.is_scoring, ingestion_only_during_scoring=self.ingestion_only_during_scoring, ) + logger.info("Container will be run with command: " + command) - cap_drop_list = [ - "AUDIT_WRITE", - "CHOWN", - "DAC_OVERRIDE", - "FOWNER", - "FSETID", - "KILL", - "MKNOD", - "NET_BIND_SERVICE", - "NET_RAW", - "SETFCAP", - "SETGID", - "SETPCAP", - "SETUID", - "SYS_CHROOT", - ] - # Configure whether or not we use the GPU. 
Also setting auto_remove to False because - if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker": - security_options = ["no-new-privileges"] - else: - security_options = ["label=disable"] - # Setting the device ID like this allows users to specify which gpu to use in the .env file, with all being the default if no value is given - device_id = [os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all")] - if os.environ.get("USE_GPU", "false").lower() == "true": - logger.info("Running the container with GPU capabilities") - host_config = client.create_host_config( - auto_remove=False, - cap_drop=cap_drop_list, - binds=volumes_config, - userns_mode="host", - security_opt=security_options, - device_requests=[ - { - "Driver": "cdi", - "DeviceIDs": device_id, - }, - ], - ) - else: - host_config = client.create_host_config( - auto_remove=False, - cap_drop=cap_drop_list, - binds=volumes_config, - userns_mode="host", - security_opt=security_options, - ) - - logger.info("Running container with command " + command) - container_name = ( - self.ingestion_container_name - if kind == "ingestion" - else self.program_container_name - ) - # Disable or not the competition container access to Internet (False by default) - container_network_disabled = os.environ.get( - "COMPETITION_CONTAINER_NETWORK_DISABLED", "" - ) - - # HTTP and HTTPS proxy for the competition container if needed - competition_container_proxy_http = os.environ.get( - "COMPETITION_CONTAINER_HTTP_PROXY", "" - ) - competition_container_proxy_http = ( - "http_proxy=" + competition_container_proxy_http - ) - - competition_container_proxy_https = os.environ.get( - "COMPETITION_CONTAINER_HTTPS_PROXY", "" - ) - competition_container_proxy_https = ( - "https_proxy=" + competition_container_proxy_https - ) - - container = client.create_container( - self.container_image, - name=container_name, - host_config=host_config, - detach=False, - volumes=volumes_host, + # Create container with configurations + 
container = self._create_container( + container_name=self.ingestion_container_name, command=command, - working_dir="/app/program", - environment=[ - "PYTHONUNBUFFERED=1", - competition_container_proxy_http, - competition_container_proxy_https, - ], - network_disabled=container_network_disabled.lower() == "true", + volumes_host=volumes_host, + volumes_config=volumes_config ) - logger.debug("Created container : " + str(container)) + logger.debug("Created container: " + str(container)) logger.info("Volume configuration of the container: ") pprint(volumes_config) # This runs the container engine command and asynchronously passes data back via websocket @@ -1193,19 +1196,44 @@ def _prep_cache_dir(self, max_size=MAX_CACHE_DIR_SIZE_GB): else: logger.info("Cache directory does not need to be pruned!") + def _copy_submission_to_input_res(self): + """ + Temporary backward-compatibility function. + + Earlier, scoring programs expected submission files in ingestion output: + /app/input/res/ + + Newer changes expose submission under: + /app/ingested_program/ + + To avoid breaking older scoring programs, we copy the submission + directory into input/res + """ + + submission_directory = os.path.join(self.root_dir, "submission") + ingestion_res_directory = os.path.join(self.root_dir, "input/res") + + # copy from submission_directory ingestion_res_directory + try: + shutil.copytree(submission_directory, ingestion_res_directory, dirs_exist_ok=True) + logger.info(f"Copied submission files to input/res successfully") + + except Exception as e: + logger.error(f"Failed to copy submission to input/res: {e}") + def prepare(self): hostname = utils.nodenames.gethostname() if self.is_scoring: self._update_status( - STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}" + SubmissionStatus.RUNNING, extra_information=f"scoring_hostname-{hostname}" ) else: self._update_status( - STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}" + SubmissionStatus.RUNNING, 
extra_information=f"ingestion_hostname-{hostname}" ) if not self.is_scoring: # Only during prediction step do we want to announce "preparing" - self._update_status(STATUS_PREPARING) + self._update_status(SubmissionStatus.PREPARING) # Setup cache and prune if it's out of control self._prep_cache_dir() @@ -1214,7 +1242,9 @@ def prepare(self): # sub folder. bundles = [ # (url to file, relative folder destination) - (self.program_data, "program"), + # (self.program_data, "program"), + (self.scoring_program_data, "scoring_program"), + (self.submission_data, "submission"), (self.ingestion_program_data, "ingestion_program"), (self.input_data, "input_data"), (self.reference_data, "input/ref"), @@ -1229,8 +1259,8 @@ def prepare(self): cache_this_bundle = path in ("input_data", "input/ref") zip_file = self._get_bundle(url, path, cache=cache_this_bundle) - # TODO: When we have `is_scoring_only` this needs to change... - if url == self.program_data and not self.is_scoring: + # Computing checksum of the submission file during ingestion run + if url == self.submission_data and not self.is_scoring: # We want to get a checksum of submissions so we can check if they are # a solution, or maybe match them against other submissions later logger.info(f"Beginning MD5 checksum of submission: {zip_file}") @@ -1238,6 +1268,11 @@ def prepare(self): logger.info(f"Checksum result: {checksum}") self._update_submission({"md5": checksum}) + # During scoring: copy submission files into "input/res" + if self.is_scoring: + # NOTE: Temporary compatibility hook (To be removed in the future) + self._copy_submission_to_input_res() + # For logging purposes let's dump file names for filename in glob.iglob(self.root_dir + "**/*.*", recursive=True): logger.info(filename) @@ -1247,19 +1282,44 @@ def prepare(self): self._get_container_image(self.container_image) def start(self): - program_dir = os.path.join(self.root_dir, "program") + + logger.info(f"Preparing to run: {ProgramKind.SCORING_PROGRAM if 
self.is_scoring else ProgramKind.INGESTION_PROGRAM}") + + # Define directories for ingestion, scoring and submission ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program") + scoring_program_dir = os.path.join(self.root_dir, "scoring_program") + submission_dir = os.path.join(self.root_dir, "submission") - logger.info("Running scoring program, and then ingestion program") loop = asyncio.new_event_loop() # Set the event loop for the gather asyncio.set_event_loop(loop) - gathered_tasks = asyncio.gather( - self._run_program_directory(program_dir, kind="program"), - self._run_program_directory(ingestion_program_dir, kind="ingestion"), - self.watch_detailed_results(), - return_exceptions=True, - ) + + tasks = [] + if self.is_scoring: + # During scoring, run scoring program directory + tasks.append( + self._run_program_directory(kind=ProgramKind.SCORING_PROGRAM, program_dir=scoring_program_dir) + ) + + # If ingestion_only_during_scoring is true, we also run ingestion program directory in parallel to scoring program + if self.ingestion_only_during_scoring: + tasks.append( + self._run_program_directory(kind=ProgramKind.INGESTION_PROGRAM, program_dir=ingestion_program_dir) + ) + + # During scoring we watch for detailed results + tasks.append( + self.watch_detailed_results() + ) + else: + # During ingestion we run ingestion program directory and submission directory + tasks.extend([ + self._run_program_directory(kind=ProgramKind.INGESTION_PROGRAM, program_dir=ingestion_program_dir), + self._run_program_directory(kind=ProgramKind.SUBMISSION, program_dir=submission_dir) + ]) + + gathered_tasks = asyncio.gather(*tasks, return_exceptions=True) + task_results = [] # will store results/exceptions from gather signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) @@ -1282,7 +1342,7 @@ def start(self): for kind, logs in self.logs.items(): containers_to_kill = [] containers_to_kill.append(self.ingestion_container_name) - 
containers_to_kill.append(self.program_container_name) + containers_to_kill.append(self.scoring_program_container_name) logger.debug( "Trying to kill and remove container " + str(containers_to_kill) ) @@ -1295,7 +1355,7 @@ def start(self): logger.error( f"There was a problem killing {containers_to_kill}: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL.lower() == "debug": logger.exception(e) # Send data to be written to ingestion/scoring std_err self._update_submission(execution_time_limit_exceeded_data) @@ -1337,7 +1397,7 @@ def start(self): if kind == "ingestion": containers_to_kill = self.ingestion_container_name else: - containers_to_kill = self.program_container_name + containers_to_kill = self.scoring_program_container_name try: client.kill(containers_to_kill) client.remove_container(containers_to_kill, force=True) @@ -1347,7 +1407,7 @@ def start(self): logger.error( f"There was a problem killing {containers_to_kill}: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL.lower() == "debug": logger.exception(e) if kind == "program": self.program_exit_code = return_code @@ -1372,7 +1432,10 @@ def start(self): if self.is_scoring: # Check if scoring program failed - program_results, _, _ = task_results + try: + program_results, _, _ = task_results + except: + program_results, _ = task_results # Gather returns either normal values or exception instances when return_exceptions=True had_async_exc = isinstance( program_results, BaseException @@ -1381,15 +1444,16 @@ def start(self): failed_rc = (program_rc is None) or (program_rc != 0) if had_async_exc or failed_rc: self._update_status( - STATUS_FAILED, + SubmissionStatus.FAILED, extra_information=f"program_rc={program_rc}, async={task_results}", ) # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") - self._update_status(STATUS_FINISHED) + + 
self._update_status(SubmissionStatus.FINISHED) else: - self._update_status(STATUS_SCORING) + self._update_status(SubmissionStatus.SCORING) def push_scores(self): """This is only ran at the end of the scoring step""" @@ -1461,7 +1525,7 @@ def push_output(self): self._put_dir(self.scoring_result, self.output_dir) def clean_up(self): - if os.environ.get("CODALAB_IGNORE_CLEANUP_STEP"): + if Settings.CODALAB_IGNORE_CLEANUP_STEP: logger.warning( f"CODALAB_IGNORE_CLEANUP_STEP mode enabled, ignoring clean up of: {self.root_dir}" ) diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index 4990d04f5..a91742c37 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -165,7 +165,7 @@ def _send_to_compute_worker(submission, is_scoring): if task.ingestion_program: if (task.ingestion_only_during_scoring and is_scoring) or (not task.ingestion_only_during_scoring and not is_scoring): - run_args['ingestion_program'] = make_url_sassy(task.ingestion_program.data_file.name) + run_args['ingestion_program_data'] = make_url_sassy(task.ingestion_program.data_file.name) if task.input_data and (not is_scoring or task.ingestion_only_during_scoring): run_args['input_data'] = make_url_sassy(task.input_data.data_file.name) @@ -175,9 +175,10 @@ def _send_to_compute_worker(submission, is_scoring): run_args['ingestion_only_during_scoring'] = task.ingestion_only_during_scoring - run_args['program_data'] = make_url_sassy( - path=submission.data.data_file.name if not is_scoring else task.scoring_program.data_file.name - ) + if is_scoring: + run_args['scoring_program_data'] = make_url_sassy(path=task.scoring_program.data_file.name) + + run_args['submission_data'] = make_url_sassy(path=submission.data.data_file.name) if not is_scoring: detail_names = SubmissionDetails.DETAILED_OUTPUT_NAMES_PREDICTION