diff --git a/README.md b/README.md index fde56ce79..9dc528ec0 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,11 @@ python scripts/test_perf.py --verbose ``` + - 单请求推理服务测试 + ```bash + python test/service/request.py --content="text:Image 1:" --content="image_url:xxx.jpg" --content="text:Image 2:" --content="image_url:xxxx.jpg" --content="text:Compare the 2 images." + ``` + - 运行推理基准测试(C-Eval/MMLU) ```bash diff --git a/csrc/engine/infer_engine.cpp b/csrc/engine/infer_engine.cpp index 2e34c6cca..987add6b7 100644 --- a/csrc/engine/infer_engine.cpp +++ b/csrc/engine/infer_engine.cpp @@ -89,10 +89,21 @@ InferEngine::Input::to_model_input(infinicore::Device device) const { -> std::optional { return t.has_value() ? t.value()->to(device) : t; }; + auto to_device_vec = [&](const std::optional> &vec) + -> std::optional> { + if (!vec.has_value()) { + return vec; + } + std::vector result; + result.reserve(vec->size()); + for (const auto &t : vec.value()) { + result.push_back(t->to(device)); + } + return result; + }; infinilm::InfinilmModel::Input input = { to_device(input_ids), // @todo: on device in the future - to_device(pixel_values), to_device(position_ids), to_device(past_sequence_lengths), // @todo: on device in the future to_device(total_sequence_lengths), @@ -100,8 +111,9 @@ InferEngine::Input::to_model_input(infinicore::Device device) const { to_device(cu_seqlens), to_device(block_tables), to_device(slot_mapping), - to_device(image_bound), - to_device(tgt_sizes), + to_device_vec(pixel_values), + to_device_vec(image_bound), + to_device_vec(tgt_sizes), }; infinilm::global_state::get_forward_context().attn_metadata = { @@ -110,8 +122,11 @@ InferEngine::Input::to_model_input(infinicore::Device device) const { input.input_offsets, input.cu_seqlens, input.block_tables, - input.slot_mapping, - }; + input.slot_mapping}; + + global_state::get_forward_context().mm_metadata = { + image_req_ids}; + return input; } diff --git a/csrc/engine/rank_worker.hpp b/csrc/engine/rank_worker.hpp index 027ebc1cc..288024e6a 100644 --- a/csrc/engine/rank_worker.hpp +++ b/csrc/engine/rank_worker.hpp @@ -36,8 +36,6 @@ class RankWorker { struct Input { /// Token IDs tensor of shape `[batch, seq_len]`. std::optional input_ids; - /// Image pixel values for multi-modal models. - std::optional pixel_values; /// Position IDs tensor of shape `[batch, seq_len]` or `[seq_len]`. std::optional position_ids; /// Past Lengths of cached sequence for each request, of shape `[num_requests]`. @@ -52,10 +50,14 @@ class RankWorker { std::optional block_tables; /// Slot ids for each token `[seq]`. Used for paged cache. std::optional slot_mapping; + /// Image pixel values for multi-modal models. + std::optional> pixel_values; /// Image placeholder bounds for MiniCPM-V style replacement. - std::optional image_bound; + std::optional> image_bound; /// Target patch sizes for each image (MiniCPM-V). - std::optional tgt_sizes; + std::optional> tgt_sizes; + /// req_id for each pixel_values among a batch + std::optional> image_req_ids; float temperature{1}; diff --git a/csrc/global_state/forward_context.hpp b/csrc/global_state/forward_context.hpp index a7ab6f862..2568fc7ee 100644 --- a/csrc/global_state/forward_context.hpp +++ b/csrc/global_state/forward_context.hpp @@ -40,8 +40,13 @@ struct AttentionMetadata { input.slot_mapping) {} }; +struct MultiModalMetadata { + std::optional> image_req_ids; +}; + struct ForwardContext { AttentionMetadata attn_metadata; + MultiModalMetadata mm_metadata; std::vector kv_cache_vec; }; diff --git a/csrc/layers/linear/fused_linear.cpp b/csrc/layers/linear/fused_linear.cpp index 04d0ad316..2c0520901 100644 --- a/csrc/layers/linear/fused_linear.cpp +++ b/csrc/layers/linear/fused_linear.cpp @@ -31,14 +31,14 @@ QKVParallelLinear::QKVParallelLinear(size_t hidden_size, const infinicore::Device &device, engine::distributed::RankInfo rank_info) : infinilm::nn::ColumnParallelLinear( - hidden_size, - calculate_out_feature_size(num_q_head, q_dim, num_k_head, k_dim, num_v_head, v_dim, rank_info), - quantization, - (q_bias || k_bias || v_bias), - dtype, - device, - rank_info.tp_rank, - rank_info.tp_size), + hidden_size, + calculate_out_feature_size(num_q_head, q_dim, num_k_head, k_dim, num_v_head, v_dim, rank_info), + quantization == nullptr ? std::make_shared() : quantization, + (q_bias || k_bias || v_bias), + dtype, + device, + rank_info.tp_rank, + rank_info.tp_size), q_dim_(q_dim), k_dim_(k_dim), v_dim_(v_dim), @@ -120,7 +120,17 @@ GateUpParallelLinear::GateUpParallelLinear(size_t hidden_size, size_t intermedia std::shared_ptr quantization, const infinicore::DataType &dtype, const infinicore::Device &device, engine::distributed::RankInfo rank_info) - : infinilm::nn::ColumnParallelLinear(hidden_size, intermediate_size * 2, quantization, gate_bias || up_bias, dtype, device, rank_info.tp_rank, rank_info.tp_size), gate_bias_(gate_bias), up_bias_(up_bias) { + : infinilm::nn::ColumnParallelLinear( + hidden_size, + intermediate_size * 2, + quantization == nullptr ? std::make_shared() : quantization, + gate_bias || up_bias, + dtype, + device, + rank_info.tp_rank, + rank_info.tp_size), + gate_bias_(gate_bias), + up_bias_(up_bias) { if (gate_bias_ != up_bias_) { throw std::runtime_error("Not supported yet: gate_bias and up_bias should be given at the same time"); } diff --git a/csrc/layers/linear/fused_linear.hpp b/csrc/layers/linear/fused_linear.hpp index 92ec3b909..6e4a34856 100644 --- a/csrc/layers/linear/fused_linear.hpp +++ b/csrc/layers/linear/fused_linear.hpp @@ -1,7 +1,7 @@ #pragma once #include "../../engine/distributed/communication_group.hpp" -#include "linear.hpp" #include "../quantization/quantization.hpp" +#include "linear.hpp" #include namespace infinilm::layers::linear { @@ -13,7 +13,7 @@ class QKVParallelLinear : public infinilm::nn::ColumnParallelLinear { size_t q_dim, size_t k_dim, size_t v_dim, size_t num_q_head, size_t num_k_head, size_t num_v_head, bool q_bias, bool k_bias, bool v_bias, - std::shared_ptr quantization, + std::shared_ptr quantization = nullptr, const infinicore::DataType &dtype = infinicore::DataType::F32, const infinicore::Device &device = infinicore::Device(), engine::distributed::RankInfo rank_info = engine::distributed::RankInfo()); @@ -21,7 +21,7 @@ class QKVParallelLinear : public infinilm::nn::ColumnParallelLinear { explicit QKVParallelLinear(size_t hidden_size, size_t head_dim, size_t num_q_head, size_t num_kv_head, - std::shared_ptr quantization, + std::shared_ptr quantization = nullptr, bool bias = false, const infinicore::DataType &dtype = infinicore::DataType::F32, const infinicore::Device &device = infinicore::Device(), @@ -32,7 +32,7 @@ class QKVParallelLinear : public infinilm::nn::ColumnParallelLinear { size_t num_q_head, size_t num_kv_head, const std::string &q_name, const std::string &k_name, const std::string &v_name, RegisterParamFn register_fn, - std::shared_ptr quantization, + std::shared_ptr quantization = nullptr, bool bias = false, const infinicore::DataType &dtype = infinicore::DataType::F32, const infinicore::Device &device = infinicore::Device(), @@ -84,21 +84,22 @@ class QKVParallelLinear : public infinilm::nn::ColumnParallelLinear { class GateUpParallelLinear : public infinilm::nn::ColumnParallelLinear { public: - GateUpParallelLinear(size_t hidden_size, size_t intermediate_size, std::shared_ptr quantization, + GateUpParallelLinear(size_t hidden_size, size_t intermediate_size, + std::shared_ptr quantization = nullptr, bool bias = false, const infinicore::DataType &dtype = infinicore::DataType::F32, const infinicore::Device &device = infinicore::Device(), engine::distributed::RankInfo rank_info = engine::distributed::RankInfo()); GateUpParallelLinear(size_t hidden_size, size_t intermediate_size, bool gate_bias, bool up_bias, - std::shared_ptr quantization, + std::shared_ptr quantization = nullptr, const infinicore::DataType &dtype = infinicore::DataType::F32, const infinicore::Device &device = infinicore::Device(), engine::distributed::RankInfo rank_info = engine::distributed::RankInfo()); GateUpParallelLinear(size_t hidden_size, size_t intermediate_size, const std::string &gate_name, const std::string &up_name, RegisterParamFn register_fn, - std::shared_ptr quantization, + std::shared_ptr quantization = nullptr, bool bias = false, const infinicore::DataType &dtype = infinicore::DataType::F32, const infinicore::Device &device = infinicore::Device(), diff --git a/csrc/layers/quantization/none_quantization.cpp b/csrc/layers/quantization/none_quantization.cpp index 6f49a3943..e1f67a7d1 100644 --- a/csrc/layers/quantization/none_quantization.cpp +++ b/csrc/layers/quantization/none_quantization.cpp @@ -4,6 +4,8 @@ namespace infinilm::quantization { +NoneQuantization::NoneQuantization() : NoneQuantization(nlohmann::json()) {} + std::vector NoneQuantization::get_param_layout( size_t in_features, size_t out_features, int split_dim, int tp_rank, int tp_size, @@ -14,8 +16,7 @@ std::vector NoneQuantization::get_param_layout( std::vector descs; descs.push_back({"weight", {out_features, in_features}, dtype, split_dim, tp_rank, tp_size}); if (bias) { - descs.push_back({"bias", {out_features}, dtype, split_dim >= 0 ? 0 : -1, - split_dim >= 0 ? tp_rank : 0, split_dim >= 0 ? tp_size : 1}); + descs.push_back({"bias", {out_features}, dtype, split_dim >= 0 ? 0 : -1, split_dim >= 0 ? tp_rank : 0, split_dim >= 0 ? tp_size : 1}); } return descs; } diff --git a/csrc/layers/quantization/none_quantization.hpp b/csrc/layers/quantization/none_quantization.hpp index 44fd890d9..18fe4bf17 100644 --- a/csrc/layers/quantization/none_quantization.hpp +++ b/csrc/layers/quantization/none_quantization.hpp @@ -6,7 +6,9 @@ namespace infinilm::quantization { class NoneQuantization : public BaseQuantization { public: explicit NoneQuantization(const nlohmann::json &quant_config) - : BaseQuantization(quant_config) {}; + : BaseQuantization(quant_config){}; + + NoneQuantization(); QuantScheme get_quant_scheme() const override { return QuantScheme::NONE; diff --git a/csrc/models/infinilm_model.hpp b/csrc/models/infinilm_model.hpp index 33e3909e5..5cabcef23 100644 --- a/csrc/models/infinilm_model.hpp +++ b/csrc/models/infinilm_model.hpp @@ -21,9 +21,6 @@ class InfinilmModel : public infinicore::nn::Module { struct Input { /// Token IDs tensor of shape `[batch, seq_len]`. std::optional input_ids; - /// Image pixel values for multi-modal models. - /// Shape is model-specific (e.g. LLaVA: [batch, 3, H, W], MiniCPM-V: [batch, 3, patch, seq_len * patch]). - std::optional pixel_values; /// Position IDs tensor of shape `[batch, seq_len]` or `[seq_len]`. std::optional position_ids; /// Past Lengths of cached sequence for each request, of shape `[num_requests]`. @@ -38,12 +35,15 @@ class InfinilmModel : public infinicore::nn::Module { std::optional block_tables; /// Slot ids for each token `[seq]`. Used for paged cache. std::optional slot_mapping; + /// Image pixel values for multi-modal models. + /// Vector of tensors. Shape is model-specific (e.g. LLaVA: [batch, 3, H, W], MiniCPM-V: [n_patch, 3, filter_H, H * W / filter_H]). + std::optional> pixel_values; /// Image placeholder bounds for MiniCPM-V style replacement. - /// Tensor shape: [batch, max_ranges, 2] (start, end). - std::optional image_bound; + /// Vector of tensors shape: [n_patch, 2]. + std::optional> image_bound; /// Target patch sizes for each image (MiniCPM-V). - /// Tensor shape: [batch, 2] or [batch, max_slices, 2] if pre-flattened. - std::optional tgt_sizes; + /// Vector of tensors shape: [n_path, 2] if pre-flattened. + std::optional> tgt_sizes; }; struct Output { diff --git a/csrc/models/minicpmv/minicpmv_model.cpp b/csrc/models/minicpmv/minicpmv_model.cpp index c372f148c..bbe1d227a 100644 --- a/csrc/models/minicpmv/minicpmv_model.cpp +++ b/csrc/models/minicpmv/minicpmv_model.cpp @@ -33,22 +33,21 @@ MiniCPMVModel::MiniCPMVModel(std::shared_ptr mode embed_dim, num_heads, vision_cfg.value("hidden_size", 768), + vision_cfg.value("image_size", 224), + vision_cfg.value("patch_size", 16), dtype, device); } -infinicore::Tensor MiniCPMVModel::replace_embeddings(const infinicore::Tensor &inputs_embeds, - const infinicore::Tensor &vision_hidden, - const infinicore::Tensor &image_bound) const { - auto out = infinicore::Tensor::empty(inputs_embeds->shape(), inputs_embeds->dtype(), inputs_embeds->device()); - out->copy_from(inputs_embeds); - +void MiniCPMVModel::replace_embeddings(infinicore::Tensor inputs_embeds, + const infinicore::Tensor &vision_hidden, + const infinicore::Tensor &image_bound) const { auto bounds_cpu = image_bound->to(infinicore::Device::cpu()); auto batch_size = inputs_embeds->size(0); ASSERT_EQ(batch_size, 1); ASSERT_EQ(bounds_cpu->size(0), 1); - auto out_slice = out->squeeze(0); + auto out_slice = inputs_embeds->squeeze(0); auto bound_slice = bounds_cpu->squeeze(0); auto vision_len = vision_hidden->size(0); for (size_t patch = 0; patch < vision_len; ++patch) { @@ -60,8 +59,6 @@ infinicore::Tensor MiniCPMVModel::replace_embeddings(const infinicore::Tensor &i out_slice->narrow({{0, size_t(start), size_t(end - start)}})->copy_from(patch_embed); } - - return out; } InfinilmModel::Output MiniCPMVModel::forward(const InfinilmModel::Input &input) const { @@ -70,36 +67,30 @@ InfinilmModel::Output MiniCPMVModel::forward(const InfinilmModel::Input &input) } auto input_ids = input.input_ids.value(); - if (input.pixel_values.has_value() && input_ids->size(1) > 1) { - if (!input.image_bound.has_value()) { - throw std::runtime_error("MiniCPMVModel: image_bound required for multimodal input"); + if (input.pixel_values.has_value() && input.pixel_values.value().size() > 0) { + if (!input.image_bound.has_value() or !input.tgt_sizes.has_value()) { + throw std::runtime_error("MiniCPMVModel: image_bound and tgt_sizes must be provided with pixel_values"); + } + if (input.pixel_values->size() != input.image_bound->size() || input.pixel_values->size() != input.tgt_sizes->size()) { + throw std::runtime_error("MiniCPMVModel: pixel_values, image_bound and tgt_sizes must have the same number of elements"); } - auto pixel_values = input.pixel_values.value(); - auto vision_embedding = vpm_->forward(pixel_values, input.tgt_sizes); - auto vision_hidden = resampler_->forward(vision_embedding, input.tgt_sizes); auto inputs_embeds = llm_->model().embed_tokens(input_ids); - auto merged_embeds = replace_embeddings(inputs_embeds, vision_hidden, input.image_bound.value()); - - infinicore::Tensor position_ids; - if (input.position_ids.has_value()) { - position_ids = input.position_ids.value(); - } else { - auto batch = merged_embeds->size(0); - auto seq_len = merged_embeds->size(1); - auto pos_cpu = infinicore::Tensor::zeros({batch, seq_len}, infinicore::DataType::I64, infinicore::Device::cpu()); - auto *pos_ptr = reinterpret_cast(pos_cpu->data()); - for (size_t b = 0; b < batch; ++b) { - for (size_t i = 0; i < seq_len; ++i) { - pos_ptr[b * seq_len + i] = static_cast(i); - } - } - position_ids = pos_cpu->to(merged_embeds->device()); + + // inputs_embeds concat tokens from all requests, while images are processed per request + // slice inputs_embeds using request offsets to get the embedding of each request + infinicore::Tensor input_offsets_cpu = input.input_offsets.value()->to(infinicore::Device::cpu()); + int32_t *offsets = (int32_t *)(input_offsets_cpu->data()); + for (size_t i : global_state::get_forward_context().mm_metadata.image_req_ids.value()) { + auto pixel_values = input.pixel_values.value().at(i); + auto vision_embedding = vpm_->forward(pixel_values, input.tgt_sizes.value().at(i)); + auto vision_hidden = resampler_->forward(vision_embedding, input.tgt_sizes.value().at(i)); + replace_embeddings(inputs_embeds->narrow({{1, size_t(offsets[i]), size_t(offsets[i + 1] - offsets[i])}}), vision_hidden, input.image_bound.value().at(i)); } auto hidden_states = llm_->model().forward_embeds( - merged_embeds, - position_ids); + inputs_embeds, + input.position_ids.value()); auto logits = llm_->logits_from_hidden(hidden_states); return {logits}; diff --git a/csrc/models/minicpmv/minicpmv_model.hpp b/csrc/models/minicpmv/minicpmv_model.hpp index 72d0aa11c..98bfb6944 100644 --- a/csrc/models/minicpmv/minicpmv_model.hpp +++ b/csrc/models/minicpmv/minicpmv_model.hpp @@ -23,9 +23,9 @@ class MiniCPMVModel : public InfinilmModel { void reset_cache(const cache::CacheConfig *cache_config) override; private: - infinicore::Tensor replace_embeddings(const infinicore::Tensor &inputs_embeds, - const infinicore::Tensor &vision_hidden, - const infinicore::Tensor &image_bound) const; + void replace_embeddings(infinicore::Tensor inputs_embeds, + const infinicore::Tensor &vision_hidden, + const infinicore::Tensor &image_bound) const; std::shared_ptr config_; diff --git a/csrc/models/minicpmv/resampler.cpp b/csrc/models/minicpmv/resampler.cpp index 47bd8336f..45a858d4b 100644 --- a/csrc/models/minicpmv/resampler.cpp +++ b/csrc/models/minicpmv/resampler.cpp @@ -121,15 +121,20 @@ Resampler::Resampler(size_t num_queries, size_t embed_dim, size_t num_heads, size_t kv_dim, + size_t image_size, + size_t patch_size, const infinicore::DataType &dtype, const infinicore::Device &device) : num_queries_(num_queries), embed_dim_(embed_dim), num_heads_(num_heads), kv_dim_(kv_dim), + image_size_(image_size), + patch_size_(patch_size), use_kv_proj_(kv_dim != embed_dim) { INFINICORE_NN_PARAMETER_INIT(query, ({num_queries_, embed_dim_}, dtype, device)); INFINICORE_NN_PARAMETER_INIT(proj, ({embed_dim_, embed_dim_}, dtype, device)); + INFINICORE_NN_MODULE_INIT(attn, embed_dim_, num_heads_, dtype, device); INFINICORE_NN_MODULE_INIT(ln_q, embed_dim_, 1e-6, dtype, device); INFINICORE_NN_MODULE_INIT(ln_kv, embed_dim_, 1e-6, dtype, device); @@ -138,10 +143,19 @@ Resampler::Resampler(size_t num_queries, if (use_kv_proj_) { INFINICORE_NN_MODULE_INIT(kv_proj, kv_dim_, embed_dim_, false, dtype, device); } + + // Initialize full 2d embeddings with max size, calculate on cpu and copy to gpu + size_t num_patches = image_size_ / patch_size_; + INFINICORE_NN_BUFFER_INIT(embedding_table, ({num_patches, num_patches, embed_dim_}, dtype, device_)); + std::vector buf(num_patches * num_patches * embed_dim_); + compute_2d_sincos_pos_embed(buf.data(), embed_dim_, num_patches, num_patches); + auto embedding_table_cpu = infinicore::Tensor::zeros({num_patches, num_patches, embed_dim_}, dtype, infinicore::Device::cpu()); + write_pos_embed(embedding_table_cpu->data(), embedding_table_cpu->dtype(), buf.data(), num_patches * num_patches * embed_dim_); + embedding_table_->copy_from(embedding_table_cpu); } infinicore::Tensor Resampler::forward(const infinicore::Tensor &x, - const std::optional &tgt_sizes) const { + const infinicore::Tensor &tgt_sizes) const { auto batch_size = x->size(0); auto seq_len = x->size(1); @@ -152,33 +166,22 @@ infinicore::Tensor Resampler::forward(const infinicore::Tensor &x, kv = ln_kv_->forward(kv); // Build positional embeddings on CPU - std::vector tgt_sizes_host; - if (tgt_sizes.has_value()) { - auto tgt_cpu = tgt_sizes.value()->to(infinicore::Device::cpu()); - auto n = tgt_cpu->numel(); - tgt_sizes_host.resize(n); - std::memcpy(tgt_sizes_host.data(), tgt_cpu->data(), n * sizeof(int64_t)); - } + auto tgt_cpu = tgt_sizes->to(infinicore::Device::cpu()); + int64_t *tgt_sizes_ptr = (int64_t *)(tgt_cpu->data()); - auto pos_cpu = infinicore::Tensor::zeros({batch_size, seq_len, embed_dim_}, kv->dtype(), infinicore::Device::cpu()); - auto *pos_ptr = reinterpret_cast(pos_cpu->data()); - const size_t elem_size = pos_cpu->element_size(); + auto pos_embeddings = infinicore::Tensor::zeros(kv->shape(), kv->dtype(), kv->device()); for (size_t b = 0; b < batch_size; ++b) { - size_t tgt_h = 1; - size_t tgt_w = seq_len; - if (!tgt_sizes_host.empty()) { - tgt_h = static_cast(tgt_sizes_host[b * 2]); - tgt_w = static_cast(tgt_sizes_host[b * 2 + 1]); - } - const size_t patch_len = tgt_h * tgt_w; - std::vector buf(patch_len * embed_dim_); - compute_2d_sincos_pos_embed(buf.data(), embed_dim_, tgt_h, tgt_w); - write_pos_embed(pos_ptr + b * seq_len * embed_dim_ * elem_size, pos_cpu->dtype(), buf.data(), patch_len * embed_dim_); + + auto tgt_h = static_cast(tgt_sizes_ptr[b * 2]); + auto tgt_w = static_cast(tgt_sizes_ptr[b * 2 + 1]); + + auto src_embeddings = embedding_table_->narrow({{0, 0, tgt_h}, {1, 0, tgt_w}}); + auto tgt_embeddings = pos_embeddings->narrow({{0, b, 1}, {1, 0, tgt_h * tgt_w}})->view({tgt_h, tgt_w, embed_dim_}); + tgt_embeddings->copy_from(src_embeddings); } - auto pos = pos_cpu->to(kv->device()); - auto kv_with_pos = infinicore::op::add(kv, pos); + auto kv_with_pos = infinicore::op::add(kv, pos_embeddings); auto q = ln_q_->forward(query_); if (q->shape().size() == 2) { diff --git a/csrc/models/minicpmv/resampler.hpp b/csrc/models/minicpmv/resampler.hpp index 7cb5d6c57..ab9751d41 100644 --- a/csrc/models/minicpmv/resampler.hpp +++ b/csrc/models/minicpmv/resampler.hpp @@ -1,8 +1,8 @@ #pragma once #include "../../config/model_config.hpp" +#include "../../layers/linear/fused_linear.hpp" #include "infinicore/nn/layer_norm.hpp" -#include "infinicore/nn/linear.hpp" #include "infinicore/nn/module.hpp" #include "infinicore/tensor.hpp" @@ -30,7 +30,7 @@ class ResamplerAttention : public infinicore::nn::Module { INFINICORE_NN_PARAMETER(in_proj_weight); INFINICORE_NN_PARAMETER(in_proj_bias); - INFINICORE_NN_MODULE(infinicore::nn::Linear, out_proj); + INFINICORE_NN_MODULE(infinilm::nn::Linear, out_proj); }; class Resampler : public infinicore::nn::Module { @@ -39,22 +39,27 @@ class Resampler : public infinicore::nn::Module { size_t embed_dim, size_t num_heads, size_t kv_dim, + size_t image_size, + size_t patch_size, const infinicore::DataType &dtype, const infinicore::Device &device); infinicore::Tensor forward(const infinicore::Tensor &x, - const std::optional &tgt_sizes) const; + const infinicore::Tensor &tgt_sizes) const; private: size_t num_queries_; size_t embed_dim_; size_t num_heads_; size_t kv_dim_; + size_t image_size_; + size_t patch_size_; bool use_kv_proj_; INFINICORE_NN_PARAMETER(query); INFINICORE_NN_PARAMETER(proj); - INFINICORE_NN_MODULE(infinicore::nn::Linear, kv_proj); + INFINICORE_NN_BUFFER(embedding_table); + INFINICORE_NN_MODULE(infinilm::nn::Linear, kv_proj); INFINICORE_NN_MODULE(ResamplerAttention, attn); INFINICORE_NN_MODULE(infinicore::nn::LayerNorm, ln_q); INFINICORE_NN_MODULE(infinicore::nn::LayerNorm, ln_kv); diff --git a/csrc/models/minicpmv/siglip_vision.cpp b/csrc/models/minicpmv/siglip_vision.cpp index ea903c10f..23a23f2f1 100644 --- a/csrc/models/minicpmv/siglip_vision.cpp +++ b/csrc/models/minicpmv/siglip_vision.cpp @@ -1,6 +1,8 @@ #include "siglip_vision.hpp" +#include "../../global_state/global_state.hpp" #include "infinicore/ops.hpp" +#include "infinicore/ops/mha.hpp" #include #include @@ -36,7 +38,7 @@ SiglipVisionEmbeddings::SiglipVisionEmbeddings(const nlohmann::json &config, } infinicore::Tensor SiglipVisionEmbeddings::forward(const infinicore::Tensor &pixel_values, - const std::optional &tgt_sizes) const { + const infinicore::Tensor &tgt_sizes) const { auto patch_embeds = patch_embedding_->forward(pixel_values); auto batch_size = patch_embeds->size(0); auto seq_len = patch_embeds->size(2) * patch_embeds->size(3); @@ -50,12 +52,11 @@ infinicore::Tensor SiglipVisionEmbeddings::forward(const infinicore::Tensor &pix const size_t num_patches_per_side = static_cast(std::sqrt(static_cast(num_positions_))); std::vector tgt_sizes_host; - if (tgt_sizes.has_value()) { - auto tgt_cpu = tgt_sizes.value()->to(infinicore::Device::cpu()); - auto n = tgt_cpu->numel(); - tgt_sizes_host.resize(n); - std::memcpy(tgt_sizes_host.data(), tgt_cpu->data(), n * sizeof(int64_t)); - } + + auto tgt_cpu = tgt_sizes->to(infinicore::Device::cpu()); + auto n = tgt_cpu->numel(); + tgt_sizes_host.resize(n); + std::memcpy(tgt_sizes_host.data(), tgt_cpu->data(), n * sizeof(int64_t)); for (size_t b = 0; b < batch_size; ++b) { size_t nb_h = num_patches_per_side; @@ -93,44 +94,52 @@ SiglipAttention::SiglipAttention(const nlohmann::json &config, if (embed_dim_ % num_heads_ != 0) { throw std::runtime_error("SiglipAttention: embed_dim must be divisible by num_heads"); } - INFINICORE_NN_MODULE_INIT(q_proj, embed_dim_, embed_dim_, true, dtype, device); - INFINICORE_NN_MODULE_INIT(k_proj, embed_dim_, embed_dim_, true, dtype, device); - INFINICORE_NN_MODULE_INIT(v_proj, embed_dim_, embed_dim_, true, dtype, device); + qkv_proj_ = std::make_shared( + embed_dim_, head_dim_, num_heads_, num_heads_, + "q_proj", "k_proj", "v_proj", [this](const std::string &n, infinicore::nn::Parameter p) { this->register_parameter(n, std::move(p)); }, + nullptr, true, dtype, device); + INFINICORE_NN_MODULE_INIT(out_proj, embed_dim_, embed_dim_, true, dtype, device); + + attention_backend_ = infinilm::global_state::get_infinilm_config().attention_backend; } infinicore::Tensor SiglipAttention::forward(const infinicore::Tensor &hidden_states, const std::optional &attention_mask) const { - (void)attention_mask; auto shape = hidden_states->shape(); size_t batch_size = shape[0]; size_t seq_len = shape[1]; - auto q = q_proj_->forward(const_cast(hidden_states)); - auto k = k_proj_->forward(const_cast(hidden_states)); - auto v = v_proj_->forward(const_cast(hidden_states)); - - auto q_reshaped = q->view({batch_size, seq_len, num_heads_, head_dim_})->permute({0, 2, 1, 3})->contiguous(); - auto k_reshaped = k->view({batch_size, seq_len, num_heads_, head_dim_})->permute({0, 2, 1, 3})->contiguous(); - auto v_reshaped = v->view({batch_size, seq_len, num_heads_, head_dim_})->permute({0, 2, 1, 3})->contiguous(); - - auto q_flat = q_reshaped->view({batch_size * num_heads_, seq_len, head_dim_}); - auto k_flat = k_reshaped->view({batch_size * num_heads_, seq_len, head_dim_}); - auto v_flat = v_reshaped->view({batch_size * num_heads_, seq_len, head_dim_}); + auto qkv = qkv_proj_->forward(const_cast(hidden_states))->view({batch_size, seq_len, num_heads_ * 3, head_dim_}); + auto q = qkv->narrow({{2, 0, num_heads_}}); + auto k = qkv->narrow({{2, num_heads_, num_heads_}}); + auto v = qkv->narrow({{2, num_heads_ * 2, num_heads_}}); - auto k_t = k_flat->permute({0, 2, 1}); - auto attn_weights = infinicore::op::matmul(q_flat, k_t, scale_); - - auto attn_view = attn_weights->view({batch_size * num_heads_, seq_len, seq_len}); - infinicore::op::softmax_(attn_view, attn_view, -1); - - auto attn_output = infinicore::op::matmul(attn_weights, v_flat); - auto out = attn_output->view({batch_size, num_heads_, seq_len, head_dim_}) - ->permute({0, 2, 1, 3}) - ->contiguous() - ->view({batch_size, seq_len, embed_dim_}); - - return out_proj_->forward(out); + if (attention_backend_ == infinilm::backends::AttentionBackend::FLASH_ATTN) { + auto out = infinicore::op::mha(q, k, v, std::nullopt, scale_, false)->view({batch_size, seq_len, num_heads_ * head_dim_}); + return out_proj_->forward(out); + } else { + auto q_reshaped = q->view({batch_size, seq_len, num_heads_, head_dim_})->permute({0, 2, 1, 3})->contiguous(); + auto k_reshaped = k->view({batch_size, seq_len, num_heads_, head_dim_})->permute({0, 2, 1, 3})->contiguous(); + auto v_reshaped = v->view({batch_size, seq_len, num_heads_, head_dim_})->permute({0, 2, 1, 3})->contiguous(); + + auto q_flat = q_reshaped->view({batch_size * num_heads_, seq_len, head_dim_}); + auto k_flat = k_reshaped->view({batch_size * num_heads_, seq_len, head_dim_}); + auto v_flat = v_reshaped->view({batch_size * num_heads_, seq_len, head_dim_}); + + auto k_t = k_flat->permute({0, 2, 1}); + auto attn_weights = infinicore::op::matmul(q_flat, k_t, scale_); + + auto attn_view = attn_weights->view({batch_size * num_heads_, seq_len, seq_len}); + infinicore::op::softmax_(attn_view, attn_view, -1); + + auto attn_output = infinicore::op::matmul(attn_weights, v_flat); + auto out = attn_output->view({batch_size, num_heads_, seq_len, head_dim_}) + ->permute({0, 2, 1, 3}) + ->contiguous() + ->view({batch_size, seq_len, embed_dim_}); + return out_proj_->forward(out); + } } SiglipMLP::SiglipMLP(const nlohmann::json &config, @@ -211,7 +220,7 @@ SiglipVisionModel::SiglipVisionModel(const nlohmann::json &config, } infinicore::Tensor SiglipVisionModel::forward(const infinicore::Tensor &pixel_values, - const std::optional &tgt_sizes) const { + const infinicore::Tensor &tgt_sizes) const { auto hidden_states = embeddings_->forward(pixel_values, tgt_sizes); hidden_states = encoder_->forward(hidden_states, std::nullopt); return post_layernorm_->forward(hidden_states); diff --git a/csrc/models/minicpmv/siglip_vision.hpp b/csrc/models/minicpmv/siglip_vision.hpp index 6e075d3bb..9bfb7a77b 100644 --- a/csrc/models/minicpmv/siglip_vision.hpp +++ b/csrc/models/minicpmv/siglip_vision.hpp @@ -1,9 +1,10 @@ #pragma once +#include "../../backends/attention_backends.hpp" #include "../../config/model_config.hpp" +#include "../../layers/linear/fused_linear.hpp" #include "infinicore/nn/embedding.hpp" #include "infinicore/nn/layer_norm.hpp" -#include "infinicore/nn/linear.hpp" #include "infinicore/nn/module.hpp" #include "infinicore/tensor.hpp" #include @@ -36,7 +37,7 @@ class SiglipVisionEmbeddings : public infinicore::nn::Module { const infinicore::Device &device); infinicore::Tensor forward(const infinicore::Tensor &pixel_values, - const std::optional &tgt_sizes) const; + const infinicore::Tensor &tgt_sizes) const; private: size_t hidden_size_; @@ -61,11 +62,10 @@ class SiglipAttention : public infinicore::nn::Module { size_t num_heads_; size_t head_dim_; float scale_; + infinilm::backends::AttentionBackend attention_backend_; - INFINICORE_NN_MODULE(infinicore::nn::Linear, q_proj); - INFINICORE_NN_MODULE(infinicore::nn::Linear, k_proj); - INFINICORE_NN_MODULE(infinicore::nn::Linear, v_proj); - INFINICORE_NN_MODULE(infinicore::nn::Linear, out_proj); + INFINICORE_NN_MODULE(infinilm::layers::linear::QKVParallelLinear, qkv_proj); + INFINICORE_NN_MODULE(infinilm::nn::Linear, out_proj); }; class SiglipMLP : public infinicore::nn::Module { @@ -78,8 +78,8 @@ class SiglipMLP : public infinicore::nn::Module { private: std::string activation_; - INFINICORE_NN_MODULE(infinicore::nn::Linear, fc1); - INFINICORE_NN_MODULE(infinicore::nn::Linear, fc2); + INFINICORE_NN_MODULE(infinilm::nn::Linear, fc1); + INFINICORE_NN_MODULE(infinilm::nn::Linear, fc2); }; class SiglipEncoderLayer : public infinicore::nn::Module { @@ -119,7 +119,7 @@ class SiglipVisionModel : public infinicore::nn::Module { bool drop_last_layer); infinicore::Tensor forward(const infinicore::Tensor &pixel_values, - const std::optional &tgt_sizes) const; + const infinicore::Tensor &tgt_sizes) const; private: nlohmann::json config_; diff --git a/csrc/pybind11/engine/engine.hpp b/csrc/pybind11/engine/engine.hpp index 0d480bbff..a87562acc 100644 --- a/csrc/pybind11/engine/engine.hpp +++ b/csrc/pybind11/engine/engine.hpp @@ -30,6 +30,7 @@ namespace infinilm::engine { inline void bind_infer_engine(py::module &m) { py::class_> infer_engine(m, "InferEngine"); + infer_engine .def(py::init([]( const std::string &model_path, @@ -70,8 +71,14 @@ inline void bind_infer_engine(py::module &m) { return state_dict_tp_all; }) .def("process_weights_after_loading", &InferEngine::process_weights_after_loading, "Process the weights after loading on all workers (e.g., for quantization)") - .def("forward", [](InferEngine &self, const InferEngine::Input &input) -> InferEngine::Output { return self.forward(input); }, "Run inference on all ranks with arbitrary arguments") - .def("reset_cache", [](InferEngine &self, std::shared_ptr cfg) { self.reset_cache(cfg ? cfg.get() : nullptr); }, py::arg("cache_config") = py::none()) + .def( + "forward", [](InferEngine &self, const InferEngine::Input &input) -> InferEngine::Output { + py::gil_scoped_release release; + return self.forward(input); + }, + "Run inference on all ranks with arbitrary arguments") + .def( + "reset_cache", [](InferEngine &self, std::shared_ptr cfg) { self.reset_cache(cfg ? cfg.get() : nullptr); }, py::arg("cache_config") = py::none()) .def("get_cache_config", [](const InferEngine &self) -> std::shared_ptr { auto cfg = self.get_cache_config(); return cfg ? std::shared_ptr(cfg->unique_copy()) : nullptr; }) @@ -81,7 +88,6 @@ inline void bind_infer_engine(py::module &m) { .def( py::init([]( std::optional input_ids, - std::optional pixel_values, std::optional position_ids, std::optional past_sequence_lengths, std::optional total_sequence_lengths, @@ -89,12 +95,13 @@ inline void bind_infer_engine(py::module &m) { std::optional cu_seqlens, std::optional block_tables, std::optional slot_mapping, - std::optional image_bound, - std::optional tgt_sizes, + std::optional> pixel_values, + std::optional> image_bound, + std::optional> tgt_sizes, + std::optional> image_req_ids, py::kwargs kwargs) { InferEngine::Input input{ std::move(input_ids), - std::move(pixel_values), std::move(position_ids), std::move(past_sequence_lengths), std::move(total_sequence_lengths), @@ -102,8 +109,10 @@ inline void bind_infer_engine(py::module &m) { std::move(cu_seqlens), std::move(block_tables), std::move(slot_mapping), + std::move(pixel_values), std::move(image_bound), std::move(tgt_sizes), + std::move(image_req_ids), }; // Explicit defaults @@ -138,7 +147,6 @@ inline void bind_infer_engine(py::module &m) { return input; }), py::arg("input_ids") = std::nullopt, - py::arg("pixel_values") = std::nullopt, py::arg("position_ids") = std::nullopt, py::arg("past_sequence_lengths") = std::nullopt, py::arg("total_sequence_lengths") = std::nullopt, @@ -146,10 +154,11 @@ inline void bind_infer_engine(py::module &m) { py::arg("cu_seqlens") = std::nullopt, py::arg("block_tables") = std::nullopt, py::arg("slot_mapping") = std::nullopt, + py::arg("pixel_values") = std::nullopt, py::arg("image_bound") = std::nullopt, - py::arg("tgt_sizes") = std::nullopt) + py::arg("tgt_sizes") = std::nullopt, + py::arg("image_req_ids") = std::nullopt) .def_readwrite("input_ids", &InferEngine::Input::input_ids) - .def_readwrite("pixel_values", &InferEngine::Input::pixel_values) .def_readwrite("position_ids", &InferEngine::Input::position_ids) .def_readwrite("past_sequence_lengths", &InferEngine::Input::past_sequence_lengths) .def_readwrite("total_sequence_lengths", &InferEngine::Input::total_sequence_lengths) @@ -157,8 +166,10 @@ inline void bind_infer_engine(py::module &m) { .def_readwrite("cu_seqlens", &InferEngine::Input::cu_seqlens) .def_readwrite("block_tables", &InferEngine::Input::block_tables) .def_readwrite("slot_mapping", &InferEngine::Input::slot_mapping) + .def_readwrite("pixel_values", &InferEngine::Input::pixel_values) .def_readwrite("image_bound", &InferEngine::Input::image_bound) .def_readwrite("tgt_sizes", &InferEngine::Input::tgt_sizes) + .def_readwrite("image_req_ids", &InferEngine::Input::image_req_ids) .def_readwrite("temperature", &InferEngine::Input::temperature) .def_readwrite("top_k", &InferEngine::Input::top_k) .def_readwrite("top_p", &InferEngine::Input::top_p); diff --git a/examples/test_infer.py b/examples/test_infer.py index a3ce5e6d5..a90a5d3e8 100644 --- a/examples/test_infer.py +++ b/examples/test_infer.py @@ -47,9 +47,9 @@ def test( ] if image_path is not None: for conversation in conversations: - conversation["content"] = [ + conversation[0]["content"] = [ {"type": "image_url", "image_url": {"url": image_path}} - ] + conversation["content"] + ] + conversation[0]["content"] t1 = time.time() print("=================== start generate ====================") diff --git a/python/infinilm/infer_engine.py b/python/infinilm/infer_engine.py index 31d65de93..56747a8ac 100644 --- a/python/infinilm/infer_engine.py +++ b/python/infinilm/infer_engine.py @@ -125,7 +125,6 @@ def forward( self, input_ids, *, - pixel_values=None, position_ids=None, past_kv_lengths=None, total_kv_lengths=None, @@ -133,8 +132,10 @@ def forward( cu_seqlens=None, block_tables=None, slot_mapping=None, + pixel_values=None, image_bound=None, tgt_sizes=None, + image_req_ids=None, temperature=None, top_k=None, top_p=None, @@ -142,9 +143,6 @@ def forward( try: # TODO: Remove `_underlying` and simplify the corresponding code. input_ids = input_ids._underlying if input_ids is not None else None - pixel_values = ( - pixel_values._underlying if pixel_values is not None else None - ) position_ids = ( position_ids._underlying if position_ids is not None else None ) @@ -164,15 +162,25 @@ def forward( slot_mapping = ( slot_mapping._underlying if slot_mapping is not None else None ) - image_bound = image_bound._underlying if image_bound is not None else None - tgt_sizes = tgt_sizes._underlying if tgt_sizes is not None else None + + def convert_tensor_list(tensor_list_): + if tensor_list_ is None: + return None + if not isinstance(tensor_list_, list): + tensor_list_ = [tensor_list_] + if len(tensor_list_) == 0: + return None + return [tensor._underlying for tensor in tensor_list_] + + pixel_values = convert_tensor_list(pixel_values) + image_bound = convert_tensor_list(image_bound) + tgt_sizes = convert_tensor_list(tgt_sizes) return infinicore.Tensor( super() .forward( super().Input( input_ids, - pixel_values=pixel_values, position_ids=position_ids, past_sequence_lengths=past_kv_lengths, total_sequence_lengths=total_kv_lengths, @@ -180,8 +188,10 @@ def forward( cu_seqlens=cu_seqlens, block_tables=block_tables, slot_mapping=slot_mapping, + pixel_values=pixel_values, image_bound=image_bound, tgt_sizes=tgt_sizes, + image_req_ids=image_req_ids, temperature=temperature, top_k=top_k, top_p=top_p, @@ -364,6 +374,6 @@ def state_dict_keyname(self): def load_state_dict(self, state_dict, strict=None): for name, param in state_dict.items(): super().load_param(name, param._underlying) - + def process_weights_after_loading(self): super().process_weights_after_loading() diff --git a/python/infinilm/llm/cache_manager.py b/python/infinilm/llm/cache_manager.py index 44ca13762..ea857b029 100644 --- a/python/infinilm/llm/cache_manager.py +++ b/python/infinilm/llm/cache_manager.py @@ -3,6 +3,7 @@ """ from collections import deque +import queue from typing import List, Dict, Set import xxhash import numpy as np @@ -45,9 +46,9 @@ class BlockManager: """ def __init__(self, num_blocks: int, block_size: int): - assert ( - num_blocks > 0 and block_size > 0 - ), "num_blocks and block_size must be positive" + assert num_blocks > 0 and block_size > 0, ( + "num_blocks and block_size must be positive" + ) self.num_blocks = num_blocks self.block_size = block_size @@ -67,12 +68,20 @@ def reset_req_blocks(self) -> None: self.req_block_ids.clear() @classmethod - def compute_hash(cls, token_ids: List[int], prefix_hash: int = -1) -> int: + def compute_hash( + cls, + token_ids: List[int], + prefix_hash: int = -1, + mm_data_identifiers: List[str] = None, + ) -> int: """Compute hash for token sequence with optional prefix chaining.""" h = xxhash.xxh64() if prefix_hash != -1: h.update(prefix_hash.to_bytes(8, "little")) h.update(np.array(token_ids, dtype=np.int32).tobytes()) + if mm_data_identifiers is not None: + for identifier in mm_data_identifiers: + h.update(identifier.encode("utf-8")) return h.intdigest() def _allocate_partial_block(self, block_id: int) -> Block: @@ -100,9 +109,9 @@ def _allocate_full_block(self, block_id: int) -> Block: def _deallocate_block(self, block_id: int): """Deallocate a block and return it to free list.""" block = self.blocks[block_id] - assert ( - block.ref_count == 0 - ), f"Block {block_id} ref_count not zero, cannot deallocate" + assert block.ref_count == 0, ( + f"Block {block_id} ref_count not zero, cannot deallocate" + ) if block.hash != -1 and self.hash_to_block_id.get(block.hash) == block_id: del self.hash_to_block_id[block.hash] @@ -115,73 +124,142 @@ def can_allocate(self, num_required_blocks: int) -> bool: return len(self.free_block_ids) >= num_required_blocks def allocate_blocks( - self, token_ids: List[int], block_table: List[int] = None + self, + token_ids: List[int], + block_table: List[int] = None, + mm_token_index_mappings: List[dict] = None, ) -> tuple[List[int], List[int], int]: """Allocate cache blocks for new request with prefix caching support. Args: token_ids: Input token sequence block_table: Existing block_table (for decode phase) - + mm_token_index_mappings: List of multimodal token index mappings Returns: Tuple of (block_table, slot_mapping, num_cached_tokens) """ if block_table is None: block_table = [] + # Static args num_tokens = len(token_ids) num_blocks = (num_tokens + self.block_size - 1) // self.block_size + num_full_blocks = num_tokens // self.block_size + remain_tokens = num_tokens % self.block_size + num_mm_inputs = ( + 0 if not mm_token_index_mappings else len(mm_token_index_mappings) + ) + + # Variables slot_mapping = [] num_cached_tokens = 0 prefix_hash = -1 cache_miss = False + mm_start_counter = 0 + mm_caching_queue = queue.Queue(maxsize=len(mm_token_index_mappings)) + blocks_blueprint = [] # [{"prefix_hash": int or -1 if not a full block, "block_id": int or -1 if not cached}, ...] + max_blocks_to_reuse = num_full_blocks for block_idx in range(num_blocks): start_idx = block_idx * self.block_size end_idx = min(start_idx + self.block_size, num_tokens) block_tokens = token_ids[start_idx:end_idx] - # Only full blocks can be hashed for reuse - if len(block_tokens) == self.block_size: - prefix_hash = self.compute_hash(block_tokens, prefix_hash) - - # Try to reuse existing block - if not cache_miss: - cached_block_id = self.hash_to_block_id.get(prefix_hash, -1) - if ( - cached_block_id != -1 - and self.blocks[cached_block_id].token_ids == block_tokens - ): - # Check if all tokens are cached - if num_cached_tokens + self.block_size == len(token_ids): - cache_miss = True - else: - # Reuse successful - block = self.blocks[cached_block_id] - block.ref_count += 1 - block_table.append(cached_block_id) - num_cached_tokens += self.block_size - continue - else: - cache_miss = True - else: - prefix_hash = -1 - - # Cannot reuse, allocate new block - if not self.free_block_ids: - raise RuntimeError("No available cache blocks") + # Process multimodal token index mappings for this block + mm_data_identifiers = [] + while ( + mm_start_counter < num_mm_inputs + and mm_token_index_mappings[mm_start_counter]["start_index"] < end_idx + and mm_token_index_mappings[mm_start_counter]["start_index"] + >= start_idx + ): + # for all mm_data whose start_index is within this block's token range, add its identifier to the list + mm_data_identifiers.append( + mm_token_index_mappings[mm_start_counter]["identifier"] + ) + mm_caching_queue.put((mm_start_counter)) + mm_start_counter += 1 + + prefix_hash = ( + self.compute_hash(block_tokens, prefix_hash, mm_data_identifiers) + if len(block_tokens) == self.block_size + else -1 + ) + + # Try to reuse existing block if no previous cache miss yet + cached_block_id = ( + self.hash_to_block_id.get(prefix_hash, -1) if not cache_miss else -1 + ) + if ( + cached_block_id != -1 + and self.blocks[cached_block_id].token_ids != block_tokens + ): + cached_block_id = -1 + if end_idx == num_tokens and remain_tokens == 0: + # Spicial case, when the last block is fully packed, we cannot reuse it because we need to leave at least one uncached token for forward + cached_block_id = -1 + + # Deal with the first cache miss + if not cache_miss and cached_block_id == -1: + max_blocks_to_reuse = min(max_blocks_to_reuse, block_idx) + cache_miss = True + + if not cache_miss: + # pop fully cached mm_data + while ( + not mm_caching_queue.empty() + and mm_token_index_mappings[mm_caching_queue.queue[0]]["end_index"] + < end_idx + ): + mm_caching_queue.get() + + blocks_blueprint.append( + {"prefix_hash": prefix_hash, "block_id": cached_block_id} + ) + + # If there is one incomplete mm_data, tailing blocks need to fall back until all included mm_data are complete + if not mm_caching_queue.empty(): + incomplete_mm = mm_token_index_mappings[mm_caching_queue.get()] + incomplete_mm_start = incomplete_mm[ + "start_index" + ] # Fall back until this index is no longer included in the block + max_blocks_to_reuse = min( + max_blocks_to_reuse, incomplete_mm_start // self.block_size + ) + + num_cached_tokens = max_blocks_to_reuse * self.block_size + + for block_id in range(num_blocks): + n_block_tokens = self.block_size + + if block_id < max_blocks_to_reuse: + # Reuse block + block = self.blocks[blocks_blueprint[block_id]["block_id"]] + block.ref_count += 1 - new_block_id = self.free_block_ids[0] - if prefix_hash != -1: - block = self._allocate_full_block(new_block_id) - block.update(prefix_hash, block_tokens) else: - block = self._allocate_partial_block(new_block_id) - block_table.append(new_block_id) - - # Generate slot_mapping - for i in range(len(block_tokens)): - slot_mapping.append(new_block_id * self.block_size + i) + new_block_id = self.free_block_ids[0] + if blocks_blueprint[block_id]["prefix_hash"] != -1: + start_idx = block_id * self.block_size + end_idx = start_idx + self.block_size + block_tokens = token_ids[start_idx:end_idx] + block = self._allocate_full_block(new_block_id) + block.update( + blocks_blueprint[block_id]["prefix_hash"], block_tokens + ) + else: + block = self._allocate_partial_block(new_block_id) + n_block_tokens = remain_tokens + slot_mapping.extend( + list( + range( + block.block_id * self.block_size, + block.block_id * self.block_size + n_block_tokens, + ) + ) + ) + + block_table.append(block.block_id) return block_table, slot_mapping, num_cached_tokens diff --git a/python/infinilm/llm/llm.py b/python/infinilm/llm/llm.py index cba3af83a..953b35e59 100644 --- a/python/infinilm/llm/llm.py +++ b/python/infinilm/llm/llm.py @@ -442,17 +442,29 @@ def generate( for content in contents: request_id = f"cmpl-{uuid.uuid4().hex}" processed_inputs = None + mm_index_mappings = None if apply_chat_template: prompt = self.engine.apply_chat_template( content, add_generation_prompt=True ) - images, videos, audios = resolve_multimodal_inputs(content) + mm_inputs = resolve_multimodal_inputs(content) + processed_inputs = self.engine.process( - prompt, images, videos, audios, return_tensors="pt" + prompt, + mm_inputs["images"], + mm_inputs["videos"], + mm_inputs["audios"], + return_tensors="pt", ) prompt_token_ids = processed_inputs.get("input_ids").flatten().tolist() + mm_index_mappings = self.engine.processor.get_mm_token_index_list( + prompt_token_ids, + image_ids=mm_inputs["image_urls"], + video_ids=mm_inputs["video_urls"], + audio_ids=mm_inputs["audio_urls"], + ) else: prompt = content prompt_token_ids = self.engine.tokenize(prompt) @@ -462,6 +474,7 @@ def generate( prompt=prompt, prompt_token_ids=prompt_token_ids, processed_inputs=processed_inputs, + mm_token_index_mappings=mm_index_mappings, sampling_params=sampling_params, eos_token_ids=self.engine.eos_token_ids, ) @@ -688,7 +701,7 @@ def add_request( if request_id is None: request_id = f"cmpl-{uuid.uuid4().hex}" - images, videos, audios = None, None, None + mm_index_mappings = None processed_inputs = None if prompt_token_ids is not None: @@ -708,12 +721,23 @@ def add_request( messages, add_generation_prompt=add_generation_prompt ) - images, videos, audios = resolve_multimodal_inputs(messages) + mm_inputs = resolve_multimodal_inputs(messages) + processed_inputs = self.engine.process( - prompt, images, videos, audios, return_tensors="pt" + prompt, + mm_inputs["images"], + mm_inputs["videos"], + mm_inputs["audios"], + return_tensors="pt", ) prompt_token_ids = processed_inputs.get("input_ids").flatten().tolist() + mm_index_mappings = self.engine.processor.get_mm_token_index_list( + prompt_token_ids, + image_ids=mm_inputs["image_urls"], + video_ids=mm_inputs["video_urls"], + audio_ids=mm_inputs["audio_urls"], + ) if sampling_params is None: sampling_params = SamplingParams(max_tokens=self.config.max_tokens) @@ -726,6 +750,7 @@ def add_request( prompt=prompt, prompt_token_ids=prompt_token_ids, processed_inputs=processed_inputs, + mm_token_index_mappings=mm_index_mappings, sampling_params=sampling_params, eos_token_ids=self.engine.eos_token_ids, request_data=request_data, diff --git a/python/infinilm/llm/request.py b/python/infinilm/llm/request.py index 15bcf69f4..a94c2f68b 100644 --- a/python/infinilm/llm/request.py +++ b/python/infinilm/llm/request.py @@ -106,6 +106,7 @@ def __init__( prompt: Optional[str] = None, prompt_token_ids: Optional[List[int]] = None, processed_inputs: Optional[dict] = None, + mm_token_index_mappings: Optional[List[dict]] = None, sampling_params: Optional[SamplingParams] = None, eos_token_ids: Optional[List[int]] = None, arrival_time: Optional[float] = None, @@ -122,6 +123,7 @@ def __init__( self.prompt_token_ids: List[int] = prompt_token_ids or [] self.prompt_length: int = len(self.prompt_token_ids) self.processed_inputs: Optional[dict] = processed_inputs + self.mm_token_index_mappings: Optional[List[dict]] = mm_token_index_mappings # Sampling parameters self.sampling_params: SamplingParams = sampling_params or SamplingParams() @@ -186,6 +188,9 @@ def get_num_blocks_required(self, block_size: int) -> int: def get_max_tokens(self) -> Optional[int]: return self.sampling_params.max_tokens + def get_mm_token_index_mappings(self) -> Optional[List[dict]]: + return self.mm_token_index_mappings + def is_finished(self) -> bool: return self.status in [ RequestStatus.FINISHED, diff --git a/python/infinilm/llm/scheduler.py b/python/infinilm/llm/scheduler.py index f9c11635a..c5f4921a9 100644 --- a/python/infinilm/llm/scheduler.py +++ b/python/infinilm/llm/scheduler.py @@ -86,7 +86,9 @@ def schedule(self) -> Optional[SchedulerOutput]: # Allocate blocks with automatic prefix caching support req.block_table, req.slot_mapping, req.num_cached_tokens = ( - self.cache_manager.allocate_blocks(req_tokens, req.block_table) + self.cache_manager.allocate_blocks( + req_tokens, req.block_table, req.get_mm_token_index_mappings() + ) ) req.num_blocks = len(req.block_table) diff --git a/python/infinilm/multimodal/multimodal.py b/python/infinilm/multimodal/multimodal.py index 7b067734d..7e14a8c5e 100644 --- a/python/infinilm/multimodal/multimodal.py +++ b/python/infinilm/multimodal/multimodal.py @@ -1,5 +1,23 @@ -from typing import List, Union +from typing import List, Optional, Union from PIL import Image +import xxhash + + +def has_multimodal_inputs(messages: Union[List[dict], dict]) -> bool: + """Check if the input messages contain any multimodal inputs.""" + if isinstance(messages, dict): + messages = [messages] + + for msg in messages: + content = msg.get("content", []) + if not isinstance(content, list): + return False + + for item in content: + if item.get("type") in ["image_url", "video_url", "audio_url"]: + return True + + return False def resolve_multimodal_inputs(messages: Union[List[dict], dict]): @@ -8,8 +26,11 @@ def resolve_multimodal_inputs(messages: Union[List[dict], dict]): messages = [messages] images = [] + image_urls = [] videos = [] + video_urls = [] audios = [] + audio_urls = [] for msg in messages: content = msg.get("content", []) @@ -19,11 +40,19 @@ def resolve_multimodal_inputs(messages: Union[List[dict], dict]): for item in content: if item.get("type") == "text": pass - elif item.get("type") == "image": + elif item.get("type") == "image_url": # TODO support other image url formats - images.append(Image.open(item["image_url"])) + images.append(Image.open(item["image_url"]["url"])) + image_urls.append(item["image_url"]["url"]) else: # TODO support video/audio raise NotImplementedError("Only image input is supported for now") - return images, videos, audios + return { + "images": images, + "image_urls": image_urls, + "videos": videos, + "video_urls": video_urls, + "audios": audios, + "audio_urls": audio_urls, + } diff --git a/python/infinilm/processors/basic_llm_processor.py b/python/infinilm/processors/basic_llm_processor.py index e341f3ba6..a3bc95f7f 100644 --- a/python/infinilm/processors/basic_llm_processor.py +++ b/python/infinilm/processors/basic_llm_processor.py @@ -2,6 +2,7 @@ from transformers import AutoTokenizer from ..llm.static_scheduler import StaticSchedulerOutput from ..llm.scheduler import SchedulerOutput +from typing import override @register_processor("default") @@ -11,6 +12,7 @@ def __init__(self, model_dir_path: str): model_dir_path, trust_remote_code=True ) + @override def __call__(self, prompt: str, return_tensors: str = None, **kwargs) -> dict: if return_tensors is None: return self.tokenizer(prompt) @@ -25,6 +27,7 @@ def __call__(self, prompt: str, return_tensors: str = None, **kwargs) -> dict: # "pt" or "np" or "tf". return self.tokenizer(prompt, return_tensors="pt") + @override def apply_chat_template( self, conversation, @@ -50,6 +53,7 @@ def apply_chat_template( **kwargs, ) + @override def build_model_inputs( self, scheduler_output: SchedulerOutput | StaticSchedulerOutput, @@ -237,5 +241,12 @@ def _build_model_input_from_batch_scheduler_output( "top_p": top_p, } + @override def get_tokenizer(self): return self.tokenizer + + @override + def get_mm_token_index_list( + self, prompt_token_ids, image_ids=None, video_ids=None, audio_ids=None, **kwargs + ): + return [] diff --git a/python/infinilm/processors/minicpmv_processor.py b/python/infinilm/processors/minicpmv_processor.py new file mode 100644 index 000000000..c04de04b3 --- /dev/null +++ b/python/infinilm/processors/minicpmv_processor.py @@ -0,0 +1,324 @@ +from typing import override + +from transformers import AutoConfig, AutoProcessor + +from .processor import InfinilmProcessor +from .processor import register_processor + + +@register_processor("minicpmv") +class MiniCPMVProcessor(InfinilmProcessor): + def __init__(self, model_dir_path: str): + """Initialize the processor with the model directory path.""" + self.processor = AutoProcessor.from_pretrained( + model_dir_path, trust_remote_code=True + ) + self.tokenizer = self.processor.tokenizer + self.config = AutoConfig.from_pretrained(model_dir_path, trust_remote_code=True) + self.pixel_values_dtype = ( + self.config.dtype + if hasattr(self.config, "dtype") + else self.config.torch_dtype + ) + + @override + def __call__( + self, + prompt, + images=None, + videos=None, + audios=None, + return_tensors: str = None, + **kwargs, + ) -> dict: + """ + Process the input prompt and media into final inputs. + + { + 'input_ids': TensorShape(1, seq_len), + 'attention_mask': TensorShape(1, seq_len), + 'pixel_values': [[TensorShape(patch_channel, patch_height, patch_width * dim) * n_patches]], + 'image_sizes': [[TensorShape(2,) * n_images]], + 'image_bound': [TensorShape(total_patch, 2)], + 'tgt_sizes': [TensorShape(total_patch, 2)], + } + + For text-only input, result only contains 'input_ids' and 'attention_mask'. + """ + if not images and not videos and not audios: + return self.tokenizer(prompt, return_tensors=return_tensors, **kwargs) + + results = self.processor( + prompt, images=images, return_tensors="pt", max_slice_nums=9, **kwargs + ) + + return results + + @override + def apply_chat_template( + self, + conversation, + add_generation_prompt: bool = False, + tokenize: bool = True, + **kwargs, + ): + """Apply chant template given input messages""" + processed_msg = [] + for msg in conversation: + content = msg["content"] + if not isinstance(content, list): + if isinstance(content, str): + processed_msg.append( + {"role": msg.get("role", "user"), "content": content} + ) + else: + raise ValueError("Content must be a list of items or a string") + continue + + processed_content = [] + for item in content: + if item.get("type") == "text": + processed_content.append(item.get("text", "")) + elif item.get("type") == "image_url": + processed_content.append("(./)") + else: + raise NotImplementedError("Only image input is supported for now") + + processed_msg.append( + { + "role": msg.get("role", "user"), + "content": "\n".join(processed_content), + } + ) + + return self.tokenizer.apply_chat_template( + conversation=processed_msg, + add_generation_prompt=add_generation_prompt, + tokenize=tokenize, + **kwargs, + ) + + @override + def build_model_inputs( + self, + scheduler_output, + temperature: float = 1.0, + top_p: float = 0.8, + top_k: int = 1, + **kwargs, + ) -> dict: + """Build batched infinilm model inputs from the scheduler output.""" + import infinicore + + if not scheduler_output.scheduled_requests: + raise RuntimeError( + "build_model_inputs called with empty scheduled_requests" + ) + + tokens = [] + seq_lens = [] + seq_offsets = [0] + block_tables = [] + slot_mapping = [] + cached_lens = [] + position_ids = [] + cu_seqlens = [0] + mm_data = {} + + max_block_table_len = max( + len(req.block_table) for req in scheduler_output.scheduled_requests + ) + current_offset = 0 + + for req_id, req in enumerate(scheduler_output.scheduled_requests): + num_cached = req.num_cached_tokens + if scheduler_output.is_prefill: + # Prefill phase + req_tokens = req.get_input_tokens() + tokens_to_compute = req_tokens[num_cached:] + tokens.extend(tokens_to_compute) + + compute_len = len(tokens_to_compute) + seq_len = len(req_tokens) + seq_lens.append(seq_len) + + current_offset += compute_len + seq_offsets.append(current_offset) + + slot_mapping.extend(req.slot_mapping) + cached_lens.append(num_cached) + position_ids.extend(range(num_cached, num_cached + compute_len)) + + if ( + req.processed_inputs is not None + and "pixel_values" in req.processed_inputs + ): + import torch + + num_cached_patch = ( + (req.processed_inputs["image_bound"][0][:, 1] <= num_cached) + .sum() + .item() + ) + + # if all patches are already cached, skip processing multimodal inputs and return text-only inputs for this request + if num_cached_patch < len(req.processed_inputs["pixel_values"]): + # 1. pixel_values + all_pixel_values = [] + pixel_values = req.processed_inputs["pixel_values"] + tgt_sizes = req.processed_inputs["tgt_sizes"] + image_bound = req.processed_inputs["image_bound"] + for pv in pixel_values: + all_pixel_values.extend( + [ + t.flatten(end_dim=1) + .permute(1, 0) + .to(self.pixel_values_dtype) + for i, t in enumerate(pv) + if i >= num_cached_patch + ] + ) + + pixel_values_tensor = torch.nn.utils.rnn.pad_sequence( + all_pixel_values, batch_first=True, padding_value=0.0 + ) + B, L, _ = pixel_values_tensor.shape + pixel_values_tensor = ( + pixel_values_tensor.permute(0, 2, 1) + .reshape(B, 3, -1, L) + .contiguous() + ) + pixel_values_infini = infinicore.from_torch(pixel_values_tensor) + + # 2. tgt_sizes + all_tgt_sizes = [ + tgt_size + for i, tgt_size in enumerate(tgt_sizes) + if isinstance(tgt_size, torch.Tensor) + and i >= num_cached_patch + ] + + tgt_sizes_tensor = torch.vstack(all_tgt_sizes).to(torch.int64) + + tgt_sizes_infini = infinicore.from_torch(tgt_sizes_tensor) + + # 3. image_bound + batch_size = len(image_bound) + max_ranges = max(len(b) for b in image_bound) + + bound = torch.zeros( + (batch_size, max_ranges, 2), dtype=torch.int64 + ) + + for i, bnd in enumerate(image_bound): + bnd = bnd[num_cached_patch:, :] + if len(bnd) > 0: + bound[i, : len(bnd), :] = bnd + + image_bound_infini = infinicore.from_torch(bound) + + def append_mm_data(mm_data__: dict, key__: str, value__): + if mm_data__.get(key__) is None: + mm_data[key__] = [value__] + else: + mm_data[key__].append(value__) + + append_mm_data(mm_data, "pixel_values", pixel_values_infini) + append_mm_data(mm_data, "tgt_sizes", tgt_sizes_infini) + append_mm_data(mm_data, "image_bound", image_bound_infini) + append_mm_data(mm_data, "image_req_ids", req_id) + + else: + # Decode phase + seq_len = req.get_total_length() + last_token = req.generated_token_ids[-1] + tokens.append(last_token) + seq_lens.append(seq_len) + + current_offset += 1 + seq_offsets.append(current_offset) + + slot_mapping.extend(req.slot_mapping) + cached_lens.append(num_cached) + position_ids.append(seq_len - 1) + + # Pad block_table to same length + padded_block_table = req.block_table + [-1] * ( + max_block_table_len - len(req.block_table) + ) + block_tables.append(padded_block_table) + cu_seqlens.append(cu_seqlens[-1] + seq_len) + + return { + "input_ids": infinicore.from_list([tokens], dtype=infinicore.int64), + "position_ids": infinicore.from_list(position_ids, dtype=infinicore.int64), + "past_kv_lengths": infinicore.from_list( + cached_lens, dtype=infinicore.int32 + ), + "total_kv_lengths": infinicore.from_list(seq_lens, dtype=infinicore.int32), + "input_offsets": infinicore.from_list(seq_offsets, dtype=infinicore.int32), + "cu_seqlens": infinicore.from_list(cu_seqlens, dtype=infinicore.int32), + "block_tables": infinicore.from_list(block_tables, dtype=infinicore.int32), + "slot_mapping": infinicore.from_list(slot_mapping, dtype=infinicore.int64), + "temperature": temperature, + "top_k": top_k, + "top_p": top_p, + **mm_data, + } + + @override + def get_tokenizer(self): + """Return the text tokenizer associated with this processor.""" + return self.tokenizer + + @override + def get_mm_token_index_list( + self, prompt_token_ids, image_ids=None, video_ids=None, **kwargs + ): + image_idx = -1 + patch_start = [] + patch_end = [] + mm_token_index_list = [] + for i, token_id in enumerate(prompt_token_ids): + if token_id == self.tokenizer.im_id_start_id: + assert len(patch_start) == len(patch_end), ( + "Invalid prompt format: image start token found before previous image end token is closed" + ) + # deal with previous image patches + if patch_start: + for start, end in zip(patch_start, patch_end): + mm_token_index_list.append( + { + "start_index": start, + "end_index": end, + "identifier": image_ids[image_idx], + } + ) + # reset patch start and end for next image + patch_start = [] + patch_end = [] + + # increment image index for next image + image_idx += 1 + patch_start.append(i + 1) + elif token_id == self.tokenizer.slice_start_id: + patch_start.append(i + 1) + elif ( + token_id == self.tokenizer.im_id_end_id + or token_id == self.tokenizer.slice_end_id + ): + patch_end.append(i - 1) + + if patch_start: + for start, end in zip(patch_start, patch_end): + mm_token_index_list.append( + { + "start_index": start, + "end_index": end, + "identifier": image_ids[image_idx], + } + ) + assert image_idx + 1 == len(image_ids), ( + "The number of image tokens does not match the number of images data provided" + ) + return mm_token_index_list diff --git a/python/infinilm/processors/processor.py b/python/infinilm/processors/processor.py index 9d712878e..a2952bc1e 100644 --- a/python/infinilm/processors/processor.py +++ b/python/infinilm/processors/processor.py @@ -13,25 +13,35 @@ def __call__( **kwargs, ) -> dict: """Process the input prompt and media into final inputs.""" - raise NotImplementedError("InfinilmProcessor is not implemented yet") + raise NotImplementedError("__call__ is not implemented yet") def apply_chat_template( self, - messages, + conversation, add_generation_prompt: bool = False, tokenize: bool = True, **kwargs, ): """Apply chant template given input messages""" - raise NotImplementedError("InfinilmProcessor is not implemented yet") + raise NotImplementedError("apply_chat_template is not implemented yet") def build_model_inputs(self, scheduler_output, **kwargs) -> dict: """Build batched infinilm model inputs from the scheduler output.""" - raise NotImplementedError("InfinilmProcessor is not implemented yet") + raise NotImplementedError("build_model_inputs is not implemented yet") def get_tokenizer(self): """Return the text tokenizer associated with this processor.""" - raise NotImplementedError("InfinilmProcessor is not implemented yet") + raise NotImplementedError("get_tokenizer is not implemented yet") + + def get_mm_token_index_list( + self, prompt_token_ids, image_ids=None, video_ids=None, audio_ids=None, **kwargs + ): + """ + Get the list of starting token index and identifier mapping for multimodal inputs, sorted by index. + Return: [{"start_index": , "identifier": }, ...] + """ + raise NotImplementedError("get_mm_token_index_list is not implemented yet") + # Global registry mapping model_type strings to their Processor classes _PROCESSOR_REGISTRY = {} @@ -39,6 +49,7 @@ def get_tokenizer(self): def register_processor(model_type: str): """Decorator to register a Processor class for a specific model type.""" + def decorator(cls): if model_type in _PROCESSOR_REGISTRY: raise ValueError( @@ -47,6 +58,7 @@ def decorator(cls): ) _PROCESSOR_REGISTRY[model_type] = cls return cls + return decorator diff --git a/python/infinilm/server/inference_server.py b/python/infinilm/server/inference_server.py index 71e9c992f..3d35941c3 100644 --- a/python/infinilm/server/inference_server.py +++ b/python/infinilm/server/inference_server.py @@ -218,7 +218,8 @@ async def chat_completions(request: Request): data["messages"] = [{"role": "user", "content": data.get("prompt")}] # Normalize messages to handle multimodal content (list format) - data["messages"] = self._normalize_messages(data.get("messages", [])) + # data["messages"] = self._normalize_messages(data.get("messages", [])) + data["messages"] = data.get("messages", []) stream = data.get("stream", False) request_id = f"cmpl-{uuid.uuid4().hex}" diff --git a/scripts/test_perf.py b/scripts/test_perf.py index 6a33d8f0d..f1be60334 100644 --- a/scripts/test_perf.py +++ b/scripts/test_perf.py @@ -1,8 +1,11 @@ import asyncio +from pathlib import Path import time from openai import AsyncOpenAI import argparse import random +import subprocess + PROMPTS = [ "如果猫能写诗,它们会写些什么?", @@ -27,13 +30,95 @@ "想象一下,如果每个人都能读懂他人的思想。", ] +IMAGE_PROMPTS = [ + "请描述一下图片里的内容。", + "图片里有人吗?", + "请结合图片,讲一个小故事。", +] + NUM_REQUESTS = 64 CONCURRENCY = 20 API_URL = "http://127.0.0.1:8000" -MODEL = "FM9G-7B" +MODEL = "" + + +class ImageCollector: + def __init__(self, dir_path: str, port=None): + self.dir_path = Path(dir_path).resolve() + + if not self.dir_path.is_dir(): + raise ValueError(f"Not a valid directory: {self.dir_path}") + + self.image_files = [ + file.resolve() + for file in self.dir_path.rglob("*") + if file.is_file() and file.suffix.lower() in [".jpg", ".jpeg"] + ] + + assert len(self.image_files) > 0, "No image file found in provided directory!" + + self.host = "127.0.0.1" + self.port = port + self.server_process = None + + # Only start HTTP server if BOTH host and port are provided + self.use_http = self.host is not None and self.port is not None + + if self.use_http: + self._start_server() + + def _start_server(self): + print( + f"[ImageCollector] Starting image HTTP server...\n" + f" Directory: {self.dir_path}\n" + f" URL: http://{self.host}:{self.port}\n" + ) + self.server_process = subprocess.Popen( + [ + "python", + "-m", + "http.server", + str(self.port), + "--bind", + self.host, + ], + cwd=str(self.dir_path), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + time.sleep(0.5) -async def benchmark_user(client, semaphore, queue, results, user_id, verbose): + def stop_server(self): + if self.server_process is not None: + self.server_process.terminate() + + try: + self.server_process.wait(timeout=3) + except subprocess.TimeoutExpired: + self.server_process.kill() + + self.server_process = None + + def __del__(self): + self.stop_server() + + def random_image_url(self): + image_path = random.choice(self.image_files) + + # Return local absolute path + if not self.use_http: + return str(image_path) + + # Return HTTP URL + relative_path = image_path.relative_to(self.dir_path) + + return f"http://{self.host}:{self.port}/{relative_path.as_posix()}" + + +async def benchmark_user( + client, semaphore, queue, results, user_id, verbose, image_collector=None +): while True: async with semaphore: task_id = await queue.get() @@ -41,14 +126,33 @@ async def benchmark_user(client, semaphore, queue, results, user_id, verbose): queue.task_done() break - question = random.choice(PROMPTS) try: print(f"🚀 User#{user_id} Sending request #{task_id}") + messages = None + if image_collector is None: + messages = [{"role": "user", "content": random.choice(PROMPTS)}] + else: + messages = [ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": image_collector.random_image_url() + }, + }, + {"type": "text", "text": random.choice(IMAGE_PROMPTS)}, + ], + } + ] + + print(messages) start_time = time.time() stream = await client.chat.completions.create( model=MODEL, - messages=[{"role": "user", "content": question}], + messages=messages, stream=True, ) @@ -97,7 +201,7 @@ async def benchmark_user(client, semaphore, queue, results, user_id, verbose): print(f" 📏 平均 token 解码时间: {ms_per_token:.2f} ms/token") else: print(f" 📏 平均 token 解码时间: N/A (no token generated)") - print(f" ❓ 提问: {question}") + print(f" ❓ 提问: {messages}") print(f" 💬 回答: {answer}\n") queue.task_done() @@ -108,7 +212,7 @@ async def benchmark_user(client, semaphore, queue, results, user_id, verbose): queue.task_done() -async def run_benchmark(verbose=False): +async def run_benchmark(verbose=False, image_collector=None): client = AsyncOpenAI(base_url=API_URL, api_key="default") semaphore = asyncio.Semaphore(CONCURRENCY) queue = asyncio.Queue() @@ -120,7 +224,9 @@ async def run_benchmark(verbose=False): users = [ asyncio.create_task( - benchmark_user(client, semaphore, queue, results, user_id, verbose) + benchmark_user( + client, semaphore, queue, results, user_id, verbose, image_collector + ) ) for user_id in range(CONCURRENCY) ] @@ -175,6 +281,17 @@ async def run_benchmark(verbose=False): if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--verbose", action="store_true") + parser.add_argument("--image-dir", type=str, default=None) + parser.add_argument("--mm-port", type=str, default=None) + parser.add_argument("--api-url", type=str, default="127.0.0.1:8000") + parser.add_argument("--model", type=str, default="") args = parser.parse_args() - asyncio.run(run_benchmark(args.verbose)) + API_URL = "http://" + args.api_url + MODEL = args.model + + image_collector = None + if args.image_dir is not None: + image_collector = ImageCollector(args.image_dir, port=args.mm_port) + + asyncio.run(run_benchmark(args.verbose, image_collector)) diff --git a/test/service/request.py b/test/service/request.py new file mode 100644 index 000000000..f93c636f9 --- /dev/null +++ b/test/service/request.py @@ -0,0 +1,141 @@ +import argparse +import asyncio +import time + +from openai import AsyncOpenAI + + +def get_args(): + parser = argparse.ArgumentParser(description="Send request in OpenAI format") + + parser.add_argument( + "--system", + type=str, + default="", + help="system prompt", + ) + parser.add_argument( + "--content", + action="append", + default=[], + help="start with content type['text', 'image_url'] and colon, e.g. text:hello or image_url:http://example.com/image.jpg", + ) + + parser.add_argument( + "--port", type=int, default=8000, help="Infer server port, default 8000" + ) + parser.add_argument( + "--host", + type=str, + default="127.0.0.1", + help="Infer server url, default 127.0.0.1", + ) + + parser.add_argument( + "--api-url", + type=str, + default=None, + help="Full service url, if given host and port will be ignored", + ) + + parser.add_argument( + "--model", + type=str, + default="default", + help="Name or path of the model being served, needed by vllm", + ) + + return parser.parse_args() + + +def build_messages(content_args, system_prompt): + contents = [] + for content in content_args: + if ":" not in content: + raise ValueError( + f"Invalid content format: '{content}'. Expected format is 'type:value'." + ) + ctype, cvalue = content.split(":", 1) + + if ctype == "text": + contents.append({"type": "text", "text": cvalue}) + elif ctype == "image_url": + contents.append({"type": "image_url", "image_url": {"url": cvalue}}) + else: + raise ValueError( + f"Unsupported content type: '{ctype}'. Supported types are 'text' and 'image_url'." + ) + + messages = ( + [] if not system_prompt else [{"role": "system", "content": system_prompt}] + ) + messages.append({"role": "user", "content": contents}) + return messages + + +async def benchmark_user(client, messages, model_name): + try: + print(f" ❓ 提问: {messages}") + start_time = time.time() + stream = await client.chat.completions.create( + model=model_name, + messages=messages, + stream=True, + ) + + first_token_time = None + total_tokens = 0 + answer_chunks = [] + + async for chunk in stream: + if first_token_time is None: + first_token_time = time.time() + delta = chunk.choices[0].delta.content + if delta: + answer_chunks.append(delta) + total_tokens += 1 + if chunk.choices[0].finish_reason is not None: + break + + end_time = time.time() + + ttft = first_token_time - start_time if first_token_time else None + elapsed_time = end_time - start_time if start_time else None + ms_per_token_decode = ( + ((elapsed_time - ttft) / (total_tokens - 1) * 1000) + if total_tokens - 1 > 0 and elapsed_time + else None + ) + + answer = "".join(answer_chunks) + print(f" 💬 回答: {answer}\n") + print(f" 总耗时: {elapsed_time:.3f}s") + print(f" 首字延迟 TTFT: {ttft:.3f}s") + print(f" Token间延迟 ITL: {ms_per_token_decode:.2f} ms") + print( + f" Decode吞吐: {1000 / ms_per_token_decode:.2f} tokens/s" + if ms_per_token_decode + else " Decode吞吐: N/A" + ) + + except Exception as e: + print(f" ❌ Error: {e}\n") + + +def main(): + args = get_args() + if not args.content: + args.content = ["text:山东最高的山是?"] + messages = build_messages(args.content, args.system) + api_url = ( + f"http://{args.api_url}" + if args.api_url is not None + else f"http://{args.host}:{args.port}" + ) + + client = AsyncOpenAI(base_url=api_url, api_key="default") + asyncio.run(benchmark_user(client, messages, args.model_name)) + + +if __name__ == "__main__": + main()