Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False):
self.scheduler_metrics_logger = SchedulerMetricsLogger(
enabled=True,
dp_rank=self.cfg.parallel_config.local_data_parallel_id,
splitwise_role=self.cfg.scheduler_config.splitwise_role,
)
self.resource_manager.scheduler_metrics_logger = self.scheduler_metrics_logger
self.token_processor.set_scheduler_metrics_logger(self.scheduler_metrics_logger)
Expand Down
29 changes: 20 additions & 9 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -1759,15 +1759,26 @@ def _log_console_scheduler_metrics(self, scheduled_reqs: list[Request | Schedule
prefill_reqs = [r for r in scheduled_reqs if isinstance(r, Request) and r.task_type == RequestType.PREFILL]
has_decode = any(getattr(r, "task_type", None) == RequestType.DECODE for r in scheduled_reqs)

self.scheduler_metrics_logger.log_prefill_batch(
prefill_reqs=prefill_reqs,
running_cnt=running_cnt,
queue_cnt=queue_cnt,
tokens_used=tokens_used,
token_usage=token_usage,
free_blocks=free_blocks,
evictable_blocks=evictable_blocks,
)
if self.config.scheduler_config.splitwise_role == "decode":
self.scheduler_metrics_logger.log_decode_bootstrap_batch(
prefill_reqs=prefill_reqs,
running_cnt=running_cnt,
queue_cnt=queue_cnt,
tokens_used=tokens_used,
token_usage=token_usage,
free_blocks=free_blocks,
evictable_blocks=evictable_blocks,
)
else:
self.scheduler_metrics_logger.log_prefill_batch(
prefill_reqs=prefill_reqs,
running_cnt=running_cnt,
queue_cnt=queue_cnt,
tokens_used=tokens_used,
token_usage=token_usage,
free_blocks=free_blocks,
evictable_blocks=evictable_blocks,
)
if has_decode:
has_prefill = len(prefill_reqs) > 0
graph_opt_cfg = self.config.graph_opt_config
Expand Down
52 changes: 49 additions & 3 deletions fastdeploy/engine/sched/scheduler_metrics_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ class SchedulerMetricsLogger:

DEFAULT_DECODE_LOG_INTERVAL = 5

def __init__(self, enabled: bool = True, dp_rank: int = 0) -> None:
def __init__(self, enabled: bool = True, dp_rank: int = 0, splitwise_role: str = "mixed") -> None:
self.enabled = enabled
self.dp_rank = dp_rank
self.splitwise_role = splitwise_role
decode_log_interval = envs.FD_CONSOLE_DECODE_LOG_INTERVAL
if decode_log_interval <= 0:
decode_log_interval = self.DEFAULT_DECODE_LOG_INTERVAL
Expand Down Expand Up @@ -65,8 +66,9 @@ def on_decode_tokens(self, num_tokens: int) -> None:
with self._lock:
self._decode_tokens_since_last += num_tokens

def log_prefill_batch(
def _log_prefill_like_batch(
self,
batch_name: str,
prefill_reqs: Iterable,
running_cnt: int,
queue_cnt: int,
Expand All @@ -91,8 +93,9 @@ def log_prefill_batch(
cached_tokens += getattr(req, "num_cached_tokens", 0) or 0

msg = (
"Prefill batch, "
f"{batch_name}, "
f"dp_rank: {self.dp_rank}, "
f"splitwise_role: {self.splitwise_role}, "
f"#new-seq: {len(prefill_reqs)}, "
f"#new-token: {new_tokens}, "
f"#cached-token: {cached_tokens}, "
Expand All @@ -104,6 +107,48 @@ def log_prefill_batch(
)
self._logger.info(msg)

def log_prefill_batch(
    self,
    prefill_reqs: Iterable,
    running_cnt: int,
    queue_cnt: int,
    tokens_used: int,
    token_usage: float,
    free_blocks: int = 0,
    evictable_blocks: int = 0,
) -> None:
    """Log one scheduled batch under the ``"Prefill batch"`` label.

    Thin wrapper: every argument is forwarded unchanged, by keyword, to
    ``_log_prefill_like_batch`` together with ``batch_name="Prefill batch"``.
    All formatting and emission of the log line happens in that helper.

    Args:
        prefill_reqs: Requests scheduled for prefill in this step.
        running_cnt: Running-request count to report.
        queue_cnt: Queued-request count to report.
        tokens_used: Token count to report.
        token_usage: Token usage figure to report.
        free_blocks: Free block count to report (defaults to 0).
        evictable_blocks: Evictable block count to report (defaults to 0).
    """
    self._log_prefill_like_batch(
        batch_name="Prefill batch",
        prefill_reqs=prefill_reqs,
        running_cnt=running_cnt,
        queue_cnt=queue_cnt,
        tokens_used=tokens_used,
        token_usage=token_usage,
        free_blocks=free_blocks,
        evictable_blocks=evictable_blocks,
    )

def log_decode_bootstrap_batch(
    self,
    prefill_reqs: Iterable,
    running_cnt: int,
    queue_cnt: int,
    tokens_used: int,
    token_usage: float,
    free_blocks: int = 0,
    evictable_blocks: int = 0,
) -> None:
    """Log one scheduled batch under the decode-bootstrap label.

    Same signature as ``log_prefill_batch``; the only difference is the
    label passed on: ``batch_name="Decode bootstrap batch from prefill"``.
    Every other argument is forwarded unchanged, by keyword, to
    ``_log_prefill_like_batch``, which builds and emits the log line.

    Args:
        prefill_reqs: Prefill-typed requests scheduled in this step.
        running_cnt: Running-request count to report.
        queue_cnt: Queued-request count to report.
        tokens_used: Token count to report.
        token_usage: Token usage figure to report.
        free_blocks: Free block count to report (defaults to 0).
        evictable_blocks: Evictable block count to report (defaults to 0).
    """
    self._log_prefill_like_batch(
        batch_name="Decode bootstrap batch from prefill",
        prefill_reqs=prefill_reqs,
        running_cnt=running_cnt,
        queue_cnt=queue_cnt,
        tokens_used=tokens_used,
        token_usage=token_usage,
        free_blocks=free_blocks,
        evictable_blocks=evictable_blocks,
    )

def log_decode_batch(
self,
running_cnt: int,
Expand Down Expand Up @@ -132,6 +177,7 @@ def log_decode_batch(
msg = (
"Decode batch, "
f"dp_rank: {self.dp_rank}, "
f"splitwise_role: {self.splitwise_role}, "
f"#running-req: {running_cnt}, "
f"#token: {tokens_used}, "
f"token usage: {token_usage:.2f}, "
Expand Down
34 changes: 32 additions & 2 deletions tests/engine/test_scheduler_metrics_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_on_decode_tokens_accumulates():


def test_log_prefill_batch_logs_expected_message():
logger = SchedulerMetricsLogger(enabled=True, dp_rank=2)
logger = SchedulerMetricsLogger(enabled=True, dp_rank=2, splitwise_role="prefill")
logger._logger = mock.Mock()

reqs = [
Expand All @@ -46,6 +46,7 @@ def test_log_prefill_batch_logs_expected_message():
message = logger._logger.info.call_args[0][0]
assert "Prefill batch" in message
assert "dp_rank: 2" in message
assert "splitwise_role: prefill" in message
assert "#new-seq: 2" in message
assert "#new-token: 4" in message
assert "#cached-token: 3" in message
Expand All @@ -54,8 +55,31 @@ def test_log_prefill_batch_logs_expected_message():
assert "#queue-req: 6" in message


def test_log_decode_bootstrap_batch_logs_expected_message():
    """``log_decode_bootstrap_batch`` emits exactly one info line with the bootstrap label."""
    logger = SchedulerMetricsLogger(enabled=True, dp_rank=0, splitwise_role="decode")
    logger._logger = mock.Mock()

    # prefill_start_index=4 / prefill_end_index=5 with 4 cached tokens: the
    # assertions below expect this to be reported as 1 new token, 4 cached.
    reqs = [types.SimpleNamespace(prefill_start_index=4, prefill_end_index=5, num_cached_tokens=4)]

    logger.log_decode_bootstrap_batch(
        prefill_reqs=reqs,
        running_cnt=1,
        queue_cnt=0,
        tokens_used=5,
        token_usage=0.25,
    )

    logger._logger.info.assert_called_once()
    message = logger._logger.info.call_args[0][0]
    # Check the full label so a regression to a shorter or different name is caught.
    assert "Decode bootstrap batch from prefill" in message
    # Mirrors the dp_rank check in the prefill-batch test.
    assert "dp_rank: 0" in message
    assert "splitwise_role: decode" in message
    assert "#new-seq: 1" in message
    assert "#new-token: 1" in message
    assert "#cached-token: 4" in message


def test_log_decode_batch_computes_throughput(monkeypatch):
logger = SchedulerMetricsLogger(enabled=True, dp_rank=1)
logger = SchedulerMetricsLogger(enabled=True, dp_rank=1, splitwise_role="decode")
logger._logger = mock.Mock()
logger._decode_batch_count = logger._decode_log_interval - 1
logger._decode_tokens_since_last = 10
Expand All @@ -69,6 +93,7 @@ def test_log_decode_batch_computes_throughput(monkeypatch):
message = logger._logger.info.call_args[0][0]
assert "Decode batch" in message
assert "dp_rank: 1" in message
assert "splitwise_role: decode" in message
assert "gen throughput (token/s): 5.00" in message
assert "#queue-req: 7" in message
assert logger._decode_tokens_since_last == 0
Expand Down Expand Up @@ -99,3 +124,8 @@ def test_decode_log_interval_non_positive_falls_back_to_default(monkeypatch):
monkeypatch.setenv("FD_CONSOLE_DECODE_LOG_INTERVAL", "0")
logger = SchedulerMetricsLogger(enabled=True, dp_rank=0)
assert logger._decode_log_interval == SchedulerMetricsLogger.DEFAULT_DECODE_LOG_INTERVAL


def test_default_splitwise_role_is_mixed():
    """Omitting ``splitwise_role`` leaves the logger on the ``"mixed"`` default."""
    logger = SchedulerMetricsLogger(enabled=True, dp_rank=0)
    assert logger.splitwise_role == "mixed"
23 changes: 23 additions & 0 deletions tests/v1/test_resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
if not hasattr(paddle, "enable_compat"):
paddle.enable_compat = lambda scope=None: None

from fastdeploy import envs
from fastdeploy.config import CacheConfig, FDConfig, ParallelConfig, SchedulerConfig
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.request import (
Expand All @@ -36,6 +37,7 @@
RequestMetrics,
RequestOutput,
RequestStatus,
RequestType,
)
from fastdeploy.engine.sched.resource_manager_v1 import (
ResourceManagerV1,
Expand Down Expand Up @@ -569,6 +571,27 @@
self.assertEqual(request_d.num_computed_tokens, request_d.need_prefill_tokens)
self.assertEqual(request_d.disaggregate_info["block_tables"], [4, 5])

def test_decode_role_prefill_task_logs_decode_bootstrap_batch(self):
    """A decode-role manager logs PREFILL-typed tasks via ``log_decode_bootstrap_batch``.

    With ``splitwise_role == "decode"`` the console metrics path must call
    ``log_decode_bootstrap_batch`` and must not call ``log_prefill_batch``.
    """
    manager = _build_manager(splitwise_role="decode", enable_prefix_caching=False)
    _register_manager_cleanup(self, manager)
    manager.cache_manager = MagicMock()
    manager.cache_manager.num_gpu_blocks = 8
    manager.cache_manager.gpu_free_block_list = [0, 1, 2, 3]
    manager.scheduler_metrics_logger = MagicMock()

    request = _make_request(prompt_token_ids=[1, 2, 3, 4])
    request.task_type = RequestType.PREFILL
    request.prefill_start_index = 4
    request.prefill_end_index = 5
    # `_log_console_scheduler_metrics` is annotated to take a list and only
    # iterates it, so a plain list is enough. The previous `BatchRequest()`
    # was never imported in this module (Ruff F821) and would raise NameError.
    scheduled_reqs = [request]

    with patch.object(envs, "FD_CONSOLE_SCHEDULER_METRICS", True):
        manager._log_console_scheduler_metrics(scheduled_reqs)

    manager.scheduler_metrics_logger.log_decode_bootstrap_batch.assert_called_once()
    manager.scheduler_metrics_logger.log_prefill_batch.assert_not_called()

def test_prefilled_request_flow_and_resource_check(self):
manager = _build_manager(splitwise_role="decode", speculative_method="mtp")
_register_manager_cleanup(self, manager)
Expand Down
Loading