Commit cd229f9

get_tps() supports multi-processing
1 parent 93df90f commit cd229f9

2 files changed: 169 additions & 39 deletions

README.md

Lines changed: 30 additions & 5 deletions
@@ -254,11 +254,36 @@ result_callback_function (id: int, config, result, log: dict) -> Any
 
 - #### get_tps()
 Get TPS report in dict type after completing `start()` or by passing a list data.
-```python
-def get_tps(logs: dict=None, debug: bool=False, display_intervals: bool = False, interval: float=0, reverse_interval: bool = False) -> dict:
-```
-The log dict matches the format of the [get_logs()](#get_logs) and refers to it by default.
-Each task within a log will be validated for success according to the [callback_function()](#task.result_callback) result rule.
+```python
+def get_tps(logs: dict=None, display_intervals: bool=False, interval: float=0, reverse_interval: bool=False, use_processing: bool=False, verbose: bool=False, debug: bool=False,) -> dict:
+```
+The log dict matches the format of the [get_logs()](#get_logs) and refers to it by default.
+Each task within a log will be validated for success according to the [callback_function()](#task.result_callback) result rule.
+
+> Enabling `use_processing` can speed up the peak-finding process, particularly for large tasks with long durations.
+
+Example output with `debug` mode and `use_processing` enabled:
+```bash
+--- Start calculating the TPS data ---
+ - Average TPS: 0.83, Total Duration: 1202.3867809772491s, Success Count: 999
+--- Start to compile intervals with an interval of 13 seconds ---
+ - Interval - Start Time: 1734937209.851285, End Time: 1734937222.851285, TPS: 51.23
+ * Peak detected above the current TPS threshold - Interval TPS: 51.23, Main TPS: 0.83
+ - Interval - Start Time: 1734937222.851285, End Time: 1734937235.851285, TPS: 18.0
+ - Interval - Start Time: 1734937235.851285, End Time: 1734937248.851285, TPS: 0.0
+...
+ - Interval - Start Time: 1734938405.851285, End Time: 1734938412.238066, TPS: 0.0
+--- Start to find the peak TPS ---
+ - Detecting from Start Time: 1734937210, Count: 67, Current TPS Threshold: 51.23, Worker: 104
+ * Peak detected above the current TPS threshold - TPS: 53.5, Started at: 1734937210, Ended at: 1734937220
+ * Peak detected above the current TPS threshold - TPS: 53.857142857142854, Started at: 1734937210, Ended at: 1734937224
+ * Peak detected above the current TPS threshold - TPS: 55.13333333333333, Started at: 1734937210, Ended at: 1734937225
+ * Peak detected above the current TPS threshold - TPS: 55.166666666666664, Started at: 1734937210, Ended at: 1734937228
+ - Detecting from Start Time: 1734937224, Count: 73, Current TPS Threshold: 55.166666666666664, Worker: 105
+...
+ - Detecting from Start Time: 1734937212, Count: 82, Current TPS Threshold: 55.166666666666664, Worker: 102
+ * Peak detected above the current TPS threshold - TPS: 55.53846153846154, Started at: 1734937212, Ended at: 1734937225
+```
 
 ### Scenarios

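For context, a minimal usage sketch of the new signature (not part of the diff): it assumes the package is imported as `worker_dispatcher`, and the log records are made up, using only the `started_at` / `ended_at` / `result` keys required by `_validate_log_format()` below (records returned by `get_logs()` carry more fields).

```python
import time
import worker_dispatcher

# Hypothetical pre-collected log records in the get_logs() format:
# a record counts as a success when its 'result' passes result_is_success().
now = time.time()
logs = [
    {'started_at': now + i * 0.1, 'ended_at': now + i * 0.1 + 0.5, 'result': True}
    for i in range(1000)
]

if __name__ == '__main__':
    # use_processing spawns a multiprocessing.Pool for the peak search,
    # so keep the call under the __main__ guard; debug also enables verbose output.
    tps_report = worker_dispatcher.get_tps(logs=logs, use_processing=True, debug=True)
    print(tps_report)
```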
src/worker_dispatcher/worker_dispatcher.py

Lines changed: 139 additions & 34 deletions
@@ -563,17 +563,31 @@ def _merge_dicts_recursive(default_dict, user_dict):
 # TPS report
 def get_tps(
     logs: dict = None,
-    debug: bool = False,
     display_intervals: bool = False,
     interval: float = 0,
     reverse_interval: bool = False,
+    use_processing: bool = False,
+    verbose: bool = False,
+    debug: bool = False,
 ) -> dict:
 
+    verbose = True if debug else verbose
+
+    # Multi-processing protection
+    if use_processing:
+        in_child_process = (multiprocessing.current_process().name != 'MainProcess')
+        # Return False if in a worker process, so the caller can handle it
+        if in_child_process:
+            # print("Exit procedure due to the child process")
+            return False
+
     # Logs data check
     logs = logs if logs else get_logs()
     if not isinstance(logs, list):
         return False
 
+    if verbose: print("--- Start calculating the TPS data ---")
+
     # Run the TPS for all
     success_id_set = set()
     main_tps_data = _tps_calculate(False, False, logs, True, success_id_set)
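A quick illustration of the process-name check behind the new guard (not part of the diff; standard library only). CPython names pool workers along the lines of `ForkPoolWorker-1` or `SpawnPoolWorker-1`, so any name other than `MainProcess` means the call is running inside a child, and `get_tps(use_processing=True)` simply returns `False` there instead of spawning nested pools.

```python
import multiprocessing

def in_child_process() -> bool:
    # Same test as the new guard: only the original interpreter is named 'MainProcess'.
    return multiprocessing.current_process().name != 'MainProcess'

def probe(_):
    return multiprocessing.current_process().name, in_child_process()

if __name__ == '__main__':
    print(multiprocessing.current_process().name, in_child_process())  # MainProcess False
    with multiprocessing.Pool(2) as pool:
        print(pool.map(probe, range(2)))  # e.g. [('SpawnPoolWorker-1', True), ...]
```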
@@ -582,20 +596,39 @@ def get_tps(
     ended_at = main_tps_data['ended_at']
     success_count = main_tps_data['count']['success']
     exec_time_avg = main_tps_data['metrics']['execution_time']['avg']
-    tps = main_tps_data['tps']
+    total_duration = main_tps_data['duration']
+    tps_threshold = main_tps_data['tps']
+
+    if debug: print(" - Average TPS: {}, Total Duration: {}s, Success Count: {}".format(tps_threshold, total_duration, success_count))
 
     # Peak data definition
     peak_tps_data = {}
 
     # Compile Intervals and find peak TPS
+    """
+    Compile Intervals
+    This will also calculate whether the interval TPS exceeds the current one.
+    """
     interval_log_list = []
-    if success_count > 0:
-        interval = interval if interval else round(exec_time_avg * 3, 2) if (exec_time_avg * 3) >= 1 else 5
+    if success_count == 0:
+        if verbose: print("--- No successful task to compile intervals ---")
+    else:
+        ideal_max_row_num = 100
+        interval = (
+            interval
+            if interval
+            else round(exec_time_avg * 3, 2)
+            if (total_duration / (exec_time_avg * 3)) <= ideal_max_row_num
+            else math.ceil(total_duration / ideal_max_row_num)
+            if (total_duration / ideal_max_row_num) >= 1
+            else 1
+        )
         interval_success_count = 0
         interval_ended_at = ended_at
         # Reserve option
         interval_pointer = interval_ended_at if reverse_interval else started_at
         interval = interval * -1 if reverse_interval else interval
+        if verbose: print("--- Start to compile intervals with an interval of {} seconds ---".format(interval))
         while started_at <= interval_pointer <= ended_at:
             current_success_count = 0
             # Shift Indicator
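An aside on the new interval auto-sizing (not part of the diff): it caps the interval report at roughly `ideal_max_row_num` (100) rows. With the figures from the README example above (a total duration of about 1202.39 s, no explicit `interval`, and an average execution time short enough that `exec_time_avg * 3` would produce more than 100 rows), the expression falls back to `math.ceil(1202.39 / 100) = 13` seconds, matching the "interval of 13 seconds" line in the sample output. A standalone copy of the selection expression, for illustration only:

```python
import math

def pick_interval(interval: float, exec_time_avg: float, total_duration: float) -> float:
    # Illustrative copy of the chained conditional added in this hunk (not the library API).
    ideal_max_row_num = 100
    return (
        interval
        if interval
        else round(exec_time_avg * 3, 2)
        if (total_duration / (exec_time_avg * 3)) <= ideal_max_row_num
        else math.ceil(total_duration / ideal_max_row_num)
        if (total_duration / ideal_max_row_num) >= 1
        else 1
    )

# Assumed exec_time_avg of 1.2 s: 1202.39 / (1.2 * 3) is about 334 rows (> 100),
# so the interval falls back to math.ceil(1202.39 / 100) = 13.
print(pick_interval(0, 1.2, 1202.3867809772491))  # -> 13
```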
@@ -612,46 +645,79 @@ def get_tps(
             # Find the peak
             current_success_count = int(tps_data['count']['success'])
             current_tps = float(tps_data['tps'])
-            if debug: print(" - Interval - Start Time: {}, End Time: {}, TPS: {}".format(peak_started_at, interval_ended_at, current_tps))
+            if debug: print(" - Interval - Start Time: {}, End Time: {}, TPS: {}".format(peak_started_at, interval_ended_at, current_tps))
             if current_success_count and current_success_count > interval_success_count:
                 interval_success_count = current_success_count
-                if debug: print(" * Find peak above the main TPS - Interval TPS: {}, Main TPS: {}".format(current_tps, tps))
                 # Comparing with the main TPS
-                if current_tps > tps:
+                if current_tps > tps_threshold:
+                    if debug: print(" * Peak detected above the current TPS threshold - Interval TPS: {}, Main TPS: {}".format(current_tps, tps_threshold))
                     peak_tps_data = tps_data
+                    # Update the TPS threshold
+                    tps_threshold = current_tps
 
-    # Peak Finding Algorithm
+    """
+    Peak Finding Algorithm with multi-processing support
+    Find each start time with more successes than the current threshold, and then continue adding the duration to gather the verified successes within the period.
+    """
     start_time_counts = {}
-    tps_threshold = float(peak_tps_data['tps']) if peak_tps_data else tps
     peak_started_time = 0
     peak_ended_time = 0
+    if verbose: print("--- Start to find the peak TPS ---")
+    # Divide each successful start time into integers and sum accordingly
     for log in logs:
-        if log['task_id'] in success_id_set:
+        if result_is_success(log['result']):
             key = str(math.floor(log['started_at']))
             start_time_counts[key] = start_time_counts[key] + 1 if key in start_time_counts else 1
-    # Each Calculation
-    for start_time, count in start_time_counts.items():
-        start_time = int(start_time)
-        if count <= tps_threshold:
-            continue
-        if debug: print("- Peak Finding - Start Time: {}, Count: {}, Current TPS Threshold: {}".format(start_time, count, tps_threshold))
-        duration_count = 0
-        remaining_count = count
-        while (start_time + duration_count) <= math.ceil(ended_at):
-            success_count = 0
-            duration_count = ended_at - start_time if (start_time + duration_count) > ended_at else duration_count + 1
-            for log in logs:
-                if math.floor(log['started_at']) >= start_time and math.ceil(log['ended_at']) <= start_time + duration_count:
-                    success_count += 1
-                    remaining_count -= 1
-            # Check Finest
-            cuurent_tps = success_count / duration_count
-            # Find the peak
-            if cuurent_tps > tps_threshold:
-                tps_threshold = cuurent_tps
-                peak_started_time = start_time
-                peak_ended_time = start_time + duration_count
-                if debug: print(" * Find Peak - TPS: {}, Started at: {}, Ended at: {}".format(tps_threshold, peak_started_time, peak_ended_time))
+
+    # Calculate each division of the start time
+    # Multi-processing support solution
+    wrap_references = {
+        'tps_threshold': tps_threshold,
+        'peak_started_time': peak_started_time,
+        'peak_ended_time': peak_ended_time,
+    }
+    if use_processing:
+        with multiprocessing.Manager() as manager: # Define Manager inside the pool block
+            managed_wrap_references = manager.dict(wrap_references)
+            lock = manager.Lock()
+            with multiprocessing.Pool(multiprocessing.cpu_count()) as pool: # Use as many processes as CPU cores
+                # Convert dictionary to a list of (key, value) tuples and process them
+                pool.starmap(_search_peak_by_start_time, [(start_time, count, managed_wrap_references, logs, ended_at, debug, lock) for start_time, count in start_time_counts.items()])
+            wrap_references = dict(managed_wrap_references)
+    else:
+        for start_time, count in start_time_counts.items():
+            _search_peak_by_start_time(start_time, count, wrap_references, logs, ended_at, debug)
+    # Unwrap the references
+    tps_threshold = wrap_references['tps_threshold']
+    peak_started_time = wrap_references['peak_started_time']
+    peak_ended_time = wrap_references['peak_ended_time']
+
+    # for start_time, count in start_time_counts.items():
+    #     start_time = int(start_time)
+    #     # Skip if this start time count cannot exceed the current TPS threshold.
+    #     if count <= tps_threshold:
+    #         continue
+    #     if debug: print(" - Detecting from Start Time: {}, Count: {}, Current TPS Threshold: {}".format(start_time, count, tps_threshold))
+    #     duration_count = 0
+    #     # Count successes for each accumulative period to compare with the current TPS threshold
+    #     while (start_time + duration_count) <= math.ceil(ended_at):
+    #         success_count = 0
+    #         duration_count = ended_at - start_time if (start_time + duration_count) > ended_at else duration_count + 1
+    #         for log in logs:
+    #             if math.floor(log['started_at']) >= start_time and math.ceil(log['ended_at']) <= (start_time + duration_count) and result_is_success(log['result']):
+    #                 success_count += 1
+    #         # Check Finest
+    #         current_tps = success_count / duration_count
+    #         # Find the peak
+    #         if current_tps > tps_threshold:
+    #             tps_threshold = current_tps
+    #             peak_started_time = start_time
+    #             peak_ended_time = start_time + duration_count
+    #             if debug: print(" * Peak detected above the current TPS threshold - TPS: {}, Started at: {}, Ended at: {}".format(tps_threshold, peak_started_time, peak_ended_time))
+    #         # Detect if this round can be skipped
+    #         if success_count >= count:
+    #             continue
+
     # Aggregation
     if peak_started_time or peak_ended_time:
         peak_tps_data = _tps_calculate(peak_started_time, peak_ended_time, logs)
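To make the pruning concrete (not part of the diff): successful start times are floored to whole seconds and counted, and a bucket is only expanded into longer windows when its raw count already beats the current TPS threshold, which is the same skip rule `_search_peak_by_start_time()` applies in the next hunk. A toy illustration with made-up timestamps:

```python
import math

# Made-up start times (in seconds) of successful tasks.
started = [10.1, 10.4, 10.9, 11.2, 12.7]

start_time_counts = {}
for t in started:
    key = str(math.floor(t))
    start_time_counts[key] = start_time_counts.get(key, 0) + 1
print(start_time_counts)  # {'10': 3, '11': 1, '12': 1}

# With a current threshold of 2.0 TPS, only second 10 is worth expanding,
# since the other buckets cannot beat the threshold on their own count.
tps_threshold = 2.0
candidates = [int(k) for k, c in start_time_counts.items() if c > tps_threshold]
print(candidates)  # [10]
```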
@@ -754,11 +820,50 @@ def _tps_calculate(started_at: float, ended_at: float, logs: list, display_valid
         tps_data['count']['invalidity'] = total_count - invalid_count
     return tps_data
 
+def _search_peak_by_start_time(start_time, count, wrap_references, logs, ended_at, debug, lock=None):
+    start_time = int(start_time)
+    # Skip if this start time count cannot exceed the current TPS threshold.
+    if count <= wrap_references['tps_threshold']:
+        return
+    if debug:
+        worker_id = multiprocessing.current_process().name.split('-')[-1]
+        print(" - Detecting from Start Time: {}, Count: {}, Current TPS Threshold: {}, Worker: {}".format(start_time, count, wrap_references['tps_threshold'], worker_id))
+    duration_count = 0
+    # Count successes for each accumulative period to compare with the current TPS threshold
+    while (start_time + duration_count) <= math.ceil(ended_at):
+        success_count = 0
+        duration_count = ended_at - start_time if (start_time + duration_count) > ended_at else duration_count + 1
+        for log in logs:
+            if math.floor(log['started_at']) >= start_time and math.ceil(log['ended_at']) <= (start_time + duration_count) and result_is_success(log['result']):
+                success_count += 1
+        # Check Finest
+        current_tps = success_count / duration_count
+        # Find the peak
+        if current_tps > wrap_references['tps_threshold']:
+            if lock:
+                with lock:
+                    # Lock and check again
+                    if current_tps > wrap_references['tps_threshold']:
+                        _search_peak_by_start_time_write(wrap_references, current_tps, start_time, duration_count, debug)
+            else:
+                _search_peak_by_start_time_write(wrap_references, current_tps, start_time, duration_count, debug)
+        # Detect if this round can be skipped
+        if success_count >= count:
+            continue
+    return
+
+def _search_peak_by_start_time_write(wrap_references, current_tps, start_time, duration_count, debug):
+    wrap_references['tps_threshold'] = current_tps
+    wrap_references['peak_started_time'] = start_time
+    wrap_references['peak_ended_time'] = start_time + duration_count
+    if debug: print(" * Peak detected above the current TPS threshold - TPS: {}, Started at: {}, Ended at: {}".format(wrap_references['tps_threshold'], wrap_references['peak_started_time'], wrap_references['peak_ended_time']))
+    return
+
 def _validate_log_format(log) -> bool:
     return all(key in log for key in ('started_at', 'ended_at', 'result'))
 
 def result_is_success(result) -> bool:
-    if (isinstance(result, requests.Response) and result.status_code != 200) or not result:
+    if not result or (isinstance(result, requests.Response) and result.status_code != 200):
        return False
     else:
         return True

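Taken together, the new helpers follow a common standard-library pattern: share a `Manager().dict()` and a `Manager().Lock()` with the `Pool` workers, do a cheap unlocked comparison first, then re-check under the lock before writing, so concurrent workers can only ever raise the shared threshold. A self-contained sketch of that pattern with generic names (not the library's API):

```python
import multiprocessing

def try_raise_best(candidate, shared, lock):
    # Cheap unlocked check first, then lock and re-check before writing;
    # the same double-checked update used by _search_peak_by_start_time().
    if candidate > shared['best']:
        with lock:
            if candidate > shared['best']:
                shared['best'] = candidate

if __name__ == '__main__':
    candidates = [3.2, 51.23, 18.0, 55.17, 42.0]
    with multiprocessing.Manager() as manager:
        shared = manager.dict({'best': 0.83})
        lock = manager.Lock()
        with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
            pool.starmap(try_raise_best, [(c, shared, lock) for c in candidates])
        result = dict(shared)  # copy out before the Manager shuts down
    print(result)  # {'best': 55.17}
```

Copying the managed dict back out inside the `with multiprocessing.Manager()` block mirrors the `wrap_references = dict(managed_wrap_references)` line in the diff: once the manager exits, the proxy objects are no longer usable.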