Feature ETL with Spark

Objectives


  1. Use the feature extractor CountVectorizer
  2. Use the feature extractor TF-IDF
  3. Use the feature transformer Tokenizer
  4. Use the feature transformer StopWordsRemover
  5. Use the feature transformer StringIndexer
  6. Use the feature transformer StandardScaler

Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark cluster

Suppress Warnings

To suppress warnings generated by our code, we’ll use the following code block:

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

Import Libraries

# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

Create Spark Session


# Create the SparkSession
# (any warnings raised by this command can be ignored)

spark = SparkSession.builder.appName("Feature ETL with Spark").getOrCreate()

Data


Load Proverbs

import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/proverbs.csv")


Load Data Locally

# Load proverbs dataset
textdata = spark.read.csv("proverbs.csv", header=True, inferSchema=True)

textdata.show(truncate = False)
+---+-----------------------------------------------------------+
|id |text                                                       |
+---+-----------------------------------------------------------+
|1  |When in Rome do as the Romans do.                          |
|2  |Do not judge a book by its cover.                          |
|3  |Actions speak louder than words.                           |
|4  |A picture is worth a thousand words.                       |
|5  |If at first you do not succeed try try again.              |
|6  |Practice makes perfect.                                    |
|7  |An apple a day keeps the doctor away.                      |
|8  |When the going gets tough the tough get going.             |
|9  |All is fair in love and war.                               |
|10 |Too many cooks spoil the broth.                            |
|11 |You can not make an omelette without breaking eggs.        |
|12 |The early bird catches the worm.                           |
|13 |Better late than never.                                    |
|14 |Honesty is the best policy.                                |
|15 |A penny saved is a penny earned.                           |
|16 |Two wrongs do not make a right.                            |
|17 |The grass is always greener on the other side of the fence.|
|18 |Do not count your chickens before they're hatched.         |
|19 |Laughter is the best medicine.                             |
|20 |Rome wasn't built in a day.                                |
+---+-----------------------------------------------------------+
only showing top 20 rows

Load MPG

import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg.csv")


Load Data Locally

# Load mpg dataset
mpgdata = spark.read.csv("mpg.csv", header=True, inferSchema=True)
mpgdata.show()
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|15.0|        8|      390.0|       190|  3850|       8.5|  70|American|
|21.0|        6|      199.0|        90|  2648|      15.0|  70|American|
|18.0|        6|      199.0|        97|  2774|      15.5|  70|American|
|16.0|        8|      304.0|       150|  3433|      12.0|  70|American|
|14.0|        8|      455.0|       225|  3086|      10.0|  70|American|
|15.0|        8|      350.0|       165|  3693|      11.5|  70|American|
|18.0|        8|      307.0|       130|  3504|      12.0|  70|American|
|14.0|        8|      454.0|       220|  4354|       9.0|  70|American|
|15.0|        8|      400.0|       150|  3761|       9.5|  70|American|
|10.0|        8|      307.0|       200|  4376|      15.0|  70|American|
|15.0|        8|      383.0|       170|  3563|      10.0|  70|American|
|11.0|        8|      318.0|       210|  4382|      13.5|  70|American|
|10.0|        8|      360.0|       215|  4615|      14.0|  70|American|
|15.0|        8|      429.0|       198|  4341|      10.0|  70|American|
|21.0|        6|      200.0|        85|  2587|      16.0|  70|American|
|17.0|        8|      302.0|       140|  3449|      10.5|  70|American|
| 9.0|        8|      304.0|       193|  4732|      18.5|  70|American|
|14.0|        8|      340.0|       160|  3609|       8.0|  70|American|
|22.0|        6|      198.0|        95|  2833|      15.5|  70|American|
|14.0|        8|      440.0|       215|  4312|       8.5|  70|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 20 rows
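
Check the Schema

Since the CSV was read with inferSchema=True, it is worth confirming the column types Spark inferred. A quick check (not part of the original lab output):

# display the schema inferred for the mpg dataframe
mpgdata.printSchema()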

Tokenizer


A tokenizer is used to break a sentence into individual words, called tokens.

Tokenize Proverbs

Here is the data:

#display the dataframe
textdata.show(truncate = False)
+---+-----------------------------------------------------------+
|id |text                                                       |
+---+-----------------------------------------------------------+
|1  |When in Rome do as the Romans do.                          |
|2  |Do not judge a book by its cover.                          |
|3  |Actions speak louder than words.                           |
|4  |A picture is worth a thousand words.                       |
|5  |If at first you do not succeed try try again.              |
|6  |Practice makes perfect.                                    |
|7  |An apple a day keeps the doctor away.                      |
|8  |When the going gets tough the tough get going.             |
|9  |All is fair in love and war.                               |
|10 |Too many cooks spoil the broth.                            |
|11 |You can not make an omelette without breaking eggs.        |
|12 |The early bird catches the worm.                           |
|13 |Better late than never.                                    |
|14 |Honesty is the best policy.                                |
|15 |A penny saved is a penny earned.                           |
|16 |Two wrongs do not make a right.                            |
|17 |The grass is always greener on the other side of the fence.|
|18 |Do not count your chickens before they're hatched.         |
|19 |Laughter is the best medicine.                             |
|20 |Rome wasn't built in a day.                                |
+---+-----------------------------------------------------------+
only showing top 20 rows

Create Tokenizer

# import Tokenizer
from pyspark.ml.feature import Tokenizer

# create a Tokenizer instance:
# specify the column to be tokenized as inputCol
# and the column where the tokens are to be stored as outputCol
tokenizer = Tokenizer(inputCol="text", outputCol="words")

textdata = tokenizer.transform(textdata)

Display Tokenized Data

#display the tokenized data
textdata.select("id","words").show(truncate=False)
+---+------------------------------------------------------------------------+
|id |words                                                                   |
+---+------------------------------------------------------------------------+
|1  |[when, in, rome, do, as, the, romans, do.]                              |
|2  |[do, not, judge, a, book, by, its, cover.]                              |
|3  |[actions, speak, louder, than, words.]                                  |
|4  |[a, picture, is, worth, a, thousand, words.]                            |
|5  |[if, at, first, you, do, not, succeed, try, try, again.]                |
|6  |[practice, makes, perfect.]                                             |
|7  |[an, apple, a, day, keeps, the, doctor, away.]                          |
|8  |[when, the, going, gets, tough, the, tough, get, going.]                |
|9  |[all, is, fair, in, love, and, war.]                                    |
|10 |[too, many, cooks, spoil, the, broth.]                                  |
|11 |[you, can, not, make, an, omelette, without, breaking, eggs.]           |
|12 |[the, early, bird, catches, the, worm.]                                 |
|13 |[better, late, than, never.]                                            |
|14 |[honesty, is, the, best, policy.]                                       |
|15 |[a, penny, saved, is, a, penny, earned.]                                |
|16 |[two, wrongs, do, not, make, a, right.]                                 |
|17 |[the, grass, is, always, greener, on, the, other, side, of, the, fence.]|
|18 |[do, not, count, your, chickens, before, they're, hatched.]             |
|19 |[laughter, is, the, best, medicine.]                                    |
|20 |[rome, wasn't, built, in, a, day.]                                      |
+---+------------------------------------------------------------------------+
only showing top 20 rows
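
StopWordsRemover

Notice that the tokens above keep their trailing punctuation ("do.", "words.") because Tokenizer only splits on whitespace. The objectives also call for StopWordsRemover. A minimal sketch, with the illustrative column names regex_words and filtered: RegexTokenizer splits on non-word characters, stripping the punctuation, and StopWordsRemover then drops common English words such as "the" and "a".

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

# split on runs of non-word characters so punctuation is not kept in the tokens
regex_tokenizer = RegexTokenizer(inputCol="text", outputCol="regex_words", pattern="\\W+")

# remove common English stop words from the cleaned tokens
remover = StopWordsRemover(inputCol="regex_words", outputCol="filtered")

filtered = remover.transform(regex_tokenizer.transform(textdata))
filtered.select("id", "filtered").show(truncate=False)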

CountVectorizer


CountVectorizer is used to convert text into numerical form. For each document it produces a vector holding the count of every vocabulary word that occurs in it.

#import CountVectorizer
from pyspark.ml.feature import CountVectorizer

Create Vectorizer Object

# create a CountVectorizer object:
# specify the column to be vectorized as inputCol
# and the column where the count vectors are to be stored as outputCol
cv = CountVectorizer(inputCol="words", outputCol="features")

Fit Model

# Fit the CountVectorizer model on the input data
model = cv.fit(textdata)
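
Inspect the Vocabulary

The fitted model exposes the vocabulary it learned: index i in the count vectors corresponds to model.vocabulary[i], with terms ordered by how often they occur in the corpus. A quick check (not part of the original lab output):

# the vocabulary learned from the proverbs
print(len(model.vocabulary))    # vocabulary size (99 terms, matching the vector length below)
print(model.vocabulary[:10])    # the ten most frequent terms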

Transform Input to Vectors

# Transform the input data to bag-of-words vectors
result = model.transform(textdata)

Display DF

# display the dataframe
result.select("words","features").show(truncate = False)
+------------------------------------------------------------------------+----------------------------------------------------------------------------+
|words                                                                   |features                                                                    |
+------------------------------------------------------------------------+----------------------------------------------------------------------------+
|[when, in, rome, do, as, the, romans, do.]                              |(99,[0,4,5,6,17,38,78,95],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                |
|[do, not, judge, a, book, by, its, cover.]                              |(99,[1,3,4,19,21,22,62,70],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])               |
|[actions, speak, louder, than, words.]                                  |(99,[9,16,58,66,89],[1.0,1.0,1.0,1.0,1.0])                                  |
|[a, picture, is, worth, a, thousand, words.]                            |(99,[1,2,16,61,79,83],[2.0,1.0,1.0,1.0,1.0,1.0])                            |
|[if, at, first, you, do, not, succeed, try, try, again.]                |(99,[3,4,11,13,26,40,49,69,75],[1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0])       |
|[practice, makes, perfect.]                                             |(99,[23,24,52],[1.0,1.0,1.0])                                               |
|[an, apple, a, day, keeps, the, doctor, away.]                          |(99,[0,1,7,33,41,46,73,91],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])               |
|[when, the, going, gets, tough, the, tough, get, going.]                |(99,[0,12,14,17,20,60,88],[2.0,2.0,1.0,1.0,1.0,1.0,1.0])                    |
|[all, is, fair, in, love, and, war.]                                    |(99,[2,5,27,36,74,77,97],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])                     |
|[too, many, cooks, spoil, the, broth.]                                  |(99,[0,32,34,35,37,59],[1.0,1.0,1.0,1.0,1.0,1.0])                           |
|[you, can, not, make, an, omelette, without, breaking, eggs.]           |(99,[3,7,13,15,44,55,56,68,93],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |
|[the, early, bird, catches, the, worm.]                                 |(99,[0,29,43,50,54],[2.0,1.0,1.0,1.0,1.0])                                  |
|[better, late, than, never.]                                            |(99,[9,31,42,64],[1.0,1.0,1.0,1.0])                                         |
|[honesty, is, the, best, policy.]                                       |(99,[0,2,10,39,98],[1.0,1.0,1.0,1.0,1.0])                                   |
|[a, penny, saved, is, a, penny, earned.]                                |(99,[1,2,8,18,81],[2.0,1.0,2.0,1.0,1.0])                                    |
|[two, wrongs, do, not, make, a, right.]                                 |(99,[1,3,4,15,57,67,84],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])                      |
|[the, grass, is, always, greener, on, the, other, side, of, the, fence.]|(99,[0,2,30,45,53,65,71,85,90,94],[3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|[do, not, count, your, chickens, before, they're, hatched.]             |(99,[3,4,47,48,51,63,80,96],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])              |
|[laughter, is, the, best, medicine.]                                    |(99,[0,2,10,25,87],[1.0,1.0,1.0,1.0,1.0])                                   |
|[rome, wasn't, built, in, a, day.]                                      |(99,[1,5,6,28,72,82],[1.0,1.0,1.0,1.0,1.0,1.0])                             |
+------------------------------------------------------------------------+----------------------------------------------------------------------------+
only showing top 20 rows
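
TF-IDF

The objectives also list the feature extractor TF-IDF. A minimal sketch building on the CountVectorizer output: IDF reweights the raw counts so that words appearing in many proverbs get lower weights. The column name tfidf is illustrative.

from pyspark.ml.feature import IDF

# IDF down-weights terms that occur in many documents
idf = IDF(inputCol="features", outputCol="tfidf")
idfModel = idf.fit(result)
tfidf_result = idfModel.transform(result)

tfidf_result.select("words", "tfidf").show(truncate=False)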

StringIndexer - MPG


StringIndexer encodes a column of string labels into a column of numeric indexes.

Import StringIndexer

#import StringIndexer
from pyspark.ml.feature import StringIndexer

Index the Strings

# index the strings in the column "Origin" and store their indexes in the column "OriginIndex"
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")
indexed = indexer.fit(mpgdata).transform(mpgdata)

indexed.orderBy(rand()).show()
+----+---------+-----------+----------+------+----------+----+--------+-----------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|OriginIndex|
+----+---------+-----------+----------+------+----------+----+--------+-----------+
|23.8|        4|      151.0|        85|  2855|      17.6|  78|American|        0.0|
|28.0|        4|       97.0|        92|  2288|      17.0|  72|Japanese|        1.0|
|17.0|        6|      250.0|       100|  3329|      15.5|  71|American|        0.0|
|21.5|        3|       80.0|       110|  2720|      13.5|  77|Japanese|        1.0|
|18.2|        8|      318.0|       135|  3830|      15.2|  79|American|        0.0|
|37.0|        4|      119.0|        92|  2434|      15.0|  80|Japanese|        1.0|
|15.0|        8|      302.0|       130|  4295|      14.9|  77|American|        0.0|
|23.2|        4|      156.0|       105|  2745|      16.7|  78|American|        0.0|
|32.4|        4|      107.0|        72|  2290|      17.0|  80|Japanese|        1.0|
|26.0|        4|       97.0|        46|  1835|      20.5|  70|European|        2.0|
|37.0|        4|       91.0|        68|  2025|      18.2|  82|Japanese|        1.0|
|27.0|        4|       97.0|        88|  2130|      14.5|  71|Japanese|        1.0|
|18.0|        6|      250.0|        88|  3139|      14.5|  71|American|        0.0|
|16.0|        8|      400.0|       230|  4278|       9.5|  73|American|        0.0|
|13.0|        8|      400.0|       170|  4746|      12.0|  71|American|        0.0|
|34.1|        4|       91.0|        68|  1985|      16.0|  81|Japanese|        1.0|
|14.0|        8|      302.0|       137|  4042|      14.5|  73|American|        0.0|
|22.0|        6|      225.0|       100|  3233|      15.4|  76|American|        0.0|
|17.5|        6|      258.0|        95|  3193|      17.8|  76|American|        0.0|
|27.4|        4|      121.0|        80|  2670|      15.0|  79|American|        0.0|
+----+---------+-----------+----------+------+----------+----+--------+-----------+
only showing top 20 rows
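
Reverse the Mapping

StringIndexer assigns indexes by descending label frequency, which is why American, the most common origin, receives 0.0. IndexToString reverses the mapping using the metadata StringIndexer attached to the indexed column; a brief sketch (the column name OriginLabel is illustrative):

from pyspark.ml.feature import IndexToString

# recover the original string labels from the numeric indexes
converter = IndexToString(inputCol="OriginIndex", outputCol="OriginLabel")
converter.transform(indexed).select("Origin", "OriginIndex", "OriginLabel").show(5)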

StandardScaler


StandardScaler transforms each feature in the data so that it has a mean of 0 and a standard deviation of 1.

  • First, create a single vector column named “features” from the columns “Cylinders”, “Engine Disp”, “Horsepower”, and “Weight” using VectorAssembler, since StandardScaler expects a single vector input column

Assemble Features

# import VectorAssembler
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["Cylinders", "Engine Disp", "Horsepower", "Weight"], outputCol="features")

mpg_transformed_data = assembler.transform(mpgdata)

#show the dataframe
mpg_transformed_data.select("MPG","features").show(truncate = False)

+----+------------------------+
|MPG |features                |
+----+------------------------+
|15.0|[8.0,390.0,190.0,3850.0]|
|21.0|[6.0,199.0,90.0,2648.0] |
|18.0|[6.0,199.0,97.0,2774.0] |
|16.0|[8.0,304.0,150.0,3433.0]|
|14.0|[8.0,455.0,225.0,3086.0]|
|15.0|[8.0,350.0,165.0,3693.0]|
|18.0|[8.0,307.0,130.0,3504.0]|
|14.0|[8.0,454.0,220.0,4354.0]|
|15.0|[8.0,400.0,150.0,3761.0]|
|10.0|[8.0,307.0,200.0,4376.0]|
|15.0|[8.0,383.0,170.0,3563.0]|
|11.0|[8.0,318.0,210.0,4382.0]|
|10.0|[8.0,360.0,215.0,4615.0]|
|15.0|[8.0,429.0,198.0,4341.0]|
|21.0|[6.0,200.0,85.0,2587.0] |
|17.0|[8.0,302.0,140.0,3449.0]|
|9.0 |[8.0,304.0,193.0,4732.0]|
|14.0|[8.0,340.0,160.0,3609.0]|
|22.0|[6.0,198.0,95.0,2833.0] |
|14.0|[8.0,440.0,215.0,4312.0]|
+----+------------------------+
only showing top 20 rows

Scale with StandardScaler

Use StandardScaler to scale the “features” column of the dataframe “mpg_transformed_data” and save the scaled data into the “scaledFeatures” column.

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

scalerModel = scaler.fit(mpg_transformed_data)

scaledData = scalerModel.transform(mpg_transformed_data)

Show Scaled Data

# Show the scaled data
scaledData.select("features","scaledFeatures").show(truncate = False)
+------------------------+-----------------------------------------------------------------------------------+
|features                |scaledFeatures                                                                     |
+------------------------+-----------------------------------------------------------------------------------+
|[8.0,390.0,190.0,3850.0]|[1.48205302652896,1.869079955831451,2.222084561602166,1.027093462353608]           |
|[6.0,199.0,90.0,2648.0] |[0.3095711165403583,0.043843985634147174,-0.37591456792553746,-0.38801882543985255]|
|[6.0,199.0,97.0,2774.0] |[0.3095711165403583,0.043843985634147174,-0.1940546288585982,-0.2396792678175763]  |
|[8.0,304.0,150.0,3433.0]|[1.48205302652896,1.0472459587792617,1.1828849097910845,0.5361601645084557]        |
|[8.0,455.0,225.0,3086.0]|[1.48205302652896,2.4902335582546176,3.131384256936862,0.12763773200901246]        |
|[8.0,350.0,165.0,3693.0]|[1.48205302652896,1.4868315851095026,1.57258477922024,0.8422576643639463]          |
|[8.0,307.0,130.0,3504.0]|[1.48205302652896,1.0759145865834079,0.6632850838855439,0.619748327930532]         |
|[8.0,454.0,220.0,4354.0]|[1.48205302652896,2.480677348986569,3.001484300460477,1.6204516928427128]          |
|[8.0,400.0,150.0,3761.0]|[1.48205302652896,1.964642048511938,1.1828849097910845,0.9223139335569208]         |
|[8.0,307.0,200.0,4376.0]|[1.48205302652896,1.0759145865834079,2.481884474554936,1.646352250522793]          |
|[8.0,383.0,170.0,3563.0]|[1.48205302652896,1.80218649095511,1.7024847356966253,0.689208914436201]           |
|[8.0,318.0,210.0,4382.0]|[1.48205302652896,1.1810328885319439,2.7416843875077066,1.6534160389809966]        |
|[8.0,360.0,215.0,4615.0]|[1.48205302652896,1.5823936777899896,2.871584343984092,1.9277264907745708]         |
|[8.0,429.0,198.0,4341.0]|[1.48205302652896,2.241772117285351,2.4299244919643823,1.6051468178499384]         |
|[6.0,200.0,85.0,2587.0] |[0.3095711165403583,0.053400194902195885,-0.5058145244019226,-0.4598340080982561]  |
|[8.0,302.0,140.0,3449.0]|[1.48205302652896,1.0281335402431644,0.9230849968383142,0.5549969337303321]        |
|[8.0,304.0,193.0,4732.0]|[1.48205302652896,1.0472459587792617,2.300024535487997,2.0654703657095417]         |
|[8.0,340.0,160.0,3609.0]|[1.48205302652896,1.3912694924290154,1.442684822743855,0.7433646259490956]         |
|[6.0,198.0,95.0,2833.0] |[0.3095711165403583,0.03428777636609846,-0.24601461144915227,-0.17021868131190726] |
|[8.0,440.0,215.0,4312.0]|[1.48205302652896,2.3468904192338864,2.871584343984092,1.5710051736352875]         |
+------------------------+-----------------------------------------------------------------------------------+
only showing top 20 rows
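
Verify the Scaling

To confirm that the scaling worked, pyspark.ml.stat.Summarizer can compute the mean and standard deviation of the scaled vector column; each component should come out close to 0 and 1 respectively. A quick check (not part of the original lab output):

from pyspark.ml.stat import Summarizer

# per-component mean (~0) and standard deviation (~1) of the scaled features
scaledData.select(
    Summarizer.mean(scaledData.scaledFeatures),
    Summarizer.std(scaledData.scaledFeatures)
).show(truncate=False)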

Stop Spark

spark.stop()