#!/usr/bin/env python3
# Provenance: recovered from a git blobdiff of tests/mqtt_rtl_433_test.py
# (new file, mode 100755, blob bead871).
# X-Git-Url: https://git.rvb.name/rtl-433.git/blobdiff_plain/ca13278b24eb61443559bcb61e64627fba3d8823..6d15c6f967221af825cf84e3ed12b96c763b127b:/tests/mqtt_rtl_433_test.py
""" MQTT test client for receiving rtl_433 JSON data

Example program for receiving and parsing sensor data from rtl_433 sent
as MQTT network messages. Recommended way of sending rtl_433 data on network is:

$ rtl_433 -F json -U | mosquitto_pub -t home/rtl_433 -l

An MQTT broker e.g. 'mosquitto' must be running on local computer

Copyright (C) 2017 Tommy Vestermark
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
"""

import datetime
import json
import logging
import multiprocessing as mp
import os
import sys
import time

import paho.mqtt.client as mqtt

MQTT_SERVER = "127.0.0.1"
MQTT_TOPIC_PREFIX = "home/rtl_433"
TIMEOUT_STALE_SENSOR = 600  # Seconds before showing a timeout indicator
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # Layout of the 'time' field parsed by on_message()

#log = logging.getLogger()  # Single process logger
log = mp.log_to_stderr()  # Multiprocessing capable logger

# paho-mqtt 2.x requires a callback API version as the first constructor
# argument; older releases do not know the enum at all. VERSION1 keeps the
# callback signatures used below (on_connect with flags + rc, etc.).
_callback_api = getattr(mqtt, "CallbackAPIVersion", None)
if _callback_api is None:
    mqtt_client = mqtt.Client("RTL_433_Test")
else:
    mqtt_client = mqtt.Client(_callback_api.VERSION1, "RTL_433_Test")

sensor_state = dict()  # Dictionary containing accumulated sensor state


def _utc_now():
    """ Return the current UTC time as a naive datetime with whole seconds.

    Naive (tz-less) on purpose: timestamps parsed in on_message() are naive
    too, and naive/aware datetimes cannot be subtracted from each other.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.replace(tzinfo=None, microsecond=0)


def _parse_time(time_str):
    """ Convert a message 'time' value to a datetime.

    Falls back to the time of reception when the value is missing (None),
    not a string, or does not match _TIME_FORMAT, so that one odd message
    cannot crash the MQTT callback.
    """
    try:
        return datetime.datetime.strptime(time_str, _TIME_FORMAT)
    except (TypeError, ValueError):
        log.warning("Missing or unparsable time %r, using time of reception", time_str)
        return _utc_now()


def print_sensor_state():
    """ Print accumulated sensor state

    Writes one header line, then for every model in sensor_state the model
    name followed by one line per sensor ID. Each sensor line carries an
    indicator: "*" for data newer than 2 seconds, "~" for data older than
    TIMEOUT_STALE_SENSOR seconds, and a blank otherwise.
    """
    time_now = _utc_now()
    print("\nUpdate per {} UTC".format(time_now.isoformat(sep=' ')))
    for model, sensors in sensor_state.items():
        print(model)
        for sensor_id, entry in sensors.items():
            # Work on a copy so popping 'time' does not alter the stored state
            data = entry['data'].copy()
            timestamp = data.pop('time')
            age = (time_now - timestamp).total_seconds()
            # Indicator for new and stale data
            if age < 2:
                indicator = "*"
            elif age > TIMEOUT_STALE_SENSOR:
                indicator = "~"
            else:
                indicator = " "
            print(" ID {:5} {}{} {}".format(sensor_id, timestamp.isoformat(sep=' '), indicator, data))
    sys.stdout.flush()  # Print in real-time


def on_connect(client, userdata, flags=None, rc=None):
    """ Callback for when the client receives a CONNACK response from the server.

    Accepts both the four-argument form (client, userdata, flags, rc) and the
    legacy three-argument form (client, userdata, rc); in the latter case the
    result code arrives in the third positional slot.
    """
    if rc is None:
        # Legacy call without a flags argument: third positional is the result code
        rc = flags
    log.info("MQTT Connection: " + mqtt.connack_string(rc))
    if rc != 0:
        log.error("Could not connect. RC: " + str(rc))
        # NOTE(review): this callback runs on the thread started by
        # loop_start(), so SystemExit presumably ends only that thread and
        # not the whole program - confirm desired behaviour.
        sys.exit()
    # Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions will be renewed.
    client.subscribe(MQTT_TOPIC_PREFIX + "/#")


def on_disconnect(client, userdata, rc):
    """ Callback for when the client disconnects; logs any non-zero result code. """
    if rc != 0:
        log.error("Unexpected disconnection. RC: " + str(rc))


def on_message(client, userdata, msg):
    """ Callback for when a PUBLISH message is received from the server.

    Decodes the JSON payload, converts its 'time' field to a datetime, stores
    the remaining fields in sensor_state[model][id]['data'] and prints the
    accumulated state. Malformed payloads are logged and ignored.
    """
    if not msg.topic.startswith(MQTT_TOPIC_PREFIX):
        log.info("Unknown topic: " + msg.topic + "\t" + msg.payload.decode(errors="replace"))
        return

    try:
        # Decode JSON payload
        d = json.loads(msg.payload.decode())
    except ValueError:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError;
        # decode leniently here so logging itself cannot raise again.
        log.warning("JSON decode error: " + msg.payload.decode(errors="replace"))
        return
    if not isinstance(d, dict):
        log.warning("Unexpected JSON payload (not an object): " + repr(d))
        return

    # Convert time string to datetime object
    d['time'] = _parse_time(d.get('time'))
    # Update sensor_state
    sensor_model = d.pop('model', 'unknown')
    sensor_id = d.pop('id', 0)
    sensor_state.setdefault(sensor_model, {}).setdefault(sensor_id, {})['data'] = d
    print_sensor_state()


# Setup MQTT client
mqtt_client.on_connect = on_connect
mqtt_client.on_disconnect = on_disconnect
mqtt_client.on_message = on_message
mqtt_client.connect(MQTT_SERVER)
mqtt_client.loop_start()


def main():
    """MQTT Test Client

    Sets the log level and then idles; all work happens in the MQTT
    callbacks driven by the background loop started at import time.
    """
    log.setLevel(logging.INFO)
    log.info("MQTT RTL_433 Test Client")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C: stop the background network loop instead of dying with a traceback
        mqtt_client.loop_stop()
        mqtt_client.disconnect()


if __name__ == "__main__":
    main()