remove AIRFLOW_DAGS env var dependency
- this info is now passed as an input variable when instantiating the class
lforesta committed Sep 24, 2020
1 parent eb14e38 commit 7973720
Showing 4 changed files with 40 additions and 33 deletions.
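
In short, callers now provide the DAGs folder explicitly instead of relying on the AIRFLOW_DAGS environment variable. A minimal sketch of the new calling convention, assuming placeholder values (none of the paths or inputs below are taken from this repository's tests or fixtures):

    from eodc_openeo_bindings.job_writer.dag_writer import AirflowDagWriter

    # Placeholder inputs; real values come from the backend and the job request.
    process_definitions = []                        # dict, list or str of process definitions
    input_files = ["/data/S2A_MSIL1C_example.zip"]  # assumed input file paths

    writer = AirflowDagWriter()
    writer.write_and_move_job(
        job_id="jb-12345",
        user_name="jdoe_67890",
        dags_folder="/usr/local/airflow/dags",  # previously read from os.environ['AIRFLOW_DAGS']
        process_graph_json="process_graph.json",
        job_data="./openeo_job",
        process_defs=process_definitions,
        in_filepaths=input_files,
    )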
19 changes: 11 additions & 8 deletions src/eodc_openeo_bindings/job_writer/dag_writer.py
@@ -23,6 +23,7 @@ def __init__(self, job_id_extensions: Optional[Dict[str, str]] = None):
def get_domain(self,
job_id: str,
user_name: str,
dags_folder: str,
process_graph_json: Union[str, dict],
job_data: str,
process_defs: Union[dict, list, str],
@@ -37,6 +38,7 @@ def get_domain(self,
return AirflowDagDomain(job_id=job_id,
job_id_extension=self.job_id_extensions,
user_name=user_name,
dags_folder=dags_folder,
process_graph_json=process_graph_json,
job_data=job_data,
process_defs=process_defs,
@@ -49,31 +51,32 @@ def get_domain(self,
add_parallel_sensor=add_parallel_sensor,
)

def write_job(self, job_id: str, user_name: str, process_graph_json: Union[str, dict], job_data: str,
def write_job(self, job_id: str, user_name: str, dags_folder: str, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str], in_filepaths: List[str],
user_email: str = None, job_description: str = None, parallelize_tasks: bool = False,
vrt_only: bool = False, add_delete_sensor: bool = False, add_parallel_sensor: bool = False):
return super().write_job(job_id=job_id, user_name=user_name, process_graph_json=process_graph_json,
return super().write_job(job_id=job_id, user_name=user_name, dags_folder=dags_folder, process_graph_json=process_graph_json,
job_data=job_data, process_defs=process_defs, in_filepaths=in_filepaths, user_email=user_email, job_description=job_description,
parallelize_tasks=parallelize_tasks, vrt_only=vrt_only,
add_delete_sensor=add_delete_sensor, add_parallel_sensor=add_parallel_sensor)

def move_dag(self, filepath: str):
def move_dag(self, filepath: str, dags_folder: str):
# Move file to DAGs folder (must copy/delete because of different volume mounts)
copyfile(filepath, os.environ.get('AIRFLOW_DAGS') + "/" + filepath)
#copyfile(filepath, os.environ.get('AIRFLOW_DAGS') + "/" + filepath)
copyfile(filepath, dags_folder + "/" + filepath)
os.remove(filepath)

def write_and_move_job(self, job_id: str, user_name: str, process_graph_json: Union[str, dict], job_data: str,
def write_and_move_job(self, job_id: str, user_name: str, dags_folder: str, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str], in_filepaths: List[str],
user_email: str = None, job_description: str = None, parallelize_tasks: bool = False,
vrt_only: bool = False, add_delete_sensor: bool = False, add_parallel_sensor: bool = False):
_, domain = self.write_job(job_id, user_name, process_graph_json, job_data, process_defs, in_filepaths, user_email, job_description,
_, domain = self.write_job(job_id, user_name, dags_folder, process_graph_json, job_data, process_defs, in_filepaths, user_email, job_description,
parallelize_tasks, vrt_only, add_delete_sensor, add_parallel_sensor)
self.move_dag(domain.filepath)
self.move_dag(domain.filepath, domain.dags_folder)

def rewrite_and_move_job(self, domain: AirflowDagDomain):
_, domain = self.rewrite_job(domain)
self.move_dag(domain.filepath)
self.move_dag(domain.filepath, domain.dags_folder)

def get_imports(self, domain: AirflowDagDomain) -> str:
imports = '''\
2 changes: 2 additions & 0 deletions src/eodc_openeo_bindings/job_writer/job_domain.py
@@ -33,6 +33,7 @@ class AirflowDagDomain(JobDomain):
def __init__(self,
job_id: str,
job_id_extension: JobIdExtension,
dags_folder: str,
user_name: str,
process_graph_json: Union[str, dict],
job_data: str,
@@ -68,6 +69,7 @@ def __init__(self,
self.in_filepaths = in_filepaths
self.job_id = job_id
self.job_id_extension = job_id_extension
self.dags_folder = dags_folder
self.user_name = user_name
self.user_email = user_email
self.job_description = job_description if job_description else 'No description provided'
14 changes: 8 additions & 6 deletions tests/conftest.py
@@ -83,18 +83,20 @@ def backend_processes():


@pytest.fixture()
def setup_airflow_dag_folder(request):
def airflow_dag_folder(request):
test_folder = get_test_folder()
os.environ['AIRFLOW_DAGS'] = os.path.join(test_folder, 'airflow_dag')
if os.path.isdir(os.environ['AIRFLOW_DAGS']):
shutil.rmtree(os.environ['AIRFLOW_DAGS'])
os.makedirs(os.environ['AIRFLOW_DAGS'])
dags_folder = os.path.join(test_folder, 'airflow_dag')
if os.path.isdir(dags_folder):
shutil.rmtree(dags_folder)
os.makedirs(dags_folder)

def fin():
shutil.rmtree(os.environ['AIRFLOW_DAGS'])
shutil.rmtree(dags_folder)

request.addfinalizer(fin)

return dags_folder


@pytest.fixture()
def setup_ref_job_folder():
38 changes: 19 additions & 19 deletions tests/test_write_airflow_dag.py
@@ -15,67 +15,67 @@ def get_ref_node_from_name(name, all_nodes):


def test_airflow_dag(evi_file,
setup_airflow_dag_folder, setup_ref_job_folder,
airflow_dag_folder, setup_ref_job_folder,
backend_processes, S2_filepaths):

job_id = "jb-12345"
out_filepath = os.path.join(os.environ['AIRFLOW_DAGS'], f'dag_{job_id}_prep.py')
out_filepath = os.path.join(airflow_dag_folder, f'dag_{job_id}_prep.py')
user_name = "jdoe_67890"

writer = AirflowDagWriter()
writer.write_and_move_job(job_id=job_id, user_name=user_name, process_graph_json=evi_file, job_data='./openeo_job',
writer.write_and_move_job(job_id=job_id, user_name=user_name, dags_folder=airflow_dag_folder, process_graph_json=evi_file, job_data='./openeo_job',
process_defs=backend_processes, in_filepaths=S2_filepaths)

with open(out_filepath) as outfile:
out_content = outfile.read()

ref_filepath = out_filepath.replace('.py', '_ref.py').replace(os.environ['AIRFLOW_DAGS'], os.environ['REF_JOBS'])
ref_filepath = out_filepath.replace('.py', '_ref.py').replace(airflow_dag_folder, os.environ['REF_JOBS'])
with open(ref_filepath) as ref_file:
ref_content = ref_file.read()

assert out_content == ref_content


def test_airflow_dag_vrt_only(evi_file,
setup_airflow_dag_folder, setup_ref_job_folder,
airflow_dag_folder, setup_ref_job_folder,
backend_processes, S2_filepaths):

job_id = "jb-12346"
out_filepath = os.path.join(os.environ['AIRFLOW_DAGS'], f'dag_{job_id}_prep.py')
out_filepath = os.path.join(airflow_dag_folder, f'dag_{job_id}_prep.py')
user_name = "jdoe_67890"

writer = AirflowDagWriter()
writer.write_and_move_job(job_id=job_id, user_name=user_name, process_graph_json=evi_file, job_data='./openeo_job',
writer.write_and_move_job(job_id=job_id, user_name=user_name, dags_folder=airflow_dag_folder, process_graph_json=evi_file, job_data='./openeo_job',
process_defs=backend_processes, in_filepaths=S2_filepaths, vrt_only=True)

with open(out_filepath) as outfile:
out_content = outfile.read()

ref_filepath = out_filepath.replace('.py', '_ref.py').replace(os.environ['AIRFLOW_DAGS'], os.environ['REF_JOBS'])
ref_filepath = out_filepath.replace('.py', '_ref.py').replace(airflow_dag_folder, os.environ['REF_JOBS'])
with open(ref_filepath) as outfile:
ref_content = outfile.read()

assert out_content == ref_content


def test_airflow_dag_parallel(evi_file, setup_airflow_dag_folder, airflow_job_folder,
def test_airflow_dag_parallel(evi_file, airflow_dag_folder, airflow_job_folder,
setup_ref_job_folder, backend_processes, S2_filepaths):

job_data = './openeo_job'

job_id = "jb-12347"
user_name = "jdoe_67890"
writer = AirflowDagWriter()
writer.write_and_move_job(job_id=job_id, user_name=user_name, process_graph_json=evi_file, job_data=job_data,
writer.write_and_move_job(job_id=job_id, user_name=user_name, dags_folder=airflow_dag_folder, process_graph_json=evi_file, job_data=job_data,
process_defs=backend_processes, in_filepaths=S2_filepaths,
vrt_only=True, add_delete_sensor=True, add_parallel_sensor=True)

# Check DAG before parallelisation
out_filepath = os.path.join(os.environ['AIRFLOW_DAGS'], f'dag_{job_id}_prep.py')
out_filepath = os.path.join(airflow_dag_folder, f'dag_{job_id}_prep.py')
with open(out_filepath) as outfile:
out_content = outfile.read()

ref_filepath = out_filepath.replace('.py', '_ref.py').replace(os.environ['AIRFLOW_DAGS'], os.environ['REF_JOBS'])
ref_filepath = out_filepath.replace('.py', '_ref.py').replace(airflow_dag_folder, os.environ['REF_JOBS'])
with open(ref_filepath) as outfile:
ref_content = outfile.read()
assert out_content == ref_content
@@ -87,7 +87,7 @@ def test_airflow_dag_parallel(evi_file, setup_airflow_dag_folder, airflow_job_fo

# Recreate (parallelised DAG)
writer = AirflowDagWriter()
domain = writer.get_domain(job_id, user_name, evi_file, job_data,
domain = writer.get_domain(job_id, user_name, airflow_dag_folder, evi_file, job_data,
process_defs=backend_processes, in_filepaths=S2_filepaths,
vrt_only=False, parallelize_tasks=True, add_delete_sensor=True)
# domain.job_id = domain.job_id + "_2"
@@ -97,31 +97,31 @@ def test_airflow_dag_parallel(evi_file, setup_airflow_dag_folder, airflow_job_fo
rmtree(job_data)

# Check DAG after parallelisation
out_filepath = os.path.join(os.environ['AIRFLOW_DAGS'], f'dag_{job_id}_parallel.py')
out_filepath = os.path.join(airflow_dag_folder, f'dag_{job_id}_parallel.py')
with open(out_filepath) as outfile:
out_content = outfile.read()

ref_filepath = out_filepath.replace('.py', '_ref.py').replace(os.environ['AIRFLOW_DAGS'], os.environ['REF_JOBS'])
ref_filepath = out_filepath.replace('.py', '_ref.py').replace(airflow_dag_folder, os.environ['REF_JOBS'])
with open(ref_filepath) as outfile:
ref_content = outfile.read()
assert out_content == ref_content


def test_airflow_dag_delete_sensor(evi_file, setup_airflow_dag_folder, setup_ref_job_folder,
def test_airflow_dag_delete_sensor(evi_file, airflow_dag_folder, setup_ref_job_folder,
backend_processes, S2_filepaths):

job_id = "jb-12345_delete_sensor"
out_filepath = os.path.join(os.environ['AIRFLOW_DAGS'], f'dag_{job_id}_prep.py')
out_filepath = os.path.join(airflow_dag_folder, f'dag_{job_id}_prep.py')
user_name = "jdoe_67890"

writer = AirflowDagWriter()
writer.write_and_move_job(job_id=job_id, user_name=user_name, process_graph_json=evi_file, job_data='./openeo_job',
writer.write_and_move_job(job_id=job_id, user_name=user_name, dags_folder=airflow_dag_folder, process_graph_json=evi_file, job_data='./openeo_job',
process_defs=backend_processes, in_filepaths=S2_filepaths, add_delete_sensor=True)

with open(out_filepath) as outfile:
out_content = outfile.read()

ref_filepath = out_filepath.replace('.py', '_ref.py').replace(os.environ['AIRFLOW_DAGS'], os.environ['REF_JOBS'])
ref_filepath = out_filepath.replace('.py', '_ref.py').replace(airflow_dag_folder, os.environ['REF_JOBS'])
with open(ref_filepath) as outfile:
ref_content = outfile.read()
assert out_content == ref_content
