2 """ MQTT test client for receiving rtl_433 JSON data
4 Example program for receiving and parsing sensor data from rtl_433 sent
5 as MQTT network messages. Recommended way of sending rtl_433 data on network is:
7 $ rtl_433 -F json -U | mosquitto_pub -t home/rtl_433 -l
9 An MQTT broker e.g. 'mosquitto' must be running on local compputer
11 Copyright (C) 2017 Tommy Vestermark
12 This program is free software; you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation; either version 2 of the License, or
15 (at your option) any later version.
18 import multiprocessing as mp
20 import paho.mqtt.client as mqtt
25 MQTT_SERVER = "127.0.0.1"
26 MQTT_TOPIC_PREFIX = "home/rtl_433"
27 TIMEOUT_STALE_SENSOR = 600 # Seconds before showing a timeout indicator
29 #log = logging.getLogger() # Single process logger
30 log = mp.log_to_stderr() # Multiprocessing capable logger
31 mqtt_client = mqtt.Client("RTL_433_Test")
33 sensor_state = dict() # Dictionary containing accumulated sensor state
36 def print_sensor_state():
37 """ Print accumulated sensor state """
38 time_now = datetime.datetime.utcnow().replace(microsecond=0)
39 print("\nUpdate per {} UTC".format(time_now.isoformat(sep=' ')))
40 for model in sensor_state:
42 for ID in sensor_state[model]:
43 data = sensor_state[model][ID]['data'].copy()
44 timestamp = data.pop('time')
45 timedelta = (time_now-timestamp).total_seconds()
46 indicator = "*" if (timedelta < 2) else "~" if (timedelta > TIMEOUT_STALE_SENSOR) else " " # Indicator for new and stale data
47 print(" ID {:5} {}{} {}".format(ID, timestamp.isoformat(sep=' '), indicator, data))
48 sys.stdout.flush() # Print in real-time
51 def on_connect(client, userdata, rc):
52 """ Callback for when the client receives a CONNACK response from the server. """
53 log.info("MQTT Connection: " + mqtt.connack_string(rc))
55 log.error("Could not connect. RC: " + str(rc))
57 # Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions will be renewed.
58 client.subscribe(MQTT_TOPIC_PREFIX + "/#")
61 def on_disconnect(client, userdata, rc):
63 log.error("Unexpected disconnection. RC: " + str(rc))
66 def on_message(client, userdata, msg):
67 """ Callback for when a PUBLISH message is received from the server. """
68 if msg.topic.startswith(MQTT_TOPIC_PREFIX):
71 d = json.loads(msg.payload.decode())
73 log.warning("JSON decode error: " + msg.payload.decode())
76 # Convert time string to datetime object
77 time_str = d.get('time', "0000-00-00 00:00:00")
78 time_utc = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
81 sensor_model = d.pop('model', 'unknown')
82 sensor_id = d.pop('id', 0)
83 sensor_state.setdefault(sensor_model,{}).setdefault(sensor_id,{})['data'] = d
86 log.info("Uknown topic: " + msg.topic + "\t" + msg.payload.decode())
90 mqtt_client.on_connect = on_connect
91 mqtt_client.on_disconnect = on_disconnect
92 mqtt_client.on_message = on_message
93 mqtt_client.connect(MQTT_SERVER)
94 mqtt_client.loop_start()
98 """MQTT Test Client"""
99 log.setLevel(logging.INFO)
100 log.info("MQTT RTL_433 Test Client")
106 if __name__ == "__main__":