# To suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')
Import Libraries
Streaming Sensor Monitoring - HVAC
Background
Smart Building Solutions (SBS) specializes in optimizing HVAC (heating, ventilation, and air conditioning) systems to enhance comfort and energy efficiency in commercial buildings. By monitoring temperature and humidity levels in real time across rooms, the company aims to maintain optimal indoor conditions and address potential HVAC issues before they escalate.
With a continuous influx of sensor data, Smart Building Solutions needs to process and analyze this data in real time to maintain the quality of the indoor environment.
Data set description
The simulated data set comprises:
- room_id: Unique identifier for each room (e.g., R001, R002; the simulation in this lab uses the strings 0 through 9).
- temperature: Current temperature reading from the sensor (in °C).
- humidity: Current humidity level reading from the sensor (in %).
- timestamp: Time when the reading was recorded (automatically generated by Spark).
The data is generated at a rate of 5 rows per second, simulating multiple rooms with various environmental conditions.
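To make the schema concrete, here is a plain-Python sketch (no Spark required) of how one simulated row could be derived from the rate source's `value` counter, mirroring the column expressions used in the Simulate Sensor Data step of this lab. The helper name `make_reading` and the use of Python's `random` module are assumptions for illustration; Spark's own `rand()` will produce different numbers.

```python
import random
from datetime import datetime, timezone

def make_reading(value, rng=random):
    """Build one simulated sensor row from a rate-source counter value (hypothetical helper)."""
    room_id = str(value % 10)                    # ten rooms, "0" through "9"
    if value % 10 == 0:
        temperature = 15.0                       # room 0 is pinned to a cold reading
    else:
        temperature = 20 + rng.random() * 25     # uniform in [20, 45)
    humidity = 40 + rng.random() * 30            # uniform in [40, 70)
    return {
        "room_id": room_id,
        "temperature": temperature,
        "humidity": humidity,
        "timestamp": datetime.now(timezone.utc),
    }

# One second of data at 5 rows per second; each row gets its own seeded generator
rows = [make_reading(v, random.Random(v)) for v in range(5)]
for row in rows:
    print(row["room_id"], round(row["temperature"], 1), round(row["humidity"], 1))
```

The sketch only illustrates the shape and value ranges of the data; in the lab itself Spark generates the rows and the timestamp.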
Challenges
Monitoring indoor environmental conditions poses several challenges:
- High data velocity: Continuous data from multiple sensors can overwhelm traditional systems.
- Need for immediate alerts: Delays in identifying critical conditions can lead to discomfort or system inefficiencies.
- Need for data aggregation and analysis: Efficiently aggregating and analyzing real-time data for proactive maintenance and optimization is essential.
Objectives
- Explain the distributed architecture of Spark in the context of smart building monitoring
- Simulate real-time sensor data for HVAC systems in a building
- Perform SQL queries to detect critical environmental conditions and calculate average readings
- Output the aggregated results to the console for immediate insights into room conditions
After completing this lab, you will understand Spark’s distributed architecture and know how to simulate real-time sensor data for temperature and humidity, execute SQL queries to identify critical environmental conditions, and output aggregated results for immediate insights.
Setup
Suppress Warnings
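The warning-suppression block at the start of this notebook replaces `warnings.warn` with a no-op and installs an `ignore` filter. The imports that follow load findspark and the Spark classes used in this lab.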
# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
#import functions/Classes for sparkml
from pyspark.ml.clustering import KMeans
Start Spark Session
# Create SparkSession
# Ignore any warnings by SparkSession command
spark = SparkSession.builder.appName("Smart HVAC Monitoring").getOrCreate()
Simulate Sensor Data
Use Spark’s rate source to generate continuous readings from multiple rooms.
from pyspark.sql.functions import expr, rand, when
# Simulate sensor data with room IDs and readings
sensor_data = (
    spark.readStream.format("rate").option("rowsPerSecond", 5).load()
    .withColumn("room_id", expr("CAST(value % 10 AS STRING)"))
    # Pin room 0 (value % 10 == 0) at 15 so the critical-temperature query returns rows
    .withColumn("temperature", when(expr("value % 10 == 0"), 15).otherwise(20 + rand() * 25))
    .withColumn("humidity", expr("40 + rand() * 30"))
)
Create TempView
# Create a temporary SQL view for the sensor data
sensor_data.createOrReplaceTempView("sensor_table")
Query Data
Define Queries
Define SQL queries for aggregation and analysis
- Critical temperature query: Detect rooms with critical temperature levels
- Average readings query: Calculate average readings over a 1-minute window
- Attention needed query: Identify rooms that need immediate attention based on humidity levels
# SQL Query to detect rooms with critical temperatures
critical_temperature_query = """
    SELECT
        room_id,
        temperature,
        humidity,
        timestamp
    FROM sensor_table
    WHERE temperature < 18 OR temperature > 60
"""
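The `WHERE` clause can be checked against the simulated value ranges with a small plain-Python predicate. This sketch assumes the thresholds from the query above (below 18 or above 60); the function name `is_critical_temperature` and the sample values are hypothetical. Because the simulation draws temperatures between 20 and 45 and pins room 0 at 15, only room 0 should satisfy the condition.

```python
def is_critical_temperature(temperature, low=18.0, high=60.0):
    """Same condition as the SQL WHERE clause: temperature < 18 OR temperature > 60."""
    return temperature < low or temperature > high

# room_id -> temperature (illustrative values within the simulated ranges)
samples = {"0": 15.0, "1": 20.0, "2": 44.9, "3": 32.5}
critical_rooms = [room for room, temp in samples.items() if is_critical_temperature(temp)]
print(critical_rooms)
```

This matches the console output in this lab, where every row returned by the critical temperature query belongs to room 0.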
# SQL Query to calculate average readings over a 1-minute window
average_readings_query = """
    SELECT
        room_id,
        AVG(temperature) AS avg_temperature,
        AVG(humidity) AS avg_humidity,
        window.start AS window_start
    FROM sensor_table
    GROUP BY room_id, window(timestamp, '1 minute')
"""
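`window(timestamp, '1 minute')` assigns each row to a fixed, non-overlapping one-minute bucket, and the averages are computed per room within each bucket. Below is a plain-Python sketch of that grouping, under the assumption that window boundaries align to whole minutes (as the `window_start` values in the console output suggest). The helper names `window_start` and `average_by_window` and the sample rows are hypothetical.

```python
from collections import defaultdict
from datetime import datetime

def window_start(ts):
    """Floor a timestamp to the start of its one-minute tumbling window."""
    return ts.replace(second=0, microsecond=0)

def average_by_window(rows):
    """Average temperature per (room_id, window_start), like the GROUP BY in the query."""
    buckets = defaultdict(list)
    for room_id, temperature, ts in rows:
        buckets[(room_id, window_start(ts))].append(temperature)
    return {key: sum(vals) / len(vals) for key, vals in buckets.items()}

# Illustrative readings for one room: (room_id, temperature, timestamp)
rows = [
    ("1", 30.0, datetime(2024, 11, 10, 16, 34, 5)),
    ("1", 34.0, datetime(2024, 11, 10, 16, 34, 50)),
    ("1", 40.0, datetime(2024, 11, 10, 16, 35, 2)),   # falls into the next window
]
averages = average_by_window(rows)
for (room_id, start), avg in sorted(averages.items()):
    print(room_id, start, avg)
```

The same room therefore appears once per window in the aggregated result, which is why the later batches in the console output list each room with both a 16:34:00 and a 16:35:00 window.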
# SQL Query to find rooms that need immediate attention based on humidity
attention_needed_query = """
    SELECT
        room_id,
        COUNT(*) AS critical_readings
    FROM sensor_table
    WHERE humidity < 45 OR humidity > 75
    GROUP BY room_id
"""
Query the Data
Create Streaming DF
# Execute the critical temperature query
critical_temperatures_stream = spark.sql(critical_temperature_query)
# Execute the average readings query
average_readings_stream = spark.sql(average_readings_query)
# Execute the attention needed query
attention_needed_stream = spark.sql(attention_needed_query)
Output Queries
Display results in real time from each query
# Output the results to the console for all queries
critical_query = critical_temperatures_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .queryName("Critical Temperatures") \
    .start()
average_query = average_readings_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Average Readings") \
    .start()
attention_query = attention_needed_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Attention Needed") \
    .start()
# OUTPUT
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----------+--------+---------+
|room_id|temperature|humidity|timestamp|
+-------+-----------+--------+---------+
+-------+-----------+--------+---------+
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-----------+------------------+--------------------+
|room_id|temperature| humidity| timestamp|
+-------+-----------+------------------+--------------------+
| 0| 15.0| 53.50163855231966|2024-11-10 16:34:...|
| 0| 15.0| 43.67782978570206|2024-11-10 16:34:...|
| 0| 15.0|54.956310853392594|2024-11-10 16:34:...|
| 0| 15.0|42.050346866531584|2024-11-10 16:34:...|
+-------+-----------+------------------+--------------------+
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---------------+------------+------------+
|room_id|avg_temperature|avg_humidity|window_start|
+-------+---------------+------------+------------+
+-------+---------------+------------+------------+
-------------------------------------------
Batch: 2
-------------------------------------------
+-------+-----------+------------------+--------------------+
|room_id|temperature| humidity| timestamp|
+-------+-----------+------------------+--------------------+
| 0| 15.0| 60.35485860928195|2024-11-10 16:34:...|
| 0| 15.0|56.841748430871405|2024-11-10 16:34:...|
+-------+-----------+------------------+--------------------+
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----------------+
|room_id|critical_readings|
+-------+-----------------+
+-------+-----------------+
-------------------------------------------
Batch: 3
-------------------------------------------
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-----------+------------------+--------------------+
|room_id|temperature| humidity| timestamp|
+-------+-----------+------------------+--------------------+
| 0| 15.0| 60.52116674837685|2024-11-10 16:34:...|
| 0| 15.0|53.234704114821156|2024-11-10 16:34:...|
| 0| 15.0| 61.05123960892497|2024-11-10 16:34:...|
| 0| 15.0| 43.11507429428427|2024-11-10 16:34:...|
| 0| 15.0| 47.80419520443808|2024-11-10 16:34:...|
| 0| 15.0| 59.17572509036224|2024-11-10 16:34:...|
| 0| 15.0|51.659978419091345|2024-11-10 16:34:...|
| 0| 15.0| 46.24606003011726|2024-11-10 16:34:...|
| 0| 15.0| 65.88541393944361|2024-11-10 16:34:...|
| 0| 15.0| 47.6648676325129|2024-11-10 16:34:...|
| 0| 15.0|41.192956779395345|2024-11-10 16:34:...|
| 0| 15.0| 48.33583958823361|2024-11-10 16:34:...|
| 0| 15.0| 63.51283803271804|2024-11-10 16:34:...|
| 0| 15.0| 49.46230814681846|2024-11-10 16:34:...|
| 0| 15.0| 69.47282632062922|2024-11-10 16:34:...|
| 0| 15.0| 44.90209397339871|2024-11-10 16:34:...|
| 0| 15.0| 59.94657116708723|2024-11-10 16:34:...|
| 0| 15.0| 69.3285138410589|2024-11-10 16:34:...|
| 0| 15.0| 55.66443897183046|2024-11-10 16:34:...|
| 0| 15.0|50.335983184846214|2024-11-10 16:34:...|
+-------+-----------+------------------+--------------------+
only showing top 20 rows
+-------+------------------+------------------+-------------------+
|room_id| avg_temperature| avg_humidity| window_start|
+-------+------------------+------------------+-------------------+
| 1| 32.76300640056677| 55.83322304998182|2024-11-10 16:34:00|
| 6|31.599281969780733| 54.31957125861709|2024-11-10 16:34:00|
| 0| 15.0| 54.86224915201184|2024-11-10 16:34:00|
| 7| 32.19592394408673| 52.48017193021937|2024-11-10 16:34:00|
| 5|34.442352417962596| 56.01309408524613|2024-11-10 16:34:00|
| 3| 32.22797816392215|51.680516221933765|2024-11-10 16:34:00|
| 9| 33.57554505557305| 49.93570367835278|2024-11-10 16:34:00|
| 4| 32.6300721796979| 53.23019602240481|2024-11-10 16:34:00|
| 8| 33.16914189350683|56.533159625675594|2024-11-10 16:34:00|
| 2| 33.4508268621457| 56.92370938768508|2024-11-10 16:34:00|
+-------+------------------+------------------+-------------------+
Batch: 2
-------------------------------------------
+-------+-----------------+
|room_id|critical_readings|
+-------+-----------------+
| 7| 4|
| 3| 7|
| 8| 4|
| 0| 5|
| 5| 6|
| 6| 7|
| 9| 5|
| 1| 8|
| 4| 6|
| 2| 11|
+-------+-----------------+
-------------------------------------------
Batch: 7
-------------------------------------------
+-------+-----------+-----------------+--------------------+
|room_id|temperature| humidity| timestamp|
+-------+-----------+-----------------+--------------------+
| 0| 15.0|68.54026695502095|2024-11-10 16:35:...|
| 0| 15.0| 53.5461013265676|2024-11-10 16:35:...|
| 0| 15.0|64.16574313997725|2024-11-10 16:35:...|
| 0| 15.0|68.07221967857618|2024-11-10 16:35:...|
| 0| 15.0|42.73578133221992|2024-11-10 16:35:...|
| 0| 15.0| 61.4728522207287|2024-11-10 16:35:...|
| 0| 15.0|62.36773364843525|2024-11-10 16:35:...|
+-------+-----------+-----------------+--------------------+
-------------------------------------------
Batch: 3
-------------------------------------------
+-------+------------------+------------------+-------------------+
|room_id| avg_temperature| avg_humidity| window_start|
+-------+------------------+------------------+-------------------+
| 1|32.738996702236335| 56.11535396285104|2024-11-10 16:34:00|
| 4| 32.93470750409059| 55.63457608835043|2024-11-10 16:35:00|
| 6|31.253068174164998|52.822925319092995|2024-11-10 16:34:00|
| 0| 15.0|56.808693953559036|2024-11-10 16:34:00|
| 3| 33.35474787013408| 53.53285029479027|2024-11-10 16:35:00|
| 9|31.520259823498016| 55.13760961248574|2024-11-10 16:35:00|
| 7| 32.09723745519446| 58.01743134086591|2024-11-10 16:35:00|
| 0| 15.0| 55.42466173182326|2024-11-10 16:35:00|
| 8|32.669103647680764| 57.05938964193508|2024-11-10 16:35:00|
| 7|31.591731611547623| 54.59876975414006|2024-11-10 16:34:00|
| 5|32.353789797777786| 54.34324027368991|2024-11-10 16:34:00|
| 3|32.247179473057486| 52.26573996356538|2024-11-10 16:34:00|
| 9| 33.22067200354551|54.058638824197374|2024-11-10 16:34:00|
| 6| 32.37975404543154|54.470785934098295|2024-11-10 16:35:00|
| 4| 33.27757777526928| 53.3628878841838|2024-11-10 16:34:00|
| 1|33.268124737285234| 57.03193401954078|2024-11-10 16:35:00|
| 5| 35.14004722919333| 51.79676012258354|2024-11-10 16:35:00|
| 2| 31.15934863455892| 55.40482440544203|2024-11-10 16:35:00|
| 8| 31.9204413655718|56.480107214969586|2024-11-10 16:34:00|
| 2|34.829563449463805|53.472635219224216|2024-11-10 16:34:00|
+-------+------------------+------------------+-------------------+
-------------------------------------------
Batch: 8
-------------------------------------------
+-------+-----------+------------------+--------------------+
|room_id|temperature| humidity| timestamp|
+-------+-----------+------------------+--------------------+
| 0| 15.0| 44.19899381972439|2024-11-10 16:35:...|
| 0| 15.0| 46.57568407023478|2024-11-10 16:36:...|
| 0| 15.0|42.510301401322806|2024-11-10 16:35:...|
| 0| 15.0| 60.46306760173239|2024-11-10 16:36:...|
| 0| 15.0| 44.98471489366687|2024-11-10 16:36:...|
| 0| 15.0|62.830024296949006|2024-11-10 16:36:...|
| 0| 15.0| 60.37559665698927|2024-11-10 16:36:...|
+-------+-----------+------------------+--------------------+
-------------------------------------------
Batch: 3
-------------------------------------------
+-------+-----------------+
|room_id|critical_readings|
+-------+-----------------+
| 7| 8|
| 3| 8|
| 8| 5|
| 0| 9|
| 5| 8|
| 6| 10|
| 9| 8|
| 1| 9|
| 4| 10|
| 2| 14|
+-------+-----------------+
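The console output interleaves three queries because each one writes to the same console. The two output modes also behave differently: `append` prints only the rows that arrived in the current micro-batch, while `complete` reprints the whole aggregated result each time. The following plain-Python simulation illustrates that difference using the humidity condition from `attention_needed_query`. It is a conceptual sketch, not Spark's implementation, and the batch contents are made-up values.

```python
from collections import Counter

# Two micro-batches of (room_id, humidity) readings; values are illustrative.
batches = [
    [("1", 42.0), ("2", 55.0), ("2", 44.0)],
    [("1", 43.5), ("3", 41.0)],
]

def needs_attention(humidity):
    """Same condition as attention_needed_query: humidity < 45 OR humidity > 75."""
    return humidity < 45 or humidity > 75

append_output = []      # what an append-mode sink would print per batch
complete_output = []    # what a complete-mode sink would print per batch
running_counts = Counter()

for batch in batches:
    flagged = [row for row in batch if needs_attention(row[1])]
    append_output.append(flagged)                        # only this batch's new rows
    running_counts.update(room for room, _ in flagged)
    complete_output.append(dict(running_counts))         # full aggregate so far

print(append_output)
print(complete_output)
```

The growing totals in the complete-mode list correspond to what the Attention Needed output shows, where the `critical_readings` count for each room increases from one batch to the next.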
Keep Streams Running
Ensure that the streaming queries continue to run to process incoming data.
# Keep the streams running
# Note: awaitTermination() blocks until its query stops, so each later line runs only after the earlier query ends
print("********Critical Temperature Values*******")
critical_query.awaitTermination()
print("********Average Readings Values********")
average_query.awaitTermination()
print("********Attention Needed Values********")
attention_query.awaitTermination()
********Critical Temperature Values*******
-------------------------------------------
Batch: 9
-------------------------------------------
-------------------------------------------
Batch: 4
-------------------------------------------
+-------+-----------+------------------+--------------------+
|room_id|temperature| humidity| timestamp|
+-------+-----------+------------------+--------------------+
| 0| 15.0| 65.25785746183612|2024-11-10 16:36:...|
| 0| 15.0| 63.59005303850523|2024-11-10 16:36:...|
| 0| 15.0| 51.80459401856719|2024-11-10 16:36:...|
| 0| 15.0|52.016990950858776|2024-11-10 16:36:...|
| 0| 15.0|59.028181071977336|2024-11-10 16:36:...|
| 0| 15.0|62.319577250559185|2024-11-10 16:36:...|
| 0| 15.0| 65.77341094029384|2024-11-10 16:36:...|
+-------+-----------+------------------+--------------------+
+-------+------------------+------------------+-------------------+
|room_id| avg_temperature| avg_humidity| window_start|
+-------+------------------+------------------+-------------------+
| 3| 31.26689134263122|55.222681788454715|2024-11-10 16:36:00|
| 5| 30.25598458310833|57.156551891466904|2024-11-10 16:36:00|
| 1|32.738996702236335| 56.11535396285104|2024-11-10 16:34:00|
| 4|32.805317775448295| 56.19239398354054|2024-11-10 16:35:00|
| 6|31.253068174164998|52.822925319092995|2024-11-10 16:34:00|
| 7| 35.53240901321019| 55.04233603639229|2024-11-10 16:36:00|
| 9| 28.89614866402356| 59.21107270393841|2024-11-10 16:36:00|
| 0| 15.0|56.808693953559036|2024-11-10 16:34:00|
| 1|31.564660137613792|55.257994538765345|2024-11-10 16:36:00|
| 3|33.388467252687185| 54.75901823489416|2024-11-10 16:35:00|
| 6| 34.31281009448467| 56.87442223018632|2024-11-10 16:36:00|
| 9| 32.75448315064278| 54.40204739335334|2024-11-10 16:35:00|
| 7| 32.28993394858937| 56.98815844963872|2024-11-10 16:35:00|
| 0| 15.0|55.967461843512545|2024-11-10 16:35:00|
| 8|33.221365462754626| 56.20614214041025|2024-11-10 16:35:00|
| 7|31.591731611547623| 54.59876975414006|2024-11-10 16:34:00|
| 5|32.353789797777786| 54.34324027368991|2024-11-10 16:34:00|
| 3|32.247179473057486| 52.26573996356538|2024-11-10 16:34:00|
| 4|29.153413691757812| 60.14237852117598|2024-11-10 16:36:00|
| 2| 33.70749714214621| 54.62390390067408|2024-11-10 16:36:00|
+-------+------------------+------------------+-------------------+
only showing top 20 rows
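Because `awaitTermination()` with no argument blocks until the query stops or fails, the second and third `print` calls in the Keep Streams Running step are not reached while the first query is still running, which is consistent with only the first banner appearing in the output. One possible alternative, sketched here, waits on each query for a limited time and then stops it. `awaitTermination(timeout)` and `stop()` are methods of PySpark's `StreamingQuery`; the helper `run_for` and the stub class `FakeQuery` used to exercise it without a cluster are assumptions for illustration.

```python
def run_for(queries, seconds):
    """Wait up to `seconds` on each query, then stop any that are still running.

    `queries` is any iterable of objects exposing awaitTermination(timeout),
    stop(), and a name attribute, as PySpark's StreamingQuery does.
    """
    results = {}
    for query in queries:
        finished = query.awaitTermination(seconds)   # True if the query ended within the timeout
        if not finished:
            query.stop()
        results[query.name] = bool(finished)
    return results

class FakeQuery:
    """Stand-in for a StreamingQuery so the sketch runs without Spark (hypothetical)."""
    def __init__(self, name, ends_early):
        self.name = name
        self.ends_early = ends_early
        self.stopped = False

    def awaitTermination(self, timeout=None):
        return self.ends_early

    def stop(self):
        self.stopped = True

fakes = [FakeQuery("Critical Temperatures", False), FakeQuery("Average Readings", True)]
print(run_for(fakes, 1))
```

With the real queries, a call such as `run_for([critical_query, average_query, attention_query], 60)` could be placed before `spark.stop()`. Note that the timeouts are applied one after another, so the total wait can be up to the timeout multiplied by the number of queries.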
Stop Spark
spark.stop()