diff --git a/custom_ops/gpu_ops/cpp_extensions.cc b/custom_ops/gpu_ops/cpp_extensions.cc
index 07f50d107dc..076559d719e 100644
--- a/custom_ops/gpu_ops/cpp_extensions.cc
+++ b/custom_ops/gpu_ops/cpp_extensions.cc
@@ -1148,6 +1148,7 @@ PYBIND11_MODULE(fastdeploy_ops, m) {
    */
   m.def("get_output_kv_signal",
         &GetOutputKVSignal,
+        py::call_guard<py::gil_scoped_release>(),
         py::arg("x"),
         py::arg("rank_id"),
         py::arg("wait_flag"),
diff --git a/custom_ops/xpu_ops/src/ops/pybind/pybind.cc b/custom_ops/xpu_ops/src/ops/pybind/pybind.cc
index a2e753012b0..40489be9718 100644
--- a/custom_ops/xpu_ops/src/ops/pybind/pybind.cc
+++ b/custom_ops/xpu_ops/src/ops/pybind/pybind.cc
@@ -874,6 +874,7 @@ PYBIND11_MODULE(fastdeploy_ops, m) {
 
   m.def("get_output_kv_signal",
         &GetOutputKVSignal,
+        py::call_guard<py::gil_scoped_release>(),
         py::arg("x"),
         py::arg("rank_id"),
         py::arg("wait_flag"),
diff --git a/fastdeploy/cache_manager/cache_messager.py b/fastdeploy/cache_manager/cache_messager.py
index 98b992bc754..4ba583cbfa9 100644
--- a/fastdeploy/cache_manager/cache_messager.py
+++ b/fastdeploy/cache_manager/cache_messager.py
@@ -714,7 +714,7 @@ def prefill_layerwise_send_cache_thread(self):
                 else:
                     end_layer_idx = prefilled_layer_idx
                 if sended_layer_idx == prefilled_layer_idx:  # computation not in next layer
-                    time.sleep(0.01)
+                    time.sleep(0.001)
 
                 for layer_idx in range(start_layer_idx, end_layer_idx + 1):
                     for i, (block_id_start, block_id_end) in enumerate(block_start_end_list):
@@ -847,24 +847,23 @@ def consume_signals(self):
         kv_signal_data = paddle.full(shape=[512 * 3 + 2], fill_value=-1, dtype="int32")
         while True:
             try:
-                get_output_kv_signal(kv_signal_data, self.rank_id, 0)  # wait_flag
+                get_output_kv_signal(kv_signal_data, self.rank_id, 1)  # wait_flag
                 if not self.cache_info:
                     time.sleep(0.01)
                     continue
                 tasks_count = kv_signal_data[0]
                 if tasks_count == -1:
-                    time.sleep(0.001)
                     continue
-                layer_id = kv_signal_data[1].numpy().tolist()
+                layer_id = kv_signal_data[1].item()
                 if layer_id == self.num_layers - 1:
                     logger.info(f"tasks_count: {tasks_count}, layer_id: {layer_id} self.rank_id {self.rank_id}")
                 batch_engine_signals = []
                 # format for signal to put in cache_prefilled_engine_ids_queue: [(engine_idx1, prefilled_token_num1), (engine_idx2, prefilled_token_num2)]
                 with self.engine_cache_task_thread_lock:
                     for bi in range(tasks_count):
-                        engine_idx = kv_signal_data[3 * bi + 2].numpy().tolist()
-                        chuck_token_offset = kv_signal_data[3 * bi + 3].numpy().tolist()
-                        current_seq_len = kv_signal_data[3 * bi + 4].numpy().tolist()
+                        engine_idx = kv_signal_data[3 * bi + 2].item()
+                        chuck_token_offset = kv_signal_data[3 * bi + 3].item()
+                        current_seq_len = kv_signal_data[3 * bi + 4].item()
                         self.engine_cache_tasks[engine_idx]["prefilled_layer_idx"] = layer_id
                         self.engine_cache_tasks[engine_idx]["prefilled_token_num"] = (
                             chuck_token_offset + current_seq_len
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index b06343b4d69..dcafb4c322b 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -481,14 +481,6 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
                     llm_logger.info(f"finished_task_id: {finished_task_id}")
                     self.prefill_result_status[finished_task_id[0]] = finished_task_id[1]
                 if task_id in self.prefill_result_status:
-                    if envs.ENABLE_V1_KVCACHE_SCHEDULER:
-                        self.resource_manager.finish_requests_async(task_id)
-                    else:
-                        self.resource_manager.stop_flags[index] = True
-                        self.resource_manager.tasks_list[index] = None
-                        self.resource_manager._recycle_block_tables(task)
-                    if task_id in self.resource_manager.req_dict:
-                        del self.resource_manager.req_dict[task_id]
                     if self.prefill_result_status[task_id] != "finished":
                         result.error_code = 400
                         result.error_message = f"{task_id} failed to {self.prefill_result_status[task_id]}"
@@ -497,6 +489,14 @@
                         )
                     result.metrics.send_request_output_to_decode_time = time.time()
                     self.split_connector.send_first_token(task.disaggregate_info, [result])
+                    if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+                        self.resource_manager.finish_requests_async(task_id)
+                    else:
+                        self.resource_manager.stop_flags[index] = True
+                        self.resource_manager.tasks_list[index] = None
+                        self.resource_manager._recycle_block_tables(task)
+                    if task_id in self.resource_manager.req_dict:
+                        del self.resource_manager.req_dict[task_id]
                     break
                 else:
                     # TODO: Refine checking sending cache and do not keep waiting