From 4710d9ecd7daa4bc410ad60a3deb10361e161d4c Mon Sep 17 00:00:00 2001
From: terrinens <132042125+terrinens@users.noreply.github.com>
Date: Sun, 15 Sep 2024 06:30:33 +0900
Subject: [PATCH] resolve : #4 #5 (#6)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

resolve : #5

fix : corrected version management
fix : fixed a bottleneck in task list creation

Signed-off-by: DongChuelKim
---
 app.py                    |  20 +++---
 manager/file_manager.py   |  80 ++++++++++++++-------
 manager/runner_manager.py | 144 ++++++++++++++++++++++++++++++--------
 3 files changed, 179 insertions(+), 65 deletions(-)

diff --git a/app.py b/app.py
index 72f68f9..3f62cb9 100644
--- a/app.py
+++ b/app.py
@@ -8,7 +8,7 @@
 from logger.log import create_logger
 from manager.file_manager import file_manager
-from manager.runner_manager import Manager, ready
+from manager.runner_manager import Manager, is_ready, task_count
 from manager.service_manager import registration

 app = Flask(__name__)
@@ -41,11 +41,11 @@ def jar_upload():

     if result:
         response = jsonify({
-            'message': '업로드가 완료되었습니다. 작업이 진행중 입니다.',
+            'message': 'Upload has been completed. The task is in progress.',
             'polling': f'/tasking?uuid={uuid}'
         }), 202
     else:
-        response = jsonify({'message': '업로드에 실패했습니다.'}), 400
+        response = jsonify({'message': 'Upload failed.'}), 400

     return response

@@ -63,19 +63,19 @@ def tasking():

     if is_tasking:
         return jsonify({
-            'message': f'작업은 진행중입니다. 대기번호 : {waiting}',
+            'message': f'The task is in progress. Waiting number : {waiting}',
             'polling': f'{request.path}?uuid={uuid}'
         }), 202
     else:
-        return jsonify({'message': '해당 작업은 완료되었습니다.'}), 200
+        return jsonify({'message': 'The task has been completed.'}), 200


 @app.route('/ready', methods=['GET'])
 def ready():
-    if ready:
-        return jsonify({'message': '대기중인 작업이 없습니다.'}), 200
+    if is_ready():
+        return jsonify({'message': 'There are no pending tasks.'}), 200
     else:
-        return jsonify({'message': f'대기중인 작업의 수 {len(manager.task_list)}'}), 202
+        return jsonify({'message': f'Number of pending tasks : {task_count()}'}), 202


 def add_parse(parse: argparse.ArgumentParser):
@@ -107,7 +107,7 @@ def add_parse(parse: argparse.ArgumentParser):
     if debug:
         log = create_logger('UEC_log', 'uec.log', console_level=debug)

-    manager = Manager(target_dir=save_dir, server_port=backend_port, debug=debug).start()
+    manager = Manager(target_dir=save_dir, server_port=backend_port, debug=debug, maintenance_count=10).start()

     log.info(f"The server has started. 
Port : {port}") - app.run(port=port, debug=debug) + app.run(host='0.0.0.0', port=port, debug=debug) diff --git a/manager/file_manager.py b/manager/file_manager.py index 789fb7a..688afb0 100644 --- a/manager/file_manager.py +++ b/manager/file_manager.py @@ -1,7 +1,9 @@ +import hashlib import os import re +import time +import uuid from os.path import join as opj -from uuid import uuid4 from werkzeug.datastructures import FileStorage @@ -10,56 +12,82 @@ log = create_logger('FM_Log', 'uec.log') -def custom_sort(string): +def version_sort(string): match = re.search(r'v(\d+)', string) return int(match.group(1)) if match else float('-inf') -def _version_manager(save_dir, file: FileStorage): +def __version_manager(save_dir, file: FileStorage): save_path = opj(save_dir, file.filename) - true_name = file.filename[:file.filename.rfind('.')] file_list = os.listdir(save_dir) last_same_files = list(filter(lambda x: true_name in x, file_list)) - last_same_files.sort(key=custom_sort, reverse=True) + last_same_files.sort(key=version_sort, reverse=True) - if os.path.exists(save_path) and os.path.isfile(save_path): + if last_same_files: log.debug('An existing file exists. Change the file name.') + log.debug('The most recent version of the file was found. Change to the corresponding version or higher.') - if last_same_files: - log.debug('The most recent version of the file was found. Change to the corresponding version or higher.') - - last_same_file = last_same_files[0] - - if match := re.search(r'v(\d+)', last_same_file): - next_number = int(match[1]) + 1 - new_name = true_name + f' v{next_number}' + '.' + file.filename.split('.')[-1] - - else: - new_name = true_name + ' v1' + '.' + file.filename.split('.')[-1] - - log.info(f'The version change has been completed. file name : {new_name}') - return opj(save_dir, new_name) - - elif len(last_same_files) > 0 and os.path.isfile(save_path): last_same_file = last_same_files[0] if match := re.search(r'v(\d+)', last_same_file): + next_number = int(match[1]) + 1 - return true_name + f' v{next_number}' + '.' + file.filename.split('.')[-1] + new_name = true_name + f' v{next_number}' + '.' + file.filename.split('.')[-1] + else: - return save_path + new_name = true_name + ' v2' + '.' + file.filename.split('.')[-1] + log.info(f'The version change has been completed. new version name : {new_name}') + return opj(save_dir, new_name) else: return save_path +def __gen_random_uuid(): + timestamp = str(int(time.time())) + random_hase = hashlib.sha224(os.urandom(4)).hexdigest() + name = f"{timestamp}{random_hase}" + return uuid.uuid5(uuid.NAMESPACE_DNS, name) + + def file_manager(save_dir, jar: FileStorage): - save_path = _version_manager(save_dir, jar) + save_path = __version_manager(save_dir, jar) try: jar.save(str(save_path)) - return uuid4(), True + return __gen_random_uuid(), True except Exception as e: log.error(f'An error occurred while saving the file. 
: {e}')
         return None, False
+
+
+def matching_files(target_dir, extension, ignores=None):
+    files = sorted(os.listdir(target_dir), key=version_sort, reverse=True)
+    match_files = []
+
+    for file in files:
+        is_file = os.path.isfile(os.path.join(target_dir, file))
+        is_eq_extension = file.endswith(extension)
+        is_ignore = (ignores is not None) and file in ignores
+
+        if is_file and is_eq_extension and not is_ignore:
+            match_files.append(file)
+
+    return sorted(match_files, key=version_sort)
+
+
+def old_file_remove(target_dir, extension, maintenance_count, ignores=None):
+    matches = matching_files(target_dir, extension, ignores)
+    if len(matches) > maintenance_count:
+        matches.sort(key=version_sort)
+
+        remove_list = matches[:len(matches) - maintenance_count]
+
+        for path in remove_list:
+            try:
+                os.remove(os.path.join(target_dir, path))
+            except PermissionError:
+                pass
+
+        log.info(f'The following files were deleted: {remove_list}')
diff --git a/manager/runner_manager.py b/manager/runner_manager.py
index 9129971..21711b2 100644
--- a/manager/runner_manager.py
+++ b/manager/runner_manager.py
@@ -1,4 +1,6 @@
+import asyncio
 import logging
+import math
 import os
 import subprocess
 import sys
@@ -11,12 +13,25 @@
 from watchdog.observers import Observer

 from logger.log import create_logger
+from manager.file_manager import matching_files, old_file_remove

-ob_log = create_logger('Observer_Log', 'rm.log')
-log = create_logger('RM_Log', 'rm.log')
+ob_log = create_logger('Observer_Log', 'runner_manager.log')
+log = create_logger('RM_Log', 'runner_manager.log')

 ready = True

+_loop_thread = None
+_task_list = []
+_managed_file_count = 0
+
+
+def is_ready():
+    return ready and _loop_thread is None
+
+
+def task_count():
+    return len(_task_list)
+

 def _wait_for_file(file_path, timeout=10):
     start_time = time.time()
@@ -37,15 +52,24 @@ def _require_else(obj, default_value):


 class Manager(FileSystemEventHandler):
-    def __init__(self, target_dir, server_port, debug):
+    def __init__(self, target_dir, server_port, maintenance_count=math.inf, debug=False):
         super().__init__()
+
         self.target_dir = _require_else(target_dir, os.getcwd())
         self.server_port = _require_else(server_port, 8080)
+
         self.observer = Observer()
         self.observer.schedule(self, self.target_dir, recursive=False)
+
         self.queue = Queue()
+        # This UUID is only a temporary holder used to hand a new task over to the queue.
+        # Do not use it for any purpose other than assigning a task number in the queue.
+        # If you rely on this uuid elsewhere, it will most likely be pinned to the most recently assigned task number.
         self.uuid = None
-        self.task_list = []
+
+        self.maintenance_count = maintenance_count
+        global _managed_file_count
+        _managed_file_count = len(matching_files(self.target_dir, '.jar'))

         if debug:
             global ob_log
@@ -59,15 +83,29 @@ def on_created(self, event):
         is_extension_jar = event.src_path.endswith('.jar')

         if not is_dir and is_target_dir and is_extension_jar:
-            global ready
-            if not self.queue.empty() and ready:
-                ready = False
-
-            log.debug('A new file has been detected. Try updating to the new server.')
+            ob_log.debug('A new file has been detected. 
A task has been added to the queue.')
+            ob_log.debug(f'task number : {self.uuid}')

             _wait_for_file(event.src_path)
-            self.task_list.append(self.uuid)
-            self.queue.put((self.uuid, _start_server(self, self.server_port, event)))
+            self.__add_queue((self.uuid, event))
+            global _managed_file_count
+            _managed_file_count += 1
+
+    def on_deleted(self, event):
+        global _managed_file_count
+        if (not event.is_directory
+                and event.src_path.endswith('.jar')
+                and _managed_file_count > 0
+        ): _managed_file_count -= 1
+
+    def __file_maintenance(self):
+        global _managed_file_count
+        if _managed_file_count > self.maintenance_count:
+            ob_log.info(
+                f'Deleting older versions because the number of files exceeds the maintenance limit. '
+                f'Number of files currently being managed : {_managed_file_count}'
+            )
+            old_file_remove(self.target_dir, '.jar', self.maintenance_count, _task_list)

     def __start_observer(self):
         try:
@@ -78,24 +116,74 @@ def __start_observer(self):
             log.error(f"Directory : {self.target_dir} Location could not be found. Exit the program.")
             sys.exit(1)

+    @staticmethod
+    def __get_task_by_uuid(uuid):
+        global _task_list
+        for task in _task_list:
+            if task[0] == uuid:
+                return task
+        return None
+
     def is_tasking(self, uuid) -> (bool, int):
-        try:
-            tasking = uuid in self.task_list
-            waiting = self.task_list.index(uuid)
-        except ValueError:
-            tasking = False
-            waiting = 0
+        global _task_list
+        task = self.__get_task_by_uuid(uuid)
+
+        if task is None:
+            return False, 0
+
+        tasking = task is not None
+        waiting = _task_list.index(task)

         return tasking, waiting

     def complete_tasking(self):
-        if self.uuid in self.task_list:
-            self.task_list.remove(self.uuid)
+        try:
+            global _task_list
+            _task_list.remove(self.__get_task_by_uuid(self.uuid))
+        except ValueError:
+            pass

     def start(self):
         threading.Thread(target=self.__start_observer, daemon=True).start()
         return self

+    def __add_queue(self, obj):
+        global _task_list
+        _, event = obj
+        _task_list.append((self.uuid, event.src_path))
+
+        global ready
+        if ready: ready = not ready
+
+        self.queue.put(obj)
+        asyncio.run(self.__start_processing())
+
+    async def __start_processing(self):
+        global _loop_thread
+        if _loop_thread is None:
+            _loop_thread = threading.Thread(target=self.__run_event_loop, daemon=True)
+            _loop_thread.start()
+
+    def __run_event_loop(self):
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(self.__process_queue())
+        loop.close()
+
+    async def __process_queue(self):
+        while True:
+            uuid, event = self.queue.get()
+            if event is None:
+                global ready
+                ready = True
+
+                global _loop_thread
+                _loop_thread = None
+                break
+
+            await _start_server(self, uuid, event)
+            self.__file_maintenance()
+

 def _get_process_from_port(port):
     log.debug('Search for process from port...')
@@ -117,7 +205,6 @@ def _get_files_used_by_pid(process):
     try:
         log.debug(f'PID : {pid} the list of files used by...')
         file = None
-        process = psutil.Process(pid=pid)
         with process.oneshot():
             file = process.open_files()
     except psutil.NoSuchProcess as e:
@@ -174,23 +261,22 @@ def _popen_observer(jar_file):

     return process, None, False

-async def _rollback_server(before_jar):
+def _rollback_server(before_jar):
     try:
         log.info('Roll back to the previous server.')
         process, error_message, has_error = _popen_observer(before_jar[0])

         if has_error:
-            print(f'An attempt was made to run the previous JAR {before_jar[0]}, '
-                  f'but an error occurred. 
The server failed to start.') + log.error(f'An attempt was made to run the previous JAR {before_jar[0]}, ' + f'but an error occurred. The server failed to start.') except FileNotFoundError: - print(f'Tried to run previous JAR: {before_jar[0]}, but could not find the file.') + log.error(f'Tried to run previous JAR: {before_jar[0]}, but could not find the file.') -def _start_server(manager: Manager, server_port, event): - uuid = manager.uuid +async def _start_server(manager: Manager, uuid, event): jar = event.src_path - before_jar = _terminate_server(server_port) + before_jar = _terminate_server(manager.server_port) jar_error = False try: @@ -209,8 +295,8 @@ def _start_server(manager: Manager, server_port, event): if before_jar: _rollback_server(before_jar[0]) else: - log.info('The rollback attempt failed because the previously running process did not exist. ' - 'There is no running server.') + log.warning('The rollback attempt failed because the previously running process did not exist. ' + 'There is no running server.') manager.complete_tasking() if manager.queue.empty():