Skip to content

Commit

Permalink
fix download file methods to allow url redirect
Browse files Browse the repository at this point in the history
  • Loading branch information
alvesfelipe committed Dec 4, 2023
1 parent e7b53b2 commit 2ed05de
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 165 deletions.
71 changes: 46 additions & 25 deletions maestro_worker_python/download_file.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,64 @@
from __future__ import annotations

import os
import logging
import tempfile
import requests

from urllib.parse import urlparse
from urllib.request import urlretrieve
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from contextlib import contextmanager


def download_file(url: str):
logging.info(f"Downloading input: {urlparse(url).path}")
file_name, _ = urlretrieve(url)
logging.info(f"Downloaded input")
return file_name
response = requests.get(url, allow_redirects=True)

if response.status_code == 200:
file_name = urlparse(url).path.split('/')[-1] or 'downloaded_file'
file_name = os.path.join(tempfile.gettempdir(), file_name)
with open(file_name, 'wb') as file:
file.write(response.content)

logging.info(f"Downloaded input to {file_name}")
return file_name
else:
logging.error(f"Failed to download file: HTTP {response.status_code}")
return None



@contextmanager
def download_files_manager(*urls: str) -> None | str | list[str]:
def download_files_manager(*urls: str):
try:
thread_list = []
list_objects = []
for url in urls:
filename = tempfile.NamedTemporaryFile()
logging.info("Downloading file from url -> %s, filename -> %s", url, filename.name)
with ThreadPoolExecutor(max_workers=20) as exe:
thread_list.append(exe.submit(urlretrieve, url, filename.name))
list_objects.append(filename)
for thread in as_completed(thread_list):
path, _ = thread.result()
logging.info("File downloaded to: %s", path)
downloaded_files = [obj.name for obj in list_objects]
if len(downloaded_files) == 0:
yield None
elif len(downloaded_files) == 1:
yield downloaded_files[0]
else:
yield downloaded_files
with ThreadPoolExecutor(max_workers=20) as executor:
future_to_url = {executor.submit(download_file, url): url for url in urls}

downloaded_files = []
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
file_name = future.result()
if file_name:
downloaded_files.append(file_name)
logging.info(f"File downloaded from {url} to {file_name}")
else:
logging.error(f"Failed to download file from {url}")
except Exception as exc:
logging.error(f"Error downloading file from {url}: {exc}")

if len(downloaded_files) == 0:
yield None
elif len(downloaded_files) == 1:
yield downloaded_files[0]
else:
yield downloaded_files

finally:
for obj in list_objects:
obj.close()
for file_name in downloaded_files:
try:
os.remove(file_name)
logging.info(f"Deleted temporary file {file_name}")
except Exception as exc:
logging.warning(f"Could not delete temporary file {file_name}: {exc}")
Loading

0 comments on commit 2ed05de

Please sign in to comment.