Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions utils/process_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,42 @@
is_multinode = os.environ.get('IS_MULTINODE', 'false').lower() == 'true'

if is_multinode:
# TODO: Eventually will have to have a separate condition in here for multinode disagg and
# multinode agg. For now, just assume that multinode implies disagg.

multinode_env = get_required_env_vars(['PREFILL_GPUS', 'DECODE_GPUS', 'PREFILL_NUM_WORKERS', 'PREFILL_TP',
'PREFILL_EP', 'PREFILL_DP_ATTN', 'DECODE_NUM_WORKERS', 'DECODE_TP', 'DECODE_EP', 'DECODE_DP_ATTN'])
prefill_gpus = int(multinode_env['PREFILL_GPUS'])
decode_gpus = int(multinode_env['DECODE_GPUS'])
prefill_num_workers = int(multinode_env['PREFILL_NUM_WORKERS'])
prefill_tp = int(multinode_env['PREFILL_TP'])
prefill_ep = int(multinode_env['PREFILL_EP'])
prefill_dp_attn = multinode_env['PREFILL_DP_ATTN']
decode_num_workers = int(multinode_env['DECODE_NUM_WORKERS'])
decode_tp = int(multinode_env['DECODE_TP'])
decode_ep = int(multinode_env['DECODE_EP'])
decode_dp_attn = multinode_env['DECODE_DP_ATTN']
# Detect topology from the prefill/decode GPU split. A disaggregated run keeps
# prefill and decode in separate pools (decode GPUs > 0); an aggregated ("agg")
# run colocates them in one pool and so reports zero decode GPUs. Classify on
# that split rather than trusting the DISAGG env flag.
topo_env = get_required_env_vars(['PREFILL_GPUS', 'DECODE_GPUS'])
prefill_gpus = int(topo_env['PREFILL_GPUS'])
decode_gpus = int(topo_env['DECODE_GPUS'])
is_agg = decode_gpus == 0

if is_agg:
# Aggregated multinode has no decode pool, so the DECODE_* detail vars may be
# absent; read all detail vars tolerantly and default to zero/empty.
detail_env = {
'PREFILL_NUM_WORKERS': os.environ.get('PREFILL_NUM_WORKERS', '0'),
'PREFILL_TP': os.environ.get('PREFILL_TP', '0'),
'PREFILL_EP': os.environ.get('PREFILL_EP', '0'),
'PREFILL_DP_ATTN': os.environ.get('PREFILL_DP_ATTN', ''),
'DECODE_NUM_WORKERS': os.environ.get('DECODE_NUM_WORKERS', '0'),
'DECODE_TP': os.environ.get('DECODE_TP', '0'),
'DECODE_EP': os.environ.get('DECODE_EP', '0'),
'DECODE_DP_ATTN': os.environ.get('DECODE_DP_ATTN', ''),
}

Check failure on line 83 in utils/process_result.py

View check run for this annotation

Claude / Claude Code Review

PREFILL_* env vars silently default to 0 in agg multinode case

In the new `is_agg` branch, all eight detail env vars are read tolerantly with `os.environ.get(..., '0'/'')`, including `PREFILL_NUM_WORKERS/TP/EP/DP_ATTN`. This contradicts both the PR description ("the DECODE_* detail vars are read tolerantly") and the inline comment one line above ("the DECODE_* detail vars may be absent"). An agg multinode run still has a colocated prefill pool (line 105 enforces `prefill_gpus > 0`), so the PREFILL_* topology vars should remain strictly required — otherwise a launcher that forgets to export PREFILL_TP will silently emit prefill_tp=0 into the agg JSON consumed by downstream ETL, masking a real misconfiguration.
Comment on lines +74 to +83

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 In the new is_agg branch, all eight detail env vars are read tolerantly with os.environ.get(..., '0'/''), including PREFILL_NUM_WORKERS/TP/EP/DP_ATTN. This contradicts both the PR description ("the DECODE_* detail vars are read tolerantly") and the inline comment one line above ("the DECODE_* detail vars may be absent"). An agg multinode run still has a colocated prefill pool (line 105 enforces prefill_gpus > 0), so the PREFILL_* topology vars should remain strictly required — otherwise a launcher that forgets to export PREFILL_TP will silently emit prefill_tp=0 into the agg JSON consumed by downstream ETL, masking a real misconfiguration. Fix: in the is_agg branch, call get_required_env_vars(['PREFILL_NUM_WORKERS','PREFILL_TP','PREFILL_EP','PREFILL_DP_ATTN']) and only default the DECODE_* vars tolerantly.

Extended reasoning...

What the bug is

The new is_agg branch in utils/process_result.py (lines 74-83) reads all eight detail env vars tolerantly:

detail_env = {
    'PREFILL_NUM_WORKERS': os.environ.get('PREFILL_NUM_WORKERS', '0'),
    'PREFILL_TP':          os.environ.get('PREFILL_TP', '0'),
    'PREFILL_EP':          os.environ.get('PREFILL_EP', '0'),
    'PREFILL_DP_ATTN':     os.environ.get('PREFILL_DP_ATTN', ''),
    'DECODE_NUM_WORKERS':  os.environ.get('DECODE_NUM_WORKERS', '0'),
    ...
}

This is broader than what the PR description and the inline comment justify. Both state the intent is only to tolerate the DECODE_* vars:

  • PR description: "for the agg case, the DECODE_* detail vars are read tolerantly (default 0/empty) instead of being hard-required — an agg run has no decode pool and may not export them."
  • Inline comment (lines 72-73): "Aggregated multinode has no decode pool, so the DECODE_* detail vars may be absent."

Why the existing code does not prevent it

The agg branch only enforces topology (PREFILL_GPUS, DECODE_GPUS) via get_required_env_vars. Lines 105-106 then enforce prefill_gpus > 0 — so by the time we use prefill_tp/prefill_ep/etc., we know there is a prefill pool whose parallelism is topologically meaningful. Yet the PREFILL_* detail vars describing that pool are silently defaulted to 0/empty if missing.

The new test test_multinode_agg_detected_when_decode_gpus_zero only pop()s the DECODE_* keys (lines 432-433) and keeps all PREFILL_* keys set, so it neither verifies the strict-PREFILL contract nor exercises the over-broad tolerance.

Impact

For an agg run that forgets to export PREFILL_TP (or any other PREFILL_* detail var), the script produces an agg JSON with prefill_tp=0 (or empty prefill_dp_attention) instead of failing fast like the disagg branch would. Since the agg JSON is consumed by downstream ETL, a 0 silently passes through as valid data — a data-quality regression introduced by this PR.

Step-by-step proof

Launcher misconfiguration scenario for an aggregated multinode run:

  1. IS_MULTINODE=true, PREFILL_GPUS=16, DECODE_GPUS=0 → exported correctly.
  2. PREFILL_TP is forgotten by the launcher (not exported).
  3. PREFILL_EP, PREFILL_NUM_WORKERS, PREFILL_DP_ATTN are exported correctly.
  4. Script reaches the multinode block. is_agg = (decode_gpus == 0) → True.
  5. The is_agg dict at lines 74-83 reads 'PREFILL_TP': os.environ.get('PREFILL_TP', '0') → '0'.
  6. prefill_tp = int(detail_env['PREFILL_TP']) → 0.
  7. multi_node_data['prefill_tp'] = 0 written to agg_*.json.
  8. ETL ingests prefill_tp=0 for a 16-GPU prefill pool — silently wrong; no error.

Compare to the disagg branch (lines 87-90): the same forgotten PREFILL_TP would have raised EnvironmentError: Missing required environment variables: PREFILL_TP, failing the run cleanly.

Fix

In the is_agg branch, keep PREFILL_* strict and only relax DECODE_*:

if is_agg:
    prefill_env = get_required_env_vars([
        'PREFILL_NUM_WORKERS', 'PREFILL_TP', 'PREFILL_EP', 'PREFILL_DP_ATTN',
    ])
    detail_env = {
        **prefill_env,
        'DECODE_NUM_WORKERS': os.environ.get('DECODE_NUM_WORKERS', '0'),
        'DECODE_TP':          os.environ.get('DECODE_TP', '0'),
        'DECODE_EP':          os.environ.get('DECODE_EP', '0'),
        'DECODE_DP_ATTN':     os.environ.get('DECODE_DP_ATTN', ''),
    }

This matches the PR's stated contract, matches the inline comment, and matches what the existing agg test already exercises (it sets all PREFILL_* and only omits DECODE_*).

else:
# Disaggregated multinode requires the full prefill+decode env contract.
detail_env = get_required_env_vars(['PREFILL_NUM_WORKERS', 'PREFILL_TP', 'PREFILL_EP',
'PREFILL_DP_ATTN', 'DECODE_NUM_WORKERS', 'DECODE_TP',
'DECODE_EP', 'DECODE_DP_ATTN'])

prefill_num_workers = int(detail_env['PREFILL_NUM_WORKERS'])
prefill_tp = int(detail_env['PREFILL_TP'])
prefill_ep = int(detail_env['PREFILL_EP'])
prefill_dp_attn = detail_env['PREFILL_DP_ATTN']
decode_num_workers = int(detail_env['DECODE_NUM_WORKERS'])
decode_tp = int(detail_env['DECODE_TP'])
decode_ep = int(detail_env['DECODE_EP'])
decode_dp_attn = detail_env['DECODE_DP_ATTN']

total_gpus = prefill_gpus + decode_gpus
if total_gpus <= 0:
Expand All @@ -87,6 +108,8 @@

multi_node_data = {
'is_multinode': True,
# decode_gpus == 0 => aggregated, not disaggregated (overrides DISAGG env).
'disagg': not is_agg,
'prefill_tp': prefill_tp,
'prefill_ep': prefill_ep,
'prefill_dp_attention': prefill_dp_attn,
Expand Down
33 changes: 33 additions & 0 deletions utils/test_process_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,39 @@ def test_multinode_aggregate_decode_fields_zero(self, tmp_path, multinode_env_va
assert output_data["output_tput_per_gpu"] == pytest.approx(750.0)
assert output_data["input_tput_per_gpu"] == pytest.approx(250.0)

def test_multinode_agg_detected_when_decode_gpus_zero(self, tmp_path, sample_benchmark_result, multinode_env_vars):
    """Zero decode GPUs must classify a multinode run as aggregated.

    Topology wins over the DISAGG env flag (``disagg`` is reported as False even
    though DISAGG=true is exported), and the run must succeed without any of the
    DECODE_* detail vars present in the environment.
    """
    pool_gpus = 16
    total_tput = 15000.5
    output_tput = 12000.0
    decode_detail_keys = ("DECODE_NUM_WORKERS", "DECODE_TP", "DECODE_EP", "DECODE_DP_ATTN")

    # An agg run has no decode pool, so none of the DECODE_* detail vars are exported.
    agg_env = {key: value for key, value in multinode_env_vars.items() if key not in decode_detail_keys}
    agg_env.update({
        "DISAGG": "true",  # launcher mislabels it; topology is the source of truth
        "PREFILL_GPUS": "16",
        "DECODE_GPUS": "0",
        "PREFILL_TP": "16",
        "PREFILL_EP": "1",
        "PREFILL_NUM_WORKERS": "1",
        "PREFILL_DP_ATTN": "false",
    })

    proc = run_script(tmp_path, agg_env, sample_benchmark_result)
    assert proc.returncode == 0, f"Script failed: {proc.stderr}"

    payload = json.loads(proc.stdout)

    # Classified as aggregated even though the env claims DISAGG=true.
    assert payload["disagg"] is False
    assert payload["is_multinode"] is True

    # GPU counts and the decode-side detail fields emitted for an agg run.
    expected_counts = {
        "num_prefill_gpu": 16,
        "num_decode_gpu": 0,
        "decode_tp": 0,
        "decode_ep": 0,
        "decode_num_workers": 0,
    }
    for field, expected in expected_counts.items():
        assert payload[field] == expected

    # Per-GPU throughput is divided by the whole colocated pool of 16 GPUs.
    assert payload["tput_per_gpu"] == pytest.approx(total_tput / pool_gpus)
    assert payload["output_tput_per_gpu"] == pytest.approx(output_tput / pool_gpus)
    assert payload["input_tput_per_gpu"] == pytest.approx((total_tput - output_tput) / pool_gpus)

def test_multinode_zero_total_gpus_fails(self, tmp_path, sample_benchmark_result, multinode_env_vars):
"""Invalid multinode metadata should fail before throughput division."""
env = multinode_env_vars.copy()
Expand Down