Bugfixes
[rtl-433.git] / tests / mqtt_rtl_433_test.py
1 #!/usr/bin/env python3
2 """ MQTT test client for receiving rtl_433 JSON data
3
4 Example program for receiving and parsing sensor data from rtl_433 sent
5 as MQTT network messages. Recommended way of sending rtl_433 data on network is:
6
7 $ rtl_433 -F json -U | mosquitto_pub -t home/rtl_433 -l
8
9 An MQTT broker e.g. 'mosquitto' must be running on local compputer
10
11 Copyright (C) 2017 Tommy Vestermark
12 This program is free software; you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation; either version 2 of the License, or
15 (at your option) any later version.
16 """
17
18 import multiprocessing as mp
19 import logging
20 import paho.mqtt.client as mqtt
21 import json
22 import sys, os
23 import time, datetime
24
25 MQTT_SERVER = "127.0.0.1"
26 MQTT_TOPIC_PREFIX = "home/rtl_433"
27 TIMEOUT_STALE_SENSOR = 600      # Seconds before showing a timeout indicator
28
29 #log = logging.getLogger()  # Single process logger
30 log = mp.log_to_stderr()    # Multiprocessing capable logger
31 mqtt_client = mqtt.Client("RTL_433_Test")
32
33 sensor_state = dict()   # Dictionary containing accumulated sensor state
34
35
36 def print_sensor_state():
37     """ Print accumulated sensor state """
38     time_now = datetime.datetime.utcnow().replace(microsecond=0)
39     print("\nUpdate per {} UTC".format(time_now.isoformat(sep=' ')))
40     for model in sensor_state:
41         print(model)
42         for ID in sensor_state[model]:
43             data = sensor_state[model][ID]['data'].copy()
44             timestamp = data.pop('time')
45             timedelta = (time_now-timestamp).total_seconds()
46             indicator = "*" if (timedelta < 2) else "~" if (timedelta > TIMEOUT_STALE_SENSOR) else " "   # Indicator for new and stale data
47             print("  ID {:5} {}{} {}".format(ID, timestamp.isoformat(sep=' '), indicator, data))
48     sys.stdout.flush()  # Print in real-time
49
50
51 def on_connect(client, userdata, rc):
52     """ Callback for when the client receives a CONNACK response from the server. """
53     log.info("MQTT Connection: " + mqtt.connack_string(rc))
54     if rc != 0:
55         log.error("Could not connect. RC: " + str(rc))
56         exit()
57     # Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions will be renewed.
58     client.subscribe(MQTT_TOPIC_PREFIX + "/#")
59
60
61 def on_disconnect(client, userdata, rc):
62     if rc != 0:
63         log.error("Unexpected disconnection. RC: " + str(rc))
64
65
66 def on_message(client, userdata, msg):
67     """ Callback for when a PUBLISH message is received from the server. """
68     if msg.topic.startswith(MQTT_TOPIC_PREFIX):
69         try:
70             # Decode JSON payload
71             d = json.loads(msg.payload.decode())
72         except:
73             log.warning("JSON decode error: " + msg.payload.decode())
74             return
75
76         # Convert time string to datetime object
77         time_str = d.get('time', "0000-00-00 00:00:00")
78         time_utc = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
79         d['time'] = time_utc
80         # Update sensor_state
81         sensor_model = d.pop('model', 'unknown')
82         sensor_id = d.pop('id', 0)
83         sensor_state.setdefault(sensor_model,{}).setdefault(sensor_id,{})['data'] = d
84         print_sensor_state()
85     else:
86         log.info("Uknown topic: " + msg.topic + "\t" + msg.payload.decode())
87
88
89 # Setup MQTT client
90 mqtt_client.on_connect = on_connect
91 mqtt_client.on_disconnect = on_disconnect
92 mqtt_client.on_message = on_message
93 mqtt_client.connect(MQTT_SERVER)
94 mqtt_client.loop_start()
95
96
97 def main():
98     """MQTT Test Client"""
99     log.setLevel(logging.INFO)
100     log.info("MQTT RTL_433 Test Client")
101
102     while True:
103         time.sleep(1)
104
105
106 if __name__ == "__main__":
107     main()