From 2237dc1a7d81a499fc209016394f405b42ebcf99 Mon Sep 17 00:00:00 2001
From: liyonghua0910
Date: Mon, 5 Jan 2026 11:44:19 +0000
Subject: [PATCH 1/2] [fix] fix mtp cache attaching for pd disaggregation

---
 fastdeploy/spec_decode/mtp.py | 27 ++++++++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py
index 279e4cd42d8..765dc0ed9a3 100644
--- a/fastdeploy/spec_decode/mtp.py
+++ b/fastdeploy/spec_decode/mtp.py
@@ -15,6 +15,7 @@
 """
 
 import os
+import time
 from typing import List
 
 import numpy as np
@@ -24,6 +25,7 @@
 from fastdeploy import envs
 from fastdeploy.config import FDConfig
 from fastdeploy.engine.request import Request, RequestType
+from fastdeploy.inter_communicator import IPCSignal
 from fastdeploy.model_executor.forward_meta import ForwardMeta
 from fastdeploy.model_executor.layers.attention import get_attention_backend
 from fastdeploy.model_executor.layers.attention.base_attention_backend import (
@@ -205,7 +207,30 @@ def initialize_kv_cache(self, main_model_num_blocks, profile: bool = False):
         if kv_cache_quant_type == "block_wise_fp8":
             kv_cache_scale_shape = [key_cache_shape[0], key_cache_shape[1], key_cache_shape[2]]
         local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
-        if not profile and self.scheduler_config.splitwise_role != "mixed":
+
+        cache_ready_signal_data = np.zeros(shape=[self.parallel_config.tensor_parallel_size], dtype=np.int32)
+        cache_ready_signal = IPCSignal(
+            name="cache_ready_signal",
+            array=cache_ready_signal_data,
+            dtype=np.int32,
+            suffix=self.parallel_config.local_engine_worker_queue_port,
+            create=False,
+        )
+
+        # Check if gpu runner needs to create kv cache
+        # 1. During profiling, it creates its own kv cache.
+        # 2. GPU runner creates kv cache tensor unless p/d disaggregation is enabled.
+        create_cache_tensor = profile or self.scheduler_config.splitwise_role == "mixed"
+
+        if not create_cache_tensor:
+            logger.info(f"Waiting for cache managers to create kv cache.. {cache_ready_signal.value}")
+            while cache_ready_signal.value[local_rank] != 1:
+                time.sleep(1)
+            logger.info(f"OK! Stop waiting. {cache_ready_signal.value}")
+
+        logger.info(f"Initializing kv cache for all layers. {cache_ready_signal.value}")
+
+        if not create_cache_tensor:
             cache_kvs_list = []
             for i in range(
                 self.num_main_model_layers,

From a54e5eec7432268c8b73c21411268a01a432c84c Mon Sep 17 00:00:00 2001
From: liyonghua0910
Date: Mon, 5 Jan 2026 11:48:45 +0000
Subject: [PATCH 2/2] [fix] fix port

---
 fastdeploy/spec_decode/mtp.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py
index 765dc0ed9a3..f333d43e411 100644
--- a/fastdeploy/spec_decode/mtp.py
+++ b/fastdeploy/spec_decode/mtp.py
@@ -213,7 +213,7 @@ def initialize_kv_cache(self, main_model_num_blocks, profile: bool = False):
             name="cache_ready_signal",
             array=cache_ready_signal_data,
             dtype=np.int32,
-            suffix=self.parallel_config.local_engine_worker_queue_port,
+            suffix=self.parallel_config.engine_worker_queue_port,
             create=False,
 
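
Note (not part of the patches above): a minimal, self-contained sketch of the readiness handshake these commits rely on. The MTP proposer polls a per-rank cache_ready_signal and only attaches to the KV cache once the cache manager has marked its rank ready. Here a plain numpy array and a thread stand in for IPCSignal and the cache-manager process; the names, timings, and helper function are hypothetical, not FastDeploy APIs.

    # Illustrative stand-in for the cache_ready_signal handshake (hypothetical
    # names; a numpy array and a thread replace IPCSignal and the cache manager).
    import threading
    import time

    import numpy as np

    tensor_parallel_size = 2
    local_rank = 0

    # One int32 slot per tensor-parallel rank; the cache manager flips its slot
    # to 1 once the KV cache tensors it owns have been created.
    cache_ready_signal = np.zeros(tensor_parallel_size, dtype=np.int32)

    def cache_manager(rank: int) -> None:
        time.sleep(2)                 # pretend to allocate KV cache blocks
        cache_ready_signal[rank] = 1  # publish readiness for this rank

    threading.Thread(target=cache_manager, args=(local_rank,), daemon=True).start()

    # MTP-worker side: block until the cache manager for this rank is ready,
    # then attach to the existing cache instead of allocating a new one.
    print(f"Waiting for cache manager.. {cache_ready_signal}")
    while cache_ready_signal[local_rank] != 1:
        time.sleep(1)
    print(f"OK! Cache ready, attaching. {cache_ready_signal}")

The handshake only works if both sides open the signal under the same suffix, which is presumably why the second commit switches the suffix from local_engine_worker_queue_port to engine_worker_queue_port: otherwise the worker polls a signal that the cache manager never sets.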