Clustering Model in Spark - Customers

We will use SparkML to create a clustering model for customer data.

Objectives


  1. Use PySpark to connect to a Spark cluster.
  2. Create a Spark session.
  3. Read a CSV file into a DataFrame.
  4. Use the KMeans algorithm to cluster the data.
  5. Stop the Spark session.

Setup


We will be using the following libraries:

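  • PySpark for connecting to the Spark cluster and building the model
  • FindSpark for making the local Spark installation importable from Python
  • wget for downloading the dataset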

Suppress Warnings

To suppress warnings generated by our code, we’ll use this code block:

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
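
The block above replaces warnings.warn for the whole session, so every warning stays hidden until the kernel restarts. If you only want to silence one noisy step, a narrower option from the standard library is warnings.catch_warnings, which restores the previous warning filters when its with block ends. A minimal sketch; noisy_step is a hypothetical stand-in for any call that emits a warning:

```python
import warnings

def noisy_step():
    # Hypothetical stand-in for any call that emits a warning
    warnings.warn("demo warning", UserWarning)
    return "done"

# Inside this block warnings are ignored; the old filters come back on exit
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    result = noisy_step()

# Outside that block the warning is emitted normally; record it to confirm
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    noisy_step()

print(result, len(caught))  # done 1
```

This behaves as described only in a session where warnings.warn has not already been replaced, as the block above does.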

Import Libraries

# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Import functions/classes for SparkML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Import functions/classes for metrics (ClusteringEvaluator scores clustering models)
from pyspark.ml.evaluation import ClusteringEvaluator

Start Spark Session - Task 1


# Create a SparkSession
# Ignore any warnings printed by the SparkSession command

spark = SparkSession.builder.appName("Clustering using SparkML").getOrCreate()

Data


Download Data Locally

import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/customers.csv")

HTTP request sent, awaiting response... 200 OK
Length: 8909 (8.7K) [text/csv]
Saving to: ‘customers.csv’

customers.csv       100%[===================>]   8.70K  --.-KB/s    in 0s      

2024-11-09 22:41:59 (31.3 MB/s) - ‘customers.csv’ saved [8909/8909]

Load CSV into SparkDF

# Using the spark.read.csv function, we load the data into a DataFrame.
# header=True tells Spark that the first row of our CSV file holds the column names.
# inferSchema=True tells Spark to automatically work out the data types of the columns.

# Load the customers dataset
customer_data = spark.read.csv("customers.csv", header=True, inferSchema=True)

View Schema

customer_data.printSchema()

root
 |-- Fresh_Food: integer (nullable = true)
 |-- Milk: integer (nullable = true)
 |-- Grocery: integer (nullable = true)
 |-- Frozen_Food: integer (nullable = true)
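
inferSchema=True is why every column above comes back as integer rather than string. As a rough, standard-library illustration of the idea (an assumption for teaching purposes, not Spark's actual inference code), the sketch below takes column names from the header row and gives each column the narrowest type, int, then float, then str, that every value parses as. The sample text reuses the first rows of customers.csv that appear in the View Data output.

```python
import csv
import io

# First rows of customers.csv, as shown in the View Data output
sample = """Fresh_Food,Milk,Grocery,Frozen_Food
12669,9656,7561,214
7057,9810,9568,1762
6353,8808,7684,2405
"""

def parses_as(value, cast):
    # True if cast(value) succeeds without a ValueError
    try:
        cast(value)
        return True
    except ValueError:
        return False

def infer_types(text):
    rows = list(csv.reader(io.StringIO(text)))
    header, data = rows[0], rows[1:]  # like header=True: first row holds column names
    schema = {}
    for i, name in enumerate(header):
        values = [row[i] for row in data]
        # Try the narrowest type first and widen only if some value fails to parse
        for cast in (int, float, str):
            if all(parses_as(v, cast) for v in values):
                schema[name] = cast.__name__
                break
    return schema

print(infer_types(sample))
# {'Fresh_Food': 'int', 'Milk': 'int', 'Grocery': 'int', 'Frozen_Food': 'int'}
```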

View Data

customer_data.show(5)

+----------+----+-------+-----------+
|Fresh_Food|Milk|Grocery|Frozen_Food|
+----------+----+-------+-----------+
|12669     |9656|7561   |214        |
|7057      |9810|9568   |1762       |
|6353      |8808|7684   |2405       |
|13265     |1196|4221   |6404       |
|22615     |5410|7198   |3915       |
+----------+----+-------+-----------+
only showing top 5 rows

Create Feature Vector


KMeans expects all input features in a single vector column, so first combine the four numeric columns with VectorAssembler. You must also tell the KMeans algorithm how many clusters to create out of your data.

# Assemble the features into a single vector column
feature_cols = ['Fresh_Food', 'Milk', 'Grocery', 'Frozen_Food']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
customer_transformed_data = assembler.transform(customer_data)
# Tell KMeans how many clusters you want
number_of_clusters = 3
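
Conceptually, VectorAssembler does a per-row step: it takes the columns listed in inputCols, in that order, and packs them into one numeric vector in the features column. The plain-Python sketch below mimics that for the first customer; representing a row as a dict and the helper name assemble are assumptions for illustration, and Spark itself stores the result as a vector type rather than a Python list.

```python
feature_cols = ['Fresh_Food', 'Milk', 'Grocery', 'Frozen_Food']

def assemble(row, input_cols):
    # Hypothetical helper: collect the input columns, in order, as floats
    return [float(row[c]) for c in input_cols]

first_customer = {'Fresh_Food': 12669, 'Milk': 9656, 'Grocery': 7561, 'Frozen_Food': 214}
print(assemble(first_customer, feature_cols))  # [12669.0, 9656.0, 7561.0, 214.0]
```

The order of inputCols matters: it fixes which position in the vector each column occupies.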

Create Clustering Model


# KMeans reads the "features" column and writes a "prediction" column by default
kmeans = KMeans(k=number_of_clusters)

Train Model

model = kmeans.fit(customer_transformed_data)
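
Training runs the k-means algorithm: pick k starting centers, assign every customer to its nearest center, move each center to the mean of its assigned customers, and repeat until the centers stop moving or an iteration limit is reached. Below is a minimal pure-Python sketch of that loop (Lloyd's algorithm) on toy points. It seeds the centers by random sampling, whereas Spark's KMeans defaults to the k-means|| initialization, so treat it as an illustration of the idea rather than a reimplementation of Spark; kmeans_fit, nearest, and their parameters are this sketch's own hypothetical names.

```python
import random

def squared_distance(a, b):
    # Squared Euclidean distance between two equal-length vectors
    return sum((x - y) ** 2 for x, y in zip(a, b))

def nearest(point, centers):
    # Index of the center closest to point
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))

def kmeans_fit(points, k, max_iter=20, seed=0):
    # Hypothetical helper: plain Lloyd's algorithm with random initial centers
    rng = random.Random(seed)
    centers = [list(p) for p in rng.sample(points, k)]
    for _ in range(max_iter):
        # Assignment step: put each point in the cluster of its nearest center
        clusters = [[] for _ in range(k)]
        for p in points:
            clusters[nearest(p, centers)].append(p)
        # Update step: move each center to the mean of its points (keep it if the cluster is empty)
        new_centers = [
            [sum(dim) / len(c) for dim in zip(*c)] if c else centers[i]
            for i, c in enumerate(clusters)
        ]
        if new_centers == centers:  # no center moved, so the loop has converged
            break
        centers = new_centers
    return centers

points = [[1.0, 1.0], [1.5, 2.0], [8.0, 8.0], [9.0, 8.5]]
centers = kmeans_fit(points, 2)
print(sorted(centers))  # [[1.25, 1.5], [8.5, 8.25]]
```

Because the distance is Euclidean (also Spark's default distance measure), columns with large values, such as Fresh_Food here, weigh more heavily in the assignments than columns with small values.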

Make Predictions


# Make predictions on the dataset
predictions = model.transform(customer_transformed_data)

View Predictions

# Display the results
predictions.show(5)

+----------+----+-------+-----------+--------------------+----------+
|Fresh_Food|Milk|Grocery|Frozen_Food|            features|prediction|
+----------+----+-------+-----------+--------------------+----------+
|     12669|9656|   7561|        214|[12669.0,9656.0,7...|         0|
|      7057|9810|   9568|       1762|[7057.0,9810.0,95...|         0|
|      6353|8808|   7684|       2405|[6353.0,8808.0,76...|         0|
|     13265|1196|   4221|       6404|[13265.0,1196.0,4...|         0|
|     22615|5410|   7198|       3915|[22615.0,5410.0,7...|         2|
+----------+----+-------+-----------+--------------------+----------+
only showing top 5 rows

View Customers per Cluster

Count how many customers were assigned to each cluster.

# Group by cluster and count customers per cluster
predictions.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   49|
|         2|   60|
|         0|  331|
+----------+-----+
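
groupBy('prediction').count() is a tally of cluster labels. The plain-Python version below does the same tally and adds each cluster's share of the 440 customers (the sum of the counts above), which makes the imbalance easier to see. The label list is rebuilt from the reported counts, and cluster_sizes is a hypothetical helper.

```python
from collections import Counter

def cluster_sizes(predictions):
    # Like groupBy('prediction').count(), plus each cluster's percentage of all rows
    counts = Counter(predictions)
    total = sum(counts.values())
    return {label: (n, round(100 * n / total, 1)) for label, n in sorted(counts.items())}

# Labels rebuilt from the k = 3 counts reported above
labels = [0] * 331 + [1] * 49 + [2] * 60
print(cluster_sizes(labels))  # {0: (331, 75.2), 1: (49, 11.1), 2: (60, 13.6)}
```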

Change # of Clusters


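Let’s go back and change the number of clusters to 4 to see how the grouping changes. Note that the cell below still sets number_of_clusters = 3, so its output is identical to the first run; change that value to 4 to actually compare.

# Tell KMeans how many clusters you want
number_of_clusters = 3
kmeans = KMeans(k=number_of_clusters)
model = kmeans.fit(customer_transformed_data)
# Make predictions on the dataset
predictions = model.transform(customer_transformed_data)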

# Display the results
predictions.show(5)

+----------+----+-------+-----------+--------------------+----------+
|Fresh_Food|Milk|Grocery|Frozen_Food|            features|prediction|
+----------+----+-------+-----------+--------------------+----------+
|     12669|9656|   7561|        214|[12669.0,9656.0,7...|         0|
|      7057|9810|   9568|       1762|[7057.0,9810.0,95...|         0|
|      6353|8808|   7684|       2405|[6353.0,8808.0,76...|         0|
|     13265|1196|   4221|       6404|[13265.0,1196.0,4...|         0|
|     22615|5410|   7198|       3915|[22615.0,5410.0,7...|         2|
+----------+----+-------+-----------+--------------------+----------+
only showing top 5 rows

# Group by cluster and count customers per cluster
predictions.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   49|
|         2|   60|
|         0|  331|
+----------+-----+

Now change the number of clusters to 2.

# Tell KMeans how many clusters you want
number_of_clusters = 2
kmeans = KMeans(k=number_of_clusters)
model = kmeans.fit(customer_transformed_data)
# Make predictions on the dataset
predictions = model.transform(customer_transformed_data)

# Display the results
predictions.show(5)

+----------+----+-------+-----------+--------------------+----------+
|Fresh_Food|Milk|Grocery|Frozen_Food|            features|prediction|
+----------+----+-------+-----------+--------------------+----------+
|     12669|9656|   7561|        214|[12669.0,9656.0,7...|         0|
|      7057|9810|   9568|       1762|[7057.0,9810.0,95...|         0|
|      6353|8808|   7684|       2405|[6353.0,8808.0,76...|         0|
|     13265|1196|   4221|       6404|[13265.0,1196.0,4...|         0|
|     22615|5410|   7198|       3915|[22615.0,5410.0,7...|         1|
+----------+----+-------+-----------+--------------------+----------+
only showing top 5 rows

# Group by cluster and count customers per cluster
predictions.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   65|
|         0|  375|
+----------+-----+
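
Cluster counts alone don't say whether k = 2 or k = 3 fits the data better. One common yardstick is the within-cluster sum of squared distances from each point to its center: lower means tighter clusters, but it always falls as k grows, so people look for the "elbow" where it stops dropping sharply. Spark also provides ClusteringEvaluator, whose default metric is the silhouette score. The sketch below is a hypothetical pure-Python illustration of the sum-of-squares metric on toy points, not output from this dataset.

```python
def squared_distance(a, b):
    # Squared Euclidean distance between two equal-length vectors
    return sum((x - y) ** 2 for x, y in zip(a, b))

def within_cluster_sse(points, labels, centers):
    # Hypothetical helper: total squared distance from each point to its assigned center
    return sum(squared_distance(p, centers[label]) for p, label in zip(points, labels))

points = [[1.0, 1.0], [1.0, 3.0], [9.0, 9.0], [11.0, 9.0]]

# Two clusters: each center is the mean of its two points
print(within_cluster_sse(points, [0, 0, 1, 1], [[1.0, 2.0], [10.0, 9.0]]))  # 4.0

# One cluster: a single center at the overall mean
print(within_cluster_sse(points, [0, 0, 0, 0], [[5.5, 5.5]]))  # 134.0
```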

Stop Spark

spark.stop()