Skip to content

Commit

Permalink
Merge pull request #8 from nabeel-oz/unsupervised-ml
Browse files Browse the repository at this point in the history
Unsupervised machine learning with scikit-learn
  • Loading branch information
nabeel-oz authored Sep 14, 2018
2 parents dd8e93f + 94385c1 commit 364ba03
Show file tree
Hide file tree
Showing 8 changed files with 820 additions and 199 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This repository provides a server side extension (SSE) for Qlik Sense built usin
The current implementation includes:

- **Supervised Machine Learning** : Implemented using [scikit-learn](http://scikit-learn.org/stable/index.html), the go-to machine learning library for Python. This SSE implements the full machine learning flow from data preparation, model training and evaluation, to making predictions in Qlik.
- **Unsupervised Machine Learning** : Also implemented using [scikit-learn](http://scikit-learn.org/stable/index.html). This provides capabilities for dimensionality reduction and clustering.
- **Clustering** : Implemented using [HDBSCAN](https://hdbscan.readthedocs.io/en/latest/comparing_clustering_algorithms.html), a high performance algorithm that is great for exploratory data analysis.
- **Time series forecasting** : Implemented using [Facebook Prophet](https://research.fb.com/prophet-forecasting-at-scale/), a modern library for easily generating good quality forecasts.
- **Seasonality and holiday analysis** : Also using Facebook Prophet.
Expand Down Expand Up @@ -85,4 +86,4 @@ Sample Qlik Sense apps are provided and each app includes extensive techniques t
| [Correlations](docs/Correlation.md) | [Sample App - Correlations](docs/Sample_App_Correlations.qvf) | None. |
| [Clustering](docs/Clustering.md) | [Sample App - Clustering with HDBSCAN](docs/Sample_App_Clustering.qvf) | The [qsVariable](https://github.com/erikwett/qsVariable) extension. <br/><br/>Qlik Sense April 2018 or later to view the multi-layered maps. |
| [Forecasting](docs/Prophet.md) | [Sample App - Facebook Prophet (Detailed)](docs/Sample_App_Prophet.qvf)<br><br>[Sample App - Facebook Prophet (Simple)](docs/Sample_App_Forecasting_Simple.qvf) | The [qsVariable](https://github.com/erikwett/qsVariable) extension. <br/><br/>Use the bookmarks to step through the sheets with relevant selections. |
| [Supervised Machine Learning](docs/scikit-learn.md) | [Sample App - Train & Test](docs/Sample-App-scikit-learn-Train-Test.qvf)<br><br>[Sample App - Predict](docs/Sample-App-scikit-learn-Predict.qvf)<br><br>[Sample App - Parameter Tuning](docs/Sample-App-scikit-learn-Parameter-Tuning.qvf) | Make sure you run the load for the Train_Test app before using the Predict app.<br><br>The [qsVariable](https://github.com/erikwett/qsVariable) extension. |
| [Machine Learning](docs/scikit-learn.md) | [Sample App - Train & Test](docs/Sample-App-scikit-learn-Train-Test.qvf)<br><br>[Sample App - Predict](docs/Sample-App-scikit-learn-Predict.qvf)<br><br>[Sample App - Parameter Tuning](docs/Sample-App-scikit-learn-Parameter-Tuning.qvf) | Make sure you run the load for the Train_Test app before using the Predict app.<br><br>The [qsVariable](https://github.com/erikwett/qsVariable) extension. |
147 changes: 108 additions & 39 deletions core/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
# Set the default port for this SSE Extension
_DEFAULT_PORT = '50055'

# Set the maximum message length for gRPC in bytes
MAX_MESSAGE_LENGTH = 10 * 1024 * 1024

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_MINFLOAT = float('-inf')

Expand Down Expand Up @@ -91,7 +94,10 @@ def functions(self):
22: '_sklearn',
23: '_sklearn',
24: '_sklearn',
25: '_sklearn'
25: '_sklearn',
26: '_sklearn',
27: '_sklearn',
28: '_sklearn'
}

"""
Expand Down Expand Up @@ -150,10 +156,24 @@ def _cluster(request, context):
response_rows = [iter([SSE.Dual(numData=row)]) for row in clusters]

# Values are then structured as SSE.Rows
response_rows = [SSE.Row(duals=duals) for duals in response_rows]

# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows)
response_rows = [SSE.Row(duals=duals) for duals in response_rows]

# Get the number of bundles in the request
num_request_bundles = len(request_list)

# Get the number of rows in the response
num_rows = len(response_rows)

# Calculate the number of rows to send per bundle
if num_rows >= num_request_bundles:
rows_per_bundle = len(response_rows)//len(request_list)
else:
rows_per_bundle = num_rows

# Stream response as BundledRows
for i in range(0, len(response_rows), rows_per_bundle):
# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows[i : i + rows_per_bundle])

@staticmethod
def _correlation(request, context):
Expand Down Expand Up @@ -311,8 +331,22 @@ def _prophet(request, context):
# Values are then structured as SSE.Rows
response_rows = [SSE.Row(duals=duals) for duals in response_rows]

# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows)
# Get the number of bundles in the request
num_request_bundles = len(request_list)

# Get the number of rows in the response
num_rows = len(response_rows)

# Calculate the number of rows to send per bundle
if num_rows >= num_request_bundles:
rows_per_bundle = len(response_rows)//len(request_list)
else:
rows_per_bundle = num_rows

# Stream response as BundledRows
for i in range(0, len(response_rows), rows_per_bundle):
# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows[i : i + rows_per_bundle])

@staticmethod
def _prophet_seasonality(request, context):
Expand Down Expand Up @@ -370,10 +404,22 @@ def _prophet_seasonality(request, context):
# The series is then converted to a list
response_rows = response_rows.apply(lambda duals: SSE.Row(duals=duals)).tolist()

# Iterate over bundled rows
for request_rows in request_list:
# Get the number of bundles in the request
num_request_bundles = len(request_list)

# Get the number of rows in the response
num_rows = len(response_rows)

# Calculate the number of rows to send per bundle
if num_rows >= num_request_bundles:
rows_per_bundle = len(response_rows)//len(request_list)
else:
rows_per_bundle = num_rows

# Stream response as BundledRows
for i in range(0, len(response_rows), rows_per_bundle):
# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows)
yield SSE.BundledRows(rows=response_rows[i : i + rows_per_bundle])

@staticmethod
def _sklearn(request, context):
Expand All @@ -395,7 +441,7 @@ def _sklearn(request, context):
model = SKLearnForQlik(request_list, context)

# Call the function based on the mapping in functions.json
# The IF conditions are grouped based on similar output structure
# The if conditions are grouped based on similar output structure
if function in (9, 10, 21, 24):
if function == 9:
# Set up the model and save to disk
Expand All @@ -410,25 +456,19 @@ def _sklearn(request, context):
# Set a parameter grid for hyperparameter optimization
response = model.set_param_grid()

# Get the response as SSE.Rows
response_rows = utils.get_response_rows(response.values.tolist(), ["str", "str", "str"])
dtypes = ["str", "str", "str"]

elif function == 11:
# Return the feature definitions for an existing model
response = model.get_features()

# Get the response as SSE.Rows
response_rows = utils.get_response_rows(response.values.tolist(), ["str", "num", "str", "str", "str",\
"str", "num"])
dtypes = ["str", "num", "str", "str", "str", "str", "str"]

elif function == 12:
# Train and Test an existing model, saving the sklearn pipeline for further predictions
response = model.fit()

# Get the response as SSE.Rows
response_rows = utils.get_response_rows(response.values.tolist(), ["str", "str", "str", "str", "num"])
dtypes = ["str", "str", "str", "str", "num"]

elif function in (14, 16, 19, 20):
elif function in (14, 16, 19, 20, 27):
if function == 14:
# Provide predictions in a chart expression based on an existing model
response = model.predict(load_script=False)
Expand All @@ -441,20 +481,24 @@ def _sklearn(request, context):
elif function == 20:
# Get a string that can be evaluated to get the features expression for the predict function
response = model.get_features_expression()
elif function == 27:
# Get labels for clustering
response = model.fit_transform(load_script=False)

# Get the response as SSE.Rows
response_rows = utils.get_response_rows(response.values.tolist(), ["str"])
dtypes = ["str"]

elif function in (15, 17):
elif function in (15, 17, 28):
if function == 15:
# Provide predictions in the load script based on an existing model
response = model.predict(load_script=True)
if function == 17:
elif function == 17:
# Provide prediction probabilities in the load script based on an existing model
response = model.predict(load_script=True, variant="predict_proba")

# Get the response as SSE.Rows
response_rows = utils.get_response_rows(response.values.tolist(), ["str", "str", "str"])
elif function == 28:
# Provide labels for clustering
response = model.fit_transform(load_script=True)

dtypes = ["str", "str", "str"]

elif function in (18, 22):
if function == 18:
Expand All @@ -471,24 +515,47 @@ def _sklearn(request, context):

# We convert values to type SSE.Dual, and group columns into an iterable
if estimator_type == "classifier":
# Get the response as SSE.Rows
response_rows = utils.get_response_rows(response.values.tolist(), ["str", "str", "num", "num", "num",\
"num", "num"])
dtypes = ["str", "str", "num", "num", "num", "num", "num"]
elif estimator_type == "regressor":
# Get the response as SSE.Rows
response_rows = utils.get_response_rows(response.values.tolist(), ["str", "num", "num", "num", "num", "num"])
dtypes = ["str", "num", "num", "num", "num", "num"]

elif function == 23:
# Get the confusion matrix for the classifier
response = model.get_confusion_matrix()
response_rows = utils.get_response_rows(response.values.tolist(), ["str", "str", "str", "num"])
dtypes = ["str", "str", "str", "num"]

elif function == 25:
# Get the best parameters based on a grid search cross validation
response = model.get_best_params()
response_rows = utils.get_response_rows(response.values.tolist(), ["str", "str"])

# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows)
dtypes = ["str", "str"]

elif function == 26:
# Provide results from dimensionality reduction
response = model.fit_transform(load_script=True)
dtypes = ["str", "str"]

for i in range(response.shape[1]-2):
dtypes.append("num")

# Get the response as SSE.Rows
response_rows = utils.get_response_rows(response.values.tolist(), dtypes)

# Get the number of bundles in the request
num_request_bundles = len(request_list)

# Get the number of rows in the response
num_rows = len(response_rows)

# Calculate the number of rows to send per bundle
if num_rows >= num_request_bundles:
rows_per_bundle = len(response_rows)//len(request_list)
else:
rows_per_bundle = num_rows

# Stream response as BundledRows
for i in range(0, len(response_rows), rows_per_bundle):
# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows[i : i + rows_per_bundle])

@staticmethod
def _get_function_id(context):
Expand Down Expand Up @@ -569,7 +636,9 @@ def Serve(self, port, pem_dir):
:param pem_dir: Directory including certificates
:return: None
"""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),\
options=[('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH)])

SSE.add_ConnectorServicer_to_server(self, server)

if pem_dir:
Expand Down
Loading

0 comments on commit 364ba03

Please sign in to comment.