Обработка сообщений MQTT в формате rtl_433
[weathermon.git] / server / weathermon-mqtt
index c6b6a82db1f6959e4bd297eb0fb868f5d09551de..809b4e9521e85354b6fcc10e6945184122a717c2 100755 (executable)
@@ -1,94 +1,89 @@
-#!/usr/bin/python -u
+#!/usr/bin/lua
 
 
-import paho.mqtt.client as paho
-import sys
-from ConfigParser import ConfigParser
-import MySQLdb
-from pprint import pprint
-import json
-from dateutil.parser import parser
-tparser=parser()
+local random = math.random
+local json = require "json"
+math.randomseed(os.time())
 
 
-def on_message(mosq, obj, msg):
-  topic=msg.topic
-  payload=json.loads(msg.payload)
-  timestamp=tparser.parse(payload['Time'])
-  for sensor_type in payload:
-    if sensor_type != 'Time' and sensor_type != 'TempUnit':
-      sensor_data=payload[sensor_type]
-      for param in sensor_data:
-        try:
-          value=sensor_data[param]
-          try:
-            c = database.cursor()
-            c.execute('CALL meteo.submit_mqtt(%s,%s,%s,%s,NULL)', (topic,sensor_type,param,value))
-            database.commit()
-            print topic,sensor_type,param,value
-          except:
-            print "Failed to submit data"
-        except:
-          None    
+local function uuid()
+    local template ='xxxx-xxxx'
+    return string.gsub(template, '[x]', function (c)
+       local v = random(0, 0xf)        
+       return string.format('%x', v)
+    end)
+end
 
 
-def Topics():
+function process_MSG(mid, topic, payload)
+  print(topic, payload)
+  pcall(function(topic,payload)
+    print(payload)
+    payload = json.decode(payload)
+    local time = os.date(payload['Time'])
+    if not time then time = os.date(payload['time']); end 
+    local model = payload['model']
+    local id = payload['id']
+    for sensor_type,sensor_data in pairs(payload) do
+      if sensor_type ~= "Time" and sensor_type ~= "TempUnit" and sensor_type ~= "model" and sensor_type ~="id" and sensor_type ~= "time" then
+        if model then
+          conn:execute(string.format("CALL meteo.submit_mqtt('%s','%s','%s','%s',NULL)", topic,model,sensor_type,sensor_data))
+          conn:commit()
+          print(topic,model,sensor_type,sensor_data)
+        else 
+          for param,value in pairs(sensor_data) do
+            conn:execute(string.format("CALL meteo.submit_mqtt('%s','%s','%s','%s',NULL)", topic,sensor_type,param,value))
+            conn:commit()
+            print(topic,sensor_type,param,value)
+          end
+        end  
+      end
+    end
+  end, topic, payload) 
+end
 
 
-  c = database.cursor()
-  c.execute(
-    '''
-    select topic from mqtt_topics where topic<>""
-    '''
-  )
+uci = require "uci"
 
 
-  topics=c.fetchall()
-  return topics
-  
-  
-def Init():
+config_name = arg[1]
+if not config_name then
+  config_name = "weathermon_mqtt"
+end
 
 
-  global client,database
+mqtt_host = uci.get(config_name,"mqtt","server")
+if not mqtt_host then
+  mqtt_host="127.0.0.1"
+end
 
 
-  conffile = sys.argv[1]
+mqtt_port = uci.get(config_name,"mqtt","port")
+if not mqtt_port then
+  mqtt_host=1883
+end
 
 
-  config = ConfigParser()
-  config.add_section('mqtt')
-  # set defaults for anonymous auth
-  config.set('mqtt', 'username', '')
-  config.set('mqtt', 'password', '')
-  config.set('mqtt', 'port', '1883')
-  config.read(conffile)
+mqtt_user = uci.get(config_name,"mqtt","username")
+mqtt_pwd = uci.get(config_name,"mqtt","password")
 
 
-  mqtt_server = config.get('mqtt', 'server')
-  mqtt_port = config.getint('mqtt', 'port')
-  mqtt_username = config.get('mqtt', 'username')
-  mqtt_password = config.get('mqtt', 'password')
+mqtt_id = "wm-mqtt-"..uuid()
 
 
-  mysql_server = config.get('mysql', 'server')
-  mysql_username = config.get('mysql','username')
-  mysql_password = config.get('mysql','password')
-  mysql_db = config.get('mysql','db')  
-  
-  client = paho.Client('weather')
-  client.username_pw_set(mqtt_username, mqtt_password)
-  client.on_message=on_message
-  client.connect(mqtt_server, port=mqtt_port)
+db_server = uci.get(config_name,"db","server")
+db_name = uci.get(config_name,"db","db")
+db_user = uci.get(config_name,"db","username")
+db_pwd = uci.get(config_name,"db","password")
 
 
-  database = MySQLdb.connect(host=mysql_server,user=mysql_username,passwd=mysql_password,db=mysql_db,use_unicode=True,connect_timeout=10)
-  database.set_character_set('utf8')
-  c = database.cursor()
-  c.execute('SET NAMES utf8;')
+env = require("luasql.mysql").mysql()
+conn = env:connect(db_name,db_user,db_pwd,db_server)
+conn:execute("SET NAMES utf8")
 
 
-  topics=[]
-  for topic in Topics():
-    topics.append((topic[0].encode('UTF-8'),1))
-  
-  client.subscribe(topics)
+MQTT = require "mosquitto"
+mqtt_client = MQTT.new(mqtt_id)
+if mqtt_user then
+  mqtt_client:login_set(mqtt_user, mqtt_pwd)
+end
 
 
-Init()
+mqtt_client:connect("estia.rvb-home.lan",1883)
+mqtt_client.ON_MESSAGE = process_MSG
 
 
-try:
-  while True:
-    try:
-      client.loop()
-    except:
-      break
-finally:
-  exit()   
+cur = conn:execute("SELECT DISTINCT topic FROM mqtt_topics WHERE topic<>''")
+rec = cur:fetch()
+while rec do
+  mqtt_client:subscribe(rec)
+  rec = cur:fetch()
+end
+mqtt_client:loop_forever()