From 09aa3eb4eac361f5377d24c3f7b875a8dae580fd Mon Sep 17 00:00:00 2001
From: tianxiu2b2t
Date: Fri, 13 Dec 2024 23:29:28 +0800
Subject: [PATCH] feat: support direct-link access via webdav
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 core/cluster.py           |  19 ++--
 core/config.py            |   2 +-
 core/dashboard.py         |  32 -------
 core/storages/__init__.py | 182 +++++++++++++++++++++-----------------
 core/utils.py             |  29 ++++++
 core/web.py               |  42 ++++++---
 6 files changed, 172 insertions(+), 134 deletions(-)

diff --git a/core/cluster.py b/core/cluster.py
index a3e1c97..1c137e9 100644
--- a/core/cluster.py
+++ b/core/cluster.py
@@ -14,6 +14,7 @@ import pyzstd as zstd
 import aiohttp
 from tqdm import tqdm
 
+from . import web
 from . import utils, logger, config, scheduler, units, storages, i18n
 from .storages import File as SFile
 
@@ -21,6 +22,7 @@
 import urllib.parse as urlparse
 from . import database as db
 from .config import USER_AGENT, API_VERSION
+from .utils import WrapperTQDM
 
 from cryptography import x509
 from cryptography.hazmat.backends import default_backend
@@ -123,19 +125,19 @@ async def get_missing_files(self, files: set[File]) -> set[File | Any]:
         function = self._check_exists
         await self.available()
-        with tqdm(
+        with WrapperTQDM(tqdm(
             total=len(self.available_storages) * 256,
             desc="List Files",
             unit="dir",
             unit_scale=True
-        ) as pbar:
+        )) as pbar:
             await asyncio.gather(*(self.get_storage_filelist(storage, pbar) for storage in self.available_storages))
 
-        with tqdm(
+        with WrapperTQDM(tqdm(
             total=len(files) * len(self.available_storages),
             desc="Checking files",
             unit="file",
             unit_scale=True,
-        ) as pbar:
+        )) as pbar:
             missing_files = set()
             waiting_files: asyncio.Queue[File] = asyncio.Queue()
 
@@ -145,14 +147,14 @@
             await asyncio.gather(*(self._get_missing_file_storage(function, missing_files, waiting_files, storage, pbar) for storage in self.available_storages))
         return missing_files
 
-    async def get_storage_filelist(self, storage: storages.iStorage, pbar: tqdm):
+    async def get_storage_filelist(self, storage: storages.iStorage, pbar: WrapperTQDM):
         result = await storage.list_files(pbar)
         for files in result.values():
             for file in files:
                 self.cache_filelist[storage][file.hash] = file
         return result
 
-    async def _get_missing_file_storage(self, function: Callable[..., Coroutine[Any, Any, bool]], missing_files: set[File], files: asyncio.Queue[File], storage: storages.iStorage, pbar: tqdm):
+    async def _get_missing_file_storage(self, function: Callable[..., Coroutine[Any, Any, bool]], missing_files: set[File], files: asyncio.Queue[File], storage: storages.iStorage, pbar: WrapperTQDM):
         while not files.empty():
             file = await files.get()
             if await function(file, storage):
@@ -290,13 +292,13 @@ def __init__(self, total: int = 0, size: int = 0):
         self.pbar = None
 
     def __enter__(self):
-        self.pbar = tqdm(
+        self.pbar = WrapperTQDM(tqdm(
             total=self.total_size,
             unit="b",
             unit_scale=True,
             unit_divisor=1024,
             desc=i18n.locale.t("cluster.processbar.download_files")
-        )
+        ))
         self.downloaded_files = 0
         self.failed_files = 0
         return self
@@ -1151,6 +1153,7 @@ async def _(request: aweb.Request):
     elif isinstance(storage, storages.WebDavStorage):
         file = await storage.get_file(MEASURES_HASH[size])
         if file.url:
+            logger.debug("Requested measure url:", file.url)
             return aweb.HTTPFound(file.url)
         elif file.size >= 0:
             return aweb.Response(
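The last hunk above is the user-visible half of this change: when a WebDAV storage can hand back a direct URL for a measure file, the cluster answers with a 302 instead of proxying the bytes itself. A minimal sketch of that redirect-or-stream decision with aiohttp; the `MeasureFile` container and the handler name are illustrative only, not part of the patch:

```python
from dataclasses import dataclass
from typing import Optional

from aiohttp import web


@dataclass
class MeasureFile:
    size: int                  # bytes to serve when no direct link exists
    url: Optional[str] = None  # direct link handed back by the storage, if any


async def respond_measure(request: web.Request, file: MeasureFile) -> web.StreamResponse:
    """Redirect to a storage-provided direct link, or stream dummy bytes ourselves."""
    if file.url:
        # e.g. a WebDAV direct link: the client measures bandwidth against the storage
        return web.HTTPFound(file.url)

    # fall back to serving zero bytes of the requested size from this process
    resp = web.StreamResponse(status=200, headers={"Content-Length": str(file.size)})
    await resp.prepare(request)
    chunk = b"\x00" * (1024 * 1024)
    remaining = file.size
    while remaining > 0:
        part = min(remaining, len(chunk))
        await resp.write(chunk[:part])
        remaining -= part
    await resp.write_eof()
    return resp
```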
diff --git a/core/config.py b/core/config.py
index 3e03020..0722bad 100644
--- a/core/config.py
+++ b/core/config.py
@@ -170,7 +170,7 @@ def rank_clusters_url(self):
 
 const = Const()
 
-VERSION = "3.3.7"
+VERSION = "3.3.8"
 API_VERSION = "1.13.1"
 USER_AGENT = f"openbmclapi/{API_VERSION} python-openbmclapi/{VERSION}"
 PYTHON_VERSION = ".".join(map(str, (sys.version_info.major, sys.version_info.minor, sys.version_info.micro)))
\ No newline at end of file
diff --git a/core/dashboard.py b/core/dashboard.py
index cd1ec0b..41a06d7 100644
--- a/core/dashboard.py
+++ b/core/dashboard.py
@@ -232,37 +232,6 @@ async def _(request: web.Request):
     except:
         return web.json_response([])
 
-@route.get("/api/system_info")
-async def _(request: web.Request):
-    return web.json_response(counter.get_json())
-
-@route.get("/api/count")
-async def _(request: web.Request):
-    # statistics of the cluster hits and bytes
-    session = db.SESSION.get_session()
-    current_hour = db.get_hour()
-    hour_of_day = (current_hour // 24) * 24
-    next_hour = hour_of_day + 24
-    q = session.query(db.ClusterStatisticsTable).filter(
-        db.ClusterStatisticsTable.hour >= hour_of_day,
-        db.ClusterStatisticsTable.hour < next_hour
-    ).all()
-    return web.json_response({
-        "hits": sum([int(item.hits) for item in q]), # type: ignore
-        "bytes": sum([int(item.bytes) for item in q]) # type: ignore
-    })
-
-
-@route.get("/api/openbmclapi/rank")
-async def _(request: web.Request):
-    async with aiohttp.ClientSession(
-        "bd.bangbang93.com"
-    ) as session:
-        async with session.get("/openbmclapi/metric/rank") as resp:
-            return web.json_response(
-                await resp.json(),
-            )
-
 route.static("/assets", "./assets")
 
 class JSONEncoder(json.JSONEncoder):
@@ -282,7 +251,6 @@ async def websocket_process_api(resp: web.WebSocketResponse, data: Any):
 
     return await resp.send_json(res, dumps=json_dumps)
 
-
 async def process_api(
     event: Optional[str],
     req_data: Any,
diff --git a/core/storages/__init__.py b/core/storages/__init__.py
index 9d2e857..fa0823f 100644
--- a/core/storages/__init__.py
+++ b/core/storages/__init__.py
@@ -14,6 +14,7 @@ from core import config, units
 from .. import scheduler, utils, cache
 from ..logger import logger
+from ..utils import WrapperTQDM
 
 import urllib.parse as urlparse
 import aiofiles
@@ -22,6 +23,9 @@
 import aiowebdav.client as webdav3_client
 import aiowebdav.exceptions as webdav3_exceptions
 
+FILES_DIR = "/download"
+MEASURE_DIR = "/measure"
+
 @dataclass
 class File:
     name: str
@@ -36,7 +40,7 @@ class iStorage(metaclass=abc.ABCMeta):
     def __init__(self, path: str, weight: int) -> None:
         if self.type == "_interface":
             raise ValueError("Cannot instantiate interface")
-        self.path = path
+        self.path = path.rstrip("/")
         self.weight = weight
         self.current_weight = 0
 
@@ -57,7 +61,7 @@ async def read_file(self, file_hash: str) -> bytes:
     async def delete_file(self, file_hash: str) -> bool:
         raise NotImplementedError("Not implemented")
     @abc.abstractmethod
-    async def list_files(self, pbar: Optional[tqdm] = None) -> defaultdict[str, deque[File]]:
+    async def list_files(self, pbar: Optional[WrapperTQDM] = None) -> defaultdict[str, deque[File]]:
         raise NotImplementedError("Not implemented")
     @abc.abstractmethod
     async def exists(self, file_hash: str) -> bool:
@@ -70,7 +74,26 @@ async def get_mtime(self, file_hash: str) -> float:
         raise NotImplementedError("Not implemented")
 
     def get_path(self, file_hash: str) -> str:
-        return f"{self.path}/{file_hash[:2]}/{file_hash}"
+        return f"{self.get_path_dir(file_hash)}/{file_hash}"
+
+    def get_path_dir(self, file_hash: str) -> str:
+        return f"{self.path}{FILES_DIR}/{file_hash[:2]}"
+
+    def get_measure_path(self, size: int) -> str:
+        return f"{self.get_measure_path_dir()}/{size}"
+
+    def get_measure_path_dir(self):
+        return f"{self.path}{MEASURE_DIR}"
+
+    def get_real_path(self, path: str) -> str:
+        if self.path.startswith(path):
+            path = path.removeprefix(self.path)
+        return path.lstrip("/")
+
+
+    @abc.abstractmethod
+    async def raw_write_file(self, path: str, content: bytes, mtime: Optional[float]) -> bool:
+        raise NotImplementedError("Not implemented")
 
 class LocalStorage(iStorage):
     type: str = "local"
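The helpers added to `iStorage` above pin down the on-storage layout: ordinary files live under `/download/<first two hex chars of the hash>/<hash>` and measure files under `/measure/<size>`, both relative to the storage's base path. A standalone sketch of the same layout rules; the function names, the module-level form, and the sample hash are illustrative only:

```python
FILES_DIR = "/download"
MEASURE_DIR = "/measure"


def get_path_dir(base: str, file_hash: str) -> str:
    # files are sharded by the first two hex characters of the hash
    return f"{base.rstrip('/')}{FILES_DIR}/{file_hash[:2]}"


def get_path(base: str, file_hash: str) -> str:
    return f"{get_path_dir(base, file_hash)}/{file_hash}"


def get_measure_path(base: str, size: int) -> str:
    # measure files are plain "<size>" entries under <base>/measure
    return f"{base.rstrip('/')}{MEASURE_DIR}/{size}"


def get_real_path(base: str, path: str) -> str:
    # strip the storage prefix, if present, so callers get a path relative to the base
    base = base.rstrip("/")
    if path.startswith(base):
        path = path[len(base):]
    return path.lstrip("/")


if __name__ == "__main__":
    h = "d2268f9a5a76f3feebee7f0e3b6e832767bd78a4"  # illustrative hash
    print(get_path("/openbmclapi", h))            # /openbmclapi/download/d2/d2268f9a...
    print(get_measure_path("/openbmclapi", 10))   # /openbmclapi/measure/10
    print(get_real_path("/openbmclapi", get_path("/openbmclapi", h)))  # download/d2/d2268f9a...
```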
@@ -85,27 +108,32 @@ def __repr__(self) -> str:
         return f"LocalStorage({self.path})"
 
     async def write_file(self, file: File, content: bytes, mtime: Optional[float]) -> bool:
-        Path(f"{self.path}/{file.hash[:2]}").mkdir(parents=True, exist_ok=True)
-        async with aiofiles.open(f"{self.path}/{file.hash[:2]}/{file.hash}", "wb") as f:
+        return await self.raw_write_file(self.get_path(file.hash), content, mtime)
+
+    async def raw_write_file(self, path: str, content: bytes, mtime: Optional[float]) -> bool:
+        path = self.get_real_path(path)
+        file = Path(f"{self.path}/{path}")
+        file.parent.mkdir(parents=True, exist_ok=True)
+        async with aiofiles.open(file, "wb") as f:
             await f.write(content)
         return True
 
     async def read_file(self, file_hash: str) -> bytes:
         if not await self.exists(file_hash):
             raise FileNotFoundError(f"File {file_hash} not found")
-        async with aiofiles.open(f"{self.path}/{file_hash[:2]}/{file_hash}", "rb") as f:
+        async with aiofiles.open(self.get_path(file_hash), "rb") as f:
             return await f.read()
 
     async def exists(self, file_hash: str) -> bool:
-        return os.path.exists(f"{self.path}/{file_hash[:2]}/{file_hash}")
+        return os.path.exists(self.get_path(file_hash))
 
     async def delete_file(self, file_hash: str) -> bool:
         if not await self.exists(file_hash):
             return False
-        os.remove(f"{self.path}/{file_hash[:2]}/{file_hash}")
+        os.remove(self.get_path(file_hash))
         return True
 
-    async def list_files(self, pbar: Optional[tqdm] = None) -> dict[str, deque[File]]:
+    async def list_files(self, pbar: Optional[WrapperTQDM] = None) -> dict[str, deque[File]]:
         def update():
             pbar.update(1) # type: ignore
         def empty():
@@ -116,7 +144,7 @@ def empty():
 
         files: defaultdict[str, deque[File]] = defaultdict(deque)
         for root_id in range(0, 256):
-            root = f"{self.path}/{root_id:02x}"
+            root = self.get_path_dir(f"{root_id:02x}")
             if not os.path.exists(root):
                 update_tqdm()
                 continue
@@ -135,10 +163,10 @@ def empty():
         return files
 
     async def get_size(self, file_hash: str) -> int:
-        return os.path.getsize(f"{self.path}/{file_hash[:2]}/{file_hash}")
+        return os.path.getsize(self.get_path(file_hash))
 
     async def get_mtime(self, file_hash: str) -> float:
-        return os.path.getmtime(f"{self.path}/{file_hash[:2]}/{file_hash}")
+        return os.path.getmtime(self.get_path(file_hash))
 
 @dataclass
 class AlistFileInfo:
@@ -156,40 +184,6 @@ class AlistResult:
     message: str
     data: Any
 
-class AlistPath:
-    def __init__(self, path: str):
-        self.path = path if path.startswith("/") else f"/{path}"
-
-    @property
-    def parent(self):
-        if self.path == "/":
-            return None
-        return AlistPath("/".join(self.path.split("/")[:-1]))
-
-    @property
-    def parents(self):
-        return [AlistPath("/".join(self.path.split("/")[:-i])) for i in range(1, len(self.path.split("/")))]
-
-    @property
-    def name(self):
-        return self.path.split("/")[-1]
-
-    def __div__(self, other: object):
-        if isinstance(other, AlistPath):
-            return AlistPath(f"{self.path}/{other.path}")
-        return AlistPath(f"{self.path}{other}")
-
-    def __truediv__(self, other: object):
-        return self.__div__(other)
-
-    def __add__(self, other: 'AlistPath'):
-        return AlistPath(f"{self.path}{other.path}")
-
-    def __repr__(self):
-        return f"AlistPath({self.path})"
-
-    def __str__(self):
-        return self.path
 
 @dataclass
 class AlistLink:
@@ -355,7 +349,7 @@ async def delete_file(self, file_hash: str) -> bool:
                 "names": [
                     file_hash
                 ],
-                "dir": f"{self.path}/{file_hash[:2]}"
+                "dir": self.get_path_dir(file_hash)
             }
         )
         return result.code == 200
@@ -371,7 +365,7 @@ async def get_size(self, file_hash: str) -> int:
         info = await self.__info_file(file_hash)
         return max(0, info.size)
 
-    async def list_files(self, pbar: Optional[tqdm] = None) -> defaultdict[str, deque[File]]:
+    async def list_files(self, pbar: Optional[WrapperTQDM] = None) -> defaultdict[str, deque[File]]:
         def update():
             pbar.update(1) # type: ignore
         def empty():
@@ -381,39 +375,31 @@ def empty():
             update_tqdm = update
         files: defaultdict[str, deque[File]] = defaultdict(deque)
 
-        async def get_files(root_id: int):
-            root = f"{self.path}/{root_id:02x}"
-            try:
-                async with session.post(
-                    "/api/fs/list",
-                    data={
-                        "path": str(root)
-                    },
-                ) as resp:
-                    result = AlistResult(
-                        **await resp.json()
-                    )
-                    if result.code != 200:
-                        logger.tdebug("storage.debug.error_alist", status=result.code, message=result.message)
-            except:
-                logger.traceback()
-                return []
-            finally:
-                update_tqdm()
-            return ((result.data or {}).get("content", None) or [])
         async with aiohttp.ClientSession(
             self.url,
             headers={
                 "Authorization": await self._get_token()
             }
         ) as session:
-            results = await asyncio.gather(
-                *(get_files(root_id) for root_id in range(256))
-            )
-            for root_id, result in zip(
-                range(256), results
-            ):
-                for r in result:
+            for root_id in range(256):
+                try:
+                    async with session.post(
+                        "/api/fs/list",
+                        data={
+                            "path": str(self.get_path_dir(f"{root_id:02x}"))
+                        },
+                    ) as resp:
+                        result = AlistResult(
+                            **await resp.json()
+                        )
+                except:
+                    logger.traceback()
+                    continue
+                finally:
+                    update_tqdm()
+                if result.code != 200:
+                    logger.tdebug("storage.debug.error_alist", status=result.code, message=result.message)
+                for r in ((result.data or {}).get("content", None) or []):
                     file = File(
                         r["name"],
                         r["size"],
@@ -421,8 +407,6 @@ async def get_files(root_id: int):
                         r["name"]
                     )
                     files[f"{root_id:02x}"].append(file)
-            await asyncio.sleep(0)
-
         return files
 
     async def read_file(self, file_hash: str) -> bytes:
@@ -458,6 +442,18 @@ async def write_file(self, file: File, content: bytes, mtime: float | None) -> b
     async def close(self):
         await self.session.close()
 
+    async def raw_write_file(self, path: str, content: bytes, mtime: Optional[float]) -> bool:
+        path = self.get_real_path(path)
+        result = await self._action_data(
+            "put",
+            "/api/fs/put",
+            content,
+            {
+                "File-Path": urlparse.quote(f"{self.path}/{path}")
+            }
+        )
+        return result.code == 200
+
 @dataclass
 class WebDavFileInfo:
     created: float
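`AlistStorage.raw_write_file` above funnels uploads through Alist's `/api/fs/put`, with the destination path URL-encoded into the `File-Path` header. A reduced sketch of that call with a plain aiohttp session; the token handling, retries and the storage's `_action_data` wrapper are left out, and the helper below is an illustration rather than the class's actual method:

```python
import urllib.parse as urlparse

import aiohttp


async def alist_put(base_url: str, token: str, remote_path: str, content: bytes) -> bool:
    """Upload raw bytes to an Alist mount; True when Alist answers with code 200."""
    async with aiohttp.ClientSession(base_url, headers={"Authorization": token}) as session:
        async with session.put(
            "/api/fs/put",
            data=content,
            headers={
                # Alist expects the URL-encoded destination path in this header
                "File-Path": urlparse.quote(remote_path),
            },
        ) as resp:
            result = await resp.json()
            return result.get("code") == 200


# e.g.: await alist_put("https://alist.example.com", token,
#                       "/openbmclapi/download/d2/d2268f9a...", b"...")
```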
@@ -517,7 +513,7 @@ def __init__(self, path: str, weight: int, url: str, username: str, password: st
     def unique_id(self) -> str:
         return hashlib.md5(f"{self.type},{self.url},{self.path}".encode()).hexdigest()
 
-    async def list_files(self, pbar: Optional[tqdm] = None) -> defaultdict[str, deque[File]]:
+    async def list_files(self, pbar: Optional[WrapperTQDM] = None) -> defaultdict[str, deque[File]]:
         def update():
             pbar.update(1) # type: ignore
         def empty():
@@ -528,7 +524,8 @@ def empty():
         files: defaultdict[str, deque[File]] = defaultdict(deque)
 
         async def get_files(root_id: int):
-            root = f"{self.path}/{root_id:02x}/"
+            root = self.get_path_dir(f"{root_id:02x}") + "/"
+            print(root)
             try:
                 result = await self.client.list(
                     root,
@@ -607,6 +604,19 @@ async def write_file(self, file: File, content: bytes, mtime: float | None) -> b
             return False
         return True
 
+    async def raw_write_file(self, path: str, content: bytes, mtime: Optional[float]) -> bool:
+        path = self.get_real_path(path)
+        await self._mkdir(path.rsplit("/", 1)[0])
+        try:
+            await self.client.upload_to(
+                io.BytesIO(content),
+                path,
+            )
+        except:
+            logger.traceback()
+            return False
+        return True
+
     async def _mkdir(self, dir: str):
         async with self.client_lock:
             d = ""
@@ -617,6 +627,20 @@ async def _mkdir(self, dir: str):
                 d += "/"
 
     async def get_file(self, file_hash: str) -> WebDavFile:
+        # by old code
+        file = WebDavFile(
+            file_hash,
+            0
+        )
+
+        # replace url to basic auth url
+
+        urlobject = urlparse.urlparse(f"{self.url}")
+        url = f"{urlobject.scheme}://{urlparse.quote(self.username)}:{urlparse.quote(self.password)}@{urlobject.hostname}:{urlobject.port}{urlobject.path}{self.get_path(file_hash)}"
+        file.url = url
+        return file
+
+
         async with self.session.get(
             f"{self.url}{self.get_path(file_hash)}",
             allow_redirects=False
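The new `get_file` above is what the commit title is about: instead of streaming WebDAV files through the cluster, it builds a URL with the WebDAV username and password embedded, and the measure handler in `core/cluster.py` 302-redirects clients to it. A sketch of that URL construction; the guard for a base URL without an explicit port is an addition of this sketch (the patch always interpolates `urlobject.port`):

```python
import urllib.parse as urlparse


def build_basic_auth_url(base_url: str, username: str, password: str, file_path: str) -> str:
    """Embed credentials into a WebDAV URL: scheme://user:pass@host[:port]/path."""
    parts = urlparse.urlparse(base_url)
    credentials = f"{urlparse.quote(username, safe='')}:{urlparse.quote(password, safe='')}"
    netloc = f"{credentials}@{parts.hostname}"
    if parts.port is not None:  # keep an explicit port only if the base URL has one
        netloc += f":{parts.port}"
    return urlparse.urlunparse((parts.scheme, netloc, parts.path.rstrip("/") + file_path, "", "", ""))


# build_basic_auth_url("https://dav.example.com/remote.php/dav", "bmclapi", "s3cret",
#                      "/download/d2/d2268f9a...")
# -> "https://bmclapi:s3cret@dav.example.com/remote.php/dav/download/d2/d2268f9a..."
```

Because the credentialed URL is returned before the older probing request that follows it, the measure handler shown earlier always takes the redirect branch for WebDAV storages, and the transfer itself happens directly between the client and the WebDAV server.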
diff --git a/core/utils.py b/core/utils.py
index 7e9326c..bd64d64 100644
--- a/core/utils.py
+++ b/core/utils.py
@@ -12,6 +12,8 @@ import time
 from typing import Any, Callable, Optional
 
+from tqdm import tqdm
+
 from core import logger
 
 
@@ -182,6 +184,30 @@ def to_hours(self):
     def to_days(self):
         return self.day + self.hour / 24 + self.minute / 1440 + self.second / 86400 + self.milisecond / 86400000
 
+class WrapperTQDM:
+    def __init__(self, pbar: tqdm):
+        self.pbar = pbar
+
+    def __enter__(self):
+        wrapper_tqdms.appendleft(self)
+        self.pbar.__enter__()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if self in wrapper_tqdms:
+            wrapper_tqdms.remove(self)
+        self.pbar.__exit__(exc_type, exc_val, exc_tb)
+
+    def update(self, n: float | None = 1):
+        self.pbar.update(n)
+
+    def set_postfix_str(self, s: str):
+        self.pbar.set_postfix_str(s)
+
+    def close(self):
+        if self in wrapper_tqdms:
+            wrapper_tqdms.remove(self)
+        self.pbar.close()
 
 def check_sign(hash: str, secret: str, s: str, e: str) -> bool:
     return check_sign_without_time(hash, secret, s, e) and time.time() - 300 < int(e, 36)
@@ -282,3 +308,6 @@ class ServiceError:
     httpCode: int
     message: str
     name: str
+
+
+wrapper_tqdms: deque[WrapperTQDM] = deque()
\ No newline at end of file
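`WrapperTQDM` above is a thin delegate around `tqdm` whose only extra job is keeping every live bar in the module-level `wrapper_tqdms` deque. This section of the patch does not show a consumer of that registry; the sketch below assumes one plausible use, clearing and redrawing the registered bars around log output, and is not taken from the project:

```python
from collections import deque

from tqdm import tqdm

# mirrors core/utils.py: every live WrapperTQDM registers itself here
wrapper_tqdms: deque = deque()


class WrapperTQDM:
    """Context-manager wrapper that tracks open bars in wrapper_tqdms (trimmed copy of the patch)."""

    def __init__(self, pbar: tqdm):
        self.pbar = pbar

    def __enter__(self):
        wrapper_tqdms.appendleft(self)
        self.pbar.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self in wrapper_tqdms:
            wrapper_tqdms.remove(self)
        self.pbar.__exit__(exc_type, exc_val, exc_tb)

    def update(self, n: float = 1):
        self.pbar.update(n)


def log_line(message: str) -> None:
    # hypothetical consumer: clear every registered bar, print, then redraw,
    # so log output does not get tangled with active progress bars
    for wrapper in list(wrapper_tqdms):
        wrapper.pbar.clear()
    print(message)
    for wrapper in list(wrapper_tqdms):
        wrapper.pbar.refresh()


if __name__ == "__main__":
    with WrapperTQDM(tqdm(total=3, desc="demo", unit="file")) as pbar:
        for i in range(3):
            pbar.update(1)
            log_line(f"processed file {i}")
```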
diff --git a/core/web.py b/core/web.py
index 5c4143f..474d3a0 100644
--- a/core/web.py
+++ b/core/web.py
@@ -19,9 +19,11 @@
 
 @dataclass
 class CheckServer:
+    object: Any
     port: Optional[int]
     start_handle: Callable
     client: Optional[ssl.SSLContext] = None
+    args: tuple[Any, ...] = ()
 
 @dataclass
 class PrivateSSLServer:
@@ -84,9 +86,15 @@ async def middleware(request: web.Request, handler: Any) -> web.Response:
            pass
    setattr(request, "custom_address", address)
    start = time.perf_counter_ns()
-    resp = None
+    resp: web.Response = None # type: ignore
    try:
        resp = await asyncio.create_task(handler(request))
+        #if not isinstance(resp, web.WebSocketResponse):
+        #    if not resp.prepared:
+        #        await resp.prepare(request)
+        #await resp.write_eof()
+        resp.force_close()
+        #resp.force_close()
        return resp
    finally:
        status = 500
@@ -179,7 +187,7 @@ async def init():
     scheduler.run_repeat_later(
         check_server,
         5,
-        5
+        10
     )
 
 async def forward_data(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
@@ -189,8 +197,7 @@ async def forward_data(reader: asyncio.StreamReader, writer: asyncio.StreamWrite
             break
         writer.write(data)
         await writer.drain()
-    writer.close()
-    await writer.wait_closed()
+    await close_writer(writer)
 
 async def close_writer(writer: asyncio.StreamWriter):
     if writer.is_closing():
@@ -227,10 +234,8 @@ async def open_forward_data(reader: asyncio.StreamReader, writer: asyncio.Stream
         ...
     finally:
         if "target_w" in locals():
-            target_w.close()
-            await target_w.wait_closed()
-        writer.close()
-        await writer.wait_closed()
+            await close_writer(target_w)
+        await close_writer(writer)
 
 
 async def public_handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
@@ -282,7 +287,10 @@ async def start_public_server():
             await asyncio.wait_for(public_server.wait_closed(), timeout=5)
         except:
             ...
-    public_server = await asyncio.start_server(public_handle, '0.0.0.0', get_public_port())
+    port = get_public_port()
+    if port == 0:
+        port = await get_free_port()
+    public_server = await asyncio.start_server(public_handle, '0.0.0.0', port)
     await public_server.start_serving()
 
 
@@ -341,7 +349,7 @@ async def start_private_server(
     server = await asyncio.start_server(
         private_handle,
         '0.0.0.0',
-        0,
+        await get_free_port(),
         ssl=context
     )
     await server.start_serving()
@@ -367,12 +375,15 @@ def get_certificate_domains(
 
 async def check_server():
     servers: list[CheckServer] = []
     if site is not None:
-        servers.append(CheckServer(site._port, start_private_server))
+        servers.append(CheckServer(site, site._port, start_tcp_site))
     if public_server is not None:
-        servers.append(CheckServer(get_server_port(public_server), start_public_server))
+        servers.append(CheckServer(public_server, get_server_port(public_server), start_public_server))
     if privates:
-        for server in privates.values():
-            servers.append(CheckServer(get_server_port(server.server), start_private_server, server.key))
+        for hash, server in privates.items():
+            servers.append(CheckServer(server.server, get_server_port(server.server), start_private_server, server.key, (
+                Path(hash[0]),
+                Path(hash[1])
+            )))
     #logger.tdebug("web.debug.check_server", servers=len(servers))
     results = await asyncio.gather(*[asyncio.create_task(_check_server(server)) for server in servers])
@@ -398,6 +409,9 @@ async def _check_server(
 ):
     if server.port is None:
         return False
+    if isinstance(server.object, asyncio.Server):
+        if len(server.object.sockets) != 0:
+            return True
     try:
         r, w = await asyncio.wait_for(
             asyncio.open_connection(