Автопереподключение к MQTT и для случая обработки не-JSON (все еще используется на...
[weathermon.git] / server / weathermon-mqtt
1 #!/usr/bin/lua
2
3 local random = math.random
4 local json = require "json"
5 math.randomseed(os.time())
6
7 local function uuid()
8     local template ='xxxx-xxxx'
9     return string.gsub(template, '[x]', function (c)
10         local v = random(0, 0xf)        
11         return string.format('%x', v)
12     end)
13 end
14
15 function process_MSG(mid, topic, payload)
16   print(topic, payload)
17   pcall(function(topic,payload)
18     payload = json.decode(payload)
19     local time = os.date(payload['Time'])
20     for sensor_type,sensor_data in pairs(payload) do
21       if sensor_type ~= "Time" and sensor_type ~= "TempUnit" then
22         for param,value in pairs(sensor_data) do
23           conn:execute(string.format("CALL meteo.submit_mqtt('%s','%s','%s','%s',NULL)", topic,sensor_type,param,value))
24           conn:commit()
25           print(topic,sensor_type,param,value)
26         end
27       end
28     end
29   end, topic, payload) 
30 end
31
32 uci = require "uci"
33
34 config_name = arg[1]
35 if not config_name then
36   config_name = "weathermon_mqtt"
37 end
38
39 mqtt_host = uci.get(config_name,"mqtt","server")
40 if not mqtt_host then
41   mqtt_host="127.0.0.1"
42 end
43
44 mqtt_port = uci.get(config_name,"mqtt","port")
45 if not mqtt_port then
46   mqtt_host=1883
47 end
48
49 mqtt_user = uci.get(config_name,"mqtt","username")
50 mqtt_pwd = uci.get(config_name,"mqtt","password")
51
52 mqtt_id = "wm-mqtt-"..uuid()
53
54 db_server = uci.get(config_name,"db","server")
55 db_name = uci.get(config_name,"db","db")
56 db_user = uci.get(config_name,"db","username")
57 db_pwd = uci.get(config_name,"db","password")
58
59 env = require("luasql.mysql").mysql()
60 conn = env:connect(db_name,db_user,db_pwd,db_server)
61 conn:execute("SET NAMES utf8")
62
63 MQTT = require "mosquitto"
64 mqtt_client = MQTT.new(mqtt_id)
65 if mqtt_user then
66   mqtt_client:login_set(mqtt_user, mqtt_pwd)
67 end
68
69 mqtt_client:connect("estia.rvb-home.lan",1883)
70 mqtt_client.ON_MESSAGE = process_MSG
71
72 cur = conn:execute("SELECT DISTINCT topic FROM mqtt_topics WHERE topic<>''")
73 rec = cur:fetch()
74 while rec do
75   mqtt_client:subscribe(rec)
76   rec = cur:fetch()
77 end
78  
79 mqtt_client:loop_forever()