Skip to content

Commit

Permalink
added methods to store CUBE endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
Sandip117 committed Apr 11, 2024
1 parent 4007eeb commit ba9c37f
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 17 deletions.
2 changes: 1 addition & 1 deletion app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class Settings(BaseSettings):
pflink_mongodb: MongoDsn = 'mongodb://localhost:27017'
version: str = "4.0.3"
version: str = "4.0.4"
mongo_username: str = "admin"
mongo_password: str = "admin"
log_level: str = "DEBUG"
Expand Down
51 changes: 51 additions & 0 deletions app/controllers/cube.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import pymongo.results
from app.config import settings
from pymongo import MongoClient
from app.controllers.subprocesses.utils import (
get_cube_url_from_pfdcm,
do_cube_create_user,
Expand All @@ -6,6 +9,21 @@
from app.models import cube
import requests
import json
from typing import List

MONGO_DETAILS = str(settings.pflink_mongodb)
client = MongoClient(MONGO_DETAILS, username=settings.mongo_username, password=settings.mongo_password)
database = client.database
cube_collection = database.get_collection("cubes_collection")


# helper methods to add and retrieve

def cube_add_helper(cube_data: "cube.CubeService") -> dict:
    """
    Map a CUBE service record to the MongoDB document shape.

    Uses ``service_name`` as the document ``_id`` so MongoDB's unique
    primary-key index enforces one entry per service name.

    NOTE(review): callers may pass either a pydantic ``CubeService`` model or a
    plain dict; pydantic v1 models are not subscriptable, so normalize first.
    """
    # Accept both dicts and pydantic models — `.dict()` is the pydantic v1
    # export method; TODO confirm all callers once routes are visible.
    data = cube_data if isinstance(cube_data, dict) else cube_data.dict()
    return {
        "_id": data["service_name"],
        "service_URL": data["service_URL"],
    }


async def get_plugins(pfdcm: str, cube_name: str):
Expand Down Expand Up @@ -50,3 +68,36 @@ def get_cube_client(pfdcm: str, cube_name: str):
raise Exception(ex)


def add_cube_service(cube_service_data: cube.CubeService) -> dict:
    """
    Insert a new CUBE service record and return the stored document.

    DB constraint: Only unique names allowed

    On any failure (including a duplicate name) a dict of the form
    ``{"error": <message>}`` is returned instead of raising.
    """
    try:
        outcome = cube_collection.insert_one(cube_add_helper(cube_service_data))
        if not outcome.acknowledged:
            raise Exception("Could not store new record.")
        # Read the record back so the caller sees exactly what was persisted.
        return cube_collection.find_one({"_id": outcome.inserted_id})
    except Exception as ex:
        return {"error": str(ex)}


def delete_cube_service(service_name: str) -> bool:
    """
    Remove an existing cube service.

    Returns ``True`` if a record with the given name was deleted,
    ``False`` if no such record existed (or the delete failed).
    """
    try:
        # `service_name` is the document `_id` (see `cube_add_helper`).
        result = cube_collection.delete_one({"_id": service_name})
        return result.deleted_count > 0
    except Exception:
        # Mirror the module's non-raising error style for DB failures.
        return False


def update_cube_service(service_name: str, new_data: dict) -> dict:
    """
    Update an existing cube service and return the updated document.

    Returns ``{"error": <message>}`` when the service does not exist,
    the payload is empty, or the DB operation fails.
    """
    try:
        # `_id` is immutable in MongoDB — strip it from the update payload.
        fields = {k: v for k, v in new_data.items() if k != "_id"}
        if not fields:
            return {"error": "No fields to update."}
        result = cube_collection.update_one({"_id": service_name}, {"$set": fields})
        if result.matched_count == 0:
            return {"error": f"No cube service found with name {service_name}."}
        return cube_collection.find_one({"_id": service_name})
    except Exception as ex:
        return {"error": str(ex)}


def retrieve_cube_service(service_name: str) -> dict:
    """
    Retrieve an existing cube service entry from the DB.

    Returns the stored document, or ``{"error": <message>}`` when no
    record with that name exists or the lookup fails.
    """
    try:
        record = cube_collection.find_one({"_id": service_name})
        if record is None:
            return {"error": f"No cube service found with name {service_name}."}
        return record
    except Exception as ex:
        return {"error": str(ex)}


def retrieve_cube_services() -> List[str]:
    """
    Retrieve the names of all the cube services present in the DB.

    Returns an empty list when the collection is empty or the query fails.
    """
    try:
        # Project only `_id` (the service name) — avoids pulling full documents.
        return [record["_id"] for record in cube_collection.find({}, {"_id": 1})]
    except Exception:
        return []
26 changes: 13 additions & 13 deletions app/controllers/subprocesses/wf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

dictConfig(log.log_config)
logger = logging.getLogger('pflink-logger')
d = {'workername': 'WORKFLOW_MGR', 'key' : "",'log_color': "\33[33m"}
d = {'workername': 'WORKFLOW_MGR', 'key': "", 'log_color': "\33[33m"}


def define_parameters():
Expand All @@ -40,13 +40,15 @@ def define_parameters():
parser.add_argument('--test', default=False, action='store_true')
return parser


def shorten(s, width=100, placeholder='[...]'):
    """
    Validate a given feed name for size = 100 chars
    if size exceeds, trim the name and add a suffix placeholder
    """
    if len(s) <= width:
        return s[:width]
    # Too long: keep enough of the name that name + placeholder fits in `width`.
    keep = width - len(placeholder)
    return s[:keep] + placeholder


def str_to_param_dict(params: str) -> dict:
"""
Convert CLI arguments passed as string to a dictionary of parameters
Expand All @@ -69,7 +71,6 @@ def str_to_param_dict(params: str) -> dict:

class WorkflowError(Exception):
def __init__(self, message, errors):

# Call the base class constructor with the parameters it needs
super(WorkflowError, self).__init__(message)

Expand All @@ -81,6 +82,7 @@ class WorkflowManager:
"""
This module manages different states of a workflow by constantly checking the status of a workflow in the DB.
"""

def __init__(self, args):
"""
Initialize class variables
Expand All @@ -99,7 +101,7 @@ def run(self):
self.manage_workflow(key, self.args.test)
response = self.__workflow.response
logger.info(f"Workflow manager exited with status {response.status} and\
current workflow state as {response.workflow_state}",extra=d)
current workflow state as {response.workflow_state}", extra=d)

def fetch_and_load(self, db_key: str, test: bool):
"""
Expand Down Expand Up @@ -209,7 +211,7 @@ def get_feed_name(self) -> str:
feed_name = shorten(feed_name)
return feed_name
except WorkflowError as er:
raise WorkflowError("Feed name could not be created.",er)
raise WorkflowError("Feed name could not be created.", er)

def run_plugin_or_pipeline_instance(self, prev_id: str):
"""
Expand All @@ -223,7 +225,7 @@ def run_plugin_or_pipeline_instance(self, prev_id: str):
plugin_params = str_to_param_dict(request.workflow_info.plugin_params)
plugin_params["previous_id"] = prev_id
plugin_id = self.get_plugin_id(request.workflow_info.plugin_name,
request.workflow_info.plugin_version)
request.workflow_info.plugin_version)
self.run_plugin(plugin_id, prev_id, plugin_params)
if request.workflow_info.pipeline_name:
pipeline_id = self.get_pipeline_id(request.workflow_info.pipeline_name)
Expand Down Expand Up @@ -291,7 +293,7 @@ def get_plugin_id(self, name: str, version: str = "") -> str:
plugin_id: str = self.__client.getPluginId(plugin_search_params)
return plugin_id
except Exception as er:
raise WorkflowError("Plugin could not be found.",er)
raise WorkflowError("Plugin could not be found.", er)

def get_data_path(self, request: WorkflowRequestSchema) -> str:
"""
Expand All @@ -305,7 +307,7 @@ def get_data_path(self, request: WorkflowRequestSchema) -> str:
data_path = self.__client.getSwiftPath(pacs_search_params)
return data_path
except WorkflowError as er:
raise WorkflowError("Data path could not be found.",er)
raise WorkflowError("Data path could not be found.", er)

def get_pipeline_id(self, name: str) -> str:
"""Method to search for a particular pipeline and return its ID"""
Expand All @@ -314,7 +316,7 @@ def get_pipeline_id(self, name: str) -> str:
pipeline_id = self.__client.getPipelineId(pipeline_search_params)
return str(pipeline_id)
except WorkflowError as er:
raise WorkflowError("Pipeline could not be found.",er)
raise WorkflowError("Pipeline could not be found.", er)

def run_pipeline(self, pipeline_id: str, name: str, prev_id: str) -> dict:
"""Run a pipeline instance of a previous plugin instance ID"""
Expand All @@ -324,7 +326,7 @@ def run_pipeline(self, pipeline_id: str, name: str, prev_id: str) -> dict:
feed_resp: dict = self.__client.createWorkflow(pipeline_id, pipeline_params)
return feed_resp
except WorkflowError as er:
raise WorkflowError("Pipeline could not be run.",er)
raise WorkflowError("Pipeline could not be run.", er)

def run_plugin(self, plugin_id: str, prev_id: str, plugin_params: dict) -> str:
"""Run a plugin instance on a previous plugin instance ID"""
Expand All @@ -334,7 +336,7 @@ def run_plugin(self, plugin_id: str, prev_id: str, plugin_params: dict) -> str:
resp = self.__client.createFeed(plugin_id, plugin_params)
return resp
except WorkflowError as err:
raise WorkflowError("Plugin could not be run.",err)
raise WorkflowError("Plugin could not be run.", err)

def create_new_feed(self, feed_name: str, data_path: str, dircopy_id: str) -> (str, str):
"""
Expand All @@ -346,7 +348,7 @@ def create_new_feed(self, feed_name: str, data_path: str, dircopy_id: str) -> (s
feed_response = self.__client.createFeed(dircopy_id, feed_params)
return feed_response["feed_id"], feed_response["id"]
except WorkflowError as er:
raise WorkflowError("Feed could not be created.",er)
raise WorkflowError("Feed could not be created.", er)

def task_producer(self):
"""
Expand Down Expand Up @@ -437,5 +439,3 @@ def main():
Main entry point of this script
"""
main()


3 changes: 1 addition & 2 deletions app/controllers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def retrieve_workflows(search_params: WorkflowSearchSchema, test: bool = False):
collection = test_collection if test else workflow_collection
index = collection.create_index([('$**', TEXT)],
name='search_index', default_language='english')
workflows = []
# query, rank, response = search.compound_queries(search_params)

workflows = collection.aggregate(
[
{"$match": {"$text": {"$search": search_params.keywords}}},
Expand Down
22 changes: 21 additions & 1 deletion app/models/cube.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, validator
from typing import List


class Plugin(BaseModel):
Expand All @@ -7,5 +8,24 @@ class Plugin(BaseModel):
version: str = Field(...)


class CubeService(BaseModel):
    """This class represents a CUBE service"""
    # Unique service name; used as the MongoDB `_id` by the cube controller.
    service_name: str = Field(...)
    service_URL: str = Field(...)

    @validator('*')
    def check_for_empty_string(cls, v):
        """Reject empty-string values on every field."""
        # Raise instead of `assert`: assertions are stripped under `python -O`,
        # which would silently disable this validation. Pydantic turns the
        # ValueError into the same per-field validation error.
        if v == '':
            raise ValueError("Empty strings not allowed.")
        return v


class CubeServiceResponse(BaseModel):
    """This class represents a CUBE service response from `pflink`"""
    # Payload of the response — the stored service document, or
    # {"error": <message>} on failure (see app/controllers/cube.py).
    data: dict
    # Optional human-readable status message; empty by default.
    message: str = ""


class CubeServiceCollection(BaseModel):
    """This class represents the collection of CUBE services available"""
    # List of registered CUBE service names.
    data: List[str]
    # Optional human-readable status message; empty by default.
    message: str = ""

0 comments on commit ba9c37f

Please sign in to comment.