diff --git a/docs/en/advance/metrics.md b/docs/en/advance/metrics.md index 1f05807d9a..1aef50bbc1 100644 --- a/docs/en/advance/metrics.md +++ b/docs/en/advance/metrics.md @@ -23,6 +23,17 @@ lmdeploy serve api_server Qwen/Qwen2.5-7B-Instruct --enable-metrics Replace the model path according to your needs. By default, the metrics endpoint will be available at `http://:23333/metrics`. +For VLM serving, multimodal preprocessing metrics are exported when the metrics system is enabled: + +The multimodal metrics include: + +- `lmdeploy:multimodal_requests_total`: multimodal request count +- `lmdeploy:multimodal_items_total`: multimodal item count by modality +- `lmdeploy:multimodal_preprocess_time_seconds`: total multimodal preprocessing time +- `lmdeploy:multimodal_stage_time_seconds`: stage time by stage and modality +- `lmdeploy:multimodal_item_count`: item count per request by modality +- `lmdeploy:multimodal_processing_failures_total`: processing failures by stage and modality + 2. **Navigate to the monitoring directory** ``` diff --git a/docs/zh_cn/advance/metrics.md b/docs/zh_cn/advance/metrics.md index 7eb4eec5cc..64cf2cedf7 100644 --- a/docs/zh_cn/advance/metrics.md +++ b/docs/zh_cn/advance/metrics.md @@ -22,6 +22,17 @@ lmdeploy serve api_server Qwen/Qwen2.5-7B-Instruct --enable-metrics 请根据需求替换模型路径。默认 metrics endpoint 位于 `http://:23333/metrics`。 +VLM 服务在启用指标系统后会导出多模态预处理指标: + +多模态指标包括: + +- `lmdeploy:multimodal_requests_total`:多模态请求数量 +- `lmdeploy:multimodal_items_total`:按模态统计的多模态输入数量 +- `lmdeploy:multimodal_preprocess_time_seconds`:多模态预处理总耗时 +- `lmdeploy:multimodal_stage_time_seconds`:按阶段和模态统计的耗时 +- `lmdeploy:multimodal_item_count`:按模态统计的单请求输入数量 +- `lmdeploy:multimodal_processing_failures_total`:按阶段和模态统计的处理失败次数 + 2. **进入监控目录** ``` diff --git a/lmdeploy/metrics/loggers.py b/lmdeploy/metrics/loggers.py index 69e24ede4f..1661439434 100644 --- a/lmdeploy/metrics/loggers.py +++ b/lmdeploy/metrics/loggers.py @@ -7,7 +7,13 @@ import numpy as np -from lmdeploy.metrics.stats import IterationStats, RequestStats, SchedulerStats, SpeculativeDecodingStats +from lmdeploy.metrics.stats import ( + IterationStats, + MultimodalStats, + RequestStats, + SchedulerStats, + SpeculativeDecodingStats, +) from lmdeploy.utils import get_logger logger = get_logger('lmdeploy') @@ -27,6 +33,9 @@ def record_iteration(self, stats: IterationStats) -> None: def record_specdecode(self, stats: SpeculativeDecodingStats) -> None: ... + def record_multimodal(self, stats: MultimodalStats) -> None: + ... + def log(self): # noqa pass @@ -42,6 +51,9 @@ def _reset(self, now): self.last_log_time = now self.total_prompt_tokens = 0 self.total_generation_tokens = 0 + self.total_multimodal_requests = 0 + self.total_multimodal_items = 0 + self.total_multimodal_preprocess_time = 0.0 # spec decode self.num_drafts: int = 0 self.num_draft_tokens: int = 0 @@ -72,6 +84,15 @@ def record_specdecode(self, stats: SpeculativeDecodingStats): def record_finish(self, stats: RequestStats): pass + def record_multimodal(self, stats: MultimodalStats) -> None: + if not stats.has_data: + return + + total_time, _, item_counts, _ = stats.snapshot() + self.total_multimodal_requests += 1 + self.total_multimodal_items += sum(item_counts.values()) + self.total_multimodal_preprocess_time += total_time + def get_spec_msg(self): """Get spec decoding logging msg.""" if self.num_drafts == 0: @@ -98,7 +119,7 @@ def log(self): now = time.perf_counter() # skip logging if no tokens were processed - if self.total_prompt_tokens == 0 and self.total_generation_tokens == 0: + if (self.total_prompt_tokens == 0 and self.total_generation_tokens == 0): self._reset(now) return @@ -109,7 +130,8 @@ def log(self): spec_msg = self.get_spec_msg() # format and print - log_msg = (f"[{datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')} DP{self.dp_rank}] " + log_msg = (f"[{datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')} " + f'Engine {self.dp_rank:03d}] ' f'Avg thr (in/out): {prompt_throughput:.1f} / {generation_throughput:.1f} tokens/s, ' f'Server (succeeded/failed/routed/waiting): ' f'{scheduler_stats.num_succeeded_reqs} / {scheduler_stats.num_failed_reqs} / ' @@ -121,6 +143,10 @@ def log(self): if scheduler_stats.prefix_cache_hit_rate != 0: log_msg += f'Prefix cache hit rate: {scheduler_stats.prefix_cache_hit_rate * 100 :.1f}%, ' + if self.total_multimodal_requests: + avg_mm_time = self.total_multimodal_preprocess_time / self.total_multimodal_requests + log_msg += f'Avg MM preprocess: {avg_mm_time:.3f} s/req, ' + if spec_msg is not None: log_msg += spec_msg @@ -151,6 +177,7 @@ def __init__(self, model_name: str, max_model_len: int, dp_rank: int = 0): labelnames = ['model_name', 'engine'] labelvalues = [model_name, str(dp_rank)] + self.labelvalues = labelvalues # # Scheduler stats @@ -350,6 +377,37 @@ def __init__(self, model_name: str, max_model_len: int, dp_rank: int = 0): buckets=request_latency_buckets, labelnames=labelnames).labels(*labelvalues) + # + # Multimodal preprocessing + # + self.counter_multimodal_requests = prometheus_client.Counter( + name='lmdeploy:multimodal_requests_total', + documentation='Count of multimodal requests.', + labelnames=labelnames).labels(*labelvalues) + self.counter_multimodal_items = prometheus_client.Counter( + name='lmdeploy:multimodal_items_total', + documentation='Count of multimodal input items by modality.', + labelnames=labelnames + ['modality']) + self.histogram_multimodal_preprocess_time = prometheus_client.Histogram( + name='lmdeploy:multimodal_preprocess_time_seconds', + documentation='Histogram of multimodal preprocessing time in seconds.', + buckets=request_latency_buckets, + labelnames=labelnames).labels(*labelvalues) + self.histogram_multimodal_stage_time = prometheus_client.Histogram( + name='lmdeploy:multimodal_stage_time_seconds', + documentation='Histogram of multimodal preprocessing stage time in seconds.', + buckets=request_latency_buckets, + labelnames=labelnames + ['stage', 'modality']) + self.histogram_multimodal_item_count = prometheus_client.Histogram( + name='lmdeploy:multimodal_item_count', + documentation='Histogram of multimodal item counts per request.', + buckets=[1, 2, 4, 8, 16, 32, 64, 128, 256], + labelnames=labelnames + ['modality']) + self.counter_multimodal_failures = prometheus_client.Counter( + name='lmdeploy:multimodal_processing_failures_total', + documentation='Count of multimodal preprocessing failures.', + labelnames=labelnames + ['stage', 'modality']) + def record_schedule(self, stats: SchedulerStats) -> None: """Report schedule metrics to prometheus.""" self.gauge_scheduler_succeeded.set(stats.num_succeeded_reqs) @@ -391,6 +449,26 @@ def _get_counter_value(counter) -> float: """Get the current value from a prometheus counter child.""" return counter._value.get() + def record_multimodal(self, stats: MultimodalStats) -> None: + if not stats.has_data: + return + + total_time, stage_times, item_counts, failures = stats.snapshot() + labelvalues = self.labelvalues + self.counter_multimodal_requests.inc() + if total_time > 0: + self.histogram_multimodal_preprocess_time.observe(total_time) + + for modality, count in item_counts.items(): + self.counter_multimodal_items.labels(*(labelvalues + [modality])).inc(count) + + for (stage, modality), seconds in stage_times.items(): + self.histogram_multimodal_stage_time.labels(*(labelvalues + [stage, modality])).observe(seconds) + for modality, count in item_counts.items(): + self.histogram_multimodal_item_count.labels(*(labelvalues + [modality])).observe(count) + for (stage, modality), count in failures.items(): + self.counter_multimodal_failures.labels(*(labelvalues + [stage, modality])).inc(count) + def record_specdecode(self, stats: SpeculativeDecodingStats) -> None: """Report speculative decoding metrics to prometheus.""" if stats.num_drafts <= 0: diff --git a/lmdeploy/metrics/metrics_processor.py b/lmdeploy/metrics/metrics_processor.py index 81db4fa57d..6bac129382 100644 --- a/lmdeploy/metrics/metrics_processor.py +++ b/lmdeploy/metrics/metrics_processor.py @@ -5,7 +5,7 @@ from lmdeploy.pytorch.utils import singleton from lmdeploy.utils import get_logger -from .stats import SchedulerStats +from .stats import MultimodalStats, SchedulerStats logger = get_logger('lmdeploy') @@ -93,6 +93,17 @@ def queue_update(self, update_data: tuple): return self.metrics_queue.put_nowait(update_data) + def record_multimodal(self, stats: MultimodalStats | None): + """Record multimodal preprocessing stats.""" + if not self.enable_metrics or stats is None: + return + stats.finish() + if not stats.mark_emitted(): + return + + for stat_logger in self.stat_loggers: + stat_logger.record_multimodal(stats) + def increase_total_requests(self): """Increase total requests.""" self.scheduler_stats.num_total_reqs += 1 diff --git a/lmdeploy/metrics/stats.py b/lmdeploy/metrics/stats.py index 7aa8e9efa5..80a6480bcf 100644 --- a/lmdeploy/metrics/stats.py +++ b/lmdeploy/metrics/stats.py @@ -2,7 +2,11 @@ # adapted from https://github.com/vllm-project/vllm/blob/main/vllm/v1/metrics/stats.py import time +from collections import defaultdict +from collections.abc import Iterator +from contextlib import contextmanager from dataclasses import dataclass +from threading import Lock import numpy as np @@ -94,6 +98,95 @@ def update_from_schedule_metrics(self, scheduled_metrics: ScheduleMetrics): self.prefix_cache_hit_rate = scheduled_metrics.prefix_cache_hit_rate +class MultimodalStats: + """Stats associated with multimodal prompt preprocessing.""" + + def __init__(self, enabled: bool = True): + """Initialize multimodal preprocessing stats. + + Args: + enabled (bool): Whether to record multimodal metrics. + """ + self.enabled = enabled + self.start_time = time.perf_counter() + self.total_time: float = 0.0 + self.stage_times: dict[tuple[str, str], float] = defaultdict(float) + self.item_counts: dict[str, int] = defaultdict(int) + self.failures: dict[tuple[str, str], int] = defaultdict(int) + self.finished = False + # Avoid duplicate emission if success/error paths both try to record. + self._emitted = False + # Multimodal item parsing can update this object from executor threads. + self._lock = Lock() + + def add_stage(self, stage: str, seconds: float, modality='all') -> None: + """Add elapsed time for a multimodal preprocessing stage.""" + if not self.enabled: + return + seconds = max(float(seconds), 0.0) + with self._lock: + self.stage_times[(stage, modality)] += seconds + + def add_item(self, modality, count: int = 1) -> None: + """Record multimodal input item count by modality.""" + if not self.enabled: + return + with self._lock: + self.item_counts[modality] += count + + def record_failure(self, stage: str = 'total', modality='all') -> None: + """Record a multimodal preprocessing failure.""" + if not self.enabled: + return + with self._lock: + self.failures[(stage, modality)] += 1 + + @contextmanager + def span(self, stage: str, modality='all') -> Iterator[None]: + """Measure a multimodal preprocessing stage.""" + if not self.enabled: + yield + return + + start = time.perf_counter() + try: + yield + finally: + self.add_stage(stage, time.perf_counter() - start, modality) + + def finish(self) -> None: + """Mark total multimodal preprocessing time.""" + if not self.enabled: + return + with self._lock: + if not self.finished: + self.total_time = time.perf_counter() - self.start_time + self.finished = True + + @property + def has_data(self) -> bool: + """Whether this object contains any multimodal metric data.""" + with self._lock: + return bool(self.item_counts or self.stage_times or self.failures) + + def mark_emitted(self) -> bool: + """Mark this stats object as emitted. + + Returns: + bool: True if this call marked it for the first time. + """ + with self._lock: + if self._emitted or not (self.item_counts or self.stage_times or self.failures): + return False + self._emitted = True + return True + + def snapshot(self) -> tuple[float, dict[tuple[str, str], float], dict[str, int], dict[tuple[str, str], int]]: + """Return a thread-safe copy of the collected stats.""" + with self._lock: + return self.total_time, dict(self.stage_times), dict(self.item_counts), dict(self.failures) + + class RequestStats: """Stats associated with a request.""" diff --git a/lmdeploy/serve/core/async_engine.py b/lmdeploy/serve/core/async_engine.py index fd7c0807fd..490e12c094 100644 --- a/lmdeploy/serve/core/async_engine.py +++ b/lmdeploy/serve/core/async_engine.py @@ -23,7 +23,7 @@ TurbomindEngineConfig, ) from lmdeploy.metrics.metrics_processor import metrics_processor -from lmdeploy.metrics.stats import IterationStats, RequestStats, SpeculativeDecodingStats +from lmdeploy.metrics.stats import IterationStats, MultimodalStats, RequestStats, SpeculativeDecodingStats from lmdeploy.model import ChatTemplateConfig, get_chat_template from lmdeploy.pytorch.disagg.conn.protocol import ( DistServeConnectionRequest, @@ -218,7 +218,9 @@ def _build_stat_loggers(self): logger.info(f'enable metrics, with dp: {self.backend_config.dp} dp_rank: {dp_rank}') self.stat_loggers = [ LoggingStatLogger(dp_rank=dp_rank), - PrometheusStatLogger(model_name=self.model_name, max_model_len=self.session_len, dp_rank=dp_rank) + PrometheusStatLogger(model_name=self.model_name, + max_model_len=self.session_len, + dp_rank=dp_rank) ] # set stats loggers of metrics processor @@ -534,6 +536,7 @@ def remove_session_once(): logger.warning('chat_template_kwargs["enable_thinking"] is already set, ' 'the value will not be overwritten by enable_thinking') if messages: + mm_stats = MultimodalStats(enabled=metrics_processor.enable_metrics) try: prompt = messages self.request_logger.log_prompt(session, prompt=prompt) @@ -546,7 +549,9 @@ def remove_session_once(): chat_template_kwargs=chat_template_kwargs, media_io_kwargs=media_io_kwargs, mm_processor_kwargs=mm_processor_kwargs, + mm_stats=mm_stats, **kwargs) + metrics_processor.record_multimodal(mm_stats) prompt = prompt_input.get('prompt') input_ids = prompt_input.get('input_ids') self.request_logger.log_inputs(session, @@ -560,6 +565,7 @@ def remove_session_once(): raise except Exception: logger.exception('[generate] error in prompt processing') + metrics_processor.record_multimodal(mm_stats) metrics_processor.increase_failed_requests('error') remove_session_once() yield GenOut(response='in prompt processing error', diff --git a/lmdeploy/serve/processors/multimodal.py b/lmdeploy/serve/processors/multimodal.py index cf2452935e..35a24ca3d0 100644 --- a/lmdeploy/serve/processors/multimodal.py +++ b/lmdeploy/serve/processors/multimodal.py @@ -1,9 +1,11 @@ # Copyright (c) OpenMMLab. All rights reserved. import asyncio +from contextlib import contextmanager from typing import Any, Literal import PIL +from lmdeploy.metrics.stats import MultimodalStats from lmdeploy.model import MODELS, BaseChatTemplate from lmdeploy.tokenizer import Tokenizer from lmdeploy.utils import get_logger @@ -17,6 +19,16 @@ logger = get_logger('lmdeploy') +@contextmanager +def _mm_stage(mm_stats: MultimodalStats, stage: str, modality='all'): + try: + with mm_stats.span(stage, modality): + yield + except Exception: + mm_stats.record_failure(stage, modality) + raise + + class MultimodalProcessor: """Processor for handling prompt preprocessing, message content merging, and multimodal processing.""" @@ -91,8 +103,11 @@ def merge_message_content(msg: dict) -> dict: return result @staticmethod - def _parse_multimodal_item(i: int, in_messages: list[dict], out_messages: list[dict], media_io_kwargs: dict[str, - Any]): + def _parse_multimodal_item(i: int, + in_messages: list[dict], + out_messages: list[dict], + media_io_kwargs: dict[str, Any], + mm_stats: MultimodalStats): """Synchronous helper to parse a single multimodal message item.""" role = in_messages[i]['role'] content = in_messages[i]['content'] @@ -125,37 +140,54 @@ def _parse_multimodal_item(i: int, in_messages: list[dict], out_messages: list[d item_params = {k: v for k, v in item.items() if k not in ('type', item_type)} data_src = item_val + modality = None + def _require_data_src(): if data_src is not None: return data_src + mm_stats.record_failure('parse', modality.value) raise ValueError(f'Invalid multimodal item at index {i}: {item}. ' f'Expected "{item_type}" to be a direct value or a dict containing "url" or "data".') if item_type == 'image_data': modality = Modality.IMAGE + mm_stats.add_item(modality.value) data = _require_data_src() elif item_type in ('image_url', 'image'): modality = Modality.IMAGE + mm_stats.add_item(modality.value) data_src = _require_data_src() if isinstance(data_src, PIL.Image.Image): data = data_src elif isinstance(data_src, str): - data = load_from_url(data_src, ImageMediaIO(**media_io_kwargs.get('image', {}))) + with _mm_stage(mm_stats, 'media_io', modality.value): + data = load_from_url(data_src, ImageMediaIO(**media_io_kwargs.get('image', {}))) else: + mm_stats.record_failure('parse', modality.value) raise ValueError(f'Invalid multimodal image item at index {i}: {item}. ' - 'Expected a str URL/path/data URL or PIL.Image.Image.') + 'Expected a str URL/path/data URL or PIL.Image.Image.') elif item_type in ('video_url', 'video'): modality = Modality.VIDEO - data, metadata = load_from_url( - _require_data_src(), VideoMediaIO(image_io=ImageMediaIO(), **media_io_kwargs.get('video', {}))) + mm_stats.add_item(modality.value) + data_src = _require_data_src() + with _mm_stage(mm_stats, 'media_io', modality.value): + data, metadata = load_from_url( + data_src, VideoMediaIO(image_io=ImageMediaIO(), **media_io_kwargs.get('video', {}))) item_params['video_metadata'] = metadata elif item_type in ('audio_url', 'audio'): modality = Modality.AUDIO - data = load_from_url(_require_data_src(), AudioMediaIO(**media_io_kwargs.get('audio', {}))) + mm_stats.add_item(modality.value) + data_src = _require_data_src() + with _mm_stage(mm_stats, 'media_io', modality.value): + data = load_from_url(data_src, AudioMediaIO(**media_io_kwargs.get('audio', {}))) elif item_type in ('time_series_url', 'time_series'): modality = Modality.TIME_SERIES - data = load_from_url(_require_data_src(), TimeSeriesMediaIO(**media_io_kwargs.get('time_series', {}))) + mm_stats.add_item(modality.value) + data_src = _require_data_src() + with _mm_stage(mm_stats, 'media_io', modality.value): + data = load_from_url(data_src, TimeSeriesMediaIO(**media_io_kwargs.get('time_series', {}))) else: + mm_stats.record_failure('media_io', 'unknown') raise NotImplementedError(f'unknown type: {item_type}') out_message['content'].append({'type': modality.value, 'data': data, **item_params}) @@ -164,7 +196,8 @@ def _require_data_src(): @staticmethod async def async_parse_multimodal_item(messages: list[dict], - media_io_kwargs: dict[str, Any] | None = None) -> list[dict]: + media_io_kwargs: dict[str, Any] | None = None, + mm_stats: MultimodalStats | None = None) -> list[dict]: """Convert user-input multimodal data into GPT4V message format.""" if isinstance(messages, dict): messages = [messages] @@ -172,11 +205,12 @@ async def async_parse_multimodal_item(messages: list[dict], out_messages = [None] * len(messages) media_io_kwargs = media_io_kwargs or {} + mm_stats = mm_stats or MultimodalStats(enabled=False) loop = asyncio.get_event_loop() await asyncio.gather(*[ loop.run_in_executor(None, MultimodalProcessor._parse_multimodal_item, i, messages, out_messages, - media_io_kwargs) for i in range(len(messages)) + media_io_kwargs, mm_stats) for i in range(len(messages)) ]) return out_messages @@ -190,6 +224,7 @@ async def get_prompt_input(self, chat_template_kwargs: dict | None = None, media_io_kwargs: dict[str, Any] | None = None, mm_processor_kwargs: dict[str, Any] | None = None, + mm_stats: MultimodalStats | None = None, **kwargs): """Process prompt and return prompt string and input_ids. @@ -248,6 +283,7 @@ async def get_prompt_input(self, chat_template_kwargs=chat_template_kwargs, media_io_kwargs=media_io_kwargs, mm_processor_kwargs=mm_processor_kwargs, + mm_stats=mm_stats, **kwargs) else: raise RuntimeError(f'unsupported prompt type: {type(prompt)}') @@ -382,47 +418,59 @@ async def _get_multimodal_prompt_input(self, chat_template_kwargs: dict | None = None, media_io_kwargs: dict[str, Any] | None = None, mm_processor_kwargs: dict[str, Any] | None = None, + mm_stats: MultimodalStats | None = None, **kwargs): """Process multimodal prompt and return processed data for inference engines.""" + mm_stats = mm_stats or MultimodalStats(enabled=False) chat_template = self.chat_template if do_preprocess else BaseChatTemplate() - messages = await self.async_parse_multimodal_item(messages, media_io_kwargs) + messages = await self.async_parse_multimodal_item(messages, media_io_kwargs, mm_stats=mm_stats) if self.backend == 'turbomind': if self.vl_encoder._uses_new_preprocess: - input_prompt = self.vl_encoder.model.get_input_prompt(messages=messages, - chat_template=chat_template, - sequence_start=sequence_start, - chat_template_kwargs=chat_template_kwargs) - results = await self.vl_encoder.preprocess(messages, - input_prompt=input_prompt, - mm_processor_kwargs=mm_processor_kwargs) + with _mm_stage(mm_stats, 'prompt_template'): + input_prompt = self.vl_encoder.model.get_input_prompt(messages=messages, + chat_template=chat_template, + sequence_start=sequence_start, + chat_template_kwargs=chat_template_kwargs) + with _mm_stage(mm_stats, 'hf_processor'): + results = await self.vl_encoder.preprocess(messages, + input_prompt=input_prompt, + mm_processor_kwargs=mm_processor_kwargs) else: - results = await self.vl_encoder.preprocess(messages, mm_processor_kwargs=mm_processor_kwargs) - results = await self.vl_encoder.async_infer(results) - results = await self.vl_encoder.wrap_for_turbomind(messages=results, - chat_template=chat_template, - tokenizer=self.tokenizer, - sequence_start=sequence_start, - tools=tools, - chat_template_kwargs=chat_template_kwargs) + with _mm_stage(mm_stats, 'hf_processor'): + results = await self.vl_encoder.preprocess(messages, + mm_processor_kwargs=mm_processor_kwargs) + with _mm_stage(mm_stats, 'vision_encoder'): + results = await self.vl_encoder.async_infer(results) + with _mm_stage(mm_stats, 'embedding_merge'): + results = await self.vl_encoder.wrap_for_turbomind(messages=results, + chat_template=chat_template, + tokenizer=self.tokenizer, + sequence_start=sequence_start, + tools=tools, + chat_template_kwargs=chat_template_kwargs) elif self.backend == 'pytorch': if self.vl_encoder._uses_new_preprocess: - input_prompt = self.vl_encoder.model.get_input_prompt(messages=messages, - chat_template=chat_template, - sequence_start=sequence_start, - chat_template_kwargs=chat_template_kwargs) - results = await self.vl_encoder.preprocess(messages, - input_prompt=input_prompt, - mm_processor_kwargs=mm_processor_kwargs) + with _mm_stage(mm_stats, 'prompt_template'): + input_prompt = self.vl_encoder.model.get_input_prompt(messages=messages, + chat_template=chat_template, + sequence_start=sequence_start, + chat_template_kwargs=chat_template_kwargs) + with _mm_stage(mm_stats, 'hf_processor'): + results = await self.vl_encoder.preprocess(messages, + input_prompt=input_prompt, + mm_processor_kwargs=mm_processor_kwargs) else: - results = await self.vl_encoder.preprocess(messages, - mm_processor_kwargs=mm_processor_kwargs) - results = await self.vl_encoder.wrap_for_pytorch(messages=results, - chat_template=chat_template, - tokenizer=self.tokenizer, - sequence_start=sequence_start, - tools=tools, - chat_template_kwargs=chat_template_kwargs) + with _mm_stage(mm_stats, 'hf_processor'): + results = await self.vl_encoder.preprocess(messages, + mm_processor_kwargs=mm_processor_kwargs) + with _mm_stage(mm_stats, 'embedding_merge'): + results = await self.vl_encoder.wrap_for_pytorch(messages=results, + chat_template=chat_template, + tokenizer=self.tokenizer, + sequence_start=sequence_start, + tools=tools, + chat_template_kwargs=chat_template_kwargs) return results diff --git a/tests/test_lmdeploy/test_metrics_multimodal.py b/tests/test_lmdeploy/test_metrics_multimodal.py new file mode 100644 index 0000000000..4d6e3d4620 --- /dev/null +++ b/tests/test_lmdeploy/test_metrics_multimodal.py @@ -0,0 +1,81 @@ +import sys + +import pytest + +from lmdeploy.metrics.loggers import PrometheusStatLogger +from lmdeploy.metrics.stats import MultimodalStats +from lmdeploy.serve.processors import MultimodalProcessor +from lmdeploy.vl.constants import Modality + +multimodal_module = sys.modules[MultimodalProcessor.__module__] + + +def test_multimodal_stats_snapshot_and_emit_guard(): + stats = MultimodalStats() + + stats.add_item(Modality.IMAGE.value, count=2) + stats.add_stage('media_io', 0.1, Modality.IMAGE.value) + stats.record_failure('media_io', Modality.IMAGE.value) + stats.finish() + + total_time, stage_times, item_counts, failures = stats.snapshot() + + assert total_time >= 0 + assert item_counts == {'image': 2} + assert stage_times == {('media_io', 'image'): 0.1} + assert failures == {('media_io', 'image'): 1} + assert stats.mark_emitted() + assert not stats.mark_emitted() + + +def test_prometheus_logger_records_multimodal_metrics(): + pytest.importorskip('prometheus_client') + from prometheus_client import REGISTRY, generate_latest + + logger = PrometheusStatLogger('test-model', 128, dp_rank=0) + stats = MultimodalStats() + stats.add_item(Modality.IMAGE.value, count=2) + stats.add_stage('media_io', 0.1, Modality.IMAGE.value) + stats.record_failure('media_io', Modality.IMAGE.value) + stats.finish() + + logger.record_multimodal(stats) + metrics_text = generate_latest(REGISTRY).decode('utf-8') + + assert 'lmdeploy:multimodal_requests_total' in metrics_text + assert 'lmdeploy:multimodal_items_total' in metrics_text + assert 'lmdeploy:multimodal_preprocess_time_seconds' in metrics_text + assert 'lmdeploy:multimodal_stage_time_seconds' in metrics_text + assert 'lmdeploy:multimodal_item_count' in metrics_text + assert 'lmdeploy:multimodal_processing_failures_total' in metrics_text + assert 'modality="image"' in metrics_text + assert 'stage="media_io"' in metrics_text + + +def test_parse_multimodal_item_records_multimodal_stats(monkeypatch): + load_calls = [] + + def fake_load_from_url(data_src, media_io): + load_calls.append((data_src, type(media_io).__name__)) + return f'loaded:{data_src}' + + monkeypatch.setattr(multimodal_module, 'load_from_url', fake_load_from_url) + + messages = [{ + 'role': 'user', + 'content': [{ + 'type': 'image', + 'image': 'file:///tmp/a.png', + }] + }] + stats = MultimodalStats() + parsed = [None] + + MultimodalProcessor._parse_multimodal_item(0, messages, parsed, {}, stats) + _, stage_times, item_counts, failures = stats.snapshot() + + assert parsed[0]['content'][0] == {'type': Modality.IMAGE, 'data': 'loaded:file:///tmp/a.png'} + assert load_calls == [('file:///tmp/a.png', 'ImageMediaIO')] + assert item_counts == {'image': 1} + assert ('media_io', 'image') in stage_times + assert failures == {}