Skip to content

Commit

Permalink
Merge pull request #97 from CybercentreCanada/persistent-service-update
Browse files Browse the repository at this point in the history
missing imports for User(Settings)
  • Loading branch information
cccs-rs authored Sep 7, 2021
2 parents e34e174 + 89d2b0d commit 5137bcc
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions assemblyline_v4_service/updater/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from assemblyline.remote.datatypes.hash import Hash
from assemblyline.common.security import get_random_password, get_password_hash
from assemblyline.remote.datatypes.lock import Lock
from assemblyline.odm.models.user import User
from assemblyline.odm.models.user_settings import UserSettings


if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -63,20 +65,20 @@ def temporary_api_key(ds: AssemblylineDatastore, user_name: str, permissions=('R


class ServiceUpdater(ThreadedCoreBase):
def __init__(self, logger: logging.Logger=None,
shutdown_timeout: float=None, config: Config=None,
datastore: AssemblylineDatastore=None,
redis:RedisType=None, redis_persist:RedisType=None):
super().__init__(f'assemblyline.{SERVICE_NAME}_updater', logger=logger, shutdown_timeout=shutdown_timeout,
config=config, datastore=datastore, redis=redis,
def __init__(self, logger: logging.Logger = None,
shutdown_timeout: float = None, config: Config = None,
datastore: AssemblylineDatastore = None,
redis: RedisType = None, redis_persist: RedisType = None):
super().__init__(f'assemblyline.{SERVICE_NAME}_updater', logger=logger, shutdown_timeout=shutdown_timeout,
config=config, datastore=datastore, redis=redis,
redis_persist=redis_persist)

self.update_data_hash = Hash(f'service-updates-{SERVICE_NAME}', redis_persist)
self._update_dir = None
self._update_tar = None
self._service: Optional[Service] = None

# A event flag that gets set when an update should be run for
# A event flag that gets set when an update should be run for
# reasons other than it being the regular interval (eg, change in signatures)
self.source_update_flag = threading.Event()
self.local_update_flag = threading.Event()
Expand All @@ -98,7 +100,7 @@ def update_directory(self):
return self._update_dir

def update_tar(self):
return self._update_tar
return self._update_tar

def get_active_config_hash(self) -> int:
return self.update_data_hash.get(CONFIG_HASH_KEY) or 0
Expand Down Expand Up @@ -144,30 +146,32 @@ def try_run(self):

def _run_internal_http(self):
"""run backend insecure http server
A small inprocess server to syncronize info between gunicorn and the updater daemon.
This HTTP server is not safe for exposing externally, but fine for IPC.
"""
them = self

class Handler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.end_headers()
self.wfile.write(json.dumps(them.status()).encode())

def log_error(self, format: str, *args: Any):
them.log.info(format%args)
them.log.info(format % args)

def log_message(self, format: str, *args: Any):
them.log.debug(format%args)
them.log.debug(format % args)

self._internal_server = ThreadingHTTPServer(('0.0.0.0', 9999), Handler)
self._internal_server.serve_forever()

def _run_http(self):
# Start a server for our http interface in a separate process
proc = subprocess.Popen(["gunicorn", "assemblyline_v4_service.updater.app:app", "--config=python:assemblyline_v4_service.updater.gunicorn_config"])
proc = subprocess.Popen(["gunicorn", "assemblyline_v4_service.updater.app:app",
"--config=python:assemblyline_v4_service.updater.gunicorn_config"])
while self.sleep(1):
if proc.poll() is not None:
break
Expand All @@ -176,7 +180,7 @@ def _run_http(self):
if proc.poll() is not None:
proc.terminate()
proc.wait()

@staticmethod
def config_hash(service: Service) -> int:
if service is None:
Expand All @@ -198,12 +202,12 @@ def _sync_settings(self):
def do_local_update(self) -> None:
raise NotImplementedError()

def do_source_update(self, service:Service) -> None:
def do_source_update(self, service: Service) -> None:
raise NotImplementedError()

def _run_source_updates(self):
# Wait until basic data is loaded
while self._service is None and self.sleep(1):
while self._service is None and self.sleep(1):
pass
if not self._service:
return
Expand Down Expand Up @@ -242,8 +246,8 @@ def _run_source_updates(self):
self.source_update_flag.set()
self.sleep(60)
continue
def serve_directory(self, new_directory:str):

def serve_directory(self, new_directory: str):
self.log.info("Update finished with new data.")
new_tar = ''
try:
Expand All @@ -266,7 +270,7 @@ def serve_directory(self, new_directory:str):

def _run_local_updates(self):
# Wait until basic data is loaded
while self._service is None and self.sleep(1):
while self._service is None and self.sleep(1):
pass
if not self._service:
return
Expand Down

0 comments on commit 5137bcc

Please sign in to comment.