
Merge pull request #13 from SKA-INAF/devel
Devel
simoneriggi authored Dec 19, 2024
2 parents fa01847 + f9a2c54 commit f5bfbe5
Showing 9 changed files with 562 additions and 117 deletions.
72 changes: 55 additions & 17 deletions README.md
@@ -75,7 +75,8 @@ Apps are run as Docker (Kubernetes deploy) or Singularity (Slurm deploy) containers
* `classifier-cnn` image classifier (TensorFlow 2.x): `docker://sriggi/cnn-classifier`
* `umap` dimensionality reduction: `docker://sriggi/umap-job`
* `outlier-finder` with Isolation Forest: `docker://sriggi/outlier-finder-job`
* `hdbscan` cluster search: `docker://sriggi/hdbscan-job`
* `hdbscan` cluster search: `docker://sriggi/hdbscan-job`
* `similarity-search`: `docker://sriggi/similarity-search-job`

Singularity containers can be created from docker images with:

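For example, to build the `caesar` job container (a sketch using the standard `singularity pull` syntax; the image name is one of those listed above):

```
singularity pull caesar-job_latest.sif docker://sriggi/caesar-job
```
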
@@ -97,20 +98,20 @@ Before running the application you must do some preparatory steps:

* (OPTIONAL) Create a dedicated user & group (e.g. `caesar`) allowed to run the application and services, and give it ownership of the directories created below (a command sketch follows this list)
* Create the application working dir (by default `/opt/caesar-rest`)
* (OPTIONAL) Mount an external storage in the application working dir, for example using rclone: `/usr/bin/rclone mount --daemon [--uid=[UID] --gid=[GID]] --umask 000 --allow-other --file-perms 0777 --dir-cache-time 0m5s --vfs-cache-mode full [RCLONE_REMOTE_STORAGE]:[RCLONE_REMOTE_STORAGE_PATH] /opt/caesar-rest -vvv` where `UID` and `GID` are the Linux user and group ids of the user previously created.
* Create the top directory for data upload (by default `/opt/caesar-rest/data`)
* Create the top directory for jobs (by default `/opt/caesar-rest/jobs`)
* Create the top directory for data upload (by default `/opt/caesar-rest/data`). Also place supported pre-configured datasets here.
* Create the top directory for jobs (by default `/opt/caesar-rest/jobs`)
* Create the top directory for models (by default `/opt/caesar-rest/models`) and put TensorFlow/PyTorch model & weights files under this path.
* (OPTIONAL) Create the log directory for system services (see below), e.g. `/opt/caesar-rest/logs`
* (OPTIONAL) Create the run directory for system services (see below), e.g. `/opt/caesar-rest/run`
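
A minimal command sketch for the steps above, assuming the default paths and the optional `caesar` user & group:

```
sudo useradd -r -U caesar
sudo mkdir -p /opt/caesar-rest/{data,jobs,models,logs,run}
sudo chown -R caesar:caesar /opt/caesar-rest
```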


### **Run DB service**
caesar-rest requires a MongoDB service in which to store user data and job information. To start the DB service:

```systemctl start mongodb.service```

Alternatively, you can use the Docker container ```sriggi/caesar-rest-db:latest``` (see https://hub.docker.com/r/sriggi/caesar-rest-db) and deploy it with Docker Compose or Kubernetes (see the configuration files under the repository ```config``` directory).

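A sketch of running the DB container directly with Docker (the standard MongoDB port mapping is an assumption here, not taken from the repository ```config``` files):

```
docker run -d --name caesar-rest-db -p 27017:27017 sriggi/caesar-rest-db:latest
```
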
### **Run Filebeat service**
### **Run Filebeat service (OPTIONAL)**
caesar-rest uses filebeat to forward file logs to an ElasticSearch service. To start the service:

```systemctl start filebeat.service```
@@ -119,6 +120,7 @@ Alternatively, you can use the Docker container for the application ```sriggi/ca

### **Run Celery services (OPTIONAL)**
If you want to manage jobs with Celery, you must run a message broker service (e.g. rabbitmq), a task store service (e.g. redis or mongodb) and one or more Celery worker services.
**NB: The Celery job management option is no longer developed or maintained in the caesar-rest application. We suggest using the Slurm or Kubernetes deployment.**

#### **Run broker service**
To run the rabbitmq message broker service:
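
On most Linux distributions this would be (the exact unit name depends on the packaging):

```
systemctl start rabbitmq-server.service
```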
@@ -197,10 +199,43 @@ In production you may want to run this as a system service:
* Start the service:
```sudo systemctl caesar-workers.service start```
```systemctl start caesar-workers.service```
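To have the workers start at boot, the standard systemd enable step applies:
```sudo systemctl enable caesar-workers.service```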
Alternatively, you can use the Docker container ```sriggi/caesar-rest-worker:latest``` (https://hub.docker.com/r/sriggi/caesar-rest-worker) and deploy it with Docker Compose or Kubernetes (see the configuration files under the repository ```config``` directory).
### **Run Slurm services (OPTIONAL)**
If you want to manage jobs with Slurm, you must run the following services:
```systemctl start munge.service```
```systemctl start slurmd.service```
```systemctl start slurmdbd.service```
```systemctl start slurmctld.service```
```systemctl start slurmrestd.service```
Below, we report a sample configuration file (`/usr/lib/systemd/system/slurmrestd.service`) for the Slurm REST service:
```
[Unit]
Description=Slurm REST daemon
After=network.target munge.service slurmctld.service
ConditionPathExists=/etc/slurm/slurm.conf

[Service]
Type=simple
User=caesar
Group=caesar
EnvironmentFile=-/etc/sysconfig/slurmrestd
# Default to local auth via socket
ExecStart=/usr/sbin/slurmrestd -f /etc/slurm/slurmrestd.conf -a rest_auth/jwt -s openapi/v0.0.36 -vvvv 0.0.0.0:6820
ExecReload=/bin/kill -HUP $MAINPID

[Install]
WantedBy=multi-user.target
```
**NB: Slurm is currently the suggested job management option for caesar-rest application.**
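Since `slurmrestd` is started with `rest_auth/jwt`, REST clients need a JWT; assuming JWT authentication is configured in `slurm.conf`, a token for the `caesar` user can be generated with:

```
scontrol token username=caesar lifespan=3600
```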
### **Run the web application**
#### **Run the application in development mode**
@@ -276,6 +311,7 @@ where supported `ARGS` are:
* `umap_container`: Path to UMAP Singularity container (default=/opt/containers/sclassifier/umap_latest.sif)
* `outlier_finder_container`: Path to OutlierFinder Singularity container (default=/opt/containers/sclassifier/outlier_finder_latest.sif)
* `hdbscan_container`: Path to HDBSCAN Singularity container (default=/opt/containers/sclassifier/hdbscan_latest.sif)
* `simsearch_container`: Path to Similarity Search Singularity container (default=/opt/containers/sclassifier/similarity-search_latest.sif)
DATASET OPTIONS
* `dataset_smgps`: Path to smgps dataset json filelist
@@ -570,7 +606,8 @@ Server response contains a list of valid apps that can be queried for further details
"featextractor-simclr",
"umap",
"outlier-finder",
"hdbscan"
"hdbscan",
"similarity-search"
]
}
```
@@ -744,12 +781,12 @@ Server response contains a list of app options that can be used in job submission
* Request methods: POST
* Request header: ```content-type: application/json```
A sample curl request would be:
A sample curl request for running the `caesar` source finder app would be:
```
curl -X POST \
-H 'Content-Type: application/json' \
-d '{"app":"caesar","job_inputs":{"inputfile":"/opt/caesar-rest/data/67a49bf7555b41739095681bf52a1f99.fits","run":true,"no-logredir":true,"envfile":"/home/riggi/Software/setvars.sh","no-mpi":true,"no-nestedsearch":true,"no-extendedsearch":true}}' \
-d '{"app": "caesar","data_inputs": {"data": "39ca08fc5c7c446d8756a48088ee684c"},"job_options": {"run": true,"no-logredir": true,"no-mpi": true,"no-nestedsearch": true,"no-extendedsearch": true}}' \
--url 'http://localhost:8080/caesar/api/v1.0/job'
```
@@ -760,18 +797,19 @@ Server response is:
```
{
"app": "caesar",
"job_id": "69ca62d7-5098-4fe7-a675-63895a2d06b1",
"job_inputs": {
"envfile": "/home/riggi/Software/setvars.sh",
"inputfile": "67a49bf7555b41739095681bf52a1f99",
"data_inputs": "39ca08fc5c7c446d8756a48088ee684c",
"job_id": "a4095b815a074d81a0cc447762aa29f1",
"job_options": {
"no-extendedsearch": true,
"no-logredir": true,
"no-mpi": true,
"no-nestedsearch": true,
"run": true
},
"status": "Job submitted with success",
"submit_date": "2020-04-24T14:05:24.761766"
},
"state": "PENDING",
"status": "Job submitted and registered with success",
"submit_date": "2024-12-19T10:00:42.865802",
"tag": ""
}
```
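
After submission, the returned `job_id` can be used to poll the job; the status endpoint path below is an assumption mirroring the submission URL, as it is not shown in this excerpt:

```
curl --url 'http://localhost:8080/caesar/api/v1.0/job/a4095b815a074d81a0cc447762aa29f1/status'
```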
13 changes: 11 additions & 2 deletions apps/run_app.py
@@ -48,6 +48,7 @@ def get_args():
# - Specify cmd options
parser.add_argument('-datadir','--datadir', dest='datadir', default='/opt/caesar-rest/data', required=False, type=str, help='Directory where to store uploaded data')
parser.add_argument('-jobdir','--jobdir', dest='jobdir', default='/opt/caesar-rest/jobs', required=False, type=str, help='Directory where to store jobs')
parser.add_argument('-modeldir','--modeldir', dest='modeldir', default='/opt/caesar-rest/models', required=False, type=str, help='Directory where models are stored')
parser.add_argument('-job_scheduler','--job_scheduler', dest='job_scheduler', default='celery', required=False, type=str, help='Job scheduler to be used. Options are: {celery,kubernetes,slurm} (default=celery)')
parser.add_argument('-job_monitoring_period','--job_monitoring_period', dest='job_monitoring_period', default=5, required=False, type=int, help='Job monitoring poll period in seconds')
parser.add_argument('--debug', dest='debug', action='store_true')
@@ -112,6 +113,7 @@ def get_args():
parser.add_argument('-slurm_queue','--slurm_queue', dest='slurm_queue', default='normal', required=False, type=str, help='Slurm cluster queue for submitting jobs')
parser.add_argument('-slurm_jobdir','--slurm_jobdir', dest='slurm_jobdir', default='/mnt/storage/jobs', required=False, type=str, help='Path at which the job directory is mounted in Slurm cluster')
parser.add_argument('-slurm_datadir','--slurm_datadir', dest='slurm_datadir', default='/mnt/storage/data', required=False, type=str, help='Path at which the data directory is mounted in Slurm cluster')
parser.add_argument('-slurm_modeldir','--slurm_modeldir', dest='slurm_modeldir', default='/mnt/storage/models', required=False, type=str, help='Path at which the models directory is mounted in Slurm cluster')
parser.add_argument('-slurm_max_cores_per_job','--slurm_max_cores_per_job', dest='slurm_max_cores_per_job', default=4, required=False, type=int, help='Slurm maximum number of cores reserved for a job (default=4)')

# - Volume mount options
@@ -130,6 +132,7 @@ def get_args():
parser.add_argument('-umap_container','--umap_container', dest='umap_container', default='/opt/containers/sclassifier/umap_latest.sif', required=False, type=str, help='Path to UMAP Singularity container (default=/opt/containers/sclassifier/umap_latest.sif)')
parser.add_argument('-outlier_finder_container','--outlier_finder_container', dest='outlier_finder_container', default='/opt/containers/sclassifier/outlier_finder_latest.sif', required=False, type=str, help='Path to OutlierFinder Singularity container (default=/opt/containers/sclassifier/outlier_finder_latest.sif)')
parser.add_argument('-hdbscan_container','--hdbscan_container', dest='hdbscan_container', default='/opt/containers/sclassifier/hdbscan_latest.sif', required=False, type=str, help='Path to HDBSCAN Singularity container (default=/opt/containers/sclassifier/hdbscan_latest.sif)')
parser.add_argument('-simsearch_container','--simsearch_container', dest='simsearch_container', default='/opt/containers/sclassifier/similarity-search_latest.sif', required=False, type=str, help='Path to Similarity Search Singularity container (default=/opt/containers/sclassifier/similarity-search_latest.sif)')

# - Dataset options
parser.add_argument('-dataset_smgps','--dataset_smgps', dest='dataset_smgps', default='', required=False, type=str, help='Path to smgps dataset json filelist')
@@ -159,6 +162,7 @@ def get_args():
# - Dir options
datadir= args.datadir
jobdir= args.jobdir
modeldir= args.modeldir
debug= args.debug

# - Log level options
@@ -192,8 +196,6 @@ def get_args():
logger.info("Setting log level to %s ..." % loglevel)
logger.setLevel(loglevel)



# - AAI options
use_aai= args.aai
secret_file= args.secretfile
@@ -271,6 +273,7 @@ def get_args():
slurm_queue= args.slurm_queue
slurm_jobdir= args.slurm_jobdir
slurm_datadir= args.slurm_datadir
slurm_modeldir= args.slurm_modeldir
slurm_max_cores_per_job= args.slurm_max_cores_per_job

# - Singularity container options
@@ -282,6 +285,7 @@ def get_args():
umap_container= args.umap_container
outlier_finder_container= args.outlier_finder_container
hdbscan_container= args.hdbscan_container
simsearch_container= args.simsearch_container

# - Dataset options
dataset_smgps= args.dataset_smgps
@@ -301,6 +305,7 @@ def get_args():
config= Config()
config.UPLOAD_FOLDER= datadir
config.JOB_DIR= jobdir
config.MODEL_DIR= modeldir
config.USE_AAI= False
config.JOB_MONITORING_PERIOD= job_monitoring_period

@@ -336,6 +341,7 @@ def get_args():
config.SLURM_PORT= slurm_port
config.SLURM_JOB_DIR= slurm_jobdir
config.SLURM_DATA_DIR= slurm_datadir
config.SLURM_MODEL_DIR= slurm_modeldir
config.SLURM_MAX_CORE_PER_JOB= slurm_max_cores_per_job

config.MOUNT_RCLONE_VOLUME= args.mount_rclone_volume
@@ -356,6 +362,7 @@ def get_args():
config.SLURM_UMAP_JOB_IMAGE= umap_container
config.SLURM_OUTLIER_FINDER_JOB_IMAGE= outlier_finder_container
config.SLURM_HDBSCAN_JOB_IMAGE= hdbscan_container
config.SLURM_SIMSEARCH_JOB_IMAGE= simsearch_container

# - Create data manager (DEPRECATED BY MONGO)
##logger.info("Creating data manager ...")
@@ -447,8 +454,10 @@ def get_args():
jobmgr_slurm.cluster_batch_workdir= config.SLURM_BATCH_WORKDIR
jobmgr_slurm.cluster_jobdir= config.SLURM_JOB_DIR
jobmgr_slurm.cluster_datadir= config.SLURM_DATA_DIR
jobmgr_slurm.cluster_modeldir= config.SLURM_MODEL_DIR
jobmgr_slurm.app_jobdir= config.JOB_DIR
jobmgr_slurm.app_datadir= config.UPLOAD_FOLDER
jobmgr_slurm.app_modeldir= config.MODEL_DIR
jobmgr_slurm.max_cores= config.SLURM_MAX_CORE_PER_JOB

# - Initialize client
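Tying the new `run_app.py` options together, a Slurm-oriented launch using the flags added above might look like (paths illustrative):

```
python apps/run_app.py --job_scheduler=slurm \
  --modeldir=/opt/caesar-rest/models \
  --slurm_modeldir=/mnt/storage/models \
  --simsearch_container=/opt/containers/sclassifier/similarity-search_latest.sif
```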
31 changes: 16 additions & 15 deletions caesar_rest/base_app_configurator.py
@@ -135,7 +135,7 @@ class AppConfigurator(object):
def __init__(self):
""" Constructor"""

self.job_inputs= ''
self.job_options= ''
self.data_inputs= ''
self.cmd= ''
self.cmd_args= []
@@ -199,46 +199,47 @@ def set_data_input_option_value(self):
# Left empty as to be overridden in derived classes


def validate(self, job_inputs, data_inputs):
def validate(self, job_options, data_inputs):
""" Validate job input """

logger.info("Validating given inputs ...", action="submitjob")

# - Check if job inputs are empty
if not job_inputs:
if not job_options:
self.validation_status= 'Empty job inputs given!'
logger.warn(self.validation_status, action="submitjob")
return False

# - Check data inputs
if not data_inputs or data_inputs is None:
#if not data_inputs or data_inputs is None:
if data_inputs is None or (isinstance(data_inputs,list) and not data_inputs) or (isinstance(data_inputs,str) and data_inputs==""):
self.validation_status= 'Empty or null data input given!'
logger.warn(self.validation_status, action="submitjob")
return False

self.data_inputs= data_inputs

# - Convert json string to dictionary
#print("type(job_inputs)")
#print(type(job_inputs))
#print(job_inputs)
#print("type(job_options)")
#print(type(job_options))
#print(job_options)

if not isinstance(job_inputs,dict):
if not isinstance(job_options,dict):
self.validation_status= 'Given job inputs data is not a dictionary!'
logger.warn(self.validation_status, action="submitjob")
return False

try:
self.job_inputs= yaml.safe_load(json.dumps(job_inputs))
self.job_options= yaml.safe_load(json.dumps(job_options))

except ValueError:
self.validation_status= 'Failed to parse job inputs as json dictionary!'
logger.warn(self.validation_status, action="submitjob")
return False

#print("type(self.job_inputs)")
#print(type(self.job_inputs))
#print(self.job_inputs)
#print("type(self.job_options)")
#print(type(self.job_options))
#print(self.job_options)

# - Validate options
valid= self.validate_options()
@@ -263,7 +264,7 @@ def validate_options(self):

# - Validate options
for opt_name, option in self.valid_options.items():
option_given= opt_name in self.job_inputs
option_given= opt_name in self.job_options

# - Check if mandatory option is given
mandatory= option.mandatory
@@ -281,7 +282,7 @@ def validate_options(self):
if value_required:
# - Check for value type
expected_val_type= option.value_type
parsed_value= self.job_inputs[opt_name]
parsed_value= self.job_options[opt_name]
parsed_value_type= type(parsed_value)
if not isinstance(parsed_value, expected_val_type):
#self.validation_status= ''.join(["Option ",opt_name," expects a ",str(expected_val_type)," value type and not a ",str(parsed_value_type)," !"])
@@ -348,7 +349,7 @@ def validate_options(self):
else: # No value required

# - Check boolean value given
parsed_value= self.job_inputs[opt_name]
parsed_value= self.job_options[opt_name]
parsed_value_type= type(parsed_value)
if not isinstance(parsed_value, bool):
self.validation_status= ''.join(["Failed to parse bool option ",opt_name," (parsed value type is not a boolean)!"])
8 changes: 7 additions & 1 deletion caesar_rest/config.py
@@ -30,6 +30,7 @@ class Config(object):

# - Additional options
JOB_DIR= '/opt/caesar-rest/jobs'
MODEL_DIR= '/opt/caesar-rest/models'
UPLOAD_ALLOWED_FILE_FORMATS= set(['png', 'jpg', 'jpeg', 'gif', 'fits'])
JOB_MONITORING_PERIOD= 5 # in seconds

@@ -58,14 +59,16 @@ class Config(object):
SLURM_BATCH_WORKDIR= '/opt/caesar-rest/batchlogs'
SLURM_JOB_DIR= '/mnt/storage/jobs' # Path at which the job directory is mounted in Slurm cluster
SLURM_DATA_DIR= '/mnt/storage/data' # Path at which the data directory is mounted in Slurm cluster
SLURM_MODEL_DIR= '/mnt/storage/models' # Path at which the models directory is mounted in Slurm cluster
SLURM_CAESAR_JOB_IMAGE= '/opt/containers/caesar/caesar-job_latest.sif'
SLURM_MASKRCNN_JOB_IMAGE= '/opt/containers/mrcnn/mrcnn-detect_latest.sif'
SLURM_AEGEAN_JOB_IMAGE= '/opt/containers/aegean/aegean-job_latest.sif'
SLURM_CUTEX_JOB_IMAGE= '/opt/containers/cutex/cutex-job_latest.sif'
SLURM_CNN_CLASSIFIER_JOB_IMAGE= '/opt/containers/sclassifier/cnn-classifier_latest.sif'
SLURM_UMAP_JOB_IMAGE= '/opt/containers/sclassifier/umap_latest.sif'
SLURM_OUTLIER_FINDER_JOB_IMAGE= '/opt/containers/sclassifier/outlier_finder_latest.sif'
SLURM_HDBSCAN_JOB_IMAGE= '/opt/containers/sclassifier/hdbscan_latest.sif'
SLURM_HDBSCAN_JOB_IMAGE= '/opt/containers/sclassifier/hdbscan_latest.sif'
SLURM_SIMSEARCH_JOB_IMAGE= '/opt/containers/sclassifier/similarity-search_latest.sif'
SLURM_MAX_CORE_PER_JOB= 4 # Maximum number of cores reserved for a job

# - AAI options
@@ -106,6 +109,9 @@ class Config(object):
# - HDBSCAN app options
HDBSCAN_JOB_IMAGE= 'sriggi/hdbscan:latest'

# - SIM SEARCH app options
SIMSEARCH_JOB_IMAGE= 'sriggi/similarity-search:latest'

# - DATASET app options
# NB: paths are to be configured at app deployment phase.
DATASETS= {
4 changes: 2 additions & 2 deletions caesar_rest/hdbscan_app_configurator.py
@@ -85,8 +85,8 @@ def __init__(self):
min_value=-1,
max_value=10000
),
'cluster-sel-eps' : ValueOption(
name='cluster-sel-eps',
'cluster-selection-epsilon' : ValueOption(
name='cluster-selection-epsilon',
value='',
value_type=float,
description='A distance threshold. Clusters below this value will be merged (default=0)',
