
Persist Model - MPG

Objectives


  1. Save a trained model.
  2. Load a saved model.
  3. Make predictions using the loaded model.

Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark cluster
  • findspark for locating and initializing a local Spark installation

Suppress Warnings

To suppress warnings generated by our code, we'll use the following code block:

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

Import Libraries

pip install pyspark==3.1.2 -q
pip install findspark -q

# findspark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

# Import functions/classes for Spark ML

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

# import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator

Start Spark Session


# Create a SparkSession
# Ignore any warnings raised by the SparkSession command

spark = SparkSession.builder.appName("Model Persistence").getOrCreate()
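
The builder also accepts optional settings if you need to control where Spark runs or how much memory the driver gets. The options below are illustrative, not required for this lab:

# Optional (illustrative): explicit master URL and driver memory
spark = SparkSession.builder \
    .appName("Model Persistence") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()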

Data


Download Data Locally

import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv")


Load MPG Data into a Spark DataFrame

# Using the spark.read.csv function we load the data into a dataframe.
# header=True indicates that the CSV file has a header row.
# inferSchema=True tells Spark to automatically infer the data types of the columns.

# Load mpg dataset
mpg_data = spark.read.csv("mpg.csv", header=True, inferSchema=True)
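
If you prefer not to rely on schema inference (which costs an extra pass over the data), you can supply an explicit schema instead. A sketch using the column names and types from this dataset:

# Optional (illustrative): load with an explicit schema instead of inferSchema
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

mpg_schema = StructType([
    StructField("MPG", DoubleType(), True),
    StructField("Cylinders", IntegerType(), True),
    StructField("Engine Disp", DoubleType(), True),
    StructField("Horsepower", IntegerType(), True),
    StructField("Weight", IntegerType(), True),
    StructField("Accelerate", DoubleType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Origin", StringType(), True),
])
# mpg_data = spark.read.csv("mpg.csv", header=True, schema=mpg_schema)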

mpg_data.printSchema()
root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)
 
mpg_data.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|15.0|        8|      390.0|       190|  3850|       8.5|  70|American|
|21.0|        6|      199.0|        90|  2648|      15.0|  70|American|
|18.0|        6|      199.0|        97|  2774|      15.5|  70|American|
|16.0|        8|      304.0|       150|  3433|      12.0|  70|American|
|14.0|        8|      455.0|       225|  3086|      10.0|  70|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows

VectorAssembler

VectorAssembler combines the listed input columns into a single vector column named “features”.

# Prepare feature vector
assembler = VectorAssembler(inputCols=["Cylinders", "Engine Disp", "Horsepower", "Weight", "Accelerate", "Year"], outputCol="features")
mpg_transformed_data = assembler.transform(mpg_data)
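
If any of the feature columns could contain nulls, VectorAssembler raises an error by default. On Spark 2.4+ you can change this behavior with the handleInvalid parameter; a hedged sketch:

# Optional (illustrative): skip rows with null features instead of failing (Spark 2.4+)
assembler_safe = VectorAssembler(
    inputCols=["Cylinders", "Engine Disp", "Horsepower", "Weight", "Accelerate", "Year"],
    outputCol="features",
    handleInvalid="skip")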

Display the assembled “features” and the label column “MPG”

mpg_transformed_data.select("features","MPG").show()
+--------------------+----+
|            features| MPG|
+--------------------+----+
|[8.0,390.0,190.0,...|15.0|
|[6.0,199.0,90.0,2...|21.0|
|[6.0,199.0,97.0,2...|18.0|
|[8.0,304.0,150.0,...|16.0|
|[8.0,455.0,225.0,...|14.0|
|[8.0,350.0,165.0,...|15.0|
|[8.0,307.0,130.0,...|18.0|
|[8.0,454.0,220.0,...|14.0|
|[8.0,400.0,150.0,...|15.0|
|[8.0,307.0,200.0,...|10.0|
|[8.0,383.0,170.0,...|15.0|
|[8.0,318.0,210.0,...|11.0|
|[8.0,360.0,215.0,...|10.0|
|[8.0,429.0,198.0,...|15.0|
|[6.0,200.0,85.0,2...|21.0|
|[8.0,302.0,140.0,...|17.0|
|[8.0,304.0,193.0,...| 9.0|
|[8.0,340.0,160.0,...|14.0|
|[6.0,198.0,95.0,2...|22.0|
|[8.0,440.0,215.0,...|14.0|
+--------------------+----+
only showing top 20 rows

Split Data

# Split data into training and testing sets
(training_data, testing_data) = mpg_transformed_data.randomSplit([0.7, 0.3])
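
Note that randomSplit draws a different split on each run; pass a seed if you want a reproducible split. Illustrative:

# Optional (illustrative): fix the seed so the split is reproducible across runs
(training_data, testing_data) = mpg_transformed_data.randomSplit([0.7, 0.3], seed=42)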

Create/Train Model

Create a LinearRegression model and train it on the training data set using a Pipeline.

# Train linear regression model
# Ignore any warnings
lr = LinearRegression(labelCol="MPG", featuresCol="features")
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(training_data)
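
Because the assembler was applied before the pipeline was built, the saved model will expect a pre-assembled “features” column at prediction time. An alternative design (illustrative, not what this lab does) is to include the VectorAssembler as a pipeline stage, so the persisted model can consume the raw columns directly:

# Alternative (illustrative): make the assembler a pipeline stage,
# so the saved model accepts raw columns instead of a pre-built "features" vector
full_pipeline = Pipeline(stages=[assembler, lr])
# full_model = full_pipeline.fit(raw_training_data)  # raw_training_data: a split of mpg_data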

Save Model

Create a folder where the model will be saved.

mkdir model_storage

Persist Model

# Persist the model to the path "./model_storage/"

model.write().overwrite().save("./model_storage/")

# The overwrite() method replaces any model already saved at the path,
# and the save() method specifies the path where the model should be written.
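
On disk, a saved PipelineModel is a directory tree rather than a single file; Spark typically writes a metadata folder plus a stages folder containing each fitted stage. You can confirm this with standard Python:

import os
# List what Spark wrote; expect subdirectories such as "metadata" and "stages"
print(os.listdir("./model_storage/"))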

Load Model

from pyspark.ml.pipeline import PipelineModel

# Load persisted model
loaded_model = PipelineModel.load("./model_storage/")
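
Once loaded, the pipeline's fitted stages are accessible through the stages attribute, so you can inspect the regression model's learned parameters. A minimal sketch, assuming the LinearRegression stage is the only one:

# Inspect the fitted linear regression stage of the loaded pipeline
lr_model = loaded_model.stages[0]
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)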

Predict Using the Loaded Model

We used the load method of the PipelineModel class to bring the persisted model back from disk; we can now make predictions on new data with its transform method.

# Make predictions on the testing data using the loaded model
predictions = loaded_model.transform(testing_data)

predictions.select("prediction").show(5)
+------------------+
|        prediction|
+------------------+
|16.522389919352698|
|10.265848330973004|
|10.776385698311891|
|11.298718166038366|
| 16.88156870043919|
+------------------+
only showing top 5 rows
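
We imported RegressionEvaluator earlier but have not used it yet; it is the standard way to score these predictions. A minimal sketch computing RMSE on the test set:

# Evaluate the loaded model's predictions using the evaluator imported earlier
evaluator = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)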

Stop Spark

spark.stop()