Commit 80efe98
[PD Disaggregation] Add timestamp for analyzing splitwise deployment (#5317)
* Add timestamp for analyzing splitwise deployment
* up
* up
* up
* up
* up
* up
* fix format
* fix
1 parent 0c66163 commit 80efe98

File tree: 21 files changed, +287 −188 lines

docs/online_serving/README.md (2 additions & 0 deletions)

@@ -231,6 +231,8 @@ mm_hashes: Optional[list] = None
 # Hash values for multimodal (e.g., image/audio) inputs, used for verification or tracking.
 # Default None indicates no multimodal input or hash validation required.

+collect_metrics: Optional[bool] = False
+# Whether to return metrics information, for performance analysis or debugging (default is False, meaning no metrics are returned).
 ```

 ### Differences in Return Fields
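The new `collect_metrics` request field documented above is an opt-in switch for returning metrics with a response. Purely as an illustrative sketch (the endpoint path, port, model name, and response layout below are assumptions, not part of this diff), a client could enable it like this:

```python
# Hypothetical client call enabling the new collect_metrics field.
# Assumptions: an OpenAI-compatible /v1/chat/completions endpoint on
# localhost:8188; the metrics layout in the response is deployment-specific.
import requests

payload = {
    "model": "default",
    "messages": [{"role": "user", "content": "who are you?"}],
    "max_tokens": 30,
    "collect_metrics": True,  # ask the server to return timing metrics (default False)
}

resp = requests.post("http://localhost:8188/v1/chat/completions", json=payload, timeout=60)
resp.raise_for_status()
print(resp.json())
```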

docs/zh/online_serving/README.md (2 additions & 0 deletions)

@@ -224,6 +224,8 @@ logits_processors_args: Optional[Dict] = None
 mm_hashes: Optional[list] = None
 # List of hash values for multimodal inputs (e.g., image, audio), used to verify or track the input content. Default None means no multimodal input or no hash validation is required.

+collect_metrics: Optional[bool] = False
+# Whether to return metrics collected during generation, for performance analysis or debugging (default False means none are returned).
 ```

 ### Differences in Return Fields

fastdeploy/demo/offline_demo.py (4 additions & 6 deletions)

@@ -17,11 +17,9 @@
 from fastdeploy.engine.sampling_params import SamplingParams
 from fastdeploy.entrypoints.llm import LLM

-model_name_or_path = "/workspace/ERNIE-4.5-0.3B-Paddle"
-
-# Hyperparameter settings
-sampling_params = SamplingParams(temperature=0.1, max_tokens=30, prompt_logprobs=100)
-llm = LLM(model=model_name_or_path, tensor_parallel_size=1, enable_prefix_caching=False)
-output = llm.generate(prompts="who are you?", use_tqdm=True, sampling_params=sampling_params)
+model_name_or_path = "PaddlePaddle/ERNIE-4.5-0.3B-Paddle"
+sampling_params = SamplingParams(temperature=0.1, max_tokens=30)
+llm = LLM(model=model_name_or_path)
+output = llm.generate(prompts="who are you?", use_tqdm=True, sampling_params=sampling_params)

 print(output)

fastdeploy/engine/async_llm.py (3 additions & 3 deletions)

@@ -402,7 +402,7 @@ async def add_request(

         try:
             request = Request.from_dict(prompt)
-            request.llm_engine_recv_req_timestamp = time.time()
+            request.metrics.scheduler_recv_req_time = time.time()

             # Check if already preprocessed by AsyncEngineClient
             is_preprocessed = prompt.get("_preprocessed", False)
@@ -419,7 +419,7 @@ async def add_request(
             request.need_prefill_tokens = prompt_token_ids_len

             if not is_preprocessed:
-                request.preprocess_start_time = arrival_time
+                request.metrics.preprocess_start_time = arrival_time
                 input_ids_len = request.prompt_token_ids_len

                 request.set(
@@ -448,7 +448,7 @@ async def add_request(
                 llm_logger.error(error_msg)
                 raise EngineError(error_msg, error_code=400)

-            request.preprocess_end_time = time.time()
+            request.metrics.preprocess_end_time = time.time()

            # Register output queue first, then add request
            await self.output_processor.register_request(request_id, output_queue)
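The hunks above replace ad-hoc attributes such as `llm_engine_recv_req_timestamp` and `preprocess_start_time` with fields on a per-request `request.metrics` object. The metrics container itself is defined elsewhere in this changeset and does not appear in this excerpt; the dataclass below is only a sketch of what such a container might hold, based on the field names used across these diffs (the field set, defaults, and class name are assumptions):

```python
# Hypothetical sketch of a per-request metrics container (the real definition
# lives elsewhere in this PR and may differ). All values are wall-clock
# timestamps from time.time(), filled in as the request moves through the engine.
from dataclasses import dataclass
from typing import Optional


@dataclass
class RequestMetricsSketch:
    scheduler_recv_req_time: Optional[float] = None          # request received by the engine/scheduler
    preprocess_start_time: Optional[float] = None            # tokenization / template preprocessing starts
    preprocess_end_time: Optional[float] = None              # preprocessing finished
    engine_get_req_time: Optional[float] = None              # engine pulled the request from the scheduler
    add_req_to_resource_manager_time: Optional[float] = None # request handed to the resource manager
    ask_decode_resource_start_time: Optional[float] = None   # prefill starts asking decode for KV blocks
    ask_decode_resource_finish_time: Optional[float] = None  # decode confirmed the allocation
    inference_start_time: Optional[float] = None             # prefill (or mixed) inference begins
    decode_recv_req_time: Optional[float] = None             # decode instance received the task
    decode_preallocate_req_time: Optional[float] = None      # decode preallocated KV cache blocks
    decode_recv_first_token_time: Optional[float] = None     # decode received the prefilled first token
    decode_inference_start_time: Optional[float] = None      # decode-side generation begins
```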

fastdeploy/engine/common_engine.py (48 additions & 29 deletions)

@@ -362,7 +362,6 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
                 tasks.remove(tmp_task)

         for item in tasks:
-            item.schedule_start_time = time.time()
             trace_print(LoggingEventName.RESOURCE_ALLOCATE_START, item.request_id, getattr(item, "user", ""))
         available_batch = np.sum(self.resource_manager.stop_flags)
         if len(tasks) > available_batch:
@@ -400,7 +399,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
         if not is_decode:
             self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}")
         for task in tasks:
-            task.inference_start_time = time.time()
+            task.metrics.inference_start_time = time.time()
             trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
             trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
             trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
@@ -415,7 +414,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
     def _insert_prefilled_requests(self, request_outputs: List[RequestOutput]):
         """
         Decode insert prefilled requests into engine worker queue.
-        Used in v1_kvcache_scheduler.
+        Used in v0_kvcache_scheduler.
         Args:
             request_outputs: a list of RequestOutput sent by prefill instance
         """
@@ -437,6 +436,10 @@ def _insert_prefilled_requests(self, request_outputs: List[RequestOutput]):

             cur_req.prompt_token_ids[0] = req_out.outputs.token_ids[0]
             cur_req.num_cached_tokens = req_out.num_cached_tokens
+            req_out.metrics.decode_recv_req_time = cur_req.metrics.decode_recv_req_time
+            req_out.metrics.decode_preallocate_req_time = cur_req.metrics.decode_preallocate_req_time
+            cur_req.metrics = req_out.metrics
+            cur_req.metrics.decode_inference_start_time = time.time()
             if self.cfg.speculative_config.method in ["mtp"] and self.cfg.scheduler_config.splitwise_role == "decode":
                 cur_req.draft_token_ids = copy.deepcopy(req_out.outputs.draft_token_ids)

@@ -644,6 +647,7 @@ def _schedule_request_to_worker(self):
                     batch=num_prefill_batch,
                 )
                 for task in tasks:
+                    task.metrics.engine_get_req_time = time.time()
                     trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", ""))
                 if len(tasks) == 0:
                     time.sleep(0.001)
@@ -706,7 +710,7 @@ def _fetch_request():
                 batch=num_prefill_batch,
             )
             for task in tasks:
-                task.schedule_start_time = time.time()
+                task.metrics.engine_get_req_time = time.time()
                 trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", ""))

             if self.cfg.scheduler_config.splitwise_role == "decode":
@@ -732,7 +736,10 @@ def _fetch_request():
                         # assure can allocate block ids in P
                         while not self.resource_manager.preallocate_resource_in_p(task):
                             time.sleep(0.005)
-                        self.llm_logger.debug(f"P has allocated resources for request: {task.request_id}")
+                        self.llm_logger.debug(
+                            f"P has allocated resources and then ask D resource for request: {task.request_id}"
+                        )
+                        task.metrics.ask_decode_resource_start_time = time.time()
                         while True:
                             self.split_connector.send_splitwise_tasks([task], task.idx)
                             status, msg = self.split_connector.check_decode_allocated(task)
@@ -742,39 +749,39 @@ def _fetch_request():
                                 )
                                 time.sleep(0.05)
                             else:
+                                task.metrics.ask_decode_resource_finish_time = time.time()
                                 break
                         self.llm_logger.debug(f"D has allocated resource for request: {task.request_id}")
                 else:
                     for task in tasks:
                         # assure can allocate block ids in P
                         while not self.resource_manager.preallocate_resource_in_p(task):
                             time.sleep(0.005)
-                        self.llm_logger.debug(f"P has allocated resources for request: {task.request_id}")
+
+                        self.llm_logger.debug(
+                            f"P has allocated resources and then ask D resource for req_id: {task.request_id}"
+                        )
+                        task.metrics.ask_decode_resource_start_time = time.time()
                         self.split_connector.send_splitwise_tasks([task], task.idx)

                 for task in tasks:
-                    if self.cfg.scheduler_config.splitwise_role != "mixed":
-                        # assure fetch block ids from D
-                        status, msg = self.split_connector.check_decode_allocated(task)
-                        if not status:
-                            self.llm_logger.error(
-                                f"D failed to allocate resource for request {task.request_id}, message: {msg}."
-                            )
-                            self.scheduler.put_results(
-                                [
-                                    RequestOutput(
-                                        request_id=task.request_id,
-                                        finished=True,
-                                        error_code=500,
-                                        error_msg=msg,
-                                    )
-                                ]
-                            )
-                            need_delete_tasks.append(task)
-                            continue
-                        else:
-                            self.llm_logger.debug(f"D has allocated resource for request: {task.request_id}")
-
+                    # assure fetch block ids from D
+                    status, msg = self.split_connector.check_decode_allocated(task)
+                    task.metrics.ask_decode_resource_finish_time = time.time()
+                    if not status:
+                        self.llm_logger.error(f"{task.request_id} prefill failed with msg:{msg}.")
+                        self.scheduler.put_results(
+                            [
+                                RequestOutput(
+                                    request_id=task.request_id,
+                                    finished=True,
+                                    error_code=500,
+                                    error_msg=msg,
+                                )
+                            ]
+                        )
+                        need_delete_tasks.append(task)
+                        continue
                 for tmp_task in need_delete_tasks:
                     tasks.remove(tmp_task)
                 # release resource in P
@@ -822,6 +829,7 @@ def _fetch_request():
                 # Fetch requests and add them to the scheduling queue
                 if tasks:
                     for task in tasks:
+                        task.metrics.add_req_to_resource_manager_time = time.time()
                         trace_print(
                             LoggingEventName.RESOURCE_ALLOCATE_START, task.request_id, getattr(task, "user", "")
                         )
@@ -895,6 +903,11 @@ def _fetch_request():
                            LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", "")
                        )
                        trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
+                        if isinstance(task, Request):
+                            if self.cfg.scheduler_config.splitwise_role == "decode":
+                                task.metrics.decode_inference_start_time = time.time()
+                            else:
+                                task.metrics.inference_start_time = time.time()
                    self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz))

                # 4. Response error tasks
@@ -962,7 +975,7 @@ def _insert_zmq_task_to_scheduler(self):
             err_msg = None
             try:
                 request = Request.from_dict(data)
-                request.llm_engine_recv_req_timestamp = time.time()
+                request.metrics.scheduler_recv_req_time = time.time()
                 start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
                 main_process_metrics.requests_number.inc()
                 trace_print(LoggingEventName.PREPROCESSING_END, data["request_id"], data.get("user", ""))
@@ -1132,6 +1145,8 @@ def _fetch_requests():
                    self.llm_logger.debug(
                        f"D has received tasks to preallocate resource for tasks: {[task.request_id for task in tasks]}"
                    )
+                    for task in tasks:
+                        task.metrics.decode_recv_req_time = time.time()
                    allocate_resource_requests.extend(tasks)
                elif isinstance(tasks[0], RequestOutput):
                    self.llm_logger.debug(
@@ -1141,6 +1156,7 @@ def _fetch_requests():
                        tasks = [tasks]
                    for task in tasks:
                        task.finished = False
+                        task.metrics.decode_recv_first_token_time = time.time()
                    prefilled_request_ouputs.extend(tasks)

        def _process_allocate_resource_requests():
@@ -1150,6 +1166,8 @@ def _process_allocate_resource_requests():

                if envs.ENABLE_V1_KVCACHE_SCHEDULER:
                    if self.resource_manager.preallocate_resource_in_d(task):
+                        task.metrics.decode_preallocate_req_time = time.time()
+                        self.llm_logger.info(f"Resource available, processing task {task.request_id}")
                        self.split_connector.send_cache_info_to_prefill([task])
                        self.llm_logger.debug(f"D has successfully sent cache infos for task {task.request_id}")
                        processed_indices.append(idx)
@@ -1158,6 +1176,7 @@ def _process_allocate_resource_requests():
                    if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len):
                        self.llm_logger.debug(f"D Resource available, processing task {task.request_id}")
                        self.insert_tasks([task])
+                        task.metrics.decode_preallocate_req_time = time.time()
                        processed_indices.append(idx)
                        is_success = True
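Taken together, the timestamps added in common_engine.py cover the prefill side (engine_get_req_time, ask_decode_resource_start/finish_time, inference_start_time) and the decode side (decode_recv_req_time, decode_preallocate_req_time, decode_recv_first_token_time, decode_inference_start_time) of a splitwise deployment. The commit only records the raw times; the helper below is a hedged sketch (the function name and stage pairings are my own, not part of the change) of how they could be turned into per-stage durations for analysis:

```python
# Hypothetical post-processing of the recorded timestamps into stage durations
# for a splitwise (prefill/decode disaggregated) deployment. Stage names and
# pairings are illustrative only; adapt them to the metrics object you use.
from typing import Dict, Optional


def splitwise_stage_durations(m) -> Dict[str, Optional[float]]:
    """Compute per-stage latencies in seconds from a metrics object such as the
    RequestMetricsSketch above; stages that were not recorded come back as None."""

    def delta(start: Optional[float], end: Optional[float]) -> Optional[float]:
        return (end - start) if (start is not None and end is not None) else None

    return {
        "preprocess": delta(m.preprocess_start_time, m.preprocess_end_time),
        "scheduler_queue": delta(m.scheduler_recv_req_time, m.engine_get_req_time),
        "ask_decode_resource": delta(m.ask_decode_resource_start_time, m.ask_decode_resource_finish_time),
        "prefill_schedule_to_inference": delta(m.engine_get_req_time, m.inference_start_time),
        "decode_preallocate": delta(m.decode_recv_req_time, m.decode_preallocate_req_time),
        "decode_wait_first_token": delta(m.decode_preallocate_req_time, m.decode_recv_first_token_time),
        "decode_startup": delta(m.decode_recv_first_token_time, m.decode_inference_start_time),
    }
```

Feeding a completed request's metrics into such a helper would show, for example, how long the prefill instance waited for the decode instance to allocate KV cache blocks versus how long the decode side took to start generating.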

fastdeploy/engine/engine.py (4 additions & 3 deletions)

@@ -256,13 +256,13 @@ def add_requests(self, task, sampling_params=None, **kwargs):
         if sampling_params is not None:
             task.update(asdict(sampling_params))
         request = Request.from_dict(task)
-        request.llm_engine_recv_req_timestamp = time.time()
+        request.metrics.scheduler_recv_req_time = time.time()
         llm_logger.info(f"Receive request {request}")
         if sampling_params is not None:
             if sampling_params.temperature is not None and abs(sampling_params.temperature) < 1e-06:
                 sampling_params.temperature = 1e-06
             request.sampling_params = sampling_params
-        request.preprocess_start_time = time.time()
+        request.metrics.preprocess_start_time = time.time()
         chat_template_kwargs = kwargs.get("chat_template_kwargs") or {}
         chat_template_kwargs["chat_template"] = kwargs.get("chat_template")
         kwargs["chat_template_kwargs"] = chat_template_kwargs
@@ -324,7 +324,8 @@ def add_requests(self, task, sampling_params=None, **kwargs):
            llm_logger.error(err_msg)
            raise EngineError(err_msg, error_code=400)

-        request.preprocess_end_time = time.time()
+        request.metrics.preprocess_end_time = time.time()
+        request.metrics.scheduler_recv_req_time = time.time()
         self.engine.scheduler.put_requests([request])
         llm_logger.info(f"Cache task with request_id ({request.get('request_id')})")
         llm_logger.debug(f"cache task: {request}")
