Migrated to Python 3
rucinpk committed Nov 24, 2023
1 parent b0ec6e1 commit 7eea2df
Showing 7 changed files with 55 additions and 56 deletions.
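The hunks below apply the usual Python 2 → 3 cleanups: the __future__, builtins and past compatibility imports are dropped, basestring checks become str checks, old-style class declarations lose the explicit (object) base, and pickled state is opened in binary mode. A condensed, hypothetical sketch of those idioms (names are illustrative, not taken from the repository):

import pickle

class Foo:                       # was: class Foo(object)
    pass

def is_job_key(x):
    return isinstance(x, str)    # was: isinstance(x, basestring)

def load_state(path):
    with open(path, "rb") as f:  # was: pickle.load(open(path)) in text mode
        return pickle.load(f)
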
1 change: 0 additions & 1 deletion make-release
@@ -1,5 +1,4 @@
#! /usr/bin/env python
from __future__ import print_function
from builtins import str

import subprocess
9 changes: 3 additions & 6 deletions qs/jobs.py
@@ -1,15 +1,12 @@
#! /usr/bin/env python

from __future__ import print_function

import heapq
import random
import time

from builtins import hex
from builtins import object
from gevent import event
from past.builtins import basestring
from functools import total_ordering


@@ -239,6 +236,7 @@ def finishjob(self, jobid, result=None, error=None):
self._mark_finished(j, result=result, error=error, ttl=ttl)

def updatejob(self, jobid, info):
print("updatejob", jobid, info)
j = self.id2job[jobid]
j.info.update(info)

@@ -249,7 +247,6 @@ def pushjob(self, job):

if job.jobid is None:
job.jobid = job.serial

self.id2job[job.jobid] = job

channel = job.channel
@@ -278,7 +275,7 @@ def push(self, channel, payload=None, priority=0, jobid=None, timeout=None, ttl=
if jobid is not None:
if jobid in self.id2job and self.id2job[jobid].error != "killed":
return jobid

print("Job", jobid, "not found, pushing new job")
return self.pushjob(
job(
payload=payload,
@@ -322,5 +319,5 @@ def pop(self, channels):

def prefixmatch(self, prefix):
for jobid in self.id2job:
if isinstance(jobid, basestring) and jobid.startswith(prefix):
if isinstance(jobid, str) and jobid.startswith(prefix):
yield jobid
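The prefixmatch change above is the recurring basestring → str substitution: Python 3 has a single text type, so str alone covers what basestring matched in Python 2, while non-string job ids are still skipped. A tiny illustration with hypothetical job ids:

for jobid in ("render::1", 17, "render::2"):
    if isinstance(jobid, str) and jobid.startswith("render::"):
        print(jobid)   # prints render::1 and render::2; the integer id 17 is skipped
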
1 change: 0 additions & 1 deletion qs/misc.py
@@ -1,4 +1,3 @@
from builtins import object
import sys
import traceback

36 changes: 22 additions & 14 deletions qs/qserve.py
@@ -1,13 +1,7 @@
#! /usr/bin/env python

from __future__ import print_function

import getopt
import os
import pickle
import sys
from builtins import object
from builtins import str

import gevent
import gevent.pool
@@ -26,20 +20,27 @@ def __init__(self, **kw):
self.running_jobs = {}

def rpc_qadd(
self, channel, payload=None, priority=0, job_id=None, wait=False, timeout=None, ttl=None
self,
channel,
payload=None,
priority=0,
jobid=None,
wait=False,
timeout=None,
ttl=None,
):
job_id = self.workq.push(
jobid = self.workq.push(
payload=payload,
priority=priority,
channel=channel,
jobid=job_id,
jobid=jobid,
timeout=timeout,
ttl=ttl,
)
if not wait:
return job_id
return jobid

res = self.workq.waitjobs([job_id])[0]
res = self.workq.waitjobs([jobid])[0]
return res._json()

def rpc_qpull(self, channels=None):
@@ -48,7 +49,7 @@ def rpc_qpull(self, channels=None):

j = self.workq.pop(channels)
self.running_jobs[j.jobid] = j

print("pull", j)
return j._json()

def rpc_qfinish(self, jobid, result=None, error=None, traceback=None):
@@ -61,14 +62,17 @@ def rpc_qfinish(self, jobid, result=None, error=None, traceback=None):
del self.running_jobs[jobid]

def rpc_qsetinfo(self, jobid, info):
print("setinfo: %s: %r" % (jobid, info))
self.workq.updatejob(jobid, info)

def rpc_qinfo(self, jobid):
print("info: %s" % (jobid,))
if jobid in self.workq.id2job:
return self.workq.id2job[jobid]._json()
return None

def rpc_qwait(self, jobids):
print("wait", jobids)
res = self.workq.waitjobs(jobids)
return [j._json() for j in res]

@@ -83,6 +87,7 @@ def rpc_qdrop(self, jobids):
self.workq.dropjobs(jobids)

def rpc_qprefixmatch(self, prefix):
print("prefixmatch", prefix)
return list(self.workq.prefixmatch(prefix))

def rpc_getstats(self):
@@ -113,7 +118,8 @@ def loaddb(self):

if qpath and os.path.exists(qpath):
print("loading", qpath)
self.db = pickle.load(open(qpath))
q_file = open(qpath, "rb")
self.db = pickle.load(q_file)
print("loaded", len(self.db.workq.id2job), "jobs")
else:
self.db = db()
@@ -241,7 +247,9 @@ def parse_options(argv=None):
usage()
sys.exit(0)

return dict(port=port, interface=interface, data_dir=data_dir, allowed_ips=allowed_ips)
return dict(
port=port, interface=interface, data_dir=data_dir, allowed_ips=allowed_ips
)


def main(argv=None):
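The loaddb hunk above shows the other recurring fix: pickle.load in Python 3 requires a file object opened in binary mode, since it reads a byte stream. A minimal sketch of the same load as a standalone helper (load_queue_state is a hypothetical name, not part of the commit), using a context manager so the handle is closed after loading:

import pickle

def load_queue_state(qpath):
    # binary mode is mandatory; a text-mode handle fails while decoding the pickle stream
    with open(qpath, "rb") as q_file:
        return pickle.load(q_file)
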
3 changes: 3 additions & 0 deletions qs/rpcclient.py
@@ -73,3 +73,6 @@ def call(**kwargs):
call.__name__ = name
self.__dict__[name] = call
return call

def get_client(self):
return self._rpc_client
19 changes: 6 additions & 13 deletions qs/rpcserver.py
@@ -1,12 +1,6 @@
#! /usr/bin/env python

from __future__ import print_function

import traceback
from builtins import object
from builtins import str

from past.builtins import basestring

try:
import simplejson as json
@@ -23,16 +17,16 @@ def key2str(kwargs):
return r


class Dispatcher(object):
class Dispatcher:
def __call__(self, req):
name, kwargs = req
kwargs = key2str(kwargs)

assert isinstance(name, basestring), "bad name argument"
assert isinstance(name, str), "bad name argument"
cmd_name = str("rpc_" + name)
m = getattr(self, cmd_name, None)
if not m:
raise RuntimeError("no such method: %r" % (name,))
raise RuntimeError("no such method: %r" % (cmd_name,))
return m(**kwargs)


@@ -57,7 +51,7 @@ def __repr__(self):
return "<Client %s>" % self.client_id


class Server(object):
class Server:
def __init__(self, port=8080, host="", get_request_handler=None, secret=None, is_allowed=None):
self.port = port
self.host = host
@@ -89,14 +83,13 @@ def handle_client(self, sock, addr):
self.log("+DENY %r" % (addr,))
sock.close()
return

sock_file = None
current = getcurrent()
try:
self.client_count += 1
clientid = "<%s %s:%s>" % (self.client_count, addr[0], addr[1])
current.clientid = clientid
sock_file = sock.makefile()
sock_file = sock.makefile("rw")
lineq = queue.Queue()

def readlines():
@@ -115,7 +108,7 @@ def readlines():
current.link(lambda _: readgr.kill())
handle_request = self.get_request_handler(client=(sock, addr), clientid=clientid)

# self.log("+connect: %s" % (clientid, ))
self.log("+connect: %s" % (clientid, ))

while 1:
current.status = "idle"
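One rpcserver.py change worth a note: sock.makefile("rw") replaces the bare sock.makefile(), because Python 3's socket.makefile() defaults to mode "r" and returns a read-only text wrapper, whereas this line-based JSON protocol needs to both read requests and write replies through the same handle. A small self-contained sketch of that mode difference (the socketpair usage is illustrative, not from the repository):

import socket

a, b = socket.socketpair()
sock_file = a.makefile("rw")   # "rw" gives a text-mode handle usable for reads and writes
sock_file.write("ping\n")
sock_file.flush()
print(b.recv(5))               # b'ping\n'
a.close(); b.close()
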
42 changes: 21 additions & 21 deletions qs/slave.py
@@ -1,8 +1,5 @@
#! /usr/bin/env python

from __future__ import division
from __future__ import print_function

import os
import sys
import time
@@ -31,7 +28,7 @@ def short_err_msg():
return "".join(msg)


class Worker(object):
class Worker:
def __init__(self, proxy):
self.proxy = proxy

@@ -42,10 +39,12 @@ def dispatch(self, job):
self.jobid_prefix = None

method = job["channel"]

method_name = f"rpc_{method}"

m = getattr(self, "rpc_" + method, None)
m = getattr(self, method_name, None)
if m is None:
raise RuntimeError("no such method %r" % (method,))
raise RuntimeError("no such method %r" % (method_name,))

kwargs = job.get("payload") or dict()
tmp = {}
@@ -54,20 +53,19 @@ def dispatch(self, job):
tmp[str(k)] = v
else:
tmp[k] = v

return m(**tmp)

def q_set_info(self, info):
return self.proxy.q_set_info(jobid=self.jobid, info=info)

def q_add(
def qadd(
self, channel, payload=None, jobid=None, prefix=None, wait=False, timeout=None, ttl=None
):
"""call q_add on proxy with the same priority as the current job"""
if jobid is None and prefix is not None:
jobid = "%s::%s" % (prefix, channel)

return self.proxy.q_add(
return self.proxy.qadd(
channel=channel,
payload=payload,
priority=self.priority,
Expand All @@ -77,8 +75,8 @@ def q_add(
ttl=ttl,
)

def q_add_w(self, channel, payload=None, jobid=None, timeout=None):
r = self.proxy.q_add(
def qaddw(self, channel, payload=None, jobid=None, timeout=None):
r = self.proxy.qadd(
channel=channel,
payload=payload,
priority=self.priority,
@@ -98,7 +96,6 @@ def main(
):
if port is None:
port = 14311

channels = []
skip_channels = []

@@ -161,12 +158,13 @@ def check_parent():
def check_parent():
pass

def handle_one_job(qs):
def handle_one_job(server_proxy: ServerProxy):
print("SLAVE HANDLING", server_proxy)
sleeptime = 0.5

while 1:
try:
job = qs.qpull(channels=channels)
job = server_proxy.qpull(channels=channels)
break
except Exception as err:
check_parent()
@@ -177,24 +175,26 @@ def handle_one_job(qs):
sleeptime *= 2

check_parent()
# print "got job:", job
print("got job:", job)
try:
result = WorkHandler(qs).dispatch(job)
print(server_proxy)
result = WorkHandler(server_proxy).dispatch(job)
except Exception as err:
print("error:", err)
try:
qs.qfinish(jobid=job["jobid"], error=short_err_msg())
server_proxy.qfinish(jobid=job["jobid"], error=short_err_msg())
traceback.print_exc()
except:
pass
return

try:
qs.qfinish(jobid=job["jobid"], result=result)
server_proxy.qfinish(jobid=job["jobid"], result=result)
except:
pass

def start_worker():
print("Server proxy form start_worker", host, port)
qs = ServerProxy(host=host, port=port)
while 1:
handle_one_job(qs)
@@ -203,7 +203,6 @@ def start_worker():

def run_with_threads():
import threading

for i in range(numthreads):
t = threading.Thread(target=start_worker)
t.start()
@@ -216,7 +215,7 @@ def run_with_threads():

def run_with_procs():
children = set()

print("Proc run")
while 1:
while len(children) < num_procs:
try:
@@ -228,6 +227,7 @@

if pid == 0:
try:
print("Server Proxy", host, port)
qs = ServerProxy(host=host, port=port)
handle_one_job(qs)
finally:
@@ -269,7 +269,7 @@ def run_with_gevent():

if __name__ == "__main__":

class Commands(object):
class Commands:
def rpc_divide(self, a, b):
print("rpc_divide", (a, b))
return old_div(a, b)
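The Commands example at the end of slave.py still routes division through old_div, the compatibility shim that reproduces Python 2's / operator (floor division when both operands are ints, true division otherwise). A short sketch of that behaviour, assuming old_div is imported from past.utils as in typical futurize output:

from past.utils import old_div

print(old_div(7, 2))   # 3   - Python 2 style division for two ints
print(7 // 2)          # 3   - native Python 3 floor division
print(7 / 2)           # 3.5 - true division, the Python 3 default for /
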
