ETL with Spark: Raw Data to File
Objectives
- Create a Spark DataFrame from the raw data and write it to a CSV file.
- Read from the CSV file and write to a Parquet file.
- Condense the Parquet output to a single file.
- Read from the Parquet file and write to a CSV file.
- Create a Spark DataFrame from the CSV file.
- Transform the data.
- Load the transformed data into a Parquet file.
Setup
We will be using the following libraries:
- PySpark, for connecting to the Spark cluster
Suppress Warnings
To suppress warnings generated by our code, we'll use this code block:
# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Import Libraries
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
Start Spark Session
#Create SparkSession
#Ignore any warnings by SparkSession command
spark = SparkSession.builder.appName("ETL with Spark").getOrCreate()
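The builder above relies on environment defaults. If you were running locally and wanted to pin the master and shuffle behavior explicitly, a minimal sketch would look like the following (the master URL and the shuffle-partition count are assumptions, not part of the original lab):
# Sketch only: "local[*]" and the shuffle-partition count are assumptions.
spark = (SparkSession.builder
         .appName("ETL with Spark")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")
         .getOrCreate())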
Create DF from Raw Data
Create list of Raw Data
#create a list of tuples
#each tuple contains the student id, height and weight
#some rows are intentionally duplicated
data = [("student1",64,90),
        ("student2",59,100),
        ("student3",69,95),
        ("",70,110),
        ("student5",60,80),
        ("student3",69,95),
        ("student6",62,85),
        ("student7",65,80),
        ("student7",65,80)]
Create DF from list
#create a dataframe using createDataFrame and pass the data and the column names
df = spark.createDataFrame(data, ["student","height_inches","weight_pounds"])

df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student1| 64| 90|
|student2| 59| 100|
|student3| 69| 95|
| | 70| 110|
|student5| 60| 80|
|student3| 69| 95|
|student6| 62| 85|
|student7| 65| 80|
|student7| 65| 80|
+--------+-------------+-------------+
Write DF to CSV
Note: In Apache Spark, when you use the write method to save a DataFrame to CSV, it creates a directory rather than a single file. This is because Spark is designed to run in a distributed manner across multiple nodes, and it saves the output as multiple part files within a directory. The CSV data lives inside that directory.
- Use .mode("overwrite") to overwrite an existing output; omit it otherwise.
#If you do not wish to overwrite, use df.write.csv("student-hw.csv", header=True)
df.write.mode("overwrite").csv("student-hw.csv", header=True)
Read CSV
- Now that the data has been written to a CSV file using Spark, let's create a Spark DataFrame by reading the file back into df.
# Load student dataset
df = spark.read.csv("student-hw.csv", header=True, inferSchema=True)

# display dataframe
df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7| 65| 80|
|student7| 65| 80|
|student2| 59| 100|
|student1| 64| 90|
|student3| 69| 95|
|student5| 60| 80|
|student3| 69| 95|
|student6| 62| 85|
| null| 70| 110|
+--------+-------------+-------------+
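Because inferSchema=True was passed, Spark should have read height_inches and weight_pounds back as integers rather than strings. A quick way to confirm (not shown in the original output) is to print the schema:
# Confirm the inferred types; the numeric columns should not be strings.
df.printSchema()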
Count Rows
df.count()
9
Drop Duplicates
df = df.dropDuplicates()

df.count()
7

df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6| 62| 85|
|student3| 69| 95|
|student2| 59| 100|
|student7| 65| 80|
| null| 70| 110|
|student1| 64| 90|
|student5| 60| 80|
+--------+-------------+-------------+
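Called with no arguments, dropDuplicates() removes a row only when every column matches another row. If you instead wanted at most one row per student id, you could pass a column subset; a sketch, not a step in the lab:
# Keep a single row per student id, regardless of height/weight values.
deduped_by_id = df.dropDuplicates(["student"])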
Drop NA
df = df.dropna()

df.count()
6

df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6| 62| 85|
|student3| 69| 95|
|student2| 59| 100|
|student7| 65| 80|
|student1| 64| 90|
|student5| 60| 80|
+--------+-------------+-------------+
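Note that the blank student id was written out as an empty CSV field and read back as null, which is why dropna() removes it here. dropna() also accepts subset and how parameters if you only care about nulls in specific columns; a sketch:
# Drop a row only when the student id itself is null.
df_named = df.dropna(subset=["student"])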
Save to Parquet
Notice that there are a lot of .parquet files in the output. To improve parallelism, Spark stores each DataFrame in multiple partitions. When the data is saved as a Parquet file, each partition is saved as a separate file.
- If you do not wish to overwrite, use df.write.parquet("student-hw.parquet")
#Write the data to a Parquet file
df.write.mode("overwrite").parquet("student-hw.parquet")
Verify Parquet Write
ls -l student-hw.parquet
total 7
-rw-r--r-- 1 jupyterlab resources   0 Nov 10 15:52 _SUCCESS
-rw-r--r-- 1 jupyterlab resources 467 Nov 10 15:52 part-00000-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00003-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00010-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00054-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00132-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00172-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00186-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
Repartition DF
- Reduce the number of partitions in the DataFrame to one.
df = df.repartition(1)
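repartition(1) performs a full shuffle. When you are only reducing the partition count, coalesce(1) avoids the shuffle and is usually cheaper; an alternative sketch, not the call the lab uses:
# coalesce narrows existing partitions without a full shuffle.
df_single = df.coalesce(1)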
Rewrite to Parquet
#Write the data to a Parquet file
df.write.mode("overwrite").parquet("student-hw-single.parquet")
Verify Parquet Write
ls -l student-hw-single.parquet
total 2
-rw-r--r-- 1 jupyterlab resources 0 Nov 10 15:56 _SUCCESS
-rw-r--r-- 1 jupyterlab resources 944 Nov 10 15:56 part-00000-fbd0c002-94a6-4230-878e-250499c659e5-c000.snappy.parquet
Load Parquet to Spark DF
df = spark.read.parquet("student-hw-single.parquet")

df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6| 62| 85|
|student3| 69| 95|
|student2| 59| 100|
|student7| 65| 80|
|student1| 64| 90|
|student5| 60| 80|
+--------+-------------+-------------+
Transform
#import the expr function that helps in transforming the data
from pyspark.sql.functions import expr
Convert Inches to Centimeters
# Convert inches to centimeters
# Multiply the column height_inches by 2.54 to get a new column height_centimeters
df = df.withColumn("height_centimeters", expr("height_inches * 2.54"))

df.show()
+--------+-------------+-------------+------------------+
| student|height_inches|weight_pounds|height_centimeters|
+--------+-------------+-------------+------------------+
|student6| 62| 85| 157.48|
|student3| 69| 95| 175.26|
|student2| 59| 100| 149.86|
|student7| 65| 80| 165.10|
|student1| 64| 90| 162.56|
|student5| 60| 80| 152.40|
+--------+-------------+-------------+------------------+
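expr() parses a SQL expression string. The same transformation can be written with Column arithmetic via col(); an equivalent sketch, not the form the lab uses:
from pyspark.sql.functions import col

# Equivalent to expr("height_inches * 2.54"), using Column arithmetic.
df_alt = df.withColumn("height_centimeters", col("height_inches") * 2.54)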
Convert Pounds to Kilograms
# Convert pounds to kilograms
# Multiply weight_pounds by 0.453592 to get a new column weight_kg
df = df.withColumn("weight_kg", expr("weight_pounds * 0.453592"))

df.show()
+--------+-------------+-------------+------------------+---------+
| student|height_inches|weight_pounds|height_centimeters|weight_kg|
+--------+-------------+-------------+------------------+---------+
|student6| 62| 85| 157.48|38.555320|
|student3| 69| 95| 175.26|43.091240|
|student2| 59| 100| 149.86|45.359200|
|student7| 65| 80| 165.10|36.287360|
|student1| 64| 90| 162.56|40.823280|
|student5| 60| 80| 152.40|36.287360|
+--------+-------------+-------------+------------------+---------+
Drop Columns
# drop the columns "height_inches","weight_pounds"
df = df.drop("height_inches","weight_pounds")

df.show()
+--------+------------------+---------+
| student|height_centimeters|weight_kg|
+--------+------------------+---------+
|student6| 157.48|38.555320|
|student3| 175.26|43.091240|
|student2| 149.86|45.359200|
|student7| 165.10|36.287360|
|student1| 162.56|40.823280|
|student5| 152.40|36.287360|
+--------+------------------+---------+
Rename Columns
# rename the lengthy column name "height_centimeters" to "height_cm"
df = df.withColumnRenamed("height_centimeters","height_cm")

df.show()
+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student6| 157.48|38.555320|
|student3| 175.26|43.091240|
|student2| 149.86|45.359200|
|student7| 165.10|36.287360|
|student1| 162.56|40.823280|
|student5| 152.40|36.287360|
+--------+---------+---------+
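withColumnRenamed handles one column per call. On PySpark 3.4 or newer (a version assumption; the lab does not state one), withColumnsRenamed can rename several columns at once:
# PySpark 3.4+ only (assumption): rename several columns in one call.
df_short = df.withColumnsRenamed({"height_cm": "ht_cm", "weight_kg": "wt_kg"})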
Save to CSV
"overwrite").csv("student_transformed.csv", header=True) df.write.mode(
Verify Save
# Load student dataset
df = spark.read.csv("student_transformed.csv", header=True, inferSchema=True)

# display dataframe
df.show()
+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student6| 157.48| 38.55532|
|student3| 175.26| 43.09124|
|student2| 149.86| 45.3592|
|student7| 165.1| 36.28736|
|student1| 162.56| 40.82328|
|student5| 152.4| 36.28736|
+--------+---------+---------+
Extract CSV to SparkDF
Let's go back and load the data from the CSV file we saved earlier, student_transformed.csv.
# Load student dataset
df = spark.read.csv("student_transformed.csv", header=True, inferSchema=True)

# display dataframe
df.show()
+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student6| 157.48| 38.55532|
|student3| 175.26| 43.09124|
|student2| 149.86| 45.3592|
|student7| 165.1| 36.28736|
|student1| 162.56| 40.82328|
|student5| 152.4| 36.28736|
+--------+---------+---------+
Transform
Convert Centimeters to Meters
# Divide the column height_cm by 100 to get a new column height_meters
df = df.withColumn("height_meters", expr("height_cm / 100"))

# display dataframe
df.show()
+--------+---------+---------+------------------+
| student|height_cm|weight_kg| height_meters|
+--------+---------+---------+------------------+
|student6| 157.48| 38.55532| 1.5748|
|student3| 175.26| 43.09124| 1.7526|
|student2| 149.86| 45.3592|1.4986000000000002|
|student7| 165.1| 36.28736| 1.651|
|student1| 162.56| 40.82328| 1.6256|
|student5| 152.4| 36.28736| 1.524|
+--------+---------+---------+------------------+
Create BMI column
# compute bmi using the below formula
# BMI = weight/(height * height)
# weight must be in kgs
# height must be in meters
= df.withColumn("bmi", expr("weight_kg/(height_meters*height_meters)"))
df # display dataframe
df.show()+--------+---------+---------+------------------+------------------+
| student|height_cm|weight_kg| height_meters| bmi|
+--------+---------+---------+------------------+------------------+
|student6| 157.48| 38.55532| 1.5748|15.546531093062187|
|student3| 175.26| 43.09124| 1.7526|14.028892161964118|
|student2| 149.86| 45.3592|1.4986000000000002|20.197328530250278|
|student7| 165.1| 36.28736| 1.651|13.312549228648752|
|student1| 162.56| 40.82328| 1.6256|15.448293591899683|
|student5| 152.4| 36.28736| 1.524|15.623755691955827|
+--------+---------+---------+------------------+------------------+
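The long decimal tails in height_meters and bmi are ordinary floating-point noise. If you wanted tidier values for presentation, you could round the column; a sketch, not a step in the lab:
from pyspark.sql.functions import round as spark_round

# Round bmi to 2 decimal places purely for readability.
df_tidy = df.withColumn("bmi", spark_round("bmi", 2))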
Drop Columns
# Drop the columns height_cm, weight_kg and height_meters
df = df.drop("height_cm","weight_kg","height_meters")

# display dataframe
df.show()
+--------+------------------+
| student| bmi|
+--------+------------------+
|student6|15.546531093062187|
|student3|14.028892161964118|
|student2|20.197328530250278|
|student7|13.312549228648752|
|student1|15.448293591899683|
|student5|15.623755691955827|
+--------+------------------+
Load CSV to Parquet
Write df to Parquet
#Write the data to a Parquet file, set the mode to overwrite
df.write.mode("overwrite").parquet("student_transformed.parquet")
Stop Spark
spark.stop()