diff --git a/main.py b/main.py
index 8f6b902..71f48c5 100644
--- a/main.py
+++ b/main.py
@@ -1,283 +1,179 @@
-# NextDrop - High-Speed Data Pipeline - Ice Memory Version (Fixed)
+# NextDrop - High-Speed Data Pipeline - Updated Version
 import math
+from KamuJpModern.ModernLogging import ModernLogging
 import asyncio
-import websockets
+import aiohttp
+import aiohttp.web
 import socket
 import os
 import sys
-import zstandard as zstd
+import zstandard as zstd  # Changed from gzip to zstandard
 import argparse
-from concurrent.futures import ProcessPoolExecutor
-import json
-import aiofiles
-import logging
-import hashlib
-import multiprocessing
-import psutil
-import tempfile
-from KamuJpModern import KamuJpModern
+from concurrent.futures import ThreadPoolExecutor
+from tqdm import tqdm
+from io import BytesIO
 
-# Logging configuration
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger('NextDrop')
-
-CHUNK_SIZE = 1 * 1024 * 1024  # 1MB
+CHUNK_SIZE = 1024 * 1024  # 1MB
+DEFAULT_MAX_CONCURRENT_TASKS = 100  # Maximum number of concurrent tasks
 COMPRESSION_LEVEL = 3  # zstd compression level (1-22)
+logger = ModernLogging("NextDrop")
 
 class FileSender:
-    def __init__(self, target, port, file_path, compress=False, version="2.0"):
+    def __init__(self, target, port, file_path, num_threads=4, compress=False, version="2.0"):
         self.target = target
         self.port = port
         self.file_path = file_path
+        self.num_threads = num_threads
         self.compress = compress
         self.version = version
-        self.max_retries = 3  # number of retries
-        self.chunk_hashes = {}
-        self.executor = None
-        self.initialize_executor()
-
-    def initialize_executor(self):
-        cpu_count = multiprocessing.cpu_count()
-        self.executor = ProcessPoolExecutor(max_workers=cpu_count)
-        logger.info(f"ProcessPoolExecutor initialized with {cpu_count} workers.")
+        self.semaphore = asyncio.Semaphore(DEFAULT_MAX_CONCURRENT_TASKS)
+        self.zstd_compressor = zstd.ZstdCompressor(level=COMPRESSION_LEVEL) if self.compress else None
 
     async def send_file(self):
         file_size = os.path.getsize(self.file_path)
         if file_size == 0:
             total_chunks = 1
-            logger.warning("File size is 0. Setting total_chunks to 1.")
+            logger.log("File size is 0. Setting total_chunks to 1.", "WARNING")
         else:
             total_chunks = math.ceil(file_size / CHUNK_SIZE)
 
-        uri = f'ws://{self.target}:{self.port}/upload'
-        async with websockets.connect(uri, max_size=None) as websocket:
-            # Compute the hash of the entire file
-            file_hash = await self.calculate_file_hash()
-            # Send metadata
-            metadata = {
-                'filename': os.path.basename(self.file_path),
-                'total_chunks': total_chunks,
-                'version': self.version,
-                'compress': self.compress,
-                'file_hash': file_hash
-            }
-            await websocket.send(json.dumps(metadata))
-            logger.info(f"Sending file '{self.file_path}' to {self.target}:{self.port}")
+        if self.compress:
+            logger.log("Sending in compression mode.", "INFO")
 
-            # Send the chunks
-            async with aiofiles.open(self.file_path, 'rb') as f:
-                send_bar = KamuJpModern().modernProgressBar(total=total_chunks, process_name="Sending", process_color=32)
-                send_bar.start()
-                send_bar.notbusy()
-                for chunk_number in range(total_chunks):
-                    data = await f.read(CHUNK_SIZE)
+        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=None)) as session:
+            tasks = []
+            with open(self.file_path, 'rb') as f:
+                compress_bar = tqdm(total=total_chunks, desc="Processing", unit="chunk")
+                send_bar = tqdm(total=total_chunks, desc="Sending", unit="chunk")
+                for i in range(total_chunks):
+                    chunk = f.read(CHUNK_SIZE)
                     if self.compress:
-                        # Asynchronous compression
-                        loop = asyncio.get_event_loop()
-                        data = await loop.run_in_executor(self.executor, self.compress_data, data)
-                    # Compute the chunk hash
-                    chunk_hash = hashlib.sha256(data).hexdigest()
-                    self.chunk_hashes[chunk_number] = chunk_hash
-                    # Build the message
-                    message = {
-                        'chunk_number': chunk_number,
-                        'data': data.decode('latin1'),  # Convert binary data to a string
-                        'chunk_hash': chunk_hash
-                    }
-                    await self.send_chunk(websocket, message, send_bar)
-                send_bar.finish()
-            # Send the chunk hashes
-            await websocket.send(json.dumps({'chunk_hashes': self.chunk_hashes}))
-            logger.info("File transfer completed.")
-
-    def compress_data(self, data):
-        self.adjust_executor()
-        compressor = zstd.ZstdCompressor(level=COMPRESSION_LEVEL)
-        return compressor.compress(data)
-
-    def adjust_executor(self):
-        # Get the system CPU load
-        cpu_usage = psutil.cpu_percent(interval=1)
-        cpu_count = multiprocessing.cpu_count()
-
-        # If CPU load is high, reduce the worker count
-        if cpu_usage > 80 and self.executor._max_workers > 1:
-            new_worker_count = max(1, self.executor._max_workers - 1)
-            self.executor._max_workers = new_worker_count
-            logger.info(f"High CPU usage detected ({cpu_usage}%). Reducing worker count to {new_worker_count}.")
-        # If CPU load is low, increase the worker count
-        elif cpu_usage < 50 and self.executor._max_workers < cpu_count:
-            new_worker_count = self.executor._max_workers + 1
-            self.executor._max_workers = new_worker_count
-            logger.info(f"Low CPU usage detected ({cpu_usage}%). Increasing worker count to {new_worker_count}.")
-
-    async def calculate_file_hash(self):
-        hasher = hashlib.sha256()
-        async with aiofiles.open(self.file_path, 'rb') as f:
-            while True:
-                data = await f.read(CHUNK_SIZE)
-                if not data:
-                    break
-                hasher.update(data)
-        return hasher.hexdigest()
-
-    async def send_chunk(self, websocket, message, send_bar):
-        for attempt in range(1, self.max_retries + 1):
-            try:
-                await websocket.send(json.dumps(message))
-                # Receive the ACK
-                ack = await websocket.recv()
-                ack = json.loads(ack)
-                if ack.get('status') == 'ok':
-                    send_bar.update()
-                    break
+                        compressed_chunk = self.zstd_compressor.compress(chunk)
+                        data = compressed_chunk
+                    else:
+                        data = chunk
+                    compress_bar.update(1)
+                    await self.semaphore.acquire()
+                    task = asyncio.create_task(self.send_chunk(session, data, i, total_chunks if i == 0 else None, send_bar))
+                    task.add_done_callback(lambda t: self.semaphore.release())
+                    tasks.append(task)
+                await asyncio.gather(*tasks)
+                compress_bar.close()
+                send_bar.close()
+
+    async def send_chunk(self, session, chunk, chunk_number, total_chunks=None, send_bar=None):
+        url = f'http://{self.target}:{self.port}/upload?chunk_number={chunk_number}'
+        headers = {
+            'nextdp-version': self.version
+        }
+        if chunk_number == 0:
+            headers['X-Filename'] = os.path.basename(self.file_path)
+            headers['X-Total-Chunks'] = str(total_chunks)
+        try:
+            async with session.post(url, data=chunk, headers=headers) as resp:
+                if resp.status != 200:
+                    logger.log(f"Failed to send chunk {chunk_number}: Status {resp.status}", "ERROR")
                 else:
-                    raise Exception(f"Receiver reported error: {ack.get('error')}")
-            except Exception as e:
-                logger.error(f"Error sending chunk {message['chunk_number']} (Attempt {attempt}): {e}")
-                if attempt == self.max_retries:
-                    logger.error(f"Failed to send chunk {message['chunk_number']} after {self.max_retries} attempts.")
-                    sys.exit(1)
-                await asyncio.sleep(1)  # wait before retrying
+                    if send_bar:
+                        send_bar.update(1)
+        except Exception as e:
+            logger.log(f"Exception occurred while sending chunk {chunk_number}: {e}", "ERROR")
 
 class FileReceiver:
-    def __init__(self, port, save_dir):
+    def __init__(self, port, save_dir, compress=False):
         self.port = port
         self.save_dir = save_dir
+        self.chunks = {}
+        self.compress = compress
+        self.lock = asyncio.Lock()
         self.filename = None
         self.total_chunks = None
-        self.compress = False
-        self.file_hash = None
-        self.chunk_hashes = {}
-        self.max_retries = 3  # number of retries
-        self.executor = None
-        self.initialize_executor()
-        self.temp_dir = tempfile.mkdtemp()  # Create a temporary directory
-
-    def initialize_executor(self):
-        cpu_count = multiprocessing.cpu_count()
-        self.executor = ProcessPoolExecutor(max_workers=cpu_count)
-        logger.info(f"ProcessPoolExecutor initialized with {cpu_count} workers.")
+        self.receive_bar = None
 
-    def adjust_executor(self):
-        # Get the system CPU load
-        cpu_usage = psutil.cpu_percent(interval=1)
-        cpu_count = multiprocessing.cpu_count()
+    async def handle_upload(self, request):
+        if request.path == '/upload' and request.method == 'POST':
+            try:
+                chunk_number = int(request.query.get('chunk_number', -1))
+                if chunk_number == -1:
+                    return aiohttp.web.Response(status=400, text="ERROR: Invalid chunk number")
+
+                version = request.headers.get('nextdp-version', '1.0')
 
-        # If CPU load is high, reduce the worker count
-        if cpu_usage > 80 and self.executor._max_workers > 1:
-            new_worker_count = max(1, self.executor._max_workers - 1)
-            self.executor._max_workers = new_worker_count
-            logger.info(f"High CPU usage detected ({cpu_usage}%). Reducing worker count to {new_worker_count}.")
-        # If CPU load is low, increase the worker count
-        elif cpu_usage < 50 and self.executor._max_workers < cpu_count:
-            new_worker_count = self.executor._max_workers + 1
-            self.executor._max_workers = new_worker_count
-            logger.info(f"Low CPU usage detected ({cpu_usage}%). Increasing worker count to {new_worker_count}.")
+                data = await request.read()
 
-    async def start_server(self):
-        async def handler(websocket):
-            # Receive metadata
-            metadata_str = await websocket.recv()
-            metadata = json.loads(metadata_str)
-            self.filename = metadata['filename']
-            self.total_chunks = metadata['total_chunks']
-            self.compress = metadata['compress']
-            self.file_hash = metadata['file_hash']
-            logger.info(f"Receiving file '{self.filename}' with {self.total_chunks} chunks.")
+                # Get filename and total chunks from the first chunk
+                if chunk_number == 0:
+                    self.filename = request.headers.get('X-Filename', f"received_file_{int(asyncio.get_event_loop().time())}")
+                    self.total_chunks = int(request.headers.get('X-Total-Chunks', '1'))
+                    self.receive_bar = tqdm(total=self.total_chunks, desc="Receiving", unit="chunk")
+                    logger.log(f"nextdp-version: {version}", "INFO")
 
-            receive_bar = KamuJpModern().modernProgressBar(total=self.total_chunks, process_name="Receiving", process_color=32)
-            receive_bar.start()
-            receive_bar.notbusy()
-            # Receive the chunks
-            for _ in range(self.total_chunks):
-                for attempt in range(1, self.max_retries + 1):
-                    try:
-                        message_str = await websocket.recv()
-                        message = json.loads(message_str)
-                        chunk_number = message['chunk_number']
-                        data = message['data'].encode('latin1')  # Convert the string back to binary data
-                        chunk_hash = message['chunk_hash']
-                        # Verify the chunk hash
-                        if hashlib.sha256(data).hexdigest() != chunk_hash:
-                            raise Exception("Chunk hash mismatch.")
-                        self.chunk_hashes[chunk_number] = chunk_hash
-                        # Save the chunk to a temporary file
-                        await self.save_chunk_to_tempfile(chunk_number, data)
-                        # Send the ACK
-                        await websocket.send(json.dumps({'status': 'ok'}))
-                        receive_bar.update()
-                        break
-                    except Exception as e:
-                        logger.error(f"Error receiving chunk {chunk_number} (Attempt {attempt}): {e}")
-                        if attempt == self.max_retries:
-                            logger.error(f"Failed to receive chunk {chunk_number} after {self.max_retries} attempts.")
-                            await websocket.send(json.dumps({'status': 'error', 'error': str(e)}))
-                            sys.exit(1)
-                        await asyncio.sleep(1)  # wait before retrying
-            receive_bar.finish()
+                async with self.lock:
+                    self.chunks[chunk_number] = data
 
-            # Receive the chunk hashes
-            chunk_hashes_str = await websocket.recv()
-            received_chunk_hashes = json.loads(chunk_hashes_str).get('chunk_hashes', {})
-            # Verify the chunk hashes
-            if self.chunk_hashes != received_chunk_hashes:
-                logger.error("Chunk hashes do not match.")
-                sys.exit(1)
+                if self.receive_bar:
+                    self.receive_bar.update(1)
 
-            # Save the file
-            await self.save_file()
-            logger.info("File received and saved successfully.")
+                # Save file if all chunks are received
+                if self.total_chunks is not None and len(self.chunks) == self.total_chunks:
+                    self.receive_bar.close()
+                    asyncio.create_task(self.save_file(version))
 
-        server = await websockets.serve(handler, '0.0.0.0', self.port, max_size=None)
-        logger.info(f"Receiver is listening on port {self.port}")
-        await server.wait_closed()
+                return aiohttp.web.Response(status=200, text="Chunk received successfully")
+            except Exception as e:
+                logger.log(f"Server error: {e}", "ERROR")
+                return aiohttp.web.Response(status=500, text=f"Server error: {e}")
+        logger.log(f"Unknown request path or method: {request.method} {request.path}", "WARNING")
+        return aiohttp.web.Response(status=404, text="Not Found")
 
-    async def save_chunk_to_tempfile(self, chunk_number, data):
-        temp_file_path = os.path.join(self.temp_dir, f"chunk_{chunk_number}")
-        # Asynchronous file write
-        async with aiofiles.open(temp_file_path, 'wb') as temp_file:
-            await temp_file.write(data)
-        # Delete the data from memory
-        del data
+    async def start_server(self):
+        app = aiohttp.web.Application(client_max_size=1024 * 1024 * 1024 * 20)  # Set maximum to 20GB
+        app.router.add_post('/upload', self.handle_upload)
+        runner = aiohttp.web.AppRunner(app)
+        await runner.setup()
+        site = aiohttp.web.TCPSite(runner, '0.0.0.0', self.port)
+        await site.start()
+        logger.log(f"Receiver is listening on port {self.port}", "INFO")
+        while True:
+            await asyncio.sleep(3600)
+
+    async def save_file(self, version):
+        if not self.chunks:
+            logger.log("No chunks received. Aborting save.", "WARNING")
+            return
+
+        sorted_chunks = sorted(self.chunks.items())
+        file_data = b''.join([chunk for _, chunk in sorted_chunks])
+
+        if self.compress:
+            logger.log("Decompressing data.", "INFO")
+            try:
+                if version == "2.0":
+                    dctx = zstd.ZstdDecompressor()
+                    file_data = dctx.decompress(file_data)
+                else:
+                    # Use gzip for version 1.0
+                    import gzip
+                    file_data = gzip.decompress(file_data)
+            except Exception as e:
+                logger.log("Failed to decompress. Please check the sender's compression settings.", "ERROR")
+                return
 
-    async def save_file(self):
         save_path = os.path.join(self.save_dir, self.filename)
-        # Write the file
-        async with aiofiles.open(save_path, 'wb') as final_file:
-            for chunk_number in range(self.total_chunks):
-                temp_file_path = os.path.join(self.temp_dir, f"chunk_{chunk_number}")
-                async with aiofiles.open(temp_file_path, 'rb') as temp_file:
-                    data = await temp_file.read()
-                    if self.compress:
-                        # Asynchronous decompression
-                        loop = asyncio.get_event_loop()
-                        data = await loop.run_in_executor(self.executor, self.decompress_data, data)
-                    await final_file.write(data)
-                # Delete the temp file to free memory
-                os.remove(temp_file_path)
-        # Delete the temporary directory
-        os.rmdir(self.temp_dir)
-        # Verify the file hash
-        if await self.calculate_file_hash(save_path) != self.file_hash:
-            logger.error("File hash does not match.")
-            sys.exit(1)
-
-    def decompress_data(self, data):
-        self.adjust_executor()
-        decompressor = zstd.ZstdDecompressor()
-        return decompressor.decompress(data)
+        try:
+            with open(save_path, 'wb') as f:
+                f.write(file_data)
+            logger.log(f"File '{self.filename}' saved to '{self.save_dir}'.", "INFO")
+
+            self.chunks = {}
+            self.filename = None
+            self.total_chunks = None
+            self.receive_bar = None
 
-    async def calculate_file_hash(self, file_path):
-        hasher = hashlib.sha256()
-        async with aiofiles.open(file_path, 'rb') as f:
-            while True:
-                data = await f.read(CHUNK_SIZE)
-                if not data:
-                    break
-                hasher.update(data)
-        return hasher.hexdigest()
+        except Exception as e:
+            logger.log(f"Error while saving file: {e}", "ERROR")
 
 def get_local_ip():
     hostname = socket.gethostname()
@@ -289,31 +185,35 @@ async def main():
 
     # Send mode arguments
     send_parser = subparsers.add_parser('send', help='Send a file')
-    send_parser.add_argument('target', type=str, help='Target IP address or local address', default='127.0.0.1')
+    send_parser.add_argument('target', type=str, help='Target IP address or local address (ending with .local)', default='127.0.0.1')
     send_parser.add_argument('--port', type=int, help='Target port number', default=4321)
     send_parser.add_argument('file_path', type=str, help='Path of the file to send')
+    send_parser.add_argument('--threads', type=int, default=1, help='Number of threads to use (default: 1)')
     send_parser.add_argument('--compress', action='store_true', help='Compress the file before sending')
-    send_parser.add_argument('--version', type=str, default="2.0", help='Version of the nextdp protocol')
+    send_parser.add_argument('--version', type=str, default="2.0", help='Version of the nextdp protocol (default: 2.0)')
 
     # Receive mode arguments
     receive_parser = subparsers.add_parser('receive', help='Receive a file')
    receive_parser.add_argument('--port', type=int, help='Port number to receive on', default=4321)
     receive_parser.add_argument('save_dir', type=str, help='Directory to save the received file')
+    receive_parser.add_argument('--compress', action='store_true', help='Decompress the received file')
 
     args = parser.parse_args()
 
     if args.mode == 'send':
         if not os.path.isfile(args.file_path):
-            logger.error(f"Error: File '{args.file_path}' does not exist.")
+            logger.log(f"Error: File '{args.file_path}' does not exist.", "ERROR")
             sys.exit(1)
-        sender = FileSender(args.target, args.port, args.file_path, args.compress, args.version)
+        sender = FileSender(args.target, args.port, args.file_path, args.threads, args.compress, args.version)
         await sender.send_file()
+        logger.log("File transfer completed.", "INFO")
     elif args.mode == 'receive':
         if not os.path.isdir(args.save_dir):
-            logger.error(f"Error: Directory '{args.save_dir}' does not exist.")
+            logger.log(f"Error: Directory '{args.save_dir}' does not exist.", "ERROR")
            sys.exit(1)
-        receiver = FileReceiver(args.port, args.save_dir)
-        await receiver.start_server()
+        receiver = FileReceiver(args.port, args.save_dir, args.compress)
+        receiver_task = asyncio.create_task(receiver.start_server())
+        await receiver_task
     else:
        parser.print_help()
         sys.exit(1)
@@ -322,4 +222,4 @@
     try:
         asyncio.run(main())
     except KeyboardInterrupt:
-        logger.info("Keyboard interrupt detected. Exiting...")
+        logger.log("Keyboard interrupt detected. Exiting...", "INFO")
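
Reviewer note (not part of the patch): a minimal end-to-end smoke test of the new HTTP path, sketched under assumptions. It presumes the module is importable as `main`, that `/tmp` exists (Unix), and that `example.bin` is a hypothetical small test file of at most one chunk, so the receiver's single-shot zstd decompress sees exactly one frame.

    import asyncio
    from main import FileSender, FileReceiver

    async def demo():
        receiver = FileReceiver(4321, '/tmp', compress=True)
        server = asyncio.create_task(receiver.start_server())  # serves until cancelled
        await asyncio.sleep(0.5)   # give the TCPSite a moment to bind
        sender = FileSender('127.0.0.1', 4321, 'example.bin', compress=True)
        await sender.send_file()
        await asyncio.sleep(1.0)   # let the receiver's background save_file task finish
        server.cancel()

    asyncio.run(demo())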
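Also worth noting for review: the sender's concurrency cap now comes from the semaphore (DEFAULT_MAX_CONCURRENT_TASKS = 100), while --threads is stored as num_threads but never consulted. The backpressure pattern used in send_file is shown below in isolation; post_chunk and run_bounded are illustrative names, not from the patch.

    import asyncio

    async def post_chunk(i):
        await asyncio.sleep(0.01)  # stand-in for session.post(...)

    async def run_bounded(n_chunks, limit=100):
        sem = asyncio.Semaphore(limit)
        tasks = []
        for i in range(n_chunks):
            await sem.acquire()                            # blocks once `limit` posts are in flight
            t = asyncio.create_task(post_chunk(i))
            t.add_done_callback(lambda _t: sem.release())  # free the slot when the post finishes
            tasks.append(t)
        await asyncio.gather(*tasks)                       # propagate results and exceptions

    asyncio.run(run_bounded(1000))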