DB "canary" added - with fail/restart when connection dropped
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4 local signal = require("posix.signal")
5
6 require "wm_util"
7
8 function getConfig(configname)
9
10   local command,f
11
12   local uci=require("uci")
13   local cur=uci.cursor()
14   local config
15   if configname then
16     config=configname
17   else
18     config="weathermon"
19   end
20
21   web_url  = cur.get(config,"web","url")
22   web_user = cur.get(config,"web","user")
23   web_timeout = cur.get(config,"web","timeout")
24   web_pass = cur.get(config,"web","password")
25  
26   if not web_timeout then
27     web_timeout = 10
28   end
29
30   web_devid = get_devid(config)
31
32   logging = cur.get(config,"logging","enabled") 
33   touch_file = cur.get(config,"logging","touch_file") 
34
35   backlogdb = cur.get(config,"process","backlogdb")
36   logdb = cur.get(config,"process","logdb")
37
38   serial_port = cur.get(config,"serial","port")
39   serial_baud = cur.get(config,"serial","baud")
40
41   input_file = cur.get(config,"input","file")
42   input_exec = cur.get(config,"input","exec")
43   alarm_exec = cur.get(config,"alarm","exec")
44
45   if serial_port then
46
47     command = "stty -F  "..serial_port.." "..serial_baud
48     capture(command)
49
50   end
51
52   mqtt_host = cur.get(config,"mqtt","host")
53   mqtt_port = cur.get(config,"mqtt","port")
54   mqtt_id = cur.get(config,"mqtt","id")
55   mqtt_topic = cur.get(config,"mqtt","topic")
56   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
57
58   mqtt_user = cur.get(config,"mqtt","user")
59   mqtt_passwd = cur.get(config,"mqtt","password")
60
61   if mqtt_host and not mqtt_id then
62     mqtt_id="weather-"..web_devid
63   end
64
65   if mqtt_host and not mqtt_port then
66     mqtt_port = 1883
67   end
68
69   if mqtt_host and not mqtt_topic then
70     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
71   end
72
73   if mqtt_host and not mqtt_alarm_topic then
74     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
75   end
76
77   dump_file = cur.get(config,"process","dump_file")
78
79 end
80
81 function printLog(str)
82   if logging=="on" then
83     capture("logger -t weathermon "..str)
84     print(str)  
85   elseif logging=="syslog" then
86     capture("logger -t weathermon "..str)
87   elseif logging=="stdout" then 
88     print(str)  
89   end 
90 end
91
92 function unlock_db(file)
93
94   print("Unlocking DB "..file)
95   os.execute("sqlite3 -readonly \""..file.."\" \".backup /tmp/weathermon.db\"")
96   os.execute("mv /tmp/weathermon.db \""..file.."\"")
97
98 end
99
100 function submitValue(type,id,param,val)
101
102   val = tonumber(val)
103
104   if web_url and val then
105
106     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
107
108     if web_user then
109       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
110     end
111
112     local result,code = http.request (url)
113
114     if code ~= 200 and backlogdb then
115       printLog("writing record to backlog...")
116       local backlog_con = assert(env:connect(backlogdb))
117       local n,err = backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
118       backlog_con:close()
119       
120       if err == "LuaSQL: database is locked" then
121         unlock_db(backlogdb);
122       end
123       
124     end
125
126   end
127   
128   if logdb then
129     print(logdb)
130     local log_con = assert(env:connect(logdb))
131     local n,err = log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
132     log_con:close()
133
134     if err == "LuaSQL: database is locked" then
135       unlock_db(logdb);
136     end
137
138   end
139
140 end
141
142 function storeRecord(id,sensor,param,value) 
143
144   if not records[id] then
145     records[id] = {}
146   end  
147   
148   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
149   
150   if not records[id][sensor] then
151     records[id][sensor] = {}
152   end
153   
154   records[id][sensor][param] = value
155
156 end
157
158 function processJson(str)
159
160   msg=json.decode(str)
161
162   sensor={}
163
164   for key,value in pairs(msg) do
165     if value then
166       if key=="model" or key=="device" then
167         sensor_type=value
168       elseif key=="id" then
169         sensor_id=value
170       elseif key=='time' then
171         sensor_time=value
172       else
173         sensor[key]=value
174       end
175     end
176   end
177
178   if not sensor_id then
179     sensor_id = web_devid
180   end
181
182   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
183     local record = {}
184     for k,v in pairs(sensor) do
185       storeRecord(sensor_id,sensor_type,k,v)
186       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
187       if web_url then
188         submitValue(sensor_type,sensor_id,k,v)
189       end  
190       if mqtt_client then
191         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
192           function (name) 
193             if name=="dev" then
194               return mqtt_encode(web_devid)
195             elseif name=="type" then
196               return mqtt_encode(sensor_type)
197             elseif name=="id" then
198               return mqtt_encode(sensor_id)
199             elseif name=="param" then
200               return k
201             else
202               return '{'..name..'}'
203             end      
204           end)
205         if not mqtt_client:socket() then
206           mqtt_client:reconnect()
207         end  
208         mqtt_client:publish(mqtt_path,v,0,false)
209         mqtt_client:loop()
210       end  
211     end
212   else
213     printLog("Cannot parse sensor input: "..str)
214   end
215
216 end
217
218 function processLine(str)
219
220   msg=split(line,':')
221   msg_type=msg[1] or ''
222   msg_body=msg[2] or ''
223   if msg_type=="STATUS" then
224     printLog("Status: "..msg_body)
225   elseif msg_type=="ERROR" then
226     printLog("Error: "..msg_body)  
227   elseif msg_type=="SENSOR" then
228     printLog("SENSOR: "..msg_body)  
229     sens = split(msg_body,",")
230     sensor = {}
231     idx = 1
232     sensor_type = nil
233     sensor_id = web_devid
234     for i,rec in ipairs(sens) do
235       recrd=split(rec,'=')
236       key=recrd[1] or ''
237       value=recrd[2] or ''
238       if value then
239         if key=="TYPE" then
240           sensor_type=value
241         elseif key=="ID" then
242           sensor_id=value
243         else
244           sensor[key]=value
245         end
246       end
247     end
248     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
249       local record = {}
250       for k,v in pairs(sensor) do
251         storeRecord(sensor_id,sensor_type,k,v)
252         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
253         if web_url then 
254           submitValue(sensor_type,sensor_id,k,v)
255         end  
256         if mqtt_client then
257           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
258             function (name) 
259               if name=="dev" then
260                 return web_devid
261               elseif name=="type" then
262                 return sensor_type
263               elseif name=="id" then
264                 return sensor_id
265               elseif name=="param" then
266                 return k
267               else
268                 return '{'..name..'}'
269               end      
270             end)
271           if not mqtt_client:socket() then
272             mqtt_client:reconnect()
273           end  
274           mqtt_client:publish(mqtt_path,v,0,false)
275           mqtt_client:loop()
276         end  
277       end
278     else
279       printLog("Cannot parse sensor input: "..msg_body)
280     end
281   elseif msg_type=="ALARM" then
282     printLog("ALARM: "..msg_body)  
283     sens = split(msg_body,",")
284     sensor = {}
285     idx = 1
286     sensor_type = nil
287     sensor_id = web_devid
288     mqtt_param = {}
289     for i,rec in ipairs(sens) do
290       recrd=split(rec,'=')
291       key=recrd[1] or ''
292       value=recrd[2] or ''
293       if value then
294         if key=="TYPE" then
295           alarm_type=value
296         elseif key=="ID" then
297           alarm_id=value
298         end
299       end
300     end
301     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
302       if mqtt_client then
303         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
304           function (name) 
305             if name=="dev" then
306               return web_devid
307             elseif name=="type" then
308               return sensor_type
309             elseif name=="id" then
310               return sensor_id
311             else
312               return '{'..name..'}'
313             end      
314           end)
315         if not mqtt_client:socket() then
316           mqtt_client:reconnect()
317         end  
318         mqtt_client:publish(mqtt_path,msg_body,0,false)
319         mqtt_client:loop()
320       end
321       if alarm_exec then
322         command=alarm_exec..
323           " \""..string.gsub(alarm_type,"\"","\\\"")..
324           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
325           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
326         capture(command)
327       end
328     else
329       printLog("Cannot parse alarm input: "..msg_body)
330     end
331   end
332
333 end
334
335 getConfig(arg[1])
336
337 signal.signal(signal.SIGTERM, function(signum)
338
339   printLog("Terminating...")
340   local pids = get_children()
341   for k,v in pairs(pids) do
342     printLog("Terminating subprocess "..tostring(v).."...")
343     signal.kill(v,signal.SIGTERM)
344   end
345   printLog("Exiting...")
346   os.exit(0)
347
348 end)
349
350 if backlogdb or logdb then
351   local dbdriver = require "luasql.sqlite3"
352   env = assert(dbdriver.sqlite3())
353 end
354
355 if backlogdb then
356   if not file_exists(backlogdb) then
357     touch(backlogdb)
358   end  
359   local backlog_con = assert(env:connect(backlogdb))
360   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
361   backlog_con:close()
362 end
363
364 if logdb then
365   if not file_exists(logdb) then
366     touch(logdb)
367   end  
368   local log_con = assert(env:connect(logdb))
369   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
370   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
371   log_con:close()
372 end
373
374 if web_url then
375   http = require("socket.http")
376   http.TIMEOUT = web_timeout
377 end
378
379 if mqtt_host then
380   MQTT = require "mosquitto"
381   mqtt_client = MQTT.new(mqtt_id)
382   if mqtt_user then
383     mqtt_client:login_set(mqtt_user, mqtt_passwd)
384   end
385   mqtt_client:connect(mqtt_host,mqtt_port)
386 end
387
388 if serial_port then
389   serialin=io.open(serial_port,"r")
390 elseif input_file == "-" then
391   serialin=io.stdin;
392 elseif input_file then
393   serialin=io.open(input_file,"r")
394 elseif input_exec then
395   serialin=io.popen(input_exec,"r")
396 else
397   printLog("No input selected")
398   return
399 end  
400
401 serialin:setvbuf('no')
402
403 records = {}
404
405 while 1 do
406
407   line=serialin:read("*l")
408
409   if line == nil then
410     break
411   end
412
413 --  pcall(function ()
414   
415     printLog("Received: "..line)
416     if startswith(line,'{') then
417       processJson(line)
418     else
419       processLine(line)
420     end
421
422     if touch_file then
423       pcall(function () 
424         touch(touch_file) 
425       end)
426     end  
427
428     if dump_file then
429       pcall(function ()
430         local f = io.open(dump_file,"w")
431         io.output(f)
432         io.write(json.encode(records))
433         io.close(f)
434       end)  
435     end
436 --  end)
437     
438 end