From 5f9c0373a3f7cfe26ccab56b60e38355a1a5a62f Mon Sep 17 00:00:00 2001 From: wangyafeng Date: Mon, 18 May 2026 16:43:34 +0800 Subject: [PATCH] [Feature]console metrics log for pd disaggregation --- fastdeploy/engine/common_engine.py | 1 + .../engine/sched/resource_manager_v1.py | 29 +++++++---- .../engine/sched/scheduler_metrics_logger.py | 52 +++++++++++++++++-- tests/engine/test_scheduler_metrics_logger.py | 34 +++++++++++- tests/v1/test_resource_manager_v1.py | 23 ++++++++ 5 files changed, 125 insertions(+), 14 deletions(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index f553a4f8ee5..9aaed231370 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -196,6 +196,7 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False): self.scheduler_metrics_logger = SchedulerMetricsLogger( enabled=True, dp_rank=self.cfg.parallel_config.local_data_parallel_id, + splitwise_role=self.cfg.scheduler_config.splitwise_role, ) self.resource_manager.scheduler_metrics_logger = self.scheduler_metrics_logger self.token_processor.set_scheduler_metrics_logger(self.scheduler_metrics_logger) diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index de89ab3adca..2c63d7b70df 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -1759,15 +1759,26 @@ def _log_console_scheduler_metrics(self, scheduled_reqs: list[Request | Schedule prefill_reqs = [r for r in scheduled_reqs if isinstance(r, Request) and r.task_type == RequestType.PREFILL] has_decode = any(getattr(r, "task_type", None) == RequestType.DECODE for r in scheduled_reqs) - self.scheduler_metrics_logger.log_prefill_batch( - prefill_reqs=prefill_reqs, - running_cnt=running_cnt, - queue_cnt=queue_cnt, - tokens_used=tokens_used, - token_usage=token_usage, - free_blocks=free_blocks, - evictable_blocks=evictable_blocks, - ) + if self.config.scheduler_config.splitwise_role == "decode": + self.scheduler_metrics_logger.log_decode_bootstrap_batch( + prefill_reqs=prefill_reqs, + running_cnt=running_cnt, + queue_cnt=queue_cnt, + tokens_used=tokens_used, + token_usage=token_usage, + free_blocks=free_blocks, + evictable_blocks=evictable_blocks, + ) + else: + self.scheduler_metrics_logger.log_prefill_batch( + prefill_reqs=prefill_reqs, + running_cnt=running_cnt, + queue_cnt=queue_cnt, + tokens_used=tokens_used, + token_usage=token_usage, + free_blocks=free_blocks, + evictable_blocks=evictable_blocks, + ) if has_decode: has_prefill = len(prefill_reqs) > 0 graph_opt_cfg = self.config.graph_opt_config diff --git a/fastdeploy/engine/sched/scheduler_metrics_logger.py b/fastdeploy/engine/sched/scheduler_metrics_logger.py index 0aaa29e246c..b989584fe07 100644 --- a/fastdeploy/engine/sched/scheduler_metrics_logger.py +++ b/fastdeploy/engine/sched/scheduler_metrics_logger.py @@ -29,9 +29,10 @@ class SchedulerMetricsLogger: DEFAULT_DECODE_LOG_INTERVAL = 5 - def __init__(self, enabled: bool = True, dp_rank: int = 0) -> None: + def __init__(self, enabled: bool = True, dp_rank: int = 0, splitwise_role: str = "mixed") -> None: self.enabled = enabled self.dp_rank = dp_rank + self.splitwise_role = splitwise_role decode_log_interval = envs.FD_CONSOLE_DECODE_LOG_INTERVAL if decode_log_interval <= 0: decode_log_interval = self.DEFAULT_DECODE_LOG_INTERVAL @@ -65,8 +66,9 @@ def on_decode_tokens(self, num_tokens: int) -> None: with self._lock: self._decode_tokens_since_last += num_tokens - def log_prefill_batch( + def _log_prefill_like_batch( self, + batch_name: str, prefill_reqs: Iterable, running_cnt: int, queue_cnt: int, @@ -91,8 +93,9 @@ def log_prefill_batch( cached_tokens += getattr(req, "num_cached_tokens", 0) or 0 msg = ( - "Prefill batch, " + f"{batch_name}, " f"dp_rank: {self.dp_rank}, " + f"splitwise_role: {self.splitwise_role}, " f"#new-seq: {len(prefill_reqs)}, " f"#new-token: {new_tokens}, " f"#cached-token: {cached_tokens}, " @@ -104,6 +107,48 @@ def log_prefill_batch( ) self._logger.info(msg) + def log_prefill_batch( + self, + prefill_reqs: Iterable, + running_cnt: int, + queue_cnt: int, + tokens_used: int, + token_usage: float, + free_blocks: int = 0, + evictable_blocks: int = 0, + ) -> None: + self._log_prefill_like_batch( + batch_name="Prefill batch", + prefill_reqs=prefill_reqs, + running_cnt=running_cnt, + queue_cnt=queue_cnt, + tokens_used=tokens_used, + token_usage=token_usage, + free_blocks=free_blocks, + evictable_blocks=evictable_blocks, + ) + + def log_decode_bootstrap_batch( + self, + prefill_reqs: Iterable, + running_cnt: int, + queue_cnt: int, + tokens_used: int, + token_usage: float, + free_blocks: int = 0, + evictable_blocks: int = 0, + ) -> None: + self._log_prefill_like_batch( + batch_name="Decode bootstrap batch from prefill", + prefill_reqs=prefill_reqs, + running_cnt=running_cnt, + queue_cnt=queue_cnt, + tokens_used=tokens_used, + token_usage=token_usage, + free_blocks=free_blocks, + evictable_blocks=evictable_blocks, + ) + def log_decode_batch( self, running_cnt: int, @@ -132,6 +177,7 @@ def log_decode_batch( msg = ( "Decode batch, " f"dp_rank: {self.dp_rank}, " + f"splitwise_role: {self.splitwise_role}, " f"#running-req: {running_cnt}, " f"#token: {tokens_used}, " f"token usage: {token_usage:.2f}, " diff --git a/tests/engine/test_scheduler_metrics_logger.py b/tests/engine/test_scheduler_metrics_logger.py index c1305a3daa6..cab38350c49 100644 --- a/tests/engine/test_scheduler_metrics_logger.py +++ b/tests/engine/test_scheduler_metrics_logger.py @@ -32,7 +32,7 @@ def test_on_decode_tokens_accumulates(): def test_log_prefill_batch_logs_expected_message(): - logger = SchedulerMetricsLogger(enabled=True, dp_rank=2) + logger = SchedulerMetricsLogger(enabled=True, dp_rank=2, splitwise_role="prefill") logger._logger = mock.Mock() reqs = [ @@ -46,6 +46,7 @@ def test_log_prefill_batch_logs_expected_message(): message = logger._logger.info.call_args[0][0] assert "Prefill batch" in message assert "dp_rank: 2" in message + assert "splitwise_role: prefill" in message assert "#new-seq: 2" in message assert "#new-token: 4" in message assert "#cached-token: 3" in message @@ -54,8 +55,31 @@ def test_log_prefill_batch_logs_expected_message(): assert "#queue-req: 6" in message +def test_log_decode_bootstrap_batch_logs_expected_message(): + logger = SchedulerMetricsLogger(enabled=True, dp_rank=0, splitwise_role="decode") + logger._logger = mock.Mock() + + reqs = [types.SimpleNamespace(prefill_start_index=4, prefill_end_index=5, num_cached_tokens=4)] + + logger.log_decode_bootstrap_batch( + prefill_reqs=reqs, + running_cnt=1, + queue_cnt=0, + tokens_used=5, + token_usage=0.25, + ) + + logger._logger.info.assert_called_once() + message = logger._logger.info.call_args[0][0] + assert "Decode bootstrap batch" in message + assert "splitwise_role: decode" in message + assert "#new-seq: 1" in message + assert "#new-token: 1" in message + assert "#cached-token: 4" in message + + def test_log_decode_batch_computes_throughput(monkeypatch): - logger = SchedulerMetricsLogger(enabled=True, dp_rank=1) + logger = SchedulerMetricsLogger(enabled=True, dp_rank=1, splitwise_role="decode") logger._logger = mock.Mock() logger._decode_batch_count = logger._decode_log_interval - 1 logger._decode_tokens_since_last = 10 @@ -69,6 +93,7 @@ def test_log_decode_batch_computes_throughput(monkeypatch): message = logger._logger.info.call_args[0][0] assert "Decode batch" in message assert "dp_rank: 1" in message + assert "splitwise_role: decode" in message assert "gen throughput (token/s): 5.00" in message assert "#queue-req: 7" in message assert logger._decode_tokens_since_last == 0 @@ -99,3 +124,8 @@ def test_decode_log_interval_non_positive_falls_back_to_default(monkeypatch): monkeypatch.setenv("FD_CONSOLE_DECODE_LOG_INTERVAL", "0") logger = SchedulerMetricsLogger(enabled=True, dp_rank=0) assert logger._decode_log_interval == SchedulerMetricsLogger.DEFAULT_DECODE_LOG_INTERVAL + + +def test_default_splitwise_role_is_mixed(): + logger = SchedulerMetricsLogger(enabled=True, dp_rank=0) + assert logger.splitwise_role == "mixed" diff --git a/tests/v1/test_resource_manager_v1.py b/tests/v1/test_resource_manager_v1.py index d9ab6a59dbc..07e2ab76bb7 100644 --- a/tests/v1/test_resource_manager_v1.py +++ b/tests/v1/test_resource_manager_v1.py @@ -27,6 +27,7 @@ if not hasattr(paddle, "enable_compat"): paddle.enable_compat = lambda scope=None: None +from fastdeploy import envs from fastdeploy.config import CacheConfig, FDConfig, ParallelConfig, SchedulerConfig from fastdeploy.engine.args_utils import EngineArgs from fastdeploy.engine.request import ( @@ -36,6 +37,7 @@ RequestMetrics, RequestOutput, RequestStatus, + RequestType, ) from fastdeploy.engine.sched.resource_manager_v1 import ( ResourceManagerV1, @@ -569,6 +571,27 @@ def test_preallocate_resource_in_p_and_d(self): self.assertEqual(request_d.num_computed_tokens, request_d.need_prefill_tokens) self.assertEqual(request_d.disaggregate_info["block_tables"], [4, 5]) + def test_decode_role_prefill_task_logs_decode_bootstrap_batch(self): + manager = _build_manager(splitwise_role="decode", enable_prefix_caching=False) + _register_manager_cleanup(self, manager) + manager.cache_manager = MagicMock() + manager.cache_manager.num_gpu_blocks = 8 + manager.cache_manager.gpu_free_block_list = [0, 1, 2, 3] + manager.scheduler_metrics_logger = MagicMock() + + request = _make_request(prompt_token_ids=[1, 2, 3, 4]) + request.task_type = RequestType.PREFILL + request.prefill_start_index = 4 + request.prefill_end_index = 5 + batch_request = BatchRequest() + batch_request.add_request(request) + + with patch.object(envs, "FD_CONSOLE_SCHEDULER_METRICS", True): + manager._log_console_scheduler_metrics(batch_request) + + manager.scheduler_metrics_logger.log_decode_bootstrap_batch.assert_called_once() + manager.scheduler_metrics_logger.log_prefill_batch.assert_not_called() + def test_prefilled_request_flow_and_resource_check(self): manager = _build_manager(splitwise_role="decode", speculative_method="mtp") _register_manager_cleanup(self, manager)