Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/infiniops-ci-v2-shadow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -403,17 +403,17 @@ jobs:
fi

queue-watchdog:
name: Fail queued CI v2 jobs after 10 minutes
name: Fail queued CI v2 jobs after 30 minutes
needs: prepare
runs-on: ubuntu-latest
steps:
- name: Fail queued CI v2 jobs after 10 minutes
- name: Fail queued CI v2 jobs after 30 minutes
env:
GH_TOKEN: ${{ github.token }}
REPOSITORY: ${{ github.repository }}
RUN_ID: ${{ github.run_id }}
MATRIX_JSON: ${{ needs.prepare.outputs.matrix_json_for_unittest }}
QUEUE_TIMEOUT_SECONDS: 600
QUEUE_TIMEOUT_SECONDS: 1800
POLL_INTERVAL_SECONDS: 15
run: |
set -euo pipefail
Expand Down Expand Up @@ -568,7 +568,7 @@ jobs:

if time.monotonic() >= deadline:
if queued:
print("CI v2 jobs still queued after 10 minutes:", file=sys.stderr)
print("CI v2 jobs still queued after 30 minutes:", file=sys.stderr)
for job in queued:
print(f"- {job.get('name')} {job.get('html_url')}", file=sys.stderr)
sys.exit(1)
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/infiniops-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -468,18 +468,18 @@ jobs:
"${IMAGE_TAG}"

queue-watchdog:
name: Fail queued CI jobs after 10 minutes
name: Fail queued CI jobs after 30 minutes
needs: prepare
if: contains(fromJSON(needs.prepare.outputs.job_types_with_jobs), 'unittest')
runs-on: ubuntu-latest
steps:
- name: Fail queued CI jobs after 10 minutes
- name: Fail queued CI jobs after 30 minutes
env:
GH_TOKEN: ${{ github.token }}
REPOSITORY: ${{ github.repository }}
RUN_ID: ${{ github.run_id }}
MATRIX_JSON: ${{ needs.prepare.outputs.matrix_json_for_unittest }}
QUEUE_TIMEOUT_SECONDS: 600
QUEUE_TIMEOUT_SECONDS: 1800
POLL_INTERVAL_SECONDS: 15
run: |
set -euo pipefail
Expand Down Expand Up @@ -632,7 +632,7 @@ jobs:

if time.monotonic() >= deadline:
if queued:
print("CI jobs still queued after 10 minutes:", file=sys.stderr)
print("CI jobs still queued after 30 minutes:", file=sys.stderr)
for job in queued:
print(f"- {job.get('name')} {job.get('html_url')}", file=sys.stderr)
sys.exit(1)
Expand Down
114 changes: 111 additions & 3 deletions ci_resource.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
"""Resource detection and allocation for CI Runner Agent."""

from __future__ import annotations

import json
Expand Down Expand Up @@ -27,13 +28,17 @@
"ascend": "ASCEND_VISIBLE_DEVICES",
}

# Platforms on which a device is only considered free/allocatable when no
# process is attached to it (process_count == 0), in addition to the
# utilization-threshold check that applies to every platform.
PROCESS_EXCLUSIVE_PLATFORMS = {"ascend", "iluvatar"}


@dataclass
class GpuInfo:
    """Per-device snapshot produced by the GPU detection routines."""

    # Device index; used as the key when matching process entries to devices.
    index: int
    # Memory figures; presumably megabytes (unit inferred from the field
    # names -- confirm against the platform parsers).
    memory_used_mb: float
    memory_total_mb: float
    # Compared against the utilization threshold when selecting free devices.
    utilization_pct: float
    # Number of process IDs found for this device; stays 0 for detectors
    # that do not collect process information.
    process_count: int = 0
    # The process IDs themselves, as an immutable tuple.
    process_pids: tuple[int, ...] = ()


@dataclass
Expand Down Expand Up @@ -85,11 +90,17 @@ def detect_gpus(self) -> list[GpuInfo]:
if self._platform == "cambricon":
return self._detect_gpus_cambricon()

if self._platform == "iluvatar":
return self._detect_gpus_iluvatar()

if self._platform == "ascend":
return self._detect_gpus_ascend()

tool = self.GPU_QUERY_TOOLS.get(self._platform)

return self._detect_gpus_csv(tool)

def _detect_gpus_csv(self, tool) -> list[GpuInfo]:
if not tool:
return []

Expand Down Expand Up @@ -132,6 +143,66 @@ def detect_gpus(self) -> list[GpuInfo]:

return gpus

def _detect_gpus_iluvatar(self) -> list[GpuInfo]:
    """Detect Iluvatar GPUs and attach per-device process information.

    The CSV query (via ``_detect_gpus_csv``) supplies memory and
    utilization figures. The tool is then run a second time without
    arguments, and the rows following the ``Processes:`` marker are
    scanned for ``<gpu index> <pid>`` pairs.

    Returns:
        An empty list when no tool is configured or the CSV query finds
        no devices. The unmodified CSV result when the second invocation
        cannot be run, times out, or exits non-zero. Otherwise a new list
        of ``GpuInfo`` with ``process_count`` / ``process_pids`` filled in.
    """
    smi_tool = self.GPU_QUERY_TOOLS.get("iluvatar")
    if not smi_tool:
        return []

    gpus = self._detect_gpus_csv(smi_tool)
    if not gpus:
        return []

    # Best effort: any failure of the plain invocation falls back to the
    # CSV-only data rather than raising.
    try:
        report = subprocess.run(
            [smi_tool],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return gpus

    if raw_failed := report.returncode != 0:
        return gpus

    pids_by_gpu: dict[int, list[int]] = {}
    rows = iter(report.stdout.splitlines())

    # Skip everything up to and including the first "Processes:" marker.
    for row in rows:
        if "Processes:" in row:
            break

    for row in rows:
        # A repeated marker line is never treated as a data row.
        if "Processes:" in row:
            continue

        fields = row.strip().strip("|").split()
        if len(fields) < 2:
            continue
        if not (fields[0].isdigit() and fields[1].isdigit()):
            continue

        # str.isdigit() accepts some characters int() rejects, so the
        # conversion is still guarded.
        try:
            device = int(fields[0])
            pid = int(fields[1])
        except ValueError:
            continue

        pids_by_gpu.setdefault(device, []).append(pid)

    enriched = []
    for gpu in gpus:
        pids = tuple(pids_by_gpu.get(gpu.index, ()))
        enriched.append(
            GpuInfo(
                index=gpu.index,
                memory_used_mb=gpu.memory_used_mb,
                memory_total_mb=gpu.memory_total_mb,
                utilization_pct=gpu.utilization_pct,
                process_count=len(pids),
                process_pids=pids,
            )
        )
    return enriched

def _detect_gpus_metax(self) -> list[GpuInfo]:
"""Parse mx-smi output for MetaX GPUs.

Expand Down Expand Up @@ -368,6 +439,22 @@ def _detect_gpus_ascend(self) -> list[GpuInfo]:

gpus = []
lines = result.stdout.splitlines()
process_pids: dict[int, list[int]] = {}

for line in lines:
process_m = re.match(r"^\|\s*(\d+)\s+\d+\s*\|\s*(\d+)\s*\|", line)

if not process_m:
continue

try:
npu_index = int(process_m.group(1))
pid = int(process_m.group(2))
except ValueError:
continue

process_pids.setdefault(npu_index, []).append(pid)

i = 0

while i < len(lines):
Expand All @@ -377,7 +464,7 @@ def _detect_gpus_ascend(self) -> list[GpuInfo]:

m1 = re.match(r"^\|\s+(\d+)\s+", line)

if m1 and i + 1 < len(lines):
if m1 and i + 1 < len(lines) and re.search(r"\b(910|310)\w*\b", line):
try:
npu_index = int(m1.group(1))
row2 = lines[i + 1]
Expand All @@ -402,6 +489,8 @@ def _detect_gpus_ascend(self) -> list[GpuInfo]:
memory_used_mb=used_mb,
memory_total_mb=total_mb,
utilization_pct=util_pct,
process_count=len(process_pids.get(npu_index, [])),
process_pids=tuple(process_pids.get(npu_index, ())),
)
)
except (ValueError, AttributeError):
Expand Down Expand Up @@ -439,7 +528,13 @@ def get_free_gpus(self) -> list[int]:
"""Return GPU indices with utilization below threshold."""
gpus = self.detect_gpus()
return [
g.index for g in gpus if g.utilization_pct < self._utilization_threshold
g.index
for g in gpus
if g.utilization_pct < self._utilization_threshold
and (
self._platform not in PROCESS_EXCLUSIVE_PLATFORMS
or g.process_count == 0
)
]

def allocate(self, gpu_count, memory_mb=0) -> tuple[list[int], bool]:
Expand Down Expand Up @@ -469,6 +564,10 @@ def allocate(self, gpu_count, memory_mb=0) -> tuple[list[int], bool]:
if g.index not in self._allocated
and self._is_gpu_memory_available(g)
and g.utilization_pct < self._utilization_threshold
and (
self._platform not in PROCESS_EXCLUSIVE_PLATFORMS
or g.process_count == 0
)
]

if len(available) < gpu_count:
Expand All @@ -477,7 +576,14 @@ def allocate(self, gpu_count, memory_mb=0) -> tuple[list[int], bool]:
if sys_res is not None and sys_res.available_memory_mb < memory_mb:
return ([], False)

available.sort(key=operator.attrgetter("utilization_pct"))
if self._platform in PROCESS_EXCLUSIVE_PLATFORMS:
available.sort(
key=operator.attrgetter(
"utilization_pct", "memory_used_mb", "index"
)
)
else:
available.sort(key=operator.attrgetter("utilization_pct"))
selected = [g.index for g in available[:gpu_count]]
self._allocated.update(selected)
return (selected, True)
Expand Down Expand Up @@ -512,6 +618,8 @@ def get_status(self) -> dict:
"memory_used_mb": g.memory_used_mb,
"memory_total_mb": g.memory_total_mb,
"utilization_pct": g.utilization_pct,
"process_count": g.process_count,
"process_pids": list(g.process_pids),
"allocated_by_agent": g.index in allocated,
}
for g in gpus
Expand Down
1 change: 0 additions & 1 deletion config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ platforms:
docker_args:
- "--runtime=runc"
- "--privileged"
- "--device=/dev/davinci0"
- "--device=/dev/davinci_manager"
- "--device=/dev/devmm_svm"
- "--device=/dev/hisi_hdc"
Expand Down
Loading