# To suppress warnings generated by the code
def warn(*args, **kwargs):
pass
import warnings
= warn
warnings.warn 'ignore') warnings.filterwarnings(
ML Pipeline SparkML - MPG
Objectives
- Create a machine learning pipeline.
- Add stages to the pipeline.
- Run the pipeline.
- Create a machine learning pipeline for regression.
- Create a machine learning pipeline for classification.
Setup
We will be using the following libraries:
PySpark
for connecting to the Spark Cluster
Suppress Warnings
To suppress warnings generated by our code, we’ll use this code block
Import Libraries
==3.1.2 -q
pip install pyspark-q pip install findspark
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
#import functions/Classes for sparkml
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
# import functions/Classes for pipeline creation
from pyspark.ml import Pipeline
# import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
Start Spark Session
#Create SparkSession
#Ignore any warnings by SparkSession command
= SparkSession.builder.appName("ML Pipeline").getOrCreate() spark
Data
- Modified version of car mileage dataset. Original dataset available at https://archive.ics.uci.edu/ml/datasets/auto+mpg
Download Data Locally
import wget
"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv")
wget.download (
8.104|:443... connected.
200 OK
HTTP request sent, awaiting response... 13891 (14K) [text/csv]
Length: .3’
Saving to: ‘mpg.csv
.3 100%[===================>] 13.57K --.-KB/s in 0s
mpg.csv
2024-11-11 18:12:21 (26.9 MB/s) - ‘mpg.csv.3’ saved [13891/13891]
Load MPG into SParkDF
= spark.read.csv("mpg.csv", header=True, inferSchema=True) mpg_data
View Schema
mpg_data.printSchema()
root|-- MPG: double (nullable = true)
|-- Cylinders: integer (nullable = true)
|-- Engine Disp: double (nullable = true)
|-- Horsepower: integer (nullable = true)
|-- Weight: integer (nullable = true)
|-- Accelerate: double (nullable = true)
|-- Year: integer (nullable = true)
|-- Origin: string (nullable = true)
View Data
5)
mpg_data.show(+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year| Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|15.0| 8| 390.0| 190| 3850| 8.5| 70|American|
|21.0| 6| 199.0| 90| 2648| 15.0| 70|American|
|18.0| 6| 199.0| 97| 2774| 15.5| 70|American|
|16.0| 8| 304.0| 150| 3433| 12.0| 70|American|
|14.0| 8| 455.0| 225| 3086| 10.0| 70|American|
+----+---------+-----------+----------+------+----------+----+--------+
5 rows only showing top
Define Pipeline Stages
# Stage 1 - assemble the input columns into a single vector
= VectorAssembler(inputCols=["Weight", "Horsepower", "Engine Disp"], outputCol="features")
vectorAssembler # Stage 2 - scale the features using standard scaler
= StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaler # Stage 3 - create a linear regression instance
= LinearRegression(featuresCol="scaledFeatures", labelCol="MPG") lr
Build Pipeline
# Build the pipeline
# All the stages of the pipeline are mentioned in the order of execution.
= Pipeline(stages=[vectorAssembler, scaler, lr]) pipeline
Split Data
# Split the data into training and testing sets
= mpg_data.randomSplit([0.7, 0.3], seed=42) (trainingData, testData)
Fit Pipeline
# Fit the pipeline to the training data
# ignore any warnings. The warnings are due to the simplified settings and the security settings of the lab
= pipeline.fit(trainingData) model
Make Predictions
= model.transform(testData) predictions
RMSE
= RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="rmse")
evaluator = evaluator.evaluate(predictions)
rmse print("Root Mean Squared Error (RMSE) =", rmse)
= 4.69281610922322 Root Mean Squared Error (RMSE)