# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Clustering Model in Spark - Customers
We will use SparkML to create a clustering model.
Objectives
- Use PySpark to connect to a Spark cluster.
- Create a Spark session.
- Read a CSV file into a DataFrame.
- Use the KMeans algorithm to cluster the data.
- Stop the Spark session.
Setup
We will be using the following library:
- PySpark, for connecting to the Spark cluster
Suppress Warnings
To suppress warnings generated by our code, we use the code block shown at the top of this notebook.
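If you would rather not silence warnings globally, Python's standard warnings module also supports scoped suppression. Here is a minimal sketch (not part of the original lab) using the catch_warnings context manager, which restores the previous filter state when the block exits:

import warnings

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    # code that would emit warnings goes here; filters are restored on exit
    pass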
Import Libraries
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Import functions/classes for SparkML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
# Import functions/classes for metrics
from pyspark.ml.evaluation import ClusteringEvaluator
Start Spark Session
# Create SparkSession
# Ignore any warnings generated by the SparkSession command
spark = SparkSession.builder.appName("Clustering using SparkML").getOrCreate()
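The builder above creates a default session. If you need to point PySpark at a particular cluster or tune resources, the same builder accepts a master URL and config options. A sketch, in which the master URL and memory setting are placeholders rather than values from this lab:

spark = (
    SparkSession.builder
    .master("local[*]")  # placeholder: run locally on all cores; swap in a cluster URL if you have one
    .appName("Clustering using SparkML")
    .config("spark.driver.memory", "2g")  # example setting; adjust to your environment
    .getOrCreate()
)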
Data
We use a modified version of the Wholesale customers dataset. The original dataset is available at https://archive.ics.uci.edu/ml/datasets/Wholesale+customers
Download Data Locally
import wget

wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/customers.csv")
HTTP request sent, awaiting response... 200 OK
Length: 8909 (8.7K) [text/csv]
Saving to: ‘customers.csv’

customers.csv      100%[===================>]   8.70K  --.-KB/s    in 0s

2024-11-09 22:41:59 (31.3 MB/s) - ‘customers.csv’ saved [8909/8909]
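If the wget package is not installed in your environment, the same download can be done with the standard library. A minimal sketch:

import urllib.request

url = ("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/"
       "IBM-BD0231EN-SkillsNetwork/datasets/customers.csv")
urllib.request.urlretrieve(url, "customers.csv")  # saves the file as customers.csv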
Load CSV into SparkDF
# Using the spark.read.csv function we load the data into a DataFrame.
# header=True indicates that there is a header row in our CSV file.
# inferSchema=True tells Spark to automatically infer the data types of the columns.

# Load the customers dataset
customer_data = spark.read.csv("customers.csv", header=True, inferSchema=True)
View Schema
customer_data.printSchema()
root
 |-- Fresh_Food: integer (nullable = true)
 |-- Milk: integer (nullable = true)
 |-- Grocery: integer (nullable = true)
 |-- Frozen_Food: integer (nullable = true)
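inferSchema=True makes Spark scan the file to guess each column's type. Since the schema is known, you could instead declare it up front and skip that extra pass. A sketch using the column names printed above:

from pyspark.sql.types import StructType, StructField, IntegerType

customer_schema = StructType([
    StructField("Fresh_Food", IntegerType(), True),
    StructField("Milk", IntegerType(), True),
    StructField("Grocery", IntegerType(), True),
    StructField("Frozen_Food", IntegerType(), True),
])

# Load with an explicit schema instead of inferring it
customer_data = spark.read.csv("customers.csv", header=True, schema=customer_schema)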
View Data
customer_data.show(5)
+----------+----+-------+-----------+
|Fresh_Food|Milk|Grocery|Frozen_Food|
+----------+----+-------+-----------+
|12669 |9656|7561 |214 |
|7057 |9810|9568 |1762 |
|6353 |8808|7684 |2405 |
|13265 |1196|4221 |6404 |
|22615 |5410|7198 |3915 |
+----------+----+-------+-----------+
only showing top 5 rows
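Before clustering, it can also help to look at the spread of each spend column; the DataFrame's describe() method reports count, mean, standard deviation, min, and max:

# Summary statistics for every column
customer_data.describe().show()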
Create Feature Vector
You must also tell the KMeans algorithm how many clusters to create out of your data.
# Assemble the features into a single vector column
feature_cols = ['Fresh_Food', 'Milk', 'Grocery', 'Frozen_Food']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
customer_transformed_data = assembler.transform(customer_data)

# Tell KMeans how many clusters you want
number_of_clusters = 3
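Note that KMeans groups points by Euclidean distance, so columns with larger spends can dominate the clustering. This lab clusters the raw values; an optional refinement (not part of the original lab) is to standardize the assembled feature vector first with SparkML's StandardScaler:

from pyspark.ml.feature import StandardScaler

# Rescale each feature to zero mean and unit standard deviation
scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withMean=True, withStd=True)
scaler_model = scaler.fit(customer_transformed_data)
customer_scaled_data = scaler_model.transform(customer_transformed_data)
# To cluster on the scaled values, pass featuresCol="scaled_features" to KMeans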
Create Clustering Model
kmeans = KMeans(k=number_of_clusters)
Train Model
model = kmeans.fit(customer_transformed_data)
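Once fitted, the model exposes the learned cluster centers. Printing them shows each cluster's average spend profile, with values in the feature order Fresh_Food, Milk, Grocery, Frozen_Food:

# Inspect the learned cluster centers, one per cluster
for center in model.clusterCenters():
    print(center)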
Make Predictions
# Make predictions on the dataset
predictions = model.transform(customer_transformed_data)
View Predictions
# Display the results
predictions.show(5)
+----------+----+-------+-----------+--------------------+----------+
|Fresh_Food|Milk|Grocery|Frozen_Food| features|prediction|
+----------+----+-------+-----------+--------------------+----------+
| 12669|9656| 7561| 214|[12669.0,9656.0,7...| 0|
| 7057|9810| 9568| 1762|[7057.0,9810.0,95...| 0|
| 6353|8808| 7684| 2405|[6353.0,8808.0,76...| 0|
| 13265|1196| 4221| 6404|[13265.0,1196.0,4...| 0|
| 22615|5410| 7198| 3915|[22615.0,5410.0,7...| 2|
+----------+----+-------+-----------+--------------------+----------+
only showing top 5 rows
View Customers per Cluster
Group the predictions by cluster and count the customers in each cluster.
# Group by cluster and count customers per cluster
predictions.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
| 1| 49|
| 2| 60|
| 0| 331|
+----------+-----+
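The lab itself does not score the clustering, but the ClusteringEvaluator imported earlier can compute a silhouette score over the predictions (values closer to 1.0 indicate tighter, better-separated clusters). A minimal sketch:

# Evaluate the clustering with a silhouette score
evaluator = ClusteringEvaluator(featuresCol="features",
                                predictionCol="prediction",
                                metricName="silhouette")
print("Silhouette score:", evaluator.evaluate(predictions))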
Change # of Clusters
Let’s go back and change the number of clusters to see how the cluster assignments change. First, rerun the full pipeline with 3 clusters:
# Tell KMeans how many clusters you want
number_of_clusters = 3
kmeans = KMeans(k=number_of_clusters)
model = kmeans.fit(customer_transformed_data)

# Make predictions on the dataset
predictions = model.transform(customer_transformed_data)

# Display the results
predictions.show(5)
+----------+----+-------+-----------+--------------------+----------+
|Fresh_Food|Milk|Grocery|Frozen_Food| features|prediction|
+----------+----+-------+-----------+--------------------+----------+
| 12669|9656| 7561| 214|[12669.0,9656.0,7...| 0|
| 7057|9810| 9568| 1762|[7057.0,9810.0,95...| 0|
| 6353|8808| 7684| 2405|[6353.0,8808.0,76...| 0|
| 13265|1196| 4221| 6404|[13265.0,1196.0,4...| 0|
| 22615|5410| 7198| 3915|[22615.0,5410.0,7...| 2|
+----------+----+-------+-----------+--------------------+----------+
only showing top 5 rows
# Group by cluster and count customers per cluster
predictions.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
| 1| 49|
| 2| 60|
| 0| 331|
+----------+-----+
Change the number of clusters to 2
# Tell KMeans how many clusters you want
number_of_clusters = 2
kmeans = KMeans(k=number_of_clusters)
model = kmeans.fit(customer_transformed_data)

# Make predictions on the dataset
predictions = model.transform(customer_transformed_data)

# Display the results
predictions.show(5)
+----------+----+-------+-----------+--------------------+----------+
|Fresh_Food|Milk|Grocery|Frozen_Food| features|prediction|
+----------+----+-------+-----------+--------------------+----------+
| 12669|9656| 7561| 214|[12669.0,9656.0,7...| 0|
| 7057|9810| 9568| 1762|[7057.0,9810.0,95...| 0|
| 6353|8808| 7684| 2405|[6353.0,8808.0,76...| 0|
| 13265|1196| 4221| 6404|[13265.0,1196.0,4...| 0|
| 22615|5410| 7198| 3915|[22615.0,5410.0,7...| 1|
+----------+----+-------+-----------+--------------------+----------+
only showing top 5 rows
# Group by cluster and count customers per cluster
predictions.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
| 1| 65|
| 0| 375|
+----------+-----+
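Rather than editing the cluster count by hand and rerunning, you can sweep a range of k values and compare silhouette scores to choose a reasonable cluster count. A sketch (not part of the original lab); the seed is fixed only to make reruns repeatable:

evaluator = ClusteringEvaluator(featuresCol="features",
                                predictionCol="prediction",
                                metricName="silhouette")

for k in range(2, 7):
    km_model = KMeans(k=k, seed=1).fit(customer_transformed_data)
    km_predictions = km_model.transform(customer_transformed_data)
    print(k, evaluator.evaluate(km_predictions))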
Stop Spark
spark.stop()