ML Pipeline SparkML - MPG
Objectives
- Create a machine learning pipeline.
- Add stages to the pipeline.
- Run the pipeline.
- Create a machine learning pipeline for regression.
- Create a machine learning pipeline for classification.
Setup
We will be using the following libraries:
- PySpark for connecting to the Spark cluster
Suppress Warnings
To suppress warnings generated by our code, we’ll use this code block:
# Replace warnings.warn with a no-op and ignore all warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Import Libraries
pip install pyspark==3.1.2 -q
pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
# Import functions/classes for SparkML
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
# Import SparkSession to create the Spark session
from pyspark.sql import SparkSession
# Import functions/classes for pipeline creation
from pyspark.ml import Pipeline
# Import functions/classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
Start Spark Session
# Create a SparkSession (ignore any warnings printed by this command)
spark = SparkSession.builder.appName("ML Pipeline").getOrCreate()
Data
- A modified version of the car mileage (Auto MPG) dataset. The original dataset is available at https://archive.ics.uci.edu/ml/datasets/auto+mpg
Download Data Locally
import wget
# Download the CSV file into the current working directory
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv")
Load MPG into a Spark DataFrame
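With header=True, Spark takes the column names from the first line of the file; with inferSchema=True, it makes an extra pass over the data to choose a type for each column. The sketch below is a simplified plain-Python illustration of that idea, not Spark's algorithm (which also considers longs, decimals, timestamps, and booleans, and handles nulls): each column gets the narrowest of integer, double, or string that all of its values parse as. The raw CSV strings are assumed from the values displayed in the View Data step, and parse_type and infer_schema are hypothetical helpers.

```python
# Simplified sketch of CSV type inference (NOT Spark's implementation):
# each column gets the narrowest type that every value in it parses as.

def parse_type(value: str) -> str:
    """Return the narrowest type name a single CSV field parses as."""
    try:
        int(value)
        return "integer"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"

# Types ordered from narrowest to widest; a column takes the widest type seen in it.
RANK = {"integer": 0, "double": 1, "string": 2}

def infer_schema(header, rows):
    """Return {column name: type name} by widening each column over all rows."""
    schema = {}
    for i, name in enumerate(header):
        types = [parse_type(row[i]) for row in rows]
        schema[name] = max(types, key=RANK.__getitem__)
    return schema

# Assumed raw strings for the first two rows of mpg.csv.
header = ["MPG", "Cylinders", "Engine Disp", "Horsepower", "Weight", "Accelerate", "Year", "Origin"]
rows = [
    ["15.0", "8", "390.0", "190", "3850", "8.5", "70", "American"],
    ["21.0", "6", "199.0", "90", "2648", "15.0", "70", "American"],
]
print(infer_schema(header, rows))
```

On these rows the sketch reproduces the column types that printSchema reports for the real DataFrame.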
# Read the CSV, using the first row as column names and inferring column types
mpg_data = spark.read.csv("mpg.csv", header=True, inferSchema=True)
View Schema
mpg_data.printSchema()
root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)
View Data
mpg_data.show(5)
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|15.0|        8|      390.0|       190|  3850|       8.5|  70|American|
|21.0|        6|      199.0|        90|  2648|      15.0|  70|American|
|18.0|        6|      199.0|        97|  2774|      15.5|  70|American|
|16.0|        8|      304.0|       150|  3433|      12.0|  70|American|
|14.0|        8|      455.0|       225|  3086|      10.0|  70|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows
Define Pipeline Stages
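To make the first two stages concrete, here is a plain-Python sketch (not Spark's implementation) of what they compute on the five rows shown above. VectorAssembler concatenates the listed input columns, in order, into one feature vector; StandardScaler with its default settings (withStd=True, withMean=False) divides each feature by its corrected sample standard deviation without centering it. In the real pipeline the standard deviations are learned from the training data when the pipeline is fit; here they are assumed to come from just these five rows.

```python
import statistics

# Weight, Horsepower and Engine Disp from the five rows displayed by mpg_data.show(5).
rows = [
    {"Weight": 3850, "Horsepower": 190, "Engine Disp": 390.0},
    {"Weight": 2648, "Horsepower": 90, "Engine Disp": 199.0},
    {"Weight": 2774, "Horsepower": 97, "Engine Disp": 199.0},
    {"Weight": 3433, "Horsepower": 150, "Engine Disp": 304.0},
    {"Weight": 3086, "Horsepower": 225, "Engine Disp": 455.0},
]
input_cols = ["Weight", "Horsepower", "Engine Disp"]

# Stage 1 (VectorAssembler): one vector per row, columns in inputCols order.
features = [[float(row[c]) for c in input_cols] for row in rows]

# Stage 2 (StandardScaler, withStd=True, withMean=False):
# divide each feature by its sample standard deviation (n - 1 denominator); no centering.
stds = [statistics.stdev(col) for col in zip(*features)]
scaled = [[x / s for x, s in zip(vec, stds)] for vec in features]

for vec, svec in zip(features, scaled):
    print(vec, "->", [round(v, 3) for v in svec])
```

Because there is no centering, the scaled values keep their signs and relative order; only the units change.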
# Stage 1 - assemble the input columns into a single vector
vectorAssembler = VectorAssembler(inputCols=["Weight", "Horsepower", "Engine Disp"], outputCol="features")
# Stage 2 - scale each feature to unit standard deviation (StandardScaler defaults: withStd=True, withMean=False)
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
# Stage 3 - create a linear regression instance that predicts MPG from the scaled features
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG")
Build Pipeline
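A Pipeline is itself an estimator: calling fit runs the stages in order, fitting each estimator stage (here StandardScaler and LinearRegression) on the output of the stages before it, while transformer stages (here VectorAssembler) are kept as-is. The result is a PipelineModel whose stages are all transformers. The sketch below mimics that behavior with hypothetical stand-in classes (Transformer, Estimator, SimplePipeline, SimplePipelineModel, AddOne, MeanCenter, Shift) operating on plain lists; they illustrate the idea and are not the pyspark.ml API.

```python
class Transformer:
    """Maps a dataset to a new dataset."""
    def transform(self, data):
        raise NotImplementedError

class Estimator:
    """Learns from a dataset and returns a fitted Transformer."""
    def fit(self, data):
        raise NotImplementedError

class SimplePipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages  # every stage here is a Transformer

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

class SimplePipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        last = len(self.stages) - 1
        for i, stage in enumerate(self.stages):
            # Estimators are fit on the output of all earlier stages;
            # Transformers are kept unchanged.
            model = stage.fit(data) if isinstance(stage, Estimator) else stage
            fitted.append(model)
            if i < last:  # no need to transform after the final stage
                data = model.transform(data)
        return SimplePipelineModel(fitted)

# Hypothetical toy stages operating on a list of numbers.
class AddOne(Transformer):
    def transform(self, data):
        return [x + 1 for x in data]

class Shift(Transformer):
    def __init__(self, offset):
        self.offset = offset

    def transform(self, data):
        return [x + self.offset for x in data]

class MeanCenter(Estimator):
    def fit(self, data):
        # "Learn" the mean of the training data and return a fitted Shift model.
        return Shift(-sum(data) / len(data))

model = SimplePipeline([AddOne(), MeanCenter()]).fit([1.0, 2.0, 3.0])
print(model.transform([1.0, 2.0, 3.0]))  # → [-1.0, 0.0, 1.0]
print(model.transform([5.0]))            # → [3.0]
```

The second call shows the key property: new data is transformed with the statistics learned during fit, just as the test set will be scaled with the standard deviations learned from the training set.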
# Build the pipeline
# The stages are listed in the order in which they run
pipeline = Pipeline(stages=[vectorAssembler, scaler, lr])
Split Data
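randomSplit normalizes the weights if they do not sum to 1 and assigns rows to the splits at random, so the 70/30 proportions are only approximate; fixing seed makes the split repeatable as long as the input data and its partitioning do not change. The sketch below illustrates weighted random splitting in plain Python. random_split is a hypothetical helper and a simplification, not Spark's per-partition sampling code.

```python
import random

def random_split(rows, weights, seed=None):
    """Weighted random split (simplified sketch): normalize the weights, then send
    each row to the split whose cumulative-weight interval contains a uniform draw."""
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # seeded generator makes the split repeatable
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:  # guard against floating-point rounding in the last bound
            splits[-1].append(row)
    return splits

train, test = random_split(range(1000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 700 and 300
```

Every row lands in exactly one split, which is why the training and test sets never overlap.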
# Split the data into training (roughly 70%) and testing (roughly 30%) sets; the seed makes the split reproducible
(trainingData, testData) = mpg_data.randomSplit([0.7, 0.3], seed=42)
Fit Pipeline
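With its default regParam=0.0, LinearRegression fits an ordinary least-squares model, jointly over all three scaled features. The single-feature sketch below shows the closed-form least-squares fit on the Weight and MPG values from the five rows displayed earlier, and checks a useful property: dividing a feature by a constant (what StandardScaler does by default) rescales its coefficient but leaves the unregularized model's predictions unchanged. fit_simple_ols is a hypothetical helper, not part of Spark.

```python
import statistics

def fit_simple_ols(xs, ys):
    """Closed-form ordinary least squares for one feature: y ≈ intercept + slope * x."""
    mean_x, mean_y = statistics.fmean(xs), statistics.fmean(ys)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    return mean_y - slope * mean_x, slope

# Weight and MPG from the five rows displayed by mpg_data.show(5).
weight = [3850, 2648, 2774, 3433, 3086]
mpg = [15.0, 21.0, 18.0, 16.0, 14.0]

b0, b1 = fit_simple_ols(weight, mpg)

# Refit on weight divided by its sample standard deviation (StandardScaler's default).
s = statistics.stdev(weight)
c0, c1 = fit_simple_ols([w / s for w in weight], mpg)

# The slope is multiplied by s, but every prediction stays the same.
for w in weight:
    assert abs((b0 + b1 * w) - (c0 + c1 * (w / s))) < 1e-9
print(f"original units: slope={b1:.5f}; scaled units: slope={c1:.3f}")
```

Scaling matters more once regularization is added (regParam > 0), because the penalty then depends on the size of the coefficients.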
# Fit the pipeline to the training data
# Ignore any warnings; they are caused by the lab's simplified configuration and security settings
model = pipeline.fit(trainingData)
Make Predictions
# Apply the fitted pipeline to the test data; the output includes a "prediction" column
predictions = model.transform(testData)
RMSE
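RegressionEvaluator with metricName="rmse" reports the root mean squared error: the square root of the average squared difference between the label (MPG) and the prediction. It is expressed in miles per gallon and penalizes large misses more heavily than the mean absolute error (metricName="mae"). A plain-Python sketch of both metrics, using hypothetical labels and predictions for illustration only:

```python
import math

def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((y - y_hat) ** 2))."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    sq_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(sq_errors) / len(sq_errors))

def mae(labels, predictions):
    """Mean absolute error, for comparison."""
    return sum(abs(y - p) for y, p in zip(labels, predictions)) / len(labels)

# Hypothetical MPG labels and model predictions.
y_true = [15.0, 21.0, 18.0]
y_pred = [17.0, 20.0, 18.0]
print(rmse(y_true, y_pred), mae(y_true, y_pred))
```

Read this way, the RMSE of about 4.69 reported below means the model's test-set predictions miss the true MPG by roughly 4.7 miles per gallon in a root-mean-square sense.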
evaluator = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) =", rmse)
Root Mean Squared Error (RMSE) = 4.69281610922322