# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Import Libraries - T1
Pipeline LN Model - NASA Sound
In this project, we will work with a modified version of the NASA Airfoil Self-Noise dataset.
The goal is to clean the dataset, create a Machine Learning pipeline, evaluate the model’s performance, and persist it for future use.
Stage 1 - Clean the dataset by removing duplicate rows and rows with null values. This step ensures the data is consistent and reliable for further analysis.
Stage 2 - Construct a Machine Learning pipeline with three stages, including a regression stage. This pipeline lets us build a model that predicts SoundLevel from the other columns in the dataset.
Stage 3 - Once the model is trained, we will evaluate its performance on the test data using regression metrics (MSE, MAE, and R-squared) to assess its accuracy.
Stage 4 - We will persist the model, saving it for future use. This step ensures that the trained model can be stored and retrieved later, so it can be deployed in real-world applications to make predictions on new data.
Objectives
Building this pipeline is a four-part project:
Part 1 - Perform ETL
- Load a CSV dataset
- Remove duplicate rows, if any
- Drop rows with null values, if any
- Apply transformations (rename SoundLevel to SoundLevelDecibels)
- Store the cleaned data in Parquet format
Part 2 - Create a Machine Learning Pipeline
- Create a machine learning pipeline for prediction
Part 3 - Evaluate the Model
- Evaluate the model using relevant metrics
Part 4 - Persist the Model
- Save the model for future production use
- Load and verify the stored model
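Setup
We will be using the following libraries:
- PySpark for connecting to the Spark cluster
- findspark for making the local Spark installation importable from Python
- wget for downloading the dataset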
Suppress Warnings
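The code block at the top of this notebook suppresses warnings generated by our code: it replaces warnings.warn with a no-op function and sets the warnings filter to 'ignore'.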
# Install PySpark 3.1.2 and findspark quietly (-q)
pip install pyspark==3.1.2 -q
pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
Start Spark Session - T2
# Create SparkSession
# Ignore any warnings by SparkSession command
spark = SparkSession.builder.appName("Project").getOrCreate()
Data
- The original dataset is the NASA Airfoil Self-Noise dataset: https://archive.ics.uci.edu/dataset/291/airfoil+self+noise
- This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.
Download Data Locally
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv")
8.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv’
NASA_airfoil_noise_ 100%[===================>]  59.26K  --.-KB/s    in 0.001s
2024-11-12 20:45:46 (61.8 MB/s) - ‘NASA_airfoil_noise_raw.csv’ saved [60682/60682]
Load into SparkDF - T3
df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)
df.printSchema()
root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevel: double (nullable = true)
Show Top 5 Rows - T4
df.show(5)
+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
| 800| 0.0| 0.3048| 71.3| 0.00266337| 126.201|
| 1000| 0.0| 0.3048| 71.3| 0.00266337| 125.201|
| 1250| 0.0| 0.3048| 71.3| 0.00266337| 125.951|
| 1600| 0.0| 0.3048| 71.3| 0.00266337| 127.591|
| 2000| 0.0| 0.3048| 71.3| 0.00266337| 127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows
ETL - Part 1
Total # of Rows - T6
rowcount1 = df.count()
print(rowcount1)
1522
Drop Duplicate Rows - T7
df = df.dropDuplicates()
Recount Rows - T8
rowcount2 = df.count()
print(rowcount2)
1503
Drop NA rows - T9
df = df.dropna()
Recount - T10
rowcount3 = df.count()
print(rowcount3)
1499
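In this run, dropDuplicates() removed 19 rows (1522 to 1503) and dropna() removed 4 more (1503 to 1499). The plain-Python sketch below only illustrates the semantics of the two calls as used here: with no arguments, dropDuplicates() drops a row only when every column matches another row, and dropna() with its default how="any" drops a row that has a null in any column. Rows are modeled as tuples with None for null, the sample rows are made up, and keeping the first occurrence is a choice of this sketch (Spark does not guarantee which copy survives or in what order).

```python
def drop_duplicates(rows):
    """Keep the first occurrence of each fully identical row (sketch choice;
    Spark makes no ordering guarantee)."""
    seen = set()
    result = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            result.append(row)
    return result


def drop_na(rows):
    """Drop any row containing at least one null (None), like dropna(how="any")."""
    return [row for row in rows if all(value is not None for value in row)]


# Hypothetical sample rows: (Frequency, AngleOfAttack, SoundLevel)
raw = [
    (800, 0.0, 126.201),
    (800, 0.0, 126.201),   # exact duplicate of the first row
    (1000, 0.0, None),     # null label
    (1250, 0.0, 125.951),
]

deduped = drop_duplicates(raw)
cleaned = drop_na(deduped)
print(len(raw), len(deduped), len(cleaned))  # 4 3 2
```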
Rename Column - T11
- Rename from SoundLevel to SoundLevelDecibels
df = df.withColumnRenamed("SoundLevel","SoundLevelDecibels")
df.show(5)
+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
| 4000| 3.0| 0.3048| 31.7| 0.00529514| 115.608|
| 3150| 2.0| 0.2286| 31.7| 0.00372371| 121.527|
| 2000| 7.3| 0.2286| 31.7| 0.0132672| 115.309|
| 2000| 5.4| 0.1524| 71.3| 0.00401199| 131.111|
| 500| 9.9| 0.1524| 71.3| 0.0193001| 131.279|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows
Save DF to Parquet file - T12
Save the data to parquet file: NASA_airfoil_noise_cleaned.parquet
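# Spark writes Parquet output as a directory of part files, which is why the check below uses os.path.isdir
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")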
Evaluate Section
Run the code cell below. If the code throws up any errors, go back and review the code you have written.
print("Part 1 - Evaluation")
print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])
import os
print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))
Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True
ML Pipeline Creation - Part 2
Parquet to DF - T1
Load the data from “NASA_airfoil_noise_cleaned.parquet” into a DataFrame
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")
Print Total # of Rows - T2
rowcount4 = df.count()
print(rowcount4)
1499
Define VectorAssembler Stage - T3
Assemble the input columns into a single column “features”. Use all the columns except SoundLevelDecibels as input features.
# Stage - 1 Assemble the input columns into a single column "features". Use all the columns except SoundLevelDecibels as input features.
assembler = VectorAssembler(inputCols=['Frequency','AngleOfAttack','ChordLength','FreeStreamVelocity','SuctionSideDisplacement'], outputCol="features")
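VectorAssembler combines the listed columns of each row, in inputCols order, into a single vector column. The plain-Python sketch below mimics that for one row stored as a dict, using the first row of the raw CSV with the renamed label. The helper assemble is hypothetical, and the real transformer produces a Spark ML Vector (possibly in sparse form) rather than a Python list. Because SoundLevelDecibels is not listed in FEATURE_COLS, the label stays out of the features.

```python
# Feature columns in the same order as the notebook's inputCols
FEATURE_COLS = ["Frequency", "AngleOfAttack", "ChordLength",
                "FreeStreamVelocity", "SuctionSideDisplacement"]


def assemble(row, input_cols):
    """Concatenate the selected columns, in input_cols order, into one list of floats."""
    return [float(row[col]) for col in input_cols]


# First row of the raw CSV, after renaming the label column
row = {"Frequency": 800, "AngleOfAttack": 0.0, "ChordLength": 0.3048,
       "FreeStreamVelocity": 71.3, "SuctionSideDisplacement": 0.00266337,
       "SoundLevelDecibels": 126.201}

features = assemble(row, FEATURE_COLS)
print(features)  # [800.0, 0.0, 0.3048, 71.3, 0.00266337]
```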
Define StandardScaler Stage - T4
Scale the “features” using standard scaler and store in “scaledFeatures” column
# Stage 2 - scale the "features" using standard scaler and store in "scaledFeatures" column
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
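With its default settings (withMean=False, withStd=True), Spark's StandardScaler does not center the data: it divides each feature by that feature's corrected (n - 1) sample standard deviation, and a feature whose standard deviation is zero is mapped to 0.0. Inside this pipeline those standard deviations are learned from trainingData during pipeline.fit and then reused unchanged on testingData. A plain-Python sketch of that behavior for one feature column, with made-up values and hypothetical helper names:

```python
import statistics


def fit_std_scaler(column):
    """Learn the corrected (n - 1) sample standard deviation of one feature."""
    return statistics.stdev(column)


def transform_std_scaler(column, std):
    """Divide by std without centering (withMean=False); a zero std maps to 0.0."""
    if std == 0.0:
        return [0.0 for _ in column]
    return [value / std for value in column]


# Hypothetical Frequency values
frequency = [800.0, 1000.0, 1250.0, 1600.0, 2000.0]
freq_std = fit_std_scaler(frequency)
freq_scaled = transform_std_scaler(frequency, freq_std)
print(round(freq_std, 2), [round(v, 3) for v in freq_scaled])
```

Because the values are not centered, the scaled column keeps its sign and its mean is not zero; only its spread is normalized to 1.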
Define Model Creation Stage - T5
Create a LinearRegression stage to predict “SoundLevelDecibels”
#Stage 3 - Create a LinearRegression stage to predict "SoundLevelDecibels"
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")
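With its defaults (regParam=0.0, so no regularization, and fitIntercept=True), LinearRegression fits an ordinary least-squares model: it picks the intercept and coefficients that minimize the sum of squared residuals between prediction and SoundLevelDecibels. For a single feature that solution has a closed form, sketched below in plain Python on made-up points. Spark solves the five-feature version with its own solvers, so this only illustrates the objective being optimized; the helper names are hypothetical.

```python
def fit_simple_ols(xs, ys):
    """Closed-form least squares for y = intercept + slope * x."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return intercept, slope


def predict_simple(intercept, slope, x):
    """Prediction of the fitted line at x."""
    return intercept + slope * x


# Made-up points lying exactly on y = 130 - 2x
xs = [0.0, 1.0, 2.0, 3.0]
ys = [130.0, 128.0, 126.0, 124.0]
fitted_intercept, fitted_slope = fit_simple_ols(xs, ys)
print(fitted_intercept, fitted_slope)  # 130.0 -2.0
```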
Build Pipeline - T6
# Build the pipeline
# All the stages of the pipeline are mentioned in the order of execution.
pipeline = Pipeline(stages=[assembler, scaler, lr])
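A Pipeline is itself an estimator. During pipeline.fit(trainingData) the stages run in order: transformer stages such as VectorAssembler simply transform the data, while estimator stages such as StandardScaler and LinearRegression are first fitted on the output of the preceding stages and then replaced by their fitted models. The result is a PipelineModel whose transform() replays the fitted stages in the same order. The plain-Python sketch below mirrors only that control flow; AddConstant, MeanCenterer, ToyPipeline and ToyPipelineModel are hypothetical toy classes, not Spark's implementation.

```python
class AddConstant:
    """Transformer stage: adds a fixed constant to every value."""
    def __init__(self, c):
        self.c = c

    def transform(self, data):
        return [x + self.c for x in data]


class MeanCenterer:
    """Estimator stage: fit() learns the mean and returns a fitted transformer."""
    def fit(self, data):
        return MeanCentererModel(sum(data) / len(data))


class MeanCentererModel:
    """Fitted transformer produced by MeanCenterer.fit()."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the output of the earlier stages;
            # transformers are kept as they are.
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            data = model.transform(data)
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        # Replay every fitted stage in order
        for stage in self.stages:
            data = stage.transform(data)
        return data


toy_model = ToyPipeline([AddConstant(1.0), MeanCenterer()]).fit([1.0, 2.0, 3.0])
print(toy_model.transform([10.0]))  # [8.0]
```

The mean (3.0) is learned from the training values only and reused for new data, which is the same reason the notebook's scaler statistics come from trainingData alone.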
Split Data - T7
# Split the data into training and testing sets with 70:30 split. Use 42 as seed
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
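randomSplit does not cut the data at exactly 70/30: the weights are normalized into cumulative bounds and each row is assigned to one split by an independent pseudo-random draw, so the split sizes are only approximately proportional. Fixing seed=42 makes the split repeatable for the same input data and partitioning. The plain-Python sketch below shows the idea with random.Random, which is not Spark's generator, so it will not reproduce the notebook's actual split; random_split is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to one split using cumulative weight bounds."""
    total = sum(weights)
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform draw in [0.0, 1.0)
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall below 1.0
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


toy_train, toy_test = random_split(list(range(1499)), [0.7, 0.3], seed=42)
print(len(toy_train), len(toy_test))  # roughly 70/30, not exact
```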
Fit Pipeline - T8
# Fit the pipeline to the training data
pipelinemodel = pipeline.fit(trainingData)
Evaluate Section
Run the code cell below. If the code throws up any errors, go back and review the code you have written.
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]
print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])
print("Label column = ", lr.getLabelCol())
Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels
Model Evaluation - Part 3
Predict - T1
# Make predictions on testing data
predictions = pipelinemodel.transform(testingData)
predictions.select("SoundLevelDecibels","prediction").show()
+------------------+------------------+
|SoundLevelDecibels| prediction|
+------------------+------------------+
| 127.315|123.64344009624753|
| 119.975|123.48695788614877|
| 121.783|124.38983849684254|
| 127.224|121.44706993294302|
| 122.229|125.68312652454188|
| 122.754|119.00135887553772|
| 127.564| 126.5260736531985|
| 126.149|124.72369322766609|
| 120.076|129.24665689814083|
| 138.123|130.62951864347926|
| 127.314|127.13089533096246|
| 130.715|125.89163510250594|
| 129.367|129.30423088951827|
| 122.905| 129.9451290107514|
| 127.127|128.10022579415522|
| 127.417|126.28072873047776|
| 128.698|122.06234864411809|
| 131.073|122.38819030163322|
| 135.368|125.76877819819666|
| 124.514|125.43708134590952|
+------------------+------------------+
only showing top 20 rows
MSE - T2
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE) = ", mse)
Mean Squared Error (MSE) =  22.593754071348812
MAE - T3
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator.evaluate(predictions)
print("Mean Absolute Error (MAE) = ", mae)
Mean Absolute Error (MAE) =  3.7336902294631287
R-Squared - T4
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R - Squared = ", r2)
R - Squared =  0.5426016508689058
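The three metrics above follow the standard definitions: MSE is the mean of the squared residuals, MAE the mean of the absolute residuals, and R-squared is one minus the ratio of the residual sum of squares to the total sum of squares around the mean label. A plain-Python sketch of that arithmetic on made-up labels and predictions (regression_metrics is a hypothetical helper; the toy_ names avoid overwriting the notebook's mse, mae and r2):

```python
def regression_metrics(labels, predictions):
    """Return (MSE, MAE, R^2) using the standard definitions."""
    n = len(labels)
    residuals = [y - p for y, p in zip(labels, predictions)]
    mse = sum(r * r for r in residuals) / n
    mae = sum(abs(r) for r in residuals) / n
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    ss_res = sum(r * r for r in residuals)
    r2 = 1.0 - ss_res / ss_tot
    return mse, mae, r2


toy_mse, toy_mae, toy_r2 = regression_metrics([3.0, 5.0, 7.0], [2.0, 5.0, 9.0])
print(round(toy_mse, 4), toy_mae, toy_r2)  # 1.6667 1.0 0.375
```

Read this way, an R-squared of about 0.54 means the linear model explains a little over half of the variance of SoundLevelDecibels in the test set, and an MSE of 22.59 corresponds to a root-mean-squared error of roughly 4.75 dB.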
Evaluate Section
print("Part 3 - Evaluation")
print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))
lrModel = pipelinemodel.stages[-1]
print("Intercept = ", round(lrModel.intercept,2))
Part 3 - Evaluation
Mean Squared Error =  22.59
Mean Absolute Error =  3.73
R Squared =  0.54
Intercept =  132.6
Persist Model - Part 4
Save Model - T1
# Save the pipeline model
# overwrite() lets this cell be re-run when the Final_Project directory already exists
pipelinemodel.write().overwrite().save("Final_Project")
Load Saved Model - T2
# Load persisted model
loadedPipelineModel = PipelineModel.load("Final_Project")
Predict with Persisted Model - T3
predictions = loadedPipelineModel.transform(testingData)
Show Predicted Values - T4
predictions.select("SoundLevelDecibels","prediction").show()
+------------------+------------------+
|SoundLevelDecibels| prediction|
+------------------+------------------+
| 127.315|123.64344009624753|
| 119.975|123.48695788614877|
| 121.783|124.38983849684254|
| 127.224|121.44706993294302|
| 122.229|125.68312652454188|
| 122.754|119.00135887553772|
| 127.564| 126.5260736531985|
| 126.149|124.72369322766609|
| 120.076|129.24665689814083|
| 138.123|130.62951864347926|
| 127.314|127.13089533096246|
| 130.715|125.89163510250594|
| 129.367|129.30423088951827|
| 122.905| 129.9451290107514|
| 127.127|128.10022579415522|
| 127.417|126.28072873047776|
| 128.698|122.06234864411809|
| 131.073|122.38819030163322|
| 135.368|125.76877819819666|
| 124.514|125.43708134590952|
+------------------+------------------+
only showing top 20 rows
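The two prediction tables above are identical row for row, which is exactly what this step verifies: a persisted model should reproduce the original model's predictions. As a plain-Python analogy only (Spark writes its own directory of metadata and per-stage data, not a single JSON file), the sketch below saves a toy linear model's parameters with the standard library, reloads them, and checks that predictions match. The file name toy_model.json, the helper linear_predict and the parameter values are hypothetical.

```python
import json
import os
import tempfile


def linear_predict(params, features):
    """Linear prediction: intercept + sum of coefficient * feature."""
    return params["intercept"] + sum(
        c * x for c, x in zip(params["coefficients"], features))


# Hypothetical fitted parameters for a two-feature model
params = {"intercept": 130.0, "coefficients": [-2.0, 0.5]}
rows = [[1.0, 2.0], [3.0, 4.0]]

path = os.path.join(tempfile.mkdtemp(), "toy_model.json")
with open(path, "w") as f:
    json.dump(params, f)   # persist the parameters
with open(path) as f:
    loaded = json.load(f)  # reload them

before = [linear_predict(params, r) for r in rows]
after = [linear_predict(loaded, r) for r in rows]
print(before == after)  # True
```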
Evaluate Section
Run the code cell below. If the code throws up any errors, go back and review the code you have written.
print("Part 4 - Evaluation")
loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()
print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")
Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9728
Coefficient for AngleOfAttack is -2.4775
Coefficient for ChordLength is -3.3818
Coefficient for FreeStreamVelocity is 1.5789
Coefficient for SuctionSideDisplacement is -1.6465
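Because the regression was fitted on scaledFeatures, each coefficient above is the change in predicted SoundLevelDecibels per one training-set standard deviation of that feature, not per raw unit. Since StandardScaler here only divides by the standard deviation (no centering), a raw-unit coefficient can be recovered by dividing by that feature's standard deviation, and the intercept is unchanged: intercept + sum(b * x / s) equals intercept + sum((b / s) * x). The fitted standard deviations should be available from the scaler stage (for example loadedPipelineModel.stages[1].std). The plain-Python sketch below uses hypothetical numbers, not values from this model, and unscale_coefficients is a hypothetical helper.

```python
def unscale_coefficients(scaled_coefs, stds):
    """Convert per-standard-deviation coefficients into per-raw-unit coefficients.

    Valid only when features were divided by their std without centering
    (withMean=False). A zero-std feature is scaled to 0.0 and never affects
    the prediction, so 0.0 is returned for it.
    """
    return [b / s if s != 0.0 else 0.0 for b, s in zip(scaled_coefs, stds)]


# Hypothetical values, not taken from the fitted model
toy_intercept = 100.0
scaled_coefs = [-4.0, 2.0]
stds = [2000.0, 0.5]
x_raw = [10.0, 1.0]

raw_coefs = unscale_coefficients(scaled_coefs, stds)
pred_scaled = toy_intercept + sum(b * x / s for b, x, s in zip(scaled_coefs, x_raw, stds))
pred_raw = toy_intercept + sum(c * x for c, x in zip(raw_coefs, x_raw))
print(raw_coefs, pred_scaled, pred_raw)
```

The two predictions agree up to floating-point rounding, which confirms that only the coefficients, not the intercept, change when moving back to raw units.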
spark.stop()