COM6012 Scalable Machine Learning 2024 by Shuo Zhou at The University of Sheffield
- Task 1: To finish in the lab session on 19th April. Essential
- Task 2: To finish by the following Wednesday 24th April. Exercise
- Task 3: To explore further. Optional
- Chapters Clustering and RFM Analysis of PySpark tutorial
- Clustering in Spark
- PySpark API on clustering
- PySpark code on clustering
- k-means clustering on Wiki
- k-means++ on Wiki
- k-means|| paper
k-means is one of the most commonly used clustering algorithms that clusters the data points into a predefined number of clusters. The Spark MLlib implementation includes a parallelized variant of the k-means++ method called k-means||.
KMeans is implemented as an Estimator and generates a KMeansModel as the base model.
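To show what the "++" seeding that k-means|| parallelises actually does, here is a minimal pure-Python sketch of sequential k-means++ (D² sampling). It is an illustration only, not Spark's code, and the helper names are hypothetical. Roughly speaking, k-means|| differs by oversampling several candidate centers per round over `initSteps` rounds and then reducing the weighted candidates to k centers.

```python
import random

def squared_distance(p, q):
    """Squared Euclidean distance between two points of equal length."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def kmeans_plus_plus_init(points, k, seed=None):
    """Hypothetical helper: choose k initial centers by k-means++ (D^2) sampling.

    The first center is drawn uniformly at random; every later center is drawn
    with probability proportional to D(x)^2, the squared distance from x to its
    nearest already-chosen center.
    """
    if k > len(set(points)):
        raise ValueError("k cannot exceed the number of distinct points")
    rng = random.Random(seed)
    centers = [rng.choice(points)]
    while len(centers) < k:
        # D(x)^2 for every point, given the centers chosen so far
        d2 = [min(squared_distance(p, c) for c in centers) for p in points]
        r = rng.random() * sum(d2)  # uniform in [0, total)
        cumulative = 0.0
        for p, w in zip(points, d2):
            cumulative += w
            if cumulative > r:  # points with weight 0 (existing centers) are never chosen
                centers.append(p)
                break
    return centers

points = [(0.0, 0.0), (1.0, 1.0), (9.0, 8.0), (8.0, 9.0)]
print(kmeans_plus_plus_init(points, k=2, seed=1))
```

Far-away points are much more likely to be picked as the next center, which is why this seeding tends to spread the initial centers across the data.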
API: class pyspark.ml.clustering.KMeans(featuresCol='features', predictionCol='prediction', k=2, initMode='k-means||', initSteps=2, tol=0.0001, maxIter=20, seed=None, distanceMeasure='euclidean', weightCol=None)
The following parameters are available:
- k: the number of desired clusters.
- maxIter: the maximum number of iterations
- initMode: specifies either random initialization or initialization via k-means||
- initSteps: determines the number of steps in the k-means|| algorithm (default=2, advanced)
- tol: determines the distance threshold within which we consider k-means to have converged.
- seed: the random seed (so that multiple runs give the same results)
- distanceMeasure: either Euclidean (default) or cosine distance measure
- weightCol: optional weighting of data points
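The roles of maxIter and tol can be seen in a plain Lloyd iteration. The sketch below is pure Python with hypothetical helper names and Euclidean distance only: it stops after max_iter iterations, or earlier once every center moves by less than tol. We believe this mirrors the convergence criterion described for Spark's tol, but it is not Spark's implementation.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points of equal length."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def lloyd(points, init_centers, max_iter=20, tol=1e-4):
    """Hypothetical helper: Lloyd's k-means iterations from given initial centers.

    Stops after max_iter iterations, or earlier once every center moved by less
    than tol (Euclidean distance) in the last iteration.
    Returns (centers, iterations_run).
    """
    if max_iter < 1:
        raise ValueError("max_iter must be at least 1")
    centers = [tuple(c) for c in init_centers]
    for iteration in range(1, max_iter + 1):
        # Assignment step: each point joins the cluster of its nearest center
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)), key=lambda j: squared_distance(p, centers[j]))
            clusters[nearest].append(p)
        # Update step: move each center to the mean of its members (empty clusters keep their center)
        new_centers = [
            tuple(sum(coords) / len(members) for coords in zip(*members)) if members else c
            for c, members in zip(centers, clusters)
        ]
        converged = all(squared_distance(c, n) <= tol ** 2 for c, n in zip(centers, new_centers))
        centers = new_centers
        if converged:
            break
    return centers, iteration

points = [(0.0, 0.0), (1.0, 1.0), (9.0, 8.0), (8.0, 9.0)]
centers, iters = lloyd(points, init_centers=[(0.0, 0.0), (9.0, 8.0)])
print(centers, iters)  # [(0.5, 0.5), (8.5, 8.5)] 2
```

Here the centers stop moving in the second iteration, so the loop ends well before maxIter; a larger tol would let it stop even earlier on noisier data.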
First log into the Stanage cluster
ssh $
You need to replace $USER with your username (using lowercase and without $).
Once logged in, we can request 2 CPU cores from reserved resources by
srun --account=default --reservation=com6012-8 --cpus-per-task=2 --time=01:00:00 --pty /bin/bash
If the reserved resources are not available, request cores from the general queue by
srun --pty --cpus-per-task=2 bash -i
Now set up our conda environment, using
source # assuming you copied HPC/ to your root directory (see Lab 1, Task 2)
if you created this script in Lab 1. If not, use
module load Java/17.0.4
module load Anaconda3/2022.05
source activate myspark
We'll be generating plots as part of this lab, so you will need to install matplotlib
if you have not done so already with:
pip install matplotlib
Now we can start the PySpark shell with two CPU cores:
cd com6012/ScalableML # our main working directory
pyspark --master local[2] # start pyspark with the 2 cpu cores requested above.
If you are experiencing a segmentation fault
when entering the pyspark
interactive shell, run export LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8
to fix it. It is recommended to add this line to your
We will do some plotting in this lab. To plot and save figures on HPC, we need to do the following before using pyplot:
import matplotlib
matplotlib.use('Agg') # Must be before importing matplotlib.pyplot or pylab!
Now import modules needed in this lab:
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors
import matplotlib.pyplot as plt
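Here, we study $k$-means clustering on a toy dataset of four two-dimensional points that form two well-separated groups.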
data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
(Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
df = spark.createDataFrame(data, ["features"])
kmeans = KMeans(k=2, seed=1) # Two clusters with seed = 1
model = kmeans.fit(df)
We examine the cluster centers (centroids) and use the trained model to "predict" the cluster index for a data point.
centers = model.clusterCenters()
len(centers)
# 2
for center in centers:
    print(center)
# [0.5 0.5]
# [8.5 8.5]
model.predict(df.head().features)
# 0
We can use the trained model to cluster any data points in the same feature space; the cluster index is treated as the prediction.
transformed = model.transform(df)
transformed.show()
# +---------+----------+
# | features|prediction|
# +---------+----------+
# |[0.0,0.0]| 0|
# |[1.0,1.0]| 0|
# |[9.0,8.0]| 1|
# |[8.0,9.0]| 1|
# +---------+----------+
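Conceptually, transform (and predict) assign each point to its nearest centroid under the chosen distanceMeasure. The pure-Python sketch below illustrates that idea; the helper names are hypothetical, and cosine distance is assumed to be 1 minus cosine similarity.

```python
import math

def euclidean_distance(p, q):
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(p, q)))

def cosine_distance(p, q):
    """1 - cosine similarity; undefined for zero vectors."""
    norm_p = math.sqrt(sum(a * a for a in p))
    norm_q = math.sqrt(sum(b * b for b in q))
    if norm_p == 0 or norm_q == 0:
        raise ValueError("cosine distance is undefined for zero vectors")
    return 1.0 - sum(a * b for a, b in zip(p, q)) / (norm_p * norm_q)

def predict(point, centers, distance=euclidean_distance):
    """Hypothetical helper: index of the nearest center under `distance`."""
    return min(range(len(centers)), key=lambda j: distance(point, centers[j]))

points = [(0.0, 0.0), (1.0, 1.0), (9.0, 8.0), (8.0, 9.0)]
print([predict(p, [(0.5, 0.5), (8.5, 8.5)]) for p in points])  # [0, 0, 1, 1]

# With cosine distance only the direction of a vector matters, not its length
print([predict(p, [(1.0, 0.0), (0.0, 1.0)], cosine_distance) for p in [(9.0, 8.0), (8.0, 9.0)]])  # [0, 1]
```

The Euclidean case reproduces the prediction column in the table above; the cosine case shows why that measure suits data where only direction is meaningful.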
We can examine the training summary for the trained model.
model.hasSummary
# True
summary = model.summary
summary
# <pyspark.ml.clustering.KMeansSummary object at 0x2b1662948d30>
summary.k
# 2
summary.clusterSizes
# [2, 2]
summary.trainingCost  # sum of squared distances of points to their nearest center
# 2.0
You can check out the KMeansSummary API for details of the summary information, e.g., we can find out that the training cost is the sum of squared distances to the nearest centroid for all points in the training dataset.
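As a quick check of that definition, the pure-Python sketch below (the helper name is hypothetical) recomputes the training cost of the toy example from the two centers found above and reproduces the 2.0 reported by summary.trainingCost.

```python
def training_cost(points, centers):
    """Hypothetical helper: sum over points of the squared Euclidean distance to the nearest center."""
    return sum(
        min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers)
        for p in points
    )

points = [(0.0, 0.0), (1.0, 1.0), (9.0, 8.0), (8.0, 9.0)]
centers = [(0.5, 0.5), (8.5, 8.5)]
print(training_cost(points, centers))  # 2.0
```

Each of the four points is at squared distance 0.25 + 0.25 = 0.5 from its centroid, giving 4 × 0.5 = 2.0.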
We can save an algorithm/model in a temporary location (see API on save) and then load it later.
Save and load the algorithm:
import tempfile
temp_path = tempfile.mkdtemp()
kmeans_path = temp_path + "/kmeans"
kmeans.save(kmeans_path)
kmeans2 = KMeans.load(kmeans_path)
kmeans2.getK()
# 2
Save and load the learned model:
model_path = temp_path + "/kmeans_model"
model.save(model_path)
model2 = KMeansModel.load(model_path)
model2.hasSummary
# False
model2.clusterCenters()
# [array([0.5, 0.5]), array([8.5, 8.5])]
Clustering of the Iris flower data set is a classical example discussed on the Wikipedia page of $k$-means clustering. This data set was introduced by Ronald Fisher, "the father of modern statistics and experimental design" (and thus machine learning) and also "the greatest biologist since Darwin". The code below is based on the Clustering chapter of the PySpark tutorial, with some changes introduced.
df = spark.read.load("Data/iris.csv", format="csv", inferSchema="true", header="true").cache()
df.show(5, True)
df.printSchema()
# +------------+-----------+------------+-----------+-------+
# |sepal_length|sepal_width|petal_length|petal_width|species|
# +------------+-----------+------------+-----------+-------+
# | 5.1| 3.5| 1.4| 0.2| setosa|
# | 4.9| 3.0| 1.4| 0.2| setosa|
# | 4.7| 3.2| 1.3| 0.2| setosa|
# | 4.6| 3.1| 1.5| 0.2| setosa|
# | 5.0| 3.6| 1.4| 0.2| setosa|
# +------------+-----------+------------+-----------+-------+
# only showing top 5 rows
# root
# |-- sepal_length: double (nullable = true)
# |-- sepal_width: double (nullable = true)
# |-- petal_length: double (nullable = true)
# |-- petal_width: double (nullable = true)
# |-- species: string (nullable = true)
We can use .describe().show() to inspect summary statistics of the data:
df.describe().show()
# +-------+------------------+-------------------+------------------+------------------+---------+
# |summary| sepal_length| sepal_width| petal_length| petal_width| species|
# +-------+------------------+-------------------+------------------+------------------+---------+
# | count| 150| 150| 150| 150| 150|
# | mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672| null|
# | stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414| null|
# | min| 4.3| 2.0| 1.0| 0.1| setosa|
# | max| 7.9| 4.4| 6.9| 2.5|virginica|
# +-------+------------------+-------------------+------------------+------------------+---------+
Use a transData function similar to that in Lab 2 to convert the attributes into feature vectors (all columns except the last one, species, become the features).
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1])]).toDF(['features'])
dfFeatureVec = transData(df).cache()
dfFeatureVec.show(5, False)
# +-----------------+
# |features |
# +-----------------+
# |[5.1,3.5,1.4,0.2]|
# |[4.9,3.0,1.4,0.2]|
# |[4.7,3.2,1.3,0.2]|
# |[4.6,3.1,1.5,0.2]|
# |[5.0,3.6,1.4,0.2]|
# +-----------------+
# only showing top 5 rows
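We can perform a Silhouette Analysis to determine $k$ by running $k$-means for a range of $k$ values and evaluating each clustering with ClusteringEvaluator, for which silhouette is the default metric. You can also refer to this scikit-learn notebook on the same topic. Other ways of determining the best $k$ (e.g., looking for an "elbow" in the training cost) also exist.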
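Before running the Spark loop, it may help to see what the silhouette score measures. For each point, a is its mean distance to the other members of its own cluster and b is its smallest mean distance to the members of another cluster; the point's score is (b - a) / max(a, b), and the overall score is the mean over points, ranging from -1 (poor) to 1 (well separated). The pure-Python sketch below uses squared Euclidean distance because we believe that is ClusteringEvaluator's default distanceMeasure (please check the API). The helper names are hypothetical, and this naive O(n²) version is not how Spark computes it at scale.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def silhouette(points, labels, dist=squared_distance):
    """Hypothetical helper: mean silhouette coefficient s(i) = (b - a) / max(a, b).

    a: mean distance from point i to the other members of its own cluster;
    b: smallest mean distance from point i to the members of another cluster.
    Points in singleton clusters get s(i) = 0 (a common convention).
    """
    labels = list(labels)
    clusters = sorted(set(labels))
    if len(clusters) < 2:
        raise ValueError("silhouette needs at least two clusters")
    scores = []
    for i, (p, li) in enumerate(zip(points, labels)):
        own = [q for j, (q, lj) in enumerate(zip(points, labels)) if lj == li and j != i]
        if not own:
            scores.append(0.0)
            continue
        a = sum(dist(p, q) for q in own) / len(own)
        b = min(
            sum(dist(p, q) for q, lj in zip(points, labels) if lj == c) / labels.count(c)
            for c in clusters
            if c != li
        )
        scores.append(0.0 if max(a, b) == 0 else (b - a) / max(a, b))
    return sum(scores) / len(scores)

points = [(0.0, 0.0), (1.0, 1.0), (9.0, 8.0), (8.0, 9.0)]
print(silhouette(points, [0, 0, 1, 1]))  # close to 1: tight, well-separated clusters
print(silhouette(points, [0, 1, 0, 1]))  # negative: points sit closer to the other cluster
```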
import numpy as np
numK = 10  # we try k = 2, ..., 9
silhouettes = np.zeros(numK)
costs = np.zeros(numK)
for k in range(2, numK):  # k = 2:9
    kmeans = KMeans().setK(k).setSeed(11)
    model = kmeans.fit(dfFeatureVec)
    predictions = model.transform(dfFeatureVec)
    costs[k] = model.summary.trainingCost  # sum of squared distances to the nearest centroid
    evaluator = ClusteringEvaluator()  # to compute the silhouette score
    silhouettes[k] = evaluator.evaluate(predictions)
We can take a look at the clustering results of the last model fitted in the loop ($k=9$); the prediction column below is the cluster index/label.
predictions.show(15)
# +-----------------+----------+
# | features|prediction|
# +-----------------+----------+
# |[5.1,3.5,1.4,0.2]| 1|
# |[4.9,3.0,1.4,0.2]| 1|
# |[4.7,3.2,1.3,0.2]| 1|
# |[4.6,3.1,1.5,0.2]| 1|
# |[5.0,3.6,1.4,0.2]| 1|
# |[5.4,3.9,1.7,0.4]| 5|
# |[4.6,3.4,1.4,0.3]| 1|
# |[5.0,3.4,1.5,0.2]| 1|
# |[4.4,2.9,1.4,0.2]| 1|
# |[4.9,3.1,1.5,0.1]| 1|
# |[5.4,3.7,1.5,0.2]| 5|
# |[4.8,3.4,1.6,0.2]| 1|
# |[4.8,3.0,1.4,0.1]| 1|
# |[4.3,3.0,1.1,0.1]| 1|
# |[5.8,4.0,1.2,0.2]| 5|
# +-----------------+----------+
# only showing top 15 rows
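Plot the cost (sum of squared distances of points to their nearest centroid; the smaller the better) against $k$:
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,numK), costs[2:numK])
ax.set(xlabel='k', ylabel='cost')
fig.savefig("Output/Lab8_cost.png")  # illustrative file name; the Output/ folder must exist
We can see that this cost measure is biased towards a large $k$.
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,numK), silhouettes[2:numK])
ax.set(xlabel='k', ylabel='silhouette')
fig.savefig("Output/Lab8_silhouette.png")  # illustrative file name
We can see that the silhouette measure is biased towards a small $k$.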
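Why does the cost favour a large $k$? Any $k$-cluster solution can be turned into a $(k+1)$-cluster one by splitting a cluster, which cannot increase the sum of squared distances, so the optimal cost never increases with $k$ and reaches 0 when every point is its own cluster. The brute-force sketch below checks this on made-up 1-D values (illustrative only; the helper names are hypothetical). It relies on the fact that in one dimension an optimal clustering splits the sorted values into contiguous groups. Note that $k$-means itself only finds local optima, so costs from actual runs need not be perfectly monotone.

```python
from itertools import combinations

def sse(group):
    """Sum of squared deviations of a group of numbers from their mean."""
    mean = sum(group) / len(group)
    return sum((x - mean) ** 2 for x in group)

def best_cost_1d(values, k):
    """Hypothetical helper: exact minimum k-means cost for 1-D data by brute force.

    Tries every way of cutting the sorted values into k contiguous groups.
    """
    xs = sorted(values)
    n = len(xs)
    best = float("inf")
    for cuts in combinations(range(1, n), k - 1):
        bounds = (0,) + cuts + (n,)
        cost = sum(sse(xs[s:e]) for s, e in zip(bounds, bounds[1:]))
        best = min(best, cost)
    return best

values = [1.0, 1.5, 2.0, 8.0, 8.5, 9.0, 15.0]  # made-up data
costs = [best_cost_1d(values, k) for k in range(1, len(values) + 1)]
print(costs)  # non-increasing, ending at 0.0 when k equals the number of points
```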
Carry out some further studies on the iris clustering problem above.
- Choose $k=3$ and evaluate the clustering results against the ground truth (class labels) using the Normalized Mutual Information (NMI) available in scikit-learn. You need to install scikit-learn in the myspark environment via conda install -y scikit-learn. This allows us to study the clustering quality when we know the true number of clusters.
- Use multiple (e.g., 10 or 20) random seeds to generate different clustering results and plot the respective NMI values (with respect to the ground truth with $k=3$ as in the question above) to observe the effect of initialisation.
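For intuition about the metric in the first bullet, here is a pure-Python sketch of NMI in which the mutual information is normalised by the arithmetic mean of the two label entropies; we believe this matches the default of scikit-learn's normalized_mutual_info_score (average_method='arithmetic'). For the exercise itself, use the scikit-learn function. The helper names below are hypothetical. NMI does not depend on how the clusters are numbered, which matters because $k$-means cluster indices are arbitrary.

```python
import math
from collections import Counter

def entropy(labels):
    n = len(labels)
    return -sum((c / n) * math.log(c / n) for c in Counter(labels).values())

def mutual_information(u, v):
    n = len(u)
    joint = Counter(zip(u, v))
    cu, cv = Counter(u), Counter(v)
    return sum(
        (nij / n) * math.log(n * nij / (cu[a] * cv[b]))
        for (a, b), nij in joint.items()
    )

def nmi(labels_true, labels_pred):
    """Hypothetical helper: MI divided by the arithmetic mean of the two entropies."""
    if len(labels_true) != len(labels_pred):
        raise ValueError("label lists must have the same length")
    h_true, h_pred = entropy(labels_true), entropy(labels_pred)
    if h_true == 0 and h_pred == 0:
        return 1.0  # both are a single cluster: treat as a perfect match
    return mutual_information(labels_true, labels_pred) / ((h_true + h_pred) / 2)

print(nmi([0, 0, 1, 1], [1, 1, 0, 0]))  # same partition, different numbering: 1.0 (up to rounding)
print(nmi([0, 0, 1, 1], [0, 1, 0, 1]))  # partitions carry no information about each other: 0.0
```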
- Follow the RFM Analysis chapter of the PySpark tutorial to perform RFM Customer Value Analysis.
- The data can be downloaded from Online Retail Data Set at UCI.
- Note the data cleaning step that checks for and removes rows containing null values. You may need to do the same when you are dealing with real data.
- The data manipulation steps are also useful to learn.
- Write a standalone program (and submit it as a batch job to HPC) to do $k$-means clustering on the KDDCUP1999 data with 4M points. You may start with the smaller 10% subset.
- The original task is a classification task. We can ignore the class labels and perform clustering on the data.
- Follow the scikit-learn example Color Quantization using K-Means to perform the same using PySpark on your high-resolution photos.
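The core of colour quantisation is: cluster the pixel colours with $k$-means, then replace each pixel by its cluster's centroid. The pure-Python sketch below shows only that idea on a handful of made-up RGB tuples; the helper name and the deterministic initialisation (the first k distinct colours) are assumptions for this sketch. A PySpark version would instead fit KMeans on a DataFrame of pixel feature vectors.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def quantize(pixels, k, max_iter=10):
    """Hypothetical helper: map each RGB pixel to the centroid of its k-means cluster."""
    # Assumed initialisation for this sketch: the first k distinct colours
    centers = []
    for p in pixels:
        if p not in centers:
            centers.append(p)
        if len(centers) == k:
            break
    if len(centers) < k:
        raise ValueError("fewer distinct colours than k")

    def assign(cs):
        # Index of the nearest center for every pixel
        return [min(range(k), key=lambda j: squared_distance(p, cs[j])) for p in pixels]

    for _ in range(max_iter):
        labels = assign(centers)
        new_centers = []
        for j in range(k):
            members = [p for p, label in zip(pixels, labels) if label == j]
            new_centers.append(
                tuple(sum(ch) / len(members) for ch in zip(*members)) if members else centers[j]
            )
        if new_centers == centers:  # assignments can no longer change
            break
        centers = new_centers
    labels = assign(centers)
    # Round each centroid back to integer channel values
    return [tuple(round(ch) for ch in centers[label]) for label in labels]

pixels = [(250, 0, 0), (240, 10, 5), (0, 0, 250), (10, 5, 240)]  # made-up "image"
print(quantize(pixels, k=2))  # [(245, 5, 2), (245, 5, 2), (5, 2, 245), (5, 2, 245)]
```

With a real photo, k controls the palette size: the output image uses at most k distinct colours.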