diff --git a/.github/workflows/infiniops-ci-v2-shadow.yml b/.github/workflows/infiniops-ci-v2-shadow.yml index af9f886..bf5d263 100644 --- a/.github/workflows/infiniops-ci-v2-shadow.yml +++ b/.github/workflows/infiniops-ci-v2-shadow.yml @@ -403,17 +403,17 @@ jobs: fi queue-watchdog: - name: Fail queued CI v2 jobs after 10 minutes + name: Fail queued CI v2 jobs after 30 minutes needs: prepare runs-on: ubuntu-latest steps: - - name: Fail queued CI v2 jobs after 10 minutes + - name: Fail queued CI v2 jobs after 30 minutes env: GH_TOKEN: ${{ github.token }} REPOSITORY: ${{ github.repository }} RUN_ID: ${{ github.run_id }} MATRIX_JSON: ${{ needs.prepare.outputs.matrix_json_for_unittest }} - QUEUE_TIMEOUT_SECONDS: 600 + QUEUE_TIMEOUT_SECONDS: 1800 POLL_INTERVAL_SECONDS: 15 run: | set -euo pipefail @@ -568,7 +568,7 @@ jobs: if time.monotonic() >= deadline: if queued: - print("CI v2 jobs still queued after 10 minutes:", file=sys.stderr) + print("CI v2 jobs still queued after 30 minutes:", file=sys.stderr) for job in queued: print(f"- {job.get('name')} {job.get('html_url')}", file=sys.stderr) sys.exit(1) diff --git a/.github/workflows/infiniops-ci.yml b/.github/workflows/infiniops-ci.yml index 2012d0a..b18777b 100644 --- a/.github/workflows/infiniops-ci.yml +++ b/.github/workflows/infiniops-ci.yml @@ -468,18 +468,18 @@ jobs: "${IMAGE_TAG}" queue-watchdog: - name: Fail queued CI jobs after 10 minutes + name: Fail queued CI jobs after 30 minutes needs: prepare if: contains(fromJSON(needs.prepare.outputs.job_types_with_jobs), 'unittest') runs-on: ubuntu-latest steps: - - name: Fail queued CI jobs after 10 minutes + - name: Fail queued CI jobs after 30 minutes env: GH_TOKEN: ${{ github.token }} REPOSITORY: ${{ github.repository }} RUN_ID: ${{ github.run_id }} MATRIX_JSON: ${{ needs.prepare.outputs.matrix_json_for_unittest }} - QUEUE_TIMEOUT_SECONDS: 600 + QUEUE_TIMEOUT_SECONDS: 1800 POLL_INTERVAL_SECONDS: 15 run: | set -euo pipefail @@ -632,7 +632,7 @@ jobs: if time.monotonic() >= deadline: if queued: - print("CI jobs still queued after 10 minutes:", file=sys.stderr) + print("CI jobs still queued after 30 minutes:", file=sys.stderr) for job in queued: print(f"- {job.get('name')} {job.get('html_url')}", file=sys.stderr) sys.exit(1) diff --git a/ci_resource.py b/ci_resource.py index f25137f..b1c3592 100644 --- a/ci_resource.py +++ b/ci_resource.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Resource detection and allocation for CI Runner Agent.""" + from __future__ import annotations import json @@ -27,6 +28,8 @@ "ascend": "ASCEND_VISIBLE_DEVICES", } +PROCESS_EXCLUSIVE_PLATFORMS = {"ascend", "iluvatar"} + @dataclass class GpuInfo: @@ -34,6 +37,8 @@ class GpuInfo: memory_used_mb: float memory_total_mb: float utilization_pct: float + process_count: int = 0 + process_pids: tuple[int, ...] = () @dataclass @@ -85,11 +90,17 @@ def detect_gpus(self) -> list[GpuInfo]: if self._platform == "cambricon": return self._detect_gpus_cambricon() + if self._platform == "iluvatar": + return self._detect_gpus_iluvatar() + if self._platform == "ascend": return self._detect_gpus_ascend() tool = self.GPU_QUERY_TOOLS.get(self._platform) + return self._detect_gpus_csv(tool) + + def _detect_gpus_csv(self, tool) -> list[GpuInfo]: if not tool: return [] @@ -132,6 +143,66 @@ def detect_gpus(self) -> list[GpuInfo]: return gpus + def _detect_gpus_iluvatar(self) -> list[GpuInfo]: + tool = self.GPU_QUERY_TOOLS.get("iluvatar") + if not tool: + return [] + + gpus = self._detect_gpus_csv(tool) + + if not gpus: + return [] + + try: + raw_result = subprocess.run( + [tool], + capture_output=True, + text=True, + timeout=10, + ) + except (FileNotFoundError, subprocess.TimeoutExpired): + return gpus + + if raw_result.returncode != 0: + return gpus + + process_pids: dict[int, list[int]] = {} + in_process_table = False + + for line in raw_result.stdout.splitlines(): + if "Processes:" in line: + in_process_table = True + continue + + if not in_process_table: + continue + + content = line.strip().strip("|").strip() + tokens = content.split() + + if len(tokens) < 2 or not tokens[0].isdigit() or not tokens[1].isdigit(): + continue + + try: + gpu_index = int(tokens[0]) + pid = int(tokens[1]) + except ValueError: + continue + + process_pids.setdefault(gpu_index, []).append(pid) + + return [ + GpuInfo( + index=g.index, + memory_used_mb=g.memory_used_mb, + memory_total_mb=g.memory_total_mb, + utilization_pct=g.utilization_pct, + process_count=len(process_pids.get(g.index, [])), + process_pids=tuple(process_pids.get(g.index, ())), + ) + for g in gpus + ] + def _detect_gpus_metax(self) -> list[GpuInfo]: """Parse mx-smi output for MetaX GPUs. @@ -368,6 +439,22 @@ def _detect_gpus_ascend(self) -> list[GpuInfo]: gpus = [] lines = result.stdout.splitlines() + process_pids: dict[int, list[int]] = {} + + for line in lines: + process_m = re.match(r"^\|\s*(\d+)\s+\d+\s*\|\s*(\d+)\s*\|", line) + + if not process_m: + continue + + try: + npu_index = int(process_m.group(1)) + pid = int(process_m.group(2)) + except ValueError: + continue + + process_pids.setdefault(npu_index, []).append(pid) + i = 0 while i < len(lines): @@ -377,7 +464,7 @@ def _detect_gpus_ascend(self) -> list[GpuInfo]: m1 = re.match(r"^\|\s+(\d+)\s+", line) - if m1 and i + 1 < len(lines): + if m1 and i + 1 < len(lines) and re.search(r"\b(910|310)\w*\b", line): try: npu_index = int(m1.group(1)) row2 = lines[i + 1] @@ -402,6 +489,8 @@ def _detect_gpus_ascend(self) -> list[GpuInfo]: memory_used_mb=used_mb, memory_total_mb=total_mb, utilization_pct=util_pct, + process_count=len(process_pids.get(npu_index, [])), + process_pids=tuple(process_pids.get(npu_index, ())), ) ) except (ValueError, AttributeError): @@ -439,7 +528,13 @@ def get_free_gpus(self) -> list[int]: """Return GPU indices with utilization below threshold.""" gpus = self.detect_gpus() return [ - g.index for g in gpus if g.utilization_pct < self._utilization_threshold + g.index + for g in gpus + if g.utilization_pct < self._utilization_threshold + and ( + self._platform not in PROCESS_EXCLUSIVE_PLATFORMS + or g.process_count == 0 + ) ] def allocate(self, gpu_count, memory_mb=0) -> tuple[list[int], bool]: @@ -469,6 +564,10 @@ def allocate(self, gpu_count, memory_mb=0) -> tuple[list[int], bool]: if g.index not in self._allocated and self._is_gpu_memory_available(g) and g.utilization_pct < self._utilization_threshold + and ( + self._platform not in PROCESS_EXCLUSIVE_PLATFORMS + or g.process_count == 0 + ) ] if len(available) < gpu_count: @@ -477,7 +576,14 @@ def allocate(self, gpu_count, memory_mb=0) -> tuple[list[int], bool]: if sys_res is not None and sys_res.available_memory_mb < memory_mb: return ([], False) - available.sort(key=operator.attrgetter("utilization_pct")) + if self._platform in PROCESS_EXCLUSIVE_PLATFORMS: + available.sort( + key=operator.attrgetter( + "utilization_pct", "memory_used_mb", "index" + ) + ) + else: + available.sort(key=operator.attrgetter("utilization_pct")) selected = [g.index for g in available[:gpu_count]] self._allocated.update(selected) return (selected, True) @@ -512,6 +618,8 @@ def get_status(self) -> dict: "memory_used_mb": g.memory_used_mb, "memory_total_mb": g.memory_total_mb, "utilization_pct": g.utilization_pct, + "process_count": g.process_count, + "process_pids": list(g.process_pids), "allocated_by_agent": g.index in allocated, } for g in gpus diff --git a/config.yml b/config.yml index b8417c1..c5ba2a2 100644 --- a/config.yml +++ b/config.yml @@ -137,7 +137,6 @@ platforms: docker_args: - "--runtime=runc" - "--privileged" - - "--device=/dev/davinci0" - "--device=/dev/davinci_manager" - "--device=/dev/devmm_svm" - "--device=/dev/hisi_hdc" diff --git a/tests/test_resource.py b/tests/test_resource.py index 2f1fd50..c056b62 100644 --- a/tests/test_resource.py +++ b/tests/test_resource.py @@ -15,6 +15,8 @@ def test_gpu_info_fields(): ) assert g.index == 0 assert g.memory_total_mb == 8000 + assert g.process_count == 0 + assert g.process_pids == () def test_system_resources_fields(): @@ -93,7 +95,6 @@ def test_detect_system_resources(monkeypatch, tmp_path): "MemAvailable: 20000000 kB\n" ) - _real_open = open def fake_open(path, **kw): @@ -401,13 +402,13 @@ class R: assert gpus[0].memory_total_mb == 32768.0 -def test_detect_gpus_ascend_ignores_process_table(monkeypatch): +def test_detect_gpus_ascend_marks_process_table_busy(monkeypatch): npu_output = ( "+---------------------------+---------------+-------------------------------+\n" "| 0 910B4 | OK | 86.5 41 |\n" - "| 0 | 0000:c1:00.0 | 0 0 / 0 32761 / 32768 |\n" + "| 0 | 0000:c1:00.0 | 0 0 / 0 64 / 32768 |\n" "| 1 910B4 | OK | 80.1 41 |\n" - "| 0 | 0000:c2:00.0 | 0 0 / 0 2867 / 32768 |\n" + "| 0 | 0000:c2:00.0 | 1 0 / 0 2867 / 32768 |\n" "+---------------------------+---------------+-------------------------------+\n" "| NPU Chip | Process id | Process name |\n" "| 0 0 | 183216 | python |\n" @@ -426,12 +427,95 @@ class R: pool = res.ResourcePool("ascend") gpus = pool.detect_gpus() assert [gpu.index for gpu in gpus] == [0, 1] + assert gpus[0].process_count == 1 + assert gpus[0].process_pids == (183216,) + assert gpus[1].process_count == 0 + assert gpus[1].process_pids == () selected, ok = pool.allocate(1) assert ok assert selected == [1] +def test_allocate_ascend_fails_when_all_npus_have_processes(monkeypatch): + pool = res.ResourcePool("ascend") + + monkeypatch.setattr( + pool, + "detect_gpus", + lambda: [ + res.GpuInfo(0, 64, 32768, 0, process_count=1, process_pids=(12345,)), + res.GpuInfo(1, 64, 32768, 0, process_count=1, process_pids=(23456,)), + ], + ) + + selected, ok = pool.allocate(1) + assert not ok + assert selected == [] + + +def test_detect_gpus_iluvatar_marks_process_table_busy(monkeypatch): + csv_output = "0, 64, 32768, 0\n1, 2867, 32768, 1\n" + ixsmi_output = ( + "+-----------------------------------------------------------------------------+\n" + "| Processes: |\n" + "| GPU PID Type Process name GPU Memory |\n" + "| 0 12345 C python 530 MiB |\n" + "+-----------------------------------------------------------------------------+\n" + ) + + def mock_run(cmd, **kwargs): + class R: + returncode = 0 + stdout = ixsmi_output if cmd == ["ixsmi"] else csv_output + + return R() + + monkeypatch.setattr("subprocess.run", mock_run) + + pool = res.ResourcePool("iluvatar") + gpus = pool.detect_gpus() + assert [gpu.index for gpu in gpus] == [0, 1] + assert gpus[0].process_count == 1 + assert gpus[0].process_pids == (12345,) + assert gpus[1].process_count == 0 + assert gpus[1].process_pids == () + + +def test_allocate_iluvatar_skips_gpus_with_processes(monkeypatch): + pool = res.ResourcePool("iluvatar") + + monkeypatch.setattr( + pool, + "detect_gpus", + lambda: [ + res.GpuInfo(0, 64, 32768, 0, process_count=1, process_pids=(12345,)), + res.GpuInfo(1, 2867, 32768, 1), + ], + ) + + selected, ok = pool.allocate(1) + assert ok + assert selected == [1] + + +def test_allocate_iluvatar_fails_when_all_gpus_have_processes(monkeypatch): + pool = res.ResourcePool("iluvatar") + + monkeypatch.setattr( + pool, + "detect_gpus", + lambda: [ + res.GpuInfo(0, 64, 32768, 0, process_count=1, process_pids=(12345,)), + res.GpuInfo(1, 64, 32768, 0, process_count=1, process_pids=(23456,)), + ], + ) + + selected, ok = pool.allocate(1) + assert not ok + assert selected == [] + + def test_detect_gpus_moore_gpu_list_json(monkeypatch): moore_output = """ { diff --git a/tests/test_run.py b/tests/test_run.py index 656c859..9b245cc 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -3,6 +3,7 @@ import pytest import run +from utils import load_config # --------------------------------------------------------------------------- @@ -291,6 +292,12 @@ def test_docker_args_ascend_selected_device(): assert "--device=/dev/davinci3:/dev/davinci0" in args +def test_config_ascend_does_not_pin_davinci0(): + config = load_config(Path("config.yml")) + docker_args = config["jobs"]["ascend_npu"].get("docker_args", []) + assert "--device=/dev/davinci0" not in docker_args + + def test_docker_args_metax_cuda_visible_devices(): args = _make_platform_args("metax") assert "CUDA_VISIBLE_DEVICES=0" in args diff --git a/tests/test_shadow_workflow.py b/tests/test_shadow_workflow.py index f68ab2c..1e477bf 100644 --- a/tests/test_shadow_workflow.py +++ b/tests/test_shadow_workflow.py @@ -41,14 +41,17 @@ def test_shadow_prepare_preflights_runner_availability_before_matrix_jobs_start( assert "Preflight self-hosted runner availability" in text assert "MATRIX_JSON: ${{ steps.generate.outputs.matrix_json_for_unittest }}" in text assert "CI_RUNNER_STATUS_TOKEN" in text - assert "CI_RUNNER_STATUS_TOKEN is not configured; skipping preflight runner availability check." in text + assert ( + "CI_RUNNER_STATUS_TOKEN is not configured; skipping preflight runner availability check." + in text + ) assert "Queued-job watchdog remains enabled as a fallback." in text assert "/actions/runners?per_page=100" in text assert "No online self-hosted runner before starting CI v2 jobs:" in text assert "job=run-unittest-shadow" in text -def test_shadow_workflow_fails_queued_jobs_after_ten_minutes(): +def test_shadow_workflow_fails_queued_jobs_after_thirty_minutes(): workflow = yaml.safe_load(WORKFLOW.read_text(encoding="utf-8")) jobs = workflow["jobs"] @@ -57,9 +60,12 @@ def test_shadow_workflow_fails_queued_jobs_after_ten_minutes(): assert watchdog["runs-on"] == "ubuntu-latest" step = watchdog["steps"][0] - assert step["env"]["QUEUE_TIMEOUT_SECONDS"] == 600 + assert step["env"]["QUEUE_TIMEOUT_SECONDS"] == 1800 assert step["env"]["POLL_INTERVAL_SECONDS"] == 15 - assert step["env"]["MATRIX_JSON"] == "${{ needs.prepare.outputs.matrix_json_for_unittest }}" + assert ( + step["env"]["MATRIX_JSON"] + == "${{ needs.prepare.outputs.matrix_json_for_unittest }}" + ) assert 'sleep "${QUEUE_TIMEOUT_SECONDS}"' not in step["run"] assert 'job.get("status") == "queued"' in step["run"] assert "/actions/runners?per_page=100" in step["run"] diff --git a/tests/test_workflow.py b/tests/test_workflow.py index e9da1e6..c6403bd 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -8,37 +8,44 @@ def test_nvidia_unit_runs_directly_without_scheduler(): text = WORKFLOW.read_text(encoding="utf-8") assert "Run local Unit Test directly" in text - assert "${{ matrix.platform == 'nvidia' || matrix.platform == 'iluvatar' || matrix.platform == 'ascend' }}" in text - assert "eval \"docker run ${DOCKER_ARGS}\"" in text + assert ( + "${{ matrix.platform == 'nvidia' || matrix.platform == 'iluvatar' || matrix.platform == 'ascend' }}" + in text + ) + assert 'eval "docker run ${DOCKER_ARGS}"' in text def test_scheduler_unit_step_skips_local_unit_platforms(): text = WORKFLOW.read_text(encoding="utf-8") assert "Trigger ${{ matrix.platform }} Unit Test Task" in text - assert "${{ matrix.platform != 'nvidia' && matrix.platform != 'iluvatar' && matrix.platform != 'ascend' }}" in text + assert ( + "${{ matrix.platform != 'nvidia' && matrix.platform != 'iluvatar' && matrix.platform != 'ascend' }}" + in text + ) def test_local_unit_platforms_use_resource_pool_for_auto_gpus(): text = WORKFLOW.read_text(encoding="utf-8") assert 'uses_local_runner = platform in {"nvidia", "iluvatar", "ascend"}' in text - assert 'if not gpu_id_override and not uses_local_runner:' in text + assert "if not gpu_id_override and not uses_local_runner:" in text assert 'gpu_id_override = "all"' in text assert 'if not gpu_id_override and raw_gpu_ids == "auto":' in text -def test_workflow_fails_queued_jobs_after_ten_minutes(): +def test_workflow_fails_queued_jobs_after_thirty_minutes(): text = WORKFLOW.read_text(encoding="utf-8") assert "queue-watchdog" in text - assert "Fail queued CI jobs after 10 minutes" in text + assert "Fail queued CI jobs after 30 minutes" in text assert "MATRIX_JSON: ${{ needs.prepare.outputs.matrix_json_for_unittest }}" in text + assert "QUEUE_TIMEOUT_SECONDS: 1800" in text assert "POLL_INTERVAL_SECONDS: 15" in text assert "/actions/runs/{run_id}/jobs?per_page=100" in text assert "/actions/runners?per_page=100" in text assert "CI queued jobs have no online self-hosted runner:" in text - assert "CI jobs still queued after 10 minutes:" in text + assert "CI jobs still queued after 30 minutes:" in text assert "All expected CI platform jobs completed." in text @@ -48,7 +55,10 @@ def test_prepare_preflights_runner_availability_before_matrix_jobs_start(): assert "Preflight self-hosted runner availability" in text assert "MATRIX_JSON: ${{ steps.generate.outputs.matrix_json_for_unittest }}" in text assert "CI_RUNNER_STATUS_TOKEN" in text - assert "CI_RUNNER_STATUS_TOKEN is not configured; skipping preflight runner availability check." in text + assert ( + "CI_RUNNER_STATUS_TOKEN is not configured; skipping preflight runner availability check." + in text + ) assert "Queued-job watchdog remains enabled as a fallback." in text assert "/actions/runners?per_page=100" in text assert "No online self-hosted runner before starting CI jobs:" in text