From 70f4c12c65ff5068efc83dc924b878131b74a86d Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Fri, 27 Mar 2026 01:53:34 +0500 Subject: [PATCH 1/2] reorganzing and cleaning of compute worker --- compute_worker/compute_worker.py | 323 +++++++++++++++++-------------- 1 file changed, 177 insertions(+), 146 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 573e72a94..e7d78ba8b 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -10,75 +10,163 @@ import tempfile import time import uuid +import requests +import websockets +import yaml +import docker +import logging +import sys # This is only needed for the pytests to pass from shutil import make_archive from urllib.error import HTTPError from urllib.parse import urlparse from urllib.request import urlretrieve from zipfile import ZipFile, BadZipFile -import docker -from rich.progress import Progress +from urllib3 import Retry + from rich.pretty import pprint -import requests -import websockets -import yaml -from billiard.exceptions import SoftTimeLimitExceeded -from celery import Celery, shared_task, utils +from rich.progress import Progress from kombu import Queue, Exchange -from urllib3 import Retry +from celery import Celery, shared_task, utils, signals +from billiard.exceptions import SoftTimeLimitExceeded -# This is only needed for the pytests to pass -import sys +from logs_loguru import configure_logging, colorize_run_args + +logger = logging.getLogger(__name__) sys.path.append("/app/src/settings/") -from celery import signals -import logging -logger = logging.getLogger(__name__) -from logs_loguru import configure_logging, colorize_run_args -import json +# ----------------------------------------------- +# Env Settings +# ----------------------------------------------- +class Settings: + + @staticmethod + def get(key, default=None): + """ + Return the env var value if set, else default; returns None if not set and no default. + """ + val = os.getenv(key) + + if val is not None: + return val + + if default is not None: + return default + + logger.warning(f"Environment variable '{key}' not found and no default provided.") + return None + + @staticmethod + def to_bool(val): + try: + if isinstance(val, bool): + return val + + val_str = str(val).strip() + + if val_str in ("true", "True", "TRUE", "1"): + return True + if val_str in ("false", "False", "FALSE", "0"): + return False + + logger.warning(f"Failed to parse boolean from '{val}'") + return val + + except Exception as e: + logger.warning(f"Failed to parse boolean from '{val}': {e}") + return val + + # Constants + DOCKER = "docker" + PODMAN = "podman" + LOG_LEVEL_DEBUG = "debug" + + # Defaults + DEFAULT_SOCKETS = { + DOCKER: "unix:///var/run/docker.sock", + PODMAN: "unix:///run/user/1000/podman/podman.sock", + } + + # Settings variables + LOG_LEVEL = get("LOG_LEVEL", "INFO").lower() + SERIALIZED = get("SERIALIZED", "false") + + USE_GPU = to_bool(get("USE_GPU", "false")) + CONTAINER_ENGINE_EXECUTABLE = get("CONTAINER_ENGINE_EXECUTABLE", DOCKER).lower() + GPU_DEVICE = get("GPU_DEVICE", "nvidia.com/gpu=all") + + CONTAINER_SOCKET = get("CONTAINER_SOCKET", DEFAULT_SOCKETS.get(CONTAINER_ENGINE_EXECUTABLE)) + + HOST_DIRECTORY = get("HOST_DIRECTORY", "/tmp/codabench/") + MAX_CACHE_DIR_SIZE_GB = get("MAX_CACHE_DIR_SIZE_GB", 10) + + COMPETITION_CONTAINER_NETWORK_DISABLED = to_bool(get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False")) + COMPETITION_CONTAINER_HTTP_PROXY = get("COMPETITION_CONTAINER_HTTP_PROXY", "") + COMPETITION_CONTAINER_HTTPS_PROXY = get("COMPETITION_CONTAINER_HTTPS_PROXY", "") + + CODALAB_IGNORE_CLEANUP_STEP = to_bool(get("CODALAB_IGNORE_CLEANUP_STEP")) + + WORKER_BUNDLE_URL_REWRITE = get("WORKER_BUNDLE_URL_REWRITE", "") + + +# ----------------------------------------------- +# Program Kind +# ----------------------------------------------- +# NOTE: This is not used, to be used in next PR +class ProgramKind: + INGESTION_PROGRAM = "ingestion_program" + SCORING_PROGRAM = "scoring_program" + SUBMISSION = "submission" + + +# ----------------------------------------------- +# Submission status +# ----------------------------------------------- +class SubmissionStatus: + NONE = "None" + SUBMITTING = "Submitting" + SUBMITTED = "Submitted" + PREPARING = "Preparing" + RUNNING = "Running" + SCORING = "Scoring" + FINISHED = "Finished" + FAILED = "Failed" + + AVAILABLE_STATUSES = ( + NONE, + SUBMITTING, + SUBMITTED, + PREPARING, + RUNNING, + SCORING, + FINISHED, + FAILED, + ) # ----------------------------------------------- # Logging # ----------------------------------------------- configure_logging( - os.environ.get("LOG_LEVEL", "INFO"), os.environ.get("SERIALIZED", "false") + Settings.LOG_LEVEL, Settings.SERIALIZED ) # ----------------------------------------------- # Initialize Docker or Podman depending on .env # ----------------------------------------------- -if os.environ.get("USE_GPU", "false").lower() == "true": - logger.info( - "Using " - + os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").upper() - + "with GPU capabilites : " - + os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all") - + " network_disabled for the competition container is set to " - + os.environ.get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False") - ) -else: - logger.info( - "Using " - + os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").upper() - + " without GPU capabilities. " - + "network_disabled for the competition container is set to " - + os.environ.get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False") - ) +logger.info( + f"Using {Settings.CONTAINER_ENGINE_EXECUTABLE} " + f"{'with GPU capabilities: ' + Settings.GPU_DEVICE if Settings.USE_GPU else 'without GPU capabilities'}. " + f"Network disabled for the competition container is set to {Settings.COMPETITION_CONTAINER_NETWORK_DISABLED}" +) -if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker": - client = docker.APIClient( - base_url=os.environ.get("CONTAINER_SOCKET", "unix:///var/run/docker.sock"), - version="auto", - ) -elif os.environ.get("CONTAINER_ENGINE_EXECUTABLE").lower() == "podman": - client = docker.APIClient( - base_url=os.environ.get( - "CONTAINER_SOCKET", "unix:///run/user/1000/podman/podman.sock" - ), - version="auto", - ) +# Intializing client +# NOTE: CONTAINER_SOCKET is set in Settings based on CONTAINER_ENGINE_EXECUTABLE which must has either podman or docker +client = docker.APIClient( + base_url=Settings.CONTAINER_SOCKET, + version="auto", +) # ----------------------------------------------- @@ -147,8 +235,8 @@ def show_progress(line, progress): total=total, ) except Exception as e: - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": - logger.exception("There was an error showing the progress bar") + if Settings.LOG_LEVEL == Settings.DEBUG: + logger.exception(f"There was an error showing the progress bar: {e}") # ----------------------------------------------- @@ -175,35 +263,11 @@ def setup_celery_logging(**kwargs): # Directories # ----------------------------------------------- # Setup base directories used by all submissions -# note: we need to pass this directory to docker/podman so it knows where to store things! -HOST_DIRECTORY = os.environ.get("HOST_DIRECTORY", "/tmp/codabench/") +# NOTE: we need to pass this directory to docker/podman so it knows where to store things! +HOST_DIRECTORY = Settings.HOST_DIRECTORY BASE_DIR = "/codabench/" # base directory inside the container CACHE_DIR = os.path.join(BASE_DIR, "cache") -MAX_CACHE_DIR_SIZE_GB = float(os.environ.get("MAX_CACHE_DIR_SIZE_GB", 10)) - - -# ----------------------------------------------- -# Submission status -# ----------------------------------------------- -# Status options for submissions -STATUS_NONE = "None" -STATUS_SUBMITTING = "Submitting" -STATUS_SUBMITTED = "Submitted" -STATUS_PREPARING = "Preparing" -STATUS_RUNNING = "Running" -STATUS_SCORING = "Scoring" -STATUS_FINISHED = "Finished" -STATUS_FAILED = "Failed" -AVAILABLE_STATUSES = ( - STATUS_NONE, - STATUS_SUBMITTING, - STATUS_SUBMITTED, - STATUS_PREPARING, - STATUS_RUNNING, - STATUS_SCORING, - STATUS_FINISHED, - STATUS_FAILED, -) +MAX_CACHE_DIR_SIZE_GB = float(Settings.MAX_CACHE_DIR_SIZE_GB) # ----------------------------------------------- @@ -232,7 +296,7 @@ def rewrite_bundle_url_if_needed(url): Example: http://localhost:9000|http://minio:9000 """ - rule = os.getenv("WORKER_BUNDLE_URL_REWRITE", "").strip() + rule = Settings.WORKER_BUNDLE_URL_REWRITE.strip() if not rule or "|" not in rule: return url src, dst = rule.split("|", 1) @@ -265,13 +329,11 @@ def run_wrapper(run_args): msg = f"Docker image pull failed: {msg}" else: msg = "Docker image pull failed." - run._update_status(STATUS_FAILED, extra_information=msg) + run._update_status(SubmissionStatus.FAILED, extra_information=msg) raise except SoftTimeLimitExceeded: run._update_status( - STATUS_FAILED, - extra_information="Execution time limit exceeded.", - ) + SubmissionStatus.FAILED, extra_information="Execution time limit exceeded.") raise except SubmissionException as e: msg = str(e).strip() @@ -279,11 +341,11 @@ def run_wrapper(run_args): msg = f"Submission failed: {msg}. See logs for more details." else: msg = "Submission failed. See logs for more details." - run._update_status(STATUS_FAILED, extra_information=msg) + run._update_status(SubmissionStatus.FAILED, extra_information=msg) raise except Exception as e: # Catch any exception to avoid getting stuck in Running status - run._update_status(STATUS_FAILED, extra_information=traceback.format_exc()) + run._update_status(SubmissionStatus.FAILED, extra_information=traceback.format_exc()) raise finally: try: @@ -573,9 +635,9 @@ def _update_submission(self, data): def _update_status(self, status, extra_information=None): # Update submission status - if status not in AVAILABLE_STATUSES: + if status not in SubmissionStatus.AVAILABLE_STATUSES: raise SubmissionException( - f"Status '{status}' is not in available statuses: {AVAILABLE_STATUSES}" + f"Status '{status}' is not in available statuses: {SubmissionStatus.AVAILABLE_STATUSES}" ) data = {"status": status, "status_details": extra_information} try: @@ -738,26 +800,18 @@ async def _run_container_engine_cmd(self, container, kind): websocket = None try: websocket_url = f"{self.websocket_url}?kind={kind}" - logger.debug( - "Connecting to " - + websocket_url - + "for container " - + str(container.get("Id")) - ) + logger.debug(f"Connecting to {websocket_url} for container {str(container.get('Id'))}") websocket = await asyncio.wait_for( websockets.connect(websocket_url), timeout=10.0 ) - logger.debug( - "connected to " - + str(websocket_url) - + "for container " - + str(container.get("Id")) - ) + logger.debug(f"connected to {websocket_url} for container {str(container.get('Id'))}") + except Exception as e: logger.error( f"There was an error trying to connect to the websocket on the codabench instance: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(e) start = time.time() @@ -775,8 +829,7 @@ async def _run_container_engine_cmd(self, container, kind): # If we enter the for loop after the container exited, the program will get stuck if ( - client.inspect_container(container)["State"]["Status"].lower() - == "running" + client.inspect_container(container)["State"]["Status"].lower() == "running" ): logger.debug( "Show the logs and stream them to codabench " + container.get("Id") @@ -793,7 +846,7 @@ async def _run_container_engine_cmd(self, container, kind): ) except Exception as e: logger.error(e) - + # Errors elif log[1] is not None: stderr_chunks.append(log[1]) @@ -812,7 +865,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error( f"There was an error while starting the container and getting the logs: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(e) # Get the return code of the competition container once done @@ -831,12 +884,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error(e) client.remove_container(container, force=True) - logger.debug( - "Container " - + container.get("Id") - + "exited with status code : " - + str(return_Code["StatusCode"]) - ) + logger.debug(f"Container {container.get('Id')} exited with status code : {str(return_Code['StatusCode'])}") except ( requests.exceptions.ReadTimeout, @@ -849,7 +897,7 @@ async def _run_container_engine_cmd(self, container, kind): finally: try: # Last chance of removing container - client.remove_container(container_id, force=True) + client.remove_container(container.get("Id"), force=True) except Exception: pass @@ -885,7 +933,7 @@ def _get_host_path(self, *paths): path = os.path.join(*paths) # pull front of path, which points to the location inside the container - path = path[len(BASE_DIR) :] + path = path[len(BASE_DIR):] # add host to front, so when we run commands in the container on the host they # can be seen properly @@ -1044,14 +1092,16 @@ async def _run_program_directory(self, program_dir, kind): "SETUID", "SYS_CHROOT", ] + # Configure whether or not we use the GPU. Also setting auto_remove to False because - if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker": + if Settings.CONTAINER_ENGINE_EXECUTABLE == Settings.DOCKER: security_options = ["no-new-privileges"] else: security_options = ["label=disable"] + # Setting the device ID like this allows users to specify which gpu to use in the .env file, with all being the default if no value is given - device_id = [os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all")] - if os.environ.get("USE_GPU", "false").lower() == "true": + device_id = [Settings.GPU_DEVICE] + if Settings.USE_GPU: logger.info("Running the container with GPU capabilities") host_config = client.create_host_config( auto_remove=False, @@ -1081,26 +1131,10 @@ async def _run_program_directory(self, program_dir, kind): if kind == "ingestion" else self.program_container_name ) - # Disable or not the competition container access to Internet (False by default) - container_network_disabled = os.environ.get( - "COMPETITION_CONTAINER_NETWORK_DISABLED", "" - ) + # Creating container + # COMPETITION_CONTAINER_NETWORK_DISABLED: Disable or not the competition container access to Internet (False by default) # HTTP and HTTPS proxy for the competition container if needed - competition_container_proxy_http = os.environ.get( - "COMPETITION_CONTAINER_HTTP_PROXY", "" - ) - competition_container_proxy_http = ( - "http_proxy=" + competition_container_proxy_http - ) - - competition_container_proxy_https = os.environ.get( - "COMPETITION_CONTAINER_HTTPS_PROXY", "" - ) - competition_container_proxy_https = ( - "https_proxy=" + competition_container_proxy_https - ) - container = client.create_container( self.container_image, name=container_name, @@ -1111,12 +1145,13 @@ async def _run_program_directory(self, program_dir, kind): working_dir="/app/program", environment=[ "PYTHONUNBUFFERED=1", - competition_container_proxy_http, - competition_container_proxy_https, + "http_proxy=" + Settings.COMPETITION_CONTAINER_HTTP_PROXY, + "https_proxy=" + Settings.COMPETITION_CONTAINER_HTTPS_PROXY, ], - network_disabled=container_network_disabled.lower() == "true", + network_disabled=Settings.COMPETITION_CONTAINER_NETWORK_DISABLED, ) - logger.debug("Created container : " + str(container)) + + logger.debug("Created container: " + str(container)) logger.info("Volume configuration of the container: ") pprint(volumes_config) # This runs the container engine command and asynchronously passes data back via websocket @@ -1196,16 +1231,12 @@ def _prep_cache_dir(self, max_size=MAX_CACHE_DIR_SIZE_GB): def prepare(self): hostname = utils.nodenames.gethostname() if self.is_scoring: - self._update_status( - STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}" - ) + self._update_status(SubmissionStatus.RUNNING, extra_information=f"scoring_hostname-{hostname}") else: - self._update_status( - STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}" - ) + self._update_status(SubmissionStatus.RUNNING, extra_information=f"ingestion_hostname-{hostname}") if not self.is_scoring: # Only during prediction step do we want to announce "preparing" - self._update_status(STATUS_PREPARING) + self._update_status(SubmissionStatus.PREPARING) # Setup cache and prune if it's out of control self._prep_cache_dir() @@ -1295,7 +1326,7 @@ def start(self): logger.error( f"There was a problem killing {containers_to_kill}: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(e) # Send data to be written to ingestion/scoring std_err self._update_submission(execution_time_limit_exceeded_data) @@ -1347,7 +1378,7 @@ def start(self): logger.error( f"There was a problem killing {containers_to_kill}: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(e) if kind == "program": self.program_exit_code = return_code @@ -1381,15 +1412,15 @@ def start(self): failed_rc = (program_rc is None) or (program_rc != 0) if had_async_exc or failed_rc: self._update_status( - STATUS_FAILED, + SubmissionStatus.FAILED, extra_information=f"program_rc={program_rc}, async={task_results}", ) # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") - self._update_status(STATUS_FINISHED) + self._update_status(SubmissionStatus.FINISHED) else: - self._update_status(STATUS_SCORING) + self._update_status(SubmissionStatus.SCORING) def push_scores(self): """This is only ran at the end of the scoring step""" @@ -1461,7 +1492,7 @@ def push_output(self): self._put_dir(self.scoring_result, self.output_dir) def clean_up(self): - if os.environ.get("CODALAB_IGNORE_CLEANUP_STEP"): + if Settings.CODALAB_IGNORE_CLEANUP_STEP: logger.warning( f"CODALAB_IGNORE_CLEANUP_STEP mode enabled, ignoring clean up of: {self.root_dir}" ) From ae39fd496f13e8b0abaca3e3f0056e98e36c566f Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Fri, 27 Mar 2026 02:02:02 +0500 Subject: [PATCH 2/2] minor updates --- compute_worker/compute_worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index e7d78ba8b..fec86936a 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -107,7 +107,7 @@ def to_bool(val): CODALAB_IGNORE_CLEANUP_STEP = to_bool(get("CODALAB_IGNORE_CLEANUP_STEP")) - WORKER_BUNDLE_URL_REWRITE = get("WORKER_BUNDLE_URL_REWRITE", "") + WORKER_BUNDLE_URL_REWRITE = get("WORKER_BUNDLE_URL_REWRITE", "").strip() # ----------------------------------------------- @@ -235,7 +235,7 @@ def show_progress(line, progress): total=total, ) except Exception as e: - if Settings.LOG_LEVEL == Settings.DEBUG: + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(f"There was an error showing the progress bar: {e}") @@ -296,7 +296,7 @@ def rewrite_bundle_url_if_needed(url): Example: http://localhost:9000|http://minio:9000 """ - rule = Settings.WORKER_BUNDLE_URL_REWRITE.strip() + rule = Settings.WORKER_BUNDLE_URL_REWRITE if not rule or "|" not in rule: return url src, dst = rule.split("|", 1)