ETL with Spark Raw to File
Objectives
- Create a Spark DataFrame from the raw data and write it to a CSV file.
- Read from the CSV file and write to a Parquet file.
- Condense the Parquet output to a single file.
- Read from the Parquet file and write to a CSV file.
- Create a Spark DataFrame from the CSV file.
- Transform the data.
- Load the data into a Parquet file.
Setup
We will be using the following libraries:
- PySpark for connecting to the Spark cluster
Suppress Warnings
To suppress warnings generated by our code, we'll use this code block:
# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Import Libraries
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
Start Spark Session
#Create SparkSession
#Ignore any warnings raised by the SparkSession command
spark = SparkSession.builder.appName("ETL with Spark").getOrCreate()
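As a quick sanity check (optional; not part of the original flow), you can confirm the session is up by printing its version:
# Optional: confirm the SparkSession is active by printing the Spark version
print(spark.version)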
Create DF from Raw Data
Create list of Raw Data
#create a list of tuples
#each tuple contains the student id, height and weight
data = [("student1",64,90),
("student2",59,100),
("student3",69,95),
("",70,110),
("student5",60,80),
("student3",69,95),
("student6",62,85),
("student7",65,80),
("student7",65,80)]
# some rows are intentionally duplicated
Create DF from list
#create a dataframe using createDataFrame and pass the data and the column names.
df = spark.createDataFrame(data, ["student","height_inches","weight_pounds"])
df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student1| 64| 90|
|student2| 59| 100|
|student3| 69| 95|
| | 70| 110|
|student5| 60| 80|
|student3| 69| 95|
|student6| 62| 85|
|student7| 65| 80|
|student7| 65| 80|
+--------+-------------+-------------+
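Here, createDataFrame infers the column types from the Python tuples. If you would rather pin the types down explicitly, you can pass a schema; a minimal sketch (this explicit schema is an assumption, not part of the original lab):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical explicit schema matching the columns used above
schema = StructType([
    StructField("student", StringType(), True),
    StructField("height_inches", IntegerType(), True),
    StructField("weight_pounds", IntegerType(), True),
])
df_typed = spark.createDataFrame(data, schema=schema)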
Write DF to CSV
Note: In Apache Spark, when you use the write method to save a DataFrame to a CSV file, it creates a directory rather than a single file. Because Spark is designed to run in a distributed manner across multiple nodes, it saves the output as multiple part files within that directory. The CSV data lives inside the directory.

- Use .mode("overwrite") to overwrite existing output; otherwise omit it.
# If you do not wish to overwrite, use df.write.csv("student-hw.csv", header=True)
df.write.mode("overwrite").csv("student-hw.csv", header=True)
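To see the part files Spark produced, you can list the output directory from Python; a minimal sketch using the standard library (the exact part-file names will differ on each run):
import glob

# List the contents of the CSV output directory written above
for path in sorted(glob.glob("student-hw.csv/*")):
    print(path)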
Read CSV
- Now that the data has been written to a CSV file using Spark, let's create a Spark DataFrame by reading the file back into df.
# Load student dataset
df = spark.read.csv("student-hw.csv", header=True, inferSchema=True)
# display dataframe
df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student7| 65| 80|
|student7| 65| 80|
|student2| 59| 100|
|student1| 64| 90|
|student3| 69| 95|
|student5| 60| 80|
|student3| 69| 95|
|student6| 62| 85|
| null| 70| 110|
+--------+-------------+-------------+
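Because inferSchema=True asks Spark to deduce the column types from the data, it is worth checking what it inferred; printSchema shows this, and here you should see student as a string and the two numeric columns as integers:
# Inspect the schema Spark inferred while reading the CSV
df.printSchema()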
Count Rows
df.count()
9
Drop Duplicates
df = df.dropDuplicates()
df.count()
7
df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6| 62| 85|
|student3| 69| 95|
|student2| 59| 100|
|student7| 65| 80|
| null| 70| 110|
|student1| 64| 90|
|student5| 60| 80|
+--------+-------------+-------------+
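dropDuplicates with no arguments compares entire rows. If you only wanted to de-duplicate on the student id (a hypothetical variant; the lab de-duplicates full rows), you could pass a subset of columns:
# Hypothetical variant: treat rows with the same student id as duplicates
df_unique_ids = df.dropDuplicates(["student"])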
Drop NA
df = df.dropna()
df.count()
6
df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6| 62| 85|
|student3| 69| 95|
|student2| 59| 100|
|student7| 65| 80|
|student1| 64| 90|
|student5| 60| 80|
+--------+-------------+-------------+
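dropna with no arguments removes any row that contains a null. It also accepts how and subset parameters for finer control; a sketch of two variants not used in this lab:
# Hypothetical variants: drop a row only if every column is null,
# or only if the student column is null
df.dropna(how="all")
df.dropna(subset=["student"])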
Save to Parquet
Notice that there are a lot of .parquet files in the output. To improve parallelism, Spark stores each dataframe in multiple partitions. When the data is saved as a Parquet file, each partition is saved as a separate file.
- If you do not wish to overwrite, use the command:
df.write.parquet("student-hw.parquet")
#Write the data to a Parquet file
df.write.mode("overwrite").parquet("student-hw.parquet")
Verify Parquet Write
ls -l student-hw.parquet
total 7
-rw-r--r-- 1 jupyterlab resources 0 Nov 10 15:52 _SUCCESS
-rw-r--r-- 1 jupyterlab resources 467 Nov 10 15:52 part-00000-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00003-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00010-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00054-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00132-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00172-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
-rw-r--r-- 1 jupyterlab resources 911 Nov 10 15:52 part-00186-cad814c5-e053-4cbf-868a-a59bf921d852-c000.snappy.parquet
Repartition DF
- Reduce the number of partitions in the dataframe to one
df = df.repartition(1)
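repartition(1) performs a full shuffle to produce a single partition. When you are only reducing the partition count before a write, coalesce is a lighter-weight alternative worth knowing; a sketch (the lab itself sticks with repartition):
# Hypothetical alternative: coalesce reduces partitions without a full shuffle
df_single = df.coalesce(1)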
Rewrite to Parquet
#Write the data to a Parquet file
df.write.mode("overwrite").parquet("student-hw-single.parquet")
Verify Parquet Write
ls -l student-hw-single.parquet
total 2
-rw-r--r-- 1 jupyterlab resources 0 Nov 10 15:56 _SUCCESS
-rw-r--r-- 1 jupyterlab resources 944 Nov 10 15:56 part-00000-fbd0c002-94a6-4230-878e-250499c659e5-c000.snappy.parquet
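You can also confirm the partition count directly from the DataFrame; a quick optional check:
# Confirm the DataFrame now occupies a single partition
print(df.rdd.getNumPartitions())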
Load Parquet to Spark DF
df = spark.read.parquet("student-hw-single.parquet")
df.show()
+--------+-------------+-------------+
| student|height_inches|weight_pounds|
+--------+-------------+-------------+
|student6| 62| 85|
|student3| 69| 95|
|student2| 59| 100|
|student7| 65| 80|
|student1| 64| 90|
|student5| 60| 80|
+--------+-------------+-------------+
Transform
#import the expr function that helps in transforming the data
from pyspark.sql.functions import expr
Convert Inches to Centimeters
# Convert inches to centimeters
# Multiply the column height_inches with 2.54 to get a new column height_centimeters
df = df.withColumn("height_centimeters", expr("height_inches * 2.54"))
df.show()
+--------+-------------+-------------+------------------+
| student|height_inches|weight_pounds|height_centimeters|
+--------+-------------+-------------+------------------+
|student6| 62| 85| 157.48|
|student3| 69| 95| 175.26|
|student2| 59| 100| 149.86|
|student7| 65| 80| 165.10|
|student1| 64| 90| 162.56|
|student5| 60| 80| 152.40|
+--------+-------------+-------------+------------------+
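expr parses a SQL expression string. The same transform can also be written with column objects, which some prefer because errors surface when the expression is built rather than parsed; a sketch of the equivalent conversion:
from pyspark.sql.functions import col

# Equivalent conversion using column arithmetic instead of a SQL string
df_cm = df.withColumn("height_centimeters", col("height_inches") * 2.54)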
Convert Pounds to Kilograms
# Convert pounds to kilograms
# Multiply weight_pounds with 0.453592 to get a new column weight_kg
df = df.withColumn("weight_kg", expr("weight_pounds * 0.453592"))
df.show()
+--------+-------------+-------------+------------------+---------+
| student|height_inches|weight_pounds|height_centimeters|weight_kg|
+--------+-------------+-------------+------------------+---------+
|student6| 62| 85| 157.48|38.555320|
|student3| 69| 95| 175.26|43.091240|
|student2| 59| 100| 149.86|45.359200|
|student7| 65| 80| 165.10|36.287360|
|student1| 64| 90| 162.56|40.823280|
|student5| 60| 80| 152.40|36.287360|
+--------+-------------+-------------+------------------+---------+
Drop Columns
# drop the columns "height_inches","weight_pounds"
df = df.drop("height_inches","weight_pounds")
df.show()
+--------+------------------+---------+
| student|height_centimeters|weight_kg|
+--------+------------------+---------+
|student6| 157.48|38.555320|
|student3| 175.26|43.091240|
|student2| 149.86|45.359200|
|student7| 165.10|36.287360|
|student1| 162.56|40.823280|
|student5| 152.40|36.287360|
+--------+------------------+---------+
Rename Columns
# rename the lengthy column name "height_centimeters" to "height_cm"
df = df.withColumnRenamed("height_centimeters","height_cm")
df.show()
+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student6| 157.48|38.555320|
|student3| 175.26|43.091240|
|student2| 149.86|45.359200|
|student7| 165.10|36.287360|
|student1| 162.56|40.823280|
|student5| 152.40|36.287360|
+--------+---------+---------+
Save to CSV
df.write.mode("overwrite").csv("student_transformed.csv", header=True)
Verify Save
# Load student dataset
df = spark.read.csv("student_transformed.csv", header=True, inferSchema=True)
# display dataframe
df.show()
+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student6| 157.48| 38.55532|
|student3| 175.26| 43.09124|
|student2| 149.86| 45.3592|
|student7| 165.1| 36.28736|
|student1| 162.56| 40.82328|
|student5| 152.4| 36.28736|
+--------+---------+---------+
Extract CSV to SparkDF
Let's back up and load the data from the CSV file we saved earlier, student_transformed.csv.
# Load student dataset
df = spark.read.csv("student_transformed.csv", header=True, inferSchema=True)
# display dataframe
df.show()
+--------+---------+---------+
| student|height_cm|weight_kg|
+--------+---------+---------+
|student6| 157.48| 38.55532|
|student3| 175.26| 43.09124|
|student2| 149.86| 45.3592|
|student7| 165.1| 36.28736|
|student1| 162.56| 40.82328|
|student5| 152.4| 36.28736|
+--------+---------+---------+
Transform
Cm to m
# Divide the column height_cm by 100 to get a new column height_meters
df = df.withColumn("height_meters", expr("height_cm / 100"))
# display dataframe
df.show()
+--------+---------+---------+------------------+
| student|height_cm|weight_kg| height_meters|
+--------+---------+---------+------------------+
|student6| 157.48| 38.55532| 1.5748|
|student3| 175.26| 43.09124| 1.7526|
|student2| 149.86| 45.3592|1.4986000000000002|
|student7| 165.1| 36.28736| 1.651|
|student1| 162.56| 40.82328| 1.6256|
|student5| 152.4| 36.28736| 1.524|
+--------+---------+---------+------------------+
Create BMI column
# compute bmi using the below formula
# BMI = weight/(height * height)
# weight must be in kgs
# height must be in meters
df = df.withColumn("bmi", expr("weight_kg/(height_meters*height_meters)"))
# display dataframe
df.show()
+--------+---------+---------+------------------+------------------+
| student|height_cm|weight_kg| height_meters| bmi|
+--------+---------+---------+------------------+------------------+
|student6| 157.48| 38.55532| 1.5748|15.546531093062187|
|student3| 175.26| 43.09124| 1.7526|14.028892161964118|
|student2| 149.86| 45.3592|1.4986000000000002|20.197328530250278|
|student7| 165.1| 36.28736| 1.651|13.312549228648752|
|student1| 162.56| 40.82328| 1.6256|15.448293591899683|
|student5| 152.4| 36.28736| 1.524|15.623755691955827|
+--------+---------+---------+------------------+------------------+
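The raw bmi values carry full double precision. If you want tidier output, the round function from pyspark.sql.functions can trim them; a cosmetic sketch, not part of the original flow:
from pyspark.sql.functions import round as spark_round

# Round bmi to two decimal places; the aliased import avoids shadowing Python's round
df_rounded = df.withColumn("bmi", spark_round("bmi", 2))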
Drop Columns
# Drop the columns height_cm, weight_kg and height_meters
df = df.drop("height_cm","weight_kg","height_meters")
# display dataframe
df.show()
+--------+------------------+
| student| bmi|
+--------+------------------+
|student6|15.546531093062187|
|student3|14.028892161964118|
|student2|20.197328530250278|
|student7|13.312549228648752|
|student1|15.448293591899683|
|student5|15.623755691955827|
+--------+------------------+
Load CSV to Parquet
Write df to Parquet
#Write the data to a Parquet file, set the mode to overwrite
df.write.mode("overwrite").parquet("student_transformed.parquet")
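As a final optional check, you can read the Parquet output back and confirm the row count before shutting the session down:
# Optional verification: reload the Parquet output and count its rows
check = spark.read.parquet("student_transformed.parquet")
print(check.count())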
Stop Spark
spark.stop()