diff --git a/utils/process_result.py b/utils/process_result.py index 5fb059473..242414cd5 100644 --- a/utils/process_result.py +++ b/utils/process_result.py @@ -59,21 +59,42 @@ def get_required_env_vars(required_vars): is_multinode = os.environ.get('IS_MULTINODE', 'false').lower() == 'true' if is_multinode: - # TODO: Eventually will have to have a separate condition in here for multinode disagg and - # multinode agg. For now, just assume that multinode implies disagg. - - multinode_env = get_required_env_vars(['PREFILL_GPUS', 'DECODE_GPUS', 'PREFILL_NUM_WORKERS', 'PREFILL_TP', - 'PREFILL_EP', 'PREFILL_DP_ATTN', 'DECODE_NUM_WORKERS', 'DECODE_TP', 'DECODE_EP', 'DECODE_DP_ATTN']) - prefill_gpus = int(multinode_env['PREFILL_GPUS']) - decode_gpus = int(multinode_env['DECODE_GPUS']) - prefill_num_workers = int(multinode_env['PREFILL_NUM_WORKERS']) - prefill_tp = int(multinode_env['PREFILL_TP']) - prefill_ep = int(multinode_env['PREFILL_EP']) - prefill_dp_attn = multinode_env['PREFILL_DP_ATTN'] - decode_num_workers = int(multinode_env['DECODE_NUM_WORKERS']) - decode_tp = int(multinode_env['DECODE_TP']) - decode_ep = int(multinode_env['DECODE_EP']) - decode_dp_attn = multinode_env['DECODE_DP_ATTN'] + # Detect topology from the prefill/decode GPU split. A disaggregated run keeps + # prefill and decode in separate pools (decode GPUs > 0); an aggregated ("agg") + # run colocates them in one pool and so reports zero decode GPUs. Classify on + # that split rather than trusting the DISAGG env flag. + topo_env = get_required_env_vars(['PREFILL_GPUS', 'DECODE_GPUS']) + prefill_gpus = int(topo_env['PREFILL_GPUS']) + decode_gpus = int(topo_env['DECODE_GPUS']) + is_agg = decode_gpus == 0 + + if is_agg: + # Aggregated multinode has no decode pool, so the DECODE_* detail vars may be + # absent; read all detail vars tolerantly and default to zero/empty. + detail_env = { + 'PREFILL_NUM_WORKERS': os.environ.get('PREFILL_NUM_WORKERS', '0'), + 'PREFILL_TP': os.environ.get('PREFILL_TP', '0'), + 'PREFILL_EP': os.environ.get('PREFILL_EP', '0'), + 'PREFILL_DP_ATTN': os.environ.get('PREFILL_DP_ATTN', ''), + 'DECODE_NUM_WORKERS': os.environ.get('DECODE_NUM_WORKERS', '0'), + 'DECODE_TP': os.environ.get('DECODE_TP', '0'), + 'DECODE_EP': os.environ.get('DECODE_EP', '0'), + 'DECODE_DP_ATTN': os.environ.get('DECODE_DP_ATTN', ''), + } + else: + # Disaggregated multinode requires the full prefill+decode env contract. + detail_env = get_required_env_vars(['PREFILL_NUM_WORKERS', 'PREFILL_TP', 'PREFILL_EP', + 'PREFILL_DP_ATTN', 'DECODE_NUM_WORKERS', 'DECODE_TP', + 'DECODE_EP', 'DECODE_DP_ATTN']) + + prefill_num_workers = int(detail_env['PREFILL_NUM_WORKERS']) + prefill_tp = int(detail_env['PREFILL_TP']) + prefill_ep = int(detail_env['PREFILL_EP']) + prefill_dp_attn = detail_env['PREFILL_DP_ATTN'] + decode_num_workers = int(detail_env['DECODE_NUM_WORKERS']) + decode_tp = int(detail_env['DECODE_TP']) + decode_ep = int(detail_env['DECODE_EP']) + decode_dp_attn = detail_env['DECODE_DP_ATTN'] total_gpus = prefill_gpus + decode_gpus if total_gpus <= 0: @@ -87,6 +108,8 @@ def get_required_env_vars(required_vars): multi_node_data = { 'is_multinode': True, + # decode_gpus == 0 => aggregated, not disaggregated (overrides DISAGG env). + 'disagg': not is_agg, 'prefill_tp': prefill_tp, 'prefill_ep': prefill_ep, 'prefill_dp_attention': prefill_dp_attn, diff --git a/utils/test_process_result.py b/utils/test_process_result.py index 4037689ea..9e137de39 100644 --- a/utils/test_process_result.py +++ b/utils/test_process_result.py @@ -417,6 +417,39 @@ def test_multinode_aggregate_decode_fields_zero(self, tmp_path, multinode_env_va assert output_data["output_tput_per_gpu"] == pytest.approx(750.0) assert output_data["input_tput_per_gpu"] == pytest.approx(250.0) + def test_multinode_agg_detected_when_decode_gpus_zero(self, tmp_path, sample_benchmark_result, multinode_env_vars): + """decode_gpus == 0 marks an aggregated multinode run: it is classified as agg + (disagg=False, overriding the DISAGG env flag) and the DECODE_* detail vars are + no longer required.""" + env = multinode_env_vars.copy() + env["DISAGG"] = "true" # launcher mislabels it; topology is the source of truth + env["PREFILL_GPUS"] = "16" + env["DECODE_GPUS"] = "0" + env["PREFILL_TP"] = "16" + env["PREFILL_EP"] = "1" + env["PREFILL_NUM_WORKERS"] = "1" + env["PREFILL_DP_ATTN"] = "false" + # An agg run has no decode pool, so it does not export the DECODE_* detail vars. + for key in ("DECODE_NUM_WORKERS", "DECODE_TP", "DECODE_EP", "DECODE_DP_ATTN"): + env.pop(key, None) + + result = run_script(tmp_path, env, sample_benchmark_result) + assert result.returncode == 0, f"Script failed: {result.stderr}" + + output_data = json.loads(result.stdout) + # Detected as aggregated despite DISAGG=true in the env. + assert output_data["disagg"] is False + assert output_data["is_multinode"] is True + assert output_data["num_prefill_gpu"] == 16 + assert output_data["num_decode_gpu"] == 0 + assert output_data["decode_tp"] == 0 + assert output_data["decode_ep"] == 0 + assert output_data["decode_num_workers"] == 0 + # Cluster-wide per-GPU denominators (single colocated pool of 16 GPUs). + assert output_data["tput_per_gpu"] == pytest.approx(15000.5 / 16) + assert output_data["output_tput_per_gpu"] == pytest.approx(12000.0 / 16) + assert output_data["input_tput_per_gpu"] == pytest.approx((15000.5 - 12000.0) / 16) + def test_multinode_zero_total_gpus_fails(self, tmp_path, sample_benchmark_result, multinode_env_vars): """Invalid multinode metadata should fail before throughput division.""" env = multinode_env_vars.copy()