Skip to content

Commit

Permalink
Merge pull request #3201 from bcgov/feat/3145-2
Browse files Browse the repository at this point in the history
feat(3145): integrate the provisioner script into Airflow DAGs
  • Loading branch information
funtigr authored Jun 25, 2024
2 parents 0b0b670 + 617d805 commit 2d28c2c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 28 deletions.
39 changes: 11 additions & 28 deletions helm/secdash/dags/mark_provisioned.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import requests
from pymongo import MongoClient
from bson.objectid import ObjectId
from projects import get_mongo_db
from _utils import keys_exist


def fetch_products_mark_completed(provisioner_api_url, mark_provisioned_url, mongo_connection_url):
def fetch_products_mark_completed(provisioner_api_url, mark_provisioned_url, mongo_conn_id):
"""
Fetches Approved Requests from MongoDB, makes call to the Provisioner by every of them
and if labels completed = true and phase = Succeeded, makes a call to callback URL.
Expand All @@ -20,10 +20,9 @@ def fetch_products_mark_completed(provisioner_api_url, mark_provisioned_url, mon

try:
# Establish MongoDB connection
client = MongoClient(mongo_connection_url)
database = client["pltsvc"]
requests_collection = database["PrivateCloudRequest"]
requested_projects_collection = database["PrivateCloudRequestedProject"]
db = get_mongo_db(mongo_conn_id)
requests_collection = db["PrivateCloudRequest"]
requested_projects_collection = db["PrivateCloudRequestedProject"]

product_requests = requests_collection.find({"decisionStatus": "APPROVED"}, projection={
"_id": True, "licencePlate": True, "decisionDataId": True})
Expand All @@ -40,6 +39,9 @@ def fetch_products_mark_completed(provisioner_api_url, mark_provisioned_url, mon
workflow_name = f"{cluster}-{licence_plate}-{request_id}"
provisioner_workflow_url = f"{provisioner_api_url}/{workflow_name}"
request_status_in_provisioner = requests.get(provisioner_workflow_url).json()
if not keys_exist(request_status_in_provisioner, 'metadata', 'labels', 'workflows.argoproj.io/phase'):
continue

request_phase = request_status_in_provisioner['metadata']['labels']['workflows.argoproj.io/phase']
if request_phase == "Running":
continue
Expand All @@ -53,27 +55,8 @@ def fetch_products_mark_completed(provisioner_api_url, mark_provisioned_url, mon
if response.status_code != 200:
print(
f"Error while marking {licence_plate} as Provisioned: {response.status_code} - {response.reason}")

client.close()
else:
print(f"Marked {licence_plate} as Provisioned")

except Exception as e:
print(f"[fetch_products_mark_completed] Error: {e}")


def prepare_data_to_poll_provisioner(conn_id):
"""
Fetches and assembles urls from environment variables and calls the function with fetching and marking logic.
Parameter:
- conn_id (str): used to find out which environment is needed (test or prod)
"""
if conn_id == "pltsvc_test":
provisioner_api_url = os.environ["TEST_PROVISIONER_URL"]
mark_provisioned_url = os.environ["TEST_MARK_PROVISIONED_URL"]
mongo_connection_url = os.environ["MONGO_URL_TEST"]

elif conn_id == "pltsvc_prod":
provisioner_api_url = os.environ["PROD_PROVISIONER_URL"]
mark_provisioned_url = os.environ["PROD_MARK_PROVISIONED_URL"]
mongo_connection_url = os.environ["MONGO_URL_PROD"]
fetch_products_mark_completed(provisioner_api_url, mark_provisioned_url, mongo_connection_url)
23 changes: 23 additions & 0 deletions helm/secdash/dags/provisioner-prod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from mark_provisioned import fetch_products_mark_completed

MONGO_CONN_ID = 'pltsvc-test'
PROV_API_URL = os.getenv('PROD_PROVISIONER_URL')
MARK_PROV_URL = os.getenv('PROD_MARK_PROVISIONED_URL')

with DAG(
dag_id="provisioner_prod",
schedule_interval='*/7 * * * *',
start_date=datetime.now() - timedelta(minutes=8)
) as dag:
t1 = PythonOperator(
task_id='fetch-products-mark-completed-prod',
python_callable=fetch_products_mark_completed,
op_kwargs={'provisioner_api_url': PROV_API_URL,
'mark_provisioned_url': MARK_PROV_URL, 'mongo_conn_id': MONGO_CONN_ID},
provide_context=True,
dag=dag
)
23 changes: 23 additions & 0 deletions helm/secdash/dags/provisioner-test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from mark_provisioned import fetch_products_mark_completed

MONGO_CONN_ID = 'pltsvc-test'
PROV_API_URL = os.getenv('TEST_PROVISIONER_URL')
MARK_PROV_URL = os.getenv('TEST_MARK_PROVISIONED_URL')

with DAG(
dag_id="provisioner_test",
schedule_interval='*/7 * * * *',
start_date=datetime.now() - timedelta(minutes=8)
) as dag:
t1 = PythonOperator(
task_id='fetch-products-mark-completed-test',
python_callable=fetch_products_mark_completed,
op_kwargs={'provisioner_api_url': PROV_API_URL,
'mark_provisioned_url': MARK_PROV_URL, 'mongo_conn_id': MONGO_CONN_ID},
provide_context=True,
dag=dag
)

0 comments on commit 2d28c2c

Please sign in to comment.