-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
68 lines (54 loc) · 2.16 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import sys
import traci
from typing import Optional
from traci.constants import LCA_BLOCKED
class Utils:
    """Helpers for querying a running SUMO simulation through TraCI.

    The instance stores two spacing parameters that are used by
    ``getLaneAvailableSpace``; the remaining helpers are static.
    """

    def __init__(self, min_gap, platoon_intervehicle_distance):
        """
        Store the spacing parameters used when estimating occupied lane space.

        Args:
            min_gap: minimum gap kept between vehicles (presumably meters, the
                same unit as the lane lengths it is compared against)
            platoon_intervehicle_distance: distance kept between vehicles of a
                platoon (presumably the same unit as ``min_gap``)
        """
        self.min_gap = min_gap
        self.platoon_intervehicle_distance = platoon_intervehicle_distance

    @staticmethod
    def getNextEdge(vid: str) -> Optional[str]:
        """
        Get the next edge of the vehicle route.

        The position inside the route is taken from the route index reported
        by TraCI, so a route that visits the same edge more than once yields
        the successor of the occurrence the vehicle is actually on. When the
        reported index does not correspond to the current road id, the first
        occurrence of the current road id in the route is used instead.

        Args:
            vid (str): vehicle id
        Returns:
            str: if exists, the next road id, otherwise None (the current road
            is the last edge of the route, or it is not part of the route)
        """
        route = traci.vehicle.getRoute(vid)
        current = traci.vehicle.getRoadID(vid)
        position = traci.vehicle.getRouteIndex(vid)

        # Trust the reported index only when it really points at the road the
        # vehicle is on (it is negative before departure and may refer to a
        # different edge than getRoadID, e.g. while crossing a junction).
        if not (0 <= position < len(route) and route[position] == current):
            try:
                position = route.index(current)
            except ValueError:
                # The current road id is not a route edge.
                return None

        following = position + 1
        return route[following] if following < len(route) else None

    def getLaneAvailableSpace(self, edge: str, net) -> float:
        """
        The remaining available space (in meters) in the given edge.

        The available space is computed for every lane in the edge and the
        returned value is the minimum among them. For each lane it is computed
        by subtracting the space attributed to the vehicles currently in the
        lane from the lane length. The result can be negative when the
        estimated occupied space exceeds the lane length.

        Args:
            edge (str): edge id
            net: network object providing ``getEdge(edge).getLanes()``
                (presumably a sumolib net; verify against the caller)
        Returns:
            float: remaining available space in meters, or
            ``sys.float_info.max`` when the edge has no lanes
        """
        # The spacing reserved per vehicle does not depend on the lane.
        spacing = max(self.min_gap, self.platoon_intervehicle_distance)

        available_space = sys.float_info.max
        for lane in net.getEdge(edge).getLanes():
            lane_id = lane.getID()
            lane_length = traci.lane.getLength(lane_id)
            n_vids = traci.lane.getLastStepVehicleNumber(lane_id)
            # NOTE(review): every vehicle is counted as 1 unit of length plus
            # the spacing; the real vehicle length is not queried - confirm
            # that this approximation is intended.
            occupied_space = n_vids + n_vids * spacing
            available_space = min(available_space, lane_length - occupied_space)
        return available_space

    @staticmethod
    def checkLaneChange(vid: str, bitmask: int) -> bool:
        """
        Check whether all bits of a mask are set in the lane change state.

        Only the first element of the value returned by
        ``traci.vehicle.getLaneChangeState`` is tested.

        Args:
            vid (str): vehicle id
            bitmask (int): bits that must all be set in the state
        Returns:
            bool: True if every bit of ``bitmask`` is set, False otherwise
        """
        # NOTE(review): the second argument of getLaneChangeState is documented
        # as a lane change direction, while LCA_BLOCKED is a state flag -
        # confirm that passing it here is intended.
        state = traci.vehicle.getLaneChangeState(vid, LCA_BLOCKED)[0]
        return (state & bitmask) == bitmask

    @staticmethod
    def getTotalEdgesLength(net) -> float:
        """
        Compute the sum of the lengths of all edges.

        Args:
            net: network object providing ``getEdges()``, whose items expose
                ``getLength()``
        Returns:
            float: Total edges length (0 when the network has no edges)
        """
        return sum(edge.getLength() for edge in net.getEdges())