# To suppress warnings generated by the code
import warnings


def warn(*args, **kwargs):
    """Accept any arguments and do nothing.

    Installed below as a replacement for ``warnings.warn`` so that calls
    made by libraries produce no output in the notebook.
    """
    pass


# Route every warnings.warn(...) call to the no-op above, and also filter
# out warnings raised through the regular warnings machinery.
warnings.warn = warn
warnings.filterwarnings('ignore')
Import Libraries
ETL with Spark ProverbTokenizer
- Use the feature extractor CountVectorizer
- Use the feature extractor TF-IDF
- Use the feature transformer Tokenizer
- Use the feature transformer StopWordsRemover
- Use the feature transformer StringIndexer
- Use the feature transformer StandardScaler
We will be using the following libraries:
- PySpark for connecting to the Spark Cluster
Suppress Warnings
To suppress warnings generated by our code, we’ll use this code block
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
Create Spark Session
#Create SparkSession
#Ignore any warnings by SparkSession command
spark = SparkSession.builder.appName("Feature ETL with Spark").getOrCreate()
Load Proverbs
import wget
wget.download (
8.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 846 [text/csv]
Saving to: ‘proverbs.csv’
100%[===================>] 846 --.-KB/s in 0s
2024-11-11 17:31:13 (8.04 MB/s) - ‘proverbs.csv’ saved [846/846]
Load Data Locally
# Load proverbs dataset
textdata = spark.read.csv("proverbs.csv", header=True, inferSchema=True)
textdata.show(truncate = False)
+---+-----------------------------------------------------------+
|id |text |
|1 |When in Rome do as the Romans do. |
|2 |Do not judge a book by its cover. |
|3 |Actions speak louder than words. |
|4 |A picture is worth a thousand words. |
|5 |If at first you do not succeed try try again. |
|6 |Practice makes perfect. |
|7 |An apple a day keeps the doctor away. |
|8 |When the going gets tough the tough get going. |
|9 |All is fair in love and war. |
|10 |Too many cooks spoil the broth. |
|11 |You can not make an omelette without breaking eggs. |
|12 |The early bird catches the worm. |
|13 |Better late than never. |
|14 |Honesty is the best policy. |
|15 |A penny saved is a penny earned. |
|16 |Two wrongs do not make a right. |
|17 |The grass is always greener on the other side of the fence.|
|18 |Do not count your chickens before they're hatched. |
|19 |Laughter is the best medicine. |
|20 |Rome wasn't built in a day. |
only showing top 20 rows
Load MPG
import wget
wget.download (
8.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13891 (14K) [text/csv]
Saving to: ‘mpg.csv.2’
mpg.csv.2 100%[===================>] 13.57K --.-KB/s in 0s
2024-11-11 17:33:40 (41.3 MB/s) - ‘mpg.csv.2’ saved [13891/13891]
Load locally
# Load mpg dataset
mpgdata = spark.read.csv("mpg.csv", header=True, inferSchema=True)
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year| Origin|
|15.0| 8| 390.0| 190| 3850| 8.5| 70|American|
|21.0| 6| 199.0| 90| 2648| 15.0| 70|American|
|18.0| 6| 199.0| 97| 2774| 15.5| 70|American|
|16.0| 8| 304.0| 150| 3433| 12.0| 70|American|
|14.0| 8| 455.0| 225| 3086| 10.0| 70|American|
|15.0| 8| 350.0| 165| 3693| 11.5| 70|American|
|18.0| 8| 307.0| 130| 3504| 12.0| 70|American|
|14.0| 8| 454.0| 220| 4354| 9.0| 70|American|
|15.0| 8| 400.0| 150| 3761| 9.5| 70|American|
|10.0| 8| 307.0| 200| 4376| 15.0| 70|American|
|15.0| 8| 383.0| 170| 3563| 10.0| 70|American|
|11.0| 8| 318.0| 210| 4382| 13.5| 70|American|
|10.0| 8| 360.0| 215| 4615| 14.0| 70|American|
|15.0| 8| 429.0| 198| 4341| 10.0| 70|American|
|21.0| 6| 200.0| 85| 2587| 16.0| 70|American|
|17.0| 8| 302.0| 140| 3449| 10.5| 70|American|
| 9.0| 8| 304.0| 193| 4732| 18.5| 70|American|
|14.0| 8| 340.0| 160| 3609| 8.0| 70|American|
|22.0| 6| 198.0| 95| 2833| 15.5| 70|American|
|14.0| 8| 440.0| 215| 4312| 8.5| 70|American|
only showing top 20 rows
A tokenizer is used to break a sentence into words
Tokenize Proverbs
Here is the data
#display the dataframe
textdata.show(truncate = False)
+---+-----------------------------------------------------------+
|id |text |
|1 |When in Rome do as the Romans do. |
|2 |Do not judge a book by its cover. |
|3 |Actions speak louder than words. |
|4 |A picture is worth a thousand words. |
|5 |If at first you do not succeed try try again. |
|6 |Practice makes perfect. |
|7 |An apple a day keeps the doctor away. |
|8 |When the going gets tough the tough get going. |
|9 |All is fair in love and war. |
|10 |Too many cooks spoil the broth. |
|11 |You can not make an omelette without breaking eggs. |
|12 |The early bird catches the worm. |
|13 |Better late than never. |
|14 |Honesty is the best policy. |
|15 |A penny saved is a penny earned. |
|16 |Two wrongs do not make a right. |
|17 |The grass is always greener on the other side of the fence.|
|18 |Do not count your chickens before they're hatched. |
|19 |Laughter is the best medicine. |
|20 |Rome wasn't built in a day. |
only showing top 20 rows
Create Tokenizer
#create tokenizer instance.
#import tokenizer
from pyspark.ml.feature import Tokenizer
#mention the column to be tokenized as inputcol
#mention the output column name where the tokens are to be stored.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
textdata = tokenizer.transform(textdata)
Display Tokenized Data
#display the tokenized data
|id |words |
|1 |[when, in, rome, do, as, the, romans, do.] |
|2 |[do, not, judge, a, book, by, its, cover.] |
|3 |[actions, speak, louder, than, words.] |
|4 |[a, picture, is, worth, a, thousand, words.] |
|5 |[if, at, first, you, do, not, succeed, try, try, again.] |
|6 |[practice, makes, perfect.] |
|7 |[an, apple, a, day, keeps, the, doctor, away.] |
|8 |[when, the, going, gets, tough, the, tough, get, going.] |
|9 |[all, is, fair, in, love, and, war.] |
|10 |[too, many, cooks, spoil, the, broth.] |
|11 |[you, can, not, make, an, omelette, without, breaking, eggs.] |
|12 |[the, early, bird, catches, the, worm.] |
|13 |[better, late, than, never.] |
|14 |[honesty, is, the, best, policy.] |
|15 |[a, penny, saved, is, a, penny, earned.] |
|16 |[two, wrongs, do, not, make, a, right.] |
|17 |[the, grass, is, always, greener, on, the, other, side, of, the, fence.]|
|18 |[do, not, count, your, chickens, before, they're, hatched.] |
|19 |[laughter, is, the, best, medicine.] |
|20 |[rome, wasn't, built, in, a, day.] |
only showing top 20 rows
CountVectorizer is used to convert text into numerical format. It gives the count of each word in a given document.
#import CountVectorizer
from pyspark.ml.feature import CountVectorizer
Create Vectorizer Object
# Create a CountVectorizer object
# mention the column to be count vectorized as inputcol
# mention the output column name where the count vectors are to be stored.
cv = CountVectorizer(inputCol="words", outputCol="features")
Fit Model
# Fit the CountVectorizer model on the input data
model = cv.fit(textdata)
Transform Input to Vectors
# Transform the input data to bag-of-words vectors
result = model.transform(textdata)
Display DF
# display the dataframe
result.select("words","features").show(truncate = False)
|words |features |
|[when, in, rome, do, as, the, romans, do.] |(99,[0,4,5,6,17,38,78,95],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|[do, not, judge, a, book, by, its, cover.] |(99,[1,3,4,19,21,22,62,70],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|[actions, speak, louder, than, words.] |(99,[9,16,58,66,89],[1.0,1.0,1.0,1.0,1.0]) |
|[a, picture, is, worth, a, thousand, words.] |(99,[1,2,16,61,79,83],[2.0,1.0,1.0,1.0,1.0,1.0]) |
|[if, at, first, you, do, not, succeed, try, try, again.] |(99,[3,4,11,13,26,40,49,69,75],[1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|[practice, makes, perfect.] |(99,[23,24,52],[1.0,1.0,1.0]) |
|[an, apple, a, day, keeps, the, doctor, away.] |(99,[0,1,7,33,41,46,73,91],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|[when, the, going, gets, tough, the, tough, get, going.] |(99,[0,12,14,17,20,60,88],[2.0,2.0,1.0,1.0,1.0,1.0,1.0]) |
|[all, is, fair, in, love, and, war.] |(99,[2,5,27,36,74,77,97],[1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|[too, many, cooks, spoil, the, broth.] |(99,[0,32,34,35,37,59],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|[you, can, not, make, an, omelette, without, breaking, eggs.] |(99,[3,7,13,15,44,55,56,68,93],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|[the, early, bird, catches, the, worm.] |(99,[0,29,43,50,54],[2.0,1.0,1.0,1.0,1.0]) |
|[better, late, than, never.] |(99,[9,31,42,64],[1.0,1.0,1.0,1.0]) |
|[honesty, is, the, best, policy.] |(99,[0,2,10,39,98],[1.0,1.0,1.0,1.0,1.0]) |
|[a, penny, saved, is, a, penny, earned.] |(99,[1,2,8,18,81],[2.0,1.0,2.0,1.0,1.0]) |
|[two, wrongs, do, not, make, a, right.] |(99,[1,3,4,15,57,67,84],[1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|[the, grass, is, always, greener, on, the, other, side, of, the, fence.]|(99,[0,2,30,45,53,65,71,85,90,94],[3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|[do, not, count, your, chickens, before, they're, hatched.] |(99,[3,4,47,48,51,63,80,96],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|[laughter, is, the, best, medicine.] |(99,[0,2,10,25,87],[1.0,1.0,1.0,1.0,1.0]) |
|[rome, wasn't, built, in, a, day.] |(99,[1,5,6,28,72,82],[1.0,1.0,1.0,1.0,1.0,1.0]) |
only showing top 20 rows
StringIndexer - MPG
StringIndexer converts a column of strings into a column of integers.
Create DF with Sample Data
#import StringIndexer
from pyspark.ml.feature import StringIndexer
Index the Strings
# index the strings in the column "Origin" and store their indexes in the column "OriginIndex"
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")
indexed = indexer.fit(mpgdata).transform(mpgdata)
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year| Origin|OriginIndex|
|23.8| 4| 151.0| 85| 2855| 17.6| 78|American| 0.0|
|28.0| 4| 97.0| 92| 2288| 17.0| 72|Japanese| 1.0|
|17.0| 6| 250.0| 100| 3329| 15.5| 71|American| 0.0|
|21.5| 3| 80.0| 110| 2720| 13.5| 77|Japanese| 1.0|
|18.2| 8| 318.0| 135| 3830| 15.2| 79|American| 0.0|
|37.0| 4| 119.0| 92| 2434| 15.0| 80|Japanese| 1.0|
|15.0| 8| 302.0| 130| 4295| 14.9| 77|American| 0.0|
|23.2| 4| 156.0| 105| 2745| 16.7| 78|American| 0.0|
|32.4| 4| 107.0| 72| 2290| 17.0| 80|Japanese| 1.0|
|26.0| 4| 97.0| 46| 1835| 20.5| 70|European| 2.0|
|37.0| 4| 91.0| 68| 2025| 18.2| 82|Japanese| 1.0|
|27.0| 4| 97.0| 88| 2130| 14.5| 71|Japanese| 1.0|
|18.0| 6| 250.0| 88| 3139| 14.5| 71|American| 0.0|
|16.0| 8| 400.0| 230| 4278| 9.5| 73|American| 0.0|
|13.0| 8| 400.0| 170| 4746| 12.0| 71|American| 0.0|
|34.1| 4| 91.0| 68| 1985| 16.0| 81|Japanese| 1.0|
|14.0| 8| 302.0| 137| 4042| 14.5| 73|American| 0.0|
|22.0| 6| 225.0| 100| 3233| 15.4| 76|American| 0.0|
|17.5| 6| 258.0| 95| 3193| 17.8| 76|American| 0.0|
|27.4| 4| 121.0| 80| 2670| 15.0| 79|American| 0.0|
only showing top 20 rows
StandardScaler transforms the data so that it has a mean of 0 and a standard deviation of 1
- Create a single column named “features” using the columns “Cylinders”, “Engine Disp”, “Horsepower”, “Weight”
Create DF
#import VectorAssembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["Cylinders", "Engine Disp", "Horsepower", "Weight"], outputCol="features")
mpg_transformed_data = assembler.transform(mpgdata)
#show the dataframe
mpg_transformed_data.select("MPG","features").show(truncate = False)
|MPG |features |
|21.0|[6.0,199.0,90.0,2648.0] |
|18.0|[6.0,199.0,97.0,2774.0] |
|21.0|[6.0,200.0,85.0,2587.0] |
|9.0 |[8.0,304.0,193.0,4732.0]|
|22.0|[6.0,198.0,95.0,2833.0] |
only showing top 20 rows
Scale with StandardScaler
Use StandardScaler to scale the “features” column of the dataframe “mpg_transformed_data” and save the scaled data into the “scaledFeatures” column.
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
scalerModel = scaler.fit(mpg_transformed_data)
scaledData = scalerModel.transform(mpg_transformed_data)
Show Scaled Data
# Show the scaled data
scaledData.select("features","scaledFeatures").show(truncate = False)
|features |scaledFeatures |
|[8.0,390.0,190.0,3850.0]|[1.48205302652896,1.869079955831451,2.222084561602166,1.027093462353608] |
|[6.0,199.0,90.0,2648.0] |[0.3095711165403583,0.043843985634147174,-0.37591456792553746,-0.38801882543985255]|
|[6.0,199.0,97.0,2774.0] |[0.3095711165403583,0.043843985634147174,-0.1940546288585982,-0.2396792678175763] |
|[8.0,304.0,150.0,3433.0]|[1.48205302652896,1.0472459587792617,1.1828849097910845,0.5361601645084557] |
|[8.0,455.0,225.0,3086.0]|[1.48205302652896,2.4902335582546176,3.131384256936862,0.12763773200901246] |
|[8.0,350.0,165.0,3693.0]|[1.48205302652896,1.4868315851095026,1.57258477922024,0.8422576643639463] |
|[8.0,307.0,130.0,3504.0]|[1.48205302652896,1.0759145865834079,0.6632850838855439,0.619748327930532] |
|[8.0,454.0,220.0,4354.0]|[1.48205302652896,2.480677348986569,3.001484300460477,1.6204516928427128] |
|[8.0,400.0,150.0,3761.0]|[1.48205302652896,1.964642048511938,1.1828849097910845,0.9223139335569208] |
|[8.0,307.0,200.0,4376.0]|[1.48205302652896,1.0759145865834079,2.481884474554936,1.646352250522793] |
|[8.0,383.0,170.0,3563.0]|[1.48205302652896,1.80218649095511,1.7024847356966253,0.689208914436201] |
|[8.0,318.0,210.0,4382.0]|[1.48205302652896,1.1810328885319439,2.7416843875077066,1.6534160389809966] |
|[8.0,360.0,215.0,4615.0]|[1.48205302652896,1.5823936777899896,2.871584343984092,1.9277264907745708] |
|[8.0,429.0,198.0,4341.0]|[1.48205302652896,2.241772117285351,2.4299244919643823,1.6051468178499384] |
|[6.0,200.0,85.0,2587.0] |[0.3095711165403583,0.053400194902195885,-0.5058145244019226,-0.4598340080982561] |
|[8.0,302.0,140.0,3449.0]|[1.48205302652896,1.0281335402431644,0.9230849968383142,0.5549969337303321] |
|[8.0,304.0,193.0,4732.0]|[1.48205302652896,1.0472459587792617,2.300024535487997,2.0654703657095417] |
|[8.0,340.0,160.0,3609.0]|[1.48205302652896,1.3912694924290154,1.442684822743855,0.7433646259490956] |
|[6.0,198.0,95.0,2833.0] |[0.3095711165403583,0.03428777636609846,-0.24601461144915227,-0.17021868131190726] |
|[8.0,440.0,215.0,4312.0]|[1.48205302652896,2.3468904192338864,2.871584343984092,1.5710051736352875] |
only showing top 20 rows
Stop Spark