Persist Model - MPG

Objectives
- Save a trained model.
- Load a saved model.
- Make predictions using the loaded model.

Setup
We will be using the following libraries:
- PySpark, for connecting to the Spark Cluster

Suppress Warnings
To suppress warnings generated by our code, we'll use this code block

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

Import Libraries

pip install pyspark==3.1.2 -q
pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
# Import functions/classes for Spark ML
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
# Import functions/classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
Start Spark Session
# Create SparkSession
# Ignore any warnings by SparkSession command
spark = SparkSession.builder.appName("Model Persistence").getOrCreate()
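As a quick sanity check (an addition to the lab, not part of the original flow), you can confirm the session is live and that the version matches the pip install above:

# Not in the original lab: confirm the session started and check the version
print(spark.version)  # expected to print 3.1.2, given the install above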
Data
- Modified version of car mileage dataset. Original dataset available at https://archive.ics.uci.edu/ml/datasets/auto+mpg
- Modified version of diamonds dataset. Original dataset available at https://www.openml.org/search?type=data&sort=runs&id=42225&status=active
Download Data Locally
import wget
"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv")
wget.download (
HTTP request sent, awaiting response... 200 OK
Length: 13891 (14K) [text/csv]
Saving to: 'mpg.csv.4'

mpg.csv.4      100%[===================>]  13.57K  --.-KB/s    in 0s

2024-11-11 18:56:13 (42.7 MB/s) - 'mpg.csv.4' saved [13891/13891]
Load MPG into a Spark DataFrame
# Using the spark.read.csv function we load the data into a dataframe.
# header=True indicates that there is a header row in our csv file.
# inferSchema=True tells Spark to automatically infer the data types of the columns.

# Load mpg dataset
mpg_data = spark.read.csv("mpg.csv", header=True, inferSchema=True)
mpg_data.printSchema()
root
 |-- MPG: double (nullable = true)
|-- Cylinders: integer (nullable = true)
|-- Engine Disp: double (nullable = true)
|-- Horsepower: integer (nullable = true)
|-- Weight: integer (nullable = true)
|-- Accelerate: double (nullable = true)
|-- Year: integer (nullable = true)
|-- Origin: string (nullable = true)
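Before modeling, it can also help to glance at basic statistics for a few numeric columns. This quick check is an addition to the lab, using the standard DataFrame describe method:

# Not in the original lab: summary statistics for selected numeric columns
mpg_data.describe("MPG", "Horsepower", "Weight").show()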
mpg_data.show(5)
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year| Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|15.0| 8| 390.0| 190| 3850| 8.5| 70|American|
|21.0| 6| 199.0| 90| 2648| 15.0| 70|American|
|18.0| 6| 199.0| 97| 2774| 15.5| 70|American|
|16.0| 8| 304.0| 150| 3433| 12.0| 70|American|
|14.0| 8| 455.0| 225| 3086| 10.0| 70|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows
VectorAssembler
We use the VectorAssembler to combine the listed inputCols into a single vector column named "features"
# Prepare feature vector
assembler = VectorAssembler(inputCols=["Cylinders", "Engine Disp", "Horsepower", "Weight", "Accelerate", "Year"], outputCol="features")
mpg_transformed_data = assembler.transform(mpg_data)
Display the assembled “features” and the label column “MPG”
"features","MPG").show()
mpg_transformed_data.select(+--------------------+----+
| features| MPG|
+--------------------+----+
|[8.0,390.0,190.0,...|15.0|
|[6.0,199.0,90.0,2...|21.0|
|[6.0,199.0,97.0,2...|18.0|
|[8.0,304.0,150.0,...|16.0|
|[8.0,455.0,225.0,...|14.0|
|[8.0,350.0,165.0,...|15.0|
|[8.0,307.0,130.0,...|18.0|
|[8.0,454.0,220.0,...|14.0|
|[8.0,400.0,150.0,...|15.0|
|[8.0,307.0,200.0,...|10.0|
|[8.0,383.0,170.0,...|15.0|
|[8.0,318.0,210.0,...|11.0|
|[8.0,360.0,215.0,...|10.0|
|[8.0,429.0,198.0,...|15.0|
|[6.0,200.0,85.0,2...|21.0|
|[8.0,302.0,140.0,...|17.0|
|[8.0,304.0,193.0,...| 9.0|
|[8.0,340.0,160.0,...|14.0|
|[6.0,198.0,95.0,2...|22.0|
|[8.0,440.0,215.0,...|14.0|
+--------------------+----+
only showing top 20 rows
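One caveat worth knowing: VectorAssembler raises an error if any input column contains nulls. The mpg.csv used here is clean, so the following is only a hypothetical variant showing the assembler's handleInvalid parameter:

# Hypothetical variant, not needed for this dataset: drop rows with nulls
safe_assembler = VectorAssembler(
    inputCols=["Cylinders", "Engine Disp", "Horsepower", "Weight", "Accelerate", "Year"],
    outputCol="features",
    handleInvalid="skip"  # alternatives: "error" (the default) and "keep"
)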
Split Data
# Split data into training and testing sets
(training_data, testing_data) = mpg_transformed_data.randomSplit([0.7, 0.3])
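Note that randomSplit draws a different split on each run; if you want reproducible results (an optional tweak, not in the original lab), pass a seed:

# Optional: fix the seed so the same rows land in each split every run
(training_data, testing_data) = mpg_transformed_data.randomSplit([0.7, 0.3], seed=42)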
Create/Train Model
Create a LinearRegression model and train it on the training data set using a Pipeline
# Train linear regression model
# Ignore any warnings
lr = LinearRegression(labelCol="MPG", featuresCol="features")
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(training_data)
Save Model
Create a folder where the model will be saved
mkdir model_storage
Persist Model
# Persist the model to the path "./model_storage/"
model.write().overwrite().save("./model_storage/")

# The overwrite method is used to overwrite the model if it already exists,
# and the save method is used to specify the path where the model should be saved.
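If you're curious what persistence actually writes, a quick look at the folder (an optional check, not part of the lab) shows that Spark stores the pipeline as metadata plus per-stage subdirectories:

# Optional check: list what Spark wrote to disk
import os
print(os.listdir("./model_storage/"))  # typically ['metadata', 'stages']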
Load Model
from pyspark.ml.pipeline import PipelineModel
# Load persisted model
loaded_model = PipelineModel.load("./model_storage/")
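As a quick sanity check (an addition to the lab), confirm the loaded pipeline contains the same single stage that was saved:

# Not in the original lab: confirm the loaded pipeline's stages
print(loaded_model.stages)  # should show one LinearRegressionModel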
Predict Using the Loaded Model
# Make predictions on the testing data
predictions = loaded_model.transform(testing_data)

# In the above example, we use the load method of the PipelineModel object to load the
# persisted model from disk. We can then use this loaded model to make predictions on
# new data using the transform method.
"prediction").show(5)
predictions.select(+------------------+
| prediction|
+------------------+
|16.522389919352698|
|10.265848330973004|
|10.776385698311891|
|11.298718166038366|
| 16.88156870043919|
+------------------+
only showing top 5 rows
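The RegressionEvaluator imported earlier is never used above; a natural follow-up (an addition to the lab, using the standard RegressionEvaluator API) is to score the loaded model's predictions against the true MPG values:

# Not in the original lab: evaluate the loaded model's predictions
rmse = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="rmse")
r2 = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="r2")
print("RMSE:", rmse.evaluate(predictions))
print("R2:  ", r2.evaluate(predictions))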
Stop Spark
spark.stop()