Import Libraries - T1

Pipeline LN Model - NASA Sound

In this project, we will work with the modified version of the NASA Airfoil Self Noise dataset.

The goal is to clean the dataset, create a Machine Learning pipeline, evaluate the model’s performance, and persist it for future use.

Stage 1 - Clean the dataset and remove duplicate rows and rows with null values. Doing this step ensures the data is consistent and reliable for further analysis.

Stage 2 - Construct a Machine Learning pipeline with three stages, including a regression stage. This pipeline will enable you to build a model that predicts the SoundLevel based on other columns in the dataset.

Stage 3 - Once you’ve trained the model, you will evaluate its performance using appropriate metrics to assess its accuracy and effectiveness.

Stage 4 - We will persist with the model, saving it for future use. This step ensures that the trained model can be stored and retrieved later, enabling its deployment in real-world applications and making predictions on new data.

Objectives


Building this pipeline will be a 4 part project:

  • Part 1 Perform ETL activity

    • Load a csv dataset
    • Remove duplicates if any
    • Drop rows with null values if any
    • Make transformations
    • Store the cleaned data in parquet format
  • Part 2 Create a Machine Learning Pipeline

    • Create a machine learning pipeline for prediction
  • Part 3 Evaluate the Model

    • Evaluate the model using relevant metrics
  • Part 4 Persist the Model

    • Save the model for future production use
    • Load and verify the stored model

Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark Cluster

Suppress Warnings

To suppress warnings generated by our code, we’ll use this code block

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
pip install pyspark==3.1.2 -q
pip install findspark -q
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

Start Spark Session -T2


#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("Project").getOrCreate()

Data


Download Data Locally

import wget
wget.download ("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv")

8.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv’

NASA_airfoil_noise_ 100%[===================>]  59.26K  --.-KB/s    in 0.001s  

2024-11-12 20:45:46 (61.8 MB/s) - ‘NASA_airfoil_noise_raw.csv’ saved [60682/60682]

Load into SparkDF -T3

df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)

df.printSchema()
root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevel: double (nullable = true)

Show Top 5 Rows - T4

df.show(5)
+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows

ETL - Part 1


Total # of Rows - T6

rowcount1 = df.count()
print(rowcount1)
1522

Drop Duplicate Rows - T7

df = df.dropDuplicates()

Recount Rows - T8

rowcount2 = df.count()
print(rowcount2)
1503

Drop NA rows - T9

df = df.dropna()

Recount - T10

rowcount3 = df.count()
print(rowcount3)
1499

Rename Column - T11

  • Rename from SoundLevel to SoundLevelDecibels
df = df.withColumnRenamed("SoundLevel","SoundLevelDecibels")
df.show(5)
+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|     4000|          3.0|     0.3048|              31.7|             0.00529514|           115.608|
|     3150|          2.0|     0.2286|              31.7|             0.00372371|           121.527|
|     2000|          7.3|     0.2286|              31.7|              0.0132672|           115.309|
|     2000|          5.4|     0.1524|              71.3|             0.00401199|           131.111|
|      500|          9.9|     0.1524|              71.3|              0.0193001|           131.279|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows

Save DF to Parquet file - T12

Save the data to parquet file: NASA_airfoil_noise_cleaned.parquet

df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")

Evaluate Section

Run the code cell below. If the code throws up any errors, go back and review the code you have written.

print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True

ML Pipeline Creation - Part 2


Parquet to DF - T1

Load data from “mpg-cleaned.parquet” into a dataframe

df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")

Define VectorAssembler Stage - T3

Assemble the input columns into a single column “features”. Use all the columns except SoundLevelDecibels as input features.

# Stage - 1  Assemble the input columns into a single column "features". Use all the columns except SoundLevelDecibels as input features.
assembler = VectorAssembler(inputCols=['Frequency','AngleOfAttack','ChordLength','FreeStreamVelocity','SuctionSideDisplacement'], outputCol="features")

Define StandardScaler Stage - T4

Scale the “features” using standard scaler and store in “scaledFeatures” column

# Stage 2 - scale the "features" using standard scaler and store in "scaledFeatures" column
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

Define Model Creation Stage - T5

Create a LinearRegression stage to predict “SoundLevelDecibels”

#Stage 3 - Create a LinearRegression stage to predict "SoundLevelDecibels"
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")

Build Pipeline - T6

# Build the pipeline
# All the stages of the pipeline are mentioned in the order of execution.
pipeline = Pipeline(stages=[assembler, scaler, lr])

Split Data - T7

# Split the data into training and testing sets with 70:30 split. Use 42 as seed
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)

Fit Pipeline - T8

# Fit the pipeline to the training data
pipelinemodel = pipeline.fit(trainingData)

Evaluate Section

Run the code cell below. If the code throws up any errors, go back and review the code you have written.

print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels

Model Evaluation - Part 3


Predict - T1

# Make predictions on testing data
predictions = pipelinemodel.transform(testingData)

predictions.select("SoundLevelDecibels","prediction").show()
+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           127.315|123.64344009624753|
|           119.975|123.48695788614877|
|           121.783|124.38983849684254|
|           127.224|121.44706993294302|
|           122.229|125.68312652454188|
|           122.754|119.00135887553772|
|           127.564| 126.5260736531985|
|           126.149|124.72369322766609|
|           120.076|129.24665689814083|
|           138.123|130.62951864347926|
|           127.314|127.13089533096246|
|           130.715|125.89163510250594|
|           129.367|129.30423088951827|
|           122.905| 129.9451290107514|
|           127.127|128.10022579415522|
|           127.417|126.28072873047776|
|           128.698|122.06234864411809|
|           131.073|122.38819030163322|
|           135.368|125.76877819819666|
|           124.514|125.43708134590952|
+------------------+------------------+
only showing top 20 rows

MSE - T2

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE) = ",mse)

Mean Squared Error (MSE) =  22.593754071348812

MAE - T3

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator.evaluate(predictions)
print("Mean Average Error (MAE) = ",mae)

Mean Average Error (MAE) =  3.7336902294631287

R-Squared - T4

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R - Squared = ", r2)

R - Squared =  0.5426016508689058

Evaluate Section

print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelinemodel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))

Part 3 - Evaluation
Mean Squared Error =  22.59
Mean Absolute Error =  3.73
R Squared =  0.54
Intercept =  132.6

Persist Model - Part 4


Save Model - T1

# Save the pipeline model
pipelinemodel.write().save("Final_Project")

Load Saved Model - T2

# Load persisted model
loadedPipelineModel = PipelineModel.load("Final_Project")

Predict with Persisted Model - T3

predictions = loadedPipelineModel.transform(testingData)

Show Predicted Values - T4

predictions.select("SoundLevelDecibels","prediction").show()
+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           127.315|123.64344009624753|
|           119.975|123.48695788614877|
|           121.783|124.38983849684254|
|           127.224|121.44706993294302|
|           122.229|125.68312652454188|
|           122.754|119.00135887553772|
|           127.564| 126.5260736531985|
|           126.149|124.72369322766609|
|           120.076|129.24665689814083|
|           138.123|130.62951864347926|
|           127.314|127.13089533096246|
|           130.715|125.89163510250594|
|           129.367|129.30423088951827|
|           122.905| 129.9451290107514|
|           127.127|128.10022579415522|
|           127.417|126.28072873047776|
|           128.698|122.06234864411809|
|           131.073|122.38819030163322|
|           135.368|125.76877819819666|
|           124.514|125.43708134590952|
+------------------+------------------+
only showing top 20 rows

Evaluate Section

Run the code cell below. If the code throws up any errors, go back and review the code you have written.

print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")
    
Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9728
Coefficient for AngleOfAttack is -2.4775
Coefficient for ChordLength is -3.3818
Coefficient for FreeStreamVelocity is 1.5789
Coefficient for SuctionSideDisplacement is -1.6465   
spark.stop()
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")



evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="mse")