Clustering Model in Spark - Seeds

We will use SparkML to create a clustering model for the seeds dataset.

Objectives


  1. Use PySpark to connect to a Spark cluster.
  2. Create a Spark session.
  3. Read a CSV file into a data frame.
  4. Use the KMeans algorithm to cluster the data.
  5. Stop the Spark session.

Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark Cluster

Suppress Warnings

To suppress warnings generated by our code, we’ll use this code block:

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
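
The block above silences every warning for the rest of the session, which can also hide warnings worth reading. As a hedged alternative, the standard library's `warnings.catch_warnings` context manager limits suppression to a single block. The sketch below assumes a fresh Python session; `noisy_step` is a hypothetical stand-in for any call that emits a warning.

```python
import warnings

def noisy_step():
    # Hypothetical stand-in for any call that emits a warning
    warnings.warn("this API is deprecated", DeprecationWarning)
    return "done"

# Warnings raised inside the `with` block are ignored;
# the previous filter settings are restored on exit.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    result = noisy_step()

print(result)
```

Outside the `with` block, warnings behave as they did before, so problems in later cells still surface.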

Import Libraries

# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Import functions/classes for SparkML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Import functions/classes for metrics
# (clustering models are scored with ClusteringEvaluator, not RegressionEvaluator)
from pyspark.ml.evaluation import ClusteringEvaluator

Start Spark Session - Task 1


# Create SparkSession
# Ignore any warnings printed by the SparkSession command

spark = SparkSession.builder.appName("Clustering using SparkML").getOrCreate()

Data


Download Data Locally

import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/seeds.csv")

8.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8973 (8.8K) [text/csv]
Saving to: ‘seeds.csv’

seeds.csv           100%[===================>]   8.76K  --.-KB/s    in 0s      

2024-11-09 22:55:23 (32.2 MB/s) - ‘seeds.csv’ saved [8973/8973]

Load CSV into SparkDF

# Using the spark.read.csv function, we load the data into a dataframe.
# header=True indicates that there is a header row in our CSV file.
# inferSchema=True tells Spark to automatically determine the data types of the columns.

# Load seeds dataset
seed_data = spark.read.csv("seeds.csv", header=True, inferSchema=True)
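
To make the idea behind `inferSchema=True` more concrete, here is a rough plain-Python sketch of column type inference: try the narrowest type first and fall back to a string. This is only an illustration under simplifying assumptions, not Spark's actual implementation. The function `infer_type` and the `label` column are made up for the example; the two numeric rows are copied from the seeds data.

```python
import csv
import io

def infer_type(values):
    """Return the narrowest type name that parses every value in the column."""
    for name, caster in (("int", int), ("double", float)):
        try:
            for v in values:
                caster(v)
            return name
        except ValueError:
            continue
    return "string"

# Two rows from the seeds data, plus a made-up "label" column for illustration
sample = io.StringIO(
    "area,perimeter,label\n"
    "15.26,14.84,a\n"
    "14.88,14.57,b\n"
)
rows = list(csv.DictReader(sample))
schema = {col: infer_type([r[col] for r in rows]) for col in rows[0]}
print(schema)
```

Inference has to read the data once just to decide on the types, so for large files it can be worth supplying an explicit schema instead.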

View Schema

seed_data.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length of kernel: double (nullable = true)
 |-- width of kernel: double (nullable = true)
 |-- asymmetry coefficient: double (nullable = true)
 |-- length of kernel groove: double (nullable = true)

View Data

seed_data.show(n=5, truncate=False, vertical=True)

-RECORD 0-------------------------
 area                    | 15.26  
 perimeter               | 14.84  
 compactness             | 0.871  
 length of kernel        | 5.763  
 width of kernel         | 3.312  
 asymmetry coefficient   | 2.221  
 length of kernel groove | 5.22   
-RECORD 1-------------------------
 area                    | 14.88  
 perimeter               | 14.57  
 compactness             | 0.8811 
 length of kernel        | 5.554  
 width of kernel         | 3.333  
 asymmetry coefficient   | 1.018  
 length of kernel groove | 4.956  
-RECORD 2-------------------------
 area                    | 14.29  
 perimeter               | 14.09  
 compactness             | 0.905  
 length of kernel        | 5.291  
 width of kernel         | 3.337  
 asymmetry coefficient   | 2.699  
 length of kernel groove | 4.825  
-RECORD 3-------------------------
 area                    | 13.84  
 perimeter               | 13.94  
 compactness             | 0.8955 
 length of kernel        | 5.324  
 width of kernel         | 3.379  
 asymmetry coefficient   | 2.259  
 length of kernel groove | 4.805  
-RECORD 4-------------------------
 area                    | 16.14  
 perimeter               | 14.99  
 compactness             | 0.9034 
 length of kernel        | 5.658  
 width of kernel         | 3.562  
 asymmetry coefficient   | 1.355  
 length of kernel groove | 5.175  
only showing top 5 rows

# OR
seed_data.show(5)

Create Feature Vector


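KMeans expects all features in a single vector column, so we first combine the feature columns with VectorAssembler. You must also tell the KMeans algorithm how many clusters to create from your data.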

# Assemble the features into a single vector column
feature_cols = ['area',
 'perimeter',
 'compactness',
 'length of kernel',
 'width of kernel',
 'asymmetry coefficient',
 'length of kernel groove']

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
seed_transformed_data = assembler.transform(seed_data)
# Tell KMeans how many clusters you want
number_of_clusters = 3

Create Clustering Model


# KMeans reads the "features" column by default
kmeans = KMeans(k=number_of_clusters)

Train Model

model = kmeans.fit(seed_transformed_data)
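
For intuition about what fitting a KMeans model involves, here is a minimal plain-Python sketch of the classic iteration (Lloyd's algorithm): assign each point to its nearest centroid, then move each centroid to the mean of its members, and repeat until nothing changes. It is a toy under stated assumptions and not Spark's distributed implementation; the function names, the made-up 2-D points, and the caller-supplied starting centroids are all hypothetical.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def mean_point(points):
    n = len(points)
    return tuple(sum(coord) / n for coord in zip(*points))

def kmeans(points, centers, max_iter=20):
    """Toy Lloyd's algorithm; `centers` are caller-supplied starting centroids."""
    centers = list(centers)
    labels = []
    for _ in range(max_iter):
        # Assignment step: each point joins its nearest centroid
        labels = [
            min(range(len(centers)), key=lambda c: squared_distance(p, centers[c]))
            for p in points
        ]
        # Update step: each centroid moves to the mean of its members
        new_centers = []
        for c in range(len(centers)):
            members = [p for p, lab in zip(points, labels) if lab == c]
            new_centers.append(mean_point(members) if members else centers[c])
        if new_centers == centers:
            break  # converged: no centroid moved
        centers = new_centers
    return centers, labels

# Made-up 2-D points forming two obvious groups
points = [(1.0, 1.0), (1.5, 2.0), (1.0, 1.5), (8.0, 8.0), (9.0, 9.5), (8.5, 9.0)]
centers, labels = kmeans(points, centers=[points[0], points[3]])
print(labels)
```

The result depends on the starting centroids, which is one reason different runs of a k-means implementation can number or split the clusters differently.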

Make Predictions


# Make predictions on the dataset
predictions = model.transform(seed_transformed_data)

View Predictions

# View Predictions results
predictions.show(n=5, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------
 area                    | 15.26                                        
 perimeter               | 14.84                                        
 compactness             | 0.871                                        
 length of kernel        | 5.763                                        
 width of kernel         | 3.312                                        
 asymmetry coefficient   | 2.221                                        
 length of kernel groove | 5.22                                         
 features                | [15.26,14.84,0.871,5.763,3.312,2.221,5.22]   
 prediction              | 0                                            
-RECORD 1---------------------------------------------------------------
 area                    | 14.88                                        
 perimeter               | 14.57                                        
 compactness             | 0.8811                                       
 length of kernel        | 5.554                                        
 width of kernel         | 3.333                                        
 asymmetry coefficient   | 1.018                                        
 length of kernel groove | 4.956                                        
 features                | [14.88,14.57,0.8811,5.554,3.333,1.018,4.956] 
 prediction              | 0                                            
-RECORD 2---------------------------------------------------------------
 area                    | 14.29                                        
 perimeter               | 14.09                                        
 compactness             | 0.905                                        
 length of kernel        | 5.291                                        
 width of kernel         | 3.337                                        
 asymmetry coefficient   | 2.699                                        
 length of kernel groove | 4.825                                        
 features                | [14.29,14.09,0.905,5.291,3.337,2.699,4.825]  
 prediction              | 0                                            
-RECORD 3---------------------------------------------------------------
 area                    | 13.84                                        
 perimeter               | 13.94                                        
 compactness             | 0.8955                                       
 length of kernel        | 5.324                                        
 width of kernel         | 3.379                                        
 asymmetry coefficient   | 2.259                                        
 length of kernel groove | 4.805                                        
 features                | [13.84,13.94,0.8955,5.324,3.379,2.259,4.805] 
 prediction              | 0                                            
-RECORD 4---------------------------------------------------------------
 area                    | 16.14                                        
 perimeter               | 14.99                                        
 compactness             | 0.9034                                       
 length of kernel        | 5.658                                        
 width of kernel         | 3.562                                        
 asymmetry coefficient   | 1.355                                        
 length of kernel groove | 5.175                                        
 features                | [16.14,14.99,0.9034,5.658,3.562,1.355,5.175] 
 prediction              | 0                                            
only showing top 5 rows

View Seeds per Cluster

Count the seeds assigned to each cluster

# Group by cluster and count seeds per cluster
predictions.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   82|
|         2|   61|
|         0|   67|
+----------+-----+
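
The counts show how the seeds were split, but not how well separated the clusters are. One common measure is the silhouette coefficient, which is close to 1 when points sit much nearer to their own cluster than to any other. Spark ML provides `ClusteringEvaluator` for this kind of score; the plain-Python sketch below only illustrates the definition on made-up points, assuming squared Euclidean distance, and is not Spark's implementation. The function names and sample points are hypothetical, and the sketch assumes at least two clusters and no duplicate points.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def silhouette(points, labels):
    """Mean silhouette coefficient using squared Euclidean distance."""
    clusters = {}
    for p, lab in zip(points, labels):
        clusters.setdefault(lab, []).append(p)
    scores = []
    for p, lab in zip(points, labels):
        own = clusters[lab]
        if len(own) == 1:
            scores.append(0.0)  # convention for singleton clusters
            continue
        # a: mean distance to the other members of the point's own cluster
        a = sum(squared_distance(p, q) for q in own) / (len(own) - 1)
        # b: mean distance to the members of the nearest other cluster
        b = min(
            sum(squared_distance(p, q) for q in members) / len(members)
            for other, members in clusters.items() if other != lab
        )
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)

# Made-up points: two tight, well-separated pairs
points = [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0), (10.0, 11.0)]
good = silhouette(points, [0, 0, 1, 1])  # sensible grouping
bad = silhouette(points, [0, 1, 0, 1])   # deliberately poor grouping
print(round(good, 3), round(bad, 3))
```

Comparing such a score across several values of `number_of_clusters` is one way to sanity-check the choice of 3.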

Stop Spark

spark.stop()