DB "canary" added - with fail/restart when connection dropped
[weathermon.git] / server / weathermon-mqtt
1 #!/usr/bin/lua5.1
2
3 local random = math.random
4 local json = require "json"
5 math.randomseed(os.time())
6
7 local function uuid()
8     local template ='xxxx-xxxx'
9     return string.gsub(template, '[x]', function (c)
10         local v = random(0, 0xf)        
11         return string.format('%x', v)
12     end)
13 end
14
15 function process_MSG(mid, topic, payload)
16   pcall(function(topic,payload)
17     payload = json.decode(payload)
18     local time = os.date(payload['Time'])
19     if not time then time = os.date(payload['time']); end 
20     local model = payload['model']
21     local id = payload['id']
22     if not model then
23       device = payload['device']
24       if device then
25         model = device['model']
26         id = device['ieeeAddr']
27       end
28     end
29     for sensor_type,sensor_data in pairs(payload) do
30       if sensor_type ~= "Time" and sensor_type ~= "TempUnit" and sensor_type ~= "model" and sensor_type ~="id" and sensor_type ~= "time" and sensor_type ~= "device" and sensor_type ~="linkquality" and sensor_type ~= "battery" and sensor_type ~= "last_seen" and sensor_type ~= "voltage" then
31         if model then
32           conn:execute(string.format("CALL meteo.submit_mqtt('%s','%s','%s','%s',NULL)", topic,model,sensor_type,sensor_data))
33           conn:commit()
34           print(string.format("%s/%s (%s) -> %s", topic, model, sensor_type, sensor_data))
35         else 
36           for param,value in pairs(sensor_data) do
37             conn:execute(string.format("CALL meteo.submit_mqtt('%s','%s','%s','%s',NULL)", topic,sensor_type,param,value))
38             conn:commit()
39             print(string.format("%s/%s (%s) -> %s", topic, sensor_type, param, value))
40           end
41         end  
42       end
43     end
44   end, topic, payload) 
45   conn:execute('select 1') -- will fail if connection to DB broken
46 end
47
48 uci = require "uci"
49
50 config_name = arg[1]
51 if not config_name then
52   config_name = "weathermon_mqtt"
53 end
54
55 mqtt_host = uci.get(config_name,"mqtt","server")
56 if not mqtt_host then
57   mqtt_host="127.0.0.1"
58 end
59
60 mqtt_port = uci.get(config_name,"mqtt","port")
61 if not mqtt_port then
62   mqtt_host=1883
63 end
64
65 mqtt_user = uci.get(config_name,"mqtt","username")
66 mqtt_pwd = uci.get(config_name,"mqtt","password")
67
68 mqtt_id = "wm-mqtt-"..uuid()
69
70 db_server = uci.get(config_name,"db","server")
71 db_name = uci.get(config_name,"db","db")
72 db_user = uci.get(config_name,"db","username")
73 db_pwd = uci.get(config_name,"db","password")
74
75 env = require("luasql.mysql").mysql()
76 conn = env:connect(db_name,db_user,db_pwd,db_server)
77 conn:execute("SET NAMES utf8")
78
79 MQTT = require "mosquitto"
80 mqtt_client = MQTT.new(mqtt_id)
81 if mqtt_user then
82   mqtt_client:login_set(mqtt_user, mqtt_pwd)
83 end
84
85 mqtt_client:connect("estia.rvb-home.lan",1883)
86 mqtt_client.ON_MESSAGE = process_MSG
87
88 cur = conn:execute("SELECT DISTINCT topic FROM mqtt_topics WHERE topic<>''")
89 rec = cur:fetch()
90 while rec do
91   mqtt_client:subscribe(rec)
92   rec = cur:fetch()
93 end
94  
95 mqtt_client:loop_forever()