Classification Model in Spark - Beans
We will build, train, and evaluate a classification model using Spark to classify varieties of dry beans.
Objectives
- Use PySpark to connect to a Spark cluster.
- Create a Spark session.
- Read a CSV file into a DataFrame.
- Split the dataset into training and testing sets.
- Use VectorAssembler to combine multiple columns into a single vector column.
- Use logistic regression to build a classification model.
- Use metrics to evaluate the model.
- Stop the Spark session.
Setup
We will be using the following libraries:
- PySpark for connecting to the Spark cluster
Suppress Warnings
To suppress warnings generated by our code, we’ll use this code block:
# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Import Libraries
# Install pyspark and findspark (skip this step if they are already installed)
!pip install pyspark==3.1.2 -q
!pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
#import functions/Classes for sparkml
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
# import functions/Classes for metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
Start Spark Session
#Create SparkSession
#Ignore any warnings by SparkSession command
spark = SparkSession.builder.appName("Classification using SparkML").getOrCreate()
Data
- Dry Bean dataset. Available at https://archive.ics.uci.edu/ml/datasets/Dry+Bean+Dataset
- Modified version of iris dataset. Available at https://archive.ics.uci.edu/ml/datasets/Iris
Download Data Locally
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/drybeans.csv")
CSV to SparkDF
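The read call in this section passes header=True and inferSchema=True. As a rough illustration of what those two options mean, here is a plain-Python sketch that does not use Spark at all. The three-line SAMPLE string and the infer_type helper are hypothetical names made up for this example, and the type inference is deliberately simplified (Spark considers more types than the three shown here).

```python
import csv
import io

# Hypothetical in-memory sample shaped like drybeans.csv; the values are
# copied from the data preview shown later in this notebook.
SAMPLE = """Area,Perimeter,Class
28395,610.291,SEKER
28734,638.018,SEKER
"""

def infer_type(values):
    """Return the narrowest of integer, double, string that parses every value."""
    for caster, name in ((int, "integer"), (float, "double")):
        try:
            for v in values:
                caster(v)
            return name
        except ValueError:
            continue
    return "string"

rows = list(csv.reader(io.StringIO(SAMPLE)))
header, data = rows[0], rows[1:]  # header=True: the first row holds column names
schema = {name: infer_type([r[i] for r in data]) for i, name in enumerate(header)}
print(schema)
```

Without schema inference every column would be read as a string, which is why the option matters before numeric columns are fed to a model.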
# spark.read.csv loads the data into a DataFrame.
# header=True tells Spark that the first row of the CSV file is a header row.
# inferSchema=True tells Spark to infer the data types of the columns automatically.
# Load beans dataset
beans_data = spark.read.csv("drybeans.csv", header=True, inferSchema=True)
View Schema
beans_data.printSchema()
root
 |-- Area: integer (nullable = true)
 |-- Perimeter: double (nullable = true)
 |-- MajorAxisLength: double (nullable = true)
 |-- MinorAxisLength: double (nullable = true)
 |-- AspectRation: double (nullable = true)
 |-- Eccentricity: double (nullable = true)
 |-- ConvexArea: integer (nullable = true)
 |-- EquivDiameter: double (nullable = true)
 |-- Extent: double (nullable = true)
 |-- Solidity: double (nullable = true)
 |-- roundness: double (nullable = true)
 |-- Compactness: double (nullable = true)
 |-- ShapeFactor1: double (nullable = true)
 |-- ShapeFactor2: double (nullable = true)
 |-- ShapeFactor3: double (nullable = true)
 |-- ShapeFactor4: double (nullable = true)
 |-- Class: string (nullable = true)
View Data
beans_data.select(['Area','Perimeter','Solidity','roundness','Compactness','Class']).show(5)
+-----+---------+-----------+-----------+-----------+-----+
| Area|Perimeter|   Solidity|  roundness|Compactness|Class|
+-----+---------+-----------+-----------+-----------+-----+
|28395|  610.291|0.988855999|0.958027126|0.913357755|SEKER|
|28734|  638.018|0.984985603|0.887033637|0.953860842|SEKER|
|29380|   624.11|0.989558774|0.947849473|0.908774239|SEKER|
|30008|  645.884|0.976695743|0.903936374|0.928328835|SEKER|
|30140|  620.134| 0.99089325|0.984877069|0.970515523|SEKER|
+-----+---------+-----------+-----------+-----------+-----+
only showing top 5 rows
Groupby & Count Column
Let’s group the data by the Class column to see how many classes there are and how many rows belong to each.
beans_data.groupBy('Class').count().orderBy('count').show()
+--------+-----+
|   Class|count|
+--------+-----+
|  BOMBAY|  522|
|BARBUNYA| 1322|
|    CALI| 1630|
|   HOROZ| 1928|
|   SEKER| 2027|
|    SIRA| 2636|
|DERMASON| 3546|
+--------+-----+
Set Input/Output
indexer = StringIndexer(inputCol="Class", outputCol="label")
Convert String/Numeric
- It’s easier to work with numeric values than with strings, and Spark ML classifiers expect a numeric label column.
- Let’s convert the “Class” column from string to numeric.
# Convert the Class column from string to numeric values
beans_data = indexer.fit(beans_data).transform(beans_data)
View New Columns
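- We have now transformed the Class column into the output column ‘label’.
- Let’s run the same groupBy and count as before, this time on ‘label’ instead of Class.
- As the output shows, label holds numeric values from 0.0 to 6.0. By default, StringIndexer orders labels by descending frequency, so the most frequent class (DERMASON, 3546 rows) becomes 0.0 and the least frequent (BOMBAY, 522 rows) becomes 6.0.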
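To make the numbering concrete, here is a small plain-Python sketch of a frequency-ordered index, using the class counts from the Class table earlier in this notebook. It only imitates StringIndexer's default ordering (most frequent label first); it is not Spark's implementation, and the names class_counts, ordered, and label_of are made up for this example. The alphabetical tie-break is an assumption that does not matter here because no two classes share a count.

```python
# Class counts copied from the groupBy('Class') output earlier in this notebook.
class_counts = {
    "BOMBAY": 522, "BARBUNYA": 1322, "CALI": 1630, "HOROZ": 1928,
    "SEKER": 2027, "SIRA": 2636, "DERMASON": 3546,
}

# Sort class names by descending count (ties broken alphabetically, an
# assumption for this sketch), then number them 0.0, 1.0, 2.0, ...
ordered = sorted(class_counts, key=lambda name: (-class_counts[name], name))
label_of = {name: float(i) for i, name in enumerate(ordered)}

for name in ordered:
    print(name, label_of[name], class_counts[name])
```

Under this ordering SEKER maps to 2.0, which agrees with the count of 2027 for label 2.0 in the output of the next cell.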
beans_data.groupBy('label').count().orderBy('count').show()
+-----+-----+
|label|count|
+-----+-----+
|  6.0|  522|
|  5.0| 1322|
|  4.0| 1630|
|  3.0| 1928|
|  2.0| 2027|
|  1.0| 2636|
|  0.0| 3546|
+-----+-----+
Identify Label/Input Columns
- VectorAssembler combines the columns listed in inputCols into a single vector column, here named “features”.
- We use “Area”, “Perimeter”, “Solidity”, “roundness”, and “Compactness” as input columns.
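Conceptually, the assembler takes the chosen columns of each row, in order, and packs them into one vector. The plain-Python sketch below shows that idea for a single row; the assemble helper and the row dictionary are hypothetical stand-ins for illustration, not part of the Spark API.

```python
input_cols = ["Area", "Perimeter", "Solidity", "roundness", "Compactness"]

# First row of the data preview shown earlier in this notebook.
row = {
    "Area": 28395, "Perimeter": 610.291, "Solidity": 0.988855999,
    "roundness": 0.958027126, "Compactness": 0.913357755, "Class": "SEKER",
}

def assemble(row, cols):
    """Collect the chosen columns, in the given order, into one list of floats."""
    return [float(row[c]) for c in cols]

row["features"] = assemble(row, input_cols)
print(row["features"])
```

Note that the integer Area becomes 28395.0 once it is part of the vector, and columns not listed in input_cols (such as Class) are left out of it.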
# Prepare feature vector
assembler = VectorAssembler(inputCols=["Area","Perimeter","Solidity","roundness","Compactness"], outputCol="features")
beans_transformed_data = assembler.transform(beans_data)
Preview Columns
beans_transformed_data.select("features","label").show()
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[28395.0,610.291,...|  2.0|
|[28734.0,638.018,...|  2.0|
|[29380.0,624.11,0...|  2.0|
|[30008.0,645.884,...|  2.0|
|[30140.0,620.134,...|  2.0|
|[30279.0,634.927,...|  2.0|
|[30477.0,670.033,...|  2.0|
|[30519.0,629.727,...|  2.0|
|[30685.0,635.681,...|  2.0|
|[30834.0,631.934,...|  2.0|
|[30917.0,640.765,...|  2.0|
|[31091.0,638.558,...|  2.0|
|[31107.0,640.594,...|  2.0|
|[31158.0,642.626,...|  2.0|
|[31158.0,641.105,...|  2.0|
|[31178.0,636.888,...|  2.0|
|[31202.0,644.454,...|  2.0|
|[31203.0,639.782,...|  2.0|
|[31272.0,638.666,...|  2.0|
|[31335.0,635.011,...|  2.0|
+--------------------+-----+
only showing top 20 rows
Split Data
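A weighted random split assigns each row to a split independently, so a 70/30 split yields roughly, not exactly, 70% of the rows for training. The plain-Python sketch below illustrates that behaviour under this assumption. The random_split function is hypothetical, the row count 13611 is the sum of the class counts shown earlier, and the seed here will not reproduce the rows Spark selects with seed=42.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total          # normalised cumulative boundaries, e.g. 0.7, 1.0
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()          # uniform draw in [0, 1)
        for i, bound in enumerate(bounds):
            if x < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding
    return splits

rows = list(range(13611))         # stand-in for the 13611 bean rows
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test), len(train) / len(rows))
```

Fixing the seed makes the split repeatable from run to run, which keeps the evaluation numbers comparable.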
# Split data into training and testing sets
(training_data, testing_data) = beans_transformed_data.randomSplit([0.7, 0.3], seed=42)
Build Model
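With seven classes, logistic regression is fitted in its multinomial (softmax) form: the model computes one linear score per class and turns the scores into probabilities. The sketch below shows only that last step in plain Python. The scores list is entirely hypothetical and does not come from the trained model.

```python
import math

def softmax(scores):
    """Convert raw class scores into probabilities that sum to 1."""
    m = max(scores)                        # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical linear scores for one bean, one per label 0.0 .. 6.0.
scores = [1.2, 0.3, 3.1, -0.5, 0.0, -1.0, -2.2]
probs = softmax(scores)

# The predicted label is the index of the largest probability.
prediction = float(max(range(len(probs)), key=probs.__getitem__))
print(prediction)
```

Training amounts to choosing the weights behind those scores so that the true label tends to receive the highest probability.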
lr = LogisticRegression(featuresCol="features", labelCol="label")
Train Model
model = lr.fit(training_data)
Predict Classes
# Make predictions on testing data
predictions = model.transform(testing_data)
Evaluate Model
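The four metrics computed in this section are accuracy plus precision, recall, and F1 averaged over classes with each class weighted by its share of the true labels. The plain-Python sketch below works through those definitions on a tiny hypothetical set of labels and predictions. The weighted_metrics function is made up for this example and is not the evaluator's implementation.

```python
from collections import Counter

def weighted_metrics(labels, preds):
    """Return accuracy and support-weighted precision, recall, and F1."""
    n = len(labels)
    support = Counter(labels)                  # true rows per class
    predicted = Counter(preds)                 # predicted rows per class
    correct = Counter(l for l, p in zip(labels, preds) if l == p)
    accuracy = sum(correct.values()) / n
    w_precision = w_recall = w_f1 = 0.0
    for c, count in support.items():
        precision = correct[c] / predicted[c] if predicted[c] else 0.0
        recall = correct[c] / count
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        weight = count / n                     # class share of the true labels
        w_precision += weight * precision
        w_recall += weight * recall
        w_f1 += weight * f1
    return accuracy, w_precision, w_recall, w_f1

# Hypothetical labels and predictions, only for illustration.
labels = [0, 0, 0, 0, 1, 1, 1, 2, 2, 2]
preds = [0, 0, 0, 1, 1, 1, 2, 2, 2, 2]
accuracy, w_precision, w_recall, w_f1 = weighted_metrics(labels, preds)
print(accuracy, w_precision, w_recall, w_f1)
```

One consequence of these definitions is that weighted recall is algebraically equal to accuracy, which would explain why the accuracy and recall values reported in this section differ only in the last digit (floating-point rounding).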
Accuracy
# Evaluate model performance
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)
# OUTPUT
Accuracy = 0.8937376725838264
Precision
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)
print("Precision =", precision)
# OUTPUT
Precision = 0.8934405575280979
Recall
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Recall =", recall)
# OUTPUT
Recall = 0.8937376725838265
F1 Score
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 score = ", f1_score)
# OUTPUT
F1 score =  0.8933106212109942
Stop SparkSession
spark.stop()