#!/usr/bin/lua

--- weathermon-mqtt: forwards sensor readings received over MQTT to MySQL.
-- Lua port of the earlier Python (paho-mqtt / MySQLdb) version of this script.
-- Settings are read through UCI from the config named by the first command
-- line argument (default "weathermon_mqtt"), sections "mqtt" and "db".
-- Every reading is stored by calling the stored procedure meteo.submit_mqtt.

local json = require "json"
local uci = require "uci"
local mysql = require "luasql.mysql"
local MQTT = require "mosquitto"

math.randomseed(os.time())

-- Top-level payload keys that are metadata, not sensor readings.
local META_KEYS = {
  Time = true,
  time = true,
  TempUnit = true,
  model = true,
  id = true,
}

--- Write a diagnostic line to stderr (stdout is used for the data log).
-- @tparam string msg
local function log_error(msg)
  io.stderr:write("weathermon-mqtt: ", tostring(msg), "\n")
end

--- Build a short random hex token of the form "xxxx-xxxx".
-- @treturn string
local function uuid()
  -- The outer parentheses drop gsub's second result (the substitution count).
  return (string.gsub("xxxx-xxxx", "x", function()
    return string.format("%x", math.random(0, 0xf))
  end))
end

---------------------------------------------------------------------------
-- Configuration
---------------------------------------------------------------------------

local config_name = (arg and arg[1]) or "weathermon_mqtt"

local mqtt_host = uci.get(config_name, "mqtt", "server") or "127.0.0.1"
-- The original assigned the default port to mqtt_host by mistake, which both
-- clobbered the host and left the port unset.
local mqtt_port = tonumber(uci.get(config_name, "mqtt", "port")) or 1883
local mqtt_user = uci.get(config_name, "mqtt", "username")
local mqtt_pwd = uci.get(config_name, "mqtt", "password")
local mqtt_id = "wm-mqtt-" .. uuid()

local db_server = uci.get(config_name, "db", "server")
local db_name = uci.get(config_name, "db", "db")
local db_user = uci.get(config_name, "db", "username")
local db_pwd = uci.get(config_name, "db", "password")

---------------------------------------------------------------------------
-- Database
---------------------------------------------------------------------------

-- LuaSQL reports failures as nil plus a message; assert turns that into a
-- readable startup error instead of a later "attempt to index nil".
local env = assert(mysql.mysql())
local conn = assert(env:connect(db_name, db_user, db_pwd, db_server))
assert(conn:execute("SET NAMES utf8"))

--- Store one reading through meteo.submit_mqtt and echo it to stdout.
-- Values that are neither strings nor numbers are skipped (and reported),
-- because they cannot be represented as a single SQL literal.
-- @param topic  MQTT topic the message arrived on
-- @param sensor sensor type, or the device model when the payload has one
-- @param param  parameter name
-- @param value  reading
local function submit(topic, sensor, param, value)
  local value_type = type(value)
  if value_type ~= "string" and value_type ~= "number" then
    log_error(string.format("skipping %s/%s/%s: unsupported value type %s",
      tostring(topic), tostring(sensor), tostring(param), value_type))
    return
  end

  -- Topic and payload come from the network, so every piece is escaped
  -- before being placed inside the quoted SQL literals.
  local sql = string.format(
    "CALL meteo.submit_mqtt('%s','%s','%s','%s',NULL)",
    conn:escape(tostring(topic)),
    conn:escape(tostring(sensor)),
    conn:escape(tostring(param)),
    conn:escape(tostring(value)))

  local ok, err = conn:execute(sql)
  if not ok then
    log_error("failed to submit data: " .. tostring(err))
    return
  end
  conn:commit()
  print(topic, sensor, param, value)
end

--- Decode one JSON payload and submit every reading it contains.
-- With a top-level "model" field the payload is treated as flat: each
-- remaining key/value pair is one reading of that model. Without it, each
-- remaining top-level key is a sensor type whose value is a table of
-- parameter/value pairs.
-- Raises an error when the payload does not decode to a table.
-- @tparam string topic
-- @tparam string payload raw JSON text
local function handle_payload(topic, payload)
  local data = json.decode(payload)
  if type(data) ~= "table" then
    error("payload is not a JSON object")
  end

  local model = data.model
  for sensor_type, sensor_data in pairs(data) do
    if not META_KEYS[sensor_type] then
      if model then
        submit(topic, model, sensor_type, sensor_data)
      elseif type(sensor_data) == "table" then
        for param, value in pairs(sensor_data) do
          submit(topic, sensor_type, param, value)
        end
      else
        -- A scalar here used to abort the rest of the message; skip just
        -- this entry instead.
        log_error(string.format("skipping %s/%s: expected a table of readings",
          tostring(topic), tostring(sensor_type)))
      end
    end
  end
end

--- MQTT message callback (assigned to the client's ON_MESSAGE field).
-- Errors are caught so that one malformed message cannot stop the main
-- loop; they are reported on stderr rather than silently dropped.
-- @param mid message id (unused)
-- @tparam string topic
-- @tparam string payload raw message body, expected to be JSON
function process_MSG(mid, topic, payload)
  print(topic, payload)
  local ok, err = pcall(handle_payload, topic, payload)
  if not ok then
    log_error(string.format("cannot process message on %s: %s",
      tostring(topic), tostring(err)))
  end
end

---------------------------------------------------------------------------
-- MQTT
---------------------------------------------------------------------------

local mqtt_client = MQTT.new(mqtt_id)
if mqtt_user then
  mqtt_client:login_set(mqtt_user, mqtt_pwd)
end
mqtt_client.ON_MESSAGE = process_MSG

-- Use the configured broker; the original connected to a hard-coded host
-- and port and ignored the settings read above.
local connected, code, reason = mqtt_client:connect(mqtt_host, mqtt_port)
if not connected then
  error(string.format("cannot connect to MQTT broker %s:%s (%s)",
    tostring(mqtt_host), tostring(mqtt_port), tostring(reason or code)))
end

-- Subscribe to every non-empty topic listed in the database.
-- NOTE(review): subscriptions are made once here; if the client reconnects
-- later they may need to be re-issued (e.g. from ON_CONNECT) - confirm.
local cur = assert(conn:execute(
  "SELECT DISTINCT topic FROM mqtt_topics WHERE topic<>''"))
local topic = cur:fetch()
while topic do
  mqtt_client:subscribe(topic)
  topic = cur:fetch()
end

mqtt_client:loop_forever()