forked from Ridepad/uwu-logs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
logs_spells_order.py
132 lines (109 loc) · 4.28 KB
/
logs_spells_order.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from collections import defaultdict
import json
import logs_base
from constants import COMBINE_SPELLS, running_time
IGNORED_FLAGS = {
"SPELL_HEAL",
"SPELL_PERIODIC_HEAL",
"ENCHANT_APPLIED",
"ENCHANT_REMOVED",
"SPELL_DRAIN",
"ENVIRONMENTAL_DAMAGE",
}
def to_int(s: str):
minutes, seconds = s.split(":", 1)
return int(minutes) * 600 + int(seconds.replace('.', ''))
def convert_keys(data: dict[str, int]):
FIRST_KEY = to_int(list(data)[0])
for k in list(data):
new_key = to_int(k) - FIRST_KEY
if new_key < 0:
new_key = new_key + 36000
data[new_key] = data.pop(k)
def to_float(s: str):
minutes, seconds = s[-9:].split(":", 1)
return int(minutes), float(seconds)
def _timestamp(s: str):
return s.split(',', 1)[0]
def get_delta_wrap(logs_slice, start_index):
start_minutes, start_seconds = to_float(_timestamp(logs_slice[start_index]))
first_minutes, _ = to_float(_timestamp(logs_slice[0]))
end_minutes, _ = to_float(_timestamp(logs_slice[-1]))
if first_minutes > start_minutes:
def get_delta(current_ts):
_minutes, _seconds = to_float(current_ts)
_seconds = _seconds - start_seconds
if _minutes > 50:
_minutes = _minutes - start_minutes - 60
else:
_minutes = _minutes - start_minutes
return int((_minutes * 60 + _seconds)*1000)
elif start_minutes > end_minutes:
def get_delta(current_ts: str):
_minutes, _seconds = to_float(current_ts)
_seconds = _seconds - start_seconds
if _minutes < 20:
_minutes = _minutes - start_minutes + 60
else:
_minutes = _minutes - start_minutes
return int((_minutes * 60 + _seconds)*1000)
else:
def get_delta(current_ts):
_minutes, _seconds = to_float(current_ts)
_seconds = _seconds - start_seconds
_minutes = _minutes - start_minutes
return int((_minutes * 60 + _seconds)*1000)
return get_delta
@running_time
def get_history(logs_slice: list[str], source_guid: str, ignored_guids: set[str], start_index: int):
flags = set()
history = defaultdict(list)
get_delta = get_delta_wrap(logs_slice, start_index)
if not ignored_guids:
ignored_guids = set()
elif source_guid in ignored_guids:
ignored_guids.remove(source_guid)
for line in logs_slice:
if source_guid not in line:
continue
try:
timestamp, flag, _, sName, tGUID, tName, spell_id, _, etc = line.split(',', 8)
if flag in IGNORED_FLAGS or tGUID in ignored_guids:
continue
_delta = get_delta(timestamp)
history[spell_id].append((_delta, flag, sName, tName, tGUID, etc))
flags.add(flag)
except ValueError:
continue
for spell_id in list(history):
if spell_id not in COMBINE_SPELLS:
continue
main_spell_id = COMBINE_SPELLS[spell_id]
history[main_spell_id] = sorted(history[main_spell_id] + history.pop(spell_id))
return {
"DATA": history,
"FLAGS": flags,
}
class Timeline(logs_base.THE_LOGS):
@logs_base.cache_wrap
def get_spell_history(self, s: int, f: int, guid: str) -> dict[str, defaultdict[str, int]]:
ts = self.get_timestamp()
s_shifted = ts[self.find_index(s, 180)]
logs_slice = self.LOGS[s_shifted:f]
players_and_pets = self.get_players_and_pets_guids()
data = get_history(logs_slice, guid, players_and_pets, s-s_shifted)
_spells = self.SPELLS_WITH_ICONS
data["SPELLS"] = {
x: _spells[int(x)]
for x in data["DATA"]
}
data["RDURATION"] = self.get_slice_duration(s, f)
data["NAME"] = self.guid_to_name(guid)
data["CLASS"] = self.get_classes().get(guid, "npc")
return data
def get_spell_history_wrap(self, segments: dict, player_name: str):
s, f = segments[0]
player = self.name_to_guid(player_name)
return self.get_spell_history(s, f, player)
def get_spell_history_wrap_json(self, segments: dict, player_name: str):
return json.dumps((self.get_spell_history_wrap(segments, player_name)), default=list)