Import Libraries

ML Pipeline SparkML - MPG

Objectives


  1. Create a machine learning pipeline.
  2. Add stages to the pipeline.
  3. Run the pipeline.
  4. Create a machine learning pipeline for regression.
  5. Create a machine learning pipeline for classification.

Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark Cluster

Suppress Warnings

To suppress warnings generated by our code, we’ll use this code block

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
pip install pyspark==3.1.2 -q
pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

#import functions/Classes for sparkml

from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession


# import functions/Classes for pipeline creation

from pyspark.ml import Pipeline

# import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Start Spark Session


#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("ML Pipeline").getOrCreate()

Data


Download Data Locally

import wget
wget.download ("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv")

8.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13891 (14K) [text/csv]
Saving to: ‘mpg.csv.3

mpg.csv.3           100%[===================>]  13.57K  --.-KB/s    in 0s      

2024-11-11 18:12:21 (26.9 MB/s) - ‘mpg.csv.3’ saved [13891/13891]

Load MPG into SParkDF

mpg_data = spark.read.csv("mpg.csv", header=True, inferSchema=True)

View Schema

mpg_data.printSchema()
root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)

View Data

mpg_data.show(5)
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|15.0|        8|      390.0|       190|  3850|       8.5|  70|American|
|21.0|        6|      199.0|        90|  2648|      15.0|  70|American|
|18.0|        6|      199.0|        97|  2774|      15.5|  70|American|
|16.0|        8|      304.0|       150|  3433|      12.0|  70|American|
|14.0|        8|      455.0|       225|  3086|      10.0|  70|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows

Define Pipeline Stages


# Stage 1 - assemble the input columns into a single vector 
vectorAssembler = VectorAssembler(inputCols=["Weight", "Horsepower", "Engine Disp"], outputCol="features")
# Stage 2 - scale the features using standard scaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
# Stage 3 - create a linear regression instance
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG")

Build Pipeline


# Build the pipeline
# All the stages of the pipeline are mentioned in the order of execution.
pipeline = Pipeline(stages=[vectorAssembler, scaler, lr])

Split Data


# Split the data into training and testing sets
(trainingData, testData) = mpg_data.randomSplit([0.7, 0.3], seed=42)

Fit Pipeline


# Fit the pipeline to the training data
# ignore any warnings. The warnings are due to the simplified settings and the security settings of the lab

model = pipeline.fit(trainingData)

Make Predictions


predictions = model.transform(testData)

RMSE

evaluator = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) =", rmse)

Root Mean Squared Error (RMSE) = 4.69281610922322