Skip to content
1 change: 1 addition & 0 deletions custom_ops/gpu_ops/cpp_extensions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,7 @@ PYBIND11_MODULE(fastdeploy_ops, m) {
*/
m.def("get_output_kv_signal",
&GetOutputKVSignal,
py::call_guard<py::gil_scoped_release>(),
py::arg("x"),
py::arg("rank_id"),
py::arg("wait_flag"),
Expand Down
1 change: 1 addition & 0 deletions custom_ops/xpu_ops/src/ops/pybind/pybind.cc
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ PYBIND11_MODULE(fastdeploy_ops, m) {

m.def("get_output_kv_signal",
&GetOutputKVSignal,
py::call_guard<py::gil_scoped_release>(),
py::arg("x"),
py::arg("rank_id"),
py::arg("wait_flag"),
Expand Down
13 changes: 6 additions & 7 deletions fastdeploy/cache_manager/cache_messager.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ def prefill_layerwise_send_cache_thread(self):
else:
end_layer_idx = prefilled_layer_idx
if sended_layer_idx == prefilled_layer_idx: # computation not in next layer
time.sleep(0.01)
time.sleep(0.001)

for layer_idx in range(start_layer_idx, end_layer_idx + 1):
for i, (block_id_start, block_id_end) in enumerate(block_start_end_list):
Expand Down Expand Up @@ -847,24 +847,23 @@ def consume_signals(self):
kv_signal_data = paddle.full(shape=[512 * 3 + 2], fill_value=-1, dtype="int32")
while True:
try:
get_output_kv_signal(kv_signal_data, self.rank_id, 0) # wait_flag
get_output_kv_signal(kv_signal_data, self.rank_id, 1) # wait_flag
if not self.cache_info:
time.sleep(0.01)
continue
tasks_count = kv_signal_data[0]
if tasks_count == -1:
time.sleep(0.001)
continue
layer_id = kv_signal_data[1].numpy().tolist()
layer_id = kv_signal_data[1].item()
if layer_id == self.num_layers - 1:
logger.info(f"tasks_count: {tasks_count}, layer_id: {layer_id} self.rank_id {self.rank_id}")
batch_engine_signals = []
# format for signal to put in cache_prefilled_engine_ids_queue: [(engine_idx1, prefilled_token_num1), (engine_idx2, prefilled_token_num2)]
with self.engine_cache_task_thread_lock:
for bi in range(tasks_count):
engine_idx = kv_signal_data[3 * bi + 2].numpy().tolist()
chuck_token_offset = kv_signal_data[3 * bi + 3].numpy().tolist()
current_seq_len = kv_signal_data[3 * bi + 4].numpy().tolist()
engine_idx = kv_signal_data[3 * bi + 2].item()
chuck_token_offset = kv_signal_data[3 * bi + 3].item()
current_seq_len = kv_signal_data[3 * bi + 4].item()
self.engine_cache_tasks[engine_idx]["prefilled_layer_idx"] = layer_id
self.engine_cache_tasks[engine_idx]["prefilled_token_num"] = (
chuck_token_offset + current_seq_len
Expand Down
16 changes: 8 additions & 8 deletions fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,14 +481,6 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
llm_logger.info(f"finished_task_id: {finished_task_id}")
self.prefill_result_status[finished_task_id[0]] = finished_task_id[1]
if task_id in self.prefill_result_status:
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.resource_manager.finish_requests_async(task_id)
else:
self.resource_manager.stop_flags[index] = True
self.resource_manager.tasks_list[index] = None
self.resource_manager._recycle_block_tables(task)
if task_id in self.resource_manager.req_dict:
del self.resource_manager.req_dict[task_id]
if self.prefill_result_status[task_id] != "finished":
result.error_code = 400
result.error_message = f"{task_id} failed to {self.prefill_result_status[task_id]}"
Expand All @@ -497,6 +489,14 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
)
result.metrics.send_request_output_to_decode_time = time.time()
self.split_connector.send_first_token(task.disaggregate_info, [result])
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.resource_manager.finish_requests_async(task_id)
else:
self.resource_manager.stop_flags[index] = True
self.resource_manager.tasks_list[index] = None
self.resource_manager._recycle_block_tables(task)
if task_id in self.resource_manager.req_dict:
del self.resource_manager.req_dict[task_id]
break
else:
# TODO: Refine checking sending cache and do not keep waiting
Expand Down
Loading