-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: server.py
322 lines (289 loc) · 12.7 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
from http import HTTPStatus
import http.server as srv
import json
import logging
import re
import threading
import time
from .mimeparser import MimeParser
# Route uncaught exceptions from worker threads into the log instead of
# the default stderr traceback.  NOTE(review): this ignores the hook's
# args and relies on the exception still being active (sys.exc_info())
# while the hook runs — confirm, since logging.exception reads it.
threading.excepthook = lambda *args: logging.exception("Exception in thread")
# URL prefixes of the two emulated Ultimaker HTTP APIs
PRINTER_API = "/api/v1/"
CLUSTER_API = "/cluster-api/v1/"
# Port on which mjpg-streamer serves the webcam stream/snapshots
MJPG_STREAMER_PORT = 8080
class Handler(srv.BaseHTTPRequestHandler):
    """Serve the subset of the printer/cluster HTTP API that Cura
    requests.  For a summary of the supported endpoints see README.md.
    Unknown paths are answered with 404, unimplemented ones with 501.
    """

    # Regex for a path in form:
    # /cluster-api/v1/print_jobs/<UUID>...
    # with the uuid and the suffix (everything past the uuid) in their
    # respective groups "uuid" and "suffix".
    uuid_regex = re.compile(r"^" + CLUSTER_API + r"print_jobs/"
            + r"(?P<uuid>[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12})"
            + r"(?P<suffix>.*)$")

    # Keeps TCP connections alive
    protocol_version = "HTTP/1.1"

    def __init__(self, request, client_address, server):
        self.module = server.module
        self.reactor = server.module.reactor
        self.content_manager = self.module.content_manager
        self._size = None  # For logging GET requests
        # BaseHTTPRequestHandler.__init__ handles the request itself,
        # so all attributes must be assigned before delegating to it.
        super().__init__(request, client_address, server)

    def do_GET(self):
        """
        Implement a case-specific response, limited to the requests
        that we can expect from Cura. For a summary of those see
        README.md
        """
        if self.path == CLUSTER_API + "printers":
            self.get_json(self.content_manager.get_printer_status())
        elif self.path == CLUSTER_API + "print_jobs":
            self.get_json(self.content_manager.get_print_jobs())
        elif self.path == CLUSTER_API + "materials":
            self.get_json(self.content_manager.get_materials())
        elif self.path == "/?action=stream":
            self.get_stream()
        elif self.path == "/?action=snapshot":
            self.get_snapshot()
        elif self.path == PRINTER_API + "system":
            self.send_error(HTTPStatus.NOT_IMPLEMENTED)
        else:
            m = self.uuid_regex.match(self.path)
            if m and m.group("suffix") == "/preview_image":
                self.get_preview_image(m.group("uuid"))
            else:
                # NOTE: send_error() calls end_headers()
                self.send_error(HTTPStatus.NOT_FOUND)

    def do_POST(self):
        """Dispatch file uploads (multipart) and queue moves (JSON)."""
        if self.headers.get_content_maintype() == "multipart":
            if self.path == CLUSTER_API + "print_jobs/":
                self.post_print_job()
            elif self.path == CLUSTER_API + "materials/":
                self.post_material()
            else:
                # Previously a multipart POST to any other path got no
                # response at all, leaving the client waiting.
                self.send_error(HTTPStatus.NOT_FOUND)
        else:
            m = self.uuid_regex.match(self.path)
            if m and m.group("suffix") == "/action/move":
                self.post_move_to_top(m.group("uuid"))
            else:
                self.send_error(HTTPStatus.NOT_FOUND)

    def do_PUT(self):
        """Dispatch print job actions and forced starts."""
        m = self.uuid_regex.match(self.path)
        if m and m.group("suffix") == "/action":
            # pause, print or abort
            self.put_action(m.group("uuid"))
        elif m and not m.group("suffix"):
            # force print job
            self.put_force(m.group("uuid"))
        else:
            self.send_error(HTTPStatus.NOT_FOUND)

    def do_DELETE(self):
        """Remove a queued print job."""
        m = self.uuid_regex.match(self.path)
        if m and not m.group("suffix"):
            # Delete print job from queue
            self.delete_print_job(m.group("uuid"))
        else:
            self.send_error(HTTPStatus.NOT_FOUND)

    def get_json(self, content):
        """Send an object JSON-formatted"""
        try:
            json_content = json.dumps(content)
        except TypeError:
            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR,
                    "JSON serialization failed")
        else:
            # Encode first: Content-Length must be the byte count, not
            # the character count (equal only while ensure_ascii holds).
            payload = json_content.encode()
            self.send_response(HTTPStatus.OK, size=len(payload))
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(payload)

    def get_preview_image(self, uuid):
        """Send back the preview image for the print job with uuid"""
        index, print_job = self.content_manager.uuid_to_print_job(uuid)
        if not print_job:
            self.send_error(HTTPStatus.NOT_FOUND, "Print job not in Queue")
            return
        # Resolve the path before the try block: previously a failure in
        # get_thumbnail_path() left thumbnail_path unbound, making the
        # except handler below raise UnboundLocalError.
        thumbnail_path = self.module.get_thumbnail_path(index, print_job.name)
        try:
            with open(thumbnail_path, "rb") as fp:
                image_data = fp.read()
        except IOError:
            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR,
                    "Failed to open preview image at " + thumbnail_path)
        else:
            self.send_response(HTTPStatus.OK, size=len(image_data))
            self.send_header("Content-Type", "image/png")
            self.end_headers()
            self.wfile.write(image_data)

    def get_stream(self):
        """Redirect to the port on which mjpg-streamer is running"""
        self.send_response(HTTPStatus.FOUND)
        self.send_header("Location", "http://{}:{}/?action=stream".format(
            self.module.ADDRESS, MJPG_STREAMER_PORT))
        self.end_headers()

    def get_snapshot(self):
        """Snapshot only sends a single image"""
        self.send_response(HTTPStatus.FOUND)
        self.send_header("Location", "http://{}:{}/?action=snapshot".format(
            self.module.ADDRESS, MJPG_STREAMER_PORT))
        self.end_headers()

    def post_print_job(self):
        """Receive an uploaded gcode file and queue it for printing."""
        boundary = self.headers.get_boundary()
        length = int(self.headers.get("Content-Length", 0))
        try:
            parser = MimeParser(self.rfile, boundary, length,
                                self.module.SDCARD_PATH, overwrite=False)
            submessages, paths = parser.parse()
        except Exception as e:
            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR,
                    "Parser failed: " + str(e))
        else:
            # NOTE: the "owner" form field in submessages is currently
            # not evaluated.
            for path in paths:
                self.reactor.cb(self.module.add_print, path)
            self.send_response(HTTPStatus.OK)
            self.end_headers()

    def post_material(self):
        """Receive an uploaded material (XML) file and register it."""
        boundary = self.headers.get_boundary()
        length = int(self.headers.get("Content-Length", 0))
        try:
            parser = MimeParser(self.rfile, boundary, length,
                                self.module.MATERIAL_PATH)
            submessages, paths = parser.parse()
        except Exception as e:
            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR,
                    "Parser failed: " + str(e))
        else:
            self.reactor.cb(self.read_material_file, paths[0])
            # Reply is checked specifically for 200
            self.send_response(HTTPStatus.OK)
            self.end_headers()

    @staticmethod
    def read_material_file(e, printer, path):
        """Reactor callback: load a newly uploaded material file."""
        fm = printer.objects['filament_manager']
        # Invalidate XML tree cache
        fm.cached_parse.cache_clear()
        fm.read_single_file(path)

    def post_move_to_top(self, uuid):
        """Move print job with uuid to the top of the queue"""
        length = int(self.headers.get("Content-Length", 0))
        rdata = self.rfile.read(length)
        try:
            data = json.loads(rdata)
        except ValueError:
            self.send_error(HTTPStatus.BAD_REQUEST, "Failed to read JSON")
            return
        old_index, _ = self.content_manager.uuid_to_print_job(uuid)
        new_index = data.get("to_position")
        if old_index is None:
            self.send_error(HTTPStatus.NOT_FOUND, "Print job not in Queue")
        elif data.get("list") != "queued" or not isinstance(new_index, int):
            # rdata is bytes; decode before concatenating with str
            # (previously raised TypeError on this error path)
            self.send_error(HTTPStatus.BAD_REQUEST,
                    "Unexpected JSON content: " + rdata.decode(errors="replace"))
        else:
            if self.reactor.cb(self.module.queue_move,
                    old_index, uuid, new_index-old_index, wait=True):
                self.send_response(HTTPStatus.OK)
                self.end_headers()
            else:
                self.send_error(HTTPStatus.CONFLICT, "Queue order has changed")

    def delete_print_job(self, uuid):
        """Delete print job with uuid from the queue"""
        index, print_job = self.content_manager.uuid_to_print_job(uuid)
        if not print_job:
            self.send_error(HTTPStatus.NOT_FOUND, "Print job not in queue")
        else:
            if self.reactor.cb(self.module.queue_delete, index, uuid, wait=True):
                self.send_response(HTTPStatus.OK)
                self.end_headers()
            else:
                self.send_error(HTTPStatus.CONFLICT, "Queue order has changed")

    def put_action(self, uuid):
        """
        Pause, Print or Abort a print job.
        This is only called for the current print job.
        """
        length = int(self.headers.get("Content-Length", 0))
        rdata = self.rfile.read(length)
        try:
            data = json.loads(rdata)
        except ValueError:
            self.send_error(HTTPStatus.BAD_REQUEST, "Failed to read JSON")
            return
        index, print_job = self.content_manager.uuid_to_print_job(uuid)
        action = data.get("action")
        if not print_job:
            self.send_error(HTTPStatus.NOT_FOUND, "Print job not in Queue")
        elif index != 0:  # This request is only handled for the current print
            self.send_error(HTTPStatus.BAD_REQUEST,
                    "Can only operate on current print job. Got " + str(index))
        else:
            # res stays True for an unknown action: the error reply is
            # already sent by the final else branch below.
            res = True
            if action == "print":
                res = self.reactor.cb(self.module.resume_print, uuid, wait=True)
            elif action == "pause":
                res = self.reactor.cb(self.module.pause_print, uuid, wait=True)
            elif action == "abort":
                res = self.reactor.cb(self.module.stop_print, uuid, wait=True)
            else:
                self.send_error(HTTPStatus.BAD_REQUEST, "Unknown action: " + str(action))
            if not res:
                self.send_error(HTTPStatus.CONFLICT,
                        "Failed to " + str(action) + ", queue order has changed")

    def put_force(self, uuid):
        """
        Force a print job that requires configuration change
        This is not called until possibly configuration changes are
        implemented.
        """
        length = int(self.headers.get("Content-Length", 0))
        rdata = self.rfile.read(length)
        try:
            data = json.loads(rdata)
        except ValueError:
            self.send_error(HTTPStatus.BAD_REQUEST, "Failed to read JSON")
            return
        index, print_job = self.content_manager.uuid_to_print_job(uuid)
        if not print_job:
            self.send_error(HTTPStatus.NOT_FOUND, "Print job not in Queue")
        elif data.get("force") is not True:
            # rdata is bytes; decode before concatenating with str
            # (previously raised TypeError on this error path)
            self.send_error(HTTPStatus.BAD_REQUEST,
                    'Expected {"force": True}. Got: ' + rdata.decode(errors="replace"))
        else:
            self.send_error(HTTPStatus.NOT_IMPLEMENTED)

    def send_response(self, code, message=None, size=None):
        """
        Accept size as an argument (can be int or str) which sends the
        Content-Length header and takes care of logging the size as well.
        """
        if size is not None:
            self._size = str(size)
        srv.BaseHTTPRequestHandler.send_response(self, code, message)
        # Keep track of when the last request was handled
        # send_error() also calls here
        self.server.last_request = time.time()
        if self._size is not None:
            self.send_header("Content-Length", self._size)

    def log_request(self, code="-", size="-"):
        """Add size to logging"""
        if self._size is not None:
            size = self._size + "B"
        srv.BaseHTTPRequestHandler.log_request(self, code, size)

    def log_error(self, format, *args):
        """Similar to log_message, but log under loglevel ERROR"""
        # Overwrite format string. Default is "code %d, message %s"
        if format == "code %d, message %s":
            format = "Errorcode %d: %s"
        logging.error("<%s> " + format, self.address_string(), *args)

    def log_message(self, format, *args):
        """Route access logging through the logging module at INFO."""
        logging.log(logging.INFO, "<%s> " + format, self.address_string(), *args)
class Server(srv.ThreadingHTTPServer, threading.Thread):
    """A threading HTTP server that also runs as its own thread and
    keeps a reference to the owning module object.
    """

    def __init__(self, server_address, RequestHandler, module):
        # Initialize both bases explicitly: the socket server first,
        # then the thread machinery.
        srv.ThreadingHTTPServer.__init__(self, server_address, RequestHandler)
        threading.Thread.__init__(self, name="Server-Thread")
        self.module = module
        # Time of last request in seconds since epoch (0 = none yet)
        self.last_request = 0

    # Thread.run() is the blocking serve loop
    run = srv.HTTPServer.serve_forever
def get_server(module):
    """Build the HTTP server for *module*, bound to its address on port 8008."""
    address = (module.ADDRESS, 8008)
    return Server(address, Handler, module)