241 changes: 149 additions & 92 deletions fastdeploy/model_executor/pre_and_post_process.py
@@ -380,55 +380,82 @@ def save_output_normal(
     # In the future, we will abandon this approach.
     if envs.FD_USE_GET_SAVE_OUTPUT_V1:
         if save_each_rank or model_output.mp_rank == 0:
-            recover_share_inputs_map = recover_batch_index_for_output(
-                share_inputs,
-                model_output.index_to_batch_id,
-                model_output.enable_pd_reorder,
-                ["sampled_token_ids"],
-            )
-            recover_batch_index_for_sampler_output(
-                sampler_output, model_output.index_to_batch_id, model_output.enable_pd_reorder
-            )
-            output = _build_stream_transfer_data(
-                recover_share_inputs_map["sampled_token_ids"],
-                logprobs=sampler_output.logprobs_tensors,
-                prompt_logprobs_list=model_output.prompt_logprobs_list,
-            )
+            if not model_output.enable_pd_reorder:
+                output = _build_stream_transfer_data(
+                    share_inputs["sampled_token_ids"],
+                    logprobs=sampler_output.logprobs_tensors,
+                    prompt_logprobs_list=model_output.prompt_logprobs_list,
+                )
+            else:
+                recover_share_inputs_map = recover_batch_index_for_output(
+                    share_inputs,
+                    model_output.index_to_batch_id,
+                    model_output.enable_pd_reorder,
+                    ["sampled_token_ids"],
+                )
+                recover_batch_index_for_sampler_output(
+                    sampler_output, model_output.index_to_batch_id, model_output.enable_pd_reorder
+                )
+                output = _build_stream_transfer_data(
+                    recover_share_inputs_map["sampled_token_ids"],
+                    logprobs=sampler_output.logprobs_tensors,
+                    prompt_logprobs_list=model_output.prompt_logprobs_list,
+                )
             async_output_queue.put(output)
     else:
         if sampler_output.logprobs_tensors is None:
-            recover_share_inputs_map = recover_batch_index_for_output(
-                share_inputs,
-                model_output.index_to_batch_id,
-                model_output.enable_pd_reorder,
-                ["last_preempted_idx", "sampled_token_ids"],
-            )
-            save_output(
-                recover_share_inputs_map["sampled_token_ids"],
-                model_output.not_need_stop,
-                recover_share_inputs_map["last_preempted_idx"],
-                model_output.mp_rank,
-                save_each_rank,
-            )
+            if not model_output.enable_pd_reorder:
+                save_output(
+                    share_inputs["sampled_token_ids"],
+                    model_output.not_need_stop,
+                    share_inputs["last_preempted_idx"],
+                    model_output.mp_rank,
+                    save_each_rank,
+                )
+            else:
+                recover_share_inputs_map = recover_batch_index_for_output(
+                    share_inputs,
+                    model_output.index_to_batch_id,
+                    model_output.enable_pd_reorder,
+                    ["last_preempted_idx", "sampled_token_ids"],
+                )
+                save_output(
+                    recover_share_inputs_map["sampled_token_ids"],
+                    model_output.not_need_stop,
+                    recover_share_inputs_map["last_preempted_idx"],
+                    model_output.mp_rank,
+                    save_each_rank,
+                )
         else:
-            recover_share_inputs_map = recover_batch_index_for_output(
-                share_inputs,
-                model_output.index_to_batch_id,
-                model_output.enable_pd_reorder,
-                ["last_preempted_idx"],
-            )
-            recover_batch_index_for_sampler_output(
-                sampler_output, model_output.index_to_batch_id, model_output.enable_pd_reorder
-            )
-            save_output_topk(
-                share_inputs["sampled_token_ids"],
-                sampler_output.logprobs_tensors.logprob_token_ids,
-                sampler_output.logprobs_tensors.logprobs,
-                sampler_output.logprobs_tensors.selected_token_ranks,
-                model_output.not_need_stop,
-                recover_share_inputs_map["last_preempted_idx"],
-                model_output.mp_rank,
-            )
+            if not model_output.enable_pd_reorder:
+                save_output_topk(
+                    share_inputs["sampled_token_ids"],
+                    sampler_output.logprobs_tensors.logprob_token_ids,
+                    sampler_output.logprobs_tensors.logprobs,
+                    sampler_output.logprobs_tensors.selected_token_ranks,
+                    model_output.not_need_stop,
+                    share_inputs["last_preempted_idx"],
+                    model_output.mp_rank,
+                )
+            else:
+                recover_share_inputs_map = recover_batch_index_for_output(
+                    share_inputs,
+                    model_output.index_to_batch_id,
+                    model_output.enable_pd_reorder,
+                    ["last_preempted_idx"],
+                )
+                recover_batch_index_for_sampler_output(
+                    sampler_output, model_output.index_to_batch_id, model_output.enable_pd_reorder
+                )
+                save_output_topk(
+                    share_inputs["sampled_token_ids"],
+                    sampler_output.logprobs_tensors.logprob_token_ids,
+                    sampler_output.logprobs_tensors.logprobs,
+                    sampler_output.logprobs_tensors.selected_token_ranks,
+                    model_output.not_need_stop,
+                    recover_share_inputs_map["last_preempted_idx"],
+                    model_output.mp_rank,
+                )
     share_inputs["last_preempted_idx"][:] = 0


@@ -529,54 +556,84 @@ def post_process_specualate(
 
     if not skip_save_output:
         if sampler_output.logprobs_tensors is None:
-            recover_model_output_map = recover_batch_index_for_output(
-                model_output,
-                model_output.index_to_batch_id,
-                model_output.enable_pd_reorder,
-                ["accept_tokens", "accept_num", "seq_lens_decoder", "prompt_lens"],
-            )
-            recover_share_inputs = recover_batch_index_for_output(
-                share_inputs, model_output.index_to_batch_id, model_output.enable_pd_reorder, ["preempted_idx"]
-            )
-            speculate_save_output(
-                recover_model_output_map["accept_tokens"],
-                recover_model_output_map["accept_num"],
-                model_output.not_need_stop,
-                recover_model_output_map["seq_lens_decoder"],
-                recover_model_output_map["prompt_lens"],
-                recover_share_inputs["preempted_idx"],
-                model_output.mp_rank,
-                save_each_rank,
-                bool(envs.ENABLE_V1_KVCACHE_SCHEDULER),
-            )
+            if not model_output.enable_pd_reorder:
+                speculate_save_output(
+                    model_output["accept_tokens"],
+                    model_output["accept_num"],
+                    model_output.not_need_stop,
+                    model_output["seq_lens_decoder"],
+                    model_output["prompt_lens"],
+                    share_inputs["preempted_idx"],
+                    model_output.mp_rank,
+                    save_each_rank,
+                    bool(envs.ENABLE_V1_KVCACHE_SCHEDULER),
+                )
+            else:
+                recover_model_output_map = recover_batch_index_for_output(
+                    model_output,
+                    model_output.index_to_batch_id,
+                    model_output.enable_pd_reorder,
+                    ["accept_tokens", "accept_num", "seq_lens_decoder", "prompt_lens"],
+                )
+                recover_share_inputs = recover_batch_index_for_output(
+                    share_inputs, model_output.index_to_batch_id, model_output.enable_pd_reorder, ["preempted_idx"]
+                )
+                speculate_save_output(
+                    recover_model_output_map["accept_tokens"],
+                    recover_model_output_map["accept_num"],
+                    model_output.not_need_stop,
+                    recover_model_output_map["seq_lens_decoder"],
+                    recover_model_output_map["prompt_lens"],
+                    recover_share_inputs["preempted_idx"],
+                    model_output.mp_rank,
+                    save_each_rank,
+                    bool(envs.ENABLE_V1_KVCACHE_SCHEDULER),
+                )
         else:
-            recover_batch_index_for_sampler_output(
-                sampler_output, model_output.index_to_batch_id, model_output.enable_pd_reorder
-            )
-            recover_model_output_map = recover_batch_index_for_output(
-                model_output,
-                model_output.index_to_batch_id,
-                model_output.enable_pd_reorder,
-                ["seq_lens_decoder", "prompt_lens"],
-            )
-            recover_share_inputs = recover_batch_index_for_output(
-                share_inputs, model_output.index_to_batch_id, model_output.enable_pd_reorder, ["preempted_idx"]
-            )
-            speculate_save_output_topk(
-                sampler_output.sampled_token_ids,
-                sampler_output.logprobs_tensors.logprob_token_ids,
-                sampler_output.logprobs_tensors.logprobs,
-                sampler_output.logprobs_tensors.selected_token_ranks,
-                sampler_output.token_num_per_batch,
-                sampler_output.cu_batch_token_offset,
-                model_output.not_need_stop,
-                recover_model_output_map["seq_lens_decoder"],
-                recover_model_output_map["prompt_lens"],
-                recover_share_inputs["preempted_idx"],
-                3,  # mtype
-                model_output.mp_rank,
-                save_each_rank,
-            )
+            if not model_output.enable_pd_reorder:
+                speculate_save_output_topk(
+                    sampler_output.sampled_token_ids,
+                    sampler_output.logprobs_tensors.logprob_token_ids,
+                    sampler_output.logprobs_tensors.logprobs,
+                    sampler_output.logprobs_tensors.selected_token_ranks,
+                    sampler_output.token_num_per_batch,
+                    sampler_output.cu_batch_token_offset,
+                    model_output.not_need_stop,
+                    model_output["seq_lens_decoder"],
+                    model_output["prompt_lens"],
+                    share_inputs["preempted_idx"],
+                    3,  # mtype
+                    model_output.mp_rank,
+                    save_each_rank,
+                )
+            else:
+                recover_batch_index_for_sampler_output(
+                    sampler_output, model_output.index_to_batch_id, model_output.enable_pd_reorder
+                )
+                recover_model_output_map = recover_batch_index_for_output(
+                    model_output,
+                    model_output.index_to_batch_id,
+                    model_output.enable_pd_reorder,
+                    ["seq_lens_decoder", "prompt_lens"],
+                )
+                recover_share_inputs = recover_batch_index_for_output(
+                    share_inputs, model_output.index_to_batch_id, model_output.enable_pd_reorder, ["preempted_idx"]
+                )
+                speculate_save_output_topk(
+                    sampler_output.sampled_token_ids,
+                    sampler_output.logprobs_tensors.logprob_token_ids,
+                    sampler_output.logprobs_tensors.logprobs,
+                    sampler_output.logprobs_tensors.selected_token_ranks,
+                    sampler_output.token_num_per_batch,
+                    sampler_output.cu_batch_token_offset,
+                    model_output.not_need_stop,
+                    recover_model_output_map["seq_lens_decoder"],
+                    recover_model_output_map["prompt_lens"],
+                    recover_share_inputs["preempted_idx"],
+                    3,  # mtype
+                    model_output.mp_rank,
+                    save_each_rank,
+                )
 
 
 def post_process(
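A detail specific to this speculative path: each request can accept a variable number of draft tokens per step, which is why `speculate_save_output_topk` takes the flattened token tensor together with `token_num_per_batch` and `cu_batch_token_offset`. The sketch below assumes, without confirmation from this diff, that `cu_batch_token_offset` is the exclusive prefix sum of `token_num_per_batch`, so request `b`'s tokens occupy `flat[cu[b]:cu[b] + n_b]`:

# Hypothetical reading of the flattened speculative output layout.
import numpy as np

token_num_per_batch = np.array([3, 1, 2])      # accepted draft tokens per request
cu_batch_token_offset = np.concatenate(([0], np.cumsum(token_num_per_batch)))
flat_token_ids = np.array([7, 8, 9, 4, 5, 6])  # all accepted tokens, concatenated

for b, n in enumerate(token_num_per_batch):
    start = cu_batch_token_offset[b]
    print(f"request {b}: {flat_token_ids[start:start + n]}")
# request 0: [7 8 9]
# request 1: [4]
# request 2: [5 6]

Note that both hunks duplicate each save call across the two branches. A small wrapper that returns either the raw tensors or the recovered map would remove the duplication, at the cost of making the fast path less explicit.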