
Commit

Merge pull request #51 from DARPA-ASKEM/dataservice-update
terarium dataservice migration changes
mwdchang authored Feb 1, 2024
2 parents 1f13594 + 8976f8b commit 6138fd0
Showing 21 changed files with 440 additions and 416 deletions.
4 changes: 4 additions & 0 deletions .editorconfig
@@ -19,6 +19,10 @@ indent_style = space
[*.{yml,yaml}]
indent_style = space

[*.{py}]
indent_size = 4
indent_style = space

[*.{cmd,bat}]
end_of_line = crlf

26 changes: 3 additions & 23 deletions poetry.lock

Some generated files are not rendered by default.

9 changes: 8 additions & 1 deletion pyproject.toml
@@ -6,8 +6,9 @@ authors = ["Powell Fendley", "Five Grant"]
readme = "README.md"
packages = [{include = "service"}, {include = "tests"}]


[tool.poetry.dependencies]
python = "^3.9"
python = "^3.10"
requests = "^2.31.0"
fastapi = "^0.96.0"
rq = "^1.15.0"
@@ -21,6 +22,7 @@ poethepoet = "^0.21.1"
# juliacall = { version="^0.9.14", optional = true }
dill = "^0.3.7"


[tool.poetry.scripts]
mockrabbitmq = "service.utils.rabbitmq:mock_rabbitmq_consumer"

@@ -35,6 +37,7 @@ mock = "^5.1.0"
fakeredis = "^2.17.0"
httpx = "^0.24.1"


[tool.poe.tasks]
install-pyciemss = "pip install --no-cache-dir git+https://github.com/fivegrant/pyciemss.git@087bc64d935f2ab5090330f1f7d6bde930404115 --use-pep517"

@@ -47,3 +50,7 @@ pythonpath = "service"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"


[tool.ruff]
ignore = ["E501"]
13 changes: 4 additions & 9 deletions service/execute.py
@@ -1,7 +1,6 @@
import logging

# from juliacall import newmodule
from settings import settings
from utils.tds import (
update_tds_status,
cleanup_job_dir,
@@ -10,9 +9,6 @@

from pyciemss.interfaces import sample, calibrate, ensemble_sample # noqa: F401

TDS_SIMULATIONS = "/simulations/"
TDS_URL = settings.TDS_URL

# jl = newmodule("SciMLIntegration")
# jl.seval("using SciMLIntegration, PythonCall")

@@ -21,9 +17,8 @@


def run(request, *, job_id):
logging.debug(f"STARTED {job_id} (username: {request.username})")
sim_results_url = TDS_URL + TDS_SIMULATIONS + job_id
update_tds_status(sim_results_url, status="running", start=True)
logging.debug(f"STARTED {job_id} (user_id: {request.user_id})")
update_tds_status(job_id, status="running", start=True)

# if request.engine == "ciemss":
operation_name = request.__class__.pyciemss_lib_function
@@ -36,6 +31,6 @@ def run(request, *, job_id):
# operation = request.__class__.sciml_lib_function
# output = operation(job_id, jl)

attach_files(output, TDS_URL, TDS_SIMULATIONS, job_id)
attach_files(output, job_id)
cleanup_job_dir(job_id)
logging.debug(f"FINISHED {job_id} (username: {request.username})")
logging.debug(f"FINISHED {job_id} (user_id: {request.user_id})")
34 changes: 10 additions & 24 deletions service/models.py
@@ -15,11 +15,6 @@
from utils.convert import convert_to_static_interventions, convert_to_solution_mapping
from utils.rabbitmq import gen_rabbitmq_hook # noqa: F401
from utils.tds import fetch_dataset, fetch_model, fetch_inferred_parameters
from settings import settings

TDS_CONFIGURATIONS = "/model_configurations/"
TDS_SIMULATIONS = "/simulations/"
TDS_URL = settings.TDS_URL


class Timespan(BaseModel):
@@ -95,7 +90,7 @@ class QuantityOfInterest(BaseModel):
class OperationRequest(BaseModel):
pyciemss_lib_function: ClassVar[str] = ""
engine: str = Field("ciemss", example="ciemss")
username: str = Field("not_provided", example="not_provided")
user_id: str = Field("not_provided", example="not_provided")

def gen_pyciemss_args(self, job_id):
raise NotImplementedError("PyCIEMSS cannot handle this operation")
@@ -137,15 +132,13 @@ class Simulate(OperationRequest):

def gen_pyciemss_args(self, job_id):
# Get model from TDS
amr_path = fetch_model(
self.model_config_id, TDS_URL, TDS_CONFIGURATIONS, job_id
)
amr_path = fetch_model(self.model_config_id, job_id)

interventions = convert_to_static_interventions(self.interventions)

extra_options = self.extra.dict()
inferred_parameters = fetch_inferred_parameters(
extra_options.pop("inferred_parameters"), TDS_URL, job_id
extra_options.pop("inferred_parameters"), job_id
)

return {
@@ -159,9 +152,7 @@ def gen_pyciemss_args(self, job_id):
}

def run_sciml_operation(self, job_id, julia_context):
amr_path = fetch_model(
self.model_config_id, TDS_URL, TDS_CONFIGURATIONS, job_id
)
amr_path = fetch_model(self.model_config_id, job_id)
with open(amr_path, "r") as file:
amr = file.read()
result = julia_context.simulate(amr, self.timespan.start, self.timespan.end)
@@ -209,11 +200,9 @@ class Calibrate(OperationRequest):
)

def gen_pyciemss_args(self, job_id):
amr_path = fetch_model(
self.model_config_id, TDS_URL, TDS_CONFIGURATIONS, job_id
)
amr_path = fetch_model(self.model_config_id, job_id)

dataset_path = fetch_dataset(self.dataset.dict(), TDS_URL, job_id)
dataset_path = fetch_dataset(self.dataset.dict(), job_id)

# TODO: Test RabbitMQ
try:
@@ -271,10 +260,7 @@ def gen_pyciemss_args(self, job_id):
solution_mappings = [
convert_to_solution_mapping(config) for config in self.model_configs
]
amr_paths = [
fetch_model(config.id, TDS_URL, TDS_CONFIGURATIONS, job_id)
for config in self.model_configs
]
amr_paths = [fetch_model(config.id, job_id) for config in self.model_configs]

return {
"model_paths_or_jsons": amr_paths,
@@ -310,7 +296,7 @@ class Config:
# pyciemss_lib_function: ClassVar[
# str
# ] = "load_and_calibrate_and_sample_ensemble_model"
# username: str = Field("not_provided", example="not_provided")
# user_id: str = Field("not_provided", example="not_provided")
# model_configs: List[ModelConfig] = Field(
# [],
# example=[],
@@ -327,11 +313,11 @@ class Config:
# solution_mappings = [config.solution_mappings for config
# in self.model_configs]
# amr_paths = [
# fetch_model(config.id, TDS_URL, TDS_CONFIGURATIONS, job_id)
# fetch_model(config.id, job_id)
# for config in self.model_configs
# ]

# dataset_path = fetch_dataset(self.dataset.dict(), TDS_URL, job_id)
# dataset_path = fetch_dataset(self.dataset.dict(), job_id)

# # Generate timepoints
# time_count = self.timespan.end - self.timespan.start
2 changes: 2 additions & 0 deletions service/settings.py
@@ -11,6 +11,8 @@ class Settings(BaseSettings):
"""

TDS_URL: str = "http://data-service-api:8000"
TDS_USER: str = "user"
TDS_PASSWORD: str = "password"
REDIS_HOST: str = "redis"
REDIS_PORT: int = 6379
RABBITMQ_HOST: str = "rabbitmq.pyciemss"
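
Note: the new TDS_USER and TDS_PASSWORD settings suggest that requests to the data service are now authenticated, but the code that consumes them is in files not rendered on this page. Below is a minimal sketch under that assumption; the helper name is invented for illustration.

# Illustrative only: a hypothetical helper that attaches the new credentials
# to TDS requests via HTTP basic auth.
import requests

from settings import settings


def tds_session():
    session = requests.Session()
    session.auth = (settings.TDS_USER, settings.TDS_PASSWORD)
    return session


# Example usage (hypothetical):
#   session = tds_session()
#   simulation = session.get(settings.TDS_URL + "/simulations/" + job_id).json()
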
41 changes: 14 additions & 27 deletions service/utils/rq_helpers.py
@@ -3,8 +3,6 @@

import logging
from uuid import uuid4
import json
import requests

from fastapi import Response, status
from redis import Redis
@@ -13,10 +11,7 @@
from rq.job import Job

from settings import settings
from utils.tds import update_tds_status

TDS_SIMULATIONS = "/simulations/"
TDS_URL = settings.TDS_URL
from utils.tds import update_tds_status, create_tds_job, cancel_tds_job

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
@@ -27,42 +22,37 @@ def get_redis():


def update_status_on_job_fail(job, connection, etype, value, traceback):
update_tds_status(TDS_URL + TDS_SIMULATIONS + str(job.id), "error")
update_tds_status(str(job.id), "error")
log_message = f"""
###############################
There was an exception in CIEMSS Service
job: {job.id}
{etype}: {value}
{etype}: {value}
################################
"""
logging.exception(log_message)


def create_job(request_payload, sim_type, redis_conn):
job_id = f"ciemss-{uuid4()}"
workflow_id = f"{uuid4()}"

post_url = TDS_URL + TDS_SIMULATIONS
payload = {
"id": job_id,
"name": workflow_id,
"execution_payload": request_payload.dict(),
"result_files": [],
"type": sim_type,
"status": "queued",
"engine": request_payload.engine,
"workflow_id": job_id,
"workflow_id": workflow_id,
}
logging.info(payload)
response = requests.post(post_url, json=payload)
if response.status_code >= 300:
raise Exception(
(
"Failed to create simulation on TDS "
f"(status: {response.status_code}): {json.dumps(payload)}"
)
)
logging.info(response.content)

res = create_tds_job(payload)
job_id = res["id"]

logging.info(res)

queue = Queue(connection=redis_conn, default_timeout=-1)
queue.enqueue_call(
@@ -80,7 +70,7 @@ def fetch_job_status(job_id, redis_conn):
"""Fetch a job's results from RQ.
Args:
job_id (str): The id of the job being run in RQ.
job_id (uuid): The id of the job being run in RQ.
Returns:
Response:
@@ -111,10 +101,7 @@ def kill_job(job_id, redis_conn):
else:
job.cancel()

url = TDS_URL + TDS_SIMULATIONS + str(job_id)
tds_payload = requests.get(url).json()
tds_payload["status"] = "cancelled"
requests.put(url, json=json.loads(json.dumps(tds_payload, default=str)))
cancel_tds_job(str(job_id))

result = job.get_status()
return result
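
Note: create_tds_job and cancel_tds_job are imported from utils.tds above, but their definitions live in files this page does not render. A sketch of what they might look like, modeled on the POST/GET/PUT logic that this diff removes from rq_helpers.py; the endpoint path and error handling are assumptions.

# Hypothetical sketch -- not the actual utils/tds.py from this commit.
import requests

from settings import settings

TDS_SIMULATIONS = "/simulations/"


def create_tds_job(payload):
    # POST the simulation record to TDS and return the created object,
    # whose server-side id is then used as the RQ job id.
    response = requests.post(settings.TDS_URL + TDS_SIMULATIONS, json=payload)
    if response.status_code >= 300:
        raise Exception(
            f"Failed to create simulation on TDS (status: {response.status_code})"
        )
    return response.json()


def cancel_tds_job(job_id):
    # Mark an existing simulation as cancelled.
    url = settings.TDS_URL + TDS_SIMULATIONS + str(job_id)
    simulation = requests.get(url).json()
    simulation["status"] = "cancelled"
    return requests.put(url, json=simulation).json()
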
