diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index 136f1950828..227f211f3bb 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -683,7 +683,6 @@ def _schedule_request_to_worker_v1(self):
         def _fetch_request():
             try:
                 nonlocal is_fetching
-                is_fetching = True
                 num_prefill_batch = min(
                     int(self.resource_manager.available_batch()),
                     self.cfg.max_prefill_batch,
@@ -767,8 +766,8 @@ def _fetch_request():
                 need_check_req_ids = [task.request_id for task in tasks]
                 while need_check_req_ids:
                     req_ids = self.engine_worker_queue.get_finished_add_cache_task_req()
-                    self.llm_logger.info(f"get_finished_add_cache_task_req: {req_ids}")
                     if req_ids:
+                        self.llm_logger.info("get_finished_add_cache_task_req: %s", req_ids)
                         for req_id in req_ids:
                             assert req_id in need_check_req_ids
                             need_check_req_ids.remove(req_id)
@@ -797,6 +796,7 @@ def _fetch_request():
                 continue
             if self.cfg.scheduler_config.splitwise_role != "mixed":
                 if not is_fetching:
+                    is_fetching = True
                     get_request_pool.submit(_fetch_request)
 
             else:
@@ -807,6 +807,7 @@ def _fetch_request():
                 ):
                     # Check if the thread pool is still available to avoid submitting tasks to a shutdown thread pool.
                     try:
+                        is_fetching = True
                         get_request_pool.submit(_fetch_request)
                     except RuntimeError as e:
                         if "shutdown" in str(e):
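For context on moving `is_fetching = True` out of `_fetch_request`: setting the flag inside the worker left a window between `submit()` and the worker actually starting in which the scheduling loop could still observe `is_fetching == False` and submit a duplicate fetch. Setting the flag on the submitting thread, before `submit()`, closes that window. A minimal self-contained sketch of the pattern (hypothetical names, not the engine's real API):

```python
import time
from concurrent.futures import ThreadPoolExecutor

is_fetching = False
pool = ThreadPoolExecutor(max_workers=2)


def fetch_request():
    global is_fetching
    try:
        time.sleep(0.01)  # simulate pulling requests from the scheduler
    finally:
        is_fetching = False  # reopen the gate once the fetch completes


def schedule_loop_iteration():
    global is_fetching
    if not is_fetching:
        is_fetching = True  # set before submit, as the patch does
        pool.submit(fetch_request)


for _ in range(5):
    schedule_loop_iteration()  # duplicate submits are suppressed while a fetch is in flight
pool.shutdown(wait=True)
```

Because the check and the set now happen on the same (single) scheduling thread, the check-then-set is race-free with respect to itself, and the worker only ever clears the flag.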
""" @@ -55,7 +55,7 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: ) parser.add_argument( "--port", - type=str, + type=int, default=RouterArgs.port, help="Port number to bind the router server", ) diff --git a/fastdeploy/scheduler/splitwise_scheduler.py b/fastdeploy/scheduler/splitwise_scheduler.py index 94c947ea49f..41c826afa53 100644 --- a/fastdeploy/scheduler/splitwise_scheduler.py +++ b/fastdeploy/scheduler/splitwise_scheduler.py @@ -17,6 +17,7 @@ import copy import hashlib import math +import pickle import random import threading import time @@ -545,7 +546,7 @@ def schedule(self, req, pnodes, dnodes, mnodes, group=""): pkey, dkey = f"ReqQ_{pnode.nodeid}", f"ReqQ_{dnode.nodeid}" req_dict = req.to_dict() req_dict["group"] = group - req_str = orjson.dumps(req_dict) + req_str = pickle.dumps(req_dict, protocol=5) # logger.info(f"Schedule Req {req_str}") self.client.lpush(dkey, req_str) self.client.lpush(pkey, req_str) @@ -795,7 +796,7 @@ def select_writer(req): reqs = [ret[1]] for req_str in reqs: - req = orjson.loads(req_str) + req = pickle.loads(req_str) group = req.get("group", "") req = Request.from_dict(req) writer_idx = select_writer(req) diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py index 9c13f26d823..a668ff66ea4 100644 --- a/fastdeploy/splitwise/splitwise_connector.py +++ b/fastdeploy/splitwise/splitwise_connector.py @@ -14,7 +14,7 @@ # limitations under the License. """ -import json +import pickle import time import traceback from concurrent.futures import ThreadPoolExecutor @@ -94,12 +94,12 @@ def start_receiver(self): if not socks: continue else: - self.logger.debug(f"receive {socks}") + self.logger.debug("receive %s", socks) frames = self.router_socket.recv_multipart() - self.logger.debug(f"frames: {frames}") - message = frames[-1] - self.io_executor.submit(self._process_message, message) + self.logger.debug("frames: %s", frames) + # message = frames[-1] + self.io_executor.submit(self._process_message, frames) time.sleep(0.001) else: time.sleep(5) @@ -150,7 +150,7 @@ def _send_message(self, addr, msg_type: str, payload): try: sock = self._get_push_socket(addr) - sock.send_multipart([b"", message]) + sock.send_multipart(message) self.logger.info(f"Sent {msg_type} to {addr}") @@ -387,21 +387,34 @@ def _serialize_message(self, msg_type: str, payload) -> bytes: if msg_type == "decode" or msg_type == "prefill": payload = [output.to_dict() for output in payload] - json_data = json.dumps({"type": msg_type, "payload": payload}).encode("utf-8") - return json_data + # Prepare data + data = {"type": msg_type, "payload": payload} - def _deserialize_message(self, data: bytes): + # Pickle protocol 5 supports extracting large arrays (buffers) + buffers = [] + # Serialize main data, strip large arrays as references into buffers + main_bytes = pickle.dumps(data, protocol=5, buffer_callback=buffers.append) + return [main_bytes] + buffers - # JSON反序列化 - message = json.loads(data.decode("utf-8")) + def _deserialize_message(self, frames: List[bytes]): + # identity = frames[0] + + if len(frames) < 2: + raise ValueError(f"Received frames too short, missing payload {len(frames)}") + + main_bytes = frames[1] + buffers = frames[2:] + + # Restore data, pickle will automatically fill buffers back into numpy arrays + message = pickle.loads(main_bytes, buffers=buffers) return message["type"], message["payload"] - def _process_message(self, message: bytes): + def _process_message(self, frames: bytes): """ process message 
""" try: - msg_type, payload = self._deserialize_message(message) + msg_type, payload = self._deserialize_message(frames) self.logger.info(f"{msg_type}") if msg_type == "prefill": diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 81f44ff813f..71c5748de34 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -752,6 +752,15 @@ def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: dtype="int64", ) self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token + if self.enable_mm: + if "position_ids" in request.multimodal_inputs and request.multimodal_inputs["position_ids"] is not None: + position_ids = paddle.to_tensor( + request.multimodal_inputs["position_ids"], + dtype="int64", + ) + self.share_inputs["rope_emb"][idx : idx + 1, :] = self.prepare_rope3d( + position_ids, [request.get("max_tokens", 2048)], [0, position_ids.shape[0]] + )[0] else: self.share_inputs["pre_ids"][idx : idx + 1] = -1 self.share_inputs["step_idx"][idx : idx + 1] = 0 @@ -2604,7 +2613,7 @@ def _preprocess_mm_task(self, one: dict) -> None: token_type_ids = one["token_type_ids"][np.newaxis, :] token_type_ids = paddle.to_tensor(token_type_ids, dtype=paddle.int64) - if one["images"] is not None: + if "images" in one and one["images"] is not None: image_type_ids = one["image_type_ids"][np.newaxis, :] images = one["images"] image_type_ids = paddle.to_tensor(image_type_ids, dtype=paddle.int64)