Import Libraries

ET with SparkSQL - Diamonds

Objectives


  1. Load a csv dataset into a dataframe
  2. Create a temporary view based on a dataframe
  3. Run SQL queries on the view
  4. Analyze a dataset using SparkSQL

Setup


We will be using the following libraries:

  • PySpark for connecting to the Spark Cluster

Suppress Warnings

To suppress warnings generated by our code, we’ll use this code block

# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

from pyspark.sql import SparkSession

Start Spark Session


#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("SpqrkSQL").getOrCreate()

Data


Download Data Locally

import wget
wget.download ("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/diamonds.csv")

8.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3192561 (3.0M) [text/csv]
Saving to: ‘diamonds.csv.1

diamonds.csv.1      100%[===================>]   3.04M  --.-KB/s    in 0.04s   

2024-11-10 15:18:36 (82.8 MB/s) - ‘diamonds.csv.1’ saved [3192561/3192561]

Load CSB into SParkDF

diamond_data = spark.read.csv("diamonds.csv", header=True, inferSchema=True)

View Schema

diamond_data.printSchema()
root
 |-- s: integer (nullable = true)
 |-- carat: double (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- price: integer (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)

Create Temp View


diamond_data.createOrReplaceTempView("diamonds")

Query Data


Select all rows

results = spark.sql("SELECT * FROM diamonds")
results.show()
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  s|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
|  8| 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|
|  9| 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49|
| 10| 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|
| 11|  0.3|     Good|    J|    SI1| 64.0| 55.0|  339|4.25|4.28|2.73|
| 12| 0.23|    Ideal|    J|    VS1| 62.8| 56.0|  340|3.93| 3.9|2.46|
| 13| 0.22|  Premium|    F|    SI1| 60.4| 61.0|  342|3.88|3.84|2.33|
| 14| 0.31|    Ideal|    J|    SI2| 62.2| 54.0|  344|4.35|4.37|2.71|
| 15|  0.2|  Premium|    E|    SI2| 60.2| 62.0|  345|3.79|3.75|2.27|
| 16| 0.32|  Premium|    E|     I1| 60.9| 58.0|  345|4.38|4.42|2.68|
| 17|  0.3|    Ideal|    I|    SI2| 62.0| 54.0|  348|4.31|4.34|2.68|
| 18|  0.3|     Good|    J|    SI1| 63.4| 54.0|  351|4.23|4.29| 2.7|
| 19|  0.3|     Good|    J|    SI1| 63.8| 56.0|  351|4.23|4.26|2.71|
| 20|  0.3|Very Good|    J|    SI1| 62.7| 59.0|  351|4.21|4.27|2.66|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
only showing top 20 rows

Analyze Data

Unique Value

List all unique Cuts

spark.sql("SELECT distinct cut FROM diamonds").show()
+---------+
|      cut|
+---------+
|  Premium|
|    Ideal|
|     Good|
|     Fair|
|Very Good|
+---------+

Count Value

Count all Premium cuts

spark.sql("SELECT count(*) AS PremiumCuts FROM diamonds WHERE cut = 'Premium'").show()
+-----------+
|PremiumCuts|
+-----------+
|      13791|
+-----------+

Count Conditionals

Count the number of diamonds with table size > 65

spark.sql("SELECT count(*) AS Over_65 FROM diamonds WHERE table>65").show()
+-------+
|Over_65|
+-------+
|    181|
+-------+

Group & Order by Value

Show the number of diamonds under each color

spark.sql("SELECT color, count(color) AS Total FROM diamonds group by color Order By Total").show()
+-----+-----+
|color|Total|
+-----+-----+
|    J| 2808|
|    I| 5422|
|    D| 6775|
|    H| 8304|
|    F| 9542|
|    E| 9797|
|    G|11292|
+-----+-----+

Max Value

What is the maximum price in the dataset

spark.sql("SELECT max(price) FROM diamonds").show()
+----------+
|max(price)|
+----------+
|     18823|
+----------+

Stop Spark Session

spark.stop()