From bd1e75dec794c6632e0fe77c7b74d93b3c3fe2c8 Mon Sep 17 00:00:00 2001
From: ST-XX <15625257+ST-XX@users.noreply.github.com>
Date: Tue, 30 Dec 2025 17:44:04 +0800
Subject: [PATCH 1/3] get_output_kv_signal blocking read mode

---
 custom_ops/gpu_ops/cpp_extensions.cc       |  1 +
 fastdeploy/cache_manager/cache_messager.py | 13 ++++++-------
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/custom_ops/gpu_ops/cpp_extensions.cc b/custom_ops/gpu_ops/cpp_extensions.cc
index edd6863af8d..160eb37da07 100644
--- a/custom_ops/gpu_ops/cpp_extensions.cc
+++ b/custom_ops/gpu_ops/cpp_extensions.cc
@@ -1146,6 +1146,7 @@ PYBIND11_MODULE(fastdeploy_ops, m) {
    */
   m.def("get_output_kv_signal",
         &GetOutputKVSignal,
+        py::call_guard<py::gil_scoped_release>(),
         py::arg("x"),
         py::arg("rank_id"),
         py::arg("wait_flag"),
diff --git a/fastdeploy/cache_manager/cache_messager.py b/fastdeploy/cache_manager/cache_messager.py
index c3dc37079c9..fa10b684ecb 100644
--- a/fastdeploy/cache_manager/cache_messager.py
+++ b/fastdeploy/cache_manager/cache_messager.py
@@ -690,7 +690,7 @@ def prefill_layerwise_send_cache_thread(self):
             else:
                 end_layer_idx = prefilled_layer_idx
                 if sended_layer_idx == prefilled_layer_idx:  # computation not in next layer
-                    time.sleep(0.01)
+                    time.sleep(0.001)

             for layer_idx in range(start_layer_idx, end_layer_idx + 1):
                 for i, (block_id_start, block_id_end) in enumerate(block_start_end_list):
@@ -822,24 +822,23 @@ def consume_signals(self):
         kv_signal_data = paddle.full(shape=[512 * 3 + 2], fill_value=-1, dtype="int32")
         while True:
             try:
-                get_output_kv_signal(kv_signal_data, self.rank_id, 0)  # wait_flag
+                get_output_kv_signal(kv_signal_data, self.rank_id, 1)  # wait_flag
                 if not self.cache_info:
                     time.sleep(0.01)
                     continue
                 tasks_count = kv_signal_data[0]
                 if tasks_count == -1:
-                    time.sleep(0.001)
                     continue
-                layer_id = kv_signal_data[1].numpy().tolist()
+                layer_id = kv_signal_data[1].item()
                 if layer_id == self.num_layers - 1:
                     logger.info(f"tasks_count: {tasks_count}, layer_id: {layer_id} self.rank_id {self.rank_id}")
                 batch_engine_signals = []
                 # format for signal to put in cache_prefilled_engine_ids_queue: [(engine_idx1, prefilled_token_num1), (engine_idx2, prefilled_token_num2)]
                 with self.engine_cache_task_thread_lock:
                     for bi in range(tasks_count):
-                        engine_idx = kv_signal_data[3 * bi + 2].numpy().tolist()
-                        chuck_token_offset = kv_signal_data[3 * bi + 3].numpy().tolist()
-                        current_seq_len = kv_signal_data[3 * bi + 4].numpy().tolist()
+                        engine_idx = kv_signal_data[3 * bi + 2].item()
+                        chuck_token_offset = kv_signal_data[3 * bi + 3].item()
+                        current_seq_len = kv_signal_data[3 * bi + 4].item()
                         self.engine_cache_tasks[engine_idx]["prefilled_layer_idx"] = layer_id
                         self.engine_cache_tasks[engine_idx]["prefilled_token_num"] = (
                             chuck_token_offset + current_seq_len

From 62adaeaf729217d5f81a10a26554a6bbfbf003fd Mon Sep 17 00:00:00 2001
From: ST-XX <15625257+ST-XX@users.noreply.github.com>
Date: Tue, 30 Dec 2025 18:01:22 +0800
Subject: [PATCH 2/3] send first token before recycle

---
 fastdeploy/output/token_processor.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index 3c77b7b916e..21af6fa8cdb 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -488,14 +488,6 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
                         llm_logger.info(f"finished_task_id: {finished_task_id}")
                         self.prefill_result_status[finished_task_id[0]] = finished_task_id[1]
                     if task_id in self.prefill_result_status:
-                        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
-                            self.resource_manager.finish_requests_async(task_id)
-                        else:
-                            self.resource_manager.stop_flags[index] = True
-                            self.resource_manager.tasks_list[index] = None
-                            self.resource_manager._recycle_block_tables(task)
-                        if task_id in self.resource_manager.req_dict:
-                            del self.resource_manager.req_dict[task_id]
                         if self.prefill_result_status[task_id] != "finished":
                             result.error_code = 400
                             result.error_message = f"{task_id} failed to {self.prefill_result_status[task_id]}"
@@ -504,6 +496,14 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
                             )
                         result.metrics.send_request_output_to_decode_time = time.time()
                         self.split_connector.send_first_token(task.disaggregate_info, [result])
+                        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+                            self.resource_manager.finish_requests_async(task_id)
+                        else:
+                            self.resource_manager.stop_flags[index] = True
+                            self.resource_manager.tasks_list[index] = None
+                            self.resource_manager._recycle_block_tables(task)
+                        if task_id in self.resource_manager.req_dict:
+                            del self.resource_manager.req_dict[task_id]
                         break
                     else:
                         # TODO: Refine checking sending cache and do not keep waiting

From c399b8d349745475b738d9e517aeff5a22df479d Mon Sep 17 00:00:00 2001
From: ST-XX <15625257+ST-XX@users.noreply.github.com>
Date: Tue, 30 Dec 2025 21:49:32 +0800
Subject: [PATCH 3/3] xpu get_output_kv_signal blocking read mode

---
 custom_ops/xpu_ops/src/ops/pybind/pybind.cc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/custom_ops/xpu_ops/src/ops/pybind/pybind.cc b/custom_ops/xpu_ops/src/ops/pybind/pybind.cc
index 23817415772..45a056e0cea 100644
--- a/custom_ops/xpu_ops/src/ops/pybind/pybind.cc
+++ b/custom_ops/xpu_ops/src/ops/pybind/pybind.cc
@@ -863,6 +863,7 @@ PYBIND11_MODULE(fastdeploy_ops, m) {

   m.def("get_output_kv_signal",
         &GetOutputKVSignal,
+        py::call_guard<py::gil_scoped_release>(),
         py::arg("x"),
         py::arg("rank_id"),
         py::arg("wait_flag"),
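
Note on the call_guard lines in patches 1 and 3: with wait_flag switched from 0 to 1, get_output_kv_signal blocks inside C++ until a signal arrives, so the binding needs to release the GIL for the duration of the call or the blocked consume_signals thread would stall every other Python thread in the process. The snippet below is only an illustrative sketch of that pybind11 pattern, assuming the guard is py::gil_scoped_release; the module name blocking_demo and the function blocking_read are hypothetical and not part of the FastDeploy tree.

    // Minimal sketch of binding a blocking C++ call with the GIL released.
    // Hypothetical names: blocking_demo (module), blocking_read (function).
    #include <pybind11/pybind11.h>

    #include <chrono>
    #include <thread>

    namespace py = pybind11;

    // Stand-in for a blocking read such as a wait_flag=1 signal fetch.
    int blocking_read(int wait_ms) {
      // The GIL is not held here because of the call_guard below, so other
      // Python threads keep running while this call waits.
      std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
      return 42;  // stand-in for the signal payload
    }

    PYBIND11_MODULE(blocking_demo, m) {
      m.def("blocking_read",
            &blocking_read,
            // Release the GIL on entry; it is reacquired when the call returns.
            py::call_guard<py::gil_scoped_release>(),
            py::arg("wait_ms"));
    }

From Python, such a call can sit in a daemon thread and simply block until data is available, which is what lets the consume_signals loop in patch 1 drop its time.sleep polling between empty reads.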