feat: implement prediction service #276

Status: Open. Wants to merge 27 commits into base: master.
This view shows changes from 2 commits.

Commits (27)
b73b28b  feat: Add initial prediction service data pipeline to learner - imple… (lrahmani, Feb 9, 2023)
b90bae7  Adding draft of generic prediction service. (#278) (hanwag, Apr 3, 2023)
9534624  Bug-Fix: linting. (hanwag, Apr 3, 2023)
b65df16  Bug-Fix: linting. (hanwag, Apr 3, 2023)
d794d4e  Bug-Fix: failing pytests. (hanwag, Apr 3, 2023)
f89a3a8  Bug-Fix: data compatibilities. (hanwag, Apr 3, 2023)
e0fab4a  Bug-Fix: typo pred compatibilities. (hanwag, Apr 3, 2023)
3765c5c  Bug-Fix: failing tests and linting. (hanwag, Apr 4, 2023)
c617ee2  tmp passing failing test. (hanwag, Apr 5, 2023)
7ec9711  Bug-Fix: Prediction optional. (hanwag, Apr 5, 2023)
021752a  Bug-Fix: linting and filepath. (hanwag, Apr 5, 2023)
9eb9710  Bug-Fix: path update. (hanwag, Apr 5, 2023)
4e3a371  bug-fix: folder location and linting error. (hanwag, Apr 5, 2023)
b3004e1  bug-fix: trailing white space. (hanwag, Apr 5, 2023)
377c21e  Bug-Fix: type checking errors. (hanwag, Apr 5, 2023)
0e9fe4c  Bug-Fix: style checks. (hanwag, Apr 5, 2023)
299c303  Merge branch 'master' into feature/prediction (lrahmani, Apr 5, 2023)
8ea23ac  Bug-Fix: increase time out and failing env1 test. (hanwag, Apr 6, 2023)
27dba67  Bug-Fix: pred data loader. (hanwag, Apr 6, 2023)
3ae322e  Added debug messages. (hanwag, Apr 6, 2023)
bd94ba9  Bug-Fix: failing scania test. (hanwag, Apr 6, 2023)
fb01378  Bug-Fix: timeout, env17 removed, fixed. (hanwag, Apr 6, 2023)
a8ec8e4  Bug-Fix: keras fraud test. (hanwag, Apr 6, 2023)
6587465  Bug-Fix: test type annotations. (hanwag, Apr 6, 2023)
07143c5  Bug-Fix: added type hints. (hanwag, Apr 6, 2023)
ce38a66  Implement Prediction dataloader for Scania learner (#280) (hanwag, Apr 18, 2023)
0179864  Multiple metrics (#283) (hanwag, May 23, 2023)
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
@@ -44,7 +44,7 @@ jobs:

strategy:
matrix:
python-version: [3.7, 3.8]
python-version: [3.7]
env:
GITHUB_ACTION: true

50 changes: 25 additions & 25 deletions colearn/ml_interface.py
@@ -20,32 +20,7 @@
from typing import Any, Optional

import onnx
import onnxmltools
import sklearn
import tensorflow as tf
import torch
from pydantic import BaseModel
from tensorflow import keras

model_classes_keras = (tf.keras.Model, keras.Model, tf.estimator.Estimator)
model_classes_scipy = (torch.nn.Module)
model_classes_sklearn = (sklearn.base.ClassifierMixin)


def convert_model_to_onnx(model: Any):
"""
Helper function to convert a ML model to onnx format
"""
if isinstance(model, model_classes_keras):
return onnxmltools.convert_keras(model)
if isinstance(model, model_classes_sklearn):
return onnxmltools.convert_sklearn(model)
if 'xgboost' in model.__repr__():
return onnxmltools.convert_sklearn(model)
if isinstance(model, model_classes_scipy):
raise Exception("Pytorch models not yet supported to onnx")
else:
raise Exception("Attempt to convert unsupported model to onnx: {model}")


class DiffPrivBudget(BaseModel):
@@ -94,6 +69,17 @@ class ColearnModel(BaseModel):
model: Optional[Any]


class PredictionRequest(BaseModel):
name: str
input_data: Any
pred_data_loader_key: Optional[Any]


class Prediction(BaseModel):
name: str
prediction_data: Any


def deser_model(model: Any) -> onnx.ModelProto:
"""
Helper function to recover a onnx model from its deserialized form
@@ -136,3 +122,17 @@ def mli_get_current_model(self) -> ColearnModel:
Returns the current model
"""
pass

@abc.abstractmethod
def mli_make_prediction(self, request: PredictionRequest) -> Prediction:
"""
Make prediction using the current model.
Does not change the current weights of the model.

:param request: data to get the prediction for
:returns: the prediction
"""
pass


_DM_PREDICTION_SUFFIX = b">>>result<<<"
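For orientation, here is a minimal sketch of how a concrete learner could satisfy the new abstract method. This is not code from the PR: the class is hypothetical and it assumes the learner keeps a fitted, scikit-learn style model exposing `predict()` in `self.model`.

```python
# Sketch only (not part of this PR): a hypothetical learner implementing the
# new abstract method. Assumes self.model is a fitted estimator with predict().
import numpy as np

from colearn.ml_interface import Prediction, PredictionRequest


class ExamplePredictingLearner:
    def __init__(self, model):
        self.model = model  # assumption: a fitted sklearn-style estimator

    def mli_make_prediction(self, request: PredictionRequest) -> Prediction:
        # Inference only: the current model weights are not modified.
        features = np.asarray(request.input_data)
        result = self.model.predict(features)
        return Prediction(name=request.name, prediction_data=result)
```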
44 changes: 44 additions & 0 deletions colearn/onnxutils.py
@@ -0,0 +1,44 @@
# ------------------------------------------------------------------------------
#
# Copyright 2021 Fetch.AI Limited
#
# Licensed under the Creative Commons Attribution-NonCommercial International
# License, Version 4.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://creativecommons.org/licenses/by-nc/4.0/legalcode
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
from typing import Any

import onnxmltools
import sklearn
import tensorflow as tf
import torch
from tensorflow import keras

model_classes_keras = (tf.keras.Model, keras.Model, tf.estimator.Estimator)
model_classes_scipy = (torch.nn.Module)
model_classes_sklearn = (sklearn.base.ClassifierMixin)


def convert_model_to_onnx(model: Any):
"""
Helper function to convert a ML model to onnx format
"""
if isinstance(model, model_classes_keras):
return onnxmltools.convert_keras(model)
if isinstance(model, model_classes_sklearn):
return onnxmltools.convert_sklearn(model)
if 'xgboost' in model.__repr__():
return onnxmltools.convert_sklearn(model)
if isinstance(model, model_classes_scipy):
raise Exception("Pytorch models not yet supported to onnx")
else:
raise Exception("Attempt to convert unsupported model to onnx: {model}")
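As a usage illustration for the relocated helper, a hedged sketch that converts a toy Keras model; the network below is arbitrary and not one used elsewhere in this PR.

```python
# Illustrative only: convert a small, arbitrary Keras model to ONNX via the
# helper added in colearn/onnxutils.py (which dispatches to onnxmltools.convert_keras).
import tensorflow as tf

from colearn.onnxutils import convert_model_to_onnx

toy_model = tf.keras.Sequential([
    tf.keras.layers.Dense(1, activation="sigmoid", input_shape=(4,)),
])
onnx_proto = convert_model_to_onnx(toy_model)  # returns an ONNX model proto
```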
2 changes: 1 addition & 1 deletion colearn_examples/ml_interface/keras_fraud.py
@@ -44,7 +44,7 @@
input_classes = 431
n_classes = 1
loss = "binary_crossentropy"
optimizer = tf.keras.optimizers.Adam
optimizer = tf.keras.optimizers.legacy.Adam
l_rate = 0.0001
l_rate_decay = 1e-5
batch_size = 10000
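For context on the optimizer swap above, a hedged sketch of how the legacy Adam class can be instantiated with the learning-rate settings this example defines; the keyword names follow the standard legacy Keras optimizer signature and the call itself is not taken from the diff.

```python
# Sketch: instantiating the legacy Adam optimizer with the settings shown in
# this example's diff context (values copied from the diff; the call is assumed).
import tensorflow as tf

l_rate = 0.0001
l_rate_decay = 1e-5
optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=l_rate, decay=l_rate_decay)
```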
6 changes: 5 additions & 1 deletion colearn_examples/ml_interface/mli_fraud.py
@@ -24,7 +24,8 @@
import sklearn
from sklearn.linear_model import SGDClassifier

from colearn.ml_interface import MachineLearningInterface, Weights, ProposedWeights, ColearnModel, ModelFormat, convert_model_to_onnx
from colearn.ml_interface import MachineLearningInterface, Prediction, PredictionRequest, Weights, ProposedWeights, ColearnModel, ModelFormat
from colearn.onnxutils import convert_model_to_onnx
from colearn.training import initial_result, collective_learning_round, set_equal_weights
from colearn.utils.plot import ColearnPlot
from colearn.utils.results import Results, print_results
@@ -130,6 +131,9 @@ def test(self, data, labels):
except sklearn.exceptions.NotFittedError:
return 0

def mli_make_prediction(self, request: PredictionRequest) -> Prediction:
raise NotImplementedError()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
6 changes: 5 additions & 1 deletion colearn_examples/ml_interface/mli_random_forest_iris.py
@@ -22,7 +22,8 @@
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier

from colearn.ml_interface import MachineLearningInterface, Weights, ProposedWeights, ColearnModel, ModelFormat, convert_model_to_onnx
from colearn.ml_interface import MachineLearningInterface, Prediction, PredictionRequest, Weights, ProposedWeights, ColearnModel, ModelFormat
from colearn.onnxutils import convert_model_to_onnx
from colearn.training import initial_result, collective_learning_round
from colearn.utils.plot import ColearnPlot
from colearn.utils.results import Results, print_results
@@ -114,6 +115,9 @@ def set_weights(self, weights: Weights):
def test(self, data_array, labels_array):
return self.model.score(data_array, labels_array)

def mli_make_prediction(self, request: PredictionRequest) -> Prediction:
raise NotImplementedError()


train_fraction = 0.9
vote_fraction = 0.05
6 changes: 5 additions & 1 deletion colearn_examples/ml_interface/xgb_reg_boston.py
@@ -23,7 +23,8 @@
import numpy as np
import xgboost as xgb

from colearn.ml_interface import MachineLearningInterface, Weights, ProposedWeights, ColearnModel, ModelFormat, convert_model_to_onnx
from colearn.ml_interface import MachineLearningInterface, Prediction, PredictionRequest, Weights, ProposedWeights, ColearnModel, ModelFormat
from colearn.onnxutils import convert_model_to_onnx
from colearn.training import initial_result, collective_learning_round
from colearn.utils.data import split_list_into_fractions
from colearn.utils.plot import ColearnPlot
@@ -112,6 +113,9 @@ def mli_get_current_model(self) -> ColearnModel:
def test(self, data_matrix):
return mse(self.model.predict(data_matrix), data_matrix.get_label())

def mli_make_prediction(self, request: PredictionRequest) -> Prediction:
raise NotImplementedError()


train_fraction = 0.9
vote_fraction = 0.05
56 changes: 46 additions & 10 deletions colearn_grpc/example_grpc_learner_client.py
@@ -24,7 +24,7 @@

import colearn_grpc.proto.generated.interface_pb2 as ipb2
import colearn_grpc.proto.generated.interface_pb2_grpc as ipb2_grpc
from colearn.ml_interface import MachineLearningInterface, ProposedWeights, Weights, ColearnModel
from colearn.ml_interface import MachineLearningInterface, Prediction, PredictionRequest, ProposedWeights, Weights, ColearnModel
from colearn_grpc.logging import get_logger
from colearn_grpc.utils import iterator_to_weights, weights_to_iterator

@@ -65,13 +65,15 @@ def start(self):
# Attempt to get the certificate from the server and use it to encrypt the
# connection. If the certificate cannot be found, try to create an unencrypted connection.
try:
assert (':' in self.address), f"Poorly formatted address, needs :port - {self.address}"
assert (
':' in self.address), f"Poorly formatted address, needs :port - {self.address}"
_logger.info(f"Connecting to server: {self.address}")
addr, port = self.address.split(':')
trusted_certs = ssl.get_server_certificate((addr, int(port)))

# create credentials
credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs.encode())
credentials = grpc.ssl_channel_credentials(
root_certificates=trusted_certs.encode())
except ssl.SSLError as e:
_logger.warning(
f"Encountered ssl error when attempting to get certificate from learner server: {e}")
@@ -118,15 +120,21 @@ def get_supported_system(self):
response = self.stub.QuerySupportedSystem(request)
r = {
"data_loaders": {},
"prediction_data_loaders": {},
"model_architectures": {},
"compatibilities": {}
"data_compatibilities": {},
"pred_compatibilities": {},
}
for d in response.data_loaders:
r["data_loaders"][d.name] = d.default_parameters
for p in response.prediction_data_loaders:
r["prediction_data_loaders"][p.name] = p.default_parameters
for m in response.model_architectures:
r["model_architectures"][m.name] = m.default_parameters
for c in response.compatibilities:
r["compatibilities"][c.model_architecture] = c.dataloaders
for dc in response.data_compatibilities:
r["data_compatibilities"][dc.model_architecture] = dc.dataloaders
for pc in response.pred_compatibilities:
r["pred_compatibilities"][pc.model_architecture] = pc.prediction_dataloaders
return r

def get_version(self):
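The restructured capability map returned by get_supported_system now separates training and prediction data loaders. A shape sketch with placeholder names (none of these loader or model names come from the PR):

```python
# Shape sketch only: what the returned capability map roughly looks like.
supported = {
    "data_loaders": {"demo_loader": "{}"},                    # name -> default parameters
    "prediction_data_loaders": {"demo_pred_loader": "{}"},
    "model_architectures": {"demo_model": "{}"},
    "data_compatibilities": {"demo_model": ["demo_loader"]},  # model -> training loaders
    "pred_compatibilities": {"demo_model": ["demo_pred_loader"]},
}
```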
@@ -137,18 +145,29 @@ def get_version(self):
return response.version

def setup_ml(self, dataset_loader_name, dataset_loader_parameters,
model_arch_name, model_parameters):

_logger.info(f"Setting up ml: model_arch: {model_arch_name}, dataset_loader: {dataset_loader_name}")
model_arch_name, model_parameters,
prediction_dataset_loader_name=None,
prediction_dataset_loader_parameters=None,
):
_logger.info(
f"Setting up ml: model_arch: {model_arch_name}, dataset_loader: {dataset_loader_name},"
f"prediction_dataset_loader: {prediction_dataset_loader_name}")
_logger.debug(f"Model params: {model_parameters}")
_logger.debug(f"Dataloader params: {dataset_loader_parameters}")
_logger.debug(
f"Prediction dataloader params: {prediction_dataset_loader_parameters}")

request = ipb2.RequestMLSetup()
request.dataset_loader_name = dataset_loader_name
request.dataset_loader_parameters = dataset_loader_parameters
request.model_arch_name = model_arch_name
request.model_parameters = model_parameters

if prediction_dataset_loader_name:
request.prediction_dataset_loader_name = prediction_dataset_loader_name
if prediction_dataset_loader_parameters:
request.prediction_dataset_loader_parameters = prediction_dataset_loader_parameters

_logger.info(f"Setting up ml with request: {request}")

try:
Expand All @@ -173,7 +192,8 @@ def mli_propose_weights(self) -> Weights:
def mli_test_weights(self, weights: Weights = None) -> ProposedWeights:
try:
if weights:
response = self.stub.TestWeights(weights_to_iterator(weights, encode=False))
response = self.stub.TestWeights(
weights_to_iterator(weights, encode=False))
else:
raise Exception("mli_test_weights(None) is not currently supported")

@@ -211,3 +231,19 @@ def mli_get_current_model(self) -> ColearnModel:
response = self.stub.GetCurrentModel(request)

return ColearnModel(model_format=response.model_format, model_file=response.model_file, model=response.model)

def mli_make_prediction(self, request: PredictionRequest) -> Prediction:
request_pb = ipb2.PredictionRequest()
request_pb.name = request.name
request_pb.input_data = request.input_data
if request.pred_data_loader_key:
request_pb.pred_data_loader_key = request.pred_data_loader_key

_logger.info(f"Requesting prediction {request.name}")

try:
response = self.stub.MakePrediction(request_pb)
return Prediction(name=response.name, prediction_data=response.prediction_data)
except grpc.RpcError as ex:
_logger.exception(f"Failed to make_prediction: {ex}")
raise ConnectionError(f"GRPC error: {ex}")
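To show how the extended client API fits together, a hedged end-to-end sketch: set up the learner with an optional prediction data loader, then request a prediction over gRPC. The client class name and constructor arguments are assumed from this file, and every loader name, parameter string, and input value below is a placeholder rather than something defined by this PR.

```python
# Illustrative client-side flow; names, parameters, and input data are placeholders.
from colearn.ml_interface import PredictionRequest
from colearn_grpc.example_grpc_learner_client import ExampleGRPCLearnerClient

# Constructor arguments are an assumption (client name + "host:port" address).
client = ExampleGRPCLearnerClient("prediction-demo", "127.0.0.1:9995")
client.start()

client.setup_ml(
    dataset_loader_name="demo_loader",                  # placeholder loader name
    dataset_loader_parameters="{}",                     # placeholder parameters
    model_arch_name="demo_model",                       # placeholder model name
    model_parameters="{}",
    prediction_dataset_loader_name="demo_pred_loader",  # placeholder
    prediction_dataset_loader_parameters="{}",
)

prediction = client.mli_make_prediction(
    PredictionRequest(name="demo", input_data=b"\x00", pred_data_loader_key=None)
)
print(prediction.name, prediction.prediction_data)
```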