-
Notifications
You must be signed in to change notification settings - Fork 0
/
start_stop.py
160 lines (132 loc) · 5.93 KB
/
start_stop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import argparse
import json
import os
import shutil
import time
import redis
import sys
import signal
import subprocess
from autotest_backend.config import config
# Runtime files for supervisord are colocated with this script.
_THIS_DIR = os.path.dirname(os.path.realpath(__file__))
_PID_FILE = os.path.join(_THIS_DIR, "supervisord.pid")
_CONF_FILE = os.path.join(_THIS_DIR, "supervisord.conf")
# Prefer the supervisord/rq executables installed next to the running Python
# interpreter (e.g. inside a virtualenv), falling back to whatever is on PATH.
_SUPERVISORD = shutil.which(os.path.join(os.path.dirname(sys.executable), "supervisord")) or shutil.which("supervisord")
_RQ = shutil.which(os.path.join(os.path.dirname(sys.executable), "rq")) or shutil.which("rq")
SECONDS_PER_DAY = 86400
# Global section of the generated supervisord.conf.
HEADER = f"""[supervisord]
"""
# Per-worker [program:...] section template; formatted once per entry in
# config["workers"] by create_enqueuer_wrapper().
CONTENT = """[program:rq_worker_{worker_user}]
environment=WORKERUSER={worker_user}
command={rq} worker {worker_args} settings {queues}
process_name=rq_worker_{worker_user}
directory={directory}
stopsignal=TERM
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
{log_info}
"""
# Module-level Redis client (redis-py connects lazily, on first command --
# NOTE(review): confirm no import-time connection is expected here).
REDIS_CONNECTION = redis.Redis.from_url(config["redis_url"], decode_responses=True)
def get_log_info(log, error_log):
    """Build supervisord logging directives for the requested log targets.

    A value of ``'-'`` means "stream to the parent process's stdout/stderr";
    any other truthy value is treated as a log file path. Returns the
    directives joined by newlines (empty string when no logging is requested).
    """
    directives = []
    if log:
        if log == '-':
            # Write worker stdout straight to this process's stdout.
            directives += ["stdout_logfile=/dev/fd/1", "stdout_logfile_maxbytes=0"]
        else:
            directives += [f"stdout_logfile={log}"]
    if error_log:
        if error_log == '-':
            if log == '-':
                # Both streams go to the terminal: merge stderr into stdout.
                directives += ["redirect_stderr=true"]
            else:
                directives += ["stderr_logfile=/dev/fd/2", "stderr_logfile_maxbytes=0"]
        else:
            directives += [f"stderr_logfile={error_log}"]
    return "\n".join(directives)
def create_enqueuer_wrapper(rq, log, error_log):
    """Write supervisord.conf with one rq-worker program per configured user.

    Renders HEADER plus one CONTENT section for each entry in
    config["workers"], overwriting any existing config file at _CONF_FILE.
    """
    log_info = get_log_info(log, error_log)
    base_dir = os.path.dirname(os.path.realpath(__file__))
    sections = [HEADER]
    for worker in config["workers"]:
        user = worker["user"]
        sections.append(
            CONTENT.format(
                worker_user=user,
                rq=rq,
                # %% survives str.format so supervisord sees literal %(...)s.
                worker_args=f"--url {config['redis_url']} --log-format '{user} %%(asctime)s %%(message)s'",
                queues=" ".join(worker.get("queues", "high low batch".split())),
                directory=base_dir,
                log_info=log_info,
            )
        )
    with open(_CONF_FILE, "w") as f:
        f.write("".join(sections))
def start(rq, supervisord, log, error_log, extra_args):
    """Generate the supervisord config and launch supervisord with it.

    Any extra_args are forwarded to supervisord verbatim. Raises if
    supervisord exits with a non-zero status.
    """
    create_enqueuer_wrapper(rq, log, error_log)
    result = subprocess.run(
        [supervisord, "-c", _CONF_FILE, *extra_args],
        cwd=_THIS_DIR,
        capture_output=True,
    )
    if result.returncode != 0:
        raise Exception(f"supervisord failed to start with the following error:\n{result.stderr}")
def stop():
    """Stop the running supervisord (and its workers) by sending SIGTERM.

    Reads the PID that supervisord recorded in _PID_FILE. If the PID file is
    missing, or the recorded process is already gone (stale PID file left by
    a crash), reports that the supervisor is already stopped instead of
    raising an unhandled ProcessLookupError.
    """
    if not os.path.isfile(_PID_FILE):
        sys.stderr.write("supervisor is already stopped\n")
        return
    with open(_PID_FILE) as f:
        pid = int(f.read().strip())
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # Stale PID file: the process no longer exists.
        sys.stderr.write("supervisor is already stopped\n")
def stat(rq, extra_args):
    """Show the status of the autotester's rq queues.

    Runs `rq info` against the configured redis instance, forwarding any
    extra_args; raises CalledProcessError when the command fails.
    """
    cmd = [rq, "info", "--url", config["redis_url"], *extra_args]
    subprocess.run(cmd, check=True)
def clean(age, dry_run):
    """Expire test settings not accessed within `age` days and remove their scripts.

    For every entry in the "autotest:settings" redis hash whose "_last_access"
    timestamp is missing or older than `age` days:

    - dry_run=True: only print the script directory that would be removed;
      nothing is modified.
    - dry_run=False: mark the settings with an "_error" message in redis and
      delete the script directory if it exists.

    Bug fix vs. original: previously a dry run whose script directory was
    missing fell through to the destructive branch and mutated redis.
    """
    for settings_id, raw in dict(REDIS_CONNECTION.hgetall("autotest:settings") or {}).items():
        settings = json.loads(raw)
        last_access_timestamp = settings.get("_last_access")
        access = int(time.time() - (last_access_timestamp or 0))
        if last_access_timestamp is None or access > (age * SECONDS_PER_DAY):
            dir_path = os.path.join(config["workspace"], "scripts", str(settings_id))
            if dry_run:
                # Report only; a dry run must never touch redis or the disk.
                if os.path.isdir(dir_path):
                    last_access = "UNKNOWN" if last_access_timestamp is None else access // SECONDS_PER_DAY
                    print(f"{dir_path} -> last accessed {last_access or '< 1'} days ago")
            else:
                settings["_error"] = "the settings for this test have expired, please re-upload the settings."
                REDIS_CONNECTION.hset("autotest:settings", key=settings_id, value=json.dumps(settings))
                if os.path.isdir(dir_path):
                    shutil.rmtree(dir_path)
def _exec_type(path):
exec_path = shutil.which(path)
if exec_path:
return exec_path
raise argparse.ArgumentTypeError(f"no executable found at: '{path}'")
if __name__ == "__main__":
    # Command-line entry point: one subcommand per operation.
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="command")

    start_parser = subparsers.add_parser("start", help="start the autotester")
    subparsers.add_parser("stop", help="stop the autotester")
    restart_parser = subparsers.add_parser("restart", help="restart the autotester")
    stat_parser = subparsers.add_parser("stat", help="display current status of the autotester queues")

    clean_parser = subparsers.add_parser("clean", help="clean up old/unused test scripts")
    clean_parser.add_argument(
        "-a", "--age", default=0, type=int, help="clean up tests older than <age> in days. Default=0"
    )
    clean_parser.add_argument(
        "-d", "--dry-run", action="store_true", help="list files that will be deleted without actually removing them"
    )

    # start/restart/stat all accept --rq; start/restart additionally take the
    # supervisord executable and log-file options.
    for sub in (start_parser, restart_parser, stat_parser):
        sub.add_argument("--rq", default=_RQ, type=_exec_type, help=f"path to rq executable, default={_RQ}")
        if sub is stat_parser:
            continue
        sub.add_argument(
            "--supervisord",
            default=_SUPERVISORD,
            type=_exec_type,
            help=f"path to supervisord executable, default={_SUPERVISORD}",
        )
        sub.add_argument('-l', '--log', help='path to log file to write rq logs to.')
        sub.add_argument('-e', '--error_log', help='path to log file to write rq error logs to.')

    # Unrecognized arguments are forwarded verbatim to supervisord / rq.
    args, remainder = parser.parse_known_args()

    command = args.command
    if command == "start":
        start(args.rq, args.supervisord, args.log, args.error_log, remainder)
    elif command == "stop":
        stop()
    elif command == "restart":
        stop()
        start(args.rq, args.supervisord, args.log, args.error_log, remainder)
    elif command == "stat":
        stat(args.rq, remainder)
    elif command == "clean":
        clean(args.age, args.dry_run)