
Commit

ref: some work related to #14
paulmueller committed Jan 20, 2022
1 parent 80865e5 commit 4ab29f0
Showing 12 changed files with 154 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG
@@ -1,6 +1,7 @@
0.9.2
- ci: turn tests into a submodule
- ref: add pyqtSlot decorators where missing
- ref: cleanup threads to isolate #14
0.9.1
- fix: don't print upload job when upload is completed in CLI
- fix: daemonize background thread in CLI dcoraid-upload-task
18 changes: 11 additions & 7 deletions dcoraid/download/job.py
@@ -23,7 +23,7 @@
]


class DownloadJob(object):
class DownloadJob:
def __init__(self, api, resource_id, download_path):
"""Wrapper for resource downloads
@@ -78,16 +78,20 @@ def __getstate__(self):
}
return dj_state

@property
def file_size(self):
return self.get_resource_dict()["size"]

@staticmethod
def from_download_job_state(dj_state, api):
"""Reinstantiate a job from an `DownloadJob.__getstate__` dict
"""
return DownloadJob(api=api, **dj_state)

@property
def file_size(self):
return self.get_resource_dict()["size"]

@property
def id(self):
return self.resource_id

@functools.lru_cache(maxsize=100)
def get_resource_dict(self):
"""Return resource dictionary"""
@@ -223,7 +227,7 @@ def task_download_resource(self):
"""Start the download
The progress of the download is monitored and written
to attributes. The current status can be retrieved
to the attributes. The current status can be retrieved
via :func:`DownloadJob.get_status`.
"""
if self.state in ["init", "wait-disk"]:
@@ -247,7 +251,7 @@ def task_download_resource(self):
if shutil.disk_usage(self.path_temp.parent).free < size:
# there is not enough space on disk for the download
self.set_state("wait-disk")
time.sleep(.2)
time.sleep(1)
else:
# proceed with download
# reset everything
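
The changes above cache the resource dictionary with functools.lru_cache, expose the resource size as a file_size property, and keep the job in the "wait-disk" state (now sleeping a full second per check) until the target drive has room. Below is a minimal, self-contained sketch of that pattern; fetch_resource_dict and path_temp are hypothetical stand-ins, and only the caching and the disk-space loop mirror the diff.

import functools
import pathlib
import shutil
import time


class ResourceDownloadSketch:
    def __init__(self, path_temp, fetch_resource_dict):
        self.path_temp = pathlib.Path(path_temp)
        self._fetch = fetch_resource_dict
        self.state = "init"

    @functools.lru_cache(maxsize=100)
    def get_resource_dict(self):
        # Cached so repeated size/status queries do not hit the server again.
        return self._fetch()

    @property
    def file_size(self):
        return self.get_resource_dict()["size"]

    def wait_for_disk_space(self):
        # Stall in "wait-disk" until the download fits on the target drive.
        while shutil.disk_usage(self.path_temp.parent).free < self.file_size:
            self.state = "wait-disk"
            time.sleep(1)
        self.state = "download"
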
8 changes: 8 additions & 0 deletions dcoraid/download/queue.py
@@ -1,4 +1,5 @@
import pathlib
import time
import warnings

from ..worker import Daemon
@@ -97,6 +98,13 @@ def __init__(self, api, path_persistent_job_list=None):
def __contains__(self, download_job):
return download_job in self.jobs

def __del__(self):
self.daemon_download.shutdown_flag.set()
self.daemon_verify.shutdown_flag.set()
time.sleep(.2)
self.daemon_download.terminate()
self.daemon_verify.terminate()

def __getitem__(self, index):
return self.jobs[index]

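
The new __del__ asks both background daemons to stop cooperatively via their shutdown_flag events, waits briefly, and then terminates them as a fallback. A self-contained sketch of that two-stage shutdown, using a generic thread class in place of the real download/verify daemons:

import threading
import time


class StoppableDaemon(threading.Thread):
    """Generic stand-in for the download/verify daemons."""

    def __init__(self):
        super().__init__(daemon=True)
        self.shutdown_flag = threading.Event()
        self.start()

    def run(self):
        while not self.shutdown_flag.is_set():
            # ...poll the job list here...
            time.sleep(.1)

    def terminate(self):
        # The real daemons inherit KThread.terminate(); for this sketch
        # setting the flag is enough.
        self.shutdown_flag.set()


class QueueSketch:
    def __init__(self):
        self.daemon_download = StoppableDaemon()
        self.daemon_verify = StoppableDaemon()

    def __del__(self):
        # Ask both threads to leave their loops, give them a moment,
        # then force termination as a fallback.
        self.daemon_download.shutdown_flag.set()
        self.daemon_verify.shutdown_flag.set()
        time.sleep(.2)
        self.daemon_download.terminate()
        self.daemon_verify.terminate()
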
20 changes: 8 additions & 12 deletions dcoraid/gui/main.py
@@ -140,14 +140,6 @@ def close(self):
self.timer.stop()
if self.panel_upload.widget_jobs.timer is not None:
self.panel_upload.widget_jobs.timer.stop()
self.panel_upload.jobs.daemon_compress.join()
self.panel_upload.jobs.daemon_upload.join()
self.panel_upload.jobs.daemon_verify.join()
self.panel_upload.jobs.daemon_compress.terminate()
self.panel_upload.jobs.daemon_upload.terminate()
self.panel_upload.jobs.daemon_verify.terminate()
QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents,
3000)
super(DCORAid, self).close()

@QtCore.pyqtSlot()
@@ -258,10 +250,14 @@ def refresh_login_status(self):
text = "{}".format(fullname)
tip = "user '{}'".format(name)
icon = "user-lock"
self.status_widget.set_status(text=text,
tooltip=tip,
icon=icon,
server=api.server)
try:
self.status_widget.set_status(text=text,
tooltip=tip,
icon=icon,
server=api.server)
except BaseException:
# Probably application killed
pass


class StatusWidget(QtWidgets.QWidget):
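
The close() handler no longer joins and terminates the upload daemons itself (the queues now manage their own threads), and refresh_login_status tolerates the status widget disappearing mid-update, e.g. when the application is being killed. A hedged sketch of that defensive update, with the surrounding helper being hypothetical:

def refresh_status(status_widget, text, tooltip, icon, server):
    # Hypothetical helper mirroring the guarded call in refresh_login_status.
    try:
        status_widget.set_status(text=text, tooltip=tooltip,
                                 icon=icon, server=server)
    except BaseException:
        # The widget may already have been deleted during shutdown;
        # swallowing the error keeps the periodic refresh from crashing.
        pass
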
6 changes: 5 additions & 1 deletion dcoraid/upload/job.py
@@ -144,6 +144,10 @@ def from_upload_job_state(uj_state, api, cache_dir=None):
"""
return UploadJob(api=api, cache_dir=cache_dir, **uj_state)

@property
def id(self):
return self.dataset_id

def cleanup(self):
"""cleanup temporary files in the user's cache directory"""
shutil.rmtree(self.cache_dir, ignore_errors=True)
@@ -362,7 +366,7 @@ def task_compress_resources(self):
# As long as there is less space free than the
# input file size, we stall here.
self.set_state("wait-disk")
time.sleep(0.2)
time.sleep(1)
self.set_state("compress")
compress(path_out=path_out, path_in=path)
# replace current path_out with compressed path
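
UploadJob gains the same id property that DownloadJob received above (dataset_id for uploads, resource_id for downloads), so the shared worker daemon can log any job generically via job.id. A minimal illustration of that common interface, with simplified constructors:

class DownloadJobSketch:
    def __init__(self, resource_id):
        self.resource_id = resource_id

    @property
    def id(self):
        return self.resource_id


class UploadJobSketch:
    def __init__(self, dataset_id):
        self.dataset_id = dataset_id

    @property
    def id(self):
        return self.dataset_id


def log_job_failure(job, message):
    # Works for either job type, as in the daemon's generic logging.
    print(f"({job.__class__.__name__} {job.id}) {message}")
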
11 changes: 11 additions & 0 deletions dcoraid/upload/queue.py
@@ -1,4 +1,5 @@
import pathlib
import time
import warnings

from ..api import APINotFoundError
@@ -143,6 +144,15 @@ def __init__(self, api, path_persistent_job_list=None, cache_dir=None):
def __contains__(self, upload_job):
return upload_job in self.jobs

def __del__(self):
self.daemon_upload.shutdown_flag.set()
self.daemon_verify.shutdown_flag.set()
self.daemon_compress.shutdown_flag.set()
time.sleep(.2)
self.daemon_upload.terminate()
self.daemon_verify.terminate()
self.daemon_compress.terminate()

def __getitem__(self, index):
return self.jobs[index]

@@ -180,6 +190,7 @@ def abort_job(self, dataset_id):
elif job.state == "compress":
job.set_state("abort")
self.daemon_compress.terminate()
self.daemon_compress.shutdown_flag.set()
self.daemon_compress = CompressDaemon(self.jobs)

def add_job(self, upload_job):
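
Besides the __del__ counterpart for the three upload daemons, abort_job now also sets the compress daemon's shutdown flag when aborting a running compression, before replacing the daemon with a fresh instance. A sketch of that stop-and-replace step; CompressDaemonSketch is a hypothetical stand-in for the real CompressDaemon:

import threading
import time


class CompressDaemonSketch(threading.Thread):
    """Hypothetical stand-in for the real CompressDaemon."""

    def __init__(self, jobs):
        super().__init__(daemon=True)
        self.jobs = jobs
        self.shutdown_flag = threading.Event()
        self.start()

    def run(self):
        while not self.shutdown_flag.is_set():
            time.sleep(.1)  # the real daemon would compress queued jobs

    def terminate(self):
        self.shutdown_flag.set()  # stand-in for KThread.terminate()


def abort_compression(queue):
    # Mirrors the "compress" branch of abort_job: stop the daemon that is
    # currently compressing, then hand future work to a fresh instance.
    queue.daemon_compress.terminate()
    queue.daemon_compress.shutdown_flag.set()
    queue.daemon_compress = CompressDaemonSketch(queue.jobs)
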
57 changes: 31 additions & 26 deletions dcoraid/worker/daemon.py
@@ -1,40 +1,35 @@
import atexit
import logging
import threading
import traceback

import time

from ..common import ConnectionTimeoutErrors

from .kthread import KThread
from .kthread import KThread, KThreadExit


class Daemon(KThread):
def __init__(self, queue, job_trigger_state, job_function_name):
"""Daemon base class for running uploads/downloads in the background"""
self.queue = queue
self.state = "running"
self.job_trigger_state = job_trigger_state
self.job_function_name = job_function_name
super(Daemon, self).__init__()
self.daemon = True # We don't have to worry about ending this thread
self.start()

def join(self, *args, **kwargs):
"""Join thread by breaking the while loop"""
self.state = "exiting"
super(Daemon, self).join(*args, **kwargs)
assert self.state == "exited"
# The shutdown_flag is a threading.Event object that
# indicates whether the thread should be terminated.
self.shutdown_flag = threading.Event()

atexit.register(self.shutdown_flag.set)

self.start()

def run(self):
while True:
if self.state == "exiting":
self.state = "exited"
break
elif self.state != "running":
# Don't do anything
time.sleep(.1)
continue
else:
try:
while not self.shutdown_flag.is_set():
# Get the first job that is in the trigger state
for job in self.queue:
if job.state == self.job_trigger_state:
@@ -56,16 +51,26 @@ def run(self):
job.traceback = traceback.format_exc(limit=1) \
+ "\nDCOR-Aid will retry in 10s!"
logger.error(
f"(dataset {job.dataset_id}) {traceback.format_exc()}")
f"(dataset {job.id}) {traceback.format_exc()}")
time.sleep(10)
job.set_state(self.job_trigger_state)
except SystemExit:
except KThreadExit:
job.set_state("abort")
logger.error(f"(dataset {job.dataset_id}) Aborted!")
logger.error(f"{job.__class__.__name__} {job.id} Aborted!")
except SystemExit:
# nothing to do
self.terminate()
except BaseException:
# Set job to error state and let the user figure
# out what to do next.
job.set_state("error")
job.traceback = traceback.format_exc()
logger.error(
f"(dataset {job.dataset_id}) {traceback.format_exc()}")
if not self.shutdown_flag.is_set():
# Only log if the thread is supposed to be running.
# Set job to error state and let the user figure
# out what to do next.
job.set_state("error")
job.traceback = traceback.format_exc()
logger.error(
f"(dataset {job.id}) {traceback.format_exc()}")
except KThreadExit:
# killed by KThread
pass
except SystemExit:
self.terminate()
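
The rewritten Daemon drops the old state-string handshake (join() used to set the state to "exiting") in favor of a threading.Event shutdown flag that is also registered with atexit, and it now tells apart a deliberate kill (KThreadExit), interpreter shutdown (SystemExit), and genuine job errors, logging the latter only while the flag is unset. A condensed, self-contained sketch of that run loop under simplified assumptions: jobs come from a plain iterable, the connection-retry branch is omitted, and KThreadExit matches the class defined in kthread.py below.

import atexit
import logging
import threading
import time
import traceback

logger = logging.getLogger(__name__)


class KThreadExit(BaseException):
    """Raised asynchronously by KThread.terminate() (see kthread.py)."""


class DaemonSketch(threading.Thread):
    """Condensed version of the Daemon run loop introduced above."""

    def __init__(self, queue, job_trigger_state, job_function_name):
        super().__init__(daemon=True)
        self.queue = queue
        self.job_trigger_state = job_trigger_state
        self.job_function_name = job_function_name
        # Event-based shutdown replaces the old state-string handshake.
        self.shutdown_flag = threading.Event()
        # Stop looping when the interpreter exits.
        atexit.register(self.shutdown_flag.set)
        self.start()

    def run(self):
        try:
            while not self.shutdown_flag.is_set():
                for job in self.queue:
                    if job.state != self.job_trigger_state:
                        continue
                    try:
                        # e.g. job.task_download_resource()
                        getattr(job, self.job_function_name)()
                    except KThreadExit:
                        job.set_state("abort")
                        logger.error(f"{job.__class__.__name__} "
                                     f"{job.id} Aborted!")
                    except BaseException:
                        if not self.shutdown_flag.is_set():
                            # Only report errors while we are supposed
                            # to be running.
                            job.set_state("error")
                            job.traceback = traceback.format_exc()
                            logger.error(f"(dataset {job.id}) "
                                         f"{traceback.format_exc()}")
                    break
                time.sleep(.1)
        except KThreadExit:
            pass  # killed via KThread.terminate()
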
29 changes: 19 additions & 10 deletions dcoraid/worker/kthread.py
@@ -1,18 +1,20 @@
"""https://github.com/munshigroup/kthread"""
import atexit
import ctypes
import inspect
import threading
import time
import threading


class KThreadExit(BaseException):
pass


def _async_raise(tid, exctype):
"""Raises the exception, causing the thread to exit"""
if not inspect.isclass(exctype):
raise TypeError("Only types can be raised (not instances)")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(tid), ctypes.py_object(exctype))
if res == 0:
raise ValueError("Invalid thread ID")
pass # ignore
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
@@ -23,16 +25,20 @@ def _async_raise(tid, exctype):
class KThread(threading.Thread):
"""Killable thread. See terminate() for details."""

def __init__(self, *args, **kwargs):
super(KThread, self).__init__(*args, **kwargs)
atexit.register(self.terminate)

def _get_my_tid(self):
"""Determines the instance's thread ID"""
if not self.is_alive():
raise threading.ThreadError("Thread is not active")
return None # Thread is not active

# do we have it cached?
if hasattr(self, "_thread_id"):
return self._thread_id

# no, look for it in the _active dict
# look for it in the _active dict
for tid, tobj in threading._active.items():
if tobj is self:
self._thread_id = tid
@@ -42,14 +48,17 @@ def _get_my_tid(self):

def raise_exc(self, exctype):
"""raises the given exception type in the context of this thread"""
_async_raise(self._get_my_tid(), exctype)
thread_id = self._get_my_tid()
if thread_id:
_async_raise(thread_id, exctype)

def terminate(self):
"""raises SystemExit in the context of the given thread, which should
cause the thread to exit silently (unless caught)"""
# WARNING: using terminate() can introduce instability in your
# programs. It is worth noting that terminate() will NOT work if the
# thread in question is blocked by a syscall (accept(), recv(), etc.).
atexit.unregister(self.terminate)
while self.is_alive():
self.raise_exc(SystemExit)
time.sleep(0.01)
self.raise_exc(KThreadExit)
time.sleep(.05)
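
KThread.terminate() now raises the dedicated KThreadExit exception (rather than SystemExit) inside the target thread via CPython's PyThreadState_SetAsyncExc, unregisters its atexit hook first, and _get_my_tid quietly returns None for threads that have already finished. The sketch below shows the underlying mechanism in isolation; kill_thread and worker are hypothetical names, and the exception is only delivered while the thread is executing Python bytecode.

import ctypes
import threading
import time


class KThreadExit(BaseException):
    pass


def kill_thread(thread, exctype=KThreadExit):
    # Raise ``exctype`` inside ``thread`` through the CPython C API;
    # returns False if the thread is not alive (mirroring the new
    # behaviour of _get_my_tid returning None).
    if not thread.is_alive():
        return False
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(thread.ident), ctypes.py_object(exctype))
    if res > 1:
        # Revert if more than one thread state was affected.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(thread.ident), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
    return res == 1


def worker():
    try:
        while True:
            time.sleep(.05)
    except KThreadExit:
        pass  # exit silently, as KThread.terminate() intends


if __name__ == "__main__":
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    time.sleep(.2)
    while t.is_alive():
        kill_thread(t)
        time.sleep(.05)
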
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -52,7 +52,7 @@ def pytest_configure(config):
settings.value("user scenario", "dcor-dev")
settings.setValue("auth/server", "dcor-dev.mpl.mpg.de")
settings.setValue("auth/api key", common.get_api_key())
settings.setValue("debug/without timers", 1)
settings.setValue("debug/without timers", "1")
settings.sync()
# cleanup
cleanup_dcoraid_tasks()
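
The timer flag is now stored as the string "1" rather than the integer 1; QSettings' INI backend returns plain strings on read, so writing a string keeps the type consistent across platforms. A defensive read that accepts either form might look like this (assuming PyQt5 and a settings object created as in the fixture):

from PyQt5 import QtCore

settings = QtCore.QSettings()
# Accept "1" (string), 1 (int) or True, depending on backend and history.
without_timers = settings.value("debug/without timers", "0") in ("1", 1, True)
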
16 changes: 10 additions & 6 deletions tests/test_download_queue.py
@@ -1,5 +1,6 @@
import pathlib
import tempfile
import time

import pytest

@@ -45,8 +46,9 @@ def test_queue_remove_job():
joblist = DownloadQueue(api=api,
path_persistent_job_list=pdjl_path)
# disable all daemons, so no downloading happens
joblist.daemon_download.join()
joblist.daemon_verify.join()
joblist.daemon_download.shutdown_flag.set()
joblist.daemon_verify.shutdown_flag.set()
time.sleep(.2)
resource_id = ds_dict["resources"][0]["id"]
dj = joblist.new_job(resource_id=resource_id,
download_path=td)
@@ -109,8 +111,9 @@ def test_persistent_download_joblist_job_added_in_queue():
pdjl = PersistentDownloadJobList(pdjl_path)

uq = DownloadQueue(api=api, path_persistent_job_list=pdjl_path)
uq.daemon_download.join()
uq.daemon_verify.join()
uq.daemon_download.shutdown_flag.set()
uq.daemon_verify.shutdown_flag.set()
time.sleep(.2)

assert pdjl.num_queued == 0
uq.add_job(dj)
@@ -145,8 +148,9 @@ def test_persistent_download_joblist_skip_queued_resources():
pdjl.immortalize_job(dj)

dq = DownloadQueue(api=api, path_persistent_job_list=pdjl_path)
dq.daemon_download.join()
dq.daemon_verify.join()
dq.daemon_download.shutdown_flag.set()
dq.daemon_verify.shutdown_flag.set()
time.sleep(.2)

assert len(dq) == 1
assert dq.jobs_eternal.num_queued == 1
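
The tests used to call daemon.join() to disable the background workers; with the state string gone they now set each daemon's shutdown_flag and sleep briefly so the threads can leave their loops. A hypothetical helper that could factor out this boilerplate in the test suite:

import time


def pause_queue_daemons(queue, grace=0.2):
    """Hypothetical helper: stop every daemon attached to ``queue`` so
    that no downloading/uploading happens during a test."""
    for name in ("daemon_download", "daemon_verify",
                 "daemon_upload", "daemon_compress"):
        daemon = getattr(queue, name, None)
        if daemon is not None:
            daemon.shutdown_flag.set()
    time.sleep(grace)
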