Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/en/advance/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ lmdeploy serve api_server Qwen/Qwen2.5-7B-Instruct --enable-metrics
Replace the model path according to your needs.
By default, the metrics endpoint will be available at `http://<lmdeploy_server_host>:23333/metrics`.

For VLM serving, multimodal preprocessing metrics are also exported whenever the metrics system is enabled (`--enable-metrics`).

The multimodal metrics include:

- `lmdeploy:multimodal_requests_total`: counter of multimodal requests
- `lmdeploy:multimodal_items_total`: counter of multimodal input items, labeled by modality
- `lmdeploy:multimodal_preprocess_time_seconds`: histogram of the total multimodal preprocessing time per request, in seconds
- `lmdeploy:multimodal_stage_time_seconds`: histogram of preprocessing time per stage in seconds, labeled by stage and modality
- `lmdeploy:multimodal_item_count`: histogram of the number of items per request, labeled by modality
- `lmdeploy:multimodal_processing_failures_total`: counter of preprocessing failures, labeled by stage and modality

2. **Navigate to the monitoring directory**

```
Expand Down
11 changes: 11 additions & 0 deletions docs/zh_cn/advance/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ lmdeploy serve api_server Qwen/Qwen2.5-7B-Instruct --enable-metrics

请根据需求替换模型路径。默认 metrics endpoint 位于 `http://<lmdeploy_server_host>:23333/metrics`

启用指标系统(`--enable-metrics`)后,VLM 服务还会导出多模态预处理指标。

多模态指标包括:

- `lmdeploy:multimodal_requests_total`:多模态请求数量(Counter)
- `lmdeploy:multimodal_items_total`:按模态统计的多模态输入数量(Counter)
- `lmdeploy:multimodal_preprocess_time_seconds`:单个请求的多模态预处理总耗时,单位为秒(Histogram)
- `lmdeploy:multimodal_stage_time_seconds`:按阶段和模态统计的预处理耗时,单位为秒(Histogram)
- `lmdeploy:multimodal_item_count`:按模态统计的单请求输入数量(Histogram)
- `lmdeploy:multimodal_processing_failures_total`:按阶段和模态统计的处理失败次数(Counter)

2. **进入监控目录**

```
Expand Down
84 changes: 81 additions & 3 deletions lmdeploy/metrics/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@

import numpy as np

from lmdeploy.metrics.stats import IterationStats, RequestStats, SchedulerStats, SpeculativeDecodingStats
from lmdeploy.metrics.stats import (
IterationStats,
MultimodalStats,
RequestStats,
SchedulerStats,
SpeculativeDecodingStats,
)
from lmdeploy.utils import get_logger

logger = get_logger('lmdeploy')
Expand All @@ -27,6 +33,9 @@ def record_iteration(self, stats: IterationStats) -> None:
def record_specdecode(self, stats: SpeculativeDecodingStats) -> None:
    """Record speculative decoding stats.

    This implementation has an empty body and returns ``None``.
    """

def record_multimodal(self, stats: MultimodalStats) -> None:
    """Record multimodal preprocessing stats.

    This implementation has an empty body and returns ``None``.
    """

def log(self):  # noqa
    """Emit collected stats; this implementation does nothing."""

Expand All @@ -42,6 +51,9 @@ def _reset(self, now):
self.last_log_time = now
self.total_prompt_tokens = 0
self.total_generation_tokens = 0
self.total_multimodal_requests = 0
self.total_multimodal_items = 0
self.total_multimodal_preprocess_time = 0.0
# spec decode
self.num_drafts: int = 0
self.num_draft_tokens: int = 0
Expand Down Expand Up @@ -72,6 +84,15 @@ def record_specdecode(self, stats: SpeculativeDecodingStats):
def record_finish(self, stats: RequestStats):
    """No-op: nothing is recorded from finished-request stats here."""
    return None

def record_multimodal(self, stats: MultimodalStats) -> None:
    """Fold one request's multimodal preprocessing stats into the running totals.

    Stats objects that report no data are ignored. Otherwise the request
    counter grows by one, the item counter by the sum over all modalities,
    and the preprocessing-time accumulator by the request's total time.

    Args:
        stats (MultimodalStats): Stats collected for a single request.
    """
    if stats.has_data:
        # Only the total time and the per-modality item counts are used here;
        # stage timings and failures are ignored by this logger.
        elapsed, _stage_times, per_modality_items, _failures = stats.snapshot()
        self.total_multimodal_requests += 1
        self.total_multimodal_items += sum(per_modality_items.values())
        self.total_multimodal_preprocess_time += elapsed

def get_spec_msg(self):
"""Get spec decoding logging msg."""
if self.num_drafts == 0:
Expand All @@ -98,7 +119,7 @@ def log(self):
now = time.perf_counter()

# skip logging if no tokens were processed
if self.total_prompt_tokens == 0 and self.total_generation_tokens == 0:
if (self.total_prompt_tokens == 0 and self.total_generation_tokens == 0):
self._reset(now)
return

Expand All @@ -109,7 +130,8 @@ def log(self):
spec_msg = self.get_spec_msg()

# format and print
log_msg = (f"[{datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')} DP{self.dp_rank}] "
log_msg = (f"[{datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')} "
f'Engine {self.dp_rank:03d}] '
f'Avg thr (in/out): {prompt_throughput:.1f} / {generation_throughput:.1f} tokens/s, '
f'Server (succeeded/failed/routed/waiting): '
f'{scheduler_stats.num_succeeded_reqs} / {scheduler_stats.num_failed_reqs} / '
Expand All @@ -121,6 +143,10 @@ def log(self):
if scheduler_stats.prefix_cache_hit_rate != 0:
log_msg += f'Prefix cache hit rate: {scheduler_stats.prefix_cache_hit_rate * 100 :.1f}%, '

if self.total_multimodal_requests:
avg_mm_time = self.total_multimodal_preprocess_time / self.total_multimodal_requests
log_msg += f'Avg MM preprocess: {avg_mm_time:.3f} s/req, '

if spec_msg is not None:
log_msg += spec_msg

Expand Down Expand Up @@ -151,6 +177,7 @@ def __init__(self, model_name: str, max_model_len: int, dp_rank: int = 0):

labelnames = ['model_name', 'engine']
labelvalues = [model_name, str(dp_rank)]
self.labelvalues = labelvalues

#
# Scheduler stats
Expand Down Expand Up @@ -350,6 +377,37 @@ def __init__(self, model_name: str, max_model_len: int, dp_rank: int = 0):
buckets=request_latency_buckets,
labelnames=labelnames).labels(*labelvalues)

#
# Multimodal preprocessing
#
self.counter_multimodal_requests = prometheus_client.Counter(
name='lmdeploy:multimodal_requests_total',
documentation='Count of multimodal requests.',
labelnames=labelnames).labels(*labelvalues)
self.counter_multimodal_items = prometheus_client.Counter(
name='lmdeploy:multimodal_items_total',
documentation='Count of multimodal input items by modality.',
labelnames=labelnames + ['modality'])
self.histogram_multimodal_preprocess_time = prometheus_client.Histogram(
name='lmdeploy:multimodal_preprocess_time_seconds',
documentation='Histogram of multimodal preprocessing time in seconds.',
buckets=request_latency_buckets,
labelnames=labelnames).labels(*labelvalues)
self.histogram_multimodal_stage_time = prometheus_client.Histogram(
name='lmdeploy:multimodal_stage_time_seconds',
documentation='Histogram of multimodal preprocessing stage time in seconds.',
buckets=request_latency_buckets,
labelnames=labelnames + ['stage', 'modality'])
self.histogram_multimodal_item_count = prometheus_client.Histogram(
name='lmdeploy:multimodal_item_count',
documentation='Histogram of multimodal item counts per request.',
buckets=[1, 2, 4, 8, 16, 32, 64, 128, 256],
labelnames=labelnames + ['modality'])
self.counter_multimodal_failures = prometheus_client.Counter(
name='lmdeploy:multimodal_processing_failures_total',
documentation='Count of multimodal preprocessing failures.',
labelnames=labelnames + ['stage', 'modality'])

def record_schedule(self, stats: SchedulerStats) -> None:
"""Report schedule metrics to prometheus."""
self.gauge_scheduler_succeeded.set(stats.num_succeeded_reqs)
Expand Down Expand Up @@ -391,6 +449,26 @@ def _get_counter_value(counter) -> float:
"""Get the current value from a prometheus counter child."""
return counter._value.get()

def record_multimodal(self, stats: MultimodalStats) -> None:
    """Report one request's multimodal preprocessing stats to prometheus.

    Stats objects that report no data are ignored.

    Args:
        stats (MultimodalStats): Stats collected for a single request.
    """
    if not stats.has_data:
        return

    elapsed, seconds_by_stage, items_by_modality, failures_by_stage = stats.snapshot()
    base_labels = self.labelvalues

    # Request-level metrics; these children are already bound to the base labels.
    self.counter_multimodal_requests.inc()
    if elapsed > 0:
        self.histogram_multimodal_preprocess_time.observe(elapsed)

    # Per-modality running item totals.
    for modality, num_items in items_by_modality.items():
        item_counter = self.counter_multimodal_items.labels(*base_labels, modality)
        item_counter.inc(num_items)

    # Per-(stage, modality) timings.
    for (stage, modality), seconds in seconds_by_stage.items():
        stage_histogram = self.histogram_multimodal_stage_time.labels(*base_labels, stage, modality)
        stage_histogram.observe(seconds)

    # Per-modality distribution of item counts per request.
    for modality, num_items in items_by_modality.items():
        count_histogram = self.histogram_multimodal_item_count.labels(*base_labels, modality)
        count_histogram.observe(num_items)

    # Per-(stage, modality) failure totals.
    for (stage, modality), num_failures in failures_by_stage.items():
        failure_counter = self.counter_multimodal_failures.labels(*base_labels, stage, modality)
        failure_counter.inc(num_failures)

def record_specdecode(self, stats: SpeculativeDecodingStats) -> None:
"""Report speculative decoding metrics to prometheus."""
if stats.num_drafts <= 0:
Expand Down
13 changes: 12 additions & 1 deletion lmdeploy/metrics/metrics_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from lmdeploy.pytorch.utils import singleton
from lmdeploy.utils import get_logger

from .stats import SchedulerStats
from .stats import MultimodalStats, SchedulerStats

logger = get_logger('lmdeploy')

Expand Down Expand Up @@ -93,6 +93,17 @@ def queue_update(self, update_data: tuple):
return
self.metrics_queue.put_nowait(update_data)

def record_multimodal(self, stats: MultimodalStats | None):
    """Forward multimodal preprocessing stats to every stat logger.

    Nothing happens when metrics are disabled or ``stats`` is ``None``.
    Otherwise the stats are finalized and, only if this is the first
    successful emission for this stats object, passed to each logger.

    Args:
        stats (MultimodalStats | None): Stats collected for a single request.
    """
    metrics_on = self.enable_metrics
    if not metrics_on or stats is None:
        return

    stats.finish()
    # mark_emitted() returns False for repeated calls and for empty stats,
    # so each stats object reaches the loggers at most once.
    first_emission = stats.mark_emitted()
    if first_emission:
        for sink in self.stat_loggers:
            sink.record_multimodal(stats)

def increase_total_requests(self):
"""Increase total requests."""
self.scheduler_stats.num_total_reqs += 1
Expand Down
93 changes: 93 additions & 0 deletions lmdeploy/metrics/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
# adapted from https://github.com/vllm-project/vllm/blob/main/vllm/v1/metrics/stats.py

import time
from collections import defaultdict
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from threading import Lock

import numpy as np

Expand Down Expand Up @@ -94,6 +98,95 @@ def update_from_schedule_metrics(self, scheduled_metrics: ScheduleMetrics):
self.prefix_cache_hit_rate = scheduled_metrics.prefix_cache_hit_rate


class MultimodalStats:
    """Stats associated with multimodal prompt preprocessing.

    One instance collects, for a single request, the elapsed time per
    ``(stage, modality)``, the number of input items per modality and the
    number of failures per ``(stage, modality)``. All mutations and reads of
    the collected data are guarded by an internal lock.
    """

    def __init__(self, enabled: bool = True):
        """Initialize multimodal preprocessing stats.

        Args:
            enabled (bool): Whether to record multimodal metrics. When False,
                every recording method returns without storing anything.
        """
        self.enabled = enabled
        self.start_time = time.perf_counter()
        self.total_time: float = 0.0
        self.stage_times: dict[tuple[str, str], float] = defaultdict(float)
        self.item_counts: dict[str, int] = defaultdict(int)
        self.failures: dict[tuple[str, str], int] = defaultdict(int)
        self.finished = False
        # Avoid duplicate emission if success/error paths both try to record.
        self._emitted = False
        # Multimodal item parsing can update this object from executor threads.
        self._lock = Lock()

    def _has_data_locked(self) -> bool:
        """Return whether anything was recorded.

        The caller must already hold ``self._lock``; the lock is not
        reentrant, so this helper must not acquire it again.
        """
        return bool(self.item_counts or self.stage_times or self.failures)

    def add_stage(self, stage: str, seconds: float, modality: str = 'all') -> None:
        """Add elapsed time for a multimodal preprocessing stage.

        Args:
            stage (str): Name of the preprocessing stage.
            seconds (float): Elapsed time. Negative and NaN values are
                recorded as ``0.0``.
            modality (str): Modality the time is attributed to.
        """
        if not self.enabled:
            return
        seconds = float(seconds)
        # `not (x > 0)` is also true for NaN, which `max(x, 0.0)` would let
        # through and which would turn the accumulated sum into NaN.
        if not seconds > 0.0:
            seconds = 0.0
        with self._lock:
            self.stage_times[(stage, modality)] += seconds

    def add_item(self, modality: str, count: int = 1) -> None:
        """Record multimodal input item count by modality.

        Args:
            modality (str): Modality of the input items.
            count (int): Number of items to add. Negative values are
                recorded as ``0`` so the totals never decrease.
        """
        if not self.enabled:
            return
        # Clamp like add_stage does: these totals are later used as counter
        # increments, which must be non-negative.
        count = max(count, 0)
        with self._lock:
            self.item_counts[modality] += count

    def record_failure(self, stage: str = 'total', modality: str = 'all') -> None:
        """Record a multimodal preprocessing failure.

        Args:
            stage (str): Stage in which the failure happened.
            modality (str): Modality the failure is attributed to.
        """
        if not self.enabled:
            return
        with self._lock:
            self.failures[(stage, modality)] += 1

    @contextmanager
    def span(self, stage: str, modality: str = 'all') -> Iterator[None]:
        """Measure a multimodal preprocessing stage.

        The elapsed time of the ``with`` body is added via :meth:`add_stage`,
        also when the body raises.

        Args:
            stage (str): Name of the preprocessing stage.
            modality (str): Modality the time is attributed to.
        """
        if not self.enabled:
            yield
            return

        start = time.perf_counter()
        try:
            yield
        finally:
            self.add_stage(stage, time.perf_counter() - start, modality)

    def finish(self) -> None:
        """Mark total multimodal preprocessing time.

        Only the first call sets ``total_time`` (time since construction);
        later calls leave it unchanged.
        """
        if not self.enabled:
            return
        with self._lock:
            if not self.finished:
                self.total_time = time.perf_counter() - self.start_time
                self.finished = True

    @property
    def has_data(self) -> bool:
        """Whether this object contains any multimodal metric data."""
        with self._lock:
            return self._has_data_locked()

    def mark_emitted(self) -> bool:
        """Mark this stats object as emitted.

        Returns:
            bool: True if this call marked it for the first time. False if it
                was already marked or if no data has been recorded.
        """
        with self._lock:
            if self._emitted or not self._has_data_locked():
                return False
            self._emitted = True
            return True

    def snapshot(self) -> tuple[float, dict[tuple[str, str], float], dict[str, int], dict[tuple[str, str], int]]:
        """Return a thread-safe copy of the collected stats.

        Returns:
            tuple: ``(total_time, stage_times, item_counts, failures)``, where
                the three mappings are plain ``dict`` copies.
        """
        with self._lock:
            return self.total_time, dict(self.stage_times), dict(self.item_counts), dict(self.failures)


class RequestStats:
"""Stats associated with a request."""

Expand Down
10 changes: 8 additions & 2 deletions lmdeploy/serve/core/async_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
TurbomindEngineConfig,
)
from lmdeploy.metrics.metrics_processor import metrics_processor
from lmdeploy.metrics.stats import IterationStats, RequestStats, SpeculativeDecodingStats
from lmdeploy.metrics.stats import IterationStats, MultimodalStats, RequestStats, SpeculativeDecodingStats
from lmdeploy.model import ChatTemplateConfig, get_chat_template
from lmdeploy.pytorch.disagg.conn.protocol import (
DistServeConnectionRequest,
Expand Down Expand Up @@ -218,7 +218,9 @@ def _build_stat_loggers(self):
logger.info(f'enable metrics, with dp: {self.backend_config.dp} dp_rank: {dp_rank}')
self.stat_loggers = [
LoggingStatLogger(dp_rank=dp_rank),
PrometheusStatLogger(model_name=self.model_name, max_model_len=self.session_len, dp_rank=dp_rank)
PrometheusStatLogger(model_name=self.model_name,
max_model_len=self.session_len,
dp_rank=dp_rank)
]

# set stats loggers of metrics processor
Expand Down Expand Up @@ -510,6 +512,7 @@ async def generate(
logger.warning('chat_template_kwargs["enable_thinking"] is already set, '
'the value will not be overwritten by enable_thinking')
if messages:
mm_stats = MultimodalStats(enabled=metrics_processor.enable_metrics)
try:
prompt = messages
self.request_logger.log_prompt(session, prompt=prompt)
Expand All @@ -522,7 +525,9 @@ async def generate(
chat_template_kwargs=chat_template_kwargs,
media_io_kwargs=media_io_kwargs,
mm_processor_kwargs=mm_processor_kwargs,
mm_stats=mm_stats,
**kwargs)
metrics_processor.record_multimodal(mm_stats)
prompt = prompt_input.get('prompt')
input_ids = prompt_input.get('input_ids')
self.request_logger.log_inputs(session,
Expand All @@ -532,6 +537,7 @@ async def generate(
adapter_name=adapter_name)
except Exception:
logger.exception('[generate] error in prompt processing')
metrics_processor.record_multimodal(mm_stats)
metrics_processor.increase_failed_requests('error')
yield GenOut(response='in prompt processing error',
history_token_len=session.step,
Expand Down
Loading
Loading