# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
#import functions/Classes for sparkml
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
# import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
LR Model in Spark - Diamonds
This is an expansion of the Build LR & Test/Evaluate LR Model in Python - Diamonds examples, but this time we'll use SparkML to build a linear regression model that predicts the price of diamonds.
Objectives
- Use PySpark to connect to a Spark cluster.
- Create a Spark session.
- Read a csv file into a dataframe.
- Split the dataset into training and testing sets.
- Use VectorAssembler to combine multiple columns into a single vector column.
- Use Linear Regression to build a prediction model.
- Use metrics to evaluate the model.
- Stop the Spark session.
Start Spark Session
#Create SparkSession
spark = SparkSession.builder.appName("Regressing using SparkML").getOrCreate()
Import Data
import wget
"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/diamonds.csv") wget.download (
CSV to SparkDF
# Using the spark.read.csv function we load the data into a dataframe.
# header=True indicates that there is a header row in our csv file.
# inferSchema=True tells Spark to automatically infer the data types of the columns.
# Load the diamonds dataset
diamond_data = spark.read.csv("diamonds.csv", header=True, inferSchema=True)
diamond_data.show(5)
+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| s|carat| cut|color|clarity|depth|table|price| x| y| z|
+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 1| 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43|
| 2| 0.21|Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31|
| 3| 0.23| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31|
| 4| 0.29|Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63|
| 5| 0.31| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75|
+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows
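Since inferSchema=True was used, Spark assigns a concrete data type to each column. A quick sketch to confirm what was inferred, using only the dataframe loaded above:
# Print the schema Spark inferred from the csv header and sampled rows
diamond_data.printSchema()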
Identify Label/Input Columns
- Use the price column as the label/target column.
- Use the columns carat, depth, and table as features (an optional sketch after the output below shows how this list can be extended).
assembler = VectorAssembler(inputCols=["carat", "depth", "table"], outputCol="features")
diamond_transformed_data = assembler.transform(diamond_data)
# Print the vectorized features/input and label/target columns
"features", "price").show()
diamond_transformed_data.select(+----------------+-----+
| features|price|
+----------------+-----+
|[0.23,61.5,55.0]| 326|
|[0.21,59.8,61.0]| 326|
|[0.23,56.9,65.0]| 327|
|[0.29,62.4,58.0]| 334|
|[0.31,63.3,58.0]| 335|
|[0.24,62.8,57.0]| 336|
|[0.24,62.3,57.0]| 336|
|[0.26,61.9,55.0]| 337|
|[0.22,65.1,61.0]| 337|
|[0.23,59.4,61.0]| 338|
| [0.3,64.0,55.0]| 339|
|[0.23,62.8,56.0]| 340|
|[0.22,60.4,61.0]| 342|
|[0.31,62.2,54.0]| 344|
| [0.2,60.2,62.0]| 345|
|[0.32,60.9,58.0]| 345|
| [0.3,62.0,54.0]| 348|
| [0.3,63.4,54.0]| 351|
| [0.3,63.8,56.0]| 351|
| [0.3,62.7,59.0]| 351|
+----------------+-----+
only showing top 20 rows
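The same pattern extends to more numeric columns. As an optional variation (not used in the rest of this walkthrough; the names assembler_xyz and diamond_xyz_data are illustrative), the physical dimensions x, y and z could be folded into the feature vector simply by listing them in inputCols:
# Optional variation: also include the x, y, z dimension columns as features
assembler_xyz = VectorAssembler(
    inputCols=["carat", "depth", "table", "x", "y", "z"],
    outputCol="features")
diamond_xyz_data = assembler_xyz.transform(diamond_data)
diamond_xyz_data.select("features", "price").show(5)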
Split Data
# Split data into training and testing sets
(training_data, testing_data) = diamond_transformed_data.randomSplit([0.7, 0.3], seed=42)
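A quick sanity check on the split sizes; the counts will be roughly 70/30 but not exact, because randomSplit assigns each row independently:
# Verify the approximate 70/30 split
print("Training rows:", training_data.count())
print("Testing rows:", testing_data.count())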
Build/Train Model
# Train linear regression model
# Ignore any warnings
lr = LinearRegression(featuresCol="features", labelCol="price")
model = lr.fit(training_data)
Predict Price
# Make predictions on testing data
predictions = model.transform(testing_data)
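transform() appends a prediction column to the testing dataframe; a short preview makes it easy to compare predicted and actual prices side by side:
# Compare predicted vs. actual prices for a few test rows
predictions.select("features", "price", "prediction").show(5)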
Evaluate Model
R squared
The closer to 1, the better.
#R-squared (R2): R2 is a statistical measure that represents the proportion of variance in the dependent variable (target) that is explained by the independent variables (features).
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R Squared =", r2)
# OUTPUT
R Squared = 0.854508517843993
MAE (Mean Absolute Error)
The lower, the better.
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
print("MAE =", mae)
# OUTPUT
MAE = 994.7282983463749
RMSE (Root Mean Squared Error)
The lower, the better.
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE =", rmse)
# OUTPUT
RMSE = 1534.8181642609825
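The three evaluations above differ only in metricName, so a single evaluator can be reused by switching the metric; the sketch below is equivalent to the three cells above.
# Optional: compute all three metrics with one evaluator by switching metricName
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")
for metric in ["r2", "mae", "rmse"]:
    print(metric, "=", evaluator.setMetricName(metric).evaluate(predictions))
Stop Spark Session
As listed in the objectives, stop the Spark session once the work is done.
# Stop the spark session
spark.stop()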