ET with SparkSQL - MPG

Objectives


  1. Load a CSV dataset into a DataFrame
  2. Create a temporary view based on a dataframe
  3. Run SQL queries on the view
  4. Analyze a dataset using SparkSQL

Setup


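We will be using the following libraries:

  • findspark for making the local Spark installation importable from Python
  • PySpark for connecting to the Spark cluster
  • wget for downloading the dataset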

Suppress Warnings

To suppress warnings generated by our code, we use the following code block:

# To suppress warnings generated by the code
import warnings

def warn(*args, **kwargs):
    pass

warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

from pyspark.sql import SparkSession
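The block above replaces warnings.warn for the whole process. If you would rather silence warnings only around a specific call, the standard library's warnings.catch_warnings context manager restores the previous filters when the block exits. This is a minimal sketch; noisy() is a hypothetical function standing in for library code that emits a warning.

```python
import warnings


def noisy():
    """Hypothetical stand-in for library code that emits a warning."""
    warnings.warn("this API is deprecated", DeprecationWarning)
    return 42


# Silence warnings only inside this block; the previous filters are restored on exit.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    value = noisy()

# Outside that block the warning is still raised; record it here to show that.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    noisy()
```

Run it in a fresh interpreter: once warnings.warn has been replaced by the no-op warn function from the setup block, nothing would be recorded.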

Start Spark Session


# Create a SparkSession (getOrCreate reuses one if it already exists)
# Ignore any warnings printed by the SparkSession command

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

Data


Download Data Locally

import wget

# Download the MPG dataset into the current working directory
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv")

HTTP request sent, awaiting response... 200 OK
Length: 13891 (14K) [text/csv]
Saving to: ‘mpg.csv.1’

mpg.csv.1           100%[===================>]  13.57K  --.-KB/s    in 0s

2024-11-10 14:49:06 (47.2 MB/s) - ‘mpg.csv.1’ saved [13891/13891]
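The log shows the file being saved as mpg.csv.1, which is what command-line wget does when mpg.csv already exists; re-running the download keeps adding numbered copies while spark.read.csv still reads mpg.csv. A minimal sketch of an idempotent download, using the standard library's urllib.request.urlretrieve instead of the wget package (the helper name download_if_missing is hypothetical). The demo uses a local file:// URL so it runs without network access.

```python
import os
import tempfile
import urllib.request
from pathlib import Path


def download_if_missing(url: str, path: str) -> str:
    """Download url to path unless path already exists; return path either way."""
    if not os.path.exists(path):
        urllib.request.urlretrieve(url, path)
    return path


# Offline demo: a local file:// URL stands in for the HTTPS dataset URL.
with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp, "source.csv")
    source.write_text("MPG,Cylinders\n43.1,4\n")
    target = os.path.join(tmp, "mpg.csv")

    download_if_missing(source.as_uri(), target)
    first = Path(target).read_text()

    # Change the source; the second call is skipped because target already exists.
    source.write_text("changed\n")
    download_if_missing(source.as_uri(), target)
    second = Path(target).read_text()
```

In the lab you would pass the dataset URL and "mpg.csv" as the two arguments.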

Load CSV into a Spark DataFrame

# header=True reads column names from the first row; inferSchema=True detects each column's type
mpg_data = spark.read.csv("mpg.csv", header=True, inferSchema=True)

View Schema

mpg_data.printSchema()
root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)
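With inferSchema=True, Spark makes an extra pass over the data to choose a type for each column, which is why MPG comes back as double, Cylinders as integer and Origin as string. The function below is a deliberately simplified sketch of that idea (try integer, then double, then fall back to string); it is not Spark's actual inference logic. The sample rows are copied from the query output later in this lab.

```python
import csv
import io


def infer_type(values):
    """Return 'integer', 'double' or 'string' for a column of CSV strings (simplified)."""
    def parses(cast, value):
        try:
            cast(value)
            return True
        except ValueError:
            return False

    if all(parses(int, v) for v in values):
        return "integer"
    if all(parses(float, v) for v in values):
        return "double"
    return "string"


def infer_schema(text):
    """Map each header name to the type inferred from its column."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    columns = zip(*reader)  # transpose rows into columns
    return dict(zip(header, (infer_type(col) for col in columns)))


sample = """MPG,Cylinders,Engine Disp,Horsepower,Weight,Accelerate,Year,Origin
43.1,4,90.0,48,1985,21.5,78,European
40.8,4,85.0,65,2110,19.2,80,Japanese
"""
schema = infer_schema(sample)
```

The result matches the types printed by printSchema() above for these columns.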

Create Temp View


mpg_data.createOrReplaceTempView("mileage")

Query Data


Select all cars whose mileage is more than 40 MPG

results = spark.sql("SELECT * FROM mileage WHERE MPG > 40")
results.show()
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|43.1|        4|       90.0|        48|  1985|      21.5|  78|European|
|43.4|        4|       90.0|        48|  2335|      23.7|  80|European|
|41.5|        4|       98.0|        76|  2144|      14.7|  80|European|
|44.3|        4|       90.0|        48|  2085|      21.7|  80|European|
|40.8|        4|       85.0|        65|  2110|      19.2|  80|Japanese|
|44.6|        4|       91.0|        67|  1850|      13.8|  80|Japanese|
|46.6|        4|       86.0|        65|  2110|      17.9|  80|Japanese|
|44.0|        4|       97.0|        52|  2130|      24.6|  82|European|
+----+---------+-----------+----------+------+----------+----+--------+
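createOrReplaceTempView registers the DataFrame under the name mileage so that SQL can refer to it as a table. To experiment with the query text without a Spark cluster, the sketch below uses Python's built-in sqlite3 module as a stand-in: it loads the rows shown above into a table named mileage and runs the same SELECT. SQLite is not Spark SQL, so only simple, standard queries like this one behave the same way.

```python
import sqlite3

# Rows copied from the MPG > 40 output above (all columns, same order).
rows = [
    (43.1, 4, 90.0, 48, 1985, 21.5, 78, "European"),
    (43.4, 4, 90.0, 48, 2335, 23.7, 80, "European"),
    (41.5, 4, 98.0, 76, 2144, 14.7, 80, "European"),
    (44.3, 4, 90.0, 48, 2085, 21.7, 80, "European"),
    (40.8, 4, 85.0, 65, 2110, 19.2, 80, "Japanese"),
    (44.6, 4, 91.0, 67, 1850, 13.8, 80, "Japanese"),
    (46.6, 4, 86.0, 65, 2110, 17.9, 80, "Japanese"),
    (44.0, 4, 97.0, 52, 2130, 24.6, 82, "European"),
]

conn = sqlite3.connect(":memory:")
# "Engine Disp" is quoted because the column name contains a space.
conn.execute(
    'CREATE TABLE mileage (MPG REAL, Cylinders INTEGER, "Engine Disp" REAL, '
    "Horsepower INTEGER, Weight INTEGER, Accelerate REAL, Year INTEGER, Origin TEXT)"
)
conn.executemany("INSERT INTO mileage VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows)

# Same SQL text as the Spark query above.
over_40 = conn.execute("SELECT * FROM mileage WHERE MPG > 40").fetchall()

# sqlite3 placeholder syntax: pass the threshold as a bound parameter.
over_44 = conn.execute(
    "SELECT MPG FROM mileage WHERE MPG > ? ORDER BY MPG", (44,)
).fetchall()
conn.close()
```

All eight rows pass the MPG > 40 filter, and only three of them exceed 44.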

Analyze Data

Unique Value

List all unique Origins

spark.sql("SELECT distinct Origin FROM mileage").show()
+--------+
|  Origin|
+--------+
|European|
|Japanese|
|American|
+--------+
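Spark does not guarantee the order in which DISTINCT returns rows, so the European, Japanese, American order above may differ on another run or cluster. Adding ORDER BY Origin makes it deterministic; in this lab that would be spark.sql("SELECT DISTINCT Origin FROM mileage ORDER BY Origin").show(). A sketch of the same query with sqlite3 as a stand-in, on the Origin values of the MPG > 40 rows (which contain only European and Japanese cars):

```python
import sqlite3

# Origin values from the MPG > 40 rows shown earlier.
origins = ["European", "European", "European", "European",
           "Japanese", "Japanese", "Japanese", "European"]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mileage (Origin TEXT)")
conn.executemany("INSERT INTO mileage VALUES (?)", [(o,) for o in origins])

# ORDER BY pins down the row order that DISTINCT alone leaves unspecified.
unique_origins = [
    row[0]
    for row in conn.execute("SELECT DISTINCT Origin FROM mileage ORDER BY Origin")
]
conn.close()
```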

Count Value

Count all Japanese cars

spark.sql("SELECT count(*) AS Japanese_Cars FROM mileage WHERE Origin = 'Japanese'").show()
+-------------+
|Japanese_Cars|
+-------------+
|           79|
+-------------+

Count Conditionals

Count the number of cars with mileage > 40

spark.sql("SELECT count(*) AS Over_40 FROM mileage WHERE MPG>40").show()
+-------+
|Over_40|
+-------+
|      8|
+-------+
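The two counts above each run a separate query with its own WHERE clause. Conditional aggregation computes both in a single pass with SUM(CASE WHEN ...), which Spark SQL also supports (Spark 3.0 and later additionally provide count_if). A sketch with sqlite3 as a stand-in, on the MPG and Origin columns of the MPG > 40 rows, so the numbers are smaller than the full-dataset results above:

```python
import sqlite3

# (MPG, Origin) pairs from the MPG > 40 output shown earlier.
rows = [(43.1, "European"), (43.4, "European"), (41.5, "European"),
        (44.3, "European"), (40.8, "Japanese"), (44.6, "Japanese"),
        (46.6, "Japanese"), (44.0, "European")]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mileage (MPG REAL, Origin TEXT)")
conn.executemany("INSERT INTO mileage VALUES (?, ?)", rows)

# Each SUM adds 1 only for the rows where its own condition holds.
japanese_cars, over_40 = conn.execute(
    """
    SELECT SUM(CASE WHEN Origin = 'Japanese' THEN 1 ELSE 0 END) AS Japanese_Cars,
           SUM(CASE WHEN MPG > 40 THEN 1 ELSE 0 END) AS Over_40
    FROM mileage
    """
).fetchone()
conn.close()
```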

Group & Order by Value

Show the number of cars made in each year

spark.sql("SELECT Year, count(Year) FROM mileage GROUP BY Year ORDER BY Year").show()
+----+-----------+
|Year|count(Year)|
+----+-----------+
|  70|         29|
|  71|         27|
|  72|         28|
|  73|         40|
|  74|         26|
|  75|         30|
|  76|         34|
|  77|         28|
|  78|         36|
|  79|         29|
|  80|         27|
|  81|         28|
|  82|         30|
+----+-----------+
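GROUP BY Year collects the rows for each year, count(Year) counts the non-null Year values in each group, and ORDER BY Year sorts the groups. A plain-Python sketch of the same computation with collections.Counter, on the Year values of the MPG > 40 rows:

```python
from collections import Counter

# Year values from the MPG > 40 output shown earlier.
years = [78, 80, 80, 80, 80, 80, 80, 82]

# GROUP BY Year + count(Year): count(column) ignores NULLs, so skip None here too.
counts = Counter(y for y in years if y is not None)

# ORDER BY Year
by_year = sorted(counts.items())
```

In Spark SQL, writing count(*) AS Cars instead of count(Year) would give the result column a friendlier name.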

Max Value

What is the maximum MPG in the dataset?

spark.sql("SELECT max(MPG) FROM mileage").show()
+--------+
|max(MPG)|
+--------+
|    46.6|
+--------+
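max(MPG) returns only the value. To see which car it belongs to, sort and keep the top row, for example SELECT * FROM mileage ORDER BY MPG DESC LIMIT 1 in Spark SQL. The plain-Python counterpart of that idea is max() with a key function, sketched here on three columns of the MPG > 40 rows:

```python
# (MPG, Year, Origin) from the MPG > 40 output shown earlier.
rows = [(43.1, 78, "European"), (43.4, 80, "European"), (41.5, 80, "European"),
        (44.3, 80, "European"), (40.8, 80, "Japanese"), (44.6, 80, "Japanese"),
        (46.6, 80, "Japanese"), (44.0, 82, "European")]

# Like max(MPG), but keep the whole row instead of just the value.
best = max(rows, key=lambda row: row[0])
max_mpg = best[0]
```

The value agrees with the 46.6 reported by the Spark query above.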

Stop Spark Session

spark.stop()