From 8acdd9f156ee98e6b988b992071dc5bd60418a08 Mon Sep 17 00:00:00 2001 From: xunyoyo <33387866+xunyoyo@users.noreply.github.com> Date: Wed, 24 Dec 2025 14:05:32 +0800 Subject: [PATCH 1/7] =?UTF-8?q?[CI]=20=E3=80=90Hackathon=209th=20Sprint=20?= =?UTF-8?q?No.41=E3=80=91NO.41=20=E5=8A=9F=E8=83=BD=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E5=8D=95=E6=B5=8B=E8=A1=A5=E5=85=85=20-new?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add splitwise connector tests Co-authored-by: CSWYF3634076 Co-authored-by: YuBaoku <49938469+EmmonsCurse@users.noreply.github.com> --- tests/splitwise/test_splitwise_connector.py | 567 ++++++++++++++++++++ 1 file changed, 567 insertions(+) create mode 100644 tests/splitwise/test_splitwise_connector.py diff --git a/tests/splitwise/test_splitwise_connector.py b/tests/splitwise/test_splitwise_connector.py new file mode 100644 index 00000000000..fbea4d194d8 --- /dev/null +++ b/tests/splitwise/test_splitwise_connector.py @@ -0,0 +1,567 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, List +from unittest.mock import Mock, patch + +import paddle +import pytest +import zmq + +if not hasattr(paddle, "compat"): + + class _CompatStub: + def enable_torch_proxy(self, scope=None): + return None + + paddle.compat = _CompatStub() + +from fastdeploy import envs +from fastdeploy.engine.request import Request, RequestMetrics, RequestOutput +from fastdeploy.engine.sampling_params import SamplingParams +from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector + + +@dataclass +class DummyParallelConfig: + local_data_parallel_id: int = 0 + data_parallel_size: int = 1 + + +@dataclass +class DummySchedulerConfig: + splitwise_role: str = "mixed" + + +@dataclass +class DummyCacheConfig: + local_pd_comm_port: int = 12345 + + +@dataclass +class DummyCfg: + parallel_config: DummyParallelConfig = field(default_factory=DummyParallelConfig) + scheduler_config: DummySchedulerConfig = field(default_factory=DummySchedulerConfig) + cache_config: DummyCacheConfig = field(default_factory=DummyCacheConfig) + + +class DummyWorkerQueue: + def __init__(self) -> None: + self.cache_info_calls: List[List[Dict[str, Any]]] = [] + self.disaggregated_calls: List[Any] = [] + + def put_cache_info(self, cache_info: List[Dict[str, Any]]) -> None: + self.cache_info_calls.append(cache_info) + + def put_disaggregated_tasks(self, payload: Any) -> None: + self.disaggregated_calls.append(payload) + + +class DummyTask: + def __init__(self, request_id: str, disaggregate_info: Dict[str, Any], error_msg: str | None = None) -> None: + self.request_id = request_id + self.disaggregate_info = disaggregate_info + self._error_msg = error_msg + + def get(self, key: str, default: Any = None) -> Any: + if key == "error_msg": + return self._error_msg + return default + + +def _build_connector() -> 
SplitwiseConnector: + connector = SplitwiseConnector(cfg=DummyCfg(), worker_queue=DummyWorkerQueue(), resource_manager=None) + if not hasattr(connector, "push_sockets"): + connector.push_sockets = {} + return connector + + +def test_serialize_deserialize_prefill_roundtrip_uses_paddle_tensor(): + connector = _build_connector() + token_tensor = paddle.to_tensor([11, 12, 13], dtype="int64") + request = Request( + request_id="req-1", + prompt="hello", + prompt_token_ids=token_tensor.tolist(), + prompt_token_ids_len=3, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=SamplingParams(), + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + disable_chat_template=False, + disaggregate_info={"decode_ip": "127.0.0.1", "decode_connector_port": 9000}, + metrics=RequestMetrics(), + ) + + serialized = connector._serialize_message("prefill", [request]) + msg_type, payload = connector._deserialize_message([b"identity"] + serialized) + + assert msg_type == "prefill" + assert payload[0]["request_id"] == "req-1" + assert payload[0]["prompt_token_ids"] == token_tensor.tolist() + + +def test_deserialize_message_rejects_short_frames(): + connector = _build_connector() + with pytest.raises(ValueError, match="frames too short"): + connector._deserialize_message([b"identity"]) + + +def test_process_message_cache_sync_updates_state_and_cache_queue(): + connector = _build_connector() + worker_queue = connector.engine_worker_queue + payload = [ + {"request_id": "req-ok"}, + {"request_id": "req-error", "error_msg": "bad"}, + ] + + frames = [b"identity"] + connector._serialize_message("cache_sync", payload) + connector._process_message(frames) + + assert connector.current_request_ids["req-ok"] == "finished" + assert connector.current_request_ids["req-error"] == "bad" + assert worker_queue.cache_info_calls == [payload] + + +def test_check_decode_allocated_handles_finished_and_error_states(): + connector = _build_connector() + + finished_task = Request( + request_id="req-finished", + prompt=None, + prompt_token_ids=None, + prompt_token_ids_len=None, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=None, + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + disable_chat_template=False, + disaggregate_info={}, + ) + connector.current_request_ids["req-finished"] = "finished" + ok, msg = connector.check_decode_allocated(finished_task) + assert (ok, msg) == (True, "") + assert "req-finished" not in connector.current_request_ids + + error_task = Request( + request_id="req-error", + prompt=None, + prompt_token_ids=None, + prompt_token_ids_len=None, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=None, + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + disable_chat_template=False, + disaggregate_info={}, + ) + connector.current_request_ids["req-error"] = "allocation_failed" + ok, msg = connector.check_decode_allocated(error_task) + assert (ok, msg) == (False, "allocation_failed") + assert "req-error" not in connector.current_request_ids + + +def test_send_cache_info_to_prefill_groups_by_addr_and_skips_error(): + connector = _build_connector() + connector._send_message = Mock() + + tasks = [ + DummyTask( + request_id="req-1", + disaggregate_info={ + "prefill_ip": "10.0.0.1", + "prefill_connector_port": 9001, + "block_tables": [1, 2, 3], + }, + ), + DummyTask( + request_id="req-err", + disaggregate_info={ + 
"prefill_ip": "10.0.0.2", + "prefill_connector_port": 9002, + "block_tables": [9], + }, + error_msg="failed", + ), + ] + + connector.send_cache_info_to_prefill(tasks) + + connector._send_message.assert_called_once_with( + "10.0.0.1:9001", + "cache_sync", + [{"request_id": "req-1", "dest_block_ids": [1, 2, 3]}], + ) + + +def test_init_network_configures_router_and_poller(): + connector = _build_connector() + mock_socket = Mock() + mock_poller = Mock() + connector.zmq_ctx = Mock() + connector.zmq_ctx.socket.return_value = mock_socket + + with patch("fastdeploy.splitwise.splitwise_connector.zmq.Poller", return_value=mock_poller): + connector._init_network() + + mock_socket.bind.assert_called_once_with("tcp://*:12345") + mock_poller.register.assert_called_once_with(mock_socket, zmq.POLLIN) + assert connector.prefill_cache_info == [] + + +def test_init_non_mixed_creates_network_state(): + cfg = DummyCfg( + parallel_config=DummyParallelConfig(local_data_parallel_id=1, data_parallel_size=2), + scheduler_config=DummySchedulerConfig(splitwise_role="prefill"), + ) + with patch.object(SplitwiseConnector, "_init_network") as mock_init: + connector = SplitwiseConnector(cfg=cfg, worker_queue=DummyWorkerQueue(), resource_manager=None) + + assert connector.local_data_parallel_id == 1 + assert connector.pull_socket is None + assert connector.push_sockets == {} + mock_init.assert_called_once_with() + + +def test_get_push_socket_reuses_existing_and_handles_zmq_error(): + connector = _build_connector() + open_socket = Mock() + open_socket.closed = False + connector.push_sockets["127.0.0.1:8000"] = open_socket + + same_socket = connector._get_push_socket("127.0.0.1:8000") + assert same_socket is open_socket + + connector.zmq_ctx = Mock() + connector.zmq_ctx.socket.side_effect = zmq.ZMQError("boom") + with pytest.raises(ConnectionError, match="Failed to connect"): + connector._get_push_socket("127.0.0.1:9000") + + +def test_get_push_socket_creates_and_configures_socket(): + connector = _build_connector() + connector.zmq_ctx = Mock() + new_socket = Mock() + new_socket.closed = False + connector.zmq_ctx.socket.return_value = new_socket + + socket = connector._get_push_socket("127.0.0.1:7000") + + assert socket is new_socket + new_socket.connect.assert_called_once_with("tcp://127.0.0.1:7000") + assert connector.push_sockets["127.0.0.1:7000"] is new_socket + + +def test_send_message_serializes_and_sends_payload(): + connector = _build_connector() + mock_socket = Mock() + connector._get_push_socket = Mock(return_value=mock_socket) + request = Request( + request_id="req-send", + prompt=None, + prompt_token_ids=[1, 2], + prompt_token_ids_len=2, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=SamplingParams(), + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + disable_chat_template=False, + disaggregate_info={}, + metrics=RequestMetrics(), + ) + + connector._send_message("127.0.0.1:9000", "prefill", [request]) + + mock_socket.send_multipart.assert_called_once() + sent_frames = mock_socket.send_multipart.call_args[0][0] + msg_type, payload = connector._deserialize_message([b"identity"] + sent_frames) + assert msg_type == "prefill" + assert payload[0]["request_id"] == "req-send" + + +def test_send_message_handles_missing_addr_and_errors(): + connector = _build_connector() + connector._send_message(None, "prefill", []) + + connector._get_push_socket = Mock(side_effect=ConnectionError) + connector._send_message("127.0.0.1:7000", "prefill", []) + 
+ failing_socket = Mock() + failing_socket.send_multipart.side_effect = zmq.Again() + connector._get_push_socket = Mock(return_value=failing_socket) + connector._send_message("127.0.0.1:7001", "prefill", []) + + crash_socket = Mock() + crash_socket.send_multipart.side_effect = RuntimeError("boom") + connector._get_push_socket = Mock(return_value=crash_socket) + connector.push_sockets["127.0.0.1:7002"] = crash_socket + connector._send_message("127.0.0.1:7002", "prefill", []) + assert "127.0.0.1:7002" not in connector.push_sockets + + +def test_send_splitwise_tasks_updates_roles_and_tracks_ids(): + connector = _build_connector() + connector._send_message = Mock() + task = Request( + request_id="req-role", + prompt=None, + prompt_token_ids=[0], + prompt_token_ids_len=1, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=SamplingParams(), + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + disable_chat_template=False, + disaggregate_info={"decode_ip": "127.0.0.1", "decode_connector_port": 9001}, + metrics=RequestMetrics(), + ) + + connector.send_splitwise_tasks([task], current_id=0) + + assert connector.current_request_ids["req-role"] == "init" + connector._send_message.assert_called_once() + assert task.disaggregate_info["role"] == "prefill" + + +def test_send_splitwise_tasks_skips_missing_disaggregate_info(): + connector = _build_connector() + connector._send_message = Mock() + task = Request( + request_id="req-skip", + prompt=None, + prompt_token_ids=[0], + prompt_token_ids_len=1, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=SamplingParams(), + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + disable_chat_template=False, + disaggregate_info=None, + metrics=RequestMetrics(), + ) + + connector.send_splitwise_tasks([task], current_id=0) + connector._send_message.assert_not_called() + + +def test_send_cache_info_to_messager_handles_v1_and_v0_modes(monkeypatch): + connector = _build_connector() + worker_queue = connector.engine_worker_queue + + class _Task: + def __init__(self, request_id: str, idx: int, disaggregate_info: Dict[str, Any]): + self.request_id = request_id + self.idx = idx + self.block_tables = [1, 2] + self.need_prefill_tokens = 5 + self.disaggregate_info = disaggregate_info + + task = _Task("req-cache", 7, {"decode_ip": "1.1.1.1"}) + + monkeypatch.setattr(envs, "ENABLE_V1_KVCACHE_SCHEDULER", False) + connector.send_cache_info_to_messager([task], current_id=3) + assert worker_queue.cache_info_calls[-1][0]["current_id"] == 3 + + monkeypatch.setattr(envs, "ENABLE_V1_KVCACHE_SCHEDULER", True) + connector.send_cache_info_to_messager([task], current_id=9) + latest_call = worker_queue.cache_info_calls[-1][0] + assert latest_call["current_id"] == 7 + assert latest_call["need_prefill_tokens"] == 5 + + task_without_info = _Task("req-empty", 1, None) + connector.send_cache_info_to_messager([task_without_info], current_id=1) + assert worker_queue.cache_info_calls[-1] == [] + + +def test_process_message_prefill_and_decode_dispatches_to_worker_queue(): + connector = _build_connector() + worker_queue = connector.engine_worker_queue + request = Request( + request_id="req-prefill", + prompt="hi", + prompt_token_ids=[1], + prompt_token_ids_len=1, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=SamplingParams(), + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + 
disable_chat_template=False, + disaggregate_info={}, + metrics=RequestMetrics(), + ) + decode_output = RequestOutput(request_id="req-decode") + + prefill_frames = [b"id"] + connector._serialize_message("prefill", [request]) + connector._process_message(prefill_frames) + decode_frames = [b"id"] + connector._serialize_message("decode", [decode_output]) + connector._process_message(decode_frames) + + assert worker_queue.disaggregated_calls[0][0] == "decode" + assert worker_queue.disaggregated_calls[0][1][0].request_id == "req-prefill" + assert worker_queue.disaggregated_calls[1][1][0].request_id == "req-decode" + + +def test_process_message_handles_cache_sync_with_decode_cache_task(): + connector = _build_connector() + connector.enable_decode_cache_task = True + payload = [{"request_id": "req-cache"}] + frames = [b"identity"] + connector._serialize_message("cache_sync", payload) + connector._process_message(frames) + assert connector.current_request_ids == {} + + +def test_process_message_logs_error_on_bad_frames(): + connector = _build_connector() + connector.logger = Mock() + connector._process_message([b"only-one-frame"]) + connector.logger.error.assert_called_once() + + +def test_check_decode_allocated_times_out(monkeypatch): + connector = _build_connector() + task = Request( + request_id="req-timeout", + prompt=None, + prompt_token_ids=None, + prompt_token_ids_len=None, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=None, + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + disable_chat_template=False, + disaggregate_info={}, + ) + connector.current_request_ids["req-timeout"] = "init" + + monkeypatch.setattr(envs, "FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", 0) + monkeypatch.setattr("fastdeploy.splitwise.splitwise_connector.time.sleep", lambda *_: None) + + ok, msg = connector.check_decode_allocated(task) + assert (ok, msg) == (False, "timeout") + assert "req-timeout" not in connector.current_request_ids + + +def test_check_decode_allocated_returns_immediately_for_empty_or_cached(): + connector = _build_connector() + no_info_task = Request( + request_id="req-none", + prompt=None, + prompt_token_ids=None, + prompt_token_ids_len=None, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=None, + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + disable_chat_template=False, + disaggregate_info=None, + ) + ok, msg = connector.check_decode_allocated(no_info_task) + assert (ok, msg) == (True, "") + + connector.enable_decode_cache_task = True + cache_task = Request( + request_id="req-cache", + prompt=None, + prompt_token_ids=None, + prompt_token_ids_len=None, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=None, + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + disable_chat_template=False, + disaggregate_info={}, + ) + ok, msg = connector.check_decode_allocated(cache_task) + assert (ok, msg) == (True, "") + + +def test_send_first_token_wraps_task_list(): + connector = _build_connector() + connector._send_message = Mock() + task = Request( + request_id="req-token", + prompt=None, + prompt_token_ids=[1], + prompt_token_ids_len=1, + messages=None, + history=None, + tools=None, + system=None, + eos_token_ids=None, + sampling_params=SamplingParams(), + pooling_params=None, + multimodal_inputs=None, + multimodal_data=None, + disable_chat_template=False, + disaggregate_info={}, + 
metrics=RequestMetrics(),
+    )
+
+    connector.send_first_token({"decode_ip": "1.2.3.4", "decode_connector_port": 7777}, task)
+    connector._send_message.assert_called_once_with("1.2.3.4:7777", "decode", [task])

From a0fed22ddbbe7fa41b6feeef184063cc572a436b Mon Sep 17 00:00:00 2001
From: GoldPancake <56388518+Deleter-D@users.noreply.github.com>
Date: Wed, 24 Dec 2025 15:00:06 +0800
Subject: [PATCH 2/7] [Feature] Add entropy calculation script

---
 scripts/calculate_avg_entropy.py | 57 ++++++++++++++++++++++++++++++++
 1 file changed, 57 insertions(+)
 create mode 100644 scripts/calculate_avg_entropy.py

diff --git a/scripts/calculate_avg_entropy.py b/scripts/calculate_avg_entropy.py
new file mode 100644
index 00000000000..f24c976cd57
--- /dev/null
+++ b/scripts/calculate_avg_entropy.py
@@ -0,0 +1,57 @@
+import argparse
+import os
+import re
+from typing import List, Optional, Tuple
+
+
+def extract_entropy_values(log_path: str) -> List[float]:
+    pattern = r"entropy:\s*([0-9]+\.?[0-9]*(?:[eE][+-]?[0-9]+)?)"
+
+    entropy_values = []
+    with open(log_path, "r") as f:
+        lines = f.readlines()
+        for line in lines:
+            match = re.search(pattern, line)
+            if match:
+                try:
+                    entropy_value = float(match.group(1))
+                    entropy_values.append(entropy_value)
+                except ValueError:
+                    continue
+
+    return entropy_values
+
+
+def calculate_average(entropy_values: List[float], drop_ratio: float = 0.1) -> Tuple[Optional[float], List[float]]:
+    if not entropy_values:
+        return None, []
+    sorted_vals = sorted(entropy_values)
+    n = len(sorted_vals)
+    drop_count = int(n * drop_ratio)
+    filtered_vals = sorted_vals[drop_count : n - drop_count] if drop_count > 0 else sorted_vals
+    if not filtered_vals:
+        return None, []
+    avg = sum(filtered_vals) / len(filtered_vals)
+    return avg, filtered_vals
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--log-dir", type=str, required=True)
+    parser.add_argument("--drop-ratio", "-d", type=float, default=0.1)
+    parser.add_argument("--verbose", "-v", action="store_true")
+    args = parser.parse_args()
+    entropy_values = extract_entropy_values(os.path.join(args.log_dir, "data_processor.log"))
+    average_entropy, filtered_vals = calculate_average(entropy_values, args.drop_ratio)
+
+    print(f"{len(entropy_values)} entropy values were found")
+    print(f"effective entropy values: {len(filtered_vals)} (drop ratio {args.drop_ratio})")
+    print(f"Average entropy: {average_entropy:.10f}" if average_entropy is not None else "Average entropy: N/A")
+    if args.verbose:
+        print("\nentropy details:")
+        for i, value in enumerate(filtered_vals, 1):
+            print(f" {i}. 
{value}") + + +if __name__ == "__main__": + main() From ba4b7afb3aa8597324025d2ecffba3687afcd498 Mon Sep 17 00:00:00 2001 From: bukejiyu <52310069+bukejiyu@users.noreply.github.com> Date: Wed, 24 Dec 2025 15:19:11 +0800 Subject: [PATCH 3/7] [Others] Rename tensor_parallel_degree to tensor_model_parallel_size for paddleformers 0.4.1 (#5727) --- .../model_executor/models/ernie4_5_moe.py | 2 +- .../model_executor/models/ernie4_5_mtp.py | 16 ++++----- .../models/ernie4_5_vl/dfnrope/modeling.py | 30 +++++++++------- .../models/ernie4_5_vl/ernie4_5_vl_moe.py | 4 +-- .../models/ernie4_5_vl/modeling_resampler.py | 24 ++++++------- fastdeploy/model_executor/models/glm4_moe.py | 2 +- .../models/qwen2_5_vl/dfnrope/modeling.py | 32 +++++++++-------- .../models/qwen2_5_vl/qwen2_5_vl.py | 4 +-- fastdeploy/model_executor/models/tp_utils.py | 34 ++++++++++--------- fastdeploy/worker/worker_process.py | 2 +- tests/model_executor/test_tp_utils.py | 12 +++---- 11 files changed, 86 insertions(+), 76 deletions(-) diff --git a/fastdeploy/model_executor/models/ernie4_5_moe.py b/fastdeploy/model_executor/models/ernie4_5_moe.py index 7314a4acf55..99af90ca583 100644 --- a/fastdeploy/model_executor/models/ernie4_5_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_moe.py @@ -796,7 +796,7 @@ def _get_tensor_parallel_mappings(cls, config: PretrainedConfig, is_split=True): fn = split_or_merge_func_v1( is_split=is_split, - tensor_parallel_degree=config.tensor_model_parallel_size, + tensor_model_parallel_size=config.tensor_model_parallel_size, tensor_parallel_rank=config.tensor_parallel_rank, num_attention_heads=config.num_attention_heads, num_key_value_heads=config.num_key_value_heads, diff --git a/fastdeploy/model_executor/models/ernie4_5_mtp.py b/fastdeploy/model_executor/models/ernie4_5_mtp.py index 4ddba9a9b15..db8499444b2 100644 --- a/fastdeploy/model_executor/models/ernie4_5_mtp.py +++ b/fastdeploy/model_executor/models/ernie4_5_mtp.py @@ -76,7 +76,7 @@ def _get_tensor_parallel_mappings(cls, config, is_split=True): def gqa_qkv_split_func( weight, - tensor_parallel_degree, + tensor_model_parallel_size, tensor_parallel_rank, num_attention_heads, num_key_value_heads, @@ -109,9 +109,9 @@ def split_tensor(tensor, degree): else: return np.split(tensor, degree, axis=-1) - q_list = split_tensor(q, tensor_parallel_degree) - k_list = split_tensor(k, tensor_parallel_degree) - v_list = split_tensor(v, tensor_parallel_degree) + q_list = split_tensor(q, tensor_model_parallel_size) + k_list = split_tensor(k, tensor_model_parallel_size) + v_list = split_tensor(v, tensor_model_parallel_size) if tensor_parallel_rank is None: return [np.concatenate([q_i, k_i, v_i], axis=-1) for q_i, k_i, v_i in zip(q_list, k_list, v_list)] @@ -126,9 +126,9 @@ def split_tensor(tensor, degree): ) def gqa_qkv_merge_func(weight_list, num_attention_heads, num_key_value_heads, head_dim): - tensor_parallel_degree = len(weight_list) - num_attention_heads = num_attention_heads // tensor_parallel_degree - num_key_value_heads = num_key_value_heads // tensor_parallel_degree + tensor_model_parallel_size = len(weight_list) + num_attention_heads = num_attention_heads // tensor_model_parallel_size + num_key_value_heads = num_key_value_heads // tensor_model_parallel_size is_paddle_tensor = not isinstance(weight_list[0], np.ndarray) @@ -170,7 +170,7 @@ def slice_tensor(tensor, start, end): if is_split: qkv_fn = partial( gqa_qkv_split_func, - tensor_parallel_degree=config.tensor_model_parallel_size, + tensor_model_parallel_size=config.tensor_model_parallel_size, 
tensor_parallel_rank=config.tensor_parallel_rank, num_attention_heads=config.num_attention_heads, num_key_value_heads=config.num_key_value_heads, diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py b/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py index 0706bf2ab93..b4dd3aa26f0 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py @@ -159,15 +159,15 @@ def __init__( self, dim: int, num_heads: int = 16, - tensor_parallel_degree: int = 1, + tensor_model_parallel_size: int = 1, tensor_parallel_rank: int = 0, model_format: str = "", ) -> None: super().__init__() self.num_heads = num_heads - self.tensor_parallel_degree = tensor_parallel_degree + self.tensor_model_parallel_size = tensor_model_parallel_size self.tensor_parallel_rank = tensor_parallel_rank - if tensor_parallel_degree > 1: + if tensor_model_parallel_size > 1: use_fuse_matmul_bias = False if current_platform.is_maca() or current_platform.is_iluvatar() else True self.qkv = ColumnParallelLinear( dim, @@ -199,7 +199,7 @@ def __init__( self.head_dim = dim // num_heads # must added self.num_heads = num_heads self.hidden_size = dim - self.num_heads_per_rank = divide(self.num_heads, self.tensor_parallel_degree) + self.num_heads_per_rank = divide(self.num_heads, self.tensor_model_parallel_size) def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = None): weight_need_transpose = getattr(param, "weight_need_transpose", False) @@ -209,7 +209,9 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N if load_bias: head_dim = self.hidden_size // self.num_heads shard_weight = loaded_weight[...].reshape([3, self.num_heads, head_dim]) - shard_weight = paddle.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] + shard_weight = paddle.split(shard_weight, self.tensor_model_parallel_size, axis=-2)[ + self.tensor_parallel_rank + ] shard_weight = shard_weight.reshape([-1]) else: shard_weight = loaded_weight[...].reshape( @@ -220,7 +222,9 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N self.head_dim, ] ) - shard_weight = paddle.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] + shard_weight = paddle.split(shard_weight, self.tensor_model_parallel_size, axis=-2)[ + self.tensor_parallel_rank + ] shard_weight = shard_weight.reshape([self.hidden_size, -1]) shard_weight = get_tensor(shard_weight) shard_weight = fd_cast(shard_weight, param) @@ -252,7 +256,7 @@ def forward( [ seq_length, 3, - self.num_heads // self.tensor_parallel_degree, + self.num_heads // self.tensor_model_parallel_size, -1, ] ) @@ -332,13 +336,13 @@ def __init__( dim: int, hidden_dim: int, hidden_act: str, - tensor_parallel_degree: int = 1, + tensor_model_parallel_size: int = 1, model_format: str = "", ) -> None: super().__init__() - self.tensor_parallel_degree = tensor_parallel_degree + self.tensor_model_parallel_size = tensor_model_parallel_size - if self.tensor_parallel_degree > 1: + if self.tensor_model_parallel_size > 1: self.fc1 = ColumnParallelLinear( dim, hidden_dim, @@ -418,7 +422,7 @@ class DFNRopeVisionBlock(nn.Layer): def __init__( self, config, - tensor_parallel_degree: int, + tensor_model_parallel_size: int, tensor_parallel_rank: int, attn_implementation: str = "sdpa", model_format: str = "", @@ -437,7 +441,7 @@ def __init__( self.attn = VisionFlashAttention2( config.embed_dim, 
num_heads=config.num_heads, - tensor_parallel_degree=tensor_parallel_degree, + tensor_model_parallel_size=tensor_model_parallel_size, tensor_parallel_rank=tensor_parallel_rank, model_format=model_format, ) @@ -445,7 +449,7 @@ def __init__( dim=config.embed_dim, hidden_dim=mlp_hidden_dim, hidden_act=config.hidden_act, - tensor_parallel_degree=tensor_parallel_degree, + tensor_model_parallel_size=tensor_model_parallel_size, model_format=model_format, ) self.config = config diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py index 5b7e9bd1172..43e0505b077 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py @@ -978,7 +978,7 @@ def _get_tensor_parallel_mappings(cls, config: PretrainedConfig, is_split=True): fn = split_or_merge_func_v1( is_split=is_split, - tensor_parallel_degree=config.tensor_model_parallel_size, + tensor_model_parallel_size=config.tensor_model_parallel_size, tensor_parallel_rank=config.tensor_parallel_rank, num_attention_heads=config.num_attention_heads, num_key_value_heads=config.num_key_value_heads, @@ -986,7 +986,7 @@ def _get_tensor_parallel_mappings(cls, config: PretrainedConfig, is_split=True): ) vision_fn = split_or_merge_func_v1( is_split=is_split, - tensor_parallel_degree=config.tensor_model_parallel_size, + tensor_model_parallel_size=config.tensor_model_parallel_size, tensor_parallel_rank=config.tensor_parallel_rank, num_attention_heads=config.vision_config.get("num_heads"), num_key_value_heads=config.vision_config.get("num_heads"), diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py b/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py index 320827b0b90..ebeb8ef7dac 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py @@ -155,7 +155,7 @@ def __init__( self.temporal_conv_size = temporal_conv_size self.use_recompute_resampler = False self.use_temporal_conv = True - self.tensor_parallel_degree = config.pretrained_config.tensor_model_parallel_size + self.tensor_model_parallel_size = config.pretrained_config.tensor_model_parallel_size self.prefix_name = prefix_name # for 空间四合一 @@ -174,7 +174,7 @@ def __init__( has_bias=True, fuse_matmul_bias=use_fuse_matmul_bias, ) - if self.tensor_parallel_degree > 1 + if self.tensor_model_parallel_size > 1 else nn.Linear(self.spatial_dim, self.spatial_dim) ), nn.GELU(), @@ -206,7 +206,7 @@ def __init__( out_config.hidden_size = out_dim self.after_norm = RMSNorm(out_config) - if self.tensor_parallel_degree > 1: + if self.tensor_model_parallel_size > 1: set_weight_attrs(self.spatial_linear[0].weight, {"output_dim": False}) def spatial_conv_reshape(self, x, spatial_conv_size): @@ -232,17 +232,17 @@ def fwd_spatial(x): x = self.spatial_conv_reshape(x, self.spatial_conv_size) num_pad = 0 - if self.tensor_parallel_degree > 1: + if self.tensor_model_parallel_size > 1: num_pad = ( - x.shape[0] + self.tensor_parallel_degree - 1 - ) // self.tensor_parallel_degree * self.tensor_parallel_degree - x.shape[0] + x.shape[0] + self.tensor_model_parallel_size - 1 + ) // self.tensor_model_parallel_size * self.tensor_model_parallel_size - x.shape[0] if num_pad > 0: x = paddle.nn.functional.pad(x, [0, num_pad, 0, 0]) x = self.spatial_linear(x) - if self.tensor_parallel_degree > 1: + if self.tensor_model_parallel_size > 1: x = 
AllGatherOp.apply(x) if num_pad > 0: @@ -298,13 +298,13 @@ def fwd_placeholder(x, grid_thw, to_tensor=False): def fwd_temporal(x): num_pad = 0 - if self.tensor_parallel_degree > 1: + if self.tensor_model_parallel_size > 1: num_pad = ( - x.shape[0] + self.tensor_parallel_degree - 1 - ) // self.tensor_parallel_degree * self.tensor_parallel_degree - x.shape[0] + x.shape[0] + self.tensor_model_parallel_size - 1 + ) // self.tensor_model_parallel_size * self.tensor_model_parallel_size - x.shape[0] if num_pad > 0: x = paddle.nn.functional.pad(x, [0, num_pad, 0, 0]) - if self.tensor_parallel_degree > 1: + if self.tensor_model_parallel_size > 1: x = ScatterOp.apply(x, axis=0) x = self.temporal_linear(x) @@ -316,7 +316,7 @@ def fwd_temporal(x): def fwd_mlp(x): x = self.mlp(x) x = self.after_norm(x) - if self.tensor_parallel_degree > 1: + if self.tensor_model_parallel_size > 1: x = AllGatherOp.apply(x) return x diff --git a/fastdeploy/model_executor/models/glm4_moe.py b/fastdeploy/model_executor/models/glm4_moe.py index 495341d837e..42c3233f31c 100644 --- a/fastdeploy/model_executor/models/glm4_moe.py +++ b/fastdeploy/model_executor/models/glm4_moe.py @@ -549,7 +549,7 @@ def _get_tensor_parallel_mappings(cls, config, is_split=True): fn = split_or_merge_func_v1( is_split=is_split, - tensor_parallel_degree=config.tensor_model_parallel_size, + tensor_model_parallel_size=config.tensor_model_parallel_size, tensor_parallel_rank=config.tensor_parallel_rank, num_attention_heads=config.num_attention_heads, num_key_value_heads=config.num_key_value_heads, diff --git a/fastdeploy/model_executor/models/qwen2_5_vl/dfnrope/modeling.py b/fastdeploy/model_executor/models/qwen2_5_vl/dfnrope/modeling.py index e8184f8d3ae..f2f49605c0e 100644 --- a/fastdeploy/model_executor/models/qwen2_5_vl/dfnrope/modeling.py +++ b/fastdeploy/model_executor/models/qwen2_5_vl/dfnrope/modeling.py @@ -78,16 +78,16 @@ def __init__( self, dim: int, num_heads: int = 16, - tensor_parallel_degree: int = 1, + tensor_model_parallel_size: int = 1, tensor_parallel_rank: int = 0, model_format: str = "", ) -> None: super().__init__() self.num_heads = num_heads - self.tensor_parallel_degree = tensor_parallel_degree + self.tensor_model_parallel_size = tensor_model_parallel_size self.tensor_parallel_rank = tensor_parallel_rank - if tensor_parallel_degree > 1: + if tensor_model_parallel_size > 1: self.qkv = ColumnParallelLinear( dim, dim * 3, @@ -122,7 +122,7 @@ def __init__( self.head_dim = dim // num_heads # must added self.num_heads = num_heads self.hidden_size = dim - self.num_heads_per_rank = divide(self.num_heads, self.tensor_parallel_degree) + self.num_heads_per_rank = divide(self.num_heads, self.tensor_model_parallel_size) def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = None): weight_need_transpose = getattr(param, "weight_need_transpose", False) @@ -132,7 +132,9 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N if load_bias: head_dim = self.hidden_size // self.num_heads shard_weight = loaded_weight[...].reshape([3, self.num_heads, head_dim]) - shard_weight = paddle.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] + shard_weight = paddle.split(shard_weight, self.tensor_model_parallel_size, axis=-2)[ + self.tensor_parallel_rank + ] shard_weight = shard_weight.reshape([-1]) else: shard_weight = loaded_weight[...].reshape( @@ -143,7 +145,9 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N self.head_dim, ] ) - 
shard_weight = paddle.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] + shard_weight = paddle.split(shard_weight, self.tensor_model_parallel_size, axis=-2)[ + self.tensor_parallel_rank + ] shard_weight = shard_weight.reshape([self.hidden_size, -1]) shard_weight = fd_cast(shard_weight, param) assert param.shape == shard_weight.shape, ( @@ -176,7 +180,7 @@ def forward( [ seq_length, 3, - self.num_heads // self.tensor_parallel_degree, + self.num_heads // self.tensor_model_parallel_size, -1, ] ) @@ -265,13 +269,13 @@ def __init__( hidden_dim: int, bias: bool = False, hidden_act: str = "gelu", - tensor_parallel_degree: int = 1, + tensor_model_parallel_size: int = 1, model_format: str = "", ) -> None: super().__init__() - self.tensor_parallel_degree = tensor_parallel_degree + self.tensor_model_parallel_size = tensor_model_parallel_size - if self.tensor_parallel_degree > 1: + if self.tensor_model_parallel_size > 1: self.gate_proj = ColumnParallelLinear( dim, hidden_dim, @@ -414,7 +418,7 @@ def __init__( num_heads: int, mlp_hidden_dim: int, hidden_act: str = "gelu", - tensor_parallel_degree: int = 1, + tensor_model_parallel_size: int = 1, tensor_parallel_rank: int = 0, attn_implementation: str = "sdpa", model_format: str = "", @@ -432,7 +436,7 @@ def __init__( self.attn = VisionFlashAttention2( dim=dim, num_heads=num_heads, - tensor_parallel_degree=tensor_parallel_degree, + tensor_model_parallel_size=tensor_model_parallel_size, tensor_parallel_rank=tensor_parallel_rank, model_format=model_format, ) @@ -442,7 +446,7 @@ def __init__( hidden_dim=mlp_hidden_dim, bias=True, hidden_act=hidden_act, - tensor_parallel_degree=tensor_parallel_degree, + tensor_model_parallel_size=tensor_model_parallel_size, model_format=model_format, ) @@ -558,7 +562,7 @@ def __init__(self, config, prefix_name: str = "") -> None: num_heads=config.vision_config.num_heads, mlp_hidden_dim=config.vision_config.intermediate_size, hidden_act=config.vision_config.hidden_act, - tensor_parallel_degree=config.pretrained_config.tensor_model_parallel_size, + tensor_model_parallel_size=config.pretrained_config.tensor_model_parallel_size, tensor_parallel_rank=config.pretrained_config.tensor_parallel_rank, model_format=model_format, ) diff --git a/fastdeploy/model_executor/models/qwen2_5_vl/qwen2_5_vl.py b/fastdeploy/model_executor/models/qwen2_5_vl/qwen2_5_vl.py index 021028edff5..2846487478b 100644 --- a/fastdeploy/model_executor/models/qwen2_5_vl/qwen2_5_vl.py +++ b/fastdeploy/model_executor/models/qwen2_5_vl/qwen2_5_vl.py @@ -388,7 +388,7 @@ def _get_tensor_parallel_mappings(cls, config: PretrainedConfig, is_split=True): fn = split_or_merge_func_v1( is_split=is_split, - tensor_parallel_degree=config.tensor_model_parallel_size, + tensor_model_parallel_size=config.tensor_model_parallel_size, tensor_parallel_rank=config.tensor_parallel_rank, num_attention_heads=config.num_attention_heads, num_key_value_heads=config.num_key_value_heads, @@ -397,7 +397,7 @@ def _get_tensor_parallel_mappings(cls, config: PretrainedConfig, is_split=True): vision_fn = split_or_merge_func_v1( is_split=is_split, - tensor_parallel_degree=config.tensor_model_parallel_size, + tensor_model_parallel_size=config.tensor_model_parallel_size, tensor_parallel_rank=config.tensor_parallel_rank, num_attention_heads=config.vision_config.get("num_heads"), num_key_value_heads=config.vision_config.get("num_heads"), diff --git a/fastdeploy/model_executor/models/tp_utils.py b/fastdeploy/model_executor/models/tp_utils.py index 
367d8cffb25..48c4ec98d42 100644 --- a/fastdeploy/model_executor/models/tp_utils.py +++ b/fastdeploy/model_executor/models/tp_utils.py @@ -202,7 +202,7 @@ def build_expanded_keys( def gqa_qkv_split_func( - tensor_parallel_degree, + tensor_model_parallel_size, tensor_parallel_rank, num_attention_heads, num_key_value_heads, @@ -258,15 +258,17 @@ def split_tensor(tensor, degree): else: return np.split(tensor, degree, axis=0) - q_list = split_tensor(q, tensor_parallel_degree) - repeat_kv = num_key_value_heads < tensor_parallel_degree and tensor_parallel_degree % num_key_value_heads == 0 - repeat_num = tensor_parallel_degree // num_key_value_heads if repeat_kv else 1 + q_list = split_tensor(q, tensor_model_parallel_size) + repeat_kv = ( + num_key_value_heads < tensor_model_parallel_size and tensor_model_parallel_size % num_key_value_heads == 0 + ) + repeat_num = tensor_model_parallel_size // num_key_value_heads if repeat_kv else 1 if repeat_kv: k_list = split_tensor(k, num_key_value_heads) v_list = split_tensor(v, num_key_value_heads) else: - k_list = split_tensor(k, tensor_parallel_degree) - v_list = split_tensor(v, tensor_parallel_degree) + k_list = split_tensor(k, tensor_model_parallel_size) + v_list = split_tensor(v, tensor_model_parallel_size) if tensor_parallel_rank is None: res = [] @@ -332,9 +334,9 @@ def gqa_qkv_merge_func(num_attention_heads, num_key_value_heads, head_dim): def fn(weight_list, is_column=True): """fn""" - tensor_parallel_degree = len(weight_list) - local_num_attention_heads = num_attention_heads // tensor_parallel_degree - local_num_key_value_heads = num_key_value_heads // tensor_parallel_degree + tensor_model_parallel_size = len(weight_list) + local_num_attention_heads = num_attention_heads // tensor_model_parallel_size + local_num_key_value_heads = num_key_value_heads // tensor_model_parallel_size is_paddle_tensor = not isinstance(weight_list[0], np.ndarray) @@ -391,7 +393,7 @@ def slice_tensor(tensor, start, end): def split_or_merge_qkv_func( is_split, - tensor_parallel_degree, + tensor_model_parallel_size, tensor_parallel_rank, num_attention_heads, num_key_value_heads, @@ -402,7 +404,7 @@ def split_or_merge_qkv_func( """ if is_split: return gqa_qkv_split_func( - tensor_parallel_degree=tensor_parallel_degree, + tensor_model_parallel_size=tensor_model_parallel_size, tensor_parallel_rank=tensor_parallel_rank, num_attention_heads=num_attention_heads, num_key_value_heads=num_key_value_heads, @@ -418,7 +420,7 @@ def split_or_merge_qkv_func( def split_or_merge_func_v1( is_split, - tensor_parallel_degree, + tensor_model_parallel_size, tensor_parallel_rank, num_attention_heads=None, num_key_value_heads=None, @@ -435,14 +437,14 @@ def fn(x, **kwargs): if is_tp_row_bias: tensor = x[:, ...] 
if isinstance(tensor, paddle.Tensor): - res = tensor / tensor_parallel_degree + res = tensor / tensor_model_parallel_size else: - res = paddle.to_tensor(tensor, paddle.get_default_dtype()) / tensor_parallel_degree + res = paddle.to_tensor(tensor, paddle.get_default_dtype()) / tensor_model_parallel_size return res elif is_gqa: func = split_or_merge_qkv_func( is_split=is_split, - tensor_parallel_degree=tensor_parallel_degree, + tensor_model_parallel_size=tensor_model_parallel_size, tensor_parallel_rank=tensor_parallel_rank, num_attention_heads=num_attention_heads, num_key_value_heads=num_key_value_heads, @@ -453,7 +455,7 @@ def fn(x, **kwargs): else: func = split_or_merge_func( is_split=is_split, - tensor_model_parallel_size=tensor_parallel_degree, + tensor_model_parallel_size=tensor_model_parallel_size, tensor_parallel_rank=tensor_parallel_rank, num_attention_heads=num_attention_heads, ) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 4fcecbddd78..d0b97f7fd78 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -129,7 +129,7 @@ def init_distributed_environment(seed: int = 20) -> Tuple[int, int]: def update_fd_config_for_mm(fd_config: FDConfig) -> None: architectures = fd_config.model_config.architectures if fd_config.model_config.enable_mm and ErnieArchitectures.contains_ernie_arch(architectures): - fd_config.model_config.tensor_parallel_degree = fd_config.parallel_config.tensor_parallel_size + fd_config.model_config.tensor_model_parallel_size = fd_config.parallel_config.tensor_parallel_size fd_config.model_config.tensor_parallel_rank = fd_config.parallel_config.tensor_parallel_rank fd_config.model_config.vision_config.dtype = fd_config.model_config.dtype diff --git a/tests/model_executor/test_tp_utils.py b/tests/model_executor/test_tp_utils.py index 666733d558e..8953bb9637d 100644 --- a/tests/model_executor/test_tp_utils.py +++ b/tests/model_executor/test_tp_utils.py @@ -396,7 +396,7 @@ def test_invalid_placeholder_raises(self): class GQATensorOpsTest(unittest.TestCase): def test_gqa_split_returns_all_partitions(self): func = _tp_utils.gqa_qkv_split_func( - tensor_parallel_degree=2, + tensor_model_parallel_size=2, tensor_parallel_rank=None, num_attention_heads=4, num_key_value_heads=2, @@ -411,7 +411,7 @@ def test_gqa_split_returns_all_partitions(self): def test_gqa_split_with_rank_and_repeat_kv(self): func = _tp_utils.gqa_qkv_split_func( - tensor_parallel_degree=2, + tensor_model_parallel_size=2, tensor_parallel_rank=1, num_attention_heads=2, num_key_value_heads=1, @@ -423,7 +423,7 @@ def test_gqa_split_with_rank_and_repeat_kv(self): def test_gqa_split_on_matrix_rows(self): func = _tp_utils.gqa_qkv_split_func( - tensor_parallel_degree=2, + tensor_model_parallel_size=2, tensor_parallel_rank=None, num_attention_heads=4, num_key_value_heads=2, @@ -454,7 +454,7 @@ def test_split_or_merge_qkv_dispatch(self): def test_split_or_merge_func_v1_row_bias(self): fn = _tp_utils.split_or_merge_func_v1( is_split=True, - tensor_parallel_degree=4, + tensor_model_parallel_size=4, tensor_parallel_rank=0, ) bias = np.ones(4, dtype=np.float32) @@ -464,7 +464,7 @@ def test_split_or_merge_func_v1_row_bias(self): def test_split_or_merge_func_v1_gqa_path(self): fn = _tp_utils.split_or_merge_func_v1( is_split=True, - tensor_parallel_degree=2, + tensor_model_parallel_size=2, tensor_parallel_rank=None, num_attention_heads=4, num_key_value_heads=2, @@ -477,7 +477,7 @@ def test_split_or_merge_func_v1_gqa_path(self): def 
test_split_or_merge_func_v1_default_path(self): fn = _tp_utils.split_or_merge_func_v1( is_split=False, - tensor_parallel_degree=2, + tensor_model_parallel_size=2, tensor_parallel_rank=None, num_attention_heads=4, ) From 11227e00bbeb40d9bebf481bfddd082f5312e1c3 Mon Sep 17 00:00:00 2001 From: Nyakku Shigure Date: Wed, 24 Dec 2025 15:23:46 +0800 Subject: [PATCH 4/7] [GraphOptimization] Wrap deep gemm and triton as python op (#5673) * [GraphOptimization] Wrap deep gemm and triton as python op * add unitest to _base_test && compatibility * paddle.static.MetaTensor -> "paddle.static.MetaTensor" * mv register_custom_python_op * rename yaml --------- Co-authored-by: DrRyanHuang --- .github/workflows/_base_test.yml | 18 +- fastdeploy/distributed/communication.py | 15 +- .../layers/moe/fused_moe_triton_backend.py | 346 +++++++++++------- .../layers/quantization/block_wise_fp8.py | 46 ++- fastdeploy/utils.py | 15 + tests/ce/deploy/ernie45t_21b_cinn_fp8.yaml | 8 + ...cinn.yaml => ernie45t_21b_cinn_wint4.yaml} | 0 tests/ce/deploy/ernie45t_21b_sot_fp8.yaml | 8 + ...b_sot.yaml => ernie45t_21b_sot_wint4.yaml} | 0 9 files changed, 316 insertions(+), 140 deletions(-) create mode 100644 tests/ce/deploy/ernie45t_21b_cinn_fp8.yaml rename tests/ce/deploy/{ernie45t_21b_cinn.yaml => ernie45t_21b_cinn_wint4.yaml} (100%) create mode 100644 tests/ce/deploy/ernie45t_21b_sot_fp8.yaml rename tests/ce/deploy/{ernie45t_21b_sot.yaml => ernie45t_21b_sot_wint4.yaml} (100%) diff --git a/.github/workflows/_base_test.yml b/.github/workflows/_base_test.yml index 4087a50ff41..378e0105ad3 100644 --- a/.github/workflows/_base_test.yml +++ b/.github/workflows/_base_test.yml @@ -233,14 +233,28 @@ jobs: curl -X POST http://0.0.0.0:${FLASK_PORT}/switch \ -H "Content-Type: application/json" \ - -d "{\"--model\": \"/MODELDATA/ernie-4_5-21b-a3b-bf16-paddle\", \"--config\": \"ernie45t_21b_sot.yaml\", \"--enable-logprob\": \"False\"}" + -d "{\"--model\": \"/MODELDATA/ernie-4_5-21b-a3b-bf16-paddle\", \"--config\": \"ernie45t_21b_sot_wint4.yaml\", \"--enable-logprob\": \"False\"}" check_service 360 export TEMPLATE=TOKEN_NORMAL python -m pytest -sv test_seed_usage.py -k "not test_seed_stream" || TEST_EXIT_CODE=1 curl -X POST http://0.0.0.0:${FLASK_PORT}/switch \ -H "Content-Type: application/json" \ - -d "{\"--model\": \"/MODELDATA/ernie-4_5-21b-a3b-bf16-paddle\", \"--config\": \"ernie45t_21b_cinn.yaml\", \"--enable-logprob\": \"False\"}" + -d "{\"--model\": \"/MODELDATA/ernie-4_5-21b-a3b-bf16-paddle\", \"--config\": \"ernie45t_21b_cinn_wint4.yaml\", \"--enable-logprob\": \"False\"}" + check_service 360 + export TEMPLATE=TOKEN_NORMAL + python -m pytest -sv test_seed_usage.py -k "not test_seed_stream" || TEST_EXIT_CODE=1 + + curl -X POST http://0.0.0.0:${FLASK_PORT}/switch \ + -H "Content-Type: application/json" \ + -d "{\"--model\": \"/MODELDATA/ernie-4_5-21b-a3b-bf16-paddle\", \"--config\": \"ernie45t_21b_sot_fp8.yaml\", \"--enable-logprob\": \"False\"}" + check_service 360 + export TEMPLATE=TOKEN_NORMAL + python -m pytest -sv test_seed_usage.py -k "not test_seed_stream" || TEST_EXIT_CODE=1 + + curl -X POST http://0.0.0.0:${FLASK_PORT}/switch \ + -H "Content-Type: application/json" \ + -d "{\"--model\": \"/MODELDATA/ernie-4_5-21b-a3b-bf16-paddle\", \"--config\": \"ernie45t_21b_cinn_fp8.yaml\", \"--enable-logprob\": \"False\"}" check_service 360 export TEMPLATE=TOKEN_NORMAL python -m pytest -sv test_seed_usage.py -k "not test_seed_stream" || TEST_EXIT_CODE=1 diff --git a/fastdeploy/distributed/communication.py 
b/fastdeploy/distributed/communication.py index 922fbb3df8e..039f545c324 100644 --- a/fastdeploy/distributed/communication.py +++ b/fastdeploy/distributed/communication.py @@ -20,6 +20,8 @@ import paddle.distributed as dist from paddle.distributed import fleet +from fastdeploy.utils import register_custom_python_op + _TP_AR = None @@ -50,7 +52,18 @@ def custom_ar_clear_ipc_handles(): try: - @paddle.jit.marker.unified + def tensor_model_parallel_all_reduce_infer_meta(x: "paddle.static.MetaTensor", group_) -> paddle.static.MetaTensor: + return paddle.static.MetaTensor(shape=x.shape, dtype=x.dtype) + + @register_custom_python_op( + name="tensor_model_parallel_all_reduce", + infer_meta=tensor_model_parallel_all_reduce_infer_meta, + input_names=[ + "input_", + ], + output_names=["out"], + inplace_map={}, + ) def tensor_model_parallel_all_reduce( input_: paddle.Tensor, group_: paddle.distributed.communication.group.Group = None, diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py index da705357c12..c332adf8cc1 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py @@ -28,7 +28,7 @@ set_weight_attrs, weight_fully_copied, ) -from fastdeploy.utils import ceil_div +from fastdeploy.utils import ceil_div, register_custom_python_op from ..quantization.quant_base import QuantMethodBase @@ -1141,6 +1141,196 @@ def apply( return out +def python_op_fused_moe_kernel_paddle_infer_meta( + x, + layer_added_weight_attrs_0, + layer_added_scale_attrs_0, + layer_added_weight_attrs1, + layer_added_scale_attrs1, + gate_out, + gate_correction_bias, + top_k: int, + N1: int, + N2: int, + num_local_experts: int, + moe_intermediate_size: int, + hidden_size: int, + config: dict, + quant_config, + topk_ids_hookfunc, +): + token_num = x.shape[0] + return paddle.static.MetaTensor(shape=[token_num, hidden_size], dtype=x.dtype) + + +@register_custom_python_op( + name="python_op_fused_moe_kernel_paddle", + infer_meta=python_op_fused_moe_kernel_paddle_infer_meta, + input_names=[ + "x", + "layer_added_weight_attrs_0", + "layer_added_scale_attrs_0", + "layer_added_weight_attrs1", + "layer_added_scale_attrs1", + "gate_out", + "gate_correction_bias", + ], + output_names=["out"], + inplace_map={}, +) +def python_op_fused_moe_kernel_paddle( + x: paddle.Tensor, + layer_added_weight_attrs_0: paddle.Tensor, + layer_added_scale_attrs_0: paddle.Tensor, + layer_added_weight_attrs1: paddle.Tensor, + layer_added_scale_attrs1: paddle.Tensor, + gate_out: paddle.Tensor, + gate_correction_bias: paddle.Tensor, + top_k: int, + N1: int, + N2: int, + num_local_experts: int, + moe_intermediate_size: int, + hidden_size: int, + config: dict, + quant_config, + topk_ids_hookfunc, +): + + token_num = x.shape[0] + if x.shape[0] == 0: + return paddle.zeros([token_num, hidden_size], dtype=x.dtype) + + topk_ids, topk_weights = fastdeploy.model_executor.ops.gpu.moe_topk_select( + gate_out, + gate_correction_bias, + top_k, + True, # apply_norm_weight + False, + ) + if topk_ids_hookfunc is not None: + topk_ids_hookfunc(topk_ids=topk_ids) + + from fastdeploy.model_executor.ops.gpu import tritonmoe_preprocess_func + + sorted_token_ids, expert_ids, num_tokens_post_padded = tritonmoe_preprocess_func( + topk_ids, num_local_experts, config["BLOCK_SIZE_M"] + ) + # cache13 = create_empty_tensor(tuple([token_num * top_k * max(N1, N2)]), x.dtype) + cache13 = paddle.empty([token_num * top_k 
* max(N1, N2)], dtype=x.dtype) + intermediate_cache1 = cache13[: token_num * top_k * N1].view([token_num * top_k, N1]) + max_num_tokens_padded = sorted_token_ids.shape[0] + + grid = ( + ceil_div(max_num_tokens_padded, config["BLOCK_SIZE_M"]) + * ceil_div(moe_intermediate_size * 2, config["BLOCK_SIZE_N"]), + ) + + from .triton_moe_kernels import fused_moe_kernel_paddle + + x_q, x_scale = fastdeploy.model_executor.ops.gpu.per_token_quant(x, quant_config.weight_block_size[0]) + + fused_moe_kernel_paddle[grid]( + x_q, + layer_added_weight_attrs_0, + intermediate_cache1, + x_scale, + layer_added_scale_attrs_0, + None, + sorted_token_ids, + expert_ids, + num_tokens_post_padded, + max_num_tokens_padded, + token_num * top_k, + N=moe_intermediate_size * 2, + K=hidden_size, + stride_am=x_q.strides[0], + stride_ak=x_q.strides[1], + stride_be=layer_added_weight_attrs_0.strides[0], + stride_bk=layer_added_weight_attrs_0.strides[2], + stride_bn=layer_added_weight_attrs_0.strides[1], + stride_cm=intermediate_cache1.strides[0], + stride_cn=intermediate_cache1.strides[1], + # + stride_asm=x_scale.strides[0], # only used in blockwise fp8 + stride_ask=x_scale.strides[1], # only used in blockwise fp8 + stride_bse=layer_added_scale_attrs_0.strides[0], + stride_bsk=layer_added_scale_attrs_0.strides[2], + stride_bsn=layer_added_scale_attrs_0.strides[1], + group_n=quant_config.weight_block_size[1], + group_k=quant_config.weight_block_size[0], + # Meta-parameters + BLOCK_SIZE_M=config["BLOCK_SIZE_M"], + BLOCK_SIZE_N=config["BLOCK_SIZE_N"], + BLOCK_SIZE_K=config["BLOCK_SIZE_K"], + GROUP_SIZE_M=config["GROUP_SIZE_M"], + MUL_ROUTED_WEIGHT=False, + top_k=top_k, + compute_type_enum=1, + use_fp8_w8a8=True, + use_int8_w8a16=False, + per_channel_quant=False, + even_Ks=hidden_size % config["BLOCK_SIZE_K"] == 0, + ) + + intermediate_cache2 = paddle.incubate.nn.functional.swiglu(intermediate_cache1) + + intermediate_cache3 = cache13[: token_num * top_k * N2].view([token_num * top_k, N2]) + + grid = (ceil_div(max_num_tokens_padded, config["BLOCK_SIZE_M"]) * ceil_div(hidden_size, config["BLOCK_SIZE_N"]),) + + x_q, x_scale = fastdeploy.model_executor.ops.gpu.per_token_quant( + intermediate_cache2, quant_config.weight_block_size[0] + ) + + fused_moe_kernel_paddle[grid]( + x_q, + layer_added_weight_attrs1, + intermediate_cache3, + x_scale, + layer_added_scale_attrs1, + topk_weights, + sorted_token_ids, + expert_ids, + num_tokens_post_padded, + max_num_tokens_padded, + token_num * top_k, + N=hidden_size, + K=moe_intermediate_size, + stride_am=x_q.strides[0], + stride_ak=x_q.strides[1], + stride_be=layer_added_weight_attrs1.strides[0], + stride_bk=layer_added_weight_attrs1.strides[2], + stride_bn=layer_added_weight_attrs1.strides[1], + stride_cm=intermediate_cache3.strides[0], + stride_cn=intermediate_cache3.strides[1], + stride_asm=x_scale.strides[0], # only used in blockwise fp8 + stride_ask=x_scale.strides[1], # only used in blockwise fp8 + stride_bse=layer_added_scale_attrs1.strides[0], + stride_bsk=layer_added_scale_attrs1.strides[2], + stride_bsn=layer_added_scale_attrs1.strides[1], + group_n=quant_config.weight_block_size[1], + group_k=quant_config.weight_block_size[0], + # Meta-parameters + BLOCK_SIZE_M=config["BLOCK_SIZE_M"], + BLOCK_SIZE_N=config["BLOCK_SIZE_N"], + BLOCK_SIZE_K=config["BLOCK_SIZE_K"], + GROUP_SIZE_M=config["GROUP_SIZE_M"], + MUL_ROUTED_WEIGHT=True, + top_k=1, + compute_type_enum=1, + use_fp8_w8a8=True, + use_int8_w8a16=False, + per_channel_quant=False, + even_Ks=moe_intermediate_size % 
config["BLOCK_SIZE_K"] == 0, + ) + + intermediate_cache3.reshape_([token_num, top_k, hidden_size]) + out = intermediate_cache3.sum(axis=1) + + return out + + class BlockWiseFP8MoEMethod(QuantMethodBase): """ Use Triton Group Gemm to compute Fused BlockWise FP8 Quant MoE. @@ -1479,9 +1669,7 @@ def apply( """ Triton compute Fused MoE. """ - token_num = x.shape[0] - if token_num == 0: - return paddle.zeros([token_num, layer.hidden_size], dtype=x.dtype) + gate_out = gate(x.cast("float32")) top_k = layer.top_k num_local_experts = layer.num_local_experts @@ -1490,15 +1678,12 @@ def apply( E, N1, _ = getattr(layer, self.added_weight_attrs[0]).shape N2 = getattr(layer, self.added_weight_attrs[1]).shape[1] - topk_ids, topk_weights = fastdeploy.model_executor.ops.gpu.moe_topk_select( - gate_out, - layer.gate_correction_bias, - layer.top_k, - True, # apply_norm_weight - False, - ) - if topk_ids_hookfunc is not None: - topk_ids_hookfunc(topk_ids=topk_ids) + gate_correction_bias = layer.gate_correction_bias + # for triton op input + layer_added_weight_attrs_0 = getattr(layer, self.added_weight_attrs[0]) + layer_added_scale_attrs_0 = getattr(layer, self.added_scale_attrs[0]) + layer_added_weight_attrs1 = getattr(layer, self.added_weight_attrs[1]) + layer_added_scale_attrs1 = getattr(layer, self.added_scale_attrs[1]) config = { "BLOCK_SIZE_M": 64, @@ -1508,123 +1693,22 @@ def apply( "num_warps": 4, "num_stages": 3, } - from fastdeploy.model_executor.ops.gpu import tritonmoe_preprocess_func - - sorted_token_ids, expert_ids, num_tokens_post_padded = tritonmoe_preprocess_func( - topk_ids, num_local_experts, config["BLOCK_SIZE_M"] - ) - # cache13 = create_empty_tensor(tuple([token_num * top_k * max(N1, N2)]), x.dtype) - cache13 = paddle.empty([token_num * top_k * max(N1, N2)], dtype=x.dtype) - intermediate_cache1 = cache13[: token_num * top_k * N1].view([token_num * top_k, N1]) - max_num_tokens_padded = sorted_token_ids.shape[0] - - grid = ( - ceil_div(max_num_tokens_padded, config["BLOCK_SIZE_M"]) - * ceil_div(moe_intermediate_size * 2, config["BLOCK_SIZE_N"]), - ) - - from .triton_moe_kernels import fused_moe_kernel_paddle - - x_q, x_scale = fastdeploy.model_executor.ops.gpu.per_token_quant(x, self.quant_config.weight_block_size[0]) - - fused_moe_kernel_paddle[grid]( - x_q, - getattr(layer, self.added_weight_attrs[0]), - intermediate_cache1, - x_scale, - getattr(layer, self.added_scale_attrs[0]), - None, - sorted_token_ids, - expert_ids, - num_tokens_post_padded, - max_num_tokens_padded, - token_num * top_k, - N=moe_intermediate_size * 2, - K=hidden_size, - stride_am=x_q.strides[0], - stride_ak=x_q.strides[1], - stride_be=getattr(layer, self.added_weight_attrs[0]).strides[0], - stride_bk=getattr(layer, self.added_weight_attrs[0]).strides[2], - stride_bn=getattr(layer, self.added_weight_attrs[0]).strides[1], - stride_cm=intermediate_cache1.strides[0], - stride_cn=intermediate_cache1.strides[1], - # - stride_asm=x_scale.strides[0], # only used in blockwise fp8 - stride_ask=x_scale.strides[1], # only used in blockwise fp8 - stride_bse=getattr(layer, self.added_scale_attrs[0]).strides[0], - stride_bsk=getattr(layer, self.added_scale_attrs[0]).strides[2], - stride_bsn=getattr(layer, self.added_scale_attrs[0]).strides[1], - group_n=self.quant_config.weight_block_size[1], - group_k=self.quant_config.weight_block_size[0], - # Meta-parameters - BLOCK_SIZE_M=config["BLOCK_SIZE_M"], - BLOCK_SIZE_N=config["BLOCK_SIZE_N"], - BLOCK_SIZE_K=config["BLOCK_SIZE_K"], - GROUP_SIZE_M=config["GROUP_SIZE_M"], - 
MUL_ROUTED_WEIGHT=False, - top_k=top_k, - compute_type_enum=1, - use_fp8_w8a8=True, - use_int8_w8a16=False, - per_channel_quant=False, - even_Ks=hidden_size % config["BLOCK_SIZE_K"] == 0, - ) - - intermediate_cache2 = paddle.incubate.nn.functional.swiglu(intermediate_cache1) - intermediate_cache3 = cache13[: token_num * top_k * N2].view([token_num * top_k, N2]) - - grid = ( - ceil_div(max_num_tokens_padded, config["BLOCK_SIZE_M"]) * ceil_div(hidden_size, config["BLOCK_SIZE_N"]), - ) - - x_q, x_scale = fastdeploy.model_executor.ops.gpu.per_token_quant( - intermediate_cache2, self.quant_config.weight_block_size[0] - ) - - fused_moe_kernel_paddle[grid]( - x_q, - getattr(layer, self.added_weight_attrs[1]), - intermediate_cache3, - x_scale, - getattr(layer, self.added_scale_attrs[1]), - topk_weights, - sorted_token_ids, - expert_ids, - num_tokens_post_padded, - max_num_tokens_padded, - token_num * top_k, - N=hidden_size, - K=moe_intermediate_size, - stride_am=x_q.strides[0], - stride_ak=x_q.strides[1], - stride_be=getattr(layer, self.added_weight_attrs[1]).strides[0], - stride_bk=getattr(layer, self.added_weight_attrs[1]).strides[2], - stride_bn=getattr(layer, self.added_weight_attrs[1]).strides[1], - stride_cm=intermediate_cache3.strides[0], - stride_cn=intermediate_cache3.strides[1], - stride_asm=x_scale.strides[0], # only used in blockwise fp8 - stride_ask=x_scale.strides[1], # only used in blockwise fp8 - stride_bse=getattr(layer, self.added_scale_attrs[1]).strides[0], - stride_bsk=getattr(layer, self.added_scale_attrs[1]).strides[2], - stride_bsn=getattr(layer, self.added_scale_attrs[1]).strides[1], - group_n=self.quant_config.weight_block_size[1], - group_k=self.quant_config.weight_block_size[0], - # Meta-parameters - BLOCK_SIZE_M=config["BLOCK_SIZE_M"], - BLOCK_SIZE_N=config["BLOCK_SIZE_N"], - BLOCK_SIZE_K=config["BLOCK_SIZE_K"], - GROUP_SIZE_M=config["GROUP_SIZE_M"], - MUL_ROUTED_WEIGHT=True, - top_k=1, - compute_type_enum=1, - use_fp8_w8a8=True, - use_int8_w8a16=False, - per_channel_quant=False, - even_Ks=moe_intermediate_size % config["BLOCK_SIZE_K"] == 0, + return python_op_fused_moe_kernel_paddle( + x, + layer_added_weight_attrs_0, + layer_added_scale_attrs_0, + layer_added_weight_attrs1, + layer_added_scale_attrs1, + gate_out, + gate_correction_bias, + top_k, + N1, + N2, + num_local_experts, + moe_intermediate_size, + hidden_size, + config, + self.quant_config, + topk_ids_hookfunc, ) - - intermediate_cache3.reshape_([token_num, top_k, hidden_size]) - out = intermediate_cache3.sum(axis=1) - - return out diff --git a/fastdeploy/model_executor/layers/quantization/block_wise_fp8.py b/fastdeploy/model_executor/layers/quantization/block_wise_fp8.py index 59daa238480..d5af8106ff7 100644 --- a/fastdeploy/model_executor/layers/quantization/block_wise_fp8.py +++ b/fastdeploy/model_executor/layers/quantization/block_wise_fp8.py @@ -31,6 +31,7 @@ process_weight_transpose, set_weight_attrs, ) +from fastdeploy.utils import register_custom_python_op from ..utils import get_tensor, per_block_cast_to_fp8 from .quant_base import QuantConfigBase, QuantMethodBase @@ -81,6 +82,43 @@ def get_quant_method(self, layer) -> Optional[QuantMethodBase]: return BlockWiseFP8LinearMethod(self) +def deep_gemm_fp8_fp8_bf16_nt_infer_meta( + x_meta: "paddle.static.MetaTensor", + x_scale_tensor_meta: "paddle.static.MetaTensor", + layer_weight_meta: "paddle.static.MetaTensor", + layer_weight_scale_inv_meta: "paddle.static.MetaTensor", + linear_out_meta: "paddle.static.MetaTensor", + layer_output_size: int, +): + return 
paddle.static.MetaTensor(shape=[x_meta.shape[0], layer_output_size], dtype=paddle.bfloat16) + + +@register_custom_python_op( + name="deep_gemm_fp8_fp8_bf16_nt", + infer_meta=deep_gemm_fp8_fp8_bf16_nt_infer_meta, + input_names=["x", "x_scale_tensor", "layer_weight", "layer_weight_scale_inv", "linear_out_empty"], + output_names=["linear_out"], + inplace_map={}, +) +def deep_gemm_fp8_fp8_bf16_nt( + x: paddle.Tensor, + x_scale_tensor: paddle.Tensor, + layer_weight: paddle.Tensor, + layer_weight_scale_inv: paddle.Tensor, + linear_out: paddle.Tensor, + layer_output_size: int, +): + from fastdeploy.model_executor.ops.gpu import deep_gemm + + deep_gemm.gemm_fp8_fp8_bf16_nt( + (x, x_scale_tensor), + (layer_weight, layer_weight_scale_inv), + linear_out, + ) + + return linear_out + + class BlockWiseFP8LinearMethod(QuantMethodBase): """ block wise quantization method for linear @@ -230,12 +268,8 @@ def apply(self, layer, x): x, self.quant_config.weight_block_size[0] ) linear_out = paddle.empty((x.shape[0], layer.output_size), dtype=paddle.bfloat16) - from fastdeploy.model_executor.ops.gpu import deep_gemm - - deep_gemm.gemm_fp8_fp8_bf16_nt( - (x, x_scale_tensor), - (layer.weight, layer.weight_scale_inv), - linear_out, + linear_out = deep_gemm_fp8_fp8_bf16_nt( + x, x_scale_tensor, layer.weight, layer.weight_scale_inv, linear_out, layer.output_size ) if layer.with_bias: linear_out = paddle.add(linear_out, layer.bias) diff --git a/fastdeploy/utils.py b/fastdeploy/utils.py index f8330c35867..240feb11291 100644 --- a/fastdeploy/utils.py +++ b/fastdeploy/utils.py @@ -1229,3 +1229,18 @@ def to_tensor(tasks: List[Any]): multimodal_inputs[key] = [paddle.to_tensor(v) for v in value] except Exception as e: llm_logger.warning(f"Tensor conversion failed: {type(e).__name__}: {e}") + + +def do_nothing(*args, **kwargs): + def decorator(func): + return func + + return decorator + + +if hasattr(paddle.static, "register_op"): + from paddle.static import register_op +else: + register_op = do_nothing + +register_custom_python_op = register_op diff --git a/tests/ce/deploy/ernie45t_21b_cinn_fp8.yaml b/tests/ce/deploy/ernie45t_21b_cinn_fp8.yaml new file mode 100644 index 00000000000..d02af4a6c6f --- /dev/null +++ b/tests/ce/deploy/ernie45t_21b_cinn_fp8.yaml @@ -0,0 +1,8 @@ +max_model_len: 32768 +max_num_seqs: 128 +tensor_parallel_size: 1 +quantization: block_wise_fp8 +graph_optimization_config: + graph_opt_level: 2 + sot_warmup_sizes: [2,16,32,64] + use_cudagraph: True diff --git a/tests/ce/deploy/ernie45t_21b_cinn.yaml b/tests/ce/deploy/ernie45t_21b_cinn_wint4.yaml similarity index 100% rename from tests/ce/deploy/ernie45t_21b_cinn.yaml rename to tests/ce/deploy/ernie45t_21b_cinn_wint4.yaml diff --git a/tests/ce/deploy/ernie45t_21b_sot_fp8.yaml b/tests/ce/deploy/ernie45t_21b_sot_fp8.yaml new file mode 100644 index 00000000000..269afb10047 --- /dev/null +++ b/tests/ce/deploy/ernie45t_21b_sot_fp8.yaml @@ -0,0 +1,8 @@ +max_model_len: 32768 +max_num_seqs: 128 +tensor_parallel_size: 1 +quantization: block_wise_fp8 +graph_optimization_config: + graph_opt_level: 1 + sot_warmup_sizes: [2,16,32,64] + use_cudagraph: True diff --git a/tests/ce/deploy/ernie45t_21b_sot.yaml b/tests/ce/deploy/ernie45t_21b_sot_wint4.yaml similarity index 100% rename from tests/ce/deploy/ernie45t_21b_sot.yaml rename to tests/ce/deploy/ernie45t_21b_sot_wint4.yaml From 6b0fba82942d6c2ba4a3623974495f9a953bea75 Mon Sep 17 00:00:00 2001 From: Divano Date: Wed, 24 Dec 2025 15:35:17 +0800 Subject: [PATCH 5/7] Update run.sh --- tests/ce/stable_cases/run.sh | 76 
+++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 14 deletions(-) diff --git a/tests/ce/stable_cases/run.sh b/tests/ce/stable_cases/run.sh index 81197253ba5..defb56564b0 100644 --- a/tests/ce/stable_cases/run.sh +++ b/tests/ce/stable_cases/run.sh @@ -1,18 +1,18 @@ #!/bin/bash # ================== Configuration Parameters ================== -FD_API_PORT=${FD_API_PORT:-8000} -FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT:-8001} -FD_METRICS_PORT=${FD_METRICS_PORT:-8002} -FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT:-8003} +FD_API_PORT=${FD_API_PORT:-8180} +FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT:-8181} +FD_METRICS_PORT=${FD_METRICS_PORT:-8182} +FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT:-8183} HOST="0.0.0.0" PORT="${FD_API_PORT}" # 这里需要配合启动脚本那个URL PORT BASE_URL="http://$HOST:$PORT" -TOTAL_ROUNDS=30 -CHAT_REQUESTS_PER_ROUND=1 +TOTAL_ROUNDS=6 +CHAT_REQUESTS_PER_ROUND=3 export CUDA_VISIBLE_DEVICES=0,1 MAX_MEMORY_MB=10240 # 10GB @@ -79,24 +79,72 @@ check_gpu_memory() { local gpu_ids gpu_ids=($(get_visible_gpu_ids)) + echo "========== GPU Memory Check ==========" + echo "CUDA_VISIBLE_DEVICES = $CUDA_VISIBLE_DEVICES" + echo "MAX_MEMORY_MB = $MAX_MEMORY_MB" + echo "======================================" + if [ ${#gpu_ids[@]} -eq 0 ]; then echo "Assertion failed: No valid GPU IDs in CUDA_VISIBLE_DEVICES='$CUDA_VISIBLE_DEVICES'" >&2 exit 1 fi for gpu_id in "${gpu_ids[@]}"; do - local memory_used - memory_used=$(nvidia-smi -i "$gpu_id" --query-gpu=memory.used --format=csv,noheader,nounits 2>/dev/null) || \ - assert_success $? "Failed to query GPU $gpu_id memory usage" - - if ! [[ "$memory_used" =~ ^[0-9]+ ]]; then - echo "Assertion failed: Invalid memory value for GPU $gpu_id: $memory_used" >&2 + echo + echo "---- GPU $gpu_id ----" + + # Query summary + local summary + summary=$(nvidia-smi -i "$gpu_id" \ + --query-gpu=index,name,memory.total,memory.used,memory.free,utilization.gpu \ + --format=csv,noheader,nounits) || { + echo "Failed to query GPU $gpu_id summary" >&2 + exit 1 + } + + # Parse fields + IFS=',' read -r idx name mem_total mem_used mem_free util <<< "$summary" + + echo "GPU $idx: $name" + echo "Total Memory : ${mem_total} MB" + echo "Used Memory : ${mem_used} MB" + echo "Free Memory : ${mem_free} MB" + echo "GPU Util : ${util} %" + + # --- Hard assertions --- + assert_true "$(( mem_used <= MAX_MEMORY_MB ))" \ + "GPU $gpu_id memory.used ${mem_used} MB > MAX_MEMORY_MB ${MAX_MEMORY_MB} MB" + + # --- Soft safety check: usage ratio --- + local used_ratio + used_ratio=$(( mem_used * 100 / mem_total )) + + echo "Used Ratio : ${used_ratio} %" + + if [ "$used_ratio" -gt 90 ]; then + echo "Assertion failed: GPU $gpu_id memory usage > 90% (${used_ratio}%)" >&2 exit 1 fi - assert_true "$(( memory_used <= MAX_MEMORY_MB ))" \ - "GPU $gpu_id memory $memory_used MB > $MAX_MEMORY_MB MB" + # --- Process-level attribution --- + echo "Processes on GPU $gpu_id:" + local proc_info + proc_info=$(nvidia-smi -i "$gpu_id" \ + --query-compute-apps=pid,process_name,used_memory \ + --format=csv,noheader,nounits) + + if [ -z "$proc_info" ]; then + echo " (No active compute processes)" + else + echo "$proc_info" | while IFS=',' read -r pid pname pmem; do + echo " PID=$pid NAME=$pname MEM=${pmem}MB" + done + fi + + echo "GPU $gpu_id memory check PASSED" done + + echo "========== GPU Memory Check DONE ==========" } # ==================================================== From c7ab32d1548eef8caa8e5ae4129ad43f3a8a41cb Mon Sep 17 00:00:00 2001 From: chen <103103266+ckl117@users.noreply.github.com> Date: 
Wed, 24 Dec 2025 16:49:20 +0800
Subject: [PATCH 6/7] check (#5736)

---
 custom_ops/gpu_ops/update_inputs_v1.cu | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/custom_ops/gpu_ops/update_inputs_v1.cu b/custom_ops/gpu_ops/update_inputs_v1.cu
index 64230ae2565..7dd786dabfb 100644
--- a/custom_ops/gpu_ops/update_inputs_v1.cu
+++ b/custom_ops/gpu_ops/update_inputs_v1.cu
@@ -50,6 +50,11 @@ __global__ void update_inputs_kernel_v1(bool* not_need_stop,
     }
   if (thread_idx < bsz) {
     if (stop_flag_now) {
+      // chunked prefill not finished when max_tokens=1
+      if (seq_lens_this_time[thread_idx] + seq_lens_decoder[thread_idx] <
+          prompt_lens[thread_idx]) {
+        topk_ids[thread_idx] = -1;
+      }
       seq_lens_this_time[thread_idx] = 0; // stop at next step
       seq_lens_decoder[thread_idx] = 0;
       seq_lens_encoder[thread_idx] = 0;

From e75f93d3024b245d448d943fd369ea69aa8c28cd Mon Sep 17 00:00:00 2001
From: YuBaoku <49938469+EmmonsCurse@users.noreply.github.com>
Date: Wed, 24 Dec 2025 17:08:40 +0800
Subject: [PATCH 7/7] [CI] Refactor RL tests to reuse test_metrics (#5741)

---
 .../Qwen2_5_VL/test_Qwen2_5_VL_serving.py | 476 ------------------
 .../{_test_metrics.py => test_metrics.py} | 2 -
 2 files changed, 478 deletions(-)
 delete mode 100644 tests/ci_use/Qwen2_5_VL/test_Qwen2_5_VL_serving.py
 rename tests/ci_use/metrics/{_test_metrics.py => test_metrics.py} (99%)

diff --git a/tests/ci_use/Qwen2_5_VL/test_Qwen2_5_VL_serving.py b/tests/ci_use/Qwen2_5_VL/test_Qwen2_5_VL_serving.py
deleted file mode 100644
index 98ed5567833..00000000000
--- a/tests/ci_use/Qwen2_5_VL/test_Qwen2_5_VL_serving.py
+++ /dev/null
@@ -1,476 +0,0 @@
-# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- -import json -import os -import re -import signal -import subprocess -import sys -import time - -import openai -import pytest -import requests - -tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) -sys.path.insert(0, tests_dir) - -from e2e.utils.serving_utils import ( - FD_API_PORT, - FD_CACHE_QUEUE_PORT, - FD_ENGINE_QUEUE_PORT, - FD_METRICS_PORT, - clean_ports, - is_port_open, -) - - -@pytest.fixture(scope="session", autouse=True) -def setup_and_run_server(): - """ - Pytest fixture that runs once per test session: - - Cleans ports before tests - - Starts the API server as a subprocess - - Waits for server port to open (up to 30 seconds) - - Tears down server after all tests finish - """ - print("Pre-test port cleanup...") - clean_ports() - - model_path = "/ModelData/Qwen2.5-VL-7B-Instruct" - - log_path = "server.log" - limit_mm_str = json.dumps({"image": 100, "video": 100}) - - cmd = [ - sys.executable, - "-m", - "fastdeploy.entrypoints.openai.api_server", - "--model", - model_path, - "--port", - str(FD_API_PORT), - # "--tensor-parallel-size", - # "2", - "--engine-worker-queue-port", - str(FD_ENGINE_QUEUE_PORT), - "--metrics-port", - str(FD_METRICS_PORT), - "--cache-queue-port", - str(FD_CACHE_QUEUE_PORT), - "--enable-mm", - "--max-model-len", - "32768", - "--max-num-batched-tokens", - "384", - "--max-num-seqs", - "128", - "--limit-mm-per-prompt", - limit_mm_str, - ] - - print(cmd) - # Start subprocess in new process group - with open(log_path, "w") as logfile: - process = subprocess.Popen( - cmd, - stdout=logfile, - stderr=subprocess.STDOUT, - start_new_session=True, # Enables killing full group via os.killpg - ) - - print(f"Started API server with pid {process.pid}") - # Wait up to 10 minutes for API server to be ready - for _ in range(10 * 60): - if is_port_open("127.0.0.1", FD_API_PORT): - print(f"API server is up on port {FD_API_PORT}") - break - time.sleep(1) - else: - print("[TIMEOUT] API server failed to start in 10 minutes. Cleaning up...") - try: - os.killpg(process.pid, signal.SIGTERM) - except Exception as e: - print(f"Failed to kill process group: {e}") - raise RuntimeError(f"API server did not start on port {FD_API_PORT}") - - yield # Run tests - - print("\n===== Post-test server cleanup... =====") - try: - os.killpg(process.pid, signal.SIGTERM) - print(f"API server (pid={process.pid}) terminated") - except Exception as e: - print(f"Failed to terminate API server: {e}") - - -@pytest.fixture(scope="session") -def api_url(request): - """ - Returns the API endpoint URL for chat completions. - """ - return f"http://0.0.0.0:{FD_API_PORT}/v1/chat/completions" - - -@pytest.fixture(scope="session") -def metrics_url(request): - """ - Returns the metrics endpoint URL. - """ - return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics" - - -@pytest.fixture -def headers(): - """ - Returns common HTTP request headers. - """ - return {"Content-Type": "application/json"} - - -@pytest.fixture -def consistent_payload(): - """ - Returns a fixed payload for consistency testing, - including a fixed random seed and temperature. 
- """ - return { - "messages": [ - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": { - "url": "https://ku.baidu-int.com/vk-assets-ltd/space/2024/09/13/933d1e0a0760498e94ec0f2ccee865e0", - "detail": "high", - }, - }, - {"type": "text", "text": "请描述图片内容"}, - ], - } - ], - "temperature": 0.8, - "top_p": 0, # fix top_p to reduce randomness - "seed": 13, # fixed random seed - } - - -# ========================== -# Consistency test for repeated runs with fixed payload -# ========================== -def test_consistency_between_runs(api_url, headers, consistent_payload): - """ - Test that result is same as the base result. - """ - # request - resp1 = requests.post(api_url, headers=headers, json=consistent_payload) - assert resp1.status_code == 200 - result1 = resp1.json() - content1 = result1["choices"][0]["message"]["content"] - file_res_temp = "Qwen2.5-VL-7B-Instruct-temp" - f_o = open(file_res_temp, "a") - f_o.writelines(content1) - f_o.close() - - # base result - content2 = """这张图片展示了一群人在进行手工艺活动。前景中有两个孩子和一个成年人,他们似乎在制作或展示某种手工艺品。成年人手里拿着一个扇子,上面有彩色的图案,可能是通过某种方式绘制或涂鸦而成。孩子们看起来很专注,可能是在观察或参与这个过程。 - -背景中还有其他几个人,其中一个人穿着粉色的衣服,背对着镜头。整个场景看起来像是在一个室内环境中,光线充足,氛围轻松愉快。""" - - # Verify that result is same as the base result - assert content1 == content2 - - -# ========================== -# OpenAI Client Chat Completion Test -# ========================== - - -@pytest.fixture -def openai_client(): - ip = "0.0.0.0" - service_http_port = str(FD_API_PORT) - client = openai.Client( - base_url=f"http://{ip}:{service_http_port}/v1", - api_key="EMPTY_API_KEY", - ) - return client - - -# Non-streaming test -def test_non_streaming_chat(openai_client): - """Test non-streaming chat functionality with the local service""" - response = openai_client.chat.completions.create( - model="default", - messages=[ - { - "role": "system", - "content": "You are a helpful AI assistant.", - }, # system不是必需,可选 - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": { - "url": "https://ku.baidu-int.com/vk-assets-ltd/space/2024/09/13/933d1e0a0760498e94ec0f2ccee865e0", - "detail": "high", - }, - }, - {"type": "text", "text": "请描述图片内容"}, - ], - }, - ], - temperature=1, - max_tokens=53, - stream=False, - ) - - assert hasattr(response, "choices") - assert len(response.choices) > 0 - assert hasattr(response.choices[0], "message") - assert hasattr(response.choices[0].message, "content") - - -# Streaming test -def test_streaming_chat(openai_client, capsys): - """Test streaming chat functionality with the local service""" - response = openai_client.chat.completions.create( - model="default", - messages=[ - { - "role": "system", - "content": "You are a helpful AI assistant.", - }, # system不是必需,可选 - {"role": "user", "content": "List 3 countries and their capitals."}, - { - "role": "assistant", - "content": "China(Beijing), France(Paris), Australia(Canberra).", - }, - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": { - "url": "https://ku.baidu-int.com/vk-assets-ltd/space/2024/09/13/933d1e0a0760498e94ec0f2ccee865e0", - "detail": "high", - }, - }, - {"type": "text", "text": "请描述图片内容"}, - ], - }, - ], - temperature=1, - max_tokens=512, - stream=True, - ) - - output = [] - for chunk in response: - if hasattr(chunk.choices[0], "delta") and hasattr(chunk.choices[0].delta, "content"): - output.append(chunk.choices[0].delta.content) - assert len(output) > 2 - - -# ========================== -# OpenAI Client additional chat/completions test -# ========================== - - -def 
test_non_streaming_chat_with_return_token_ids(openai_client, capsys): - """ - Test return_token_ids option in non-streaming chat functionality with the local service - """ - # 设定 return_token_ids - response = openai_client.chat.completions.create( - model="default", - messages=[ - {"role": "system", "content": "You are a helpful AI assistant."}, # system不是必需,可选 - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": { - "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg", - "detail": "high", - }, - }, - {"type": "text", "text": "请描述图片内容"}, - ], - }, - ], - temperature=1, - max_tokens=53, - extra_body={"return_token_ids": True}, - stream=False, - ) - assert hasattr(response, "choices") - assert len(response.choices) > 0 - assert hasattr(response.choices[0], "message") - assert hasattr(response.choices[0].message, "prompt_token_ids") - assert isinstance(response.choices[0].message.prompt_token_ids, list) - assert hasattr(response.choices[0].message, "completion_token_ids") - assert isinstance(response.choices[0].message.completion_token_ids, list) - - # 不设定 return_token_ids - response = openai_client.chat.completions.create( - model="default", - messages=[ - {"role": "system", "content": "You are a helpful AI assistant."}, # system不是必需,可选 - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": { - "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg", - "detail": "high", - }, - }, - {"type": "text", "text": "请描述图片内容"}, - ], - }, - ], - temperature=1, - max_tokens=53, - extra_body={"return_token_ids": False}, - stream=False, - ) - assert hasattr(response, "choices") - assert len(response.choices) > 0 - assert hasattr(response.choices[0], "message") - assert hasattr(response.choices[0].message, "prompt_token_ids") - assert response.choices[0].message.prompt_token_ids is None - assert hasattr(response.choices[0].message, "completion_token_ids") - assert response.choices[0].message.completion_token_ids is None - - -def test_streaming_chat_with_return_token_ids(openai_client, capsys): - """ - Test return_token_ids option in streaming chat functionality with the local service - """ - # enable return_token_ids - response = openai_client.chat.completions.create( - model="default", - messages=[ - {"role": "system", "content": "You are a helpful AI assistant."}, # system不是必需,可选 - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": { - "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg", - "detail": "high", - }, - }, - {"type": "text", "text": "请描述图片内容"}, - ], - }, - ], - temperature=1, - max_tokens=53, - extra_body={"return_token_ids": True}, - stream=True, - ) - is_first_chunk = True - for chunk in response: - assert hasattr(chunk, "choices") - assert len(chunk.choices) > 0 - assert hasattr(chunk.choices[0], "delta") - assert hasattr(chunk.choices[0].delta, "prompt_token_ids") - assert hasattr(chunk.choices[0].delta, "completion_token_ids") - if is_first_chunk: - is_first_chunk = False - assert isinstance(chunk.choices[0].delta.prompt_token_ids, list) - assert chunk.choices[0].delta.completion_token_ids is None - else: - assert chunk.choices[0].delta.prompt_token_ids is None - assert isinstance(chunk.choices[0].delta.completion_token_ids, list) - - # disable return_token_ids - response = openai_client.chat.completions.create( - model="default", - messages=[ - {"role": "system", "content": "You are a helpful AI assistant."}, # 
system不是必需,可选 - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": { - "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg", - "detail": "high", - }, - }, - {"type": "text", "text": "请描述图片内容"}, - ], - }, - ], - temperature=1, - max_tokens=53, - extra_body={"return_token_ids": False}, - stream=True, - ) - for chunk in response: - assert hasattr(chunk, "choices") - assert len(chunk.choices) > 0 - assert hasattr(chunk.choices[0], "delta") - assert hasattr(chunk.choices[0].delta, "prompt_token_ids") - assert chunk.choices[0].delta.prompt_token_ids is None - assert hasattr(chunk.choices[0].delta, "completion_token_ids") - assert chunk.choices[0].delta.completion_token_ids is None - - -def test_profile_reset_block_num(): - """测试profile reset_block_num功能,与baseline diff不能超过15%""" - log_file = "./log/config.log" - baseline = 30000 - - if not os.path.exists(log_file): - pytest.fail(f"Log file not found: {log_file}") - - with open(log_file, "r") as f: - log_lines = f.readlines() - - target_line = None - for line in log_lines: - if "Reset block num" in line: - target_line = line.strip() - break - - if target_line is None: - pytest.fail("日志中没有Reset block num信息") - - match = re.search(r"total_block_num:(\d+)", target_line) - if not match: - pytest.fail(f"Failed to extract total_block_num from line: {target_line}") - - try: - actual_value = int(match.group(1)) - except ValueError: - pytest.fail(f"Invalid number format: {match.group(1)}") - - lower_bound = baseline * (1 - 0.15) - upper_bound = baseline * (1 + 0.15) - print(f"Reset total_block_num: {actual_value}. baseline: {baseline}") - - assert lower_bound <= actual_value <= upper_bound, ( - f"Reset total_block_num {actual_value} 与 baseline {baseline} diff需要在5%以内" - f"Allowed range: [{lower_bound:.1f}, {upper_bound:.1f}]" - ) diff --git a/tests/ci_use/metrics/_test_metrics.py b/tests/ci_use/metrics/test_metrics.py similarity index 99% rename from tests/ci_use/metrics/_test_metrics.py rename to tests/ci_use/metrics/test_metrics.py index 21a78606be2..0fb46c54f43 100644 --- a/tests/ci_use/metrics/_test_metrics.py +++ b/tests/ci_use/metrics/test_metrics.py @@ -84,8 +84,6 @@ def setup_and_run_server(): "32768", "--max-num-seqs", "1", - "--quantization", - "wint8", "--gpu-memory-utilization", "0.9", "--load-strategy",