-
Notifications
You must be signed in to change notification settings - Fork 2
/
cpulimit.py
192 lines (182 loc) · 7.85 KB
/
cpulimit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import os, sys, time, argparse
import subprocess, logging, signal
# The script refuses to run without root privileges: stop early with a
# readable message instead of failing later. ``sys.exit`` is used rather than
# the ``exit`` helper, which is injected by the ``site`` module for interactive
# sessions and is not guaranteed to exist (e.g. under ``python -S``).
if os.geteuid() != 0:
    sys.exit('You need to run this with root privileges. Please try again with sudo.')

# Send every record both to a persistent log file and to stdout. The level set
# here is only the initial one; getArguments() adjusts it from --debug.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)8s] %(message)s",
    handlers=[
        logging.FileHandler("/var/log/cpu_throttle.log"),
        logging.StreamHandler(sys.stdout),
    ],
)
def getArguments(argv=None):
    """Parse the command-line options and apply the requested log level.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) ``argparse`` reads ``sys.argv[1:]``, which is the
            behaviour existing callers rely on.

    Returns:
        tuple: ``(relaxtime, crit_temp, debug)`` where ``relaxtime`` is the
        cooldown in seconds (default 30), ``crit_temp`` is the throttling
        threshold in millidegrees Celsius (default 64000) and ``debug`` is
        the boolean ``--debug`` flag.

    Side effects:
        Sets the root logger level to DEBUG when ``--debug`` is given,
        otherwise to INFO.
    """
    parser = argparse.ArgumentParser()
    # The help text previously advertised a 20 second default while the code
    # used 30; the text now matches the actual default.
    parser.add_argument(
        '--time', type=int, default=30,
        help='Seconds to cooldown cpu before next check, default is 30 seconds.',
    )
    # The option is given in whole degrees; it is converted to millidegrees
    # below so it can be compared with the value returned by getTemp().
    parser.add_argument(
        '--crit_temp', type=int, default=64,
        help='Temp for cpu to throttle down (temperature in celcius degrees)',
    )
    parser.add_argument(
        '--debug', action='store_true',
        help='Output more information when set to True.',
    )
    args = parser.parse_args(argv)

    logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO)
    return args.time, args.crit_temp * 1000, args.debug
# Determine which temperature source this hardware/kernel combination exposes.
def hardwareCheck():
    """Return a numeric code identifying the first temperature source found.

    The candidate files are probed in a fixed priority order and the code of
    the first one that exists is returned. The codes are the ones getTemp()
    dispatches on.

    Returns:
        int: a code from 1 to 7, or 0 when none of the known sources exists.
    """
    # NOTE (carried over from the original author, untested): governors might
    # also be switchable with
    #   echo "performance" | sudo tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
    #
    # Order matters: earlier entries win. For types 6 and 7 the probe targets
    # the exact file that is later read (not just its parent directory), so a
    # directory lacking the temperature file no longer selects a source that
    # would fail on the first read.
    probes = (
        ("/sys/devices/LNXSYSTM:00/LNXTHERM:00/LNXTHERM:01/thermal_zone/temp", 4),
        # NOTE(review): type 5 is read from /sys/class/thermal/thermal_zone0/temp
        # by getTemp(); presumably both paths refer to the same zone - confirm.
        ("/sys/bus/acpi/devices/LNXTHERM:00/thermal_zone/temp", 5),  # intel
        ("/sys/class/hwmon/hwmon0/temp1_input", 6),  # amd
        ("/sys/class/thermal/thermal_zone3/temp", 7),  # intel
        ("/proc/acpi/thermal_zone/THM0/temperature", 1),
        ("/proc/acpi/thermal_zone/THRM/temperature", 2),
        ("/proc/acpi/thermal_zone/THR1/temperature", 3),
    )
    for path, code in probes:
        if os.path.exists(path):
            return code
    return 0
# Depending on the kernel and hardware config, read the temperature.
def getTemp(hardware):
    """Read the current temperature for the given hardware type.

    Args:
        hardware: Source code as returned by hardwareCheck() (1-7).

    Returns:
        int: the temperature in millidegrees Celsius, or 0 when ``hardware``
        is not one of the known codes.

    Raises:
        OSError: if the sensor file cannot be opened or read.
        ValueError: if the file content cannot be parsed as a number.
    """
    sources = {
        1: "/proc/acpi/thermal_zone/THM0/temperature",
        2: "/proc/acpi/thermal_zone/THRM/temperature",
        3: "/proc/acpi/thermal_zone/THR1/temperature",
        4: "/sys/devices/LNXSYSTM:00/LNXTHERM:00/LNXTHERM:01/thermal_zone/temp",
        5: "/sys/class/thermal/thermal_zone0/temp",
        6: "/sys/class/hwmon/hwmon0/temp1_input",
        7: "/sys/class/thermal/thermal_zone3/temp",
    }
    path = sources.get(hardware)
    if path is None:
        return 0

    # Always close the file: this function is called every few seconds.
    with open(path) as sensor:
        raw = sensor.read()

    # The /proc/acpi files are expected to look like "temperature:   45 C";
    # the other sources hold a bare number. Keep whatever follows the last
    # colon (the whole text when there is none) and drop a trailing unit.
    #
    # The type 4 value used to be passed through rstrip('000'), which removes
    # *every* trailing zero, so "50000" became "5" and was reported as 5
    # degrees. The raw value is now kept and normalised below like the others.
    text = raw.rpartition(':')[2].strip().rstrip('C').strip()
    temp = float(text)

    # Values below 1000 are taken to be whole degrees and scaled up so the
    # result is always in millidegrees.
    if temp < 1000:
        temp = temp * 1000
    return int(temp)
def getMinMaxFrequencies(hardware):
    """Query the current cpufreq policy with ``cpufreq-info -p``.

    Args:
        hardware: Source code from hardwareCheck(); 0 means unsupported.

    Returns:
        ``None`` when ``hardware`` is 0 (nothing is queried). Otherwise a
        tuple whose first three items are used by the caller as minimum
        frequency, maximum frequency and governor: the lower-cased,
        whitespace-separated fields printed by the command. When the command
        is missing, fails, or prints fewer than three fields, the tuple
        ``(0, 0, None)`` is returned; the governor slot is ``None`` (not 0)
        so that an ``is not None`` check on it does not mistake the
        placeholder for a governor name.
    """
    if hardware == 0:
        return None

    failure = (0, 0, None)
    try:
        # An argument list avoids spawning a shell; a missing binary then
        # surfaces as OSError instead of exit status 127.
        result = subprocess.run(['cpufreq-info', '-p'], stdout=subprocess.PIPE)
    except OSError:
        result = None
    if result is None or result.returncode != 0:
        logging.warning('cpufreq-info gives error, cpufrequtils package installed?')
        return failure

    fields = tuple(result.stdout.decode('utf-8').lower().split())
    if len(fields) < 3:
        # Empty or truncated output would otherwise make the caller fail with
        # an IndexError when it picks the fields apart.
        logging.warning(f'cpufreq-info gives unexpected output: {fields}')
        return failure
    return fields
def setMaxFreq(frequency, hardware, cores):
    """Cap the maximum frequency of each core with ``cpufreq-set``.

    Args:
        frequency: New maximum frequency as a number, in KHz (it is logged
            as MHz after dividing by 1000).
        hardware: Source code from hardwareCheck(); nothing is done for 0.
        cores: Number of cores; cores ``0 .. cores-1`` are updated in order.

    Returns:
        None. A failing or missing ``cpufreq-set`` is logged as a warning and
        the remaining cores are skipped.
    """
    if hardware == 0:
        return

    logging.info(f"Set max frequency to {int(frequency/1000)} MHz")
    for core in range(cores):
        logging.debug(f'Setting core {core} to {frequency} KHz')
        try:
            # Argument list instead of a shell string: no shell is spawned
            # per core and the values cannot be re-interpreted by a shell.
            status = subprocess.run(
                ['cpufreq-set', '-c', str(core), '--max', str(frequency)]
            ).returncode
        except OSError:
            # Binary missing or not executable; report it like a failed run.
            status = -1
        if status != 0:
            logging.warning('cpufreq-set gives error, cpufrequtils package installed?')
            break
def setGovernor(hardware, governor, cores=None):
    """Select the cpufreq governor on each core with ``cpufreq-set``.

    ``cpufreq-set -g`` without ``-c`` only changes CPU 0, so the governor is
    applied core by core, mirroring what setMaxFreq() does for the frequency
    cap.

    Args:
        hardware: Source code from hardwareCheck(); nothing is done for 0,
            matching setMaxFreq().
        governor: Name of the governor to activate.
        cores: Number of cores to update (``0 .. cores-1``). Defaults to
            ``os.cpu_count()``, or 1 when that is unknown.

    Returns:
        None. A failing or missing ``cpufreq-set`` is logged as a warning and
        the remaining cores are skipped.
    """
    if hardware == 0:
        return
    if cores is None:
        cores = os.cpu_count() or 1

    for core in range(cores):
        try:
            status = subprocess.run(
                ['cpufreq-set', '-c', str(core), '-g', str(governor)]
            ).returncode
        except OSError:
            # Binary missing or not executable; report it like a failed run.
            status = -1
        if status != 0:
            logging.warning('cpufreq-set gives error, cpufrequtils package installed?')
            break
def getCovernors(hardware):
    """List the available cpufreq governors reported by ``cpufreq-info -g``.

    Args:
        hardware: Unused; kept so existing callers keep working.

    Returns:
        tuple: the lower-cased governor names, or an empty tuple when the
        command is missing, fails, or prints nothing.
    """
    try:
        # Argument list instead of a shell string; a missing binary then
        # raises OSError, which is reported like a failed run.
        result = subprocess.run(
            ['cpufreq-info', '-g'], stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    except OSError:
        result = None
    if result is None or result.returncode != 0:
        logging.warning('cpufreq-info gives error, cpufrequtils package installed?')
        return ()

    listing = result.stdout.decode('utf-8').strip()
    logging.debug(f'cpufreq-info governors: {listing}')

    # With stdout=PIPE the captured output is never None, so "nothing found"
    # has to be detected as an empty listing. Splitting on arbitrary
    # whitespace also prevents empty names from slipping into the result.
    names = tuple(listing.lower().split())
    if not names:
        logging.warning('No covernors found!?')
        logging.debug(f'Govs: {result.stdout.decode()}')
        return ()
    return names
# When the process receives an interrupt or termination signal, turn it into
# a KeyboardInterrupt so the caller's exception handling and ``finally``
# clause can perform a proper exit.
def signal_term_handler(self, args):
    """Signal handler that raises KeyboardInterrupt.

    Both positional arguments are accepted only to satisfy the handler
    signature and are ignored.

    Raises:
        KeyboardInterrupt: always.
    """
    raise KeyboardInterrupt
def main():
    """Run the temperature watchdog until interrupted.

    Reads the options, detects the temperature source and the cpufreq
    limits, then loops: while the temperature is above the critical value the
    low governor and the minimum frequency are applied for the cooldown
    period; otherwise the high governor and the maximum frequency are
    applied and the temperature is checked again after 3 seconds. On exit
    the governor found at start-up and the maximum frequency are restored.

    Raises:
        SystemExit: with status 1 when the hardware is not supported or the
            frequency limits cannot be determined.
    """
    governor_high = 'ondemand'
    governor_low = 'powersave'
    cur_governor = 'performance'  # restored on exit unless cpufreq reports another

    relax_time, crit_temp, debug = getArguments()
    logging.debug(f'critic_temp: {crit_temp}, relaxtime: {relax_time}, debug: {debug}')

    cores = os.cpu_count()
    if cores is None:
        # Core count unknown: fall back to a fixed number of cores.
        cores = 16

    hardware = hardwareCheck()
    logging.debug(f'Detected hardware/kernel type is {hardware}')
    if hardware == 0:
        logging.warning("Sorry, this hardware is not supported")
        # Non-zero status so callers/service managers can see the failure.
        sys.exit(1)

    freq = getMinMaxFrequencies(hardware)
    logging.debug(f'min max gov: {freq}')
    try:
        min_freq, max_freq = int(freq[0]), int(freq[1])
        reported_governor = freq[2]
    except (TypeError, IndexError, ValueError):
        # Missing, truncated or non-numeric answer from cpufreq-info.
        min_freq = max_freq = 0
        reported_governor = None
    if max_freq <= 0:
        # Without real limits the loop would keep asking cpufreq-set for a
        # maximum frequency of 0, so stop before touching any setting.
        logging.error('Could not determine the cpu frequency limits, giving up.')
        sys.exit(1)
    if reported_governor:
        cur_governor = reported_governor

    govs = getCovernors(hardware)
    if governor_high not in govs:
        governor_high = 'performance'
    if governor_low not in govs:
        logging.warning('Wait, powersave mode not in governors list?')
        governor_low = 'userspace'

    # Route SIGINT/SIGTERM through KeyboardInterrupt so the finally clause
    # below always gets a chance to restore the settings.
    signal.signal(signal.SIGINT, signal_term_handler)
    signal.signal(signal.SIGTERM, signal_term_handler)
    try:
        while True:
            cur_temp = getTemp(hardware)
            # Check for a missing reading *before* doing arithmetic on it.
            if cur_temp is None:
                logging.warning('Error: Current temp is None?!')
                break
            logging.info(f'Current temp is {int(cur_temp/1000)}')
            if cur_temp > crit_temp:
                logging.warning("CPU temp too high")
                logging.info(f"Slowing down for {relax_time} seconds")
                setGovernor(hardware, governor_low)
                setMaxFreq(min_freq, hardware, cores)
                time.sleep(relax_time)
            else:
                setGovernor(hardware, governor_high)
                setMaxFreq(max_freq, hardware, cores)
                time.sleep(3)
    except KeyboardInterrupt:
        logging.warning('Terminating')
    finally:
        logging.warning('Setting max cpu and governor back to normal.')
        setGovernor(hardware, cur_governor)
        setMaxFreq(max_freq, hardware, cores)


if __name__ == '__main__':
    main()