Using k-means Machine Learning Algorithm with Apache Spark and R

In this post, I will demonstrate the usage of the k-means clustering algorithm in R and in Apache Spark.Apache Spark (hereinafter Spark) offers two implementations of k-means algorithm: one is packaged with its MLlib library; the other one exists in Spark’s package. While both implementations are currently more or less functionally equivalent, the Spark ML team recommends using the package by showcasing its support for pipeline processing (inspired by the scikit-learn Machine Learning library in Python) and its versatile DataFrame data structure (probably inspired by R’s DataFrame matrix-like structure similar to tables in relational databases, also wildly popular in Python’s pandas library.)

The k-means clustering is an example of an unsupervised ML algorithm where you are only required to give a hint to the computer as to how many clusters (classes of objects) you expect to be present in your data set. The algorithm will go ahead and use your data as the training data set to build a model and try to figure out the boundaries of those clusters. Then you can proceed to the classification phase with your test data.

With k-means, you, essentially, have your computer (or a cluster of computers) perform a partitioning of your data into Voronoi cells where the cells represent the identified clusters.

Working on this blog article, I used the CDH 5.8 QuickStart VM; some of the setup details can be found in this post. I am going to leave out some of the basic and/or unnecessary details in the discussion below that I believe would not aid in your understanding of the topics at hand.

Data Preparation and Using k-means in R

For input data, I created an input flat file containing 150 records, each of which has two space-delimited numbers — a very simple two-dimensional data set. Every record represents an observation (case) of a two-variable data point. File generation details are coming up in a moment …

I used R to generate the file as I like its compact way for creating sequences of random normally distributed numbers with a given mean and a standard deviation and its built-in production quality graphics library.

Note: Yes, you can find all those capabilities in Python’s scikit-learn as well.

The file was generated using this R script:
dataSet <- rbind(matrix(rnorm(100, mean = 5, sd = 1.0), ncol = 2),
matrix(rnorm(100, mean = 2, sd = 0.7), ncol = 2),
matrix(rnorm(100, mean = 10, sd = 2.0), ncol = 2))

which, essentially, creates a two-dimensional matrix made up of three data sets with the following parameters:

1. mean = 5, standard deviation (sd) = 1.0
2. mean = 2, standard deviation (sd) = 0.7
3. mean = 10, standard deviation (sd) = 2.0

As you can see, we have three distinct groups of data points grouped around their respective mean values (which will be later identified as cluster centroids by the k-means algorithm) scattered round as per their sd values. Those groups will make into the class clusters.

Note: 300 (100 + 100 + 100) generated random numbers are arranged into two columns using the ncol=2 named parameter. As you can see, I’m using the same value range for both columns.

I named the columns (which become features) ‘Feature A’ and ‘Feature B’ using this code:
colnames(X) <- c("Feature A", "Feature B")

In the above code, c (combine) is the R function for creating a vector of elements of the same type.

The kmeans R function call below does what its name suggests to, so you can see how easily you can perform the k-means clustering computation in R as well as get useful information about the created model:

clusters <- kmeans(X, 3, nstart=11)

The input data set is X; 3 is the ‘k’ — the number of clusters — I know this as I did so on purpose (you can try out other k values to see how the algorithm handle those curveballs); nstart controls how many random starting points to use — its value should be an odd number to have a majority vote during the final decision made by the algorithm on the centroid location.

When I ran the above code, the kmeans function generated the following report:

K-means clustering with 3 clusters of sizes 47, 52, 51
Cluster means:
Feature A Feature B
1 10.185935 10.162513
2 1.997293 2.353496
3 5.380902 5.319707

Close enough, given the wide data dispersion of the 1st cluster which resulted in losing three of its points (47 = 50 – 3) and the second centroid attracting 2 points (52 = 50 + 2) that, probably, do not belong to it.

The graph plotting was done with these commands:

plot(X, col = clusters$cluster);
points(clusters$centers, col = 1:2:3, pch = 10, cex = 8)

The plotting results are shown in the diagram below.

Three clusters generated in R

Three clusters generated in R

When R completes its work, it offers you some additional information about the lay of the clusters.

For example, it gives you the following metric for cluster cohesiveness:

Within cluster sum of squares by cluster:
[1] 57.80008 278.54440 129.16181
(between_SS / total_SS = 87.3 %)

I saved the file using this command:

write.table (X, "...Path of Your Choice ...", row.names = FALSE, col.names=FALSE )

OK, we are done with the R spiel.

Upload the saved file to CDH QuickStart VM, or copy and paste the dump of the X matrix to a file you want to create on the VM (the VMware Player supports cross-system clipboard operations.) Optionally, you can upload the file to HDFS.

Now we are ready to work in Spark.

Spark k-means Application

I am going to use the Python REPL Spark shell as, in many cases, working in Python on Spark is more in line with the nature of such data experimentation activities.

The code we are going to use has been adapted from Spark k-means examples.

Before you can start coding ML programs in Python on Spark, you need to install the NumPy scientific computing package; you can do so using this command on CentOS (on which CDH Quick Start environment is running):
sudo yum install numpy

Fire up the Python Spark Shell by running the following command:

Now enter the following Python code:

from __future__ import print_function
import numpy as np
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.clustering import KMeansModel

inputFile = "...Location of the uploaded or created file (file:// | hdfs://) ..."
k = 3

lines = sc.textFile(inputFile)

data = line: np.array([float(x) for x in line.split(' ')]))

model = KMeans.train(data, k, runs=7)

for x in model.clusterCenters: print(x)

The data = line: np.array([float(x) for x in line.split(' ')])) statement takes each line from the lines RDD, and converts it into a two-element array of the float type. Then it constructs another RDD (more specifically, an instance of PythonRDD) which, in essence, represents the original two-dimensional data of our R matrix.

The float(x) float conversion uses the Python’s neat list comprehension idiom.

The centerpiece of the program is this statement:
model = KMeans.train(data, k, runs=7)
which has the same disposition of elements we saw in the kmeans R function above.

MLlib implements k-means clustering with the support for parallel runs (the k-means|| algorithm by Bahmani et al), which is controlled by the runs named parameter above.

Note: k-means is an iterative algorithm that makes multiple passes over the input data, and caching the input RDD is an efficient (and cheap!) way to improve algorithm’s performance. We do not do it in our rather small data set.

This statement: for x in model.clusterCenters: print(x) prints a 2D array that contains the coordinates in feature values of three centroids identified in our input data. In my run, I had these values:

[ 9.56460123 10.36706413]
[ 5.16458397 5.03103274]
[ 2.1412337 2.03188558]

Interestingly, Python k-means implementation seems to be falling behind Spark’s Scala implementation which already has a method for calculating the Within Set Sum of Squared Error (WSSSE) metric, confusingly named in Spark computeCost. In Python, you will have to calculate it yourself. It may well be that it was done on purpose to illustrate how elegantly you can do it in Python!

After training our model, which appears to be right on the money in identifying the cluster centroid coordinates (expressed in feature values), we can perform the classification step.

Enter the following command:
model.predict ([9,9])

You should see the following output:

This is the index of the first row in the centroids table above.

Enter the following command:
model.predict ([3,2])

You should see the following output:

Which corresponds to the 3rd row in the centroids table above ([ 2.1412337 2.03188558])

Save, Load and Use Again Your Model

Spark’s MLlib implementation of k-means allows you to easily serialize and persist the trained model out to any of the supported storage types: local file system, HDFS, S3, etc., so that you can re-load and use it for classification at a later time. A very useful feature, indeed.

That’s all I wanted to write about in today’s blog.

Till next time!

  1. No comments yet.
(will not be published)