diff --git a/csrc/engine/infer_engine.cpp b/csrc/engine/infer_engine.cpp index db0dfdd4..8f1fcf7f 100644 --- a/csrc/engine/infer_engine.cpp +++ b/csrc/engine/infer_engine.cpp @@ -132,10 +132,21 @@ InferEngine::Input::to_model_input(infinicore::Device device) const { -> std::optional { return t.has_value() ? t.value()->to(device) : t; }; + auto to_device_vec = [&](const std::optional> &vec) + -> std::optional> { + if (!vec.has_value()) { + return vec; + } + std::vector result; + result.reserve(vec->size()); + for (const auto &t : vec.value()) { + result.push_back(t->to(device)); + } + return result; + }; infinilm::InfinilmModel::Input input = { to_device(input_ids), // @todo: on device in the future - to_device(pixel_values), to_device(position_ids), to_device(past_sequence_lengths), // @todo: on device in the future to_device(total_sequence_lengths), @@ -143,8 +154,9 @@ InferEngine::Input::to_model_input(infinicore::Device device) const { to_device(cu_seqlens), to_device(block_tables), to_device(slot_mapping), - to_device(image_bound), - to_device(tgt_sizes), + to_device_vec(pixel_values), + to_device_vec(image_bound), + to_device_vec(tgt_sizes), }; infinilm::global_state::get_forward_context().attn_metadata = { @@ -153,8 +165,11 @@ InferEngine::Input::to_model_input(infinicore::Device device) const { input.input_offsets, input.cu_seqlens, input.block_tables, - input.slot_mapping, - }; + input.slot_mapping}; + + global_state::get_forward_context().mm_metadata = { + image_req_ids}; + return input; } diff --git a/csrc/engine/rank_worker.hpp b/csrc/engine/rank_worker.hpp index f6adcf47..7e0d427c 100644 --- a/csrc/engine/rank_worker.hpp +++ b/csrc/engine/rank_worker.hpp @@ -36,8 +36,6 @@ class RankWorker { struct Input { /// Token IDs tensor of shape `[batch, seq_len]`. std::optional input_ids; - /// Image pixel values for multi-modal models. - std::optional pixel_values; /// Position IDs tensor of shape `[batch, seq_len]` or `[seq_len]`. std::optional position_ids; /// Past Lengths of cached sequence for each request, of shape `[num_requests]`. @@ -52,10 +50,14 @@ class RankWorker { std::optional block_tables; /// Slot ids for each token `[seq]`. Used for paged cache. std::optional slot_mapping; + /// Image pixel values for multi-modal models. + std::optional> pixel_values; /// Image placeholder bounds for MiniCPM-V style replacement. - std::optional image_bound; + std::optional> image_bound; /// Target patch sizes for each image (MiniCPM-V). - std::optional tgt_sizes; + std::optional> tgt_sizes; + /// req_id for each pixel_values among a batch + std::optional> image_req_ids; float temperature{1}; diff --git a/csrc/global_state/forward_context.hpp b/csrc/global_state/forward_context.hpp index a7ab6f86..2568fc7e 100644 --- a/csrc/global_state/forward_context.hpp +++ b/csrc/global_state/forward_context.hpp @@ -40,8 +40,13 @@ struct AttentionMetadata { input.slot_mapping) {} }; +struct MultiModalMetadata { + std::optional> image_req_ids; +}; + struct ForwardContext { AttentionMetadata attn_metadata; + MultiModalMetadata mm_metadata; std::vector kv_cache_vec; }; diff --git a/csrc/models/infinilm_model.hpp b/csrc/models/infinilm_model.hpp index 7570a6f2..ca57df8c 100644 --- a/csrc/models/infinilm_model.hpp +++ b/csrc/models/infinilm_model.hpp @@ -21,9 +21,6 @@ class InfinilmModel : public infinicore::nn::Module { struct Input { /// Token IDs tensor of shape `[batch, seq_len]`. std::optional input_ids; - /// Image pixel values for multi-modal models. - /// Shape is model-specific (e.g. LLaVA: [batch, 3, H, W], MiniCPM-V: [batch, 3, patch, seq_len * patch]). - std::optional pixel_values; /// Position IDs tensor of shape `[batch, seq_len]` or `[seq_len]`. std::optional position_ids; /// Past Lengths of cached sequence for each request, of shape `[num_requests]`. @@ -38,12 +35,15 @@ class InfinilmModel : public infinicore::nn::Module { std::optional block_tables; /// Slot ids for each token `[seq]`. Used for paged cache. std::optional slot_mapping; + /// Image pixel values for multi-modal models. + /// Vector of tensors. Shape is model-specific (e.g. LLaVA: [batch, 3, H, W], MiniCPM-V: [n_patch, 3, filter_H, H * W / filter_H]). + std::optional> pixel_values; /// Image placeholder bounds for MiniCPM-V style replacement. - /// Tensor shape: [batch, max_ranges, 2] (start, end). - std::optional image_bound; + /// Vector of tensors shape: [n_patch, 2]. + std::optional> image_bound; /// Target patch sizes for each image (MiniCPM-V). - /// Tensor shape: [batch, 2] or [batch, max_slices, 2] if pre-flattened. - std::optional tgt_sizes; + /// Vector of tensors shape: [n_path, 2] if pre-flattened. + std::optional> tgt_sizes; }; struct Output { diff --git a/csrc/models/minicpmv/minicpmv_model.cpp b/csrc/models/minicpmv/minicpmv_model.cpp index c372f148..0afd6eb4 100644 --- a/csrc/models/minicpmv/minicpmv_model.cpp +++ b/csrc/models/minicpmv/minicpmv_model.cpp @@ -70,36 +70,30 @@ InfinilmModel::Output MiniCPMVModel::forward(const InfinilmModel::Input &input) } auto input_ids = input.input_ids.value(); - if (input.pixel_values.has_value() && input_ids->size(1) > 1) { - if (!input.image_bound.has_value()) { - throw std::runtime_error("MiniCPMVModel: image_bound required for multimodal input"); + if (input.pixel_values.has_value() && input.pixel_values.value().size() > 0) { + if (!input.image_bound.has_value() or !input.tgt_sizes.has_value()) { + throw std::runtime_error("MiniCPMVModel: image_bound and tgt_sizes must be provided with pixel_values"); + } + if (input.pixel_values->size() != input.image_bound->size() || input.pixel_values->size() != input.tgt_sizes->size()) { + throw std::runtime_error("MiniCPMVModel: pixel_values, image_bound and tgt_sizes must have the same number of elements"); } - auto pixel_values = input.pixel_values.value(); - auto vision_embedding = vpm_->forward(pixel_values, input.tgt_sizes); - auto vision_hidden = resampler_->forward(vision_embedding, input.tgt_sizes); auto inputs_embeds = llm_->model().embed_tokens(input_ids); - auto merged_embeds = replace_embeddings(inputs_embeds, vision_hidden, input.image_bound.value()); - - infinicore::Tensor position_ids; - if (input.position_ids.has_value()) { - position_ids = input.position_ids.value(); - } else { - auto batch = merged_embeds->size(0); - auto seq_len = merged_embeds->size(1); - auto pos_cpu = infinicore::Tensor::zeros({batch, seq_len}, infinicore::DataType::I64, infinicore::Device::cpu()); - auto *pos_ptr = reinterpret_cast(pos_cpu->data()); - for (size_t b = 0; b < batch; ++b) { - for (size_t i = 0; i < seq_len; ++i) { - pos_ptr[b * seq_len + i] = static_cast(i); - } - } - position_ids = pos_cpu->to(merged_embeds->device()); + + // inputs_embeds concat tokens from all requests, while images are processed per request + // slice inputs_embeds using request offsets to get the embedding of each request + infinicore::Tensor input_offsets_cpu = input.input_offsets.value()->to(infinicore::Device::cpu()); + int32_t *offsets = (int32_t *)(input_offsets_cpu->data()); + for (size_t i : global_state::get_forward_context().mm_metadata.image_req_ids.value()) { + auto pixel_values = input.pixel_values.value().at(i); + auto vision_embedding = vpm_->forward(pixel_values, input.tgt_sizes.value().at(i)); + auto vision_hidden = resampler_->forward(vision_embedding, input.tgt_sizes.value().at(i)); + inputs_embeds = replace_embeddings(inputs_embeds->narrow({{1, size_t(offsets[i]), size_t(offsets[i + 1] - offsets[i])}}), vision_hidden, input.image_bound.value().at(i)); } auto hidden_states = llm_->model().forward_embeds( - merged_embeds, - position_ids); + inputs_embeds, + input.position_ids.value()); auto logits = llm_->logits_from_hidden(hidden_states); return {logits}; diff --git a/csrc/models/minicpmv/resampler.cpp b/csrc/models/minicpmv/resampler.cpp index 47bd8336..dcdc1da5 100644 --- a/csrc/models/minicpmv/resampler.cpp +++ b/csrc/models/minicpmv/resampler.cpp @@ -141,7 +141,7 @@ Resampler::Resampler(size_t num_queries, } infinicore::Tensor Resampler::forward(const infinicore::Tensor &x, - const std::optional &tgt_sizes) const { + const infinicore::Tensor &tgt_sizes) const { auto batch_size = x->size(0); auto seq_len = x->size(1); @@ -153,12 +153,11 @@ infinicore::Tensor Resampler::forward(const infinicore::Tensor &x, // Build positional embeddings on CPU std::vector tgt_sizes_host; - if (tgt_sizes.has_value()) { - auto tgt_cpu = tgt_sizes.value()->to(infinicore::Device::cpu()); - auto n = tgt_cpu->numel(); - tgt_sizes_host.resize(n); - std::memcpy(tgt_sizes_host.data(), tgt_cpu->data(), n * sizeof(int64_t)); - } + + auto tgt_cpu = tgt_sizes->to(infinicore::Device::cpu()); + auto n = tgt_cpu->numel(); + tgt_sizes_host.resize(n); + std::memcpy(tgt_sizes_host.data(), tgt_cpu->data(), n * sizeof(int64_t)); auto pos_cpu = infinicore::Tensor::zeros({batch_size, seq_len, embed_dim_}, kv->dtype(), infinicore::Device::cpu()); auto *pos_ptr = reinterpret_cast(pos_cpu->data()); diff --git a/csrc/models/minicpmv/resampler.hpp b/csrc/models/minicpmv/resampler.hpp index 7cb5d6c5..01f9efc2 100644 --- a/csrc/models/minicpmv/resampler.hpp +++ b/csrc/models/minicpmv/resampler.hpp @@ -43,7 +43,7 @@ class Resampler : public infinicore::nn::Module { const infinicore::Device &device); infinicore::Tensor forward(const infinicore::Tensor &x, - const std::optional &tgt_sizes) const; + const infinicore::Tensor &tgt_sizes) const; private: size_t num_queries_; diff --git a/csrc/models/minicpmv/siglip_vision.cpp b/csrc/models/minicpmv/siglip_vision.cpp index ea903c10..d6b17bbd 100644 --- a/csrc/models/minicpmv/siglip_vision.cpp +++ b/csrc/models/minicpmv/siglip_vision.cpp @@ -36,7 +36,7 @@ SiglipVisionEmbeddings::SiglipVisionEmbeddings(const nlohmann::json &config, } infinicore::Tensor SiglipVisionEmbeddings::forward(const infinicore::Tensor &pixel_values, - const std::optional &tgt_sizes) const { + const infinicore::Tensor &tgt_sizes) const { auto patch_embeds = patch_embedding_->forward(pixel_values); auto batch_size = patch_embeds->size(0); auto seq_len = patch_embeds->size(2) * patch_embeds->size(3); @@ -50,12 +50,11 @@ infinicore::Tensor SiglipVisionEmbeddings::forward(const infinicore::Tensor &pix const size_t num_patches_per_side = static_cast(std::sqrt(static_cast(num_positions_))); std::vector tgt_sizes_host; - if (tgt_sizes.has_value()) { - auto tgt_cpu = tgt_sizes.value()->to(infinicore::Device::cpu()); - auto n = tgt_cpu->numel(); - tgt_sizes_host.resize(n); - std::memcpy(tgt_sizes_host.data(), tgt_cpu->data(), n * sizeof(int64_t)); - } + + auto tgt_cpu = tgt_sizes->to(infinicore::Device::cpu()); + auto n = tgt_cpu->numel(); + tgt_sizes_host.resize(n); + std::memcpy(tgt_sizes_host.data(), tgt_cpu->data(), n * sizeof(int64_t)); for (size_t b = 0; b < batch_size; ++b) { size_t nb_h = num_patches_per_side; @@ -211,7 +210,7 @@ SiglipVisionModel::SiglipVisionModel(const nlohmann::json &config, } infinicore::Tensor SiglipVisionModel::forward(const infinicore::Tensor &pixel_values, - const std::optional &tgt_sizes) const { + const infinicore::Tensor &tgt_sizes) const { auto hidden_states = embeddings_->forward(pixel_values, tgt_sizes); hidden_states = encoder_->forward(hidden_states, std::nullopt); return post_layernorm_->forward(hidden_states); diff --git a/csrc/models/minicpmv/siglip_vision.hpp b/csrc/models/minicpmv/siglip_vision.hpp index 6e075d3b..8cf0230d 100644 --- a/csrc/models/minicpmv/siglip_vision.hpp +++ b/csrc/models/minicpmv/siglip_vision.hpp @@ -36,7 +36,7 @@ class SiglipVisionEmbeddings : public infinicore::nn::Module { const infinicore::Device &device); infinicore::Tensor forward(const infinicore::Tensor &pixel_values, - const std::optional &tgt_sizes) const; + const infinicore::Tensor &tgt_sizes) const; private: size_t hidden_size_; @@ -119,7 +119,7 @@ class SiglipVisionModel : public infinicore::nn::Module { bool drop_last_layer); infinicore::Tensor forward(const infinicore::Tensor &pixel_values, - const std::optional &tgt_sizes) const; + const infinicore::Tensor &tgt_sizes) const; private: nlohmann::json config_; diff --git a/csrc/pybind11/engine/engine.hpp b/csrc/pybind11/engine/engine.hpp index 2741c9cd..1b59a04b 100644 --- a/csrc/pybind11/engine/engine.hpp +++ b/csrc/pybind11/engine/engine.hpp @@ -67,8 +67,10 @@ inline void bind_infer_engine(py::module &m) { return state_dict_tp_all; }) .def("process_weights_after_loading", &InferEngine::process_weights_after_loading, "Process the weights after loading on all workers (e.g., for quantization)") - .def("forward", [](InferEngine &self, const InferEngine::Input &input) -> InferEngine::Output { return self.forward(input); }, "Run inference on all ranks with arbitrary arguments") - .def("reset_cache", [](InferEngine &self, std::shared_ptr cfg) { self.reset_cache(cfg ? cfg.get() : nullptr); }, py::arg("cache_config") = py::none()) + .def( + "forward", [](InferEngine &self, const InferEngine::Input &input) -> InferEngine::Output { return self.forward(input); }, "Run inference on all ranks with arbitrary arguments") + .def( + "reset_cache", [](InferEngine &self, std::shared_ptr cfg) { self.reset_cache(cfg ? cfg.get() : nullptr); }, py::arg("cache_config") = py::none()) .def("get_cache_config", [](const InferEngine &self) -> std::shared_ptr { auto cfg = self.get_cache_config(); return cfg ? std::shared_ptr(cfg->unique_copy()) : nullptr; }) @@ -114,8 +116,10 @@ inline void bind_infer_engine(py::module &m) { return state_dict_tp_all; }) .def("process_weights_after_loading", &InferEngine::process_weights_after_loading, "Process the weights after loading on all workers (e.g., for quantization)") - .def("forward", [](InferEngine &self, const InferEngine::Input &input) -> InferEngine::Output { return self.forward(input); }, "Run inference on all ranks with arbitrary arguments") - .def("reset_cache", [](InferEngine &self, std::shared_ptr cfg) { self.reset_cache(cfg ? cfg.get() : nullptr); }, py::arg("cache_config") = py::none()) + .def( + "forward", [](InferEngine &self, const InferEngine::Input &input) -> InferEngine::Output { return self.forward(input); }, "Run inference on all ranks with arbitrary arguments") + .def( + "reset_cache", [](InferEngine &self, std::shared_ptr cfg) { self.reset_cache(cfg ? cfg.get() : nullptr); }, py::arg("cache_config") = py::none()) .def("get_cache_config", [](const InferEngine &self) { auto cfg = self.get_cache_config(); return std::shared_ptr(std::move(cfg->unique_copy())); }) @@ -125,7 +129,6 @@ inline void bind_infer_engine(py::module &m) { .def( py::init([]( std::optional input_ids, - std::optional pixel_values, std::optional position_ids, std::optional past_sequence_lengths, std::optional total_sequence_lengths, @@ -133,12 +136,13 @@ inline void bind_infer_engine(py::module &m) { std::optional cu_seqlens, std::optional block_tables, std::optional slot_mapping, - std::optional image_bound, - std::optional tgt_sizes, + std::optional> pixel_values, + std::optional> image_bound, + std::optional> tgt_sizes, + std::optional> image_req_ids, py::kwargs kwargs) { InferEngine::Input input{ std::move(input_ids), - std::move(pixel_values), std::move(position_ids), std::move(past_sequence_lengths), std::move(total_sequence_lengths), @@ -146,8 +150,10 @@ inline void bind_infer_engine(py::module &m) { std::move(cu_seqlens), std::move(block_tables), std::move(slot_mapping), + std::move(pixel_values), std::move(image_bound), std::move(tgt_sizes), + std::move(image_req_ids), }; // Explicit defaults @@ -182,7 +188,6 @@ inline void bind_infer_engine(py::module &m) { return input; }), py::arg("input_ids") = std::nullopt, - py::arg("pixel_values") = std::nullopt, py::arg("position_ids") = std::nullopt, py::arg("past_sequence_lengths") = std::nullopt, py::arg("total_sequence_lengths") = std::nullopt, @@ -190,10 +195,11 @@ inline void bind_infer_engine(py::module &m) { py::arg("cu_seqlens") = std::nullopt, py::arg("block_tables") = std::nullopt, py::arg("slot_mapping") = std::nullopt, + py::arg("pixel_values") = std::nullopt, py::arg("image_bound") = std::nullopt, - py::arg("tgt_sizes") = std::nullopt) + py::arg("tgt_sizes") = std::nullopt, + py::arg("image_req_ids") = std::nullopt) .def_readwrite("input_ids", &InferEngine::Input::input_ids) - .def_readwrite("pixel_values", &InferEngine::Input::pixel_values) .def_readwrite("position_ids", &InferEngine::Input::position_ids) .def_readwrite("past_sequence_lengths", &InferEngine::Input::past_sequence_lengths) .def_readwrite("total_sequence_lengths", &InferEngine::Input::total_sequence_lengths) @@ -201,8 +207,10 @@ inline void bind_infer_engine(py::module &m) { .def_readwrite("cu_seqlens", &InferEngine::Input::cu_seqlens) .def_readwrite("block_tables", &InferEngine::Input::block_tables) .def_readwrite("slot_mapping", &InferEngine::Input::slot_mapping) + .def_readwrite("pixel_values", &InferEngine::Input::pixel_values) .def_readwrite("image_bound", &InferEngine::Input::image_bound) .def_readwrite("tgt_sizes", &InferEngine::Input::tgt_sizes) + .def_readwrite("image_req_ids", &InferEngine::Input::image_req_ids) .def_readwrite("temperature", &InferEngine::Input::temperature) .def_readwrite("top_k", &InferEngine::Input::top_k) .def_readwrite("top_p", &InferEngine::Input::top_p); diff --git a/python/infinilm/infer_engine.py b/python/infinilm/infer_engine.py index 13bb18a1..43930f10 100644 --- a/python/infinilm/infer_engine.py +++ b/python/infinilm/infer_engine.py @@ -96,7 +96,6 @@ def forward( self, input_ids, *, - pixel_values=None, position_ids=None, past_kv_lengths=None, total_kv_lengths=None, @@ -104,8 +103,10 @@ def forward( cu_seqlens=None, block_tables=None, slot_mapping=None, + pixel_values=None, image_bound=None, tgt_sizes=None, + image_req_ids=None, temperature=None, top_k=None, top_p=None, @@ -113,9 +114,6 @@ def forward( try: # TODO: Remove `_underlying` and simplify the corresponding code. input_ids = input_ids._underlying if input_ids is not None else None - pixel_values = ( - pixel_values._underlying if pixel_values is not None else None - ) position_ids = ( position_ids._underlying if position_ids is not None else None ) @@ -135,15 +133,25 @@ def forward( slot_mapping = ( slot_mapping._underlying if slot_mapping is not None else None ) - image_bound = image_bound._underlying if image_bound is not None else None - tgt_sizes = tgt_sizes._underlying if tgt_sizes is not None else None + + def convert_tensor_list(tensor_list_): + if tensor_list_ is None: + return None + if not isinstance(tensor_list_, list): + tensor_list_ = [tensor_list_] + if len(tensor_list_) == 0: + return None + return [tensor._underlying for tensor in tensor_list_] + + pixel_values = convert_tensor_list(pixel_values) + image_bound = convert_tensor_list(image_bound) + tgt_sizes = convert_tensor_list(tgt_sizes) return infinicore.Tensor( super() .forward( super().Input( input_ids, - pixel_values=pixel_values, position_ids=position_ids, past_sequence_lengths=past_kv_lengths, total_sequence_lengths=total_kv_lengths, @@ -151,8 +159,10 @@ def forward( cu_seqlens=cu_seqlens, block_tables=block_tables, slot_mapping=slot_mapping, + pixel_values=pixel_values, image_bound=image_bound, tgt_sizes=tgt_sizes, + image_req_ids=image_req_ids, temperature=temperature, top_k=top_k, top_p=top_p, @@ -338,6 +348,6 @@ def state_dict_keyname(self): def load_state_dict(self, state_dict, strict=None): for name, param in state_dict.items(): super().load_param(name, param._underlying) - + def process_weights_after_loading(self): super().process_weights_after_loading() diff --git a/python/infinilm/processors/minicpmv_processor.py b/python/infinilm/processors/minicpmv_processor.py index dd9d756c..9934e802 100644 --- a/python/infinilm/processors/minicpmv_processor.py +++ b/python/infinilm/processors/minicpmv_processor.py @@ -124,7 +124,7 @@ def build_model_inputs( ) current_offset = 0 - for req in scheduler_output.scheduled_requests: + for req_id, req in enumerate(scheduler_output.scheduled_requests): num_cached = req.num_cached_tokens if scheduler_output.is_prefill: # Prefill phase @@ -160,10 +160,7 @@ def build_model_inputs( ) # if all patches are already cached, skip processing multimodal inputs and return text-only inputs for this request - if ( - num_cached_patch - < req.processed_inputs["image_bound"][0].shape[0] - ): + if num_cached_patch < len(req.processed_inputs["pixel_values"]): # 1. pixel_values all_pixel_values = [] pixel_values = req.processed_inputs["pixel_values"] @@ -217,9 +214,17 @@ def build_model_inputs( bound[i, : len(bnd), :] = bnd image_bound_infini = infinicore.from_torch(bound) - mm_data["pixel_values"] = pixel_values_infini - mm_data["tgt_sizes"] = tgt_sizes_infini - mm_data["image_bound"] = image_bound_infini + + def append_mm_data(mm_data__: dict, key__: str, value__): + if mm_data__.get(key__) is None: + mm_data[key__] = [value__] + else: + mm_data[key__].append(value__) + + append_mm_data(mm_data, "pixel_values", pixel_values_infini) + append_mm_data(mm_data, "tgt_sizes", tgt_sizes_infini) + append_mm_data(mm_data, "image_bound", image_bound_infini) + append_mm_data(mm_data, "image_req_ids", req_id) else: # Decode phase diff --git a/scripts/test_perf.py b/scripts/test_perf.py index 6a33d8f0..f1be6033 100644 --- a/scripts/test_perf.py +++ b/scripts/test_perf.py @@ -1,8 +1,11 @@ import asyncio +from pathlib import Path import time from openai import AsyncOpenAI import argparse import random +import subprocess + PROMPTS = [ "如果猫能写诗,它们会写些什么?", @@ -27,13 +30,95 @@ "想象一下,如果每个人都能读懂他人的思想。", ] +IMAGE_PROMPTS = [ + "请描述一下图片里的内容。", + "图片里有人吗?", + "请结合图片,讲一个小故事。", +] + NUM_REQUESTS = 64 CONCURRENCY = 20 API_URL = "http://127.0.0.1:8000" -MODEL = "FM9G-7B" +MODEL = "" + + +class ImageCollector: + def __init__(self, dir_path: str, port=None): + self.dir_path = Path(dir_path).resolve() + + if not self.dir_path.is_dir(): + raise ValueError(f"Not a valid directory: {self.dir_path}") + + self.image_files = [ + file.resolve() + for file in self.dir_path.rglob("*") + if file.is_file() and file.suffix.lower() in [".jpg", ".jpeg"] + ] + + assert len(self.image_files) > 0, "No image file found in provided directory!" + + self.host = "127.0.0.1" + self.port = port + self.server_process = None + + # Only start HTTP server if BOTH host and port are provided + self.use_http = self.host is not None and self.port is not None + + if self.use_http: + self._start_server() + + def _start_server(self): + print( + f"[ImageCollector] Starting image HTTP server...\n" + f" Directory: {self.dir_path}\n" + f" URL: http://{self.host}:{self.port}\n" + ) + self.server_process = subprocess.Popen( + [ + "python", + "-m", + "http.server", + str(self.port), + "--bind", + self.host, + ], + cwd=str(self.dir_path), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + time.sleep(0.5) -async def benchmark_user(client, semaphore, queue, results, user_id, verbose): + def stop_server(self): + if self.server_process is not None: + self.server_process.terminate() + + try: + self.server_process.wait(timeout=3) + except subprocess.TimeoutExpired: + self.server_process.kill() + + self.server_process = None + + def __del__(self): + self.stop_server() + + def random_image_url(self): + image_path = random.choice(self.image_files) + + # Return local absolute path + if not self.use_http: + return str(image_path) + + # Return HTTP URL + relative_path = image_path.relative_to(self.dir_path) + + return f"http://{self.host}:{self.port}/{relative_path.as_posix()}" + + +async def benchmark_user( + client, semaphore, queue, results, user_id, verbose, image_collector=None +): while True: async with semaphore: task_id = await queue.get() @@ -41,14 +126,33 @@ async def benchmark_user(client, semaphore, queue, results, user_id, verbose): queue.task_done() break - question = random.choice(PROMPTS) try: print(f"🚀 User#{user_id} Sending request #{task_id}") + messages = None + if image_collector is None: + messages = [{"role": "user", "content": random.choice(PROMPTS)}] + else: + messages = [ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": image_collector.random_image_url() + }, + }, + {"type": "text", "text": random.choice(IMAGE_PROMPTS)}, + ], + } + ] + + print(messages) start_time = time.time() stream = await client.chat.completions.create( model=MODEL, - messages=[{"role": "user", "content": question}], + messages=messages, stream=True, ) @@ -97,7 +201,7 @@ async def benchmark_user(client, semaphore, queue, results, user_id, verbose): print(f" 📏 平均 token 解码时间: {ms_per_token:.2f} ms/token") else: print(f" 📏 平均 token 解码时间: N/A (no token generated)") - print(f" ❓ 提问: {question}") + print(f" ❓ 提问: {messages}") print(f" 💬 回答: {answer}\n") queue.task_done() @@ -108,7 +212,7 @@ async def benchmark_user(client, semaphore, queue, results, user_id, verbose): queue.task_done() -async def run_benchmark(verbose=False): +async def run_benchmark(verbose=False, image_collector=None): client = AsyncOpenAI(base_url=API_URL, api_key="default") semaphore = asyncio.Semaphore(CONCURRENCY) queue = asyncio.Queue() @@ -120,7 +224,9 @@ async def run_benchmark(verbose=False): users = [ asyncio.create_task( - benchmark_user(client, semaphore, queue, results, user_id, verbose) + benchmark_user( + client, semaphore, queue, results, user_id, verbose, image_collector + ) ) for user_id in range(CONCURRENCY) ] @@ -175,6 +281,17 @@ async def run_benchmark(verbose=False): if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--verbose", action="store_true") + parser.add_argument("--image-dir", type=str, default=None) + parser.add_argument("--mm-port", type=str, default=None) + parser.add_argument("--api-url", type=str, default="127.0.0.1:8000") + parser.add_argument("--model", type=str, default="") args = parser.parse_args() - asyncio.run(run_benchmark(args.verbose)) + API_URL = "http://" + args.api_url + MODEL = args.model + + image_collector = None + if args.image_dir is not None: + image_collector = ImageCollector(args.image_dir, port=args.mm_port) + + asyncio.run(run_benchmark(args.verbose, image_collector)) diff --git a/test/service/request.py b/test/service/request.py index d5fa008e..881771fc 100644 --- a/test/service/request.py +++ b/test/service/request.py @@ -6,10 +6,8 @@ def get_args(): - # 1. 创建参数解析器(支持重复 --message 构建列表) - parser = argparse.ArgumentParser(description="向推理服务发送 OpenAI 格式请求") + parser = argparse.ArgumentParser(description="Send request in OpenAI format") - # 核心:重复 --content 自动拼成列表 parser.add_argument( "--system", type=str, @@ -23,15 +21,13 @@ def get_args(): help="start with content type['text', 'image_url'] and colon, e.g. text:hello or image_url:http://example.com/image.jpg", ) - # 目标服务地址与端口 parser.add_argument( - "--port", type=int, default=8000, help="推理服务端口,默认 8000" + "--port", type=int, default=8000, help="Infer server port, default 8000" ) parser.add_argument( - "--host", default="127.0.0.1", help="推理服务地址,默认 127.0.0.1" + "--host", default="127.0.0.1", help="Infer server url, default 127.0.0.1" ) - # 解析参数 return parser.parse_args()