Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add evaluate api endpoint for server based deployment #640

Merged
merged 19 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deploy/README.md → deploy/models/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

1. Install the required dependencies with poetry.
```bash
poetry install --with deploy
poetry install --with deploy_models
```
2. Serialize trained model and move it to the `model_repo` directory. Then create
a `config.pbtxt` file for the model.
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
44 changes: 44 additions & 0 deletions deploy/report/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Model Report Deployment using FastAPI

1. Install the required dependencies with poetry.

```bash
poetry install --with deploy_report
```

2. Run the evaluation server (development).

```bash
cd api/
fastapi dev main.py --host <host_ip> --port <port>
```

If running in production,

```bash
cd api/
fastapi run main.py --host <host_ip> --port <port>
```

The end points you can access are:

`/evaluate`: You can send model predictions, labels and metadata
for evaluation to this API endpoint using a POST request. For example:
```bash
curl -X POST 'http://<host_ip>:<port>/evaluate' \
-H 'Content-Type: application/json' \
-d '{
"preds_prob": [0.2, 0.7, 0.1, 0.9],
"target": [0, 1, 0, 1],
"metadata": {
"Age": [25, 30, 45, 50],
"Sex": ["M", "F", "M", "F"]
}
}'
```

If this is the first evaluation for the model, a new model report is created.
Else, the previous model report is used to add the new evaluation.

The server serves the latest model report, and hence to view it you
can navigate to the server IP address either directly at `http://<host_ip>:<port>` or `http://<host_ip>:<port>/evaluate`. In the latter case, we essentially send a GET request to the ``evaluate`` API endpoint which fetches the latest model report.
1 change: 1 addition & 0 deletions deploy/report/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""CyclOps evaluation server."""
307 changes: 307 additions & 0 deletions deploy/report/api/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
"""A light weight server for evaluation."""

import logging
import os
import shutil
from datetime import datetime
from typing import Any, Dict, List

import pandas as pd
from datasets.arrow_dataset import Dataset
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field, validator

from cyclops.data.slicer import SliceSpec
from cyclops.evaluate import evaluator
from cyclops.evaluate.metrics import create_metric
from cyclops.evaluate.metrics.experimental import MetricDict
from cyclops.report.plot.classification import ClassificationPlotter
from cyclops.report.report import ModelCardReport
from cyclops.report.utils import flatten_results_dict
from cyclops.utils.log import setup_logging


LOGGER = logging.getLogger(__name__)
setup_logging(print_level="WARN", logger=LOGGER)


app = FastAPI()
TEMPLATES_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
templates = Jinja2Templates(directory=TEMPLATES_PATH)


class EvaluationInput(BaseModel):
"""Input data for evaluation."""

preds_prob: List[float] = Field(..., min_items=1)
target: List[float] = Field(..., min_items=1)
metadata: Dict[str, List[Any]] = Field(default_factory=dict)

@classmethod
@validator("preds_prob", "target")
def check_list_length(
cls, v: List[float], values: Dict[str, List[Any]], **kwargs: Any
) -> List[float]:
"""Check if preds_prob and target have the same length.

Parameters
----------
v : List[float]
List of values.
values : Dict[str, List[Any]]
Dictionary of values.

Returns
-------
List[float]
List of values.

Raises
------
ValueError
If preds_prob and target have different lengths.

"""
if "preds_prob" in values and len(v) != len(values["preds_prob"]):
raise ValueError("preds_prob and target must have the same length")
return v

@classmethod
@validator("metadata")
def check_metadata_length(
cls, v: Dict[str, List[Any]], values: Dict[str, List[Any]], **kwargs: Any
) -> Dict[str, List[Any]]:
"""Check if metadata columns have the same length as preds_prob and target.

Parameters
----------
v : Dict[str, List[Any]]
Dictionary of values.
values : Dict[str, List[Any]]
Dictionary of values.

Returns
-------
Dict[str, List[Any]]
Dictionary of values.

Raises
------
ValueError
If metadata columns have different lengths than preds_prob and target.

"""
if "preds_prob" in values:
for column in v.values():
if len(column) != len(values["preds_prob"]):
raise ValueError(
"All metadata columns must have the same length as preds_prob and target"
)
return v


# This endpoint serves the UI
@app.get("/", response_class=HTMLResponse)
async def get_home() -> HTMLResponse:
"""Return home page for cyclops model report app.

Returns
-------
HTMLResponse
Home page for cyclops model report app.

"""
return templates.TemplateResponse(
"test_report.html", {"request": {"method": "POST"}}
)


@app.post("/evaluate")
async def evaluate_result(data: EvaluationInput) -> None:
"""Calculate metric and return result from request body.

Parameters
----------
data : EvaluationInput
Input data for evaluation.

Raises
------
HTTPException
If there is an internal server error.

"""
try:
# Create a dictionary with all data
df_dict = {
"target": data.target,
"preds_prob": data.preds_prob,
**data.metadata,
}

# Create DataFrame
df = pd.DataFrame(df_dict)

_eval(df)
LOGGER.info("Generated report.")
except Exception as e:
LOGGER.error(f"Error during evaluation: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error") from e


@app.get("/evaluate")
async def get_report() -> HTMLResponse:
"""Return latest updated model report.

Returns
-------
HTMLResponse
Latest updated model report as HTML response.

"""
return templates.TemplateResponse(
"test_report.html", {"request": {"method": "GET"}}
)


def _export(report: ModelCardReport) -> None:
"""Prepare and export report file."""
if not os.path.exists("./cyclops_report"):
LOGGER.info("Creating report for the first time!")
report_path = report.export(
output_filename="test_report.html",
synthetic_timestamp=str(datetime.today()),
last_n_evals=3,
)
shutil.copy(f"{report_path}", TEMPLATES_PATH)


def _eval(df: pd.DataFrame) -> None:
amrit110 marked this conversation as resolved.
Show resolved Hide resolved
"""Evaluate and return report.

Parameters
----------
df : pd.DataFrame
DataFrame containing target, preds_prob and metadata columns.

"""
report = ModelCardReport()
data = Dataset.from_pandas(df)
metric_names = [
"binary_accuracy",
"binary_precision",
"binary_recall",
"binary_f1_score",
]
metrics = [
create_metric(metric_name, experimental=True) for metric_name in metric_names
]
metric_collection = MetricDict(metrics)
spec_list = [
{
"Age": {
"min_value": 30,
"max_value": 50,
"min_inclusive": True,
"max_inclusive": False,
},
},
{
"Age": {
"min_value": 50,
"max_value": 70,
"min_inclusive": True,
"max_inclusive": False,
},
},
fcogidi marked this conversation as resolved.
Show resolved Hide resolved
]
slice_spec = SliceSpec(spec_list)
result = evaluator.evaluate(
dataset=data,
metrics=metric_collection,
slice_spec=slice_spec,
target_columns="target",
prediction_columns="preds_prob",
)
results_flat = flatten_results_dict(results=result)

# Log into report
for name, metric in results_flat["model_for_preds_prob"].items():
split, name = name.split("/") # noqa: PLW2901
descriptions = {
"BinaryPrecision": "The proportion of predicted positive instances that are correctly predicted.",
"BinaryRecall": "The proportion of actual positive instances that are correctly predicted. Also known as recall or true positive rate.",
"BinaryAccuracy": "The proportion of all instances that are correctly predicted.",
"BinaryF1Score": "The harmonic mean of precision and recall.",
}
report.log_quantitative_analysis(
"performance",
name=name,
value=metric.tolist(),
description=descriptions[name],
metric_slice=split,
pass_fail_thresholds=0.6,
pass_fail_threshold_fns=lambda x, threshold: bool(x >= threshold),
)

# Log plot in report
plotter = ClassificationPlotter(task_type="binary", class_names=["0", "1"])
plotter.set_template("plotly_white")

# Extracting the overall classification metric values.
overall_performance = {
metric_name: metric_value
for metric_name, metric_value in result["model_for_preds_prob"][
"overall"
].items()
if metric_name not in ["BinaryROC", "BinaryPrecisionRecallCurve"]
}
# Plotting the overall classification metric values.
overall_performance_plot = plotter.metrics_value(
overall_performance,
title="Overall Performance",
)
report.log_plotly_figure(
fig=overall_performance_plot,
caption="Overall Performance",
section_name="quantitative analysis",
)
report.log_from_dict(
data={
"name": "Heart Failure Prediction Model",
"description": "The model was trained on the Kaggle Heart Failure \
Prediction Dataset to predict risk of heart failure.",
},
section_name="model_details",
)
report.log_version(
version_str="0.0.1",
date=str(datetime.today().date()),
description="Initial Release",
)
report.log_owner(
name="CyclOps Team",
contact="vectorinstitute.github.io/cyclops/",
email="cyclops@vectorinstitute.ai",
)
report.log_license(identifier="Apache-2.0")
report.log_reference(
link="https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.SGDClassifier.html", # noqa: E501
)
report.log_from_dict(
data={
"users": [
{"description": "Hospitals"},
{"description": "Clinicians"},
],
},
section_name="considerations",
)
report.log_user(description="ML Engineers")
report.log_use_case(
description="Predicting risk of heart failure.",
kind="primary",
)
_export(report)
Empty file.
Loading
Loading