Pipeline LR Model - MPG
We will create a pipeline for a linear regression model that predicts MPG from 'Cylinders', 'Engine_Disp', 'Horsepower', 'Weight', 'Accelerate', and 'Year'.
Objectives
Building this pipeline will be a four-part project:
Part 1 - ETL
- Load a CSV dataset
- Remove duplicates, if any
- Drop rows with null values, if any
- Make transformations
- Store the cleaned data in Parquet format
Part 2 - Machine Learning Pipeline Creation
- Create a machine learning pipeline for prediction
Part 3 - Model Evaluation
- Evaluate the model using metrics
- Print the intercept and the coefficients
Part 4 - Model Persistence
- Save the model for future production use
- Load and verify the stored model
Setup
We will be using the following libraries:
- PySpark, for connecting to the Spark cluster
- findspark, to make the local Spark installation importable from Python
Suppress Warnings
To suppress warnings generated by our code, we'll use this code block:
# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Import Libraries
pip install pyspark==3.1.2 -q
pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
Start Spark Session
# Create a SparkSession
# Ignore any warnings raised by the SparkSession command
spark = SparkSession.builder.appName("Project").getOrCreate()
Data
- Modified version of the car mileage dataset. The original dataset is available at https://archive.ics.uci.edu/ml/datasets/auto+mpg
Download Data Locally
import wget
"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv")
wget.download (
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14354 (14K) [text/csv]
Saving to: ‘mpg-raw.csv’
mpg-raw.csv 100%[===================>] 14.02K --.-KB/s in 0.001s
2024-11-12 15:49:36 (27.0 MB/s) - ‘mpg-raw.csv’ saved [14354/14354]
Load into SparkDF
# Using the spark.read.csv function we load the data into a dataframe.
# header=True indicates that there is a header row in our CSV file.
# inferSchema=True tells Spark to automatically infer the column data types.
# Load the mpg dataset
df = spark.read.csv("mpg-raw.csv", header=True, inferSchema=True)
df.printSchema()
root
|-- MPG: double (nullable = true)
|-- Cylinders: integer (nullable = true)
|-- Engine Disp: double (nullable = true)
|-- Horsepower: integer (nullable = true)
|-- Weight: integer (nullable = true)
|-- Accelerate: double (nullable = true)
|-- Year: integer (nullable = true)
|-- Origin: string (nullable = true)
df.show(5)
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year| Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|46.6| 4| 86.0| 65| 2110| 17.9| 80|Japanese|
|44.6| 4| 91.0| 67| 1850| 13.8| 80|Japanese|
|44.3| 4| 90.0| 48| 2085| 21.7| 80|European|
|44.0| 4| 97.0| 52| 2130| 24.6| 82|European|
|43.4| 4| 90.0| 48| 2335| 23.7| 80|European|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows
ETL - Part 1
Count Cars/Origin
df.groupBy('Origin').count().orderBy('count').show()
+--------+-----+
| Origin|count|
+--------+-----+
| null| 1|
|European| 70|
|Japanese| 88|
|American| 247|
+--------+-----+
Total # of Rows
rowcount1 = df.count()
print(rowcount1)
406
Drop Duplicate Rows
df = df.dropDuplicates()
Recount Rows
rowcount2 = df.count()
print(rowcount2)
392
Drop NA rows
df = df.dropna()
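By default dropna() drops a row if any column contains a null. Spark's DataFrame.dropna also accepts a subset parameter if you ever need to restrict the check to specific columns; the columns below are only an illustration, not part of this project:
# Illustration only: drop rows with nulls in selected columns
# df = df.dropna(subset=["Horsepower", "Origin"])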
Recount
rowcount3 = df.count()
print(rowcount3)
385
Rename Column
- Rename from Engine Disp to Engine_Disp
df = df.withColumnRenamed("Engine Disp", "Engine_Disp")
df.show(5)
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year| Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|24.0| 4| 134.0| 96| 2702| 13.5| 75|Japanese|
|18.0| 6| 250.0| 88| 3139| 14.5| 71|American|
|29.0| 4| 68.0| 49| 1867| 19.5| 73|European|
|22.4| 6| 231.0| 110| 3415| 15.8| 81|American|
|20.5| 6| 231.0| 105| 3425| 16.9| 77|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows
Save DF to Parquet file
"overwrite").parquet("mpg-cleaned.parquet") df.write.mode(
Evaluate Section
Run the code cell below. If the code throws any errors, go back and review the code you have written.
print("Part 1 - Evaluation")
print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("Renamed column name = ", df.columns[2])
import os
# Spark writes Parquet output as a directory of part files, hence the isdir check
print("mpg-cleaned.parquet exists :", os.path.isdir("mpg-cleaned.parquet"))
Part 1 - Evaluation
Total rows = 406
Total rows after dropping duplicate rows = 392
Total rows after dropping duplicate rows and rows with null values = 385
Renamed column name = Engine_Disp
mpg-cleaned.parquet exists : True
ML Pipeline Creation - Part 2
Parquet to DF
Load data from “mpg-cleaned.parquet” into a dataframe
df = spark.read.parquet("mpg-cleaned.parquet")
rowcount4 = df.count()
print(rowcount4)
385
Show Data & Schema
df.show(5)
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year| Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|32.2| 4| 108.0| 75| 2265| 15.2| 80|Japanese|
|28.0| 4| 107.0| 86| 2464| 15.5| 76|European|
|26.0| 4| 156.0| 92| 2585| 14.5| 82|American|
|25.0| 4| 104.0| 95| 2375| 17.5| 70|European|
|25.0| 4| 140.0| 75| 2542| 17.0| 74|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows
df.printSchema()
root
|-- MPG: double (nullable = true)
|-- Cylinders: integer (nullable = true)
|-- Engine_Disp: double (nullable = true)
|-- Horsepower: integer (nullable = true)
|-- Weight: integer (nullable = true)
|-- Accelerate: double (nullable = true)
|-- Year: integer (nullable = true)
|-- Origin: string (nullable = true)
Define StringIndexer Stage
Convert string col “Origin” into “OriginIndex”
# Stage - 1 Using StringIndexer convert the string column "Origin" into "OriginIndex"
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")
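StringIndexer assigns indices by label frequency, so the most common label ('American' here) maps to 0.0. As an optional check outside the graded pipeline, you can fit the indexer by itself and inspect the mapping:
# Optional check: view the Origin -> OriginIndex mapping
indexer.fit(df).transform(df).select("Origin", "OriginIndex").distinct().show()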
Define VectorAssembler Stage
Assemble the input columns ‘Cylinders’,‘Engine_Disp’,‘Horsepower’,‘Weight’,‘Accelerate’,‘Year’ into a single column “features”
# Stage - 2 Assemble the input columns 'Cylinders','Engine_Disp','Horsepower','Weight','Accelerate','Year' into a single column "features"
assembler = VectorAssembler(inputCols=['Cylinders','Engine_Disp','Horsepower','Weight','Accelerate','Year'], outputCol="features")
Define StandardScaler Stage
Scale the “features” using standard scaler and store in “scaledFeatures” column
# Stage 3 - scale the "features" using standard scaler and store in "scaledFeatures" column
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
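PySpark's StandardScaler defaults to withStd=True and withMean=False, so each feature is scaled to unit standard deviation but not centered. A centered variant, shown purely for illustration, would be:
# Illustration only: scale and also center each feature at zero
# scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withMean=True, withStd=True)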
Define Model Creation Stage
Create a Linear Regression stage to predict MPG
#Stage 4 - Create a LinearRegression stage to predict "MPG"
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG")
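With its defaults this stage fits an ordinary least-squares model. If you ever wanted regularization (not used in this project), the same estimator exposes regParam and elasticNetParam:
# Illustration only: ridge-style regularization on the same stage
# lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG", regParam=0.1, elasticNetParam=0.0)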
Build Pipeline
# Build the pipeline
# All the stages of the pipeline are mentioned in the order of execution.
pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])
Split Data
# Split the data into training and testing sets with 70:30 split. Use 42 as seed
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
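randomSplit treats 0.7/0.3 as weights, so the subset sizes are random but reproducible for a fixed seed. An optional sanity check is to confirm the two counts sum to rowcount4:
# Optional sanity check on the split sizes
print(trainingData.count(), testingData.count())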
Fit Pipeline
# Fit the pipeline to the training data
pipelineModel = pipeline.fit(trainingData)
Evaluate Section
Run the code cell below. If the code throws any errors, go back and review the code you have written.
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]
print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])
print("Label column = ", lr.getLabelCol())
Part 2 - Evaluation
Total rows = 385
Pipeline Stage 1 = StringIndexer
Pipeline Stage 2 = VectorAssembler
Pipeline Stage 3 = StandardScaler
Label column = MPG
Model Evaluation - Part 3
Predict
# Make predictions on testing data
predictions = pipelineModel.transform(testingData)
MSE
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE) = ", mse)
Mean Squared Error (MSE) = 10.899168410636618
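If you also want RMSE (not required by this project), it is simply the square root of the MSE computed above:
# Optional: RMSE derived from MSE
rmse = mse ** 0.5
print("Root Mean Squared Error (RMSE) = ", rmse)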
MAE
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mae")
mae = evaluator.evaluate(predictions)
print("Mean Absolute Error (MAE) = ", mae)
Mean Absolute Error (MAE) = 2.6256840060950757
R-Squared
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R - Squared = ", r2)
R - Squared = 0.8202762026606878
Evaluate Section
print("Part 3 - Evaluation")
print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))
lrModel = pipelineModel.stages[-1]
print("Intercept = ", round(lrModel.intercept, 2))
Part 3 - Evaluation
Mean Squared Error = 10.9
Mean Absolute Error = 2.63
R Squared = 0.82
Intercept = -13.9
Model Persistence - Part 4
Save Model
# Save the pipeline model
"Practice_Project") pipelineModel.write().save(
Load Saved Model
# Load persisted model
loadedPipelineModel = PipelineModel.load("Practice_Project")
Predict with Persisted Model
predictions = loadedPipelineModel.transform(testingData)
Show Predicted Values
"MPG","prediction").show()
predictions.select(+----+------------------+
| MPG| prediction|
+----+------------------+
|13.0|11.318809914955523|
|13.0|14.368910120039477|
|13.0|10.684980370654177|
|15.0|13.066592339902503|
|15.5|15.669787158219059|
|18.0|19.815977805319374|
|18.0|22.299804385989464|
|18.0|20.053788782604773|
|18.6| 20.89085676127957|
|19.0| 24.8621804403861|
|21.5|26.265038242693652|
|22.0|23.098606013406698|
|23.0|21.278313625209034|
|24.0|22.969583765345735|
|25.1| 27.03028314401816|
|26.0| 27.95096978803409|
|26.0|25.754412517142416|
|27.0|28.400109085156263|
|29.0| 28.12374555454902|
|30.0|30.249466720931085|
+----+------------------+
only showing top 20 rows
Evaluate Section
Run the code cell below. If the code throws any errors, go back and review the code you have written.
print("Part 4 - Evaluation")
loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[1].getInputCols()
print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
print(f"Coefficient for {i} is {round(j,4)}")
Part 4 - Evaluation
Number of stages in the pipeline = 4
Coefficient for Cylinders is -1.1901
Coefficient for Engine_Disp is 1.0404
Coefficient for Horsepower is -0.0605
Coefficient for Weight is -5.3403
Coefficient for Accelerate is 0.1052
Coefficient for Year is 2.7429
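To close out, here is a minimal sketch of scoring a brand-new record with the loaded pipeline. The feature values are hypothetical; only the column names and types need to match the training schema (MPG itself is not needed for transform()):
# Hypothetical new car to score with the persisted pipeline
newCar = spark.createDataFrame(
    [(4, 120.0, 90, 2500, 15.5, 80, "Japanese")],
    ["Cylinders", "Engine_Disp", "Horsepower", "Weight", "Accelerate", "Year", "Origin"])
loadedPipelineModel.transform(newCar).select("prediction").show()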
spark.stop()