Skip to content

Commit

Permalink
Update main.py
Browse files Browse the repository at this point in the history
  • Loading branch information
DiamondGotCat authored Nov 26, 2024
1 parent 6598e6e commit 68a795d
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import math
from KamuJpModern.ModernLogging import ModernLogging
from KamuJpModern.ModernProgressBar import ModernProgressBar
import asyncio
import aiohttp
import aiohttp.web
Expand All @@ -11,7 +12,6 @@
import zstandard as zstd # Changed from gzip to zstandard
import argparse
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from io import BytesIO

CHUNK_SIZE = 1024 * 1024 # 1MB
Expand Down Expand Up @@ -44,23 +44,28 @@ async def send_file(self):
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=None)) as session:
tasks = []
with open(self.file_path, 'rb') as f:
compress_bar = tqdm(total=total_chunks, desc="Processing", unit="chunk")
send_bar = tqdm(total=total_chunks, desc="Sending", unit="chunk")
compress_bar = ModernProgressBar(total=total_chunks, process_name="Processing", process_color=32)
send_bar = ModernProgressBar(total=total_chunks, process_name="Sending", process_color=32)
compress_bar.start()
send_bar.start()
compress_bar.notbusy()
send_bar.notbusy()
for i in range(total_chunks):
chunk = f.read(CHUNK_SIZE)
if self.compress:
compressed_chunk = self.zstd_compressor.compress(chunk)
compress_bar.update()
data = compressed_chunk
else:
data = chunk
compress_bar.update(1)
compress_bar.update()
await self.semaphore.acquire()
task = asyncio.create_task(self.send_chunk(session, data, i, total_chunks if i == 0 else None, send_bar))
task.add_done_callback(lambda t: self.semaphore.release())
tasks.append(task)
await asyncio.gather(*tasks)
compress_bar.close()
send_bar.close()
compress_bar.finish()
send_bar.finish()

async def send_chunk(self, session, chunk, chunk_number, total_chunks=None, send_bar=None):
url = f'http://{self.target}:{self.port}/upload?chunk_number={chunk_number}'
Expand Down Expand Up @@ -106,7 +111,9 @@ async def handle_upload(self, request):
if chunk_number == 0:
self.filename = request.headers.get('X-Filename', f"received_file_{int(asyncio.get_event_loop().time())}")
self.total_chunks = int(request.headers.get('X-Total-Chunks', '1'))
self.receive_bar = tqdm(total=self.total_chunks, desc="Receiving", unit="chunk")
self.receive_bar = ModernProgressBar(total=self.total_chunks, process_name="Receiving", process_color=32)
self.receive_bar.start()
self.receive_bar.notbusy()
logger.log(f"nextdp-version: {version}", "INFO")

async with self.lock:
Expand All @@ -117,7 +124,7 @@ async def handle_upload(self, request):

# Save file if all chunks are received
if self.total_chunks is not None and len(self.chunks) == self.total_chunks:
self.receive_bar.close()
self.receive_bar.finish()
asyncio.create_task(self.save_file(version))

return aiohttp.web.Response(status=200, text="Chunk received successfully")
Expand Down

0 comments on commit 68a795d

Please sign in to comment.