Classification Model in Spark - Beans

We will build, train, and evaluate a classification model using Spark to classify varieties of beans.

Objectives


  1. Use PySpark to connect to a Spark cluster.
  2. Create a Spark session.
  3. Read a CSV file into a DataFrame.
  4. Split the dataset into training and testing sets.
  5. Use VectorAssembler to combine multiple columns into a single vector column.
  6. Use Logistic Regression to build a classification model.
  7. Use metrics to evaluate the model.
  8. Stop the Spark session.

Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark Cluster

Suppress Warnings

To suppress warnings generated by our code, we’ll use this code block:

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

Import Libraries

# Install pyspark and findspark (skip this step if they are already installed)
# The leading "!" runs the command in a shell from a notebook cell
!pip install pyspark==3.1.2 -q
!pip install findspark -q

# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Import functions/classes for Spark ML
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
# LogisticRegression is a classifier, so it lives in pyspark.ml.classification
from pyspark.ml.classification import LogisticRegression

# import functions/Classes for metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Start Spark Session


#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("Classification using SparkML").getOrCreate()

Data


Download Data Locally

# wget here is the third-party Python package, not the command-line tool
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/drybeans.csv")

CSV to SparkDF

# Use spark.read.csv to load the data into a DataFrame.
# header=True tells Spark that the CSV file has a header row.
# inferSchema=True tells Spark to work out the column data types automatically.

# Load beans dataset
beans_data = spark.read.csv("drybeans.csv", header=True, inferSchema=True)

View Schema

beans_data.printSchema()

root
 |-- Area: integer (nullable = true)
 |-- Perimeter: double (nullable = true)
 |-- MajorAxisLength: double (nullable = true)
 |-- MinorAxisLength: double (nullable = true)
 |-- AspectRation: double (nullable = true)
 |-- Eccentricity: double (nullable = true)
 |-- ConvexArea: integer (nullable = true)
 |-- EquivDiameter: double (nullable = true)
 |-- Extent: double (nullable = true)
 |-- Solidity: double (nullable = true)
 |-- roundness: double (nullable = true)
 |-- Compactness: double (nullable = true)
 |-- ShapeFactor1: double (nullable = true)
 |-- ShapeFactor2: double (nullable = true)
 |-- ShapeFactor3: double (nullable = true)
 |-- ShapeFactor4: double (nullable = true)
 |-- Class: string (nullable = true)

View Data

beans_data.select(['Area','Perimeter','Solidity','roundness','Compactness','Class']).show(5)

+-----+---------+-----------+-----------+-----------+-----+
| Area|Perimeter|   Solidity|  roundness|Compactness|Class|
+-----+---------+-----------+-----------+-----------+-----+
|28395|  610.291|0.988855999|0.958027126|0.913357755|SEKER|
|28734|  638.018|0.984985603|0.887033637|0.953860842|SEKER|
|29380|   624.11|0.989558774|0.947849473|0.908774239|SEKER|
|30008|  645.884|0.976695743|0.903936374|0.928328835|SEKER|
|30140|  620.134| 0.99089325|0.984877069|0.970515523|SEKER|
+-----+---------+-----------+-----------+-----------+-----+
only showing top 5 rows

Groupby & Count Column

Let’s group the data by the Class column to see how many classes there are and how many rows belong to each.

beans_data.groupBy('Class').count().orderBy('count').show()

+--------+-----+
|   Class|count|
+--------+-----+
|  BOMBAY|  522|
|BARBUNYA| 1322|
|    CALI| 1630|
|   HOROZ| 1928|
|   SEKER| 2027|
|    SIRA| 2636|
|DERMASON| 3546|
+--------+-----+
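
The counts show a noticeable class imbalance, which is worth keeping in mind when reading the accuracy later. A quick plain-Python sketch, using only the counts printed above, turns them into shares of the dataset. It does not need Spark, and the variable names are chosen just for this illustration.

```python
# Class counts copied from the groupBy('Class') output above
class_counts = {
    "BOMBAY": 522, "BARBUNYA": 1322, "CALI": 1630, "HOROZ": 1928,
    "SEKER": 2027, "SIRA": 2636, "DERMASON": 3546,
}

total = sum(class_counts.values())  # total number of rows in the dataset
shares = {name: count / total for name, count in class_counts.items()}

# Print the classes from rarest to most common with their share of the rows
for name, share in sorted(shares.items(), key=lambda kv: kv[1]):
    print(f"{name:<8} {share:6.1%}")
```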

Set Input/Output

indexer = StringIndexer(inputCol="Class", outputCol="label")

Convert String/Numeric

  • Numeric values are easier to work with than strings, so let’s convert the “Class” column from string to numeric.

# Convert Class column from string to numerical values
beans_data = indexer.fit(beans_data).transform(beans_data)

View New Columns

  • The Class column has now been transformed into the output column ‘label’.
  • Let’s run the same groupBy-and-count code, this time on ‘label’ instead of Class.
  • The label column holds numeric values from 0.0 to 6.0. By default, StringIndexer assigns 0.0 to the most frequent class, so the counts line up with the Class counts above.

beans_data.groupBy('label').count().orderBy('count').show()
+-----+-----+
|label|count|
+-----+-----+
|  6.0|  522|
|  5.0| 1322|
|  4.0| 1630|
|  3.0| 1928|
|  2.0| 2027|
|  1.0| 2636|
|  0.0| 3546|
+-----+-----+
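
The mapping from class names to label values follows from the class counts shown earlier: by default, StringIndexer gives index 0.0 to the most frequent value, 1.0 to the next most frequent, and so on. The sketch below reproduces that ordering in plain Python from those counts. It only illustrates the default ordering rule and is not how Spark implements it; the helper name `frequency_desc_index` is made up for this example.

```python
# Class counts copied from the groupBy('Class') output shown earlier
class_counts = {
    "BOMBAY": 522, "BARBUNYA": 1322, "CALI": 1630, "HOROZ": 1928,
    "SEKER": 2027, "SIRA": 2636, "DERMASON": 3546,
}

def frequency_desc_index(counts):
    """Hypothetical helper: map each string to an index by descending frequency.

    Ties are broken alphabetically in this sketch.
    """
    ordered = sorted(counts, key=lambda name: (-counts[name], name))
    return {name: float(i) for i, name in enumerate(ordered)}

label_of = frequency_desc_index(class_counts)

# Print label, class name and count, ordered by label
for name, label in sorted(label_of.items(), key=lambda kv: kv[1]):
    print(f"{label:.1f}  {name:<8}  {class_counts[name]}")
```

Under this rule SEKER (2027 rows) gets label 2.0, which agrees with the label/count table above.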

Identify Label/Input Columns

  • We ask the VectorAssembler to combine a set of input columns (inputCols) into a single vector column named “features”.
  • We use “Area”, “Perimeter”, “Solidity”, “roundness”, and “Compactness” as the input columns.

# Prepare feature vector
assembler = VectorAssembler(inputCols=["Area","Perimeter","Solidity","roundness","Compactness"], outputCol="features")
beans_transformed_data = assembler.transform(beans_data)
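
Conceptually, the assembler reads the listed columns from each row, in order, and packs them into one numeric vector. Here is a minimal plain-Python sketch of that idea, using the first row from the data preview shown earlier. The function `assemble` is a hypothetical stand-in and not part of PySpark.

```python
# First row of the data preview shown earlier, as a plain dict
row = {
    "Area": 28395, "Perimeter": 610.291, "Solidity": 0.988855999,
    "roundness": 0.958027126, "Compactness": 0.913357755, "Class": "SEKER",
}
input_cols = ["Area", "Perimeter", "Solidity", "roundness", "Compactness"]

def assemble(row, input_cols):
    """Hypothetical stand-in: collect the chosen columns, in order, as floats."""
    return [float(row[col]) for col in input_cols]

features = assemble(row, input_cols)
print(features)
```

The order of inputCols fixes the position of each value in the vector, and columns that are not listed (here, Class) are left out of it.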

Preview Columns

beans_transformed_data.select("features","label").show()
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[28395.0,610.291,...|  2.0|
|[28734.0,638.018,...|  2.0|
|[29380.0,624.11,0...|  2.0|
|[30008.0,645.884,...|  2.0|
|[30140.0,620.134,...|  2.0|
|[30279.0,634.927,...|  2.0|
|[30477.0,670.033,...|  2.0|
|[30519.0,629.727,...|  2.0|
|[30685.0,635.681,...|  2.0|
|[30834.0,631.934,...|  2.0|
|[30917.0,640.765,...|  2.0|
|[31091.0,638.558,...|  2.0|
|[31107.0,640.594,...|  2.0|
|[31158.0,642.626,...|  2.0|
|[31158.0,641.105,...|  2.0|
|[31178.0,636.888,...|  2.0|
|[31202.0,644.454,...|  2.0|
|[31203.0,639.782,...|  2.0|
|[31272.0,638.666,...|  2.0|
|[31335.0,635.011,...|  2.0|
+--------------------+-----+
only showing top 20 rows

Split Data

# Split data into training and testing sets
(training_data, testing_data) = beans_transformed_data.randomSplit([0.7, 0.3], seed=42)
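
Note that randomSplit does not cut the data at an exact 70/30 boundary: the two sets come out close to those proportions, but usually not exactly, and the seed makes the split repeatable. The plain-Python sketch below imitates that behaviour by drawing one random number per row. It is an illustration only, not Spark's implementation, and `random_split` is a hypothetical helper; 13611 is the sum of the class counts shown earlier.

```python
import random

def random_split(rows, train_fraction, seed):
    """Hypothetical helper: send each row to train with probability train_fraction."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(13611))  # stand-in row ids; 13611 = sum of the class counts
train, test = random_split(rows, 0.7, seed=42)
train_again, _ = random_split(rows, 0.7, seed=42)

print(len(train), len(test))   # close to a 70/30 split, but not exact
print(train == train_again)    # same seed, same split
```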

Build Model

lr = LogisticRegression(featuresCol="features", labelCol="label")

Train Model

model = lr.fit(training_data)

Predict Classes

# Make predictions on testing data
predictions = model.transform(testing_data)

Evaluate Model

Accuracy

# Evaluate model performance
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)

# OUTPUT
Accuracy = 0.8937376725838264

Precision

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)
print("Precision =", precision)

# OUTPUT
Precision = 0.8934405575280979

Recall

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Recall =", recall)

# OUTPUT
Recall = 0.8937376725838265

F1 Score

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 score = ", f1_score)

# OUTPUT
F1 score =  0.8933106212109942
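
The four numbers are closely related. With metricName set to weightedPrecision, weightedRecall or f1, the evaluator averages the per-class scores, weighting each class by its share of the true labels. That is why the weighted recall printed above equals the accuracy up to floating-point rounding. The sketch below works this through in plain Python on a small made-up 3-class confusion matrix. The matrix values and the function name `weighted_metrics` are assumptions for illustration, not results from the beans model.

```python
def weighted_metrics(cm):
    """cm[i][j] = number of examples with true class i predicted as class j."""
    n_classes = len(cm)
    total = sum(sum(row) for row in cm)
    accuracy = sum(cm[i][i] for i in range(n_classes)) / total

    w_precision = w_recall = w_f1 = 0.0
    for k in range(n_classes):
        true_k = sum(cm[k])                               # examples whose true class is k
        pred_k = sum(cm[i][k] for i in range(n_classes))  # examples predicted as k
        precision = cm[k][k] / pred_k if pred_k else 0.0
        recall = cm[k][k] / true_k if true_k else 0.0
        f1 = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
        weight = true_k / total                           # class share of the true labels
        w_precision += weight * precision
        w_recall += weight * recall
        w_f1 += weight * f1
    return accuracy, w_precision, w_recall, w_f1

# Made-up confusion matrix, not taken from the beans model
cm = [
    [50, 3, 2],
    [4, 30, 6],
    [1, 5, 19],
]
accuracy, precision, recall, f1 = weighted_metrics(cm)
print(f"accuracy={accuracy:.4f} precision={precision:.4f} "
      f"recall={recall:.4f} f1={f1:.4f}")
```

In the weighted recall, each class's recall is multiplied by that class's share of the rows, so the per-class denominators cancel and the sum reduces to correct predictions divided by total rows, which is the accuracy.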

Stop SparkSession

spark.stop()