Pipeline Linear Regression Model - MPG

We will create a pipeline for a linear regression model that predicts MPG from 'Cylinders', 'Engine_Disp', 'Horsepower', 'Weight', 'Accelerate', and 'Year'.

Objectives


Building this pipeline will be a four-part project:

  • Part 1 ETL

    • Load a CSV dataset
    • Remove duplicates, if any
    • Drop rows with null values, if any
    • Apply transformations
    • Store the cleaned data in Parquet format
  • Part 2 Machine Learning Pipeline Creation

    • Create a machine learning pipeline for prediction
  • Part 3 Model Evaluation

    • Evaluate the model using metrics
    • Print the intercept and the coefficients
  • Part 4 Model Persistence

    • Save the model for future production use
    • Load and verify the stored model

Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark cluster
  • findspark for simplifying the use of Apache Spark with Python
  • wget for downloading the dataset

Suppress Warnings

To suppress warnings generated by our code, we’ll use the following code block:

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
!pip install pyspark==3.1.2 -q
!pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.regression import LinearRegression

Start Spark Session


# Create a SparkSession
# Ignore any warnings raised by the SparkSession command

spark = SparkSession.builder.appName("Project").getOrCreate()
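
As a quick sanity check that the session is up, you can print the Spark version (an optional check; the exact version string depends on your installation):

# Optional check: confirm the session is active and which Spark version it runs
print(spark.version)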

Data


Download Data Locally

import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv")

Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14354 (14K) [text/csv]
Saving to: ‘mpg-raw.csv’

mpg-raw.csv         100%[===================>]  14.02K  --.-KB/s    in 0.001s  

2024-11-12 15:49:36 (27.0 MB/s) - ‘mpg-raw.csv’ saved [14354/14354]

Load into SparkDF

# Using spark.read.csv we load the data into a dataframe.
# header=True indicates that the CSV file has a header row.
# inferSchema=True tells Spark to infer the column data types automatically.

# Load mpg dataset
df = spark.read.csv("mpg-raw.csv", header=True, inferSchema=True)

df.printSchema()
root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)
 
df.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|46.6|        4|       86.0|        65|  2110|      17.9|  80|Japanese|
|44.6|        4|       91.0|        67|  1850|      13.8|  80|Japanese|
|44.3|        4|       90.0|        48|  2085|      21.7|  80|European|
|44.0|        4|       97.0|        52|  2130|      24.6|  82|European|
|43.4|        4|       90.0|        48|  2335|      23.7|  80|European|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows

ETL - Part 1


Count Cars by Origin

df.groupBy('Origin').count().orderBy('count').show()

+--------+-----+
|  Origin|count|
+--------+-----+
|    null|    1|
|European|   70|
|Japanese|   88|
|American|  247|
+--------+-----+

Total # of Rows

rowcount1 = df.count()
print(rowcount1)
406

Drop Duplicate Rows

df = df.dropDuplicates()
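
By default, dropDuplicates() compares entire rows. If you ever need to deduplicate on specific columns only, it also accepts a subset; a minimal sketch (the column choice below is illustrative, not part of this project):

# Illustrative only: deduplicate using a subset of columns
df_dedup = df.dropDuplicates(["Cylinders", "Weight", "Year"])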

Recount Rows

rowcount2 = df.count()
print(rowcount2)
392

Drop NA rows

df = df.dropna()
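
dropna() with no arguments drops a row if any column is null. It also accepts how and subset parameters; for example (illustrative only, not used here):

# Illustrative only: drop rows where the label column is null
df_label_clean = df.dropna(subset=["MPG"])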

Recount

rowcount3 = df.count()
print(rowcount3)
385

Rename Column

  • Rename "Engine Disp" to "Engine_Disp"
df = df.withColumnRenamed("Engine Disp","Engine_Disp")
df.show(5)
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|24.0|        4|      134.0|        96|  2702|      13.5|  75|Japanese|
|18.0|        6|      250.0|        88|  3139|      14.5|  71|American|
|29.0|        4|       68.0|        49|  1867|      19.5|  73|European|
|22.4|        6|      231.0|       110|  3415|      15.8|  81|American|
|20.5|        6|      231.0|       105|  3425|      16.9|  77|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows

Save DF to Parquet file

df.write.mode("overwrite").parquet("mpg-cleaned.parquet")

Evaluate Section

Run the code cell below. If the code throws any errors, go back and review the code you have written.

print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("Renamed column name = ", df.columns[2])

import os

print("mpg-cleaned.parquet exists :", os.path.isdir("mpg-cleaned.parquet"))

Part 1 - Evaluation
Total rows =  406
Total rows after dropping duplicate rows =  392
Total rows after dropping duplicate rows and rows with null values =  385
Renamed column name =  Engine_Disp
mpg-cleaned.parquet exists : True

ML Pipeline Creation - Part 2


Parquet to DF

Load data from "mpg-cleaned.parquet" into a dataframe

df = spark.read.parquet("mpg-cleaned.parquet")
rowcount4 = df.count()
print(rowcount4)
385

Show Data & Schema

df.show(5)
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|32.2|        4|      108.0|        75|  2265|      15.2|  80|Japanese|
|28.0|        4|      107.0|        86|  2464|      15.5|  76|European|
|26.0|        4|      156.0|        92|  2585|      14.5|  82|American|
|25.0|        4|      104.0|        95|  2375|      17.5|  70|European|
|25.0|        4|      140.0|        75|  2542|      17.0|  74|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows

df.printSchema()
root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine_Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)

Define StringIndexer Stage

Convert the string column "Origin" into "OriginIndex"

# Stage - 1 Using StringIndexer convert the string column "Origin" into "OriginIndex" 
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")
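
To preview how the indexer will encode each category (by default, labels are indexed in descending order of frequency), you can fit it on the dataframe and inspect the distinct pairs; this is a quick sanity check, not a pipeline step:

# Sanity check: inspect the category-to-index mapping (not part of the pipeline)
indexer.fit(df).transform(df).select("Origin", "OriginIndex").distinct().show()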

Define VectorAssembler Stage

Assemble the input columns 'Cylinders', 'Engine_Disp', 'Horsepower', 'Weight', 'Accelerate', and 'Year' into a single column "features"

# Stage - 2  Assemble the input columns 'Cylinders','Engine_Disp','Horsepower','Weight','Accelerate','Year' into a single column "features"
assembler = VectorAssembler(inputCols=['Cylinders','Engine_Disp','Horsepower','Weight','Accelerate','Year'], outputCol="features")

Define StandardScaler Stage

Scale the "features" column using a standard scaler and store the result in the "scaledFeatures" column

# Stage 3 - scale the "features" using standard scaler and store in "scaledFeatures" column
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
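
Note that Spark's StandardScaler defaults to withStd=True and withMean=False, so each feature is scaled to unit standard deviation but not centered. If you also wanted zero-mean centering, a variant could look like this (shown for illustration; this project keeps the defaults):

# Illustrative variant: scale to unit std and also center each feature
scaler_centered = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                                 withMean=True, withStd=True)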

Define Model Creation Stage

Create a Linear Regression stage to predict MPG

#Stage 4 - Create a LinearRegression stage to predict "MPG"
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG")

Build Pipeline

# Build the pipeline
# All the stages of the pipeline are mentioned in the order of execution.
pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])

Split Data

# Split the data into training and testing sets with 70:30 split. Use 42 as seed
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)

Fit Pipeline

# Fit the pipeline to the training data
pipelineModel = pipeline.fit(trainingData)
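
Before evaluating on the test set, you can peek at the training fit via the fitted LinearRegression stage, which exposes a training summary (an optional check; it assumes the LR model is the last stage, as built above):

# Optional: training-set metrics from the fitted LinearRegression stage
lrTrained = pipelineModel.stages[-1]
print("Training RMSE =", lrTrained.summary.rootMeanSquaredError)
print("Training r2 =", lrTrained.summary.r2)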

Evaluate Section

Run the code cell below. If the code throws any errors, go back and review the code you have written.

print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  385
Pipeline Stage 1 =  StringIndexer
Pipeline Stage 2 =  VectorAssembler
Pipeline Stage 3 =  StandardScaler
Label column =  MPG

Model Evaluation - Part 3


Predict

# Make predictions on testing data
predictions = pipelineModel.transform(testingData)

MSE

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE) = ",mse)

Mean Squared Error (MSE) =  10.899168410636618

MAE

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mae")
mae = evaluator.evaluate(predictions)
print("Mean Absolute Error (MAE) = ",mae)

Mean Absolute Error (MAE) =  2.6256840060950757

R-Squared

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R - Squared = ", r2)

R - Squared =  0.8202762026606878
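
RMSE, the square root of the MSE, is often reported alongside it because it is in the label's own units (MPG); the same evaluator supports it through metricName="rmse" (a minimal sketch, output omitted):

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) = ", rmse)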

Evaluate Section

print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))
Part 3 - Evaluation
Mean Squared Error =  10.9
Mean Absolute Error =  2.63
R Squared =  0.82
Intercept =  -13.9

Model Persistence - Part 4


Save Model

# Save the pipeline model
pipelineModel.write().save("Practice_Project")
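
Note that save() will fail if the path already exists. If you expect to rerun this cell, the writer supports overwriting:

# Rerun-safe variant: overwrite any existing model at the same path
pipelineModel.write().overwrite().save("Practice_Project")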

Load Saved Model

# Load persisted model
loadedPipelineModel = PipelineModel.load("Practice_Project")

Predict with Persisted Model

predictions = loadedPipelineModel.transform(testingData)

Show Predicted Values

predictions.select("MPG","prediction").show()
+----+------------------+
| MPG|        prediction|
+----+------------------+
|13.0|11.318809914955523|
|13.0|14.368910120039477|
|13.0|10.684980370654177|
|15.0|13.066592339902503|
|15.5|15.669787158219059|
|18.0|19.815977805319374|
|18.0|22.299804385989464|
|18.0|20.053788782604773|
|18.6| 20.89085676127957|
|19.0|  24.8621804403861|
|21.5|26.265038242693652|
|22.0|23.098606013406698|
|23.0|21.278313625209034|
|24.0|22.969583765345735|
|25.1| 27.03028314401816|
|26.0| 27.95096978803409|
|26.0|25.754412517142416|
|27.0|28.400109085156263|
|29.0| 28.12374555454902|
|30.0|30.249466720931085|
+----+------------------+
only showing top 20 rows

Evaluate Section

Run the code cell below. If the code throws any errors, go back and review the code you have written.

print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[1].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")
    
Part 4 - Evaluation
Number of stages in the pipeline =  4
Coefficient for Cylinders is -1.1901
Coefficient for Engine_Disp is 1.0404
Coefficient for Horsepower is -0.0605
Coefficient for Weight is -5.3403
Coefficient for Accelerate is 0.1052
Coefficient for Year is 2.7429

Stop Spark Session

spark.stop()