From ee3cb2e354e29adb4a0380eec8a8329f6ab43a4a Mon Sep 17 00:00:00 2001 From: Felipe Alves Date: Fri, 10 Mar 2023 06:05:58 +0000 Subject: [PATCH] Implements file download and conversion in parallel using context manager --- maestro_worker_python/convert_files.py | 26 +++++++++++++++++-- maestro_worker_python/download_file.py | 36 +++++++++++++++++++++----- tests/test_convert_files.py | 20 +++++++++++++- tests/test_downlaod_file.py | 25 +++++++++++++++++- 4 files changed, 97 insertions(+), 10 deletions(-) diff --git a/maestro_worker_python/convert_files.py b/maestro_worker_python/convert_files.py index f315672..3176578 100644 --- a/maestro_worker_python/convert_files.py +++ b/maestro_worker_python/convert_files.py @@ -1,9 +1,12 @@ +import tempfile import logging import subprocess import concurrent.futures + from dataclasses import dataclass from typing import List from .response import ValidationError +from contextlib import contextmanager logger = logging.getLogger(__name__) @@ -16,11 +19,10 @@ def __init__(self, message): @dataclass class FileToConvert: input_file_path: str - output_file_path: str file_format: str + output_file_path: str = None max_duration: int = 1200 - def convert_files(convert_files: List[FileToConvert]): logger.info(f"Converting {len(convert_files)} files") @@ -37,6 +39,26 @@ def convert_files(convert_files: List[FileToConvert]): logger.info(f"Finished converting {len(convert_files)} files") +@contextmanager +def convert_files_manager(convert_files: List[FileToConvert]): + try: + thread_list = [] + list_objects = [] + with concurrent.futures.ThreadPoolExecutor() as executor: + for convert_file in convert_files: + file_format = '.m4a' if convert_file.file_format == "m4a" else '.wav' + filename = tempfile.NamedTemporaryFile(suffix=file_format) + target_function = _convert_to_m4a if convert_file.file_format == "m4a" else _convert_to_wav + thread_list.append( + executor.submit(target_function, convert_file.input_file_path, filename.name, convert_file.max_duration) + ) + list_objects.append(filename) + for thread in thread_list: + thread.result() + yield [obj.name for obj in list_objects] + finally: + for obj in list_objects: + obj.close() def _convert_to_wav(input_file_path, output_file_path, max_duration): _run_subprocess(f"ffmpeg -y -hide_banner -loglevel error -t {max_duration} -i {input_file_path} -ar 44100 {output_file_path}") diff --git a/maestro_worker_python/download_file.py b/maestro_worker_python/download_file.py index b9cbe95..f278c2c 100644 --- a/maestro_worker_python/download_file.py +++ b/maestro_worker_python/download_file.py @@ -1,11 +1,35 @@ - import logging -import urllib.request -from urllib.parse import urlparse +import tempfile +from urllib.parse import urlparse +from urllib.request import urlretrieve +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed +from contextlib import contextmanager +from typing import List +from dataclasses import dataclass -def download_file(signed_url: str): - logging.info(f"Downloading input: {urlparse(signed_url).path}") - file_name, headers = urllib.request.urlretrieve(signed_url) +def download_file(url: str): + logging.info(f"Downloading input: {urlparse(url).path}") + file_name, _ = urlretrieve(url) logging.info(f"Downloaded input") return file_name + +@contextmanager +def download_files_manager(url_list: List[str]): + try: + thread_list = [] + list_objects = [] + for url in url_list: + filename = tempfile.NamedTemporaryFile() + logging.info("Downloading file from url -> %s, filename -> %s", url, filename.name) + with ThreadPoolExecutor(max_workers=20) as exe: + thread_list.append(exe.submit(urlretrieve, url, filename.name)) + list_objects.append(filename) + for thread in as_completed(thread_list): + path, _ = thread.result() + logging.info("File downloaded to: %s", path) + yield [obj.name for obj in list_objects] + finally: + for obj in list_objects: + obj.close() \ No newline at end of file diff --git a/tests/test_convert_files.py b/tests/test_convert_files.py index 80dc5c2..e55d41e 100644 --- a/tests/test_convert_files.py +++ b/tests/test_convert_files.py @@ -1,8 +1,9 @@ +import os import pytest import hashlib import subprocess from pathlib import Path -from maestro_worker_python.convert_files import convert_files, FileToConvert, FileConversionError +from maestro_worker_python.convert_files import convert_files_manager, convert_files, FileToConvert, FileConversionError from maestro_worker_python.response import ValidationError @@ -90,6 +91,23 @@ def test_should_convert_valid_audio_file(): assert _get_hash(input_file_path) == _get_hash(output_file_path) Path(output_file_path).unlink(missing_ok=True) +def test_should_convert_multiple_valid_audio_files_and_delete_after_context(): + input_file_path, output_file_path = TEST_PATH / "silent.ogg", TEST_PATH / "silent.wav" + converted_files_list = [] + with convert_files_manager( + [FileToConvert( + input_file_path=input_file_path, + output_file_path=output_file_path, + file_format="wav", + ), FileToConvert( + input_file_path=input_file_path, + output_file_path=output_file_path, + file_format="wav", + )] + ) as converted_files: + converted_files_list = converted_files + result = [os.path.exists(path) for path in converted_files_list] + assert all(result) == False def _get_hash(file_name): process = subprocess.run( diff --git a/tests/test_downlaod_file.py b/tests/test_downlaod_file.py index 8f5f3f2..4a04a1e 100644 --- a/tests/test_downlaod_file.py +++ b/tests/test_downlaod_file.py @@ -1,4 +1,5 @@ -from maestro_worker_python.download_file import download_file +import os +from maestro_worker_python.download_file import download_file, download_files_manager def test_download_file(httpserver): @@ -8,3 +9,25 @@ def test_download_file(httpserver): file_name = download_file(url) with open(file_name) as f: assert f.read() == "hello" + +def test_download_files_manager(httpserver): + httpserver.expect_request("/test").respond_with_data("hello") + url = httpserver.url_for("/test?foo=bar") + + files_content = [] + with download_files_manager([url, url]) as downloaded_files: + for file in downloaded_files: + with open(file) as f: + files_content.append(f.read() == "hello") + assert all(files_content) == True + +def test_download_files_manager_delete(httpserver): + httpserver.expect_request("/test").respond_with_data("hello") + url = httpserver.url_for("/test?foo=bar") + + files_path_exists = [] + with download_files_manager([url, url]) as downloaded_files: + files_path = downloaded_files + for path in files_path: + files_path_exists.append(os.path.exists(path)) + assert all(files_path_exists) == False \ No newline at end of file