Clustering Model in Spark - Seeds

We will use SparkML to create a KMeans clustering model for the seeds dataset.
Objectives

- Use PySpark to connect to a Spark cluster.
- Create a Spark session.
- Read a CSV file into a DataFrame.
- Use the KMeans algorithm to cluster the data.
- Stop the Spark session.
Setup

We will be using the following libraries:

- PySpark, for connecting to the Spark cluster
- findspark, to simplify using Apache Spark with Python
- wget, to download the dataset
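If these are not already available in your environment, you can install them from PyPI first (a minimal sketch; the package names are the usual PyPI ones and your environment may already provide them):

# Shell command, run once before starting the lab
pip install pyspark findspark wget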
Suppress Warnings

To suppress warnings generated by our code, we'll use this code block:

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

Import Libraries
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Import functions/classes for SparkML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Import functions/classes for metrics
from pyspark.ml.evaluation import ClusteringEvaluator
Start Spark Session - Task 1

# Create SparkSession
# Ignore any warnings from the SparkSession command
spark = SparkSession.builder.appName("Clustering using SparkML").getOrCreate()
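As a quick optional check that the session came up, you can print the Spark version (a small sketch, not part of the original lab steps):

# Optional sanity check: the session object reports the Spark version
print(spark.version)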
Data
- Seeds dataset. Available at https://archive.ics.uci.edu/ml/datasets/seeds
Download Data Locally
import wget

wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/seeds.csv")
HTTP request sent, awaiting response... 200 OK
Length: 8973 (8.8K) [text/csv]
Saving to: ‘seeds.csv’

seeds.csv 100%[===================>] 8.76K --.-KB/s in 0s

2024-11-09 22:55:23 (32.2 MB/s) - ‘seeds.csv’ saved [8973/8973]
Load CSV into a Spark DataFrame

# Using the spark.read.csv function we load the data into a DataFrame.
# header=True indicates that there is a header row in our CSV file.
# inferSchema=True tells Spark to automatically detect the column data types.

# Load the seeds dataset
seed_data = spark.read.csv("seeds.csv", header=True, inferSchema=True)
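As an optional check that the file loaded correctly, you can count the rows (a small sketch; the seeds dataset has 210 rows, matching the per-cluster counts we will see later):

# Optional sanity check: the seeds dataset contains 210 rows
print(seed_data.count())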
View Schema
seed_data.printSchema()
root
 |-- area: double (nullable = true)
|-- perimeter: double (nullable = true)
|-- compactness: double (nullable = true)
|-- length of kernel: double (nullable = true)
|-- width of kernel: double (nullable = true)
|-- asymmetry coefficient: double (nullable = true)
|-- length of kernel groove: double (nullable = true)
View Data
seed_data.show(n=5, truncate=False, vertical=True)
-RECORD 0-------------------------------
 area                    | 15.26
 perimeter               | 14.84
 compactness             | 0.871
 length of kernel        | 5.763
 width of kernel         | 3.312
 asymmetry coefficient   | 2.221
 length of kernel groove | 5.22
-RECORD 1-------------------------------
 area                    | 14.88
 perimeter               | 14.57
 compactness             | 0.8811
 length of kernel        | 5.554
 width of kernel         | 3.333
 asymmetry coefficient   | 1.018
 length of kernel groove | 4.956
-RECORD 2-------------------------------
 area                    | 14.29
 perimeter               | 14.09
 compactness             | 0.905
 length of kernel        | 5.291
 width of kernel         | 3.337
 asymmetry coefficient   | 2.699
 length of kernel groove | 4.825
-RECORD 3-------------------------------
 area                    | 13.84
 perimeter               | 13.94
 compactness             | 0.8955
 length of kernel        | 5.324
 width of kernel         | 3.379
 asymmetry coefficient   | 2.259
 length of kernel groove | 4.805
-RECORD 4-------------------------------
 area                    | 16.14
 perimeter               | 14.99
 compactness             | 0.9034
 length of kernel        | 5.658
 width of kernel         | 3.562
 asymmetry coefficient   | 1.355
 length of kernel groove | 5.175
only showing top 5 rows
# OR
seed_data.show(5)
Create Feature Vector

Assemble the input columns into a single "features" vector column, which is what SparkML estimators expect. You must also tell the KMeans algorithm how many clusters to create out of your data; here we use 3, since the seeds dataset contains kernels from three wheat varieties (a sketch for choosing k empirically follows the code).

# Assemble the features into a single vector column
feature_cols = ['area',
                'perimeter',
                'compactness',
                'length of kernel',
                'width of kernel',
                'asymmetry coefficient',
                'length of kernel groove']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
seed_transformed_data = assembler.transform(seed_data)

# Tell KMeans how many clusters you want
number_of_clusters = 3
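If you did not know the number of clusters in advance, one common approach is to compare a clustering quality metric such as the silhouette score across several candidate values of k. A minimal sketch (the loop range, seed value, and variable names here are illustrative, not part of the original lab):

# Compare silhouette scores for several candidate values of k
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction",
                                metricName="silhouette")
for k in range(2, 7):
    candidate = KMeans(k=k, seed=42).fit(seed_transformed_data)
    score = evaluator.evaluate(candidate.transform(seed_transformed_data))
    print(f"k={k}: silhouette={score:.3f}")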
Create Clustering Model
kmeans = KMeans(k=number_of_clusters)
Train Model
model = kmeans.fit(seed_transformed_data)
Make Predictions
# Make predictions on the dataset
predictions = model.transform(seed_transformed_data)
View Predictions
# View Predictions results
predictions.show(n=5, truncate=False, vertical=True)
-RECORD 0----------------------------------------------------------------
 area                    | 15.26
 perimeter               | 14.84
 compactness             | 0.871
 length of kernel        | 5.763
 width of kernel         | 3.312
 asymmetry coefficient   | 2.221
 length of kernel groove | 5.22
 features                | [15.26,14.84,0.871,5.763,3.312,2.221,5.22]
 prediction              | 0
-RECORD 1----------------------------------------------------------------
 area                    | 14.88
 perimeter               | 14.57
 compactness             | 0.8811
 length of kernel        | 5.554
 width of kernel         | 3.333
 asymmetry coefficient   | 1.018
 length of kernel groove | 4.956
 features                | [14.88,14.57,0.8811,5.554,3.333,1.018,4.956]
 prediction              | 0
-RECORD 2----------------------------------------------------------------
 area                    | 14.29
 perimeter               | 14.09
 compactness             | 0.905
 length of kernel        | 5.291
 width of kernel         | 3.337
 asymmetry coefficient   | 2.699
 length of kernel groove | 4.825
 features                | [14.29,14.09,0.905,5.291,3.337,2.699,4.825]
 prediction              | 0
-RECORD 3----------------------------------------------------------------
 area                    | 13.84
 perimeter               | 13.94
 compactness             | 0.8955
 length of kernel        | 5.324
 width of kernel         | 3.379
 asymmetry coefficient   | 2.259
 length of kernel groove | 4.805
 features                | [13.84,13.94,0.8955,5.324,3.379,2.259,4.805]
 prediction              | 0
-RECORD 4----------------------------------------------------------------
 area                    | 16.14
 perimeter               | 14.99
 compactness             | 0.9034
 length of kernel        | 5.658
 width of kernel         | 3.562
 asymmetry coefficient   | 1.355
 length of kernel groove | 5.175
 features                | [16.14,14.99,0.9034,5.658,3.562,1.355,5.175]
 prediction              | 0
only showing top 5 rows
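To quantify how well separated these clusters are, you can score the predictions with the silhouette metric (a minimal sketch; ClusteringEvaluator is standard SparkML, but this step is not part of the original lab output):

# Evaluate cluster quality: silhouette ranges from -1 to 1 (higher is better)
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction",
                                metricName="silhouette")
print(f"Silhouette score: {evaluator.evaluate(predictions):.3f}")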
View Number of Seeds per Cluster

Group the predictions by cluster and count the records in each.

# Group by cluster and count records per cluster
predictions.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
| 1| 82|
| 2| 61|
| 0| 67|
+----------+-----+
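You can also inspect the learned centroids directly; the trained KMeansModel exposes them via clusterCenters() (a short sketch, one centroid vector per cluster, in the same order as feature_cols):

# Inspect the learned cluster centroids
for i, center in enumerate(model.clusterCenters()):
    print(f"Cluster {i} center: {center}")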
Stop Spark
spark.stop()