# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Classification Model in Spark - Beans
We will build, train, and evaluate a classification model using Spark to classify varieties of dry beans.
Objectives
- Use PySpark to connect to a Spark cluster.
- Create a Spark session.
- Read a CSV file into a DataFrame.
- Split the dataset into training and testing sets.
- Use VectorAssembler to combine multiple columns into a single vector column.
- Use Logistic Regression to build a classification model.
- Use metrics to evaluate the model.
- Stop the Spark session.
Setup
We will be using the following libraries:
- PySpark, for connecting to the Spark cluster
- findspark, to simplify using Apache Spark with Python
Suppress Warnings
To suppress warnings generated by our code, we use the code block at the very top of this notebook, which overrides warnings.warn and sets the warnings filter to 'ignore'.
Import Libraries
# Install pyspark and findspark (not needed if already satisfied)
pip install pyspark==3.1.2 -q
pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Import functions/classes for Spark ML
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
# Import functions/classes for metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
Start Spark Session
# Create a SparkSession
# Ignore any warnings raised by the SparkSession command
spark = SparkSession.builder.appName("Classification using SparkML").getOrCreate()
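As a quick optional check (not part of the original lab), you can confirm that the session is up and see which Spark version it is running:
# Optional sanity check: print the Spark version of the active session
print(spark.version)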
Data
- Dry Bean dataset, available at https://archive.ics.uci.edu/ml/datasets/Dry+Bean+Dataset
- Modified version of the Iris dataset, available at https://archive.ics.uci.edu/ml/datasets/Iris
Download Data Locally
import wget
"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/drybeans.csv") wget.download (
CSV to SparkDF
# Using the spark.read.csv function, we load the data into a DataFrame.
# header=True indicates that there is a header row in our CSV file.
# inferSchema=True tells Spark to automatically determine the data types of the columns.
# Load beans dataset
beans_data = spark.read.csv("drybeans.csv", header=True, inferSchema=True)
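Before inspecting the schema, it can help to confirm how much data was loaded. A small optional check, not part of the original lab:
# Optional check: number of rows and number of columns in the DataFrame
print("Rows:", beans_data.count())
print("Columns:", len(beans_data.columns))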
View Schema
beans_data.printSchema()
root
 |-- Area: integer (nullable = true)
|-- Perimeter: double (nullable = true)
|-- MajorAxisLength: double (nullable = true)
|-- MinorAxisLength: double (nullable = true)
|-- AspectRation: double (nullable = true)
|-- Eccentricity: double (nullable = true)
|-- ConvexArea: integer (nullable = true)
|-- EquivDiameter: double (nullable = true)
|-- Extent: double (nullable = true)
|-- Solidity: double (nullable = true)
|-- roundness: double (nullable = true)
|-- Compactness: double (nullable = true)
|-- ShapeFactor1: double (nullable = true)
|-- ShapeFactor2: double (nullable = true)
|-- ShapeFactor3: double (nullable = true)
|-- ShapeFactor4: double (nullable = true)
|-- Class: string (nullable = true)
View Data
beans_data.select(['Area','Perimeter','Solidity','roundness','Compactness','Class']).show(5)
+-----+---------+-----------+-----------+-----------+-----+
| Area|Perimeter| Solidity| roundness|Compactness|Class|
+-----+---------+-----------+-----------+-----------+-----+
|28395| 610.291|0.988855999|0.958027126|0.913357755|SEKER|
|28734| 638.018|0.984985603|0.887033637|0.953860842|SEKER|
|29380| 624.11|0.989558774|0.947849473|0.908774239|SEKER|
|30008| 645.884|0.976695743|0.903936374|0.928328835|SEKER|
|30140| 620.134| 0.99089325|0.984877069|0.970515523|SEKER|
+-----+---------+-----------+-----------+-----------+-----+
only showing top 5 rows
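If you also want basic statistics for a few of these columns, DataFrame.describe() returns count, mean, stddev, min, and max. This is an optional sketch, not part of the original lab:
# Optional: summary statistics for a few numeric columns
beans_data.describe('Area', 'Perimeter', 'Solidity').show()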
Groupby & Count Column
Let's group the data by the Class column to see how many classes there are and how many members each class has.
beans_data.groupBy('Class').count().orderBy('count').show()
+--------+-----+
| Class|count|
+--------+-----+
| BOMBAY| 522|
|BARBUNYA| 1322|
| CALI| 1630|
| HOROZ| 1928|
| SEKER| 2027|
| SIRA| 2636|
|DERMASON| 3546|
+--------+-----+
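To see each class's share of the whole dataset rather than just raw counts, a small extension (my sketch, not in the original lab; total and fraction are names introduced here) divides each count by the total number of rows:
# Sketch: add a 'fraction' column showing each class's share of the dataset
from pyspark.sql import functions as F
total = beans_data.count()
beans_data.groupBy('Class').count() \
    .withColumn('fraction', F.col('count') / total) \
    .orderBy('count').show()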
Set Input/Output
indexer = StringIndexer(inputCol="Class", outputCol="label")
Convert String/Numeric
- It's easier to work with numeric values than with strings
- Let's convert the "Class" column from string to numeric
# Convert Class column from string to numerical values
beans_data = indexer.fit(beans_data).transform(beans_data)
View New Columns
- Now that we have transformed the Class column into the output column 'label'
- Let's run the same group-by and count code, but this time on 'label' instead of the Class column
- As you can see, the label column contains numeric values from 0 to 6
beans_data.groupBy('label').count().orderBy('count').show()
+-----+-----+
|label|count|
+-----+-----+
| 6.0| 522|
| 5.0| 1322|
| 4.0| 1630|
| 3.0| 1928|
| 2.0| 2027|
| 1.0| 2636|
| 0.0| 3546|
+-----+-----+
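The mapping from class names to label values comes from StringIndexer's default frequency ordering: the most frequent class gets label 0.0. If you want to see the mapping explicitly, one way (a sketch, not part of the original lab; indexer_model is a name introduced here) is to keep the fitted indexer model and read its labels attribute:
# Sketch: inspect the Class -> label mapping of the fitted StringIndexer
indexer_model = indexer.fit(beans_data)   # refit just to inspect the mapping
print(indexer_model.labels)               # class names in label order (index 0 first)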
Identify Label/Input Columns
- We ask the VectorAssembler to combine a group of input columns into a single vector column named "features"
- We use "Area", "Perimeter", "Solidity", "roundness", "Compactness" as the input columns
# Prepare feature vector
assembler = VectorAssembler(inputCols=["Area","Perimeter","Solidity","roundness","Compactness"], outputCol="features")
beans_transformed_data = assembler.transform(beans_data)
Preview Columns
"features","label").show()
beans_transformed_data.select(+--------------------+-----+
| features|label|
+--------------------+-----+
|[28395.0,610.291,...| 2.0|
|[28734.0,638.018,...| 2.0|
|[29380.0,624.11,0...| 2.0|
|[30008.0,645.884,...| 2.0|
|[30140.0,620.134,...| 2.0|
|[30279.0,634.927,...| 2.0|
|[30477.0,670.033,...| 2.0|
|[30519.0,629.727,...| 2.0|
|[30685.0,635.681,...| 2.0|
|[30834.0,631.934,...| 2.0|
|[30917.0,640.765,...| 2.0|
|[31091.0,638.558,...| 2.0|
|[31107.0,640.594,...| 2.0|
|[31158.0,642.626,...| 2.0|
|[31158.0,641.105,...| 2.0|
|[31178.0,636.888,...| 2.0|
|[31202.0,644.454,...| 2.0|
|[31203.0,639.782,...| 2.0|
|[31272.0,638.666,...| 2.0|
|[31335.0,635.011,...| 2.0|
+--------------------+-----+
only showing top 20 rows
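The lab uses only five input columns. If you wanted to try every numeric column as a feature instead, a hedged sketch (numeric_cols, assembler_all, and all_features_data are names introduced here, not part of the original lab) could build the column list from the DataFrame's dtypes:
# Sketch: assemble all numeric columns (excluding the indexed 'label') into a feature vector
numeric_cols = [name for (name, dtype) in beans_data.dtypes
                if dtype in ('int', 'double') and name != 'label']
assembler_all = VectorAssembler(inputCols=numeric_cols, outputCol='features')
all_features_data = assembler_all.transform(beans_data)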
Split Data
# Split data into training and testing sets
(training_data, testing_data) = beans_transformed_data.randomSplit([0.7, 0.3], seed=42)
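An optional check (not in the original lab) to confirm that roughly 70% of the rows ended up in the training set:
# Optional check: sizes of the training and testing sets
print("Training rows:", training_data.count())
print("Testing rows:", testing_data.count())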
Build Model
lr = LogisticRegression(featuresCol="features", labelCol="label")
Train Model
model = lr.fit(training_data)
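For a multiclass problem, the fitted logistic regression model stores one row of coefficients and one intercept per class. If you want to peek at them, a small optional sketch (not part of the original lab):
# Optional: inspect the fitted model's parameters (one coefficient row / intercept per class)
print(model.coefficientMatrix)
print(model.interceptVector)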
Predict Classes
# Make predictions on testing data
predictions = model.transform(testing_data)
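Before computing metrics, it can help to eyeball a few predictions next to the true labels. An optional sketch, not part of the original lab:
# Optional: preview predicted labels and class probabilities alongside the true label
predictions.select("label", "prediction", "probability").show(5, truncate=False)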
Evaluate Model
Accuracy
# Evaluate model performance
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)
# OUTPUT
Accuracy = 0.8937376725838264
Precision
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)
print("Precision =", precision)
# OUTPUT
Precision = 0.8934405575280979
Recall
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Recall =", recall)
# OUTPUT
Recall = 0.8937376725838265
F1 Score
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 score =", f1_score)
# OUTPUT
F1 score = 0.8933106212109942
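Beyond the four summary metrics, a per-class view helps show where the model confuses bean varieties. A simple confusion-matrix-style sketch (not part of the original lab) built directly from the predictions DataFrame:
# Sketch: count how often each true label was predicted as each class
predictions.groupBy("label", "prediction").count() \
    .orderBy("label", "prediction").show()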
Stop SparkSession
spark.stop()