Skip to content

Commit

Permalink
fix http Accept-Encoding without content_length and update ftp downlo…
Browse files Browse the repository at this point in the history
…ad using ftplib to replace aioftp
  • Loading branch information
yodeng committed Sep 19, 2022
1 parent faa0205 commit a17e8db
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 45 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ hget是用于下载文件的命令行软件,支持http和ftp两种下载协议
+ Cython
+ requests
+ aiohttp
+ aioftp
+ tqdm


Expand Down
1 change: 0 additions & 1 deletion recipe/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ requirements:
- requests
- tqdm
- aiohttp
- aioftp
- boto3

about:
Expand Down
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
Cython
requests
aiohttp
aioftp
boto3
tqdm
89 changes: 48 additions & 41 deletions src/src.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,46 +118,51 @@ async def download(self):
tasks.append(task)
await asyncio.gather(*tasks)

async def download_ftp(self):
def download_ftp(self):
u = urlparse(self.url)
host, filepath = u.hostname, u.path
port = u.port or aioftp.DEFAULT_PORT
#path_io = aioftp.pathio.PathIO(timeout=None)
if host and filepath:
host, self.filepath = u.hostname, u.path
port = u.port or 21
if host and self.filepath:
size = 0
if os.path.isfile(self.outfile):
size = os.path.getsize(self.outfile)
async with aioftp.Client.context(host, port) as client:
if await client.is_file(filepath):
stat = await client.stat(filepath)
self.content_length = int(stat["size"])
_thread.start_new_thread(
self.check_offset, (self.datatimeout,))
if os.getenv("RUN_HGET_FIRST") != 'false':
self.loger.info("Logging in as anonymous success")
self.loger.info("File size: %s (%d bytes)",
human_size(self.content_length), self.content_length)
self.loger.info("Starting download %s --> %s",
self.url, self.outfile)
with tqdm(disable=self.quite, total=int(self.content_length), initial=size, unit='', ascii=True, unit_scale=True, unit_divisor=1024) as bar:
self.loger.debug(
"Start %s %s", asyncio.current_task().get_name(), 'bytes={0}-{1}'.format(size, self.content_length))
async with client.download_stream(filepath, offset=size) as stream:
# async with path_io.open(self.outfile, mode=size and "ab" or "wb") as f:
chunk_size = self.chunk_size//100
self.rate_limiter.clamped_calls = max(
1, int(float(self.max_speed)/chunk_size))
self.rate_limiter.refresh()
with open(self.outfile, mode=size and "ab" or "wb") as f:
async for chunk in stream.iter_by_block(chunk_size):
if chunk:
# await f.write(block)
self.rate_limiter.wait()
f.write(chunk)
f.flush()
bar.update(len(chunk))
self.loger.debug(
"Finished %s %s", asyncio.current_task().get_name(), 'bytes={0}-{1}'.format(size, self.content_length))
ftp = FTP()
ftp.connect(host, port=port, timeout=self.datatimeout)
ftp.login()
_thread.start_new_thread(
self.check_offset, (self.datatimeout,))
self.content_length = ftp.size(self.filepath)
if os.getenv("RUN_HGET_FIRST") != 'false':
self.loger.info("Logging in as anonymous success")
self.loger.info("File size: %s (%d bytes)",
human_size(self.content_length), self.content_length)
self.loger.info("Starting download %s --> %s",
self.url, self.outfile)
with tqdm(disable=self.quite, total=int(self.content_length), initial=size, unit='', ascii=True, unit_scale=True, unit_divisor=1024) as bar:
self.loger.debug(
"Start %s %s", currentThread().name, 'bytes={0}-{1}'.format(size, self.content_length))
ftp.voidcmd('TYPE I')
with ftp.transfercmd("RETR " + self.filepath, rest=size) as conn:
self.chunk_size = self.chunk_size//100
self.rate_limiter.clamped_calls = max(
1, int(float(self.max_speed)/self.chunk_size))
self.rate_limiter.refresh()
with open(self.outfile, mode="ab") as f:
while True:
chunk = conn.recv(self.chunk_size)
if not chunk:
break
self.rate_limiter.wait()
f.write(chunk)
f.flush()
bar.update(len(chunk))
# ftp.voidresp()
self.loger.debug(
"Finished %s %s", currentThread().name, 'bytes={0}-{1}'.format(size, self.content_length))
try:
ftp.quit()
except:
ftp.close()

async def fetch(self, session, pbar=None, headers=None):
if headers:
Expand All @@ -179,16 +184,18 @@ async def fetch(self, session, pbar=None, headers=None):
self.loger.debug(
"Finished %s %s", asyncio.current_task().get_name(), headers["Range"])
else:
if not hasattr(session, "head_object"):
async with session.get(self.url, timeout=self.datatimeout, params=self.extra) as req:
return req
else:
if hasattr(session, "head_object"):
content_length = await self.loop.run_in_executor(
None,
self.get_s3_content_length,
session
)
return content_length
elif hasattr(session, "size"):
return session.size(self.filepath)
else:
async with session.get(self.url, headers=self.headers, timeout=self.datatimeout, params=self.extra) as req:
return req

def set_sem(self, n):
self.sem = asyncio.Semaphore(n)
Expand Down Expand Up @@ -264,7 +271,7 @@ def run(self):
self.loop.run_until_complete(self.download())
elif self.url.startswith("ftp"):
self.ftp = True
asyncio.run(self.download_ftp())
self.download_ftp()
elif self.url.startswith("s3"):
self.loop = asyncio.get_event_loop()
self.loop.run_until_complete(self.download_s3())
Expand Down
24 changes: 23 additions & 1 deletion src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import socket
import struct
import asyncio
import aioftp
import argparse
import functools
import subprocess
Expand All @@ -20,6 +19,7 @@
from multiprocessing import cpu_count

from tqdm import tqdm
from ftplib import FTP
from boto3 import client
from botocore.config import Config
from botocore import UNSIGNED
Expand Down Expand Up @@ -60,6 +60,7 @@ async def send(self, conn: "Connection") -> "ClientResponse":

default_headers = {
"Connection": "close",
"Accept-Encoding": "identity",
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36',
}

Expand Down Expand Up @@ -265,3 +266,24 @@ def autoreloader(main_func, *args, **kwargs):
sys.exit(exit_code)
except KeyboardInterrupt:
pass


def download_ftp_file(host, ftpath, localpath, bar=None):
ftp = FTP()
ftp.connect(host, port=21)
ftp.login()
ftp.voidcmd('TYPE I')
length = ftp.size(ftpath)
s = 0
if os.path.isfile(localpath):
s = os.path.getsize(localpath)
if s >= length:
return
with ftp.transfercmd("RETR " + ftpath, rest=s) as conn:
with open(localpath, "ab") as fo:
while True:
data = conn.recv(1024)
if not data:
break
fo.write(data)
# ftp.voidresp()

0 comments on commit a17e8db

Please sign in to comment.