Upload files for hns rename folder test in gcs bucket (#2216)
* Adding function to check the consistency of config file

* error handling when config file dir not in bucket

* reduced code repetition

* Adding function to check the consistency of config file

* added check directory functions and its unit tests

* changed logging level to error

* added helper function to create files in the gcs bucket

* elaborate success case unit test

* failure case unit tests

* comments

* Nits
anushka567 authored Aug 1, 2024
1 parent 1e56675 commit 21ecb42
Showing 2 changed files with 192 additions and 20 deletions.
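For orientation: the script consumes a JSON config that names the target bucket and describes the folders and files to create. The sketch below shows a minimal passing config expressed as a Python dict, inferred only from the keys the new consistency check actually reads; the bucket name and the contents of each `folder_structure` entry are hypothetical.

```python
# Minimal config sketch for _check_for_config_file_inconsistency, inferred
# from the checks in the diff below. The bucket name and the contents of each
# folder_structure entry are hypothetical; the check only verifies that
# "name" exists and that num_folders matches len(folder_structure).
import generate_folders_and_files  # assumes the script is importable

config = {
    "name": "fake-bucket",  # required
    "folders": {
        "num_folders": 2,   # must equal len(folder_structure)
        "folder_structure": [{"name": "folder1"}, {"name": "folder2"}],
    },
}

assert generate_folders_and_files._check_for_config_file_inconsistency(config) == 0
```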
generate_folders_and_files.py:
@@ -13,30 +13,37 @@

# limitations under the License.
# To run the script, run in terminal :
-# python3 generate_folder_and_files.py <config-file.json>
+# python3 generate_folders_and_files.py <config-file.json>

import argparse
import json
from datetime import datetime as dt
import logging
import os
import sys
import subprocess
-from subprocess import Popen

OUTPUT_FILE = str(dt.now().isoformat()) + '.out'
TEMPORARY_DIRECTORY = './tmp/data_gen'
BATCH_SIZE = 100
LOG_ERROR = "error"
LOG_INFO = "info"

logging.basicConfig(
-    level=logging.ERROR,
+    level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger()


-def _logmessage(message) -> None:
+def _logmessage(message,type) -> None:
with open(OUTPUT_FILE, 'a') as out:
out.write(message)
-  logger.error(message)
+  if type == LOG_ERROR:
+    logger.error(message)
+  else:
+    logger.info(message)
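A side note on the new signature: `type` shadows a Python built-in, and the string tags duplicate what the stdlib already encodes. A minimal alternative sketch, not part of this commit, that routes severity through a `logging` level instead:

```python
# Hypothetical variant (not part of this commit): pass a logging level rather
# than a string tag. logger.log() dispatches on the level directly, so the
# LOG_ERROR/LOG_INFO constants and the if/else become unnecessary.
# Reuses the module's OUTPUT_FILE and logger.
import logging

def _logmessage(message, level=logging.INFO) -> None:
  with open(OUTPUT_FILE, 'a') as out:
    out.write(message)
  logger.log(level, message)

# Usage: _logmessage("Bucket name not specified", logging.ERROR)
```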


def _check_for_config_file_inconsistency(config) -> (int):
@@ -50,30 +57,30 @@ def _check_for_config_file_inconsistency(config) -> (int):
0 if no inconsistencies are found, 1 otherwise.
"""
if "name" not in config:
_logmessage("Bucket name not specified")
_logmessage("Bucket name not specified",LOG_ERROR)
return 1

if "folders" in config:
if not ("num_folders" in config["folders"] or "folder_structure" in config[
"folders"]):
_logmessage("Key missing for folder")
_logmessage("Key missing for nested folder",LOG_ERROR)
return 1

if config["folders"]["num_folders"] != len(
config["folders"]["folder_structure"]):
_logmessage("Inconsistency in the folder structure")
_logmessage("Inconsistency in the folder structure",LOG_ERROR)
return 1

if "nested_folders" in config:
if not ("folder_name" in config["nested_folders"] or
"num_folders" in config["nested_folders"] or
"folder_structure" in config["nested_folders"]):
_logmessage("Key missing for nested folder")
_logmessage("Key missing for nested folder",LOG_ERROR)
return 1

if config["nested_folders"]["num_folders"] != len(
config["nested_folders"]["folder_structure"]):
_logmessage("Inconsistency in the nested folder")
_logmessage("Inconsistency in the nested folder",LOG_ERROR)
return 1

return 0
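One caveat worth flagging in the presence checks above: both use `or`, so a config that supplies `num_folders` but omits `folder_structure` (or vice versa) passes the check and then raises `KeyError` on the count comparison that follows. If both keys are meant to be required together, a stricter sketch would be:

```python
# Hypothetical stricter check (not part of this commit): require both keys,
# since the count comparison below reads both and would otherwise KeyError.
if "folders" in config:
  if not ("num_folders" in config["folders"] and
          "folder_structure" in config["folders"]):
    _logmessage("Key missing for folder", LOG_ERROR)
    return 1
```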
@@ -94,7 +101,7 @@ def _list_directory(path) -> list:
contents_url = contents.decode('utf-8').split('\n')[:-1]
return contents_url
except subprocess.CalledProcessError as e:
-    _logmessage(e.output.decode('utf-8'))
+    _logmessage(e.output.decode('utf-8'),LOG_ERROR)


def _compare_folder_structure(folder, folder_url) -> bool:
@@ -238,16 +245,63 @@ def _delete_existing_data_in_gcs_bucket(gcs_bucket)->(int):
'gcloud alpha storage rm -r gs://{}/*'.format(gcs_bucket), shell=True)
return 0
except subprocess.CalledProcessError as e:
-    _logmessage(e.output.decode('utf-8'))
+    _logmessage(e.output.decode('utf-8'),LOG_ERROR)
return 1


def _generate_files_and_upload_to_gcs_bucket(destination_blob_name, num_of_files,
file_size_unit, file_size,
filename_prefix) -> int:
for batch_start in range(1, num_of_files + 1, BATCH_SIZE):
for file_num in range(batch_start, batch_start + BATCH_SIZE):
if file_num > num_of_files:
break

file_name = '{}_{}'.format(filename_prefix, file_num)
temp_file = '{}/{}.txt'.format(TEMPORARY_DIRECTORY, file_name)

# Creating files in temporary folder:
with open(temp_file, 'wb') as out:
if (file_size_unit.lower() == 'gb'):
out.truncate(1024 * 1024 * 1024 * int(file_size))
if (file_size_unit.lower() == 'mb'):
out.truncate(1024 * 1024 * int(file_size))
if (file_size_unit.lower() == 'kb'):
out.truncate(1024 * int(file_size))
if (file_size_unit.lower() == 'b'):
out.truncate(int(file_size))

num_files = os.listdir(TEMPORARY_DIRECTORY)

if not num_files:
_logmessage("Files were not created locally",LOG_ERROR)
return 1

# Starting upload to the gcs bucket.
try:
subprocess.Popen(
'gcloud storage cp --recursive {}/* {}'.format(TEMPORARY_DIRECTORY,
destination_blob_name),
shell=True).communicate()
except subprocess.CalledProcessError as e:
_logmessage("Issue while uploading files to GCS bucket.Aborting...",LOG_ERROR)
return 1

# Delete local files from temporary directory.
subprocess.call('rm -rf {}/*'.format(TEMPORARY_DIRECTORY), shell=True)

    # Write the number of files uploaded to the output file after each batch
    # upload.
_logmessage('{}/{} files uploaded to {}\n'.format(len(num_files), num_of_files,
destination_blob_name),LOG_INFO)
return 0
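A note on the upload error path above: `subprocess.Popen(...).communicate()` does not raise `CalledProcessError` on a non-zero exit code, so the `except` branch is effectively unreachable in production; the failure test below only reaches it by injecting the exception as a mock side effect. A sketch of a variant that surfaces real upload failures, assuming `subprocess.run` with `check=True` is acceptable here:

```python
# Hypothetical variant (not part of this commit): subprocess.run(check=True)
# raises CalledProcessError when the gcloud copy exits non-zero, so this
# except branch actually fires on a failed upload.
try:
  subprocess.run(
      'gcloud storage cp --recursive {}/* {}'.format(TEMPORARY_DIRECTORY,
                                                     destination_blob_name),
      shell=True, check=True, capture_output=True)
except subprocess.CalledProcessError as e:
  _logmessage(e.stderr.decode('utf-8'), LOG_ERROR)
  return 1
```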


if __name__ == '__main__':
argv = sys.argv
if len(argv) < 2:
raise TypeError('Incorrect number of arguments.\n'
'Usage: '
-                    'python3 generate_files.py <config_file> [--keep_files]')
+                    'python3 generate_folders_and_files.py <config_file> [--keep_files]')

parser = argparse.ArgumentParser()
parser.add_argument(
@@ -263,8 +317,8 @@ def _delete_existing_data_in_gcs_bucket(gcs_bucket)->(int):
args = parser.parse_args(argv[1:])

# Checking that gcloud is installed:
-  _logmessage('Checking whether gcloud is installed.\n')
-  process = Popen('gcloud version', shell=True)
+  _logmessage('Checking whether gcloud is installed.\n',LOG_INFO)
+  process = subprocess.Popen('gcloud version', shell=True)
process.communicate()
exit_code = process.wait()
if exit_code != 0:
@@ -287,5 +341,5 @@ def _delete_existing_data_in_gcs_bucket(gcs_bucket)->(int):
if not dir_structure_present:
exit_code = _delete_existing_data_in_gcs_bucket(directory_structure["name"])
if exit_code != 0:
-      print('Error while deleting bucket.Exiting...!')
+      print('Error while deleting content in bucket.Exiting...!')
subprocess.call('bash', shell=True)
Unit tests for generate_folders_and_files:
@@ -14,7 +14,8 @@
import subprocess
import unittest
import generate_folders_and_files
-from mock import patch, call
+import mock
+from mock import patch, call ,mock_open


class TestCheckForConfigFileInconsistency(unittest.TestCase):
@@ -126,7 +127,7 @@ def test_listing_at_non_existent_path(self, mock_logmessage,mock_check_output):
dir_list = generate_folders_and_files._list_directory("gs://fake_bkt")

self.assertEqual(dir_list, None)
-    mock_logmessage.assert_called_once_with('Error while listing')
+    mock_logmessage.assert_called_once_with('Error while listing','error')

@patch('subprocess.check_output')
def test_listing_directory(self, mock_check_output):
@@ -293,10 +294,10 @@ def test_deleting_failure(self, mock_logmessage,
._delete_existing_data_in_gcs_bucket("fake_bkt")

self.assertEqual(exit_code, 1)
-    mock_logmessage.assert_called_once_with('Error while deleting')
+    mock_logmessage.assert_called_once_with('Error while deleting','error')

@patch('subprocess.check_output')
-  def test_deleting_success(self,mock_check_output):
+  def test_deleting_success(self, mock_check_output):
mock_check_output.return_value = 0

exit_code = generate_folders_and_files \
@@ -305,5 +306,122 @@ def test_deleting_success(self, mock_check_output):
self.assertEqual(exit_code, 0)


class TestGenerateFilesAndUploadToGcsBucket(unittest.TestCase):

@patch('generate_folders_and_files.TEMPORARY_DIRECTORY', './tmp/data_gen')
@patch('generate_folders_and_files.BATCH_SIZE', 10)
@patch('generate_folders_and_files.LOG_INFO','info')
@patch('builtins.open', new_callable=mock_open)
@patch('os.listdir')
@patch('subprocess.Popen')
@patch('subprocess.call')
@patch('generate_folders_and_files._logmessage')
def test_files_generation_and_upload(self, mock_logmessage, mock_call,
mock_popen, mock_listdir, mock_open
):
"""
Tests that files are created,copied to destination bucket,deleted from the
temporary directory and the log message is written correctly.
"""
mock_listdir.return_value = ['file1.txt']
mock_popen.return_value.communicate.return_value = 0
destination_blob_name = 'gs://fake-bucket'
num_of_files = 1
file_size_unit = 'MB'
file_size = 1
filename_prefix = 'file'
temp_file='./tmp/data_gen/file_1.txt'
expected_size = 1024 * 1024 * int(file_size)

exit_code = generate_folders_and_files._generate_files_and_upload_to_gcs_bucket(
destination_blob_name, num_of_files, file_size_unit, file_size,
filename_prefix)

# Assert that temp_file is opened.
mock_open.assert_called_once_with(temp_file, 'wb')
# Assert that 'truncate' was called with the expected size.
mock_open.return_value.truncate.assert_called_once_with(expected_size)
# Assert that upload started to GCS bucket and exit code is 0 indicating
# successful upload.
mock_popen.assert_called_once_with(
f'gcloud storage cp --recursive {generate_folders_and_files.TEMPORARY_DIRECTORY}/* {destination_blob_name}',
shell=True)
self.assertEqual(exit_code, 0)
# Assert that files are deleted and correct logmessage is written.
mock_call.assert_called_once_with(
f'rm -rf {generate_folders_and_files.TEMPORARY_DIRECTORY}/*',
shell=True)
expected_log_message = f'{num_of_files}/{num_of_files} files uploaded to {destination_blob_name}\n'
mock_logmessage.assert_has_calls([call(expected_log_message,generate_folders_and_files.LOG_INFO)])

@patch('generate_folders_and_files.TEMPORARY_DIRECTORY', './tmp/data_gen')
@patch('generate_folders_and_files.BATCH_SIZE', 10)
@patch('builtins.open', new_callable=mock_open)
@patch('os.listdir')
def test_files_not_created_locally(self, mock_listdir, mock_open):
"""
Tests that files are created,copied to destination bucket,deleted from the
temporary directory and the log message is written correctly.
"""
mock_listdir.return_value = []
destination_blob_name = 'gs://fake-bucket'
num_of_files = 1
file_size_unit = 'MB'
file_size = 1
filename_prefix = 'file'
temp_file='./tmp/data_gen/file_1.txt'
expected_size = 1024 * 1024 * int(file_size)

exit_code = generate_folders_and_files._generate_files_and_upload_to_gcs_bucket(
destination_blob_name, num_of_files, file_size_unit, file_size,
filename_prefix)

# Assert that temp_file is opened.
mock_open.assert_has_calls([call(temp_file, 'wb')])
# Assert that 'truncate' was called with the expected size and file is
# created.
mock_open.return_value.truncate.assert_called_once_with(expected_size)
# Assert that error log message is written to logfile.
mock_open.assert_has_calls([call().write("Files were not created locally")])
self.assertEqual(exit_code, 1)

@patch('generate_folders_and_files.TEMPORARY_DIRECTORY', './tmp/data_gen')
@patch('generate_folders_and_files.BATCH_SIZE', 10)
@patch('builtins.open', new_callable=mock_open)
@patch('os.listdir')
@patch('subprocess.Popen')
def test_files_upload_failure(self, mock_popen, mock_listdir, mock_open):
"""
Tests that files are created,copied to destination bucket,deleted from the
temporary directory and the log message is written correctly.
"""
mock_listdir.return_value = ['file1.txt']
upload_cmd="gcloud storage cp --recursive ./tmp/data_gen/ gs://fake-bucket"
mock_popen.side_effect=subprocess.CalledProcessError(returncode=1,cmd=upload_cmd)
destination_blob_name = 'gs://fake-bucket'
num_of_files = 1
file_size_unit = 'MB'
file_size = 1
filename_prefix = 'file'
temp_file='./tmp/data_gen/file_1.txt'
expected_size = 1024 * 1024 * int(file_size)

exit_code = generate_folders_and_files._generate_files_and_upload_to_gcs_bucket(
destination_blob_name, num_of_files, file_size_unit, file_size,
filename_prefix)

# Assert that temp_file is opened.
mock_open.assert_has_calls([call(temp_file, 'wb')])
# Assert that 'truncate' was called with the expected size.
mock_open.return_value.truncate.assert_called_once_with(expected_size)
# Assert that upload to GCS bucket was attempted.
mock_popen.assert_called_once_with(
f'gcloud storage cp --recursive {generate_folders_and_files.TEMPORARY_DIRECTORY}/* {destination_blob_name}',
shell=True)
# Assert that except block is executed due to the upload failure.
mock_open.assert_has_calls([call().write('Issue while uploading files to GCS bucket.Aborting...')])
self.assertEqual(exit_code, 1)


if __name__ == '__main__':
unittest.main()
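For readers new to the mock stacking in these tests: `@patch` decorators apply bottom-up, so the mock arguments arrive in bottom-to-top order, and patches that replace a plain value (such as `TEMPORARY_DIRECTORY` or `BATCH_SIZE`) inject no argument at all. A minimal, self-contained illustration:

```python
# Minimal illustration of @patch ordering, independent of the tests above:
# the decorator nearest the function supplies the first mock argument.
from unittest import mock

@mock.patch('subprocess.call')          # injected second
@mock.patch('subprocess.check_output')  # injected first (nearest the def)
def demo(mock_check_output, mock_call):
  print(mock_check_output is mock_call)  # False: two distinct mocks

demo()
```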
