Import Libraries

ETL with Spark Raw to File

Objectives


  1. Create a Spark DataFrame from the raw data and write it to a CSV file.
  2. Read from a CSV file and write to a Parquet file.
  3. Condense the Parquet output to a single file.
  4. Read from a Parquet file and write to a CSV file.
  5. Create a Spark DataFrame from the CSV file.
  6. Transform the data.
  7. Load the data into a Parquet file.

Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark Cluster
  • findspark for locating the Spark installation so that PySpark can be imported

Suppress Warnings

The code block below suppresses warnings generated by our code, initializes findspark, and imports SparkSession:

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

from pyspark.sql import SparkSession

Start Spark Session


#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("ETL with Spark").getOrCreate()

Create DF from Raw Data


Create list of Raw Data

#create a list of tuples
#each tuple contains the student id, height and weight
data = [("student1",64,90),
        ("student2",59,100),
        ("student3",69,95),
        ("",70,110),
        ("student5",60,80),
        ("student3",69,95),
        ("student6",62,85),
        ("student7",65,80),
        ("student7",65,80)]

# some rows are intentionally duplicated

Create DF from list

#create a dataframe using createDataFrame and pass the data and the column names.
df = spark.createDataFrame(data, ["student","height_inches","weight_pounds"])
df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student1|           64|           90|
|student2|           59|          100|
|student3|           69|           95|
|        |           70|          110|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|student7|           65|           80|
|student7|           65|           80|
+--------+-------------+-------------+

Write DF to CSV

Note: In Apache Spark, saving a DataFrame as CSV with the write method creates a directory rather than a single file. Spark is designed to run in a distributed manner across multiple nodes, so it saves the output as multiple part files within that directory. The CSV data is in the part files inside the directory.

  • Use .mode("overwrite") to replace any existing output; omit it if you do not want to overwrite.
# If you do not wish to overwrite, use df.write.csv("student-hw.csv", header=True)
df.write.mode("overwrite").csv("student-hw.csv", header=True)
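Because the CSV output is a directory, a tool that expects a single file first has to locate the part file inside it. The following is a minimal sketch using only the Python standard library. The helper name `find_part_files` is hypothetical, and the demo builds a stand-in directory in a temporary folder instead of reading real Spark output.

```python
import tempfile
from pathlib import Path


def find_part_files(output_dir, suffix=".csv"):
    """Return the data files inside a Spark-style output directory, sorted by name.

    Hypothetical helper: it relies only on the 'part-' file name prefix and
    skips marker files such as _SUCCESS.
    """
    return sorted(
        p for p in Path(output_dir).iterdir()
        if p.is_file() and p.name.startswith("part-") and p.name.endswith(suffix)
    )


# Demo on a stand-in directory that mimics the layout (not real Spark output).
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "student-hw.csv"
    out.mkdir()
    (out / "_SUCCESS").touch()
    (out / "part-00000-demo-c000.csv").write_text(
        "student,height_inches,weight_pounds\nstudent1,64,90\n"
    )
    found = [p.name for p in find_part_files(out)]

print(found)
```

On real output, passing "student-hw.csv" as `output_dir` should return the part files that Spark wrote there.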

Read CSV Write


  • Now that the data has been written to a CSV file using Spark, let’s create a Spark DataFrame by reading the file into df
# Load student dataset
df = spark.read.csv("student-hw.csv", header=True, inferSchema=True)

# display dataframe
df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7|           65|           80|
|student7|           65|           80|
|student2|           59|          100|
|student1|           64|           90|
|student3|           69|           95|
|student5|           60|           80|
|student3|           69|           95|
|student6|           62|           85|
|    null|           70|          110|
+--------+-------------+-------------+

Count rows

df.count()
9

Drop Duplicates

df = df.dropDuplicates()
df.count()
7

df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6|           62|           85|
|student3|           69|           95|
|student2|           59|          100|
|student7|           65|           80|
|    null|           70|          110|
|student1|           64|           90|
|student5|           60|           80|
+--------+-------------+-------------+

Drop NA

df = df.dropna()
df.count()
6

df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6|           62|           85|
|student3|           69|           95|
|student2|           59|          100|
|student7|           65|           80|
|student1|           64|           90|
|student5|           60|           80|
+--------+-------------+-------------+
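The row counts 9, 7, and 6 can be cross-checked without Spark. The sketch below applies the same two steps to the rows from the `data` list in plain Python. It assumes the empty student id is already a missing value (None), mirroring the null that appeared after the CSV round trip, and it only approximates what dropDuplicates() and dropna() do.

```python
# Same rows as the `data` list, with the empty student id replaced by None
# to mirror the null seen after reading the CSV back (assumption).
rows = [
    ("student1", 64, 90),
    ("student2", 59, 100),
    ("student3", 69, 95),
    (None, 70, 110),
    ("student5", 60, 80),
    ("student3", 69, 95),
    ("student6", 62, 85),
    ("student7", 65, 80),
    ("student7", 65, 80),
]

# Rough equivalent of dropDuplicates(): keep one copy of each identical row.
unique_rows = list(dict.fromkeys(rows))

# Rough equivalent of dropna(): discard rows where any field is missing.
complete_rows = [r for r in unique_rows if all(v is not None for v in r)]

print(len(rows), len(unique_rows), len(complete_rows))  # prints: 9 7 6
```

The two duplicated rows (student3 and student7) account for the drop from 9 to 7, and the row with the missing student id accounts for the drop from 7 to 6.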

Save to Parquet


Save to parquet:

Notice that the output contains several .parquet files. To improve parallelism, Spark stores each DataFrame in multiple partitions. When the data is saved as Parquet, each partition is saved as a separate file.

  • If you do not wish to overwrite, use df.write.parquet("student-hw.parquet") instead.
# Write the data to a Parquet file
df.write.mode("overwrite").parquet("student-hw.parquet")

Verify Parquet Write

ls -l student-hw.parquet
['total 7',
 '-rw-r--r-- 1 jupyterlab resources   0 Nov 10 15:52 _SUCCESS',
 '-rw-r--r-- 1 jupyterlab resources 467 Nov 10 15:52 part-00000-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00003-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00010-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00054-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00132-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00172-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet',
 '-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00186-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet']
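The number after `part-` in each file name is the index of the partition that produced the file. The sketch below extracts those indices from the file names in the listing above, using only the standard library. The regular expression is an assumption based on the names shown here, not an official naming specification.

```python
import re

# File names copied from the directory listing above.
listing = [
    "_SUCCESS",
    "part-00000-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet",
    "part-00003-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet",
    "part-00010-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet",
    "part-00054-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet",
    "part-00132-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet",
    "part-00172-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet",
    "part-00186-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet",
]

# Capture the digits that follow the 'part-' prefix.
PART_RE = re.compile(r"^part-(\d+)-")

partition_ids = []
for name in listing:
    match = PART_RE.match(name)
    if match:  # _SUCCESS and other marker files do not match
        partition_ids.append(int(match.group(1)))

print(partition_ids)  # [0, 3, 10, 54, 132, 172, 186]
```

The indices are scattered rather than consecutive, which suggests the DataFrame had many partitions after dropDuplicates() and most of them held no rows. The six 911-byte files are consistent with one row each for the six remaining students, and the smaller part-00000 file likely holds no rows.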

Repartition DF

  • Reduce the number of partitions in the dataframe to one
df = df.repartition(1)

Rewrite to Parquet

#Write the data to a Parquet file
df.write.mode("overwrite").parquet("student-hw-single.parquet")

Verify Parquet Write

ls -l student-hw-single.parquet

total 2
-rw-r--r-- 1 jupyterlab resources   0 Nov 10 15:56 _SUCCESS
-rw-r--r-- 1 jupyterlab resources 944 Nov 10 15:56 part-00000-fbd0c002-94a6-4230-878e-250499c659e5-c000.snappy.parquet

Load Parquet to Spark DF


df = spark.read.parquet("student-hw-single.parquet")
df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6|           62|           85|
|student3|           69|           95|
|student2|           59|          100|
|student7|           65|           80|
|student1|           64|           90|
|student5|           60|           80|
+--------+-------------+-------------+

Transform


#import the expr function that helps in transforming the data
from pyspark.sql.functions import expr

Convert inches to cm

# Convert inches to centimeters
# Multiply the column height_inches with 2.54 to get a new column height_centimeters
df = df.withColumn("height_centimeters", expr("height_inches * 2.54"))
df.show()
+--------+-------------+-------------+------------------+
| student|height_inches|weight_pounds|height_centimeters|
+--------+-------------+-------------+------------------+
|student6|           62|           85|            157.48|
|student3|           69|           95|            175.26|
|student2|           59|          100|            149.86|
|student7|           65|           80|            165.10|
|student1|           64|           90|            162.56|
|student5|           60|           80|            152.40|
+--------+-------------+-------------+------------------+

Convert pounds to kg

# Convert pounds to kilograms
# Multiply weight_pounds with 0.453592 to get a new column weight_kg
df = df.withColumn("weight_kg", expr("weight_pounds * 0.453592"))
df.show()
+--------+-------------+-------------+------------------+---------+
| student|height_inches|weight_pounds|height_centimeters|weight_kg|
+--------+-------------+-------------+------------------+---------+
|student6|           62|           85|            157.48|38.555320|
|student3|           69|           95|            175.26|43.091240|
|student2|           59|          100|            149.86|45.359200|
|student7|           65|           80|            165.10|36.287360|
|student1|           64|           90|            162.56|40.823280|
|student5|           60|           80|            152.40|36.287360|
+--------+-------------+-------------+------------------+---------+
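The trailing zeros in values such as 165.10 and 38.555320 suggest that the new columns are decimals rather than floating-point numbers, since Spark SQL typically parses literals like 2.54 as decimals. The sketch below reproduces two of the values with Python's decimal module as an independent cross-check. It is an analogy, not Spark's implementation, and the helper names are hypothetical.

```python
from decimal import Decimal

# Conversion factors taken from the expressions above.
INCH_TO_CM = Decimal("2.54")
POUND_TO_KG = Decimal("0.453592")


def inches_to_cm(inches):
    """Convert whole inches to centimeters using exact decimal arithmetic."""
    return Decimal(inches) * INCH_TO_CM


def pounds_to_kg(pounds):
    """Convert whole pounds to kilograms using exact decimal arithmetic."""
    return Decimal(pounds) * POUND_TO_KG


print(inches_to_cm(65))  # 165.10    (student7)
print(pounds_to_kg(85))  # 38.555320 (student6)
```

Decimal multiplication keeps the scale of the literal, which is why the results carry two and six decimal places respectively.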

Drop Columns

# drop the columns "height_inches","weight_pounds"
df = df.drop("height_inches","weight_pounds")
df.show()
+--------+------------------+---------+
| student|height_centimeters|weight_kg|
+--------+------------------+---------+
|student6|            157.48|38.555320|
|student3|            175.26|43.091240|
|student2|            149.86|45.359200|
|student7|            165.10|36.287360|
|student1|            162.56|40.823280|
|student5|            152.40|36.287360|
+--------+------------------+---------+

Rename Columns

# rename the lengthy column name "height_centimeters" to "height_cm"
df = df.withColumnRenamed("height_centimeters","height_cm")
df.show()
+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student6|   157.48|38.555320|
|student3|   175.26|43.091240|
|student2|   149.86|45.359200|
|student7|   165.10|36.287360|
|student1|   162.56|40.823280|
|student5|   152.40|36.287360|
+--------+---------+---------+

Save to CSV

df.write.mode("overwrite").csv("student_transformed.csv", header=True)

Verify Save

# Load student dataset
df = spark.read.csv("student_transformed.csv", header=True, inferSchema=True)
# display dataframe
df.show()
+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student6|   157.48| 38.55532|
|student3|   175.26| 43.09124|
|student2|   149.86|  45.3592|
|student7|    165.1| 36.28736|
|student1|   162.56| 40.82328|
|student5|    152.4| 36.28736|
+--------+---------+---------+

Extract CSV to SparkDF


Let’s back up and reload the data from the CSV file we saved earlier, student_transformed.csv

# Load student dataset
df = spark.read.csv("student_transformed.csv", header=True, inferSchema=True)
# display dataframe
df.show()
+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student6|   157.48| 38.55532|
|student3|   175.26| 43.09124|
|student2|   149.86|  45.3592|
|student7|    165.1| 36.28736|
|student1|   162.56| 40.82328|
|student5|    152.4| 36.28736|
+--------+---------+---------+

Transform


Cm to m

# Divide the column height_cm by 100 to get a new column height_meters
df = df.withColumn("height_meters", expr("height_cm / 100"))
# display dataframe
df.show()
+--------+---------+---------+------------------+
| student|height_cm|weight_kg|     height_meters|
+--------+---------+---------+------------------+
|student6|   157.48| 38.55532|            1.5748|
|student3|   175.26| 43.09124|            1.7526|
|student2|   149.86|  45.3592|1.4986000000000002|
|student7|    165.1| 36.28736|             1.651|
|student1|   162.56| 40.82328|            1.6256|
|student5|    152.4| 36.28736|             1.524|
+--------+---------+---------+------------------+
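The value 1.4986000000000002 for student2 is a binary floating-point artifact: after the CSV round trip the columns are read back as doubles, and the result of 149.86 / 100 is not exactly representable. If a cleaner value is needed, rounding to a fixed number of places is one option. The sketch below shows the idea in plain Python; the helper name `cm_to_m` and the choice of 4 decimal places are illustrative assumptions.

```python
def cm_to_m(height_cm, places=4):
    """Convert centimeters to meters, rounded to a fixed number of places."""
    return round(height_cm / 100, places)


raw = 149.86 / 100          # unrounded double, as in the height_meters column
print(raw)
print(cm_to_m(149.86))      # 1.4986
print(cm_to_m(165.1))       # 1.651
```

In Spark SQL, a comparable approach would be to wrap the expression in the built-in round function, for example expr("round(height_cm / 100, 4)").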

Create BMI column

# compute bmi using the below formula
# BMI = weight/(height * height)
# weight must be in kgs
# height must be in meters
df = df.withColumn("bmi", expr("weight_kg/(height_meters*height_meters)"))
# display dataframe
df.show()
+--------+---------+---------+------------------+------------------+
| student|height_cm|weight_kg|     height_meters|               bmi|
+--------+---------+---------+------------------+------------------+
|student6|   157.48| 38.55532|            1.5748|15.546531093062187|
|student3|   175.26| 43.09124|            1.7526|14.028892161964118|
|student2|   149.86|  45.3592|1.4986000000000002|20.197328530250278|
|student7|    165.1| 36.28736|             1.651|13.312549228648752|
|student1|   162.56| 40.82328|            1.6256|15.448293591899683|
|student5|    152.4| 36.28736|             1.524|15.623755691955827|
+--------+---------+---------+------------------+------------------+
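As a sanity check on the bmi column, the same formula can be evaluated in plain Python for a couple of rows from the table above. This is a minimal sketch; the function name `bmi` and the guard against non-positive heights are additions for illustration.

```python
def bmi(weight_kg, height_m):
    """Body mass index: weight in kilograms divided by height in meters squared."""
    if height_m <= 0:
        raise ValueError("height_m must be positive")
    return weight_kg / (height_m * height_m)


# (weight_kg, height_meters) pairs copied from the table above.
students = {
    "student6": (38.55532, 1.5748),
    "student3": (43.09124, 1.7526),
}

for name, (weight, height) in students.items():
    print(name, round(bmi(weight, height), 2))
# student6 15.55
# student3 14.03
```

Both results agree with the Spark output once rounded to two decimal places.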

Drop Columns

# Drop the columns height_cm, weight_kg and height_meters
df = df.drop("height_cm","weight_kg","height_meters")
# display dataframe
df.show()
+--------+------------------+
| student|               bmi|
+--------+------------------+
|student6|15.546531093062187|
|student3|14.028892161964118|
|student2|20.197328530250278|
|student7|13.312549228648752|
|student1|15.448293591899683|
|student5|15.623755691955827|
+--------+------------------+

Load CSV to Parquet


Write df to Parquet

#Write the data to a Parquet file, set the mode to overwrite
df.write.mode("overwrite").parquet("student_transformed.parquet")

Stop Spark

spark.stop()