ETL with Spark WordTokenizer

Objectives


  1. Use the feature extractor CountVectorizer
  2. Use the feature extractor TF-IDF
  3. Use the feature transformer Tokenizer
  4. Use the feature transformer StopWordsRemover
  5. Use the feature transformer StringIndexer
  6. Use the feature transformer StandardScaler

Data


Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark cluster
  • FindSpark to simplify using Apache Spark with Python

Suppress Warnings

To suppress warnings generated by our code, we’ll use the following code block:

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

Create Spark Session


# Create a SparkSession
# Any warnings raised by this command are suppressed by the block above

spark = SparkSession.builder.appName("Feature ETL with Spark").getOrCreate()
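
To confirm that the session is up, you can print its version; spark.version is a standard SparkSession attribute.

# Print the version of the Spark cluster the session is connected to
print(spark.version)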

Tokenizer


A tokenizer is used to break a sentence into words. Spark's Tokenizer lowercases the text and splits it on whitespace.

#import tokenizer
from pyspark.ml.feature import Tokenizer

Create DF

#create a sample dataframe
sentenceDataFrame = spark.createDataFrame([
    (1, "Spark is a distributed computing system."),
    (2, "It provides interfaces for multiple languages"),
    (3, "Spark is built on top of Hadoop")
], ["id", "sentence"])

View DF

#display the dataframe
sentenceDataFrame.show(truncate = False)
+---+---------------------------------------------+
|id |sentence                                     |
+---+---------------------------------------------+
|1  |Spark is a distributed computing system.     |
|2  |It provides interfaces for multiple languages|
|3  |Spark is built on top of Hadoop              |
+---+---------------------------------------------+

Create Tokenizer

# Create a Tokenizer instance
# inputCol  is the column to be tokenized
# outputCol is the column where the tokens will be stored
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# Tokenize
token_df = tokenizer.transform(sentenceDataFrame)

Display Tokenized Data

#display the tokenized data
token_df.show(truncate=False)
+---+---------------------------------------------+----------------------------------------------------+
|id |sentence                                     |words                                               |
+---+---------------------------------------------+----------------------------------------------------+
|1  |Spark is a distributed computing system.     |[spark, is, a, distributed, computing, system.]     |
|2  |It provides interfaces for multiple languages|[it, provides, interfaces, for, multiple, languages]|
|3  |Spark is built on top of Hadoop              |[spark, is, built, on, top, of, hadoop]             |
+---+---------------------------------------------+----------------------------------------------------+
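
Notice that "system." keeps its trailing period, because Tokenizer only lowercases and splits on whitespace. If punctuation should be stripped as well, RegexTokenizer (also in pyspark.ml.feature) can split on non-word characters instead; a minimal sketch reusing the dataframe above:

#import RegexTokenizer
from pyspark.ml.feature import RegexTokenizer

# Split on runs of non-word characters, so "system." becomes "system"
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
regexTokenizer.transform(sentenceDataFrame).show(truncate=False)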

CountVectorizer


CountVectorizer is used to convert text into numerical form. It learns a vocabulary from the documents and, for each document, produces a sparse vector holding the count of every vocabulary word.

#import CountVectorizer
from pyspark.ml.feature import CountVectorizer

Create DF

#create a sample dataframe and display it.
textdata = [(1, "I love Spark Spark provides Python API ".split()),
            (2, "I love Python Spark supports Python".split()),
            (3, "Spark solves the big problem of big data".split())]

textdata = spark.createDataFrame(textdata, ["id", "words"])

textdata.show(truncate=False)
+---+-------------------------------------------------+
|id |words                                            |
+---+-------------------------------------------------+
|1  |[I, love, Spark, Spark, provides, Python, API]   |
|2  |[I, love, Python, Spark, supports, Python]       |
|3  |[Spark, solves, the, big, problem, of, big, data]|
+---+-------------------------------------------------+

Create Vectorizer Object

# Create a CountVectorizer object
# inputCol  is the column to be count-vectorized
# outputCol is the column where the count vectors will be stored
cv = CountVectorizer(inputCol="words", outputCol="features")

Fit Model

# Fit the CountVectorizer model on the input data
model = cv.fit(textdata)

Transform Input to Vectors

# Transform the input data to bag-of-words vectors
result = model.transform(textdata)

Display DF

# display the dataframe
result.show(truncate=False)
+---+-------------------------------------------------+----------------------------------------------------+
|id |words                                            |features                                            |
+---+-------------------------------------------------+----------------------------------------------------+
|1  |[I, love, Spark, Spark, provides, Python, API]   |(13,[0,1,2,4,5,7],[2.0,1.0,1.0,1.0,1.0,1.0])        |
|2  |[I, love, Python, Spark, supports, Python]       |(13,[0,1,2,4,10],[1.0,2.0,1.0,1.0,1.0])             |
|3  |[Spark, solves, the, big, problem, of, big, data]|(13,[0,3,6,8,9,11,12],[1.0,2.0,1.0,1.0,1.0,1.0,1.0])|
+---+-------------------------------------------------+----------------------------------------------------+
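
The sparse vectors are easier to read alongside the learned vocabulary, which the fitted CountVectorizerModel exposes as its vocabulary attribute; index i in each vector corresponds to vocabulary[i].

# Display the learned vocabulary; vector index i counts occurrences of model.vocabulary[i]
print(model.vocabulary)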

TF-IDF


Term Frequency-Inverse Document Frequency (TF-IDF) is used to quantify the importance of a word in a document relative to a collection of documents. It is computed by multiplying the term frequency (the number of times the word occurs in the document) by the inverse document frequency, which down-weights words that appear in many documents.

#import necessary classes for TF-IDF calculation
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

Create DF

#create a sample dataframe and display it.
sentenceData = spark.createDataFrame([
        (1, "Spark supports python"),
        (2, "Spark is fast"),
        (3, "Spark is easy")
    ], ["id", "sentence"])

sentenceData.show(truncate = False)
+---+---------------------+
|id |sentence             |
+---+---------------------+
|1  |Spark supports python|
|2  |Spark is fast        |
|3  |Spark is easy        |
+---+---------------------+

Tokenize the Sentence

#tokenize the "sentence" column and store in the column "words"
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show(truncate = False)
+---+---------------------+-------------------------+
|id |sentence             |words                    |
+---+---------------------+-------------------------+
|1  |Spark supports python|[spark, supports, python]|
|2  |Spark is fast        |[spark, is, fast]        |
|3  |Spark is easy        |[spark, is, easy]        |
+---+---------------------+-------------------------+

Create HashingTF Object

# Create a HashingTF object
# inputCol  is the "words" column
# outputCol is the "rawFeatures" column (term frequencies hashed into numFeatures buckets)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10)
featurizedData = hashingTF.transform(wordsData)

featurizedData.show(truncate = False)
+---+---------------------+-------------------------+--------------------------+
|id |sentence             |words                    |rawFeatures               |
+---+---------------------+-------------------------+--------------------------+
|1  |Spark supports python|[spark, supports, python]|(10,[4,5,9],[1.0,1.0,1.0])|
|2  |Spark is fast        |[spark, is, fast]        |(10,[1,3,5],[1.0,1.0,1.0])|
|3  |Spark is easy        |[spark, is, easy]        |(10,[0,1,5],[1.0,1.0,1.0])|
+---+---------------------+-------------------------+--------------------------+

Create IDF Object

# Create an IDF object
# inputCol  is the "rawFeatures" column
# outputCol is the "features" column (TF-IDF weights)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
tfidfData = idfModel.transform(featurizedData)

Display IDF

#display the tf-idf data
tfidfData.select("sentence", "features").show(truncate=False)
+---------------------+---------------------------------------------------------+
|sentence             |features                                                 |
+---------------------+---------------------------------------------------------+
|Spark supports python|(10,[4,5,9],[0.6931471805599453,0.0,0.6931471805599453]) |
|Spark is fast        |(10,[1,3,5],[0.28768207245178085,0.6931471805599453,0.0])|
|Spark is easy        |(10,[0,1,5],[0.6931471805599453,0.28768207245178085,0.0])|
+---------------------+---------------------------------------------------------+
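
The zeros for "spark" are expected: Spark computes IDF as log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term, so a term that appears in every document gets weight 0. A quick check of the values above:

import math

m = 3  # total number of documents
for df_count in (1, 2, 3):
    # IDF for a term appearing in df_count of the m documents
    print(df_count, math.log((m + 1) / (df_count + 1)))
# 1 -> 0.6931..., 2 -> 0.2876..., 3 -> 0.0, matching the "features" column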

StopWordsRemover


StopWordsRemover is a transformer that filters out stop words like “a”, “an”, and “the”.

Create DF

#import StopWordsRemover
from pyspark.ml.feature import StopWordsRemover

#create a dataframe with sample text and display it
textData = spark.createDataFrame([
    (1, ['Spark', 'is', 'an', 'open-source', 'distributed', 'computing', 'system']),
    (2, ['IT', 'has', 'interfaces', 'for', 'multiple', 'languages']),
    (3, ['It', 'has', 'a', 'wide', 'range', 'of', 'libraries', 'and', 'APIs'])
], ["id", "sentence"])

textData.show(truncate = False)
+---+------------------------------------------------------------+
|id |sentence                                                    |
+---+------------------------------------------------------------+
|1  |[Spark, is, an, open-source, distributed, computing, system]|
|2  |[IT, has, interfaces, for, multiple, languages]             |
|3  |[It, has, a, wide, range, of, libraries, and, APIs]         |
+---+------------------------------------------------------------+

Remove Stopwords

# remove stopwords from "sentence" column and store the result in "filtered_sentence" column
remover = StopWordsRemover(inputCol="sentence", outputCol="filtered_sentence")
textData = remover.transform(textData)

# display the dataframe
textData.show(truncate = False)
+---+------------------------------------------------------------+----------------------------------------------------+
|id |sentence                                                    |filtered_sentence                                   |
+---+------------------------------------------------------------+----------------------------------------------------+
|1  |[Spark, is, an, open-source, distributed, computing, system]|[Spark, open-source, distributed, computing, system]|
|2  |[IT, has, interfaces, for, multiple, languages]             |[interfaces, multiple, languages]                   |
|3  |[It, has, a, wide, range, of, libraries, and, APIs]         |[wide, range, libraries, APIs]                      |
+---+------------------------------------------------------------+----------------------------------------------------+
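
The matching is case-insensitive by default, which is why both "IT" and "It" were removed. The default English list can be inspected with StopWordsRemover.loadDefaultStopWords, and a custom list can be supplied through the stopWords parameter; a small sketch (the custom list here is only an example):

# Inspect the first few default English stop words
print(StopWordsRemover.loadDefaultStopWords("english")[:10])

# Use a custom stop-word list instead of the default one
customRemover = StopWordsRemover(inputCol="sentence", outputCol="custom_filtered",
                                 stopWords=["is", "an", "a", "for", "has", "of", "and", "it"])
customRemover.transform(textData).show(truncate=False)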

StringIndexer


StringIndexer encodes a column of string labels into a column of numeric label indices. By default, indices are assigned by label frequency, so the most frequent label gets index 0.0.

Create DF with Sample Data

#import StringIndexer
from pyspark.ml.feature import StringIndexer

#create a dataframe with sample text and display it
colors = spark.createDataFrame(
    [(0, "red"), (1, "red"), (2, "blue"), (3, "yellow" ), (4, "yellow"), (5, "yellow")],
    ["id", "color"])

colors.show()
+---+------+
| id| color|
+---+------+
|  0|   red|
|  1|   red|
|  2|  blue|
|  3|yellow|
|  4|yellow|
|  5|yellow|
+---+------+

Index the Strings

# index the strings in the column "color" and store their indexes in the column "colorIndex"
indexer = StringIndexer(inputCol="color", outputCol="colorIndex")
indexed = indexer.fit(colors).transform(colors)

indexed.show()
+---+------+----------+
| id| color|colorIndex|
+---+------+----------+
|  0|   red|       1.0|
|  1|   red|       1.0|
|  2|  blue|       2.0|
|  3|yellow|       0.0|
|  4|yellow|       0.0|
|  5|yellow|       0.0|
+---+------+----------+
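
Consistent with the frequency ordering, "yellow" gets 0.0, "red" 1.0, and "blue" 2.0. The mapping can be reversed with IndexToString, which reads the original labels from the indexed column's metadata; a minimal sketch:

#import IndexToString
from pyspark.ml.feature import IndexToString

# Map the numeric indices in "colorIndex" back to the original string labels
converter = IndexToString(inputCol="colorIndex", outputCol="originalColor")
converter.transform(indexed).show()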

StandardScaler


StandardScaler transforms the data so that each feature has a mean of 0 and a standard deviation of 1.

Create DF

#import StandardScaler
from pyspark.ml.feature import StandardScaler

# Create a sample dataframe and display it
from pyspark.ml.linalg import Vectors
data = [(1, Vectors.dense([70, 170, 17])),
        (2, Vectors.dense([80, 165, 25])),
        (3, Vectors.dense([65, 150, 135]))]
df = spark.createDataFrame(data, ["id", "features"])

df.show()
+---+------------------+
| id|          features|
+---+------------------+
|  1| [70.0,170.0,17.0]|
|  2| [80.0,165.0,25.0]|
|  3|[65.0,150.0,135.0]|
+---+------------------+

Define Transformer

# Define the StandardScaler transformer
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

Fit Transformer to Dataset

# Fit the transformer to the dataset
scalerModel = scaler.fit(df)

Scale Data

# Scale the data
scaledData = scalerModel.transform(df)

# Show the scaled data
scaledData.show(truncate = False)
+---+------------------+------------------------------------------------------------+
|id |features          |scaledFeatures                                              |
+---+------------------+------------------------------------------------------------+
|1  |[70.0,170.0,17.0] |[-0.218217890235993,0.8006407690254366,-0.6369487984517485] |
|2  |[80.0,165.0,25.0] |[1.0910894511799611,0.32025630761017515,-0.5156252177942725]|
|3  |[65.0,150.0,135.0]|[-0.8728715609439701,-1.120897076635609,1.152574016246021]  |
+---+------------------+------------------------------------------------------------+
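
The fitted model keeps the statistics it used for scaling; StandardScalerModel exposes them as the mean and std attributes (Spark uses the sample standard deviation), which makes it easy to verify the scaled values by hand.

# Per-feature means and sample standard deviations learned from the data
print(scalerModel.mean)
print(scalerModel.std)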

Stop Spark

# Stop the Spark session
spark.stop()