Skip to content

Commit

Permalink
Merge pull request #4 from moises-ai/ctx-manager
Browse files Browse the repository at this point in the history
Implements file download and conversion in parallel using ctx manager
  • Loading branch information
alvesfelipe committed Mar 10, 2023
2 parents 9da715e + ee3cb2e commit e9f31fd
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 10 deletions.
26 changes: 24 additions & 2 deletions maestro_worker_python/convert_files.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import tempfile
import logging
import subprocess
import concurrent.futures

from dataclasses import dataclass
from typing import List
from .response import ValidationError
from contextlib import contextmanager

logger = logging.getLogger(__name__)

Expand All @@ -16,11 +19,10 @@ def __init__(self, message):
@dataclass
class FileToConvert:
input_file_path: str
output_file_path: str
file_format: str
output_file_path: str = None
max_duration: int = 1200


def convert_files(convert_files: List[FileToConvert]):
logger.info(f"Converting {len(convert_files)} files")

Expand All @@ -37,6 +39,26 @@ def convert_files(convert_files: List[FileToConvert]):

logger.info(f"Finished converting {len(convert_files)} files")

@contextmanager
def convert_files_manager(convert_files: List[FileToConvert]):
try:
thread_list = []
list_objects = []
with concurrent.futures.ThreadPoolExecutor() as executor:
for convert_file in convert_files:
file_format = '.m4a' if convert_file.file_format == "m4a" else '.wav'
filename = tempfile.NamedTemporaryFile(suffix=file_format)
target_function = _convert_to_m4a if convert_file.file_format == "m4a" else _convert_to_wav
thread_list.append(
executor.submit(target_function, convert_file.input_file_path, filename.name, convert_file.max_duration)
)
list_objects.append(filename)
for thread in thread_list:
thread.result()
yield [obj.name for obj in list_objects]
finally:
for obj in list_objects:
obj.close()

def _convert_to_wav(input_file_path, output_file_path, max_duration):
_run_subprocess(f"ffmpeg -y -hide_banner -loglevel error -t {max_duration} -i {input_file_path} -ar 44100 {output_file_path}")
Expand Down
36 changes: 30 additions & 6 deletions maestro_worker_python/download_file.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,35 @@

import logging
import urllib.request
from urllib.parse import urlparse
import tempfile

from urllib.parse import urlparse
from urllib.request import urlretrieve
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from contextlib import contextmanager
from typing import List
from dataclasses import dataclass

def download_file(signed_url: str):
logging.info(f"Downloading input: {urlparse(signed_url).path}")
file_name, headers = urllib.request.urlretrieve(signed_url)
def download_file(url: str):
logging.info(f"Downloading input: {urlparse(url).path}")
file_name, _ = urlretrieve(url)
logging.info(f"Downloaded input")
return file_name

@contextmanager
def download_files_manager(url_list: List[str]):
try:
thread_list = []
list_objects = []
for url in url_list:
filename = tempfile.NamedTemporaryFile()
logging.info("Downloading file from url -> %s, filename -> %s", url, filename.name)
with ThreadPoolExecutor(max_workers=20) as exe:
thread_list.append(exe.submit(urlretrieve, url, filename.name))
list_objects.append(filename)
for thread in as_completed(thread_list):
path, _ = thread.result()
logging.info("File downloaded to: %s", path)
yield [obj.name for obj in list_objects]
finally:
for obj in list_objects:
obj.close()
20 changes: 19 additions & 1 deletion tests/test_convert_files.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import os
import pytest
import hashlib
import subprocess
from pathlib import Path
from maestro_worker_python.convert_files import convert_files, FileToConvert, FileConversionError
from maestro_worker_python.convert_files import convert_files_manager, convert_files, FileToConvert, FileConversionError
from maestro_worker_python.response import ValidationError


Expand Down Expand Up @@ -90,6 +91,23 @@ def test_should_convert_valid_audio_file():
assert _get_hash(input_file_path) == _get_hash(output_file_path)
Path(output_file_path).unlink(missing_ok=True)

def test_should_convert_multiple_valid_audio_files_and_delete_after_context():
input_file_path, output_file_path = TEST_PATH / "silent.ogg", TEST_PATH / "silent.wav"
converted_files_list = []
with convert_files_manager(
[FileToConvert(
input_file_path=input_file_path,
output_file_path=output_file_path,
file_format="wav",
), FileToConvert(
input_file_path=input_file_path,
output_file_path=output_file_path,
file_format="wav",
)]
) as converted_files:
converted_files_list = converted_files
result = [os.path.exists(path) for path in converted_files_list]
assert all(result) == False

def _get_hash(file_name):
process = subprocess.run(
Expand Down
25 changes: 24 additions & 1 deletion tests/test_downlaod_file.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from maestro_worker_python.download_file import download_file
import os
from maestro_worker_python.download_file import download_file, download_files_manager


def test_download_file(httpserver):
Expand All @@ -8,3 +9,25 @@ def test_download_file(httpserver):
file_name = download_file(url)
with open(file_name) as f:
assert f.read() == "hello"

def test_download_files_manager(httpserver):
httpserver.expect_request("/test").respond_with_data("hello")
url = httpserver.url_for("/test?foo=bar")

files_content = []
with download_files_manager([url, url]) as downloaded_files:
for file in downloaded_files:
with open(file) as f:
files_content.append(f.read() == "hello")
assert all(files_content) == True

def test_download_files_manager_delete(httpserver):
httpserver.expect_request("/test").respond_with_data("hello")
url = httpserver.url_for("/test?foo=bar")

files_path_exists = []
with download_files_manager([url, url]) as downloaded_files:
files_path = downloaded_files
for path in files_path:
files_path_exists.append(os.path.exists(path))
assert all(files_path_exists) == False

0 comments on commit e9f31fd

Please sign in to comment.