From 25e85c4a234983dc6dfe06b51493852a8e37fcc6 Mon Sep 17 00:00:00 2001
From: liyonghua0910
Date: Mon, 5 Jan 2026 11:44:19 +0000
Subject: [PATCH 1/2] [fix] fix mtp cache attaching for pd disaggregation

---
 fastdeploy/spec_decode/mtp.py | 27 ++++++++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py
index 7efbea9fd02..aa9074c0c97 100644
--- a/fastdeploy/spec_decode/mtp.py
+++ b/fastdeploy/spec_decode/mtp.py
@@ -15,6 +15,7 @@
 """
 
 import os
+import time
 from typing import List
 
 import numpy as np
@@ -24,6 +25,7 @@
 from fastdeploy import envs
 from fastdeploy.config import FDConfig
 from fastdeploy.engine.request import Request, RequestType
+from fastdeploy.inter_communicator import IPCSignal
 from fastdeploy.model_executor.forward_meta import ForwardMeta
 from fastdeploy.model_executor.layers.attention import get_attention_backend
 from fastdeploy.model_executor.layers.attention.base_attention_backend import (
@@ -206,7 +208,30 @@ def initialize_kv_cache(self, main_model_num_blocks, profile: bool = False):
         if kv_cache_quant_type == "block_wise_fp8":
             kv_cache_scale_shape = [key_cache_shape[0], key_cache_shape[1], key_cache_shape[2]]
         local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
-        if not profile and self.scheduler_config.splitwise_role != "mixed":
+
+        cache_ready_signal_data = np.zeros(shape=[self.parallel_config.tensor_parallel_size], dtype=np.int32)
+        cache_ready_signal = IPCSignal(
+            name="cache_ready_signal",
+            array=cache_ready_signal_data,
+            dtype=np.int32,
+            suffix=self.parallel_config.local_engine_worker_queue_port,
+            create=False,
+        )
+
+        # Check if gpu runner needs to create kv cache
+        # 1. During profiling, it creates its own kv cache.
+        # 2. GPU runner creates kv cache tensor unless p/d disaggregation is enabled.
+        create_cache_tensor = profile or self.scheduler_config.splitwise_role == "mixed"
+
+        if not create_cache_tensor:
+            logger.info(f"Waiting for cache managers to create kv cache.. {cache_ready_signal.value}")
+            while cache_ready_signal.value[local_rank] != 1:
+                time.sleep(1)
+            logger.info(f"OK! Stop waiting. {cache_ready_signal.value}")
+
+        logger.info(f"Initializing kv cache for all layers. {cache_ready_signal.value}")
+
+        if not create_cache_tensor:
             cache_kvs_list = []
             for i in range(
                 self.num_main_model_layers,

From 34d27d7737090752b1f5cc7244cdbfe162fbab4a Mon Sep 17 00:00:00 2001
From: liyonghua0910
Date: Tue, 6 Jan 2026 03:24:56 +0000
Subject: [PATCH 2/2] [fix] fix test_mtp_proposer.py

---
 tests/spec_decode/test_mtp_proposer.py | 36 ++++++++++++++++++++++----
 1 file changed, 31 insertions(+), 5 deletions(-)

diff --git a/tests/spec_decode/test_mtp_proposer.py b/tests/spec_decode/test_mtp_proposer.py
index cac0d27e198..de2c3b806a3 100644
--- a/tests/spec_decode/test_mtp_proposer.py
+++ b/tests/spec_decode/test_mtp_proposer.py
@@ -141,11 +141,17 @@ def test_init_and_config_methods(self, mock_rope, mock_attn_backend, mock_model_
         # Test is_chunk_prefill_enabled
         self.assertTrue(proposer.is_chunk_prefill_enabled())
 
+    @patch("fastdeploy.spec_decode.mtp.IPCSignal")
     @patch("fastdeploy.spec_decode.mtp.get_model_loader")
     @patch("fastdeploy.spec_decode.mtp.get_attention_backend")
     @patch("fastdeploy.spec_decode.mtp.get_rope")
-    def test_dummy_prefill_inputs_and_kv_cache(self, mock_rope, mock_attn_backend, mock_model_loader):
+    def test_dummy_prefill_inputs_and_kv_cache(
+        self, mock_rope, mock_attn_backend, mock_model_loader, mock_ipc_signal_cls
+    ):
         """Test dummy_prefill_inputs and initialize_kv_cache with different branches"""
+        mock_ipc_signal = Mock()
+        mock_ipc_signal.value = [0] * self.fd_config.parallel_config.tensor_parallel_size
+        mock_ipc_signal_cls.return_value = mock_ipc_signal
         mock_model = Mock()
         mock_model.compute_logits = Mock(return_value=paddle.zeros([2, 32000]))
         mock_model_loader.return_value.load_model.return_value = mock_model
@@ -180,11 +186,15 @@ def test_dummy_prefill_inputs_and_kv_cache(self, mock_rope, mock_attn_backend, m
         proposer.clear_mtp_cache()
         self.assertNotIn("caches", proposer.model_inputs)
 
+    @patch("fastdeploy.spec_decode.mtp.IPCSignal")
     @patch("fastdeploy.spec_decode.mtp.get_model_loader")
     @patch("fastdeploy.spec_decode.mtp.get_attention_backend")
     @patch("fastdeploy.spec_decode.mtp.get_rope")
-    def test_update_mtp_block_num(self, mock_rope, mock_attn_backend, mock_model_loader):
+    def test_update_mtp_block_num(self, mock_rope, mock_attn_backend, mock_model_loader, mock_ipc_signal_cls):
         """Test update_mtp_block_num"""
+        mock_ipc_signal = Mock()
+        mock_ipc_signal.value = [0] * self.fd_config.parallel_config.tensor_parallel_size
+        mock_ipc_signal_cls.return_value = mock_ipc_signal
         mock_model = Mock()
         mock_model.compute_logits = Mock(return_value=paddle.zeros([2, 32000]))
         mock_model_loader.return_value.load_model.return_value = mock_model
@@ -200,11 +210,15 @@ def test_update_mtp_block_num(self, mock_rope, mock_attn_backend, mock_model_loa
         self.assertEqual(proposer.main_model_num_gpu_blocks, 20)
         self.assertIn("free_list", proposer.model_inputs)
 
+    @patch("fastdeploy.spec_decode.mtp.IPCSignal")
     @patch("fastdeploy.spec_decode.mtp.get_model_loader")
     @patch("fastdeploy.spec_decode.mtp.get_attention_backend")
     @patch("fastdeploy.spec_decode.mtp.get_rope")
-    def test_insert_tasks_v1(self, mock_rope, mock_attn_backend, mock_model_loader):
+    def test_insert_tasks_v1(self, mock_rope, mock_attn_backend, mock_model_loader, mock_ipc_signal_cls):
         """Test insert_tasks_v1 with different request types"""
+        mock_ipc_signal = Mock()
+        mock_ipc_signal.value = [0] * self.fd_config.parallel_config.tensor_parallel_size
+        mock_ipc_signal_cls.return_value = mock_ipc_signal
         mock_model = Mock()
         mock_model.compute_logits = Mock(return_value=paddle.zeros([2, 32000]))
         mock_model_loader.return_value.load_model.return_value = mock_model
@@ -327,11 +341,17 @@ def test_insert_prefill_inputs(self, mock_rope, mock_attn_backend, mock_model_lo
         request.disaggregate_info = None
         proposer.insert_prefill_inputs([request], 1)
 
+    @patch("fastdeploy.spec_decode.mtp.IPCSignal")
     @patch("fastdeploy.spec_decode.mtp.get_model_loader")
     @patch("fastdeploy.spec_decode.mtp.get_attention_backend")
     @patch("fastdeploy.spec_decode.mtp.get_rope")
-    def test_forward_meta_and_exist_prefill(self, mock_rope, mock_attn_backend, mock_model_loader):
+    def test_forward_meta_and_exist_prefill(
+        self, mock_rope, mock_attn_backend, mock_model_loader, mock_ipc_signal_cls
+    ):
         """Test _initialize_forward_meta, _initialize_forward_meta_xpu, and exist_prefill"""
+        mock_ipc_signal = Mock()
+        mock_ipc_signal.value = [0] * self.fd_config.parallel_config.tensor_parallel_size
+        mock_ipc_signal_cls.return_value = mock_ipc_signal
         mock_model = Mock()
         mock_model.compute_logits = Mock(return_value=paddle.zeros([2, 32000]))
         mock_model_loader.return_value.load_model.return_value = mock_model
@@ -497,11 +517,17 @@ def test_extend_draft_token_and_run_impl(self, mock_ngram, mock_rope, mock_attn_
         ):
             proposer._run_impl(full_hidden_states)
 
+    @patch("fastdeploy.spec_decode.mtp.IPCSignal")
     @patch("fastdeploy.spec_decode.mtp.get_model_loader")
     @patch("fastdeploy.spec_decode.mtp.get_attention_backend")
     @patch("fastdeploy.spec_decode.mtp.get_rope")
-    def test_padding_cudagraph_inputs_and_empty_cache(self, mock_rope, mock_attn_backend, mock_model_loader):
+    def test_padding_cudagraph_inputs_and_empty_cache(
+        self, mock_rope, mock_attn_backend, mock_model_loader, mock_ipc_signal_cls
+    ):
         """Test padding_cudagraph_inputs and _empty_cache"""
+        mock_ipc_signal = Mock()
+        mock_ipc_signal.value = [0] * self.fd_config.parallel_config.tensor_parallel_size
+        mock_ipc_signal_cls.return_value = mock_ipc_signal
         mock_model = Mock()
         mock_model.compute_logits = Mock(return_value=paddle.zeros([2, 32000]))
         mock_model_loader.return_value.load_model.return_value = mock_model
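
Note (illustrative sketch, not part of the patches above): the decision the first patch introduces
can be read as a small pure function. The helper name below is hypothetical; only the boolean
expression mirrors the patched create_cache_tensor logic in initialize_kv_cache, where a False
result means the MTP proposer waits on the shared "cache_ready_signal" IPCSignal set by the cache
managers and then attaches to their kv cache instead of allocating its own tensors.

    def should_create_cache_tensor(profile: bool, splitwise_role: str) -> bool:
        # Profiling runs always allocate their own kv cache; so does mixed
        # (non-disaggregated) serving. Only P/D disaggregated roles skip
        # allocation and attach to the cache manager's tensors instead.
        return profile or splitwise_role == "mixed"

    assert should_create_cache_tensor(True, "decode") is True      # profiling
    assert should_create_cache_tensor(False, "mixed") is True      # mixed serving
    assert should_create_cache_tensor(False, "prefill") is False   # wait, then attach
    assert should_create_cache_tensor(False, "decode") is False    # wait, then attach

The second patch keeps this path testable by patching fastdeploy.spec_decode.mtp.IPCSignal with a
Mock whose value attribute is a list of zeros, so the proposer can construct cache_ready_signal in
unit tests without touching real shared memory.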