From 2bb68322c95150738806b9d385dcdcef476374e3 Mon Sep 17 00:00:00 2001 From: tianxiu2b2t Date: Sat, 30 Mar 2024 10:35:55 +0800 Subject: [PATCH 1/8] Fix stats value is -1 --- core/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/utils.py b/core/utils.py index 1099e4b..cfce2e9 100644 --- a/core/utils.py +++ b/core/utils.py @@ -506,7 +506,7 @@ def readVarInt(self) -> int: j += 1 if (k & 0x80) != 128: break - return i - 2**31 * 2 if i >= 2**31 - 1 else i + return i def readString( self, maximun: Optional[int] = None, encoding: Optional[str] = None From dbc5a9e5ed86376e16c33caf243598b2f1d4f743 Mon Sep 17 00:00:00 2001 From: tianxiu2b2t Date: Sat, 30 Mar 2024 16:13:52 +0800 Subject: [PATCH 2/8] Add webdav support, but not enable. --- core/api.py | 11 +----- core/cluster.py | 90 ++++++++++++++++++++++++++++++++++++++++++++++++- core/utils.py | 4 +-- core/web.py | 4 +-- 4 files changed, 94 insertions(+), 15 deletions(-) diff --git a/core/api.py b/core/api.py index 099fab5..960b0df 100644 --- a/core/api.py +++ b/core/api.py @@ -27,7 +27,7 @@ class BMCLAPIFile: size: int mtime: int = 0 def __hash__(self): - return int.from_bytes(bytes.fromhex(self.hash)) + return int.from_bytes(bytes.fromhex(self.hash), byteorder="big") def __eq__(self, other): if isinstance(other, BMCLAPIFile): @@ -98,15 +98,6 @@ async def get_size(self, hash: str) -> int: """ raise NotImplementedError - @abc.abstractmethod - async def copy(self, origin: Path, hash: str) -> int: - """ - origin: src path - hash: desc path (new path) - return File size - """ - raise NotImplementedError - @abc.abstractmethod async def write(self, hash: str, io: io.BytesIO) -> int: """ diff --git a/core/cluster.py b/core/cluster.py index 95c4eb2..16b8534 100644 --- a/core/cluster.py +++ b/core/cluster.py @@ -524,11 +524,97 @@ async def get_cache_stats(self) -> StatsCache: class WebDav(Storage): - def __init__(self, username: str, password: str, endpoint: str) -> None: + def __init__(self, username: str, password: str, endpoint: str, dir: str, token: Optional[str] = None) -> None: self.username = username self.password = password self.endpoint = endpoint + self.dir = dir + self.cache: dict[str, File] = {} + self.timer = Timer.repeat(self.clear_cache, (), CHECK_CACHE, CHECK_CACHE) + self.files: dict[str, File] = {} + self.token = token + self.fetch = False + self.empty = File("", "", 0) + Timer.delay(self._list_all) + def _endpoint(self, file: str): + return f"{self.dir}/{file.removeprefix('/')}" + def _client(self): + return webdav3_client.Client({ + "webdav_username": self.username, + "webdav_password": self.password, + "webdav_hostname": self.endpoint, + "webdav_token": self.token + }) + async def get(self, file: str) -> File: + if file in self.cache: + self.cache[file].last_access = time.time() + self.cache[file].cache = True + return self.cache[file] + ... + + async def clear_cache(self): + ... + async def _list_all(self, force = False): + if self.fetch and not force: + return + self.fetch = True + self.files = {} + async with self._client() as client: + dirs = (await client.list(self.dir))[1:] + with tqdm(total=len(dirs), desc="[WebDav List Files]") as pbar: + await dashboard.set_status_by_tqdm("正在获取WebDav文件列表中", pbar) + for dir in (await client.list(self.dir))[1:]: + pbar.update(1) + for file in (await client.list(self._endpoint(dir,), get_info=True))[1:]: + self.files[file['name']] = File(file['path'].removeprefix(f"/dav/{self.dir}/"), file['name'], int(file['size'])) + await asyncio.sleep(0) + return self.files + async def exists(self, hash: str) -> bool: + if not self.fetch: + self.fetch = True + await self._list_all() + return hash in self.files + + + async def get_size(self, hash: str) -> int: + return self.files.get(hash, self.empty).size + + async def write(self, hash: str, io: io.BytesIO) -> int: + async with self._client() as client: + path = self._endpoint(f"{hash[:2]}/{hash}") + await client.upload_to(io, path) + self.files[hash] = File(path, hash, len(io.getbuffer())) + return self.files[hash].size + async def get_files(self, dir: str) -> list[str]: + return list((hash for hash in self.files.keys() if hash.startswith(dir))) + + async def get_hash(self, hash: str) -> str: + """ + hash: file, length `hash` parametar, md5 length is 32, else sha1 + return hash file content + """ + raise NotImplementedError + + + async def get_files_size(self, dir: str) -> int: + return sum((file.size for hash, file in self.files.items() if hash.startswith(dir))) + + async def removes(self, hashs: list[str]) -> int: + """ + dir: path + Remove files (file: hash str) + return success remove files + """ + raise NotImplementedError + + async def get_cache_stats(self) -> StatsCache: + """ + dir: path + Getting cache files + return StatsCache + """ + return StatsCache() class TypeStorage(Enum): FILE = "file" WEBDAV = "webdav" @@ -625,6 +711,8 @@ async def enable(self) -> None: for storage in storages.get_storages(): if isinstance(storage, FileStorage): storage_str["file"] += 1 + elif isinstance(storage, WebDav): + storage_str["webdav"] += 1 await self.emit( "enable", { diff --git a/core/utils.py b/core/utils.py index cfce2e9..029d987 100644 --- a/core/utils.py +++ b/core/utils.py @@ -474,7 +474,7 @@ def readIntegetr(self): return (value[0] << 24) + (value[1] << 16) + (value[2] << 8) + (value[3] << 0) def readBoolean(self): - return bool(int.from_bytes(self.read(1))) + return bool(int.from_bytes(self.read(1), byteorder="big")) def readShort(self): value = self.read(2) @@ -501,7 +501,7 @@ def readVarInt(self) -> int: j: int = 0 k: int while 1: - k = int.from_bytes(self.read(1)) + k = int.from_bytes(self.read(1), byteorder="big") i |= (k & 0x7F) << j * 7 j += 1 if (k & 0x80) != 128: diff --git a/core/web.py b/core/web.py index 7821029..170a7e5 100644 --- a/core/web.py +++ b/core/web.py @@ -317,7 +317,7 @@ def __init__(self, opcode: int, data: io.BytesIO) -> None: self.data = data self.close = self.opcode == WebSocketOpcode.CLOSE.value if self.close: - self.status = int.from_bytes(self.data.getbuffer()[:2]) + self.status = int.from_bytes(self.data.getbuffer()[:2], byteorder="big") self.reason = self.data.getbuffer()[:2] self.content = data if self.opcode == WebSocketOpcode.TEXT.value: @@ -937,7 +937,7 @@ async def __call__(self, request: "Request", client: Client, response_configurat class RedirectResponse(Response): def __init__(self, location: str) -> None: - super().__init__(headers=Header({"Location": location}), status_code=301) + super().__init__(headers=Header({"Location": location}), status_code=307) class Request: From 7073b1e572a65856a29b72fb619987ae46d35c0d Mon Sep 17 00:00:00 2001 From: tianxiu2b2t Date: Sat, 30 Mar 2024 16:20:37 +0800 Subject: [PATCH 3/8] Support lower python version --- core/cluster.py | 4 ++-- core/utils.py | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/cluster.py b/core/cluster.py index 16b8534..f986787 100644 --- a/core/cluster.py +++ b/core/cluster.py @@ -288,10 +288,10 @@ async def __call__(self, ) -> Any: self.checked = True more_files = {storage: [] for storage in storages.get_storages()} prefix_files = { - prefix: [] for prefix in (prefix.to_bytes().hex() for prefix in range(256)) + prefix: [] for prefix in (prefix.to_bytes(1, "big").hex() for prefix in range(256)) } prefix_hash = { - prefix: [] for prefix in (prefix.to_bytes().hex() for prefix in range(256)) + prefix: [] for prefix in (prefix.to_bytes(1, "big").hex() for prefix in range(256)) } for file in files: diff --git a/core/utils.py b/core/utils.py index 029d987..63e8c61 100644 --- a/core/utils.py +++ b/core/utils.py @@ -421,20 +421,20 @@ def write(self, value: bytes | int): if isinstance(value, bytes): self.io.write(value) else: - self.io.write((value + 256 if value < 0 else value).to_bytes()) # type: ignore + self.io.write((value + 256 if value < 0 else value).to_bytes(1, "big")) # type: ignore def writeBoolean(self, value: bool): - self.write(value.to_bytes()) + self.write(value.to_bytes(1, "big")) def writeShort(self, data: int): - self.write(((data >> 8) & 0xFF).to_bytes()) - self.write(((data >> 0) & 0xFF).to_bytes()) + self.write(((data >> 8) & 0xFF).to_bytes(1, "big")) + self.write(((data >> 0) & 0xFF).to_bytes(1, "big")) def writeInteger(self, data: int): - self.write(((data >> 24) & 0xFF).to_bytes()) - self.write(((data >> 16) & 0xFF).to_bytes()) - self.write(((data >> 8) & 0xFF).to_bytes()) - self.write((data & 0xFF).to_bytes()) + self.write(((data >> 24) & 0xFF).to_bytes(1, "big")) + self.write(((data >> 16) & 0xFF).to_bytes(1, "big")) + self.write(((data >> 8) & 0xFF).to_bytes(1, "big")) + self.write((data & 0xFF).to_bytes(1, "big")) def writeVarInt(self, value: int): self.write(MinecraftUtils.getVarInt(value)) From ea3879e35ea374c6889cfca5b1f09581b0b86976 Mon Sep 17 00:00:00 2001 From: tianxiu2b2t Date: Sat, 30 Mar 2024 21:56:15 +0800 Subject: [PATCH 4/8] Fix too many enable and add update check. --- bmclapi_dashboard/static/js/index.min.js | 65 ++++++- core/api.py | 2 +- core/cluster.py | 232 +++++++++++++---------- core/config.py | 4 + core/dashboard.py | 14 +- core/utils.py | 43 +++++ 6 files changed, 249 insertions(+), 111 deletions(-) diff --git a/bmclapi_dashboard/static/js/index.min.js b/bmclapi_dashboard/static/js/index.min.js index 21accd9..6e576da 100644 --- a/bmclapi_dashboard/static/js/index.min.js +++ b/bmclapi_dashboard/static/js/index.min.js @@ -3,6 +3,7 @@ const langs = { "zh_cn": { "dashboard": "数据统计", + "version": "版本信息", "measure": "宽带测试", "authentication.fetching": "获取账户信息中", "authentication.login": "登陆", @@ -238,6 +239,21 @@ ".root .left .copyright": [ "position: fixed;", "bottom: 2px", + "width: 184px", + "box-sizing: border-sizing", + "display: flex", + "justify-content: center", + "flex-direction: column", + "align-items: center" + ], + ".root .left .box": [ + "box-sizing: border-sizing", + "border-radius: 8px", + "padding-left: 8px", + "padding-right: 8px", + "background: white", + "margin-bottom: 4px", + "margin-top: 4px" ], ".qps .icon": [ "width: 14px", @@ -471,14 +487,15 @@ display_left() update_left() } - header.append(ttb.createElement("h3").append(ttb.createElement("a").setText(document.title).setAttribute("href", (() => { + const github = (() => { for (child of document.head.children) { if (child.getAttribute("github")) return "//github.com/" + child.getAttribute("github") } return "" - })()))) + })(); + header.append(ttb.createElement("h3").append(ttb.createElement("a").setText(document.title).setAttribute("href", github))) left_copyright.append( - ttb.createElement("p").append(ttb.createElement("a").setAttribute("href", "//github.com/TTB-Network").setText("TTB Network"), " - ", ttb.VERSION) + ttb.createElement("a").class("box").setAttribute("href", "//github.com/TTB-Network").setText("TTB Network") ) left_arrow.event("click", () => { left.toggle("hide") @@ -841,7 +858,7 @@ onopen() { if (this._isws) ttb.createNotication("info", "", ttb.createElement("h4").setText("实时隧道已开启")) this.send("uptime") - this.send("storage") + this.send("version") this.send("status") this._timer_qps?.block(); this._timer?.block() @@ -1099,6 +1116,13 @@ }) } + if (type == "version") { + if (data.cur != data.latest) { + ttb.createNotication("info", "", ttb.createElement("h4").setText("有新的版本更新"), ttb.createElement("p").setText(data.latest)) + } + version.version = data + version._update() + } root_handler("_ws_message", type, data) } _deserializeData(input) { @@ -1362,11 +1386,40 @@ page.push(...this._page) } } + class Version { + constructor() { + this.version = { + "cur": "Unknown", + "latest": "Unknown" + } + this._page = [ + ttb.createElement("div").class("panel").append( + ttb.createElement("p").class("title").setText("当前节点版本"), + ttb.createElement("p").class("value").setText("-"), + ttb.createElement("p").class("title").setText("最新版本"), + ttb.createElement("p").class("value").setText("-"), + ttb.createElement("button").class("button").setText("点击下载最新").setAttribute("href").event('click', () => { + if (this.version.cur == this.version.latest) return + window.location.href = github + "/releases/" + this.version.latest + }) + ) + ] + this._update() + } + _update() { + this._page[0].getChildrens()[1].setText(this.version.cur) + this._page[0].getChildrens()[3].setText(this.version.latest) + this._page[0].getChildrens()[4].valueOf().style.display = (this.version.cur != this.version.latest ? "" : "none") + } + connect(page) { + page.push(...this._page) + } + } class Storage { constructor() { this._storages = [] } - connect() { + connect(page) { //ws.send("storage") } } @@ -1374,7 +1427,9 @@ const dashboard = new Dashboard() const measure = new Measure() const storage = new Storage() + const version = new Version() menu("dashboard", "", dashboard) + menu("version", "", version) ws.menu("measure", "", measure) ws.setAuthInfo() })(); diff --git a/core/api.py b/core/api.py index 960b0df..fdee508 100644 --- a/core/api.py +++ b/core/api.py @@ -9,7 +9,6 @@ import zlib import aiofiles -from tqdm import tqdm from core.config import Config @@ -46,6 +45,7 @@ class File: size: int last_hit: float = 0 last_access: float = 0 + expiry: Optional[float] = None data: Optional[io.BytesIO] = None cache: bool = False diff --git a/core/cluster.py b/core/cluster.py index f986787..8fd0840 100644 --- a/core/cluster.py +++ b/core/cluster.py @@ -1,6 +1,5 @@ import asyncio import base64 -from dataclasses import dataclass from enum import Enum import hashlib import hmac @@ -26,6 +25,8 @@ import core.web as web from core.logger import logger import plugins +import aiowebdav.client as webdav3_client + from core.api import ( File, @@ -37,13 +38,14 @@ ) VERSION = "" +fetch_version = "" version_path = Path("VERSION") if version_path.exists(): with open(Path("VERSION"), "r", encoding="utf-8") as f: - VERSION = f.read().split('\n')[0] + fetch_version = VERSION = f.read().split('\n')[0] f.close() else: - VERSION = "" + fetch_version = VERSION = "Unknown" API_VERSION = "1.10.1" USER_AGENT = f"openbmclapi-cluster/{API_VERSION} python-openbmclapi/{VERSION}" BASE_URL = "https://openbmclapi.bangbang93.com/" @@ -58,7 +60,7 @@ CACHE_BUFFER: int = 1024 * 1024 * 512 CACHE_TIME: int = 1800 CHECK_CACHE: int = 60 -SIGN_SKIP: bool = False +SIGN_SKIP: bool = True DASHBOARD_USERNAME: str = Config.get("dashboard.username") DASHBOARD_PASSWORD: str = Config.get("dashboard.password") @@ -101,7 +103,9 @@ async def fetchToken(self): logger.info("Fetched token.") except aiohttp.ClientError as e: - logger.error(f"An error occured whilst fetching token: {e}.") + logger.error(f"An error occured whilst fetching token, retry in 5s to fetch again: {e}.") + await asyncio.sleep(5) + return self.fetchToken() async def getToken(self) -> str: if not self.token: @@ -158,7 +162,12 @@ async def get_files(self) -> list[BMCLAPIFile]: logger.debug(f"Filelist response status: {req.status}") if req.status == 204: return [] - req.raise_for_status() + if req.status != 200: + try: + req.raise_for_status() + except: + logger.error(traceback.format_exc()) + return [] logger.info("Requested filelist.") files = await ParseFileList()(zstd.decompress(await req.read())) self.last_modified = max(self.last_modified, *(file.mtime for file in files)) @@ -263,6 +272,12 @@ async def __call__(self, ) -> Any: sorted(files, key=lambda x: x.hash) if not self.checked: await dashboard.set_status("正在检查缺失文件") + if not files: + logger.warn("No files? Skip check files") + if self.check_files_timer: + self.check_files_timer.block() + self.check_files_timer = Timer.repeat(self, (), 1800, 1800) + return with tqdm( total=len(files) * len(storages.get_storages()), unit=" file(s)", unit_scale=True ) as pbar: @@ -423,6 +438,7 @@ async def get(self, hash: str) -> File: if hash in self.cache: file = self.cache[hash] file.last_access = time.time() + file.expiry = file.last_access + CACHE_TIME file.cache = True return file path = Path(str(self.dir) + f"/{hash[:2]}/{hash}") @@ -475,13 +491,15 @@ async def clear_cache(self): size: int = 0 old_keys: list[str] = [] old_size: int = 0 - for file in sorted( - self.cache.items(), key=lambda x: x[1].last_access, reverse=True + file: File + key: str + for key, file in sorted( + self.cache.items(), key=lambda x: x[1].expiry, reverse=True ): - if size <= CACHE_BUFFER and time.time() - file[1].last_access <= CACHE_TIME: + if size <= CACHE_BUFFER and time.time() >= file.expiry: continue - old_keys.append(file[0]) - old_size += file[1].size + old_keys.append(key) + old_size += file.size if not old_keys: return for key in old_keys: @@ -524,49 +542,55 @@ async def get_cache_stats(self) -> StatsCache: class WebDav(Storage): - def __init__(self, username: str, password: str, endpoint: str, dir: str, token: Optional[str] = None) -> None: + def __init__(self, username: str, password: str, hostname: str, endpoint: str, token: Optional[str] = None, dav: str = "/dav") -> None: self.username = username self.password = password + self.hostname = hostname self.endpoint = endpoint - self.dir = dir - self.cache: dict[str, File] = {} - self.timer = Timer.repeat(self.clear_cache, (), CHECK_CACHE, CHECK_CACHE) + self.dav = dav self.files: dict[str, File] = {} self.token = token self.fetch = False + self.cache: dict[str, File] = {} self.empty = File("", "", 0) Timer.delay(self._list_all) def _endpoint(self, file: str): - return f"{self.dir}/{file.removeprefix('/')}" + return f"{self.endpoint}/{file.removeprefix('/')}" def _client(self): return webdav3_client.Client({ "webdav_username": self.username, "webdav_password": self.password, - "webdav_hostname": self.endpoint, + "webdav_hostname": self.hostname + self.dav, "webdav_token": self.token }) async def get(self, file: str) -> File: - if file in self.cache: - self.cache[file].last_access = time.time() + if file in self.cache and self.cache[file].expiry - 10 > time.time(): self.cache[file].cache = True + self.cache[file].last_hit = time.time() return self.cache[file] - ... + async with aiohttp.ClientSession(self.hostname, auth=aiohttp.BasicAuth(self.username, self.password)) as session: + async with session.get(self.dav + self._endpoint(file[:2] + "/" + file), allow_redirects=False) as resp: + f = File(self.dav + self._endpoint(file[:2] + "/" + file), file, size = int(resp.headers.get("Content-Length", 0))) + if resp.status == 200: + f.set_data(await resp.read()) + elif resp.status // 100 == 3: + f.path = resp.headers.get("Location") + self.cache[file] = f + return self.cache[file] - async def clear_cache(self): - ... async def _list_all(self, force = False): if self.fetch and not force: return self.fetch = True self.files = {} async with self._client() as client: - dirs = (await client.list(self.dir))[1:] + dirs = (await client.list(self.endpoint))[1:] with tqdm(total=len(dirs), desc="[WebDav List Files]") as pbar: await dashboard.set_status_by_tqdm("正在获取WebDav文件列表中", pbar) - for dir in (await client.list(self.dir))[1:]: + for dir in (await client.list(self.endpoint))[1:]: pbar.update(1) for file in (await client.list(self._endpoint(dir,), get_info=True))[1:]: - self.files[file['name']] = File(file['path'].removeprefix(f"/dav/{self.dir}/"), file['name'], int(file['size'])) + self.files[file['name']] = File(file['path'].removeprefix(f"/dav/{self.endpoint}/"), file['name'], int(file['size'])) await asyncio.sleep(0) return self.files async def exists(self, hash: str) -> bool: @@ -590,30 +614,26 @@ async def get_files(self, dir: str) -> list[str]: return list((hash for hash in self.files.keys() if hash.startswith(dir))) async def get_hash(self, hash: str) -> str: - """ - hash: file, length `hash` parametar, md5 length is 32, else sha1 - return hash file content - """ - raise NotImplementedError + async with self._client() as session: + h = get_hash(hash) + async for data in await session.download_iter(self._endpoint(f"{hash[:2]}/{hash}")): + h.update(data) + return h.hexdigest() async def get_files_size(self, dir: str) -> int: return sum((file.size for hash, file in self.files.items() if hash.startswith(dir))) async def removes(self, hashs: list[str]) -> int: - """ - dir: path - Remove files (file: hash str) - return success remove files - """ - raise NotImplementedError + success = 0 + async with self._client() as client: + for hash in hashs: + await client.clean(self._endpoint(f"{hash[:2]}/{hash}")) + success += 1 + return success + async def get_cache_stats(self) -> StatsCache: - """ - dir: path - Getting cache files - return StatsCache - """ return StatsCache() class TypeStorage(Enum): FILE = "file" @@ -665,47 +685,64 @@ class ClusterState(Enum): def is_enable(value): return value == ClusterState.ENABLE or value == ClusterState.KEEPALIVE or value == ClusterState.TRUST + class Cluster: def __init__(self) -> None: + self.connected = False self.sio = socketio.AsyncClient() self.sio.on("message", self._message) self.stats_storage: Optional[stats.SyncStorage] = None - self.keepaliveTimer: Optional[Task] = None - self.keepaliveTimeoutTimer: Optional[Task] = None - self.keepalive_lock = asyncio.Lock() - self.connected = False - self.check_files_timer: Optional[Task] = None - self.trusted = True - self.state = ClusterState.INIT self.downloader = FileDownloader() self.file_check = FileCheck(self.downloader) self._enable_timer: Optional[Task] = None + self.keepaliveTimer: Optional[Task] = None + self.keepaliveTimeoutTimer: Optional[Task] = None + self.keepalive_lock = asyncio.Lock() def _message(self, message): logger.info(f"[Remote] {message}") if "信任度过低" in message: self.trusted = False + async def emit(self, channel, data=None): + await self.sio.emit( + channel, data, callback=lambda x: Timer.delay(self.message, (channel, x)) + ) async def init(self): - await dashboard.set_status("初始化节点中") - await self.file_check() - await dashboard.set_status("初始化节点成功") + if not self.sio.connected: + try: + await self.sio.connect( + BASE_URL, + auth={"token": await token.getToken()}, + transports=["websocket"], + ) + except: + logger.warn("Failed to connect to the main server. Retrying after 5s.") + Timer.delay(self.init, (), 5) + return await self.start() + async def start(self): - self.state = ClusterState.START - logger.info(f"Starting cluster version {API_VERSION}.") - await dashboard.set_status("启动节点中") - try: - await self.sio.connect( - BASE_URL, - auth={"token": await token.getToken()}, - transports=["websocket"], - ) - except: - logger.warn("Failed to connect to the main server. Retrying after 5s.") - Timer.delay(self.start, (), 5) - return await self.cert() - await self.reconnect() - async def enable(self) -> None: + await self.file_check() + await self.enable() + async def cert(self): + await self.emit("request-cert") + async def enable(self): + if self.connected: + logger.debug("Trying to enable again? Blocked. (\nFrom bangbang93:\n 谁他妈\n 一秒钟发了好几百个enable请求\n ban了解一下等我回杭州再看\n ban了先\n\n > Timestamp at 2024/3/30 14:07 GMT+8\n)") + return + self.connected = True + if self._enable_timer != None: + self._enable_timer.block() + self._enable_timer = Timer.delay(self.reconnect, (), 30) + await self._enable() + async def reconnect(self): + if self.connected: + await self.disable() + self.connected = False + logger.info("Retrying after 5s.") + await asyncio.sleep(5) + await self.enable() + async def _enable(self): storage_str = {"file": 0, "webdav": 0} self.trusted = True for storage in storages.get_storages(): @@ -729,10 +766,7 @@ async def enable(self) -> None: }, }, ) - self._enable_timer = Timer.delay(self._enable_task, (), 30, ) await dashboard.set_status("巡检中") - async def _enable_task(self): - await self.reconnect() async def message(self, type, data: list[Any]): if len(data) == 1: data.append(None) @@ -746,6 +780,9 @@ async def message(self, type, data: list[Any]): certificate.load_text(ack["cert"], ack["key"]) elif type == "enable": err, ack = data + if self._enable_timer != None: + self._enable_timer.block() + self._enable_timer = None if err: logger.error(f"Unable to start service: {err['message']} Retry in 5s to reconnect.") await asyncio.sleep(5) @@ -756,9 +793,6 @@ async def message(self, type, data: list[Any]): logger.info( f"Hosting on {CLUSTER_ID}.openbmclapi.933.moe:{PUBLIC_PORT or PORT}." ) - if self._enable_timer != None: - self._enable_timer.block() - self._enable_timer = None await self.start_keepalive() await dashboard.set_status( "正常工作" + ("" if self.trusted else "(节点信任度过低)") @@ -780,7 +814,7 @@ async def message(self, type, data: list[Any]): self.keepalive_lock.release() if type != "request-cert": logger.debug(type, data) - + async def start_keepalive(self, delay=0): if self.keepaliveTimer: self.keepaliveTimer.block() @@ -813,39 +847,15 @@ async def _keepalive(self): ) await self.start_keepalive(60) - async def reconnect(self): - try: - await self.disable() - except: - ... - await self.cert() - await self.enable() - async def _keepalive_timeout(self): logger.warn("Failed to keepalive! Reconnecting the main server...") await self.reconnect() - async def cert(self): - if Path("./.ssl/cert").exists() == Path("./.ssl/key").exists() == True: - return - await self.emit("request-cert") - - async def emit(self, channel, data=None): - await self.sio.emit( - channel, data, callback=lambda x: Timer.delay(self.message, (channel, x)) - ) - async def disable(self): - self.connected = False - if self.keepalive_lock and self.keepalive_lock.locked(): - self.keepalive_lock.release() - if self.keepaliveTimer: - self.keepaliveTimer.block() - if self.keepaliveTimeoutTimer: - self.keepaliveTimeoutTimer.block() if self.sio.connected: await self.emit("disable") logger.info("Disconnected from the main server...") + await dashboard.set_status("已下线") async def get_cache_stats(self) -> StatsCache: stat = StatsCache() @@ -854,28 +864,43 @@ async def get_cache_stats(self) -> StatsCache: stat.total += t.total stat.bytes += t.bytes return stat - token = TokenManager() cluster: Optional[Cluster] = None last_status: str = "-" storages = StorageManager() github_api = "https://api.github.com" +download_url = "" async def check_update(): + global fetch_version, download_url async with aiohttp.ClientSession(base_url=github_api) as session: logger.info("Checking update...") try: async with session.get("/repos/TTB-Network/python-openbmclapi/releases/latest") as req: req.raise_for_status() - fetched_version: str = (await req.json())["tag_name"] - if fetched_version != VERSION: - logger.success(f"New version found: {fetched_version}!") + data = await req.json() + fetch_version = data["tag_name"] + if fetch_version != VERSION: + logger.success(f"New version found: {fetch_version}!") + download_url = data["zipball_url"] + await dashboard.trigger("version") + if True and utils.Version.from_string(VERSION) < utils.Version.from_string(fetch_version): + Timer.delay(download_latest) else: logger.info(f"Already up to date.") except aiohttp.ClientError as e: logger.error(f"An error occured whilst checking update: {e}.") + Timer.delay(check_update, (), 3600) + +async def download_latest(): + async with aiohttp.ClientSession() as session: + async with session.get(download_url) as resp: + resp.raise_for_status() + async with aiofiles.open(f"./cache/latest_release-{fetch_version}.zip", "wb") as w: + await w.write(await resp.read()) + logger.success(f"Downloaded latest release. See './cache/latest_release-{fetch_version}.zip'") async def init(): await check_update() @@ -886,7 +911,8 @@ async def init(): for plugin in plugins.get_plugins(): await plugin.init() await plugin.enable() - storages.add_storage(FileStorage(Path("bmclapi"))) + #storages.add_storage(FileStorage(Path("bmclapi"))) + storages.add_storage(WebDav("admin", "123456", "http://127.0.0.1:5244", "/bmclapi", "alist-06e39e3a-d195-44a1-8f8f-296928308922Z84AuS5ttoA7hAW1Wy0jT2N40xpUrA2WeHQrJVum1WcmVmy3bv92zB5NrWbOpp96")) Timer.delay(cluster.init) app = web.app diff --git a/core/config.py b/core/config.py index af1adee..bdbb3d9 100644 --- a/core/config.py +++ b/core/config.py @@ -33,6 +33,10 @@ def __init__(self, path: str) -> None: self.cfg = {} if self.file.exists(): self.load() + else: + logger.warn(f"File config is not exists: '{self.file.absolute()}' create default configs.") + for key, value in defaults.items(): + self.set(key, value) def load(self): with open(self.file, "r", encoding="utf-8") as f: diff --git a/core/dashboard.py b/core/dashboard.py index 03c18bf..c12527b 100644 --- a/core/dashboard.py +++ b/core/dashboard.py @@ -118,8 +118,11 @@ async def process(type: str, data: Any): else StatsCache() ), } - if type == "storage": - return stats.get_storage_stats() + if type == "version": + return { + "cur": cluster.VERSION, + "latest": cluster.fetch_version + } async def set_status_by_tqdm( text: str, pbar: tqdm, format=unit.format_numbers @@ -162,6 +165,13 @@ async def set_status(text): await _set_status(text) last_text = text +async def trigger(type: str, data: Any = None): + app = web.app + output = to_bytes(type, await process(type, data)) + for ws in app.get_websockets("/bmcl/"): + await ws.send(output.io.getvalue()) + + def to_bytes(type: str, data: Any): output = utils.DataOutputStream() output.writeString(type) diff --git a/core/utils.py b/core/utils.py index 63e8c61..ca83202 100644 --- a/core/utils.py +++ b/core/utils.py @@ -393,6 +393,49 @@ def check_sign(hash: str, secret: str, s: str, e: str) -> bool: base64.urlsafe_b64encode(sha1.digest()).decode().strip("=") == s and time.time() * 1000 <= t ) + + +@dataclass +class Version: + major: int + minor: int + patch: int + @staticmethod + def from_string(version: str) -> 'Version': + if version.count(".") == 2: + v = [int(''.join((a for a in v if a.isdigit()))) for v in version.split(".", 2)] + return Version(v[0], v[1], v[2]) + return Version(0, 0, 0) + def __eq__(self, other: object): + if isinstance(other, Version): + return (self.major, self.minor, self.patch) == (other.major, other.minor, other.patch) + elif isinstance(other, tuple) and len(other) == 3 and all(isinstance(v, int) for v in other): + return (self.major, self.minor, self.patch) == other + return False + + def __lt__(self, other: object) -> bool: + if isinstance(other, Version): + return (self.major, self.minor, self.patch) < (other.major, other.minor, other.patch) + elif isinstance(other, tuple) and len(other) == 3 and all(isinstance(v, int) for v in other): + return (self.major, self.minor, self.patch) < other + return False + + def __gt__(self, other: object) -> bool: + if isinstance(other, Version): + return (self.major, self.minor, self.patch) > (other.major, other.minor, other.patch) + elif isinstance(other, tuple) and len(other) == 3 and all(isinstance(v, int) for v in other): + return (self.major, self.minor, self.patch) > other + return False + + def __ne__(self, other: object) -> bool: + return not self.__eq__(other) + + def __le__(self, other: object) -> bool: + return self.__eq__(other) or self.__lt__(other) + + def __ge__(self, other: object) -> bool: + return self.__eq__(other) or self.__gt__(other) + class MinecraftUtils: From b634996cc73f3846ea962ebbc25e51da303dccab Mon Sep 17 00:00:00 2001 From: tianxiu2b2t Date: Sat, 30 Mar 2024 21:57:56 +0800 Subject: [PATCH 5/8] . --- core/cluster.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/cluster.py b/core/cluster.py index 8fd0840..a000d66 100644 --- a/core/cluster.py +++ b/core/cluster.py @@ -911,8 +911,7 @@ async def init(): for plugin in plugins.get_plugins(): await plugin.init() await plugin.enable() - #storages.add_storage(FileStorage(Path("bmclapi"))) - storages.add_storage(WebDav("admin", "123456", "http://127.0.0.1:5244", "/bmclapi", "alist-06e39e3a-d195-44a1-8f8f-296928308922Z84AuS5ttoA7hAW1Wy0jT2N40xpUrA2WeHQrJVum1WcmVmy3bv92zB5NrWbOpp96")) + storages.add_storage(FileStorage(Path("bmclapi"))) Timer.delay(cluster.init) app = web.app From 6137627fd6323efb58daab750a77d9f038b43e6d Mon Sep 17 00:00:00 2001 From: tianxiu2b2t Date: Sun, 31 Mar 2024 00:16:36 +0800 Subject: [PATCH 6/8] Fix unable to clean cache --- core/cluster.py | 1 + 1 file changed, 1 insertion(+) diff --git a/core/cluster.py b/core/cluster.py index a000d66..caa0b7a 100644 --- a/core/cluster.py +++ b/core/cluster.py @@ -450,6 +450,7 @@ async def get(self, hash: str) -> File: file.set_data(buf.getbuffer()) self.cache[hash] = file file.cache = False + file.expiry = file.last_access + CACHE_TIME return file async def exists(self, hash: str) -> bool: From b1e3779fde26e0c3678cf53c9cc0fe63a064845a Mon Sep 17 00:00:00 2001 From: SilianZ Date: Sun, 31 Mar 2024 01:41:32 +0800 Subject: [PATCH 7/8] =?UTF-8?q?:bug:=20=E4=BF=AE=E5=A4=8D=E6=BC=8F?= =?UTF-8?q?=E6=B4=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- VERSION | 2 +- bmclapi_dashboard/static/js/index.min.js | 2 +- core/cluster.py | 115 ++++++++++++++++------- core/dashboard.py | 19 +++- core/utils.py | 43 --------- 5 files changed, 95 insertions(+), 86 deletions(-) diff --git a/VERSION b/VERSION index 4f1ec5f..d862caa 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v1.10.1-708e4ab +v1.10.1-708e4ab \ No newline at end of file diff --git a/bmclapi_dashboard/static/js/index.min.js b/bmclapi_dashboard/static/js/index.min.js index 6e576da..107afe3 100644 --- a/bmclapi_dashboard/static/js/index.min.js +++ b/bmclapi_dashboard/static/js/index.min.js @@ -1398,7 +1398,7 @@ ttb.createElement("p").class("value").setText("-"), ttb.createElement("p").class("title").setText("最新版本"), ttb.createElement("p").class("value").setText("-"), - ttb.createElement("button").class("button").setText("点击下载最新").setAttribute("href").event('click', () => { + ttb.createElement("button").class("button").setText("查看详情").setAttribute("href").event('click', () => { if (this.version.cur == this.version.latest) return window.location.href = github + "/releases/" + this.version.latest }) diff --git a/core/cluster.py b/core/cluster.py index 8617981..8b74043 100644 --- a/core/cluster.py +++ b/core/cluster.py @@ -38,14 +38,13 @@ ) VERSION = "" -fetch_version = "" version_path = Path("VERSION") if version_path.exists(): with open(Path("VERSION"), "r", encoding="utf-8") as f: VERSION = f.read().split("\n")[0] f.close() else: - fetch_version = VERSION = "Unknown" + VERSION = "Unknown" API_VERSION = "1.10.1" USER_AGENT = f"openbmclapi-cluster/{API_VERSION} python-openbmclapi/{VERSION}" BASE_URL = "https://openbmclapi.bangbang93.com/" @@ -325,12 +324,10 @@ async def __call__( self.checked = True more_files = {storage: [] for storage in storages.get_storages()} prefix_files = { - prefix: [] - for prefix in (prefix.to_bytes().hex() for prefix in range(256)) + prefix: [] for prefix in (prefix.to_bytes(1, "big").hex() for prefix in range(256)) } prefix_hash = { - prefix: [] - for prefix in (prefix.to_bytes().hex() for prefix in range(256)) + prefix: [] for prefix in (prefix.to_bytes(1, "big").hex() for prefix in range(256)) } for file in files: @@ -640,6 +637,42 @@ async def exists(self, hash: str) -> bool: return hash in self.files + async def get_size(self, hash: str) -> int: + return self.files.get(hash, self.empty).size + + async def write(self, hash: str, io: io.BytesIO) -> int: + async with self._client() as client: + path = self._endpoint(f"{hash[:2]}/{hash}") + await client.upload_to(io, path) + self.files[hash] = File(path, hash, len(io.getbuffer())) + return self.files[hash].size + + async def get_files(self, dir: str) -> list[str]: + return list((hash for hash in self.files.keys() if hash.startswith(dir))) + + async def get_hash(self, hash: str) -> str: + async with self._client() as session: + h = get_hash(hash) + async for data in await session.download_iter(self._endpoint(f"{hash[:2]}/{hash}")): + h.update(data) + return h.hexdigest() + + + async def get_files_size(self, dir: str) -> int: + return sum((file.size for hash, file in self.files.items() if hash.startswith(dir))) + + async def removes(self, hashs: list[str]) -> int: + success = 0 + async with self._client() as client: + for hash in hashs: + await client.clean(self._endpoint(f"{hash[:2]}/{hash}")) + success += 1 + return success + + + async def get_cache_stats(self) -> StatsCache: + return StatsCache() + class TypeStorage(Enum): FILE = "file" WEBDAV = "webdav" @@ -716,10 +749,20 @@ def __init__(self) -> None: self.downloader = FileDownloader() self.file_check = FileCheck(self.downloader) self._enable_timer: Optional[Task] = None + self.keepaliveTimer: Optional[Task] = None + self.keepaliveTimeoutTimer: Optional[Task] = None + self.keepalive_lock = asyncio.Lock() + def _message(self, message): logger.info(f"[Remote] {message}") if "信任度过低" in message: self.trusted = False + + async def emit(self, channel, data=None): + await self.sio.emit( + channel, data, callback=lambda x: Timer.delay(self.message, (channel, x)) + ) + async def init(self): if not self.sio.connected: try: @@ -729,15 +772,35 @@ async def init(self): transports=["websocket"], ) except: - logger.warn("Failed to connect to the main server. Retrying after 5s.") + logger.warn("Failed to connect to the main server, retrying after 5s.") Timer.delay(self.init, (), 5) return await self.start() + + async def start(self): await self.cert() - await self.reconnect() - - async def enable(self) -> None: + await self.file_check() + await self.enable() + async def cert(self): + await self.emit("request-cert") + async def enable(self): + if self.connected: + logger.debug("Still trying to enable cluster? You has been blocked. (\nFrom bangbang93:\n 谁他妈\n 一秒钟发了好几百个enable请求\n ban了解一下等我回杭州再看\n ban了先\n\n > Timestamp at 2024/3/30 14:07 GMT+8\n)") + return + self.connected = True + if self._enable_timer != None: + self._enable_timer.block() + self._enable_timer = Timer.delay(self.reconnect, (), 30) + await self._enable() + async def reconnect(self): + if self.connected: + await self.disable() + self.connected = False + logger.info("Retrying after 5s.") + await asyncio.sleep(5) + await self.enable() + async def _enable(self): storage_str = {"file": 0, "webdav": 0} self.trusted = True for storage in storages.get_storages(): @@ -761,16 +824,8 @@ async def enable(self) -> None: }, }, ) - self._enable_timer = Timer.delay( - self._enable_task, - (), - 30, - ) await dashboard.set_status("巡检中") - - async def _enable_task(self): - await self.reconnect() - + async def message(self, type, data: list[Any]): if len(data) == 1: data.append(None) @@ -879,9 +934,9 @@ async def get_cache_stats(self) -> StatsCache: github_api = "https://api.github.com" download_url = "" - async def check_update(): - global fetch_version, download_url + global fetched_version + fetched_version = "Unknown" async with aiohttp.ClientSession(base_url=github_api) as session: logger.info("Checking update...") try: @@ -890,28 +945,16 @@ async def check_update(): ) as req: req.raise_for_status() data = await req.json() - fetch_version = data["tag_name"] - if fetch_version != VERSION: - logger.success(f"New version found: {fetch_version}!") - download_url = data["zipball_url"] + fetched_version = data["tag_name"] + if fetched_version != VERSION: + logger.success(f"New version found: {fetched_version}!") await dashboard.trigger("version") - if True and utils.Version.from_string(VERSION) < utils.Version.from_string(fetch_version): - Timer.delay(download_latest) else: logger.info(f"Already up to date.") except aiohttp.ClientError as e: logger.error(f"An error occured whilst checking update: {e}.") Timer.delay(check_update, (), 3600) - -async def download_latest(): - async with aiohttp.ClientSession() as session: - async with session.get(download_url) as resp: - resp.raise_for_status() - async with aiofiles.open(f"./cache/latest_release-{fetch_version}.zip", "wb") as w: - await w.write(await resp.read()) - logger.success(f"Downloaded latest release. See './cache/latest_release-{fetch_version}.zip'") - async def init(): await check_update() global cluster diff --git a/core/dashboard.py b/core/dashboard.py index 76e04d8..55a48ac 100644 --- a/core/dashboard.py +++ b/core/dashboard.py @@ -129,11 +129,15 @@ async def process(type: str, data: Any): else StatsCache() ), } - if type == "storage": - return stats.get_storage_stats() - - -async def set_status_by_tqdm(text: str, pbar: tqdm, format=unit.format_numbers): + if type == "version": + return { + "cur": cluster.VERSION, + "latest": cluster.fetched_version + } + +async def set_status_by_tqdm( + text: str, pbar: tqdm, format=unit.format_numbers +): global cur_tqdm_text, cur_tqdm, cur_tqdm_unit, task_tqdm cur_tqdm_text = text cur_tqdm = pbar @@ -181,6 +185,11 @@ async def set_status(text): await _set_status(text) last_text = text +async def trigger(type: str, data: Any = None): + app = web.app + output = to_bytes(type, await process(type, data)) + for ws in app.get_websockets("/bmcl/"): + await ws.send(output.io.getvalue()) def to_bytes(type: str, data: Any): output = utils.DataOutputStream() diff --git a/core/utils.py b/core/utils.py index ca83202..63e8c61 100644 --- a/core/utils.py +++ b/core/utils.py @@ -393,49 +393,6 @@ def check_sign(hash: str, secret: str, s: str, e: str) -> bool: base64.urlsafe_b64encode(sha1.digest()).decode().strip("=") == s and time.time() * 1000 <= t ) - - -@dataclass -class Version: - major: int - minor: int - patch: int - @staticmethod - def from_string(version: str) -> 'Version': - if version.count(".") == 2: - v = [int(''.join((a for a in v if a.isdigit()))) for v in version.split(".", 2)] - return Version(v[0], v[1], v[2]) - return Version(0, 0, 0) - def __eq__(self, other: object): - if isinstance(other, Version): - return (self.major, self.minor, self.patch) == (other.major, other.minor, other.patch) - elif isinstance(other, tuple) and len(other) == 3 and all(isinstance(v, int) for v in other): - return (self.major, self.minor, self.patch) == other - return False - - def __lt__(self, other: object) -> bool: - if isinstance(other, Version): - return (self.major, self.minor, self.patch) < (other.major, other.minor, other.patch) - elif isinstance(other, tuple) and len(other) == 3 and all(isinstance(v, int) for v in other): - return (self.major, self.minor, self.patch) < other - return False - - def __gt__(self, other: object) -> bool: - if isinstance(other, Version): - return (self.major, self.minor, self.patch) > (other.major, other.minor, other.patch) - elif isinstance(other, tuple) and len(other) == 3 and all(isinstance(v, int) for v in other): - return (self.major, self.minor, self.patch) > other - return False - - def __ne__(self, other: object) -> bool: - return not self.__eq__(other) - - def __le__(self, other: object) -> bool: - return self.__eq__(other) or self.__lt__(other) - - def __ge__(self, other: object) -> bool: - return self.__eq__(other) or self.__gt__(other) - class MinecraftUtils: From 49f5c855c84191843798dd2e9dc73c35dd0771ff Mon Sep 17 00:00:00 2001 From: SilianZ Date: Sun, 31 Mar 2024 01:47:18 +0800 Subject: [PATCH 8/8] =?UTF-8?q?:bug:=20=E4=BF=AE=E5=A4=8D=E6=BC=8F?= =?UTF-8?q?=E6=B4=9E=E5=B9=B6=E8=BF=9B=E8=A1=8C=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/api.py | 2 +- core/cluster.py | 107 ++++++++++++++++++++++++++++++++-------------- core/config.py | 4 +- core/dashboard.py | 14 +++--- 4 files changed, 86 insertions(+), 41 deletions(-) diff --git a/core/api.py b/core/api.py index 444ec6d..4d3b677 100644 --- a/core/api.py +++ b/core/api.py @@ -46,7 +46,7 @@ class File: size: int last_hit: float = 0 last_access: float = 0 - expiry: Optional[float] = None + expiry: Optional[float] = None data: Optional[io.BytesIO] = None cache: bool = False diff --git a/core/cluster.py b/core/cluster.py index 8b74043..16ec598 100644 --- a/core/cluster.py +++ b/core/cluster.py @@ -102,7 +102,9 @@ async def fetchToken(self): logger.info("Fetched token.") except aiohttp.ClientError as e: - logger.error(f"An error occured whilst fetching token, retrying in 5s: {e}.") + logger.error( + f"An error occured whilst fetching token, retrying in 5s: {e}." + ) await asyncio.sleep(5) return self.fetchToken() @@ -324,10 +326,12 @@ async def __call__( self.checked = True more_files = {storage: [] for storage in storages.get_storages()} prefix_files = { - prefix: [] for prefix in (prefix.to_bytes(1, "big").hex() for prefix in range(256)) + prefix: [] + for prefix in (prefix.to_bytes(1, "big").hex() for prefix in range(256)) } prefix_hash = { - prefix: [] for prefix in (prefix.to_bytes(1, "big").hex() for prefix in range(256)) + prefix: [] + for prefix in (prefix.to_bytes(1, "big").hex() for prefix in range(256)) } for file in files: @@ -579,7 +583,15 @@ async def get_cache_stats(self) -> StatsCache: class WebDav(Storage): - def __init__(self, username: str, password: str, hostname: str, endpoint: str, token: Optional[str] = None, dav: str = "/dav") -> None: + def __init__( + self, + username: str, + password: str, + hostname: str, + endpoint: str, + token: Optional[str] = None, + dav: str = "/dav", + ) -> None: self.username = username self.password = password self.hostname = hostname @@ -591,31 +603,44 @@ def __init__(self, username: str, password: str, hostname: str, endpoint: str, t self.cache: dict[str, File] = {} self.empty = File("", "", 0) Timer.delay(self._list_all) + def _endpoint(self, file: str): return f"{self.endpoint}/{file.removeprefix('/')}" + def _client(self): - return webdav3_client.Client({ - "webdav_username": self.username, - "webdav_password": self.password, - "webdav_hostname": self.hostname + self.dav, - "webdav_token": self.token - }) + return webdav3_client.Client( + { + "webdav_username": self.username, + "webdav_password": self.password, + "webdav_hostname": self.hostname + self.dav, + "webdav_token": self.token, + } + ) + async def get(self, file: str) -> File: if file in self.cache and self.cache[file].expiry - 10 > time.time(): self.cache[file].cache = True self.cache[file].last_hit = time.time() return self.cache[file] - async with aiohttp.ClientSession(self.hostname, auth=aiohttp.BasicAuth(self.username, self.password)) as session: - async with session.get(self.dav + self._endpoint(file[:2] + "/" + file), allow_redirects=False) as resp: - f = File(self.dav + self._endpoint(file[:2] + "/" + file), file, size = int(resp.headers.get("Content-Length", 0))) + async with aiohttp.ClientSession( + self.hostname, auth=aiohttp.BasicAuth(self.username, self.password) + ) as session: + async with session.get( + self.dav + self._endpoint(file[:2] + "/" + file), allow_redirects=False + ) as resp: + f = File( + self.dav + self._endpoint(file[:2] + "/" + file), + file, + size=int(resp.headers.get("Content-Length", 0)), + ) if resp.status == 200: f.set_data(await resp.read()) elif resp.status // 100 == 3: f.path = resp.headers.get("Location") self.cache[file] = f return self.cache[file] - - async def _list_all(self, force = False): + + async def _list_all(self, force=False): if self.fetch and not force: return self.fetch = True @@ -626,16 +651,27 @@ async def _list_all(self, force = False): await dashboard.set_status_by_tqdm("正在获取 WebDav 文件列表中", pbar) for dir in (await client.list(self.endpoint))[1:]: pbar.update(1) - for file in (await client.list(self._endpoint(dir,), get_info=True))[1:]: - self.files[file['name']] = File(file['path'].removeprefix(f"/dav/{self.endpoint}/"), file['name'], int(file['size'])) + for file in ( + await client.list( + self._endpoint( + dir, + ), + get_info=True, + ) + )[1:]: + self.files[file["name"]] = File( + file["path"].removeprefix(f"/dav/{self.endpoint}/"), + file["name"], + int(file["size"]), + ) await asyncio.sleep(0) return self.files + async def exists(self, hash: str) -> bool: if not self.fetch: self.fetch = True await self._list_all() return hash in self.files - async def get_size(self, hash: str) -> int: return self.files.get(hash, self.empty).size @@ -649,17 +685,20 @@ async def write(self, hash: str, io: io.BytesIO) -> int: async def get_files(self, dir: str) -> list[str]: return list((hash for hash in self.files.keys() if hash.startswith(dir))) - + async def get_hash(self, hash: str) -> str: async with self._client() as session: h = get_hash(hash) - async for data in await session.download_iter(self._endpoint(f"{hash[:2]}/{hash}")): + async for data in await session.download_iter( + self._endpoint(f"{hash[:2]}/{hash}") + ): h.update(data) return h.hexdigest() - async def get_files_size(self, dir: str) -> int: - return sum((file.size for hash, file in self.files.items() if hash.startswith(dir))) + return sum( + (file.size for hash, file in self.files.items() if hash.startswith(dir)) + ) async def removes(self, hashs: list[str]) -> int: success = 0 @@ -669,10 +708,10 @@ async def removes(self, hashs: list[str]) -> int: success += 1 return success - async def get_cache_stats(self) -> StatsCache: return StatsCache() + class TypeStorage(Enum): FILE = "file" WEBDAV = "webdav" @@ -739,7 +778,6 @@ def is_enable(value): ) - class Cluster: def __init__(self) -> None: self.connected = False @@ -752,17 +790,17 @@ def __init__(self) -> None: self.keepaliveTimer: Optional[Task] = None self.keepaliveTimeoutTimer: Optional[Task] = None self.keepalive_lock = asyncio.Lock() - + def _message(self, message): logger.info(f"[Remote] {message}") if "信任度过低" in message: self.trusted = False - + async def emit(self, channel, data=None): await self.sio.emit( channel, data, callback=lambda x: Timer.delay(self.message, (channel, x)) ) - + async def init(self): if not self.sio.connected: try: @@ -777,22 +815,26 @@ async def init(self): return await self.start() - async def start(self): await self.cert() await self.file_check() await self.enable() + async def cert(self): await self.emit("request-cert") + async def enable(self): if self.connected: - logger.debug("Still trying to enable cluster? You has been blocked. (\nFrom bangbang93:\n 谁他妈\n 一秒钟发了好几百个enable请求\n ban了解一下等我回杭州再看\n ban了先\n\n > Timestamp at 2024/3/30 14:07 GMT+8\n)") + logger.debug( + "Still trying to enable cluster? You has been blocked. (\nFrom bangbang93:\n 谁他妈\n 一秒钟发了好几百个enable请求\n ban了解一下等我回杭州再看\n ban了先\n\n > Timestamp at 2024/3/30 14:07 GMT+8\n)" + ) return self.connected = True if self._enable_timer != None: self._enable_timer.block() self._enable_timer = Timer.delay(self.reconnect, (), 30) await self._enable() + async def reconnect(self): if self.connected: await self.disable() @@ -800,6 +842,7 @@ async def reconnect(self): logger.info("Retrying after 5s.") await asyncio.sleep(5) await self.enable() + async def _enable(self): storage_str = {"file": 0, "webdav": 0} self.trusted = True @@ -825,7 +868,7 @@ async def _enable(self): }, ) await dashboard.set_status("巡检中") - + async def message(self, type, data: list[Any]): if len(data) == 1: data.append(None) @@ -875,7 +918,7 @@ async def message(self, type, data: list[Any]): self.keepalive_lock.release() if type != "request-cert": logger.debug(type, data) - + async def start_keepalive(self, delay=0): if self.keepaliveTimer: self.keepaliveTimer.block() @@ -934,6 +977,7 @@ async def get_cache_stats(self) -> StatsCache: github_api = "https://api.github.com" download_url = "" + async def check_update(): global fetched_version fetched_version = "Unknown" @@ -955,6 +999,7 @@ async def check_update(): logger.error(f"An error occured whilst checking update: {e}.") Timer.delay(check_update, (), 3600) + async def init(): await check_update() global cluster diff --git a/core/config.py b/core/config.py index bdbb3d9..807ad96 100644 --- a/core/config.py +++ b/core/config.py @@ -34,7 +34,9 @@ def __init__(self, path: str) -> None: if self.file.exists(): self.load() else: - logger.warn(f"File config is not exists: '{self.file.absolute()}' create default configs.") + logger.warn( + f'''The config file "{self.file.absolute()}" doesn't exist, creating a default config file...''' + ) for key, value in defaults.items(): self.set(key, value) diff --git a/core/dashboard.py b/core/dashboard.py index 55a48ac..b739f23 100644 --- a/core/dashboard.py +++ b/core/dashboard.py @@ -130,14 +130,10 @@ async def process(type: str, data: Any): ), } if type == "version": - return { - "cur": cluster.VERSION, - "latest": cluster.fetched_version - } - -async def set_status_by_tqdm( - text: str, pbar: tqdm, format=unit.format_numbers -): + return {"cur": cluster.VERSION, "latest": cluster.fetched_version} + + +async def set_status_by_tqdm(text: str, pbar: tqdm, format=unit.format_numbers): global cur_tqdm_text, cur_tqdm, cur_tqdm_unit, task_tqdm cur_tqdm_text = text cur_tqdm = pbar @@ -185,12 +181,14 @@ async def set_status(text): await _set_status(text) last_text = text + async def trigger(type: str, data: Any = None): app = web.app output = to_bytes(type, await process(type, data)) for ws in app.get_websockets("/bmcl/"): await ws.send(output.io.getvalue()) + def to_bytes(type: str, data: Any): output = utils.DataOutputStream() output.writeString(type)