
Streaming Sensor Monitoring - HVAC

Background


Smart Building Solutions (SBS) specializes in optimizing HVAC (heating, ventilation, and air conditioning) systems to enhance comfort and energy efficiency in commercial buildings. By monitoring temperature and humidity levels in real time across various rooms, the company aims to maintain optimal indoor conditions and address potential HVAC issues before they escalate.

With a continuous influx of sensor data, SBS needs to process and analyze this data in real time to maintain the quality of the indoor environment.

Data set description

The simulated data set comprises:

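  • room_id: Identifier for each room (the simulation below uses the strings "0" through "9", i.e. 10 rooms).
  • temperature: Current temperature reading from the sensor (in °C).
  • humidity: Current humidity level reading from the sensor (in %).
  • timestamp: Time when the reading was recorded (generated automatically by Spark’s rate source).

The rate source produces 5 rows per second. Each row also carries a sequential value counter from which room_id is derived, so the readings cycle through the 10 rooms. Room "0" always reports 15 °C; the other rooms report random temperatures between 20 and 45 °C, and every room reports humidity between 40 and 70%.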
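
A quick calculation shows how much data each 1-minute window should hold. It assumes the rate source keeps pace with its 5 rows/s target and uses the 10 rooms produced by value % 10 in the simulation code; the constant names are illustrative.

```python
ROWS_PER_SECOND = 5   # rowsPerSecond option passed to the rate source
NUM_ROOMS = 10        # room_id = value % 10 gives rooms "0" to "9"
WINDOW_SECONDS = 60   # length of the 1-minute aggregation window

rows_per_window = ROWS_PER_SECOND * WINDOW_SECONDS            # all rooms combined
readings_per_room_per_window = rows_per_window // NUM_ROOMS   # value % 10 is round-robin
seconds_between_room_readings = NUM_ROOMS / ROWS_PER_SECOND   # cadence for a single room

print(rows_per_window, readings_per_room_per_window, seconds_between_room_readings)  # 300 30 2.0
```

So each room contributes one reading every 2 seconds, or about 30 readings per 1-minute window.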

Challenges

Monitoring indoor environmental conditions poses several challenges:

  • High data velocity: Continuous data from multiple sensors can overwhelm traditional systems.
  • Need for immediate alerts: Delays in identifying critical conditions can lead to discomfort or system inefficiencies.
  • Need for data aggregation and analysis: Efficiently aggregating and analyzing real-time data for proactive maintenance and optimization is essential.

Objectives


  1. Explain the distributed architecture of Spark in the context of smart building monitoring
  2. Simulate real-time sensor data for HVAC systems in a building
  3. Perform SQL queries to detect critical environmental conditions and calculate average readings
  4. Output the aggregated results to the console for immediate insights into room conditions

You will learn about Spark’s distributed architecture, how to simulate real-time temperature and humidity sensor data, how to execute SQL queries that identify critical environmental conditions, and how to output aggregated results for immediate insights.

Setup


Suppress Warnings

To suppress warnings generated by our code, we’ll use this code block:

# To suppress warnings generated by the code
import warnings

def warn(*args, **kwargs):
    pass

warnings.warn = warn
warnings.filterwarnings('ignore')

# findspark locates the local Spark installation (SPARK_HOME) and makes pyspark importable
import findspark
findspark.init()

from pyspark.sql import SparkSession

Start Spark Session


# Create (or reuse) a SparkSession for this application
# Any warnings printed while the session starts can be ignored

spark = SparkSession.builder.appName("Smart HVAC Monitoring").getOrCreate()

Simulate Sensor Data


Use Spark’s rate source, which emits rows with a timestamp column and a sequential value column at a configurable rate, to generate continuous readings from multiple rooms.

from pyspark.sql.functions import expr, rand, when

# Simulate sensor data with room IDs and readings.
# The rate source emits 5 rows per second with columns timestamp and value (a running count).
sensor_data = spark.readStream.format("rate").option("rowsPerSecond", 5).load() \
    .withColumn("room_id", expr("CAST(value % 10 AS STRING)")) \
    .withColumn("temperature", when(expr("value % 10 == 0"), 15)  # every reading from room "0" is 15 °C; other rooms get 20-45 °C
                .otherwise(20 + rand() * 25)) \
    .withColumn("humidity", expr("40 + rand() * 30"))  # 40-70 % for every room
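
To check the ranges these expressions produce without starting Spark, here is a plain-Python mirror of the per-row logic. It is an illustration only: simulate_row is a hypothetical helper, Python’s random.Random stands in for Spark’s rand() (both draw uniformly from [0, 1)), and the value argument plays the role of the rate source’s counter.

```python
import random

def simulate_row(value: int, rng: random.Random) -> dict:
    """Hypothetical plain-Python mirror of the withColumn expressions for one rate-source row."""
    room_id = str(value % 10)                  # CAST(value % 10 AS STRING)
    if value % 10 == 0:
        temperature = 15.0                     # room "0" is pinned at 15 °C
    else:
        temperature = 20 + rng.random() * 25   # 20 + rand() * 25, roughly [20, 45)
    humidity = 40 + rng.random() * 30          # 40 + rand() * 30, roughly [40, 70)
    return {"value": value, "room_id": room_id,
            "temperature": temperature, "humidity": humidity}

rng = random.Random(42)  # seeded so the sketch is reproducible
rows = [simulate_row(v, rng) for v in range(20)]
for row in rows[:3]:
    print(row)
```

Because value increases by one per row, room IDs repeat in a fixed 0-9 cycle, which is why every room receives the same number of readings.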

Create TempView

# Create a temporary SQL view for the sensor data
sensor_data.createOrReplaceTempView("sensor_table")

Query Data


Define Queries

Define SQL queries for aggregation and analysis:

  • Critical temperature query: Select readings below 18 °C or above 60 °C
  • Average readings query: Calculate each room’s average temperature and humidity over 1-minute windows
  • Attention needed query: Count, per room, the readings with humidity below 45% or above 75%

# SQL Query to detect rooms with critical temperatures
critical_temperature_query = """
    SELECT 
        room_id, 
        temperature, 
        humidity, 
        timestamp 
    FROM sensor_table 
    WHERE temperature < 18 OR temperature > 60
"""

# SQL Query to calculate average readings over a 1-minute window
average_readings_query = """
    SELECT 
        room_id, 
        AVG(temperature) AS avg_temperature, 
        AVG(humidity) AS avg_humidity, 
        window.start AS window_start 
    FROM sensor_table
    GROUP BY room_id, window(timestamp, '1 minute')
"""

# SQL Query to find rooms that need immediate attention based on humidity
attention_needed_query = """
    SELECT 
        room_id, 
        COUNT(*) AS critical_readings 
    FROM sensor_table 
    WHERE humidity < 45 OR humidity > 75
    GROUP BY room_id
"""
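
The three queries can be replayed in plain Python on a few made-up readings to make their semantics concrete. This is a batch sketch, not Spark’s execution model: it ignores streaming state and output modes, and it approximates window(timestamp, '1 minute') by truncating each timestamp to the minute, which matches the tumbling 1-minute windows used here.

```python
from collections import defaultdict
from datetime import datetime

# Illustrative readings only: (room_id, temperature, humidity, timestamp)
rows = [
    ("0", 15.0, 42.0, datetime(2024, 11, 10, 16, 34, 10)),
    ("1", 30.0, 50.0, datetime(2024, 11, 10, 16, 34, 20)),
    ("1", 34.0, 44.0, datetime(2024, 11, 10, 16, 34, 50)),
    ("1", 28.0, 60.0, datetime(2024, 11, 10, 16, 35, 5)),
    ("0", 15.0, 55.0, datetime(2024, 11, 10, 16, 35, 30)),
]

# critical_temperature_query: WHERE temperature < 18 OR temperature > 60
critical = [r for r in rows if r[1] < 18 or r[1] > 60]

# average_readings_query: GROUP BY room_id, window(timestamp, '1 minute').
# For 1-minute tumbling windows, the window start is the timestamp truncated to the minute.
totals = defaultdict(lambda: [0.0, 0.0, 0])  # (room_id, window_start) -> [sum_temp, sum_hum, count]
for room_id, temperature, humidity, ts in rows:
    acc = totals[(room_id, ts.replace(second=0, microsecond=0))]
    acc[0] += temperature
    acc[1] += humidity
    acc[2] += 1
averages = {key: (t / n, h / n) for key, (t, h, n) in totals.items()}

# attention_needed_query: WHERE humidity < 45 OR humidity > 75 GROUP BY room_id
attention = defaultdict(int)
for room_id, _, humidity, _ in rows:
    if humidity < 45 or humidity > 75:
        attention[room_id] += 1

print(critical)
for (room_id, start), (avg_t, avg_h) in sorted(averages.items()):
    print(room_id, start, avg_t, avg_h)
print(dict(attention))
```

Room "1" ends up in two windows (16:34 and 16:35), which is the same pattern the Average Readings output shows once the stream crosses a minute boundary.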

Query the Data

Create Streaming DataFrames

# Define the critical temperature stream. spark.sql() on a streaming view returns a
# streaming DataFrame; no data is processed until writeStream...start() is called.
critical_temperatures_stream = spark.sql(critical_temperature_query)

# Define the average readings stream
average_readings_stream = spark.sql(average_readings_query)

# Define the attention needed stream
attention_needed_stream = spark.sql(attention_needed_query)
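
Because the simulated ranges are known, the expected hit rate of each query can be worked out exactly. The sketch below uses a hypothetical outside_fraction helper and assumes the uniform distributions from the simulation code (humidity in [40, 70), temperature fixed at 15 °C for room "0" and in [20, 45) elsewhere). It shows why only room "0" ever appears in the critical temperature stream and why the 75% humidity threshold never fires.

```python
from fractions import Fraction

def outside_fraction(a, b, lo, hi):
    """Probability that a value drawn uniformly from [a, b) is < lo or > hi (assumes lo <= hi)."""
    below = max(0, min(b, lo) - a)
    above = max(0, b - max(a, hi))
    return Fraction(below + above, b - a)

NUM_ROOMS = 10
READINGS_PER_ROOM_PER_MINUTE = 5 * 60 // NUM_ROOMS  # 5 rows/s spread over 10 rooms

# attention_needed_query: humidity < 45 OR humidity > 75, with humidity ~ Uniform[40, 70)
p_humidity = outside_fraction(40, 70, 45, 75)

# critical_temperature_query: temperature < 18 OR temperature > 60.
# Room "0" is pinned at 15 (always flagged); the other rooms draw from Uniform[20, 45).
p_temperature = (Fraction(1, NUM_ROOMS)
                 + Fraction(NUM_ROOMS - 1, NUM_ROOMS) * outside_fraction(20, 45, 18, 60))

print(p_humidity)                                  # 1/6
print(p_temperature)                               # 1/10
print(p_humidity * READINGS_PER_ROOM_PER_MINUTE)   # 5
```

That works out to about 5 humidity alerts per room per minute on average; the actual counts fluctuate because the readings are random.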

Output Queries

Display results in real time from each query

# Output the results to the console for all queries
critical_query = critical_temperatures_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .queryName("Critical Temperatures") \
    .start()

average_query = average_readings_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Average Readings") \
    .start()

attention_query = attention_needed_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Attention Needed") \
    .start()
    
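# OUTPUT (sample run: the three queries run concurrently, so their batches interleave, and each query numbers its own batches from 0)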
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----------+--------+---------+
|room_id|temperature|humidity|timestamp|
+-------+-----------+--------+---------+
+-------+-----------+--------+---------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-----------+------------------+--------------------+
|room_id|temperature|          humidity|           timestamp|
+-------+-----------+------------------+--------------------+
|      0|       15.0| 53.50163855231966|2024-11-10 16:34:...|
|      0|       15.0| 43.67782978570206|2024-11-10 16:34:...|
|      0|       15.0|54.956310853392594|2024-11-10 16:34:...|
|      0|       15.0|42.050346866531584|2024-11-10 16:34:...|
+-------+-----------+------------------+--------------------+

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---------------+------------+------------+
|room_id|avg_temperature|avg_humidity|window_start|
+-------+---------------+------------+------------+
+-------+---------------+------------+------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------+-----------+------------------+--------------------+
|room_id|temperature|          humidity|           timestamp|
+-------+-----------+------------------+--------------------+
|      0|       15.0| 60.35485860928195|2024-11-10 16:34:...|
|      0|       15.0|56.841748430871405|2024-11-10 16:34:...|
+-------+-----------+------------------+--------------------+

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----------------+
|room_id|critical_readings|
+-------+-----------------+
+-------+-----------------+
-------------------------------------------
Batch: 3
-------------------------------------------
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-----------+------------------+--------------------+
|room_id|temperature|          humidity|           timestamp|
+-------+-----------+------------------+--------------------+
|      0|       15.0| 60.52116674837685|2024-11-10 16:34:...|
|      0|       15.0|53.234704114821156|2024-11-10 16:34:...|
|      0|       15.0| 61.05123960892497|2024-11-10 16:34:...|
|      0|       15.0| 43.11507429428427|2024-11-10 16:34:...|
|      0|       15.0| 47.80419520443808|2024-11-10 16:34:...|
|      0|       15.0| 59.17572509036224|2024-11-10 16:34:...|
|      0|       15.0|51.659978419091345|2024-11-10 16:34:...|
|      0|       15.0| 46.24606003011726|2024-11-10 16:34:...|
|      0|       15.0| 65.88541393944361|2024-11-10 16:34:...|
|      0|       15.0|  47.6648676325129|2024-11-10 16:34:...|
|      0|       15.0|41.192956779395345|2024-11-10 16:34:...|
|      0|       15.0| 48.33583958823361|2024-11-10 16:34:...|
|      0|       15.0| 63.51283803271804|2024-11-10 16:34:...|
|      0|       15.0| 49.46230814681846|2024-11-10 16:34:...|
|      0|       15.0| 69.47282632062922|2024-11-10 16:34:...|
|      0|       15.0| 44.90209397339871|2024-11-10 16:34:...|
|      0|       15.0| 59.94657116708723|2024-11-10 16:34:...|
|      0|       15.0|  69.3285138410589|2024-11-10 16:34:...|
|      0|       15.0| 55.66443897183046|2024-11-10 16:34:...|
|      0|       15.0|50.335983184846214|2024-11-10 16:34:...|
+-------+-----------+------------------+--------------------+
only showing top 20 rows

+-------+------------------+------------------+-------------------+
|room_id|   avg_temperature|      avg_humidity|       window_start|
+-------+------------------+------------------+-------------------+
|      1| 32.76300640056677| 55.83322304998182|2024-11-10 16:34:00|
|      6|31.599281969780733| 54.31957125861709|2024-11-10 16:34:00|
|      0|              15.0| 54.86224915201184|2024-11-10 16:34:00|
|      7| 32.19592394408673| 52.48017193021937|2024-11-10 16:34:00|
|      5|34.442352417962596| 56.01309408524613|2024-11-10 16:34:00|
|      3| 32.22797816392215|51.680516221933765|2024-11-10 16:34:00|
|      9| 33.57554505557305| 49.93570367835278|2024-11-10 16:34:00|
|      4|  32.6300721796979| 53.23019602240481|2024-11-10 16:34:00|
|      8| 33.16914189350683|56.533159625675594|2024-11-10 16:34:00|
|      2|  33.4508268621457| 56.92370938768508|2024-11-10 16:34:00|
+-------+------------------+------------------+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------+-----------------+
|room_id|critical_readings|
+-------+-----------------+
|      7|                4|
|      3|                7|
|      8|                4|
|      0|                5|
|      5|                6|
|      6|                7|
|      9|                5|
|      1|                8|
|      4|                6|
|      2|               11|
+-------+-----------------+

                                                                                
-------------------------------------------
Batch: 7
-------------------------------------------
+-------+-----------+-----------------+--------------------+
|room_id|temperature|         humidity|           timestamp|
+-------+-----------+-----------------+--------------------+
|      0|       15.0|68.54026695502095|2024-11-10 16:35:...|
|      0|       15.0| 53.5461013265676|2024-11-10 16:35:...|
|      0|       15.0|64.16574313997725|2024-11-10 16:35:...|
|      0|       15.0|68.07221967857618|2024-11-10 16:35:...|
|      0|       15.0|42.73578133221992|2024-11-10 16:35:...|
|      0|       15.0| 61.4728522207287|2024-11-10 16:35:...|
|      0|       15.0|62.36773364843525|2024-11-10 16:35:...|
+-------+-----------+-----------------+--------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------+------------------+------------------+-------------------+
|room_id|   avg_temperature|      avg_humidity|       window_start|
+-------+------------------+------------------+-------------------+
|      1|32.738996702236335| 56.11535396285104|2024-11-10 16:34:00|
|      4| 32.93470750409059| 55.63457608835043|2024-11-10 16:35:00|
|      6|31.253068174164998|52.822925319092995|2024-11-10 16:34:00|
|      0|              15.0|56.808693953559036|2024-11-10 16:34:00|
|      3| 33.35474787013408| 53.53285029479027|2024-11-10 16:35:00|
|      9|31.520259823498016| 55.13760961248574|2024-11-10 16:35:00|
|      7| 32.09723745519446| 58.01743134086591|2024-11-10 16:35:00|
|      0|              15.0| 55.42466173182326|2024-11-10 16:35:00|
|      8|32.669103647680764| 57.05938964193508|2024-11-10 16:35:00|
|      7|31.591731611547623| 54.59876975414006|2024-11-10 16:34:00|
|      5|32.353789797777786| 54.34324027368991|2024-11-10 16:34:00|
|      3|32.247179473057486| 52.26573996356538|2024-11-10 16:34:00|
|      9| 33.22067200354551|54.058638824197374|2024-11-10 16:34:00|
|      6| 32.37975404543154|54.470785934098295|2024-11-10 16:35:00|
|      4| 33.27757777526928|  53.3628878841838|2024-11-10 16:34:00|
|      1|33.268124737285234| 57.03193401954078|2024-11-10 16:35:00|
|      5| 35.14004722919333| 51.79676012258354|2024-11-10 16:35:00|
|      2| 31.15934863455892| 55.40482440544203|2024-11-10 16:35:00|
|      8|  31.9204413655718|56.480107214969586|2024-11-10 16:34:00|
|      2|34.829563449463805|53.472635219224216|2024-11-10 16:34:00|
+-------+------------------+------------------+-------------------+

                                                                                
-------------------------------------------
Batch: 8
-------------------------------------------
+-------+-----------+------------------+--------------------+
|room_id|temperature|          humidity|           timestamp|
+-------+-----------+------------------+--------------------+
|      0|       15.0| 44.19899381972439|2024-11-10 16:35:...|
|      0|       15.0| 46.57568407023478|2024-11-10 16:36:...|
|      0|       15.0|42.510301401322806|2024-11-10 16:35:...|
|      0|       15.0| 60.46306760173239|2024-11-10 16:36:...|
|      0|       15.0| 44.98471489366687|2024-11-10 16:36:...|
|      0|       15.0|62.830024296949006|2024-11-10 16:36:...|
|      0|       15.0| 60.37559665698927|2024-11-10 16:36:...|
+-------+-----------+------------------+--------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------+-----------------+
|room_id|critical_readings|
+-------+-----------------+
|      7|                8|
|      3|                8|
|      8|                5|
|      0|                9|
|      5|                8|
|      6|               10|
|      9|                8|
|      1|                9|
|      4|               10|
|      2|               14|
+-------+-----------------+
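
The console output above reflects the two output modes used here. Critical Temperatures runs in append mode, so each batch lists only newly arrived matching rows. Average Readings and Attention Needed run in complete mode, so every batch reprints the whole result table: the 16:34 windows reappear in later batches, and the per-room counts only grow. The plain-Python model below (a hypothetical run_modes helper, not Spark’s API) replays two made-up micro-batches to show what each mode would emit.

```python
from collections import Counter

def run_modes(batches):
    """Hypothetical model: replay micro-batches and return what append and complete mode emit per trigger."""
    counts = Counter()                  # aggregation state kept across batches
    appended, completed = [], []
    for batch in batches:
        matches = [r for r in batch if r["humidity"] < 45 or r["humidity"] > 75]
        appended.append(matches)        # append: only this batch's new rows
        counts.update(r["room_id"] for r in matches)
        completed.append(dict(counts))  # complete: the full result table so far
    return appended, completed

# Two illustrative micro-batches (made-up values)
batches = [
    [{"room_id": "1", "humidity": 44.0}, {"room_id": "2", "humidity": 60.0}],
    [{"room_id": "1", "humidity": 41.0}, {"room_id": "3", "humidity": 43.0}],
]
appended, completed = run_modes(batches)
for i, (new_rows, table) in enumerate(zip(appended, completed)):
    print(f"batch {i}: append -> {new_rows}")
    print(f"batch {i}: complete -> {table}")
```

Complete mode has to retain the aggregate for every group seen so far, which is why the Average Readings table keeps growing as new windows open.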

Keep Streams Running

Ensure that the streaming queries continue to run to process incoming data.

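# Keep the streams running.
# Each awaitTermination() call blocks until its query stops, so the later labels
# are printed only after the earlier queries terminate; all three queries keep
# writing to the console in the meantime. spark.streams.awaitAnyTermination() is
# an alternative that returns as soon as any active query stops.

print("********Critical Temperature Values*******")
critical_query.awaitTermination()

print("********Average Readings Values********")
average_query.awaitTermination()
print("********Attention Needed Values********")
attention_query.awaitTermination()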

********Critical Temperature Values*******
                                                                                
-------------------------------------------
Batch: 9
-------------------------------------------
-------------------------------------------
Batch: 4
-------------------------------------------
+-------+-----------+------------------+--------------------+
|room_id|temperature|          humidity|           timestamp|
+-------+-----------+------------------+--------------------+
|      0|       15.0| 65.25785746183612|2024-11-10 16:36:...|
|      0|       15.0| 63.59005303850523|2024-11-10 16:36:...|
|      0|       15.0| 51.80459401856719|2024-11-10 16:36:...|
|      0|       15.0|52.016990950858776|2024-11-10 16:36:...|
|      0|       15.0|59.028181071977336|2024-11-10 16:36:...|
|      0|       15.0|62.319577250559185|2024-11-10 16:36:...|
|      0|       15.0| 65.77341094029384|2024-11-10 16:36:...|
+-------+-----------+------------------+--------------------+

+-------+------------------+------------------+-------------------+
|room_id|   avg_temperature|      avg_humidity|       window_start|
+-------+------------------+------------------+-------------------+
|      3| 31.26689134263122|55.222681788454715|2024-11-10 16:36:00|
|      5| 30.25598458310833|57.156551891466904|2024-11-10 16:36:00|
|      1|32.738996702236335| 56.11535396285104|2024-11-10 16:34:00|
|      4|32.805317775448295| 56.19239398354054|2024-11-10 16:35:00|
|      6|31.253068174164998|52.822925319092995|2024-11-10 16:34:00|
|      7| 35.53240901321019| 55.04233603639229|2024-11-10 16:36:00|
|      9| 28.89614866402356| 59.21107270393841|2024-11-10 16:36:00|
|      0|              15.0|56.808693953559036|2024-11-10 16:34:00|
|      1|31.564660137613792|55.257994538765345|2024-11-10 16:36:00|
|      3|33.388467252687185| 54.75901823489416|2024-11-10 16:35:00|
|      6| 34.31281009448467| 56.87442223018632|2024-11-10 16:36:00|
|      9| 32.75448315064278| 54.40204739335334|2024-11-10 16:35:00|
|      7| 32.28993394858937| 56.98815844963872|2024-11-10 16:35:00|
|      0|              15.0|55.967461843512545|2024-11-10 16:35:00|
|      8|33.221365462754626| 56.20614214041025|2024-11-10 16:35:00|
|      7|31.591731611547623| 54.59876975414006|2024-11-10 16:34:00|
|      5|32.353789797777786| 54.34324027368991|2024-11-10 16:34:00|
|      3|32.247179473057486| 52.26573996356538|2024-11-10 16:34:00|
|      4|29.153413691757812| 60.14237852117598|2024-11-10 16:36:00|
|      2| 33.70749714214621| 54.62390390067408|2024-11-10 16:36:00|
+-------+------------------+------------------+-------------------+
only showing top 20 rows

Stop Spark

spark.stop()