Skip to content

Commit

Permalink
add multiprocess support for modules usage
Browse files Browse the repository at this point in the history
  • Loading branch information
yodeng committed Sep 19, 2022
1 parent a17e8db commit 484dabf
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
22 changes: 16 additions & 6 deletions src/src.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, url="", outfile="", threads=Chunk.MAX_AS, headers={}, quite=F
self.chunk_size = 100 * Chunk.OneK
self.ftp = False
self.startime = int(time.time())
self.current = current_process()
self.rate_limiter = RateLimit(
max(int(float(self.max_speed)/self.chunk_size), 1))

Expand Down Expand Up @@ -105,7 +106,9 @@ async def download(self):
self.url, self.outfile)
self.loger.debug("Ranges: %s, Sem: %s, Connections: %s, %s", self.parts,
self.threads, self.tcp_conn or 100, get_as_part(self.content_length))
with tqdm(disable=self.quite, total=int(self.content_length), initial=self.tqdm_init, unit='', ascii=True, unit_scale=True, unit_divisor=1024) as bar:
pos = len(
self.current._identity) and self.current._identity[0]-1 or None
with tqdm(position=pos, disable=self.quite, total=int(self.content_length), initial=self.tqdm_init, unit='', ascii=True, unit_scale=True, unit_divisor=1024) as bar:
tasks = []
self.rate_limiter.refresh()
for h_range in self.range_list:
Expand Down Expand Up @@ -138,7 +141,9 @@ def download_ftp(self):
human_size(self.content_length), self.content_length)
self.loger.info("Starting download %s --> %s",
self.url, self.outfile)
with tqdm(disable=self.quite, total=int(self.content_length), initial=size, unit='', ascii=True, unit_scale=True, unit_divisor=1024) as bar:
pos = len(
self.current._identity) and self.current._identity[0]-1 or None
with tqdm(position=pos, disable=self.quite, total=int(self.content_length), initial=size, unit='', ascii=True, unit_scale=True, unit_divisor=1024) as bar:
self.loger.debug(
"Start %s %s", currentThread().name, 'bytes={0}-{1}'.format(size, self.content_length))
ftp.voidcmd('TYPE I')
Expand Down Expand Up @@ -223,7 +228,9 @@ async def download_s3(self):
self.url, self.outfile)
self.loger.debug("Ranges: %s, Sem: %s, %s", self.parts,
self.threads, get_as_part(self.content_length))
with tqdm(disable=self.quite, total=int(self.content_length), initial=self.tqdm_init, unit='', ascii=True, unit_scale=True, unit_divisor=1024) as bar:
pos = len(
self.current._identity) and self.current._identity[0]-1 or None
with tqdm(position=pos, disable=self.quite, total=int(self.content_length), initial=self.tqdm_init, unit='', ascii=True, unit_scale=True, unit_divisor=1024) as bar:
self.rate_limiter.refresh()
with ThreadPoolExecutor(min(self.threads, MAX_S3_CONNECT)) as exector:
tasks = []
Expand Down Expand Up @@ -267,13 +274,13 @@ def run(self):
Done = False
try:
if self.url.startswith("http"):
self.loop = asyncio.get_event_loop()
self.loop = asyncio.new_event_loop()
self.loop.run_until_complete(self.download())
elif self.url.startswith("ftp"):
self.ftp = True
self.download_ftp()
elif self.url.startswith("s3"):
self.loop = asyncio.get_event_loop()
self.loop = asyncio.new_event_loop()
self.loop.run_until_complete(self.download_s3())
else:
self.loger.error("Only http/https or ftp urls allowed.")
Expand Down Expand Up @@ -320,7 +327,10 @@ def load_offset(self):

@property
def loger(self):
log = logging.getLogger()
if self.current.name == "MainProcess":
log = logging.getLogger()
else:
log = get_logger()
if self.quite:
log.setLevel(logging.ERROR)
return log
Expand Down
2 changes: 1 addition & 1 deletion src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from math import floor, log10
from threading import Thread, currentThread, RLock
from urllib.parse import urlparse
from multiprocessing import cpu_count
from multiprocessing import cpu_count, current_process, get_logger

from tqdm import tqdm
from ftplib import FTP
Expand Down

0 comments on commit 484dabf

Please sign in to comment.