# To suppress warnings generated by the code
def warn(*args, **kwargs):
    """Accept any arguments and do nothing.

    Installed below in place of ``warnings.warn`` so that calls to it
    become no-ops.
    """


import warnings

# Replace the module-level warn function with the no-op above, and also
# register an "ignore" filter for warnings raised through other paths.
warnings.warn = warn
warnings.filterwarnings('ignore')
Import Libraries
ET with SparkSQL - MPG
Objectives
- Load a csv dataset into a dataframe
- Create a temporary view based on a dataframe
- Run SQL queries on the view
- Analyze a dataset using SparkSQL
Setup
We will be using the following libraries:
PySpark
for connecting to the Spark Cluster
Suppress Warnings
To suppress warnings generated by our code, we’ll use this code block
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
Start Spark Session
# Create the SparkSession (getOrCreate reuses an existing one if present)
# Ignore any warnings by SparkSession command
spark = SparkSession.builder.appName("SpqrkSQL").getOrCreate()
Data
- Modified version of car mileage dataset. Original dataset available at https://archive.ics.uci.edu/ml/datasets/auto+mpg
Download Data Locally
import wget

# Download the modified car-mileage (MPG) CSV dataset locally.
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv")
8.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13891 (14K) [text/csv]
Saving to: ‘mpg.csv.1’
mpg.csv.1 100%[===================>] 13.57K --.-KB/s in 0s
2024-11-10 14:49:06 (47.2 MB/s) - ‘mpg.csv.1’ saved [13891/13891]
Load CSV into Spark DataFrame
# Read mpg.csv using its first row as column names and let Spark infer
# each column's data type.
mpg_data = spark.read.csv("mpg.csv", header=True, inferSchema=True)
View Schema
# Print the column names, types and nullability of the mpg_data DataFrame.
mpg_data.printSchema()
root
|-- MPG: double (nullable = true)
|-- Cylinders: integer (nullable = true)
|-- Engine Disp: double (nullable = true)
|-- Horsepower: integer (nullable = true)
|-- Weight: integer (nullable = true)
|-- Accelerate: double (nullable = true)
|-- Year: integer (nullable = true)
|-- Origin: string (nullable = true)
Create Temp View
mpg_data.createOrReplaceTempView("mileage")
Query Data
Select all cars whose mileage is more than 40
# Query the "mileage" view for every car with MPG above 40 and display
# the matching rows.
results = spark.sql("SELECT * FROM mileage WHERE MPG > 40")
results.show()
+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year| Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|43.1| 4| 90.0| 48| 1985| 21.5| 78|European|
|43.4| 4| 90.0| 48| 2335| 23.7| 80|European|
|41.5| 4| 98.0| 76| 2144| 14.7| 80|European|
|44.3| 4| 90.0| 48| 2085| 21.7| 80|European|
|40.8| 4| 85.0| 65| 2110| 19.2| 80|Japanese|
|44.6| 4| 91.0| 67| 1850| 13.8| 80|Japanese|
|46.6| 4| 86.0| 65| 2110| 17.9| 80|Japanese|
|44.0| 4| 97.0| 52| 2130| 24.6| 82|European|
+----+---------+-----------+----------+------+----------+----+--------+
Analyze Data
Unique Value
List all unique Origins
spark.sql("SELECT distinct Origin FROM mileage").show()
+--------+
| Origin|
+--------+
|European|
|Japanese|
|American|
+--------+
Count Value
Count all Japanese cars
spark.sql("SELECT count(*) AS Japanese_Cars FROM mileage WHERE Origin = 'Japanese'").show()
+-------------+
|Japanese_Cars|
+-------------+
| 79|
+-------------+
Count Conditionals
Count the number of cars with mileage > 40
spark.sql("SELECT count(*) AS Over_40 FROM mileage WHERE MPG>40").show()
+-------+
|Over_40|
+-------+
| 8|
+-------+
Group & Order by Value
Show the number of cars made in different years
spark.sql("SELECT Year, count(Year) FROM mileage group by Year Order By year").show()
+----+-----------+
|Year|count(Year)|
+----+-----------+
| 70| 29|
| 71| 27|
| 72| 28|
| 73| 40|
| 74| 26|
| 75| 30|
| 76| 34|
| 77| 28|
| 78| 36|
| 79| 29|
| 80| 27|
| 81| 28|
| 82| 30|
+----+-----------+
Max Value
What is the maximum MPG in the dataset
spark.sql("SELECT max(MPG) FROM mileage").show()
+--------+
|max(MPG)|
+--------+
| 46.6|
+--------+
Stop Spark Session
# Stop the Spark session now that all queries have been run.
spark.stop()