# To suppress warnings generated by the code
def warn(*args, **kwargs):
    """No-op replacement for warnings.warn: accepts and ignores any arguments."""
    pass

import warnings

# Monkey-patch warnings.warn and silence the filter so that library
# warnings (e.g. from pyspark) do not clutter the notebook output.
warnings.warn = warn
warnings.filterwarnings('ignore')
ML Pipeline SparkML - MPG
Objectives
- Create a machine learning pipeline.
- Add stages to the pipeline.
- Run the pipeline.
- Create a machine learning pipeline for regression.
- Create a machine learning pipeline for classification.
Setup
We will be using the following libraries:
PySpark
for connecting to the Spark Cluster
Suppress Warnings
To suppress warnings generated by our code, we’ll use this code block
Import Libraries
!pip install pyspark==3.1.2 -q
!pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
#import functions/Classes for sparkml
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
# import functions/Classes for pipeline creation
from pyspark.ml import Pipeline
# import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
Start Spark Session
# Create SparkSession
# Ignore any warnings raised by the SparkSession command
spark = SparkSession.builder.appName("ML Pipeline").getOrCreate()
Data
- Modified version of iris dataset. Original dataset available at https://archive.ics.uci.edu/ml/datasets/Iris
Download Data Locally
# Download the (modified) Iris dataset to the local working directory
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/iris.csv")
8.104|:443... connected.
200 OK
HTTP request sent, awaiting response... 4612 (4.5K) [text/csv]
Length:
Saving to: ‘iris.csv’
100%[===================>] 4.50K --.-KB/s in 0s
iris.csv
2024-11-11 18:19:58 (39.1 MB/s) - ‘iris.csv’ saved [4612/4612]
Load the Iris Data into a Spark DataFrame
= spark.read.csv("iris.csv", header=True, inferSchema=True) iris_data
View Schema
iris_data.printSchema()
root|-- SepalLengthCm: double (nullable = true)
|-- SepalWidthCm: double (nullable = true)
|-- PetalLengthCm: double (nullable = true)
|-- PetalWidthCm: double (nullable = true)
|-- Species: string (nullable = true)
View Data
# Preview the first 5 rows of the dataset
iris_data.show(5)
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm| Species|
+-------------+------------+-------------+------------+-----------+
| 5.1| 3.5| 1.4| 0.2|Iris-setosa|
| 4.9| 3.0| 1.4| 0.2|Iris-setosa|
| 4.7| 3.2| 1.3| 0.2|Iris-setosa|
| 4.6| 3.1| 1.5| 0.2|Iris-setosa|
| 5.0| 3.6| 1.4| 0.2|Iris-setosa|
+-------------+------------+-------------+------------+-----------+
5 rows only showing top
Define Pipeline Stages
- Create an indexer stage using StringIndexer that will convert the Species column into a numeric column named “label”
- Create a vectorAssembler stage that creates a feature vector named features using “SepalLengthCm”, “SepalWidthCm”, “PetalLengthCm”,“PetalWidthCm”
- Create a scaler stage that scales the features using standard scaler, name the output columns as scaledFeatures
- Create a logistic regression stage using featuresCol=“scaledFeatures”, labelCol=“label”
# Stage 1 - index the string Species column into a numeric "label" column
indexer = StringIndexer(inputCol="Species", outputCol="label")

# Stage 2 - assemble the four measurement columns into a single "features" vector
vectorAssembler = VectorAssembler(
    inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"],
    outputCol="features",
)

# Stage 3 - standardize the feature vector into "scaledFeatures"
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Stage 4 - logistic regression classifier trained on the scaled features
classifier = LogisticRegression(featuresCol="scaledFeatures", labelCol="label")
Build Pipeline
# Build the pipeline.
# All the stages of the pipeline are listed in the order of execution:
# index the label, assemble features, scale them, then classify.
pipeline = Pipeline(stages=[indexer, vectorAssembler, scaler, classifier])
Split Data
# Split the data into training (70%) and testing (30%) sets;
# the fixed seed makes the split reproducible across runs.
(trainingData, testData) = iris_data.randomSplit([0.7, 0.3], seed=42)
Fit Pipeline
# Fit the pipeline to the training data.
# Ignore any warnings: they are due to the simplified settings
# and the security settings of the lab environment.
model = pipeline.fit(trainingData)
Make Predictions
= model.transform(testData) predictions
Accuracy
# Evaluate classification accuracy: fraction of test rows whose
# predicted label matches the indexed true label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)
Accuracy = 0.9722222222222222