Dev/statistics #21

Merged · 11 commits · Mar 24, 2024
2 changes: 1 addition & 1 deletion bmclapi_dashboard/static/js/index.min.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion bmclapi_dashboard/static/js/ttb.min.js

Large diffs are not rendered by default.

17 changes: 12 additions & 5 deletions core/__init__.py
@@ -72,6 +72,12 @@ def disconnect(self, client: ProxyClient):
if client not in self._tables:
return
self._tables.remove(client)
def get_origin_from_ip(self, ip: tuple[str, int]):
# ip is the socket address of the connected client
for target in self._tables:
if target.target.get_sock_address() == ip:
return target.origin.get_address()
return None

ssl_server: Optional[asyncio.Server] = None
server: Optional[asyncio.Server] = None
@@ -85,7 +91,7 @@ def disconnect(self, client: ProxyClient):
IO_BUFFER: int = Config.get("advanced.io_buffer")

async def _handle_ssl(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
return await _handle_process(Client(reader, writer), True)
return await _handle_process(Client(reader, writer, peername = proxy.get_origin_from_ip(writer.get_extra_info("peername"))), True)

async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
return await _handle_process(Client(reader, writer))
@@ -101,7 +107,7 @@ async def _handle_process(client: Client, ssl: bool = False):
await client.writer.drain()
break
if protocol == Protocol.Unknown and not ssl and ssl_server:
target = Client(*(await asyncio.open_connection("127.0.0.1", ssl_server.sockets[0].getsockname()[1])))
target = Client(*(await asyncio.open_connection("127.0.0.1", ssl_server.sockets[0].getsockname()[1])), peername=client.get_address())
proxying = True
await proxy.connect(client, target, header)
break
@@ -121,7 +127,7 @@ async def check_ports():
global ssl_server, server, client_side_ssl, restart, check_port_key
while 1:
ports: list[tuple[asyncio.Server, ssl.SSLContext | None]] = []
for service in ((server, None), (ssl_server, client_side_ssl if get_loads() != 0 else None)):
for service in ((server, None), (ssl_server, client_side_ssl if get_loaded() else None)):
if not service[0]:
continue
ports.append((service[0], service[1]))
@@ -144,13 +150,14 @@ async def check_ports():
async def main():
global ssl_server, server, server_side_ssl, restart
await web.init()
certificate.load_cert(Path(".ssl/cert"), Path(".ssl/key"))
Timer.delay(check_ports, (), 5)
while 1:
try:
server = await asyncio.start_server(_handle, port=PORT)
ssl_server = await asyncio.start_server(_handle_ssl, port=0 if SSL_PORT == PORT else SSL_PORT, ssl=server_side_ssl if get_loads() != 0 else None)
ssl_server = await asyncio.start_server(_handle_ssl, port=0 if SSL_PORT == PORT else SSL_PORT, ssl=server_side_ssl if get_loaded() else None)
logger.info(f"Listening server on {PORT}")
logger.info(f"Listening server on {ssl_server.sockets[0].getsockname()[1]} Loaded certificates: {get_loads()}")
logger.info(f"Listening server on {ssl_server.sockets[0].getsockname()[1]}")
async with server, ssl_server:
await asyncio.gather(server.serve_forever(), ssl_server.serve_forever())
except asyncio.CancelledError:
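
Note on this file's change: when _handle_process sees an unknown protocol on the plain port while an SSL listener is running, it re-dials 127.0.0.1:<ssl_port> through the proxy, so _handle_ssl would otherwise only ever observe the loopback socket as its peer. The new get_origin_from_ip walks the proxy table to map that internal socket back to the original client address, which is then threaded through as Client(..., peername=...). A minimal self-contained sketch of the lookup idea (Entry and tables here are assumptions standing in for the real ProxyClient structures):

    # Hypothetical stand-in for the proxy connection table.
    class Entry:
        def __init__(self, origin_addr: tuple[str, int], target_addr: tuple[str, int]):
            self.origin_addr = origin_addr  # real client, e.g. ("203.0.113.7", 51820)
            self.target_addr = target_addr  # proxy's outbound socket to 127.0.0.1:<ssl_port>

    tables: list[Entry] = []

    def get_origin_from_ip(ip: tuple[str, int]):
        # `ip` is the peername the SSL listener sees: the proxy's outbound
        # socket rather than the real client, so resolve it via the table.
        for entry in tables:
            if entry.target_addr == ip:
                return entry.origin_addr
        return None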
15 changes: 13 additions & 2 deletions core/api.py
@@ -34,6 +34,7 @@ class File:
last_hit: float = 0
last_access: float = 0
data: Optional[io.BytesIO] = None
cache: bool = False
def is_url(self):
if not isinstance(self.path, str):
return False
@@ -49,6 +50,11 @@ def set_data(self, data: io.BytesIO | memoryview | bytes):
data = io.BytesIO(data)
self.data = io.BytesIO(zlib.compress(data.getbuffer()))

@dataclass
class StatsCache:
total: int = 0
bytes: int = 0

class Storage(metaclass=abc.ABCMeta):
@abc.abstractmethod
async def get(self, file: str) -> File:
@@ -66,6 +72,9 @@ async def exists(self, hash: str) -> bool:
async def get_size(self, hash: str) -> int:
raise NotImplementedError
@abc.abstractmethod
async def copy(self, origin: Path, hash: str) -> int:
raise NotImplementedError
@abc.abstractmethod
async def write(self, hash: str, io: io.BytesIO) -> int:
raise NotImplementedError
@abc.abstractmethod
@@ -80,7 +89,9 @@ async def get_files_size(self, dir: str) -> int:
@abc.abstractmethod
async def removes(self, hashs: list[str]) -> int:
raise NotImplementedError

@abc.abstractmethod
async def get_cache_stats(self) -> StatsCache:
raise NotImplementedError

def get_hash(org):
if len(org) == 32:
@@ -92,7 +103,7 @@ def get_hash(org):
async def get_file_hash(org: str, path: Path):
hash = get_hash(org)
async with aiofiles.open(path, "rb") as r:
while data := await r.read(Config.get("io_buffer")):
while data := await r.read(Config.get("advanced.io_buffer")):
if not data:
break
hash.update(data)
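
Note on this file's change: Storage gains two abstract methods, copy() (import a file already on local disk, returning the byte count) and get_cache_stats() (report in-memory cache usage via the new StatsCache dataclass); File.cache now flags whether a get() was served from memory, and get_file_hash reads with the corrected advanced.io_buffer config key. A sketch of the two new methods for a hypothetical in-memory backend, not part of this PR (only these two methods are shown; a real subclass must implement the rest of the Storage interface):

    from pathlib import Path

    class MemoryStorage(Storage):  # hypothetical backend for illustration
        def __init__(self) -> None:
            self.blobs: dict[str, bytes] = {}

        async def copy(self, origin: Path, hash: str) -> int:
            # Import a file the downloader already wrote under ./cache/download.
            data = origin.read_bytes()
            self.blobs[hash] = data
            return len(data)

        async def get_cache_stats(self) -> StatsCache:
            stat = StatsCache()
            for data in self.blobs.values():
                stat.total += 1
                stat.bytes += len(data)
            return stat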
16 changes: 9 additions & 7 deletions core/certificate.py
@@ -1,3 +1,4 @@
import os
from pathlib import Path
import ssl
import time
@@ -13,22 +14,23 @@
client_side_ssl = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
client_side_ssl.check_hostname = False

_loads: int = 0
_loaded: bool = False

def load_cert(cert, key):
global server_side_ssl, client_side_ssl, _loads
global server_side_ssl, client_side_ssl, _loaded
if not os.path.exists(cert) or not os.path.exists(key):
return False
try:
server_side_ssl.load_cert_chain(cert, key)
client_side_ssl.load_verify_locations(cert)
_loads += 1
_loaded = True
return True
except:
logger.error("Failed to load certificate: ", traceback.format_exc())
return False

def get_loads() -> int:
global _loads
return _loads
def get_loaded() -> bool:
return _loaded

def load_text(cert: str, key: str):
t = time.time()
@@ -38,7 +40,7 @@ def load_text(cert: str, key: str):
c.write(cert)
k.write(key)
if load_cert(cert_file, key_file):
logger.info("Loaded certificate from text! Current: ", get_loads())
logger.info("Loaded certificate from text!")
core.restart = True
if core.server:
core.server.close()
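
Note on this file's change: the _loads counter becomes a _loaded boolean, since every caller only asked whether a certificate was present (get_loads() != 0), never how many had been loaded; call sites in core/__init__.py switch to get_loaded() and the log lines drop the count. A minimal usage sketch, assuming the .ssl/cert and .ssl/key paths used elsewhere in this PR:

    from pathlib import Path
    from core import certificate

    certificate.load_cert(Path(".ssl/cert"), Path(".ssl/key"))
    # Serve TLS only when a certificate chain actually loaded.
    ssl_ctx = certificate.server_side_ssl if certificate.get_loaded() else None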
93 changes: 72 additions & 21 deletions core/cluster.py
@@ -1,4 +1,5 @@
import asyncio
import dataclasses
import hashlib
import hmac
import io
@@ -13,7 +14,7 @@
import socketio
from tqdm import tqdm
from core.config import Config
from core import certificate, unit
from core import certificate, system, unit
from core.timer import Task, Timer
import pyzstd as zstd
import core.utils as utils
@@ -25,6 +26,7 @@
from core.api import (
File,
BMCLAPIFile,
StatsCache,
Storage,
get_hash,
)
@@ -54,6 +56,7 @@ async def fetchToken(self):
async with aiohttp.ClientSession(
headers={"User-Agent": USER_AGENT}, base_url=BASE_URL
) as session:
logger.info("Fetching token")
try:
async with session.get(
"/openbmclapi-agent/challenge", params={"clusterId": CLUSTER_ID}
@@ -80,15 +83,14 @@ async def fetchToken(self):
Timer.delay(
self.fetchToken, delay=float(content["ttl"]) / 1000.0 - 600
)
logger.info("Fetched token")

except aiohttp.ClientError as e:
logger.error(f"Error fetching token: {e}.")

async def getToken(self) -> str:
if not self.token:
logger.info("Fetching token")
await self.fetchToken()
logger.info("Fetched token")
return self.token or ""

class ParseFileList:
@@ -152,6 +154,8 @@ async def _download(self, pbar: tqdm, session: aiohttp.ClientSession):
except:
pbar.update(-size)
await self.queues.put(file)
if cluster:
await cluster._check_files_sync_status("下载文件中", pbar, unit.format_more_bytes)
await session.close()

async def _mount_file(self, file: BMCLAPIFile):
@@ -166,9 +170,12 @@ async def _mount_file(self, file: BMCLAPIFile):
logger.error(traceback.format_exc())
if result != file.size:
logger.error(f"Download file error: File {file.hash}({unit.format_bytes(file.size)}) copy to target file error: {file.hash}({unit.format_bytes(result)})")
os.remove(f"./cache/download/{file.hash[:2]}/{file.hash}")
try:
os.remove(f"./cache/download/{file.hash[:2]}/{file.hash}")
except:
...
async def download(self, storages: list['Storage'], miss: list[BMCLAPIFile]):
with tqdm(desc="Downloading files", unit="bytes", unit_divisor=1024, total=sum((file.size for file in miss)), unit_scale=True) as pbar:
with tqdm(desc="Downloading files", unit="b", unit_divisor=1024, total=sum((file.size for file in miss)), unit_scale=True) as pbar:
self.storages = storages
for file in miss:
await self.queues.put(file)
@@ -197,13 +204,12 @@ def __init__(self, dir: Path) -> None:
raise FileExistsError("The path is file.")
self.dir.mkdir(exist_ok=True, parents=True)
self.cache: dict[str, File] = {}
self.stats: stats.StorageStats = stats.get_storage(f"File_{self.dir}")
self.timer = Timer.repeat(self.clear_cache, (), CHECK_CACHE, CHECK_CACHE)
async def get(self, hash: str) -> File:
if hash in self.cache:
file = self.cache[hash]
file.last_access = time.time()
self.stats.hit(file, cache = True)
file.cache = True
return file
path = Path(str(self.dir) + f"/{hash[:2]}/{hash}")
buf = io.BytesIO()
@@ -213,12 +219,17 @@ async def get(self, hash: str) -> File:
file = File(path, hash, buf.tell(), time.time(), time.time())
file.set_data(buf.getbuffer())
self.cache[hash] = file
self.stats.hit(file)
file.cache = False
return file
async def exists(self, hash: str) -> bool:
return os.path.exists(str(self.dir) + f"/{hash[:2]}/{hash}")
async def get_size(self, hash: str) -> int:
return os.path.getsize(str(self.dir) + f"/{hash[:2]}/{hash}")
async def copy(self, origin: Path, hash: str):
Path(str(self.dir) + f"/{hash[:2]}/{hash}").parent.mkdir(exist_ok=True, parents=True)
async with aiofiles.open(str(self.dir) + f"/{hash[:2]}/{hash}", "wb") as w, aiofiles.open(origin, "rb") as r:
await w.write(await r.read())
return origin.stat().st_size
async def write(self, hash: str, io: io.BytesIO) -> int:
Path(str(self.dir) + f"/{hash[:2]}/{hash}").parent.mkdir(exist_ok=True, parents=True)
async with aiofiles.open(str(self.dir) + f"/{hash[:2]}/{hash}", "wb") as w:
@@ -258,9 +269,10 @@ async def clear_cache(self):
logger.info(f"Outdate caches: {unit.format_number(len(old_keys))}({unit.format_bytes(old_size)})")
async def get_files(self, dir: str) -> list[str]:
files = []
with os.scandir(str(self.dir) + f"/{dir}") as session:
for file in session:
files.append(file.name)
if os.path.exists(str(self.dir) + f"/{dir}"):
with os.scandir(str(self.dir) + f"/{dir}") as session:
for file in session:
files.append(file.name)
return files
async def removes(self, hashs: list[str]) -> int:
success = 0
@@ -272,14 +284,25 @@ async def removes(self, hashs: list[str]) -> int:
return success
async def get_files_size(self, dir: str) -> int:
size = 0
with os.scandir(str(self.dir) + f"/{dir}") as session:
for file in session:
size += file.stat().st_size
if os.path.exists(str(self.dir) + f"/{dir}"):
with os.scandir(str(self.dir) + f"/{dir}") as session:
for file in session:
size += file.stat().st_size
return size
async def get_cache_stats(self) -> StatsCache:
stat = StatsCache()
for file in self.cache.values():
stat.total += 1
stat.bytes += file.size
return stat
class WebDav(Storage):
def __init__(self) -> None:
super().__init__()
class Cluster:
def __init__(self) -> None:
self.sio = socketio.AsyncClient()
self.storages: list[Storage] = []
self.storage_stats: dict[Storage, stats.StorageStats] = {}
self.started = False
self.sio.on("message", self._message)
self.cur_storage: Optional[stats.SyncStorage] = None
@@ -293,13 +316,21 @@ def _message(self, message):
logger.info(f"[Remote] {message}")
if "信任度过低" in message:
self.trusted = False
def get_storages(self):
return self.storages.copy()
def add_storage(self, storage):
self.storages.append(storage)
type = "Unknown"
key = time.time()
if isinstance(storage, FileStorage):
type = "File"
key = storage.dir
self.storage_stats[storage] = stats.get_storage(f"{type}_{key}")

async def _check_files_sync_status(self, text: str, pbar: tqdm):
async def _check_files_sync_status(self, text: str, pbar: tqdm, format = unit.format_numbers):
if self.check_files_timer:
return
n, total = unit.format_numbers(pbar.n, pbar.total)
n, total = format(pbar.n, pbar.total)
await set_status(f"{text} ({n}/{total})")

async def check_files(self):
@@ -377,7 +408,6 @@ async def check_files(self):
if paths:
for path in paths:
os.remove(path)
pbar.disable()
pbar.update(1)
if dir:
for d in dir:
@@ -402,8 +432,19 @@ async def start(self, ):
await self.check_files()
await set_status("启动服务")
await self.enable()
async def get(self, hash):
return await self.storages[0].get(hash)
async def get(self, hash) -> File:
storage = self.storages[0]
stat = self.storage_stats[storage]
file = await storage.get(hash)
stat.hit(file)
return file
async def get_cache_stats(self) -> StatsCache:
stat = StatsCache()
for storage in self.storages:
t = await storage.get_cache_stats()
stat.total += t.total
stat.bytes += t.bytes
return stat
async def exists(self, hash):
return await self.storages[0].exists(hash)
async def enable(self) -> None:
@@ -486,14 +527,16 @@ async def _keepalive(self):
"bytes": storage.get_total_bytes() - storage.get_last_bytes(),
})
await self.start_keepalive(300)
async def _keepalive_timeout(self):
logger.warn("Failed to keepalive? Reconnect the main")
async def reconnect(self):
try:
await self.disable()
except:
...
await self.cert()
await self.enable()
async def _keepalive_timeout(self):
logger.warn("Failed to keepalive? Reconnect the main")
await self.reconnect()
async def cert(self):
if Path("./.ssl/cert").exists() == Path("./.ssl/key").exists() == True:
return
@@ -574,6 +617,13 @@ async def process(type: str, data: Any):
async with aiohttp.ClientSession(BASE_URL) as session:
async with session.get(data) as resp:
return resp.json()
if type == "system":
return {
"memory": system.get_used_memory(),
"connections": system.get_connections(),
"cpu": system.get_cpus(),
"cache": dataclasses.asdict(await cluster.get_cache_stats()) if cluster else StatsCache()
}

token = TokenManager()
cluster: Optional[Cluster] = None
@@ -592,6 +642,7 @@ async def set_status(text: str):
async def init():
global cluster
cluster = Cluster()
system.init()
plugins.load_plugins()
for plugin in plugins.get_plugins():
await plugin.init()
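
Note on this file's changes: hit accounting moves up a level, with FileStorage no longer owning a StorageStats; Cluster.add_storage now registers one per backend (keyed File_<dir> for file storages) and Cluster.get records hits against whichever storage served the file. The new "system" branch of process() exposes memory, connection, and CPU figures from the new core.system module plus the cache stats aggregated across storages. A sketch of a dashboard-side poll under those assumptions (field values illustrative, and a running cluster is assumed):

    from core.cluster import process  # dispatcher extended in this PR

    async def poll_system():
        payload = await process("system", None)
        # e.g. {"memory": ..., "connections": ..., "cpu": ...,
        #       "cache": {"total": 12, "bytes": 3478129}}
        cache = payload["cache"]
        return payload["memory"], payload["cpu"], cache["total"], cache["bytes"]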