5 changes: 3 additions & 2 deletions fastdeploy/engine/common_engine.py
@@ -683,7 +683,6 @@ def _schedule_request_to_worker_v1(self):
         def _fetch_request():
             try:
                 nonlocal is_fetching
-                is_fetching = True
                 num_prefill_batch = min(
                     int(self.resource_manager.available_batch()),
                     self.cfg.max_prefill_batch,
@@ -767,8 +766,8 @@ def _fetch_request():
                     need_check_req_ids = [task.request_id for task in tasks]
                     while need_check_req_ids:
                         req_ids = self.engine_worker_queue.get_finished_add_cache_task_req()
-                        self.llm_logger.info(f"get_finished_add_cache_task_req: {req_ids}")
                         if req_ids:
+                            self.llm_logger.info("get_finished_add_cache_task_req: %s", req_ids)
                             for req_id in req_ids:
                                 assert req_id in need_check_req_ids
                                 need_check_req_ids.remove(req_id)
@@ -797,6 +796,7 @@ def _fetch_request():
                     continue
                 if self.cfg.scheduler_config.splitwise_role != "mixed":
                     if not is_fetching:
+                        is_fetching = True
                         get_request_pool.submit(_fetch_request)

                 else:
@@ -807,6 +807,7 @@ def _fetch_request():
                 ):
                     # Check if the thread pool is still available to avoid submitting tasks to a shutdown thread pool.
                     try:
+                        is_fetching = True
                         get_request_pool.submit(_fetch_request)
                     except RuntimeError as e:
                         if "shutdown" in str(e):
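The net effect of these hunks: `is_fetching` is now flipped by the thread that submits `_fetch_request`, not by the task itself, closing the window in which two scheduler iterations could both observe a stale `False` and submit duplicate fetches. A minimal sketch of the pattern (`pool` and `schedule_loop` are illustrative names, not the engine's API):

```python
from concurrent.futures import ThreadPoolExecutor

is_fetching = False
pool = ThreadPoolExecutor(max_workers=2)

def _fetch_request():
    global is_fetching
    try:
        pass  # pull new requests from the scheduler
    finally:
        is_fetching = False  # allow the next fetch to be scheduled

def schedule_loop():
    global is_fetching
    # Before: the flag was set inside _fetch_request, so two loop
    # iterations could both see is_fetching == False and submit twice.
    # After: the submitting thread flips the flag before submitting.
    if not is_fetching:
        is_fetching = True
        pool.submit(_fetch_request)

schedule_loop()  # first call submits the task
schedule_loop()  # second immediate call is a no-op
pool.shutdown(wait=True)
```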
12 changes: 12 additions & 0 deletions fastdeploy/engine/request.py
@@ -25,6 +25,7 @@
 import numpy as np
 from typing_extensions import TypeVar

+from fastdeploy import envs
 from fastdeploy.engine.pooling_params import PoolingParams
 from fastdeploy.engine.sampling_params import SamplingParams
 from fastdeploy.entrypoints.openai.protocol import ToolCall
@@ -301,6 +302,17 @@ def to_dict(self) -> dict:
             "audio_end": self.audio_end,
             "ic_req_data": self.ic_req_data,
         }
+
+        # During multimodal PD separation, position_ids are required
+        if isinstance(data.get("multimodal_inputs"), dict):
+            allowed_keys = {"position_ids"}
+            if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
+                allowed_keys.update(["input_ids", "token_type_ids", "images", "image_type_ids", "grid_thw"])
+
+            keys_to_remove = set(data["multimodal_inputs"]) - allowed_keys
+            for key in keys_to_remove:
+                data["multimodal_inputs"].pop(key)
+
         add_params = [
             "guided_json",
             "guided_regex",
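The new `to_dict()` block prunes `multimodal_inputs` down to a whitelist before the request crosses the PD-separation boundary; under the V1 KV-cache scheduler only `position_ids` survives. A self-contained sketch of the same filter, where `v1_scheduler` stands in for `envs.ENABLE_V1_KVCACHE_SCHEDULER` and the inputs are made up:

```python
v1_scheduler = True
mm_inputs = {
    "position_ids": [0, 1, 2],
    "images": b"...",          # dropped when the V1 scheduler is on
    "grid_thw": [[1, 2, 2]],   # likewise
}

allowed_keys = {"position_ids"}
if not v1_scheduler:
    allowed_keys.update(["input_ids", "token_type_ids", "images", "image_type_ids", "grid_thw"])

# Pop everything outside the whitelist before the dict is serialized.
for key in set(mm_inputs) - allowed_keys:
    mm_inputs.pop(key)

assert mm_inputs == {"position_ids": [0, 1, 2]}
```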
1 change: 0 additions & 1 deletion fastdeploy/engine/sched/resource_manager_v1.py
@@ -414,7 +414,6 @@ def _get_num_new_tokens(self, request, token_budget):
         ):
             input_ids_lst = request.prompt_token_ids + request.output_token_ids
             input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
-            input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
             image_patch_id = inputs["image_patch_id"]

             if request.multimodal_img_boundaries is None:
4 changes: 2 additions & 2 deletions fastdeploy/router/router.py
@@ -32,7 +32,7 @@ class RouterArgs:
     """
     Host address to bind the router server
     """
-    port: str = "9000"
+    port: int = 9000
     """
     Port to bind the router server.
     """
@@ -55,7 +55,7 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
     )
     parser.add_argument(
         "--port",
-        type=str,
+        type=int,
         default=RouterArgs.port,
         help="Port number to bind the router server",
     )
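With the dataclass default and the argparse `type` now agreeing, `port` reaches downstream code as an `int` whether it comes from the default or from the command line; a quick check under plain argparse:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=9000)

# CLI strings are converted, and the default is already an int,
# so consumers see a single type either way.
assert isinstance(parser.parse_args([]).port, int)
assert isinstance(parser.parse_args(["--port", "9001"]).port, int)
```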
5 changes: 3 additions & 2 deletions fastdeploy/scheduler/splitwise_scheduler.py
@@ -17,6 +17,7 @@
 import copy
 import hashlib
 import math
+import pickle
 import random
 import threading
 import time
@@ -545,7 +546,7 @@ def schedule(self, req, pnodes, dnodes, mnodes, group=""):
         pkey, dkey = f"ReqQ_{pnode.nodeid}", f"ReqQ_{dnode.nodeid}"
         req_dict = req.to_dict()
         req_dict["group"] = group
-        req_str = orjson.dumps(req_dict)
+        req_str = pickle.dumps(req_dict, protocol=5)
         # logger.info(f"Schedule Req {req_str}")
         self.client.lpush(dkey, req_str)
         self.client.lpush(pkey, req_str)
@@ -795,7 +796,7 @@ def select_writer(req):
                 reqs = [ret[1]]

             for req_str in reqs:
-                req = orjson.loads(req_str)
+                req = pickle.loads(req_str)
                 group = req.get("group", "")
                 req = Request.from_dict(req)
                 writer_idx = select_writer(req)
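Switching the Redis queue payloads from orjson to pickle protocol 5 means the request dict no longer has to be JSON-serializable; numpy arrays such as `position_ids` round-trip intact. A sketch of the queue round-trip, assuming a reachable local Redis and an illustrative queue name:

```python
import pickle

import numpy as np
import redis  # assumes a Redis server at the default address

client = redis.Redis()
req_dict = {"request_id": "req-0", "position_ids": np.arange(8, dtype=np.int64)}

# orjson would reject the ndarray; pickle carries it through as-is.
client.lpush("ReqQ_demo", pickle.dumps(req_dict, protocol=5))
restored = pickle.loads(client.rpop("ReqQ_demo"))
assert (restored["position_ids"] == req_dict["position_ids"]).all()
```

The usual caveat applies: `pickle.loads` executes whatever the payload describes, so this only holds while both ends of the queue are the deployment's own trusted nodes.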
39 changes: 26 additions & 13 deletions fastdeploy/splitwise/splitwise_connector.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 """

-import json
+import pickle
 import time
 import traceback
 from concurrent.futures import ThreadPoolExecutor
@@ -94,12 +94,12 @@ def start_receiver(self):
                 if not socks:
                     continue
                 else:
-                    self.logger.debug(f"receive {socks}")
+                    self.logger.debug("receive %s", socks)

                 frames = self.router_socket.recv_multipart()
-                self.logger.debug(f"frames: {frames}")
-                message = frames[-1]
-                self.io_executor.submit(self._process_message, message)
+                self.logger.debug("frames: %s", frames)
+                # message = frames[-1]
+                self.io_executor.submit(self._process_message, frames)
                 time.sleep(0.001)
             else:
                 time.sleep(5)
@@ -150,7 +150,7 @@ def _send_message(self, addr, msg_type: str, payload):
         try:

             sock = self._get_push_socket(addr)
-            sock.send_multipart([b"", message])
+            sock.send_multipart(message)

             self.logger.info(f"Sent {msg_type} to {addr}")

@@ -387,21 +387,34 @@ def _serialize_message(self, msg_type: str, payload) -> bytes:
         if msg_type == "decode" or msg_type == "prefill":
             payload = [output.to_dict() for output in payload]

-        json_data = json.dumps({"type": msg_type, "payload": payload}).encode("utf-8")
-        return json_data
+        # Prepare data
+        data = {"type": msg_type, "payload": payload}

-    def _deserialize_message(self, data: bytes):
+        # Pickle protocol 5 supports extracting large arrays (buffers)
+        buffers = []
+        # Serialize main data, strip large arrays as references into buffers
+        main_bytes = pickle.dumps(data, protocol=5, buffer_callback=buffers.append)
+        return [main_bytes] + buffers

-        # JSON deserialization
-        message = json.loads(data.decode("utf-8"))
+    def _deserialize_message(self, frames: List[bytes]):
+        # identity = frames[0]
+
+        if len(frames) < 2:
+            raise ValueError(f"Received frames too short, missing payload {len(frames)}")
+
+        main_bytes = frames[1]
+        buffers = frames[2:]
+
+        # Restore data, pickle will automatically fill buffers back into numpy arrays
+        message = pickle.loads(main_bytes, buffers=buffers)
         return message["type"], message["payload"]

-    def _process_message(self, message: bytes):
+    def _process_message(self, frames: List[bytes]):
         """
         process message
         """
         try:
-            msg_type, payload = self._deserialize_message(message)
+            msg_type, payload = self._deserialize_message(frames)
             self.logger.info(f"{msg_type}")

             if msg_type == "prefill":
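This serialization rework is the substance of the PR: protocol-5 pickle extracts large contiguous buffers (for example numpy array data) through `buffer_callback`, and each buffer travels as its own ZMQ frame instead of being copied into a single JSON blob. A standalone round-trip sketch of that framing; the real receive path additionally carries a ROUTER identity frame at `frames[0]`, which this sketch omits:

```python
import pickle

import numpy as np

def serialize(msg_type, payload):
    # buffer_callback makes pickle hand large contiguous buffers
    # (e.g. ndarray data) to us instead of copying them into the
    # main stream; each one can ride as its own multipart frame.
    buffers = []
    main = pickle.dumps(
        {"type": msg_type, "payload": payload},
        protocol=5,
        buffer_callback=buffers.append,
    )
    return [main] + buffers

def deserialize(frames):
    # frames[0] is the pickled envelope; the rest are its buffers.
    msg = pickle.loads(frames[0], buffers=frames[1:])
    return msg["type"], msg["payload"]

arr = np.arange(1 << 16, dtype=np.float32)
msg_type, payload = deserialize(serialize("prefill", {"hidden": arr}))
assert msg_type == "prefill" and (payload["hidden"] == arr).all()
```

Sending `[main] + buffers` through `sock.send_multipart(...)` maps each buffer to one frame, which is why `_send_message` now passes the list straight through rather than wrapping a single blob in `[b"", message]`.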
11 changes: 10 additions & 1 deletion fastdeploy/worker/gpu_model_runner.py
@@ -752,6 +752,15 @@ def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests:
                     dtype="int64",
                 )
                 self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
+                if self.enable_mm:
+                    if "position_ids" in request.multimodal_inputs and request.multimodal_inputs["position_ids"] is not None:
+                        position_ids = paddle.to_tensor(
+                            request.multimodal_inputs["position_ids"],
+                            dtype="int64",
+                        )
+                        self.share_inputs["rope_emb"][idx : idx + 1, :] = self.prepare_rope3d(
+                            position_ids, [request.get("max_tokens", 2048)], [0, position_ids.shape[0]]
+                        )[0]
             else:
                 self.share_inputs["pre_ids"][idx : idx + 1] = -1
                 self.share_inputs["step_idx"][idx : idx + 1] = 0
@@ -2604,7 +2613,7 @@ def _preprocess_mm_task(self, one: dict) -> None:
             token_type_ids = one["token_type_ids"][np.newaxis, :]
             token_type_ids = paddle.to_tensor(token_type_ids, dtype=paddle.int64)

-            if one["images"] is not None:
+            if "images" in one and one["images"] is not None:
                 image_type_ids = one["image_type_ids"][np.newaxis, :]
                 images = one["images"]
                 image_type_ids = paddle.to_tensor(image_type_ids, dtype=paddle.int64)
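The widened guard in `_preprocess_mm_task` pairs with the `to_dict()` whitelist above: once `images` can be stripped from the inputs entirely, plain indexing would raise `KeyError` for text-only requests. A short illustration with a hypothetical task dict:

```python
one = {"input_ids": [1, 2, 3]}  # "images" stripped by the to_dict() whitelist

# Old guard raises once the key can be missing:
#   if one["images"] is not None:   -> KeyError
# New guard short-circuits on the membership test instead:
if "images" in one and one["images"] is not None:
    raise AssertionError("unreachable for a text-only request")
```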