# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Feature ETL with Spark
Objectives
- Use the feature extractor CountVectorizer
- Use the feature extractor TF-IDF
- Use the feature transformer Tokenizer
- Use the feature transformer StopWordsRemover
- Use the feature transformer StringIndexer
- Use the feature transformer StandardScaler
Data
- Modified version of car mileage dataset. Original dataset available at https://archive.ics.uci.edu/ml/datasets/auto+mpg
Setup
We will be using the following libraries:
- PySpark, for connecting to the Spark cluster
Suppress Warnings
To suppress warnings generated by our code, we use the warning-suppression block shown at the top of this notebook.
Import Libraries
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
Create Spark Session
#Create SparkSession
#Ignore any warnings by SparkSession command
spark = SparkSession.builder.appName("Feature ETL with Spark").getOrCreate()
Tokenizer
A tokenizer is used to break a sentence into individual words.
#import tokenizer
from pyspark.ml.feature import Tokenizer
Create DF
#create a sample dataframe
sentenceDataFrame = spark.createDataFrame([
    (1, "Spark is a distributed computing system."),
    (2, "It provides interfaces for multiple languages"),
    (3, "Spark is built on top of Hadoop")
], ["id", "sentence"])
View DF
#display the dataframe
sentenceDataFrame.show(truncate=False)
+---+---------------------------------------------+
|id |sentence |
+---+---------------------------------------------+
|1 |Spark is a distributed computing system. |
|2 |It provides interfaces for multiple languages|
|3 |Spark is built on top of Hadoop |
+---+---------------------------------------------+
Create Tokenizer
#create tokenizer instance.
#mention the column to be tokenized as inputcol
#mention the output column name where the tokens are to be stored.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# Tokenize
token_df = tokenizer.transform(sentenceDataFrame)
Display Tokenized Data
#display the tokenized data
token_df.show(truncate=False)
+---+---------------------------------------------+----------------------------------------------------+
|id |sentence |words |
+---+---------------------------------------------+----------------------------------------------------+
|1 |Spark is a distributed computing system. |[spark, is, a, distributed, computing, system.] |
|2 |It provides interfaces for multiple languages|[it, provides, interfaces, for, multiple, languages]|
|3 |Spark is built on top of Hadoop |[spark, is, built, on, top, of, hadoop] |
+---+---------------------------------------------+----------------------------------------------------+
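Note that Tokenizer simply lowercases and splits on whitespace, so punctuation stays attached to tokens (see "system." in the first row). If cleaner tokens are needed, PySpark's RegexTokenizer can split on a regular expression instead. The sketch below is illustrative only; it reuses the dataframe and column names defined above.
#split on non-word characters instead of whitespace (illustrative sketch)
from pyspark.ml.feature import RegexTokenizer

regex_tokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
regex_tokenizer.transform(sentenceDataFrame).show(truncate=False)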
CountVectorizer
CountVectorizer is used to convert text into numerical format. It gives the count of each word in a given document.
#import CountVectorizer
from pyspark.ml.feature import CountVectorizer
Create DF
#create a sample dataframe and display it.
textdata = [(1, "I love Spark Spark provides Python API ".split()),
            (2, "I love Python Spark supports Python".split()),
            (3, "Spark solves the big problem of big data".split())]

textdata = spark.createDataFrame(textdata, ["id", "words"])

textdata.show(truncate=False)
+---+-------------------------------------------------+
|id |words |
+---+-------------------------------------------------+
|1 |[I, love, Spark, Spark, provides, Python, API] |
|2 |[I, love, Python, Spark, supports, Python] |
|3 |[Spark, solves, the, big, problem, of, big, data]|
+---+-------------------------------------------------+
Create Vectorizer Object
# Create a CountVectorizer object
# mention the column to be count vectorized as inputcol
# mention the output column name where the count vectors are to be stored.
cv = CountVectorizer(inputCol="words", outputCol="features")
Fit Model
# Fit the CountVectorizer model on the input data
model = cv.fit(textdata)
Transform Input to Vectors
# Transform the input data to bag-of-words vectors
result = model.transform(textdata)
Display DF
# display the dataframe
result.show(truncate=False)
+---+-------------------------------------------------+----------------------------------------------------+
|id |words |features |
+---+-------------------------------------------------+----------------------------------------------------+
|1 |[I, love, Spark, Spark, provides, Python, API] |(13,[0,1,2,4,5,7],[2.0,1.0,1.0,1.0,1.0,1.0]) |
|2 |[I, love, Python, Spark, supports, Python] |(13,[0,1,2,4,10],[1.0,2.0,1.0,1.0,1.0]) |
|3 |[Spark, solves, the, big, problem, of, big, data]|(13,[0,3,6,8,9,11,12],[1.0,2.0,1.0,1.0,1.0,1.0,1.0])|
+---+-------------------------------------------------+----------------------------------------------------+
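Each entry in the features column is a sparse vector of the form (vocabulary size, [word indices], [counts]). The index-to-word mapping lives in the fitted model's vocabulary attribute, so the vectors above can be decoded like this (reusing the model variable fitted above):
# the word at position i of the vocabulary corresponds to index i in every count vector
print(model.vocabulary)
# e.g. (13,[0,1,2,4,5,7],[2.0,1.0,1.0,1.0,1.0,1.0]) means the word at vocabulary
# index 0 occurs twice in document 1, and the words at indices 1, 2, 4, 5, 7 once each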
TF-IDF
Term Frequency-Inverse Document Frequency is used to quantify the importance of a word in a document. TF-IDF is computed by multiplying the number of times a word occurs in a document by the inverse document frequency of the word.
#import necessary classes for TF-IDF calculation
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
Create DF
#create a sample dataframe and display it.
sentenceData = spark.createDataFrame([
    (1, "Spark supports python"),
    (2, "Spark is fast"),
    (3, "Spark is easy")
], ["id", "sentence"])

sentenceData.show(truncate=False)
+---+---------------------+
|id |sentence |
+---+---------------------+
|1 |Spark supports python|
|2 |Spark is fast |
|3 |Spark is easy |
+---+---------------------+
Tokenize the Sentence
#tokenize the "sentence" column and store in the column "words"
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show(truncate=False)
+---+---------------------+-------------------------+
|id |sentence |words |
+---+---------------------+-------------------------+
|1 |Spark supports python|[spark, supports, python]|
|2 |Spark is fast |[spark, is, fast] |
|3 |Spark is easy |[spark, is, easy] |
+---+---------------------+-------------------------+
Create HashingTF Object
# Create a HashingTF object
# mention the "words" column as input
# mention the "rawFeatures" column as output
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10)
featurizedData = hashingTF.transform(wordsData)

featurizedData.show(truncate=False)
+---+---------------------+-------------------------+--------------------------+
|id |sentence |words |rawFeatures |
+---+---------------------+-------------------------+--------------------------+
|1 |Spark supports python|[spark, supports, python]|(10,[4,5,9],[1.0,1.0,1.0])|
|2 |Spark is fast |[spark, is, fast] |(10,[1,3,5],[1.0,1.0,1.0])|
|3 |Spark is easy |[spark, is, easy] |(10,[0,1,5],[1.0,1.0,1.0])|
+---+---------------------+-------------------------+--------------------------+
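Unlike CountVectorizer, HashingTF does not learn a vocabulary; it hashes every term into one of numFeatures buckets, so with only 10 buckets two different words can collide into the same index. A larger bucket count makes collisions less likely at the cost of longer (but still sparse) vectors. A minimal, illustrative sketch (the variable and output column names are arbitrary):
# same transformer with more hash buckets; collisions become less likely
hashingTF_large = HashingTF(inputCol="words", outputCol="rawFeaturesLarge", numFeatures=1024)
hashingTF_large.transform(wordsData).show(truncate=False)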
Create IDF Object
# Create an IDF object
# mention the "rawFeatures" column as input
# mention the "features" column as output
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
tfidfData = idfModel.transform(featurizedData)
Display IDF
#display the tf-idf data
"sentence", "features").show(truncate=False)
tfidfData.select(+---------------------+---------------------------------------------------------+
|sentence |features |
+---------------------+---------------------------------------------------------+
|Spark supports python|(10,[4,5,9],[0.6931471805599453,0.0,0.6931471805599453]) |
|Spark is fast |(10,[1,3,5],[0.28768207245178085,0.6931471805599453,0.0])|
|Spark is easy |(10,[0,1,5],[0.6931471805599453,0.28768207245178085,0.0])|
+---------------------+---------------------------------------------------------+
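The repeated values above are easy to check by hand. Spark's IDF uses the formula ln((numDocs + 1) / (docFreq + 1)), so "spark", which appears in all three documents, gets an IDF of ln(4/4) = 0, "is" (two documents) gets ln(4/3) ≈ 0.2877, and words that appear in a single document get ln(4/2) ≈ 0.6931. A quick check in plain Python:
import math

num_docs = 3
# document frequencies of 1, 2 and 3 documents respectively
for doc_freq in (1, 2, 3):
    print(doc_freq, math.log((num_docs + 1) / (doc_freq + 1)))
# prints 0.6931..., 0.2876... and 0.0, matching the feature values above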
StopWordsRemover
StopWordsRemover is a transformer that filters out common stop words such as "a", "an", and "the".
Create DF
#import StopWordsRemover
from pyspark.ml.feature import StopWordsRemover
#create a dataframe with sample text and display it
textData = spark.createDataFrame([
    (1, ['Spark', 'is', 'an', 'open-source', 'distributed', 'computing', 'system']),
    (2, ['IT', 'has', 'interfaces', 'for', 'multiple', 'languages']),
    (3, ['It', 'has', 'a', 'wide', 'range', 'of', 'libraries', 'and', 'APIs'])
], ["id", "sentence"])

textData.show(truncate=False)
+---+------------------------------------------------------------+
|id |sentence |
+---+------------------------------------------------------------+
|1 |[Spark, is, an, open-source, distributed, computing, system]|
|2 |[IT, has, interfaces, for, multiple, languages] |
|3 |[It, has, a, wide, range, of, libraries, and, APIs] |
+---+------------------------------------------------------------+
Remove Stopwords
# remove stopwords from "sentence" column and store the result in "filtered_sentence" column
remover = StopWordsRemover(inputCol="sentence", outputCol="filtered_sentence")
textData = remover.transform(textData)

# display the dataframe
textData.show(truncate=False)
+---+------------------------------------------------------------+----------------------------------------------------+
|id |sentence |filtered_sentence |
+---+------------------------------------------------------------+----------------------------------------------------+
|1 |[Spark, is, an, open-source, distributed, computing, system]|[Spark, open-source, distributed, computing, system]|
|2 |[IT, has, interfaces, for, multiple, languages] |[interfaces, multiple, languages] |
|3 |[It, has, a, wide, range, of, libraries, and, APIs] |[wide, range, libraries, APIs] |
+---+------------------------------------------------------------+----------------------------------------------------+
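By default StopWordsRemover uses Spark's built-in English stop word list, and matching is case-insensitive (which is why both "IT" and "It" were removed). The default list can be inspected, or replaced through the stopWords parameter; a minimal sketch, with a purely illustrative custom list:
# look at the first few default English stop words
print(StopWordsRemover.loadDefaultStopWords("english")[:10])

# use a custom stop word list instead of the default one
custom_remover = StopWordsRemover(inputCol="sentence", outputCol="custom_filtered",
                                  stopWords=["a", "an", "and", "for", "has", "is", "it", "of"])
custom_remover.transform(textData).show(truncate=False)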
StringIndexer
StringIndexer encodes a column of string labels into a column of numeric label indices; by default the most frequent label receives index 0.0.
Create DF with Sample Data
#import StringIndexer
from pyspark.ml.feature import StringIndexer
#create a dataframe with sample text and display it
colors = spark.createDataFrame(
    [(0, "red"), (1, "red"), (2, "blue"), (3, "yellow"), (4, "yellow"), (5, "yellow")],
    ["id", "color"])

colors.show()
+---+------+
| id| color|
+---+------+
| 0| red|
| 1| red|
| 2| blue|
| 3|yellow|
| 4|yellow|
| 5|yellow|
+---+------+
Index the Strings
# index the strings in the column "color" and store their indexes in the column "colorIndex"
indexer = StringIndexer(inputCol="color", outputCol="colorIndex")
indexed = indexer.fit(colors).transform(colors)

indexed.show()
+---+------+----------+
| id| color|colorIndex|
+---+------+----------+
| 0| red| 1.0|
| 1| red| 1.0|
| 2| blue| 2.0|
| 3|yellow| 0.0|
| 4|yellow| 0.0|
| 5|yellow| 0.0|
+---+------+----------+
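The indices are assigned by descending label frequency, which is why "yellow" (three rows) maps to 0.0, "red" (two rows) to 1.0, and "blue" to 2.0. IndexToString can map the indices back to the original labels; the sketch below refits the indexer so we keep a handle on the fitted model and its learned labels:
#map the numeric indices back to the original string labels (illustrative sketch)
from pyspark.ml.feature import IndexToString

indexerModel = indexer.fit(colors)
converter = IndexToString(inputCol="colorIndex", outputCol="originalColor", labels=indexerModel.labels)
converter.transform(indexed).show()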
StandardScaler
StandardScaler standardizes the data so that each feature has a mean of 0 and a standard deviation of 1.
Create DF
#import StandardScaler
from pyspark.ml.feature import StandardScaler
# Create a sample dataframe and display it
from pyspark.ml.linalg import Vectors
data = [(1, Vectors.dense([70, 170, 17])),
        (2, Vectors.dense([80, 165, 25])),
        (3, Vectors.dense([65, 150, 135]))]
df = spark.createDataFrame(data, ["id", "features"])

df.show()
+---+------------------+
| id| features|
+---+------------------+
| 1| [70.0,170.0,17.0]|
| 2| [80.0,165.0,25.0]|
| 3|[65.0,150.0,135.0]|
+---+------------------+
Define Transformer
# Define the StandardScaler transformer
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
Fit Transformer to Dataset
# Fit the transformer to the dataset
scalerModel = scaler.fit(df)
Scale Data
# Scale the data
scaledData = scalerModel.transform(df)

# Show the scaled data
scaledData.show(truncate=False)
+---+------------------+------------------------------------------------------------+
|id |features |scaledFeatures |
+---+------------------+------------------------------------------------------------+
|1 |[70.0,170.0,17.0] |[-0.218217890235993,0.8006407690254366,-0.6369487984517485] |
|2 |[80.0,165.0,25.0] |[1.0910894511799611,0.32025630761017515,-0.5156252177942725]|
|3 |[65.0,150.0,135.0]|[-0.8728715609439701,-1.120897076635609,1.152574016246021] |
+---+------------------+------------------------------------------------------------+
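Each scaled entry is simply (x - column mean) / column standard deviation, where the standard deviation is the sample (unbiased) estimate. For the first feature (70, 80, 65) the mean is about 71.67 and the sample standard deviation about 7.64, so 70 scales to (70 - 71.67) / 7.64 ≈ -0.218, matching the first row above. A quick check in plain Python:
from statistics import mean, stdev  # stdev is the sample standard deviation

col = [70, 80, 65]
print([(x - mean(col)) / stdev(col) for x in col])
# [-0.2182..., 1.0910..., -0.8728...], matching the first component of scaledFeatures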
Stop Spark
spark.stop()