Clustering Model in Spark - Seeds
We will use SparkML to create a clustering model.
Objectives
- Use PySpark to connect to a Spark cluster.
- Create a Spark session.
- Read a CSV file into a data frame.
- Use the KMeans algorithm to cluster the data.
- Stop the Spark session.
Setup
We will be using the following libraries:
- PySpark, for connecting to the Spark cluster
Suppress Warnings
To suppress warnings generated by our code, we'll use this code block:
# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Import Libraries
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Import functions/classes for SparkML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
# Import functions/classes for metrics
from pyspark.ml.evaluation import ClusteringEvaluator
Start Spark Session - Task 1
# Create SparkSession
# Ignore any warnings from the SparkSession command
spark = SparkSession.builder.appName("Clustering using SparkML").getOrCreate()
Data
- Seeds dataset. Available at https://archive.ics.uci.edu/ml/datasets/seeds
Download Data Locally
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/seeds.csv")
HTTP request sent, awaiting response... 200 OK
Length: 8973 (8.8K) [text/csv]
Saving to: ‘seeds.csv’
2024-11-09 22:55:23 - ‘seeds.csv’ saved [8973/8973]
Load CSV into SparkDF
# Using the spark.read.csv function we load the data into a dataframe.
# header=True indicates that there is a header row in our CSV file.
# inferSchema=True tells Spark to automatically infer the data types of the columns.
# Load the seeds dataset
seed_data = spark.read.csv("seeds.csv", header=True, inferSchema=True)
View Schema
seed_data.printSchema()
root
|-- area: double (nullable = true)
|-- perimeter: double (nullable = true)
|-- compactness: double (nullable = true)
|-- length of kernel: double (nullable = true)
|-- width of kernel: double (nullable = true)
|-- asymmetry coefficient: double (nullable = true)
|-- length of kernel groove: double (nullable = true)
View Data
seed_data.show(n=5, truncate=False, vertical=True)
-RECORD 0-------------------------
area | 15.26
perimeter | 14.84
compactness | 0.871
length of kernel | 5.763
width of kernel | 3.312
asymmetry coefficient | 2.221
length of kernel groove | 5.22
-RECORD 1-------------------------
area | 14.88
perimeter | 14.57
compactness | 0.8811
length of kernel | 5.554
width of kernel | 3.333
asymmetry coefficient | 1.018
length of kernel groove | 4.956
-RECORD 2-------------------------
area | 14.29
perimeter | 14.09
compactness | 0.905
length of kernel | 5.291
width of kernel | 3.337
asymmetry coefficient | 2.699
length of kernel groove | 4.825
-RECORD 3-------------------------
area | 13.84
perimeter | 13.94
compactness | 0.8955
length of kernel | 5.324
width of kernel | 3.379
asymmetry coefficient | 2.259
length of kernel groove | 4.805
-RECORD 4-------------------------
area | 16.14
perimeter | 14.99
compactness | 0.9034
length of kernel | 5.658
width of kernel | 3.562
asymmetry coefficient | 1.355
length of kernel groove | 5.175
only showing top 5 rows
# OR
seed_data.show(5)
Create Feature Vector
You must also tell the KMeans algorithm how many clusters to create out of your data.
# Assemble the features into a single vector column
feature_cols = ['area',
'perimeter',
'compactness',
'length of kernel',
'width of kernel',
'asymmetry coefficient',
'length of kernel groove']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
seed_transformed_data = assembler.transform(seed_data)
number_of_clusters = 3
Create Clustering Model
kmeans = KMeans(k=number_of_clusters)
Train Model
model = kmeans.fit(seed_transformed_data)
Make Predictions
# Make predictions on the dataset
predictions = model.transform(seed_transformed_data)
View Predictions
# View Predictions results
predictions.show(n=5, truncate=False, vertical=True)
-RECORD 0---------------------------------------------------------------
area | 15.26
perimeter | 14.84
compactness | 0.871
length of kernel | 5.763
width of kernel | 3.312
asymmetry coefficient | 2.221
length of kernel groove | 5.22
features | [15.26,14.84,0.871,5.763,3.312,2.221,5.22]
prediction | 0
-RECORD 1---------------------------------------------------------------
area | 14.88
perimeter | 14.57
compactness | 0.8811
length of kernel | 5.554
width of kernel | 3.333
asymmetry coefficient | 1.018
length of kernel groove | 4.956
features | [14.88,14.57,0.8811,5.554,3.333,1.018,4.956]
prediction | 0
-RECORD 2---------------------------------------------------------------
area | 14.29
perimeter | 14.09
compactness | 0.905
length of kernel | 5.291
width of kernel | 3.337
asymmetry coefficient | 2.699
length of kernel groove | 4.825
features | [14.29,14.09,0.905,5.291,3.337,2.699,4.825]
prediction | 0
-RECORD 3---------------------------------------------------------------
area | 13.84
perimeter | 13.94
compactness | 0.8955
length of kernel | 5.324
width of kernel | 3.379
asymmetry coefficient | 2.259
length of kernel groove | 4.805
features | [13.84,13.94,0.8955,5.324,3.379,2.259,4.805]
prediction | 0
-RECORD 4---------------------------------------------------------------
area | 16.14
perimeter | 14.99
compactness | 0.9034
length of kernel | 5.658
width of kernel | 3.562
asymmetry coefficient | 1.355
length of kernel groove | 5.175
features | [16.14,14.99,0.9034,5.658,3.562,1.355,5.175]
prediction | 0
only showing top 5 rows
View Seeds per Cluster
Group the predictions by cluster and count the rows in each.
# Group by cluster and count seeds per cluster
predictions.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
| 1| 82|
| 2| 61|
| 0| 67|
+----------+-----+
Stop Spark
spark.stop()