Persist Model - MPG
- Save a trained model.
- Load a saved model.
- Make predictions using the loaded model.
We will be using the following libraries:
- PySpark for connecting to the Spark Cluster
Suppress Warnings
To suppress warnings generated by our code, we’ll use this code block
Import Libraries
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
#import functions/Classes for sparkml
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
# import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
Start Spark Session
#Create SparkSession
#Ignore any warnings by SparkSession command
spark = SparkSession.builder.appName("Model Persistence").getOrCreate()
- Modified version of car mileage dataset. Original dataset available at https://archive.ics.uci.edu/ml/datasets/auto+mpg
- Modified version of diamonds dataset. Original dataset available at https://www.openml.org/search?type=data&sort=runs&id=42225&status=active
Download Data Locally
import wget
wget.download (
Load MPG into Spark DataFrame
# Using the spark.read.csv function, we load the data into a DataFrame.
# header=True indicates that the CSV file has a header row.
# inferSchema=True tells Spark to infer the data types of the columns automatically.
# Load mpg dataset
mpg_data = spark.read.csv("mpg.csv", header=True, inferSchema=True)
root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)
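To make the effect of inferSchema=True more concrete, here is a simplified plain-Python sketch of column type inference. It is not Spark's implementation (Spark also recognizes other types, such as long, boolean, and timestamp). The helper names are hypothetical, and the sample values are copied from the displayed rows on the assumption that they match the raw CSV text.

```python
def infer_value_type(text):
    """Return the narrowest of integer/double/string that can hold the text."""
    try:
        int(text)
        return "integer"
    except ValueError:
        pass
    try:
        float(text)
        return "double"
    except ValueError:
        return "string"

# Types ordered from narrowest to widest; a column takes its widest value type.
WIDENING_ORDER = ["integer", "double", "string"]

def infer_column_type(values):
    widest = max(WIDENING_ORDER.index(infer_value_type(v)) for v in values)
    return WIDENING_ORDER[widest]

header = ["MPG", "Cylinders", "Engine Disp", "Horsepower",
          "Weight", "Accelerate", "Year", "Origin"]
# Two rows as raw text (assumed to mirror the CSV file contents).
sample_rows = [
    ["15.0", "8", "390.0", "190", "3850", "8.5", "70", "American"],
    ["21.0", "6", "199.0", "90", "2648", "15.0", "70", "American"],
]

columns = list(zip(*sample_rows))  # transpose rows into columns
inferred = {name: infer_column_type(col) for name, col in zip(header, columns)}
for name, type_name in inferred.items():
    print(f"{name}: {type_name}")
```

For these sample values the sketch yields the same integer/double/string assignment as the printed schema.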
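+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|15.0|        8|      390.0|       190|  3850|       8.5|  70|American|
|21.0|        6|      199.0|        90|  2648|      15.0|  70|American|
|18.0|        6|      199.0|        97|  2774|      15.5|  70|American|
|16.0|        8|      304.0|       150|  3433|      12.0|  70|American|
|14.0|        8|      455.0|       225|  3086|      10.0|  70|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows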
We use the VectorAssembler to combine the columns listed in inputCols into a single vector column named “features”
# Prepare feature vector
assembler = VectorAssembler(inputCols=["Cylinders", "Engine Disp", "Horsepower", "Weight", "Accelerate", "Year"], outputCol="features")
mpg_transformed_data = assembler.transform(mpg_data)
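Conceptually, the assembler keeps each row's existing columns and appends one new column holding the selected values as a vector of doubles. The plain-Python sketch below illustrates that idea for a single row. The `assemble` helper is hypothetical and stands in for what Spark does internally; the row values are taken from the first row displayed earlier.

```python
input_cols = ["Cylinders", "Engine Disp", "Horsepower",
              "Weight", "Accelerate", "Year"]

def assemble(row, input_cols, output_col="features"):
    """Return a copy of the row with the input columns packed into one list."""
    assembled = dict(row)  # original columns are preserved
    assembled[output_col] = [float(row[name]) for name in input_cols]
    return assembled

row = {"MPG": 15.0, "Cylinders": 8, "Engine Disp": 390.0, "Horsepower": 190,
       "Weight": 3850, "Accelerate": 8.5, "Year": 70, "Origin": "American"}

result = assemble(row, input_cols)
print(result["features"])  # [8.0, 390.0, 190.0, 3850.0, 8.5, 70.0]
```

The order of inputCols determines the position of each value in the vector, so the same order must be used whenever new data is prepared for the model.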
Display the assembled “features” and the label column “MPG”
| features| MPG|
|[8.0,304.0,193.0,...| 9.0|
only showing top 20 rows
Split Data
# Split data into training and testing sets
(training_data, testing_data) = mpg_transformed_data.randomSplit([0.7, 0.3])
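Because randomSplit assigns rows at random, the 70/30 proportions are approximate and the split differs between runs unless a seed is passed (randomSplit accepts an optional seed argument). The plain-Python sketch below illustrates the general idea of a seeded, weighted random split. It is a conceptual illustration under that assumption, not Spark's implementation, and the `random_split` helper is hypothetical.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split using one uniform random draw per row."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. roughly [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, upper in enumerate(bounds):
            if draw < upper:
                splits[index].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding
    return splits

rows = list(range(100))
training_rows, testing_rows = random_split(rows, [0.7, 0.3], seed=42)
# Sizes are close to 70/30 but not guaranteed to be exact.
print(len(training_rows), len(testing_rows))
```

Fixing the seed makes the split repeatable, which matters when comparing predictions from the original and the reloaded model on the same test set.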
Create/Train Model
Create a linear regression (LR) model and train it through the pipeline on the training data set
# Train linear regression model
# Ignore any warnings
lr = LinearRegression(labelCol="MPG", featuresCol="features")
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(training_data)
Save Model
Create a folder where the model will be saved
mkdir model_storage
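If a shell command is not convenient (for example, when running outside a notebook), the folder can also be created from Python. This is a minimal sketch using the standard library and assumes the relative path `model_storage` in the current working directory.

```python
from pathlib import Path

# Create the folder; do nothing if it already exists.
model_dir = Path("model_storage")
model_dir.mkdir(parents=True, exist_ok=True)

print(model_dir.is_dir())  # True
```

Unlike a plain mkdir, this does not fail when the folder is already present, so the cell can be re-run safely.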
Persist Model
# Persist the model to the path "./model_storage/"
# The overwrite method replaces the model if it already exists,
# and the save method specifies the path where the model is saved.
model.write().overwrite().save("./model_storage/")
Load Model
from pyspark.ml.pipeline import PipelineModel
# Load persisted model
loaded_model = PipelineModel.load("./model_storage/")
Predict using Loaded Model
# Make predictions on test data
predictions = loaded_model.transform(testing_data)
# In the above example, we use the load method of the PipelineModel class to load the persisted model from disk.
# We can then use this loaded model to make predictions on new data using the transform method.
# Make predictions on testing data
predictions = model.transform(testing_data)
| prediction|
| 16.88156870043919|
only showing top 5 rows
Stop Spark
spark.stop()