Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added ExposedModel to produce predictions #24

Merged
merged 13 commits into from
Apr 16, 2024
Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
.DS_STORE

test_data/feature-store.json
test_data/mlruns

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
202 changes: 56 additions & 146 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

A data managment tool for ML applications.

Similar to have DBT is a data managment tool for business analytics, will Aligned manage ML projects.
Similar to how DBT is a data managment tool for business analytics, will Aligned manage ML projects.

Aligned does this through two things.
1. A light weight data managment system. Making it possible to query a data lake and databases.
Expand Down Expand Up @@ -31,7 +31,7 @@ View the [`MatsMoll/aligned-example` repo](https://github.com/MatsMoll/aligned-e
Or see how you could query a file in a data lake.

```python
store = await FeatureStore.from_dir(".")
store = await ContractStore.from_dir(".")
df = await store.execute_sql("SELECT * FROM titanic LIMIT 10").to_polars()
```

Expand All @@ -50,11 +50,8 @@ Bellow are some of the features Aligned offers:
- [Model Performance Monitoring](https://aligned-managed-web.vercel.app/)
- [Data Freshness](#data-freshness)
- [Data Quality Assurance](#data-quality)
- [Easy Data Loading](#access-data)
- [Feature Store](https://matsmoll.github.io/posts/understanding-the-chaotic-landscape-of-mlops#feature-store)
- [Load Form Multiple Sources](#fast-development)
- [Feature Server](#feature-server)
- [Stream Processing](#stream-worker)
- [Exposing Models](#exposed-models)


All from the simple API of defining
Expand Down Expand Up @@ -91,12 +88,12 @@ All this is described through a `model_contract`, as shown bellow.
```python
@model_contract(
name="eta_taxi",
features=[
input_features=[
trips.eucledian_distance,
trips.number_of_passengers,
traffic.expected_delay
],
prediction_source=FileSource.delta_at("titanic_model/predictions")
output_source=FileSource.delta_at("titanic_model/predictions")
)
class EtaTaxi:
trip_id = Int32().as_entity()
Expand All @@ -111,20 +108,19 @@ Alinged makes handling data sources easy, as you do not have to think about how
Furthermore, Aligned makes it easy to switch parts of the business logic to a local setup for debugging purposes.

```python
from aligned import FileSource, AwsS3Config, AzureBlobConfig, Directory
import os
from aligned import FileSource, AwsS3Config, AzureBlobConfig

root_directory: Directory = FileSource.directory("my-awesome-project")

if os.getenv("USE_AWS", "false").lower() == "true":
dir_type: Literal["local", "aws", "azure"] = ...

if dir_type == "aws":
aws_config = AwsS3Config(...)
root_directory = aws_config.directory("my-awesome-project")

elif os.getenv("USE_AZURE", "false").lower() == "true":

elif dir_type == "azure":
azure_config = AzureBlobConfig(...)
root_directory = azure_config.directory("my-awesome-project")
else:
root_directory = FileSource.directory("my-awesome-project")


taxi_project = root_directory.sub_directory("eta_taxi")
Expand Down Expand Up @@ -183,64 +179,50 @@ class TitanicPassenger:
is_male, is_female = sex.one_hot_encode(['male', 'female'])
```

### Fast development
### Exposed models

Making iterativ and fast exploration in ML is important. This is why Aligned also makes it super easy to combine, and test multiple sources.
Aligned mainly focuses on defining the expected input and output of different models. However, this in itself makes it hard to use the models. This is why Aligned makes it possible to define how our ML models are exposed by setting an `exposed_model` attribute.

```python
my_db = PostgreSQLConfig.localhost()

aws_bucket = AwsS3Config(...)

@feature_view(
name="passengers",
description="...",
source=my_db.table("passengers")
)
class TitanicPassenger:

passenger_id = Int32().as_entity()
```python
from aligned.exposed_model.mlflow import mlflow_server

# Some features
@model_contract(
name="eta_taxi",
exposed_model=mlflow_server(
host="http://localhost:8000",
),
...

# Change data source
passenger_view = TitanicPassenger.query()

psql_passengers = await passenger_view.all().to_pandas()
aws_passengers = await passenger_view.using_source(
aws_bucket.parquet_at("passengers.parquet")
).to_pandas()

)
class EtaTaxi:
trip_id = Int32().as_entity()
predicted_at = EventTimestamp()
predicted_duration = trips.duration.as_regression_target()
```

## Describe Models

Usually will you need to combine multiple features for each model.
This is where a `Model` comes in.
Here can you define which features should be exposed.
This also makes it possible to get predictions with the following command:

```python
passenger = TitanicPassenger()
location = LocationFeatures()
await store.model("eta_taxi").predict_over({
"trip_id": [...]
}).to_polars()
```

@model_contract(
name="titanic",
features=[ # aka. the model input
passenger.constant_filled_age,
passenger.ordinal_sex,
passenger.sibsp,

location.distance_to_shore,
location.distance_to_closest_boat
]
)
class Titanic:
Or store them directly in the `output_source` with something like:

# Referencing the passenger's survived feature as the target
did_survive = passenger.survived.as_classification_target()
```python
await store.model("eta_taxi").predict_over({
"trip_id": [...]
}).upsert_into_output_source()
```

Some of the existing implementations are:
- MLFlow Server
- Run MLFLow model in memory
- Ollama completion endpoint
- Ollama embedded endpoint
- Send entities to generic endpoint

## Data Freshness
Making sure a source contains fresh data is a crucial part to create propper ML applications.
Therefore, Aligned provides an easy way to check how fresh a source is.
Expand Down Expand Up @@ -272,66 +254,6 @@ if freshness < datetime.now() - timedelta(days=2):
raise ValueError("To old data to create an ML model")
```

## Access Data

You can easily create a feature store that contains all your feature definitions.
This can then be used to genreate data sets, setup an instce to serve features, DAG's etc.

```python
store = await FileSource.json_at("./feature-store.json").feature_store()

# Select all features from a single feature view
df = await store.all_for("passenger", limit=100).to_pandas()
```

### Centraliced Feature Store Definition
You would often share the features with other coworkers, or split them into different stages, like `staging`, `shadow`, or `production`.
One option is therefore to reference the storage you use, and load the `FeatureStore` from there.

```python
aws_bucket = AwsS3Config(...)
store = await aws_bucket.json_at("production.json").feature_store()

# This switches from the production online store to the offline store
# Aka. the batch sources defined on the feature views
experimental_store = store.offline_store()
```
This json file can be generated by running `aligned apply`.

### Select multiple feature views

```python
df = await store.features_for({
"passenger_id": [1, 50, 110]
}, features=[
"passenger:scaled_age",
"passenger:is_male",
"passenger:sibsp"

"other_features:distance_to_closest_boat",
]).to_polars()
```

### Model Service

Selecting features for a model is super simple.


```python
df = await store.model("titanic_model").features_for({
"passenger_id": [1, 50, 110]
}).to_pandas()
```

### Feature View

If you want to only select features for a specific feature view, then this is also possible.

```python
prev_30_days = await store.feature_view("match").previous(days=30).to_pandas()
sample_of_20 = await store.feature_view("match").all(limit=20).to_pandas()
```

## Data quality
Alinged will make sure all the different features gets formatted as the correct datatype.
In addition will aligned also make sure that the returend features aligne with defined constraints.
Expand Down Expand Up @@ -363,40 +285,28 @@ df = await store.model("titanic_model").features_for({
).to_pandas()
```

## Feature Server
## Contract Store

You can define how to serve your features with the `FeatureServer`. Here can you define where you want to load, and potentially write your features to.
Aligned collects all the feature views and model contracts in a contract store. You can generate this in a few different ways, and each method serves some different use-cases.

By default will it `aligned` look for a file called `server.py`, and a `FeatureServer` object called `server`. However, this can be defined manually as well.

```python
from aligned import RedisConfig, FileSource
from aligned.schemas.repo_definition import FeatureServer
For experimentational use-cases will the `await ContractStore.from_dir(".")` probably make the most sense. However, this will scan the full directory which can lead to slow startup times.

store = FileSource.json_at("feature-store.json")
Therefore, it is also possible to manually add the different feature views and contracts with the following.

server = FeatureServer.from_reference(
store,
RedisConfig.localhost()
)
```python
store = ContractStore.empty()
store.add_feature_view(MyView)
store.add_model(MyModel)
```

Then run `aligned serve`, and a FastAPI server will start. Here can you push new features, which then transforms and stores the features, or just fetch them.

## Stream Worker

You can also setup stream processing with a similar structure. However, here will a `StreamWorker` be used.

by default will `aligned` look for a `worker.py` file with an object called `worker`. An example would be the following.
This makes it possible to define different contracts per project, or team. As a result, you can also combine differnet stores with.

```python
from aligned import RedisConfig, FileSource
from aligned.schemas.repo_definition import FeatureServer
combined_store = recommendation_store.combined_with(forecasting_store)
```

store = FileSource.json_at("feature-store.json")
Lastly, we can also load the all features from a serializable format, such as a JSON file.

server = FeatureServer.from_reference(
store,
RedisConfig.localhost()
)
```python
await FileSource.json_at("contracts.json").as_contract_store()
```
17 changes: 16 additions & 1 deletion aligned/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@
Entity,
EventTimestamp,
Float,
Int8,
Int16,
Int32,
Int64,
UInt8,
UInt16,
UInt32,
UInt64,
Json,
String,
Timestamp,
Expand All @@ -16,7 +22,7 @@
from aligned.compiler.model import model_contract, FeatureInputVersions
from aligned.data_source.stream_data_source import HttpStreamSource
from aligned.data_source.batch_data_source import CustomMethodDataSource
from aligned.feature_store import FeatureStore
from aligned.feature_store import ContractStore, FeatureStore
from aligned.feature_view import feature_view, combined_feature_view, check_schema
from aligned.schemas.text_vectoriser import EmbeddingModel
from aligned.sources.kafka import KafkaConfig
Expand All @@ -26,9 +32,11 @@
from aligned.sources.redshift import RedshiftSQLConfig
from aligned.sources.s3 import AwsS3Config
from aligned.sources.azure_blob_storage import AzureBlobConfig
from aligned.exposed_model.interface import ExposedModel
from aligned.schemas.feature import FeatureLocation

__all__ = [
'ContractStore',
'FeatureStore',
'feature_view',
# Batch Data sources
Expand All @@ -45,11 +53,18 @@
# Streaming Sources
'KafkaConfig',
# Types
'ExposedModel',
'Entity',
'String',
'Bool',
'Entity',
'UUID',
'UInt8',
'UInt16',
'UInt32',
'UInt64',
'Int8',
'Int16',
'Int32',
'Int64',
'Float',
Expand Down
Loading
Loading