ML Pipeline SparkML - MPG

Objectives


  1. Create a machine learning pipeline.
  2. Add stages to the pipeline.
  3. Run the pipeline.
  4. Create a machine learning pipeline for regression.
  5. Create a machine learning pipeline for classification.

Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark Cluster

Suppress Warnings

To suppress warnings generated by our code, we’ll use this code block

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

Import Libraries

pip install pyspark==3.1.2 -q
pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

#import functions/Classes for sparkml

from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession


# import functions/Classes for pipeline creation

from pyspark.ml import Pipeline

# import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Start Spark Session


#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("ML Pipeline").getOrCreate()

Data


Download Data Locally

import wget
wget.download ("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/iris.csv")

8.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4612 (4.5K) [text/csv]
Saving to: ‘iris.csv’

iris.csv            100%[===================>]   4.50K  --.-KB/s    in 0s      

2024-11-11 18:19:58 (39.1 MB/s) - ‘iris.csv’ saved [4612/4612]

Load MPG into SParkDF

iris_data = spark.read.csv("iris.csv", header=True, inferSchema=True)

View Schema

iris_data.printSchema()
root
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)

View Data

iris_data.show(5)
+-------------+------------+-------------+------------+-----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+-------------+------------+-------------+------------+-----------+
|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+-------------+------------+-------------+------------+-----------+
only showing top 5 rows

Define Pipeline Stages


  1. Create an indexer stage using StringIndexer that will convert the Species column into a numeric column named “label”
  2. Create a vectorAssembler stage that creates a feature vector named features using “SepalLengthCm”, “SepalWidthCm”, “PetalLengthCm”,“PetalWidthCm”
  3. Create a scaler stage that scales the features using standard scaler, name the output columns as scaledFeatures
  4. Create a logistic regression stage using featuresCol=“scaledFeatures”, labelCol=“label”
# Stage 1 - 
indexer = StringIndexer(inputCol="Species", outputCol="label")

# Stage 2 - 
vectorAssembler = VectorAssembler(inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm","PetalWidthCm"], outputCol="features")

# Stage 3 - 
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Stage 4 -
classifier = LogisticRegression(featuresCol="scaledFeatures", labelCol="label")

Build Pipeline


# Build the pipeline
# All the stages of the pipeline are mentioned in the order of execution.
pipeline = Pipeline(stages=[indexer,vectorAssembler, scaler, classifier])

Split Data


# Split the data into training and testing sets
(trainingData, testData) = iris_data.randomSplit([0.7, 0.3], seed=42)

Fit Pipeline


# Fit the pipeline to the training data
# ignore any warnings. The warnings are due to the simplified settings and the security settings of the lab

model = pipeline.fit(trainingData)

Make Predictions


predictions = model.transform(testData)

Accuracy

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)

Accuracy = 0.9722222222222222