2c5419c218453ed67d908cbe12da39dcb647c918
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4 local signal = require("posix.signal")
5
6 require "wm_util"
7
8 function getConfig(configname)
9
10   local command,f
11
12   local uci=require("uci")
13   local cur=uci.cursor()
14   local config
15   if configname then
16     config=configname
17   else
18     config="weathermon"
19   end
20
21   web_url  = cur.get(config,"web","url")
22   web_user = cur.get(config,"web","user")
23   web_timeout = cur.get(config,"web","timeout")
24   web_pass = cur.get(config,"web","password")
25  
26   if not web_timeout then
27     web_timeout = 10
28   end
29
30   web_devid = get_devid(config)
31
32   logging = cur.get(config,"logging","enabled") 
33   touch_file = cur.get(config,"logging","touch_file") 
34
35   backlogdb = cur.get(config,"process","backlogdb")
36   logdb = cur.get(config,"process","logdb")
37
38   serial_port = cur.get(config,"serial","port")
39   serial_baud = cur.get(config,"serial","baud")
40
41   input_file = cur.get(config,"input","file")
42   input_exec = cur.get(config,"input","exec")
43   alarm_exec = cur.get(config,"alarm","exec")
44
45   if serial_port then
46
47     command = "stty -F  "..serial_port.." "..serial_baud
48     capture(command)
49
50   end
51
52   mqtt_host = cur.get(config,"mqtt","host")
53   mqtt_port = cur.get(config,"mqtt","port")
54   mqtt_id = cur.get(config,"mqtt","id")
55   mqtt_topic = cur.get(config,"mqtt","topic")
56   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
57
58   mqtt_user = cur.get(config,"mqtt","user")
59   mqtt_passwd = cur.get(config,"mqtt","password")
60
61   if mqtt_host and not mqtt_id then
62     mqtt_id="weather-"..web_devid
63   end
64
65   if mqtt_host and not mqtt_port then
66     mqtt_port = 1883
67   end
68
69   if mqtt_host and not mqtt_topic then
70     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
71   end
72
73   if mqtt_host and not mqtt_alarm_topic then
74     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
75   end
76
77   dump_file = cur.get(config,"process","dump_file")
78
79 end
80
81 function printLog(str)
82   if logging=="on" then
83     capture("logger -t weathermon "..str)
84     print(str)  
85   elseif logging=="syslog" then
86     capture("logger -t weathermon "..str)
87   elseif logging=="stdout" then 
88     print(str)  
89   end 
90 end
91
92 function unlock_db(file)
93
94   print("Unlocking DB "..file)
95   os.execute("sqlite3 -readonly \""..file.."\" \".backup /tmp/weathermon.db\"")
96   os.execute("mv /tmp/weathermon.db \""..file.."\"")
97
98 end
99
100 function submitValue(type,id,param,val)
101
102   val = tonumber(val)
103
104   if web_url and val then
105
106     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
107
108     if web_user then
109       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
110     end
111
112     local result,code = http.request (url)
113
114     if code ~= 200 and backlogdb then
115       printLog("writing record to backlog...")
116       local backlog_con = assert(env:connect(backlogdb))
117       backlog_con:execute('BEGIN TRANSACTION')
118       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
119       local n,err = backlog_con:execute('COMMIT TRANSACTION')
120       backlog_con:close()
121       
122       if err == "LuaSQL: database is locked" then
123         unlock_db(backlogdb);
124       end
125       
126     end
127
128   end
129   
130   if logdb then
131     print(logdb)
132     local log_con = assert(env:connect(logdb))
133     log_con:execute('BEGIN TRANSACTION')
134     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
135     local n,err = log_con:execute('COMMIT TRANSACTION')
136     log_con:close()
137
138     print(n,err)
139
140     if err == "LuaSQL: database is locked" then
141       unlock_db(logdb);
142     end
143
144   end
145
146 end
147
148 function storeRecord(id,sensor,param,value) 
149
150   if not records[id] then
151     records[id] = {}
152   end  
153   
154   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
155   
156   if not records[id][sensor] then
157     records[id][sensor] = {}
158   end
159   
160   records[id][sensor][param] = value
161
162 end
163
164 function processJson(str)
165
166   msg=json.decode(str)
167
168   sensor={}
169
170   for key,value in pairs(msg) do
171     if value then
172       if key=="model" or key=="device" then
173         sensor_type=value
174       elseif key=="id" then
175         sensor_id=value
176       elseif key=='time' then
177         sensor_time=value
178       else
179         sensor[key]=value
180       end
181     end
182   end
183
184   if not sensor_id then
185     sensor_id = web_devid
186   end
187
188   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
189     local record = {}
190     for k,v in pairs(sensor) do
191       storeRecord(sensor_id,sensor_type,k,v)
192       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
193       if web_url then
194         submitValue(sensor_type,sensor_id,k,v)
195       end  
196       if mqtt_client then
197         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
198           function (name) 
199             if name=="dev" then
200               return mqtt_encode(web_devid)
201             elseif name=="type" then
202               return mqtt_encode(sensor_type)
203             elseif name=="id" then
204               return mqtt_encode(sensor_id)
205             elseif name=="param" then
206               return k
207             else
208               return '{'..name..'}'
209             end      
210           end)
211         if not mqtt_client:socket() then
212           mqtt_client:reconnect()
213         end  
214         mqtt_client:publish(mqtt_path,v,0,false)
215         mqtt_client:loop()
216       end  
217     end
218   else
219     printLog("Cannot parse sensor input: "..str)
220   end
221
222 end
223
224 function processLine(str)
225
226   msg=split(line,':')
227   msg_type=msg[1] or ''
228   msg_body=msg[2] or ''
229   if msg_type=="STATUS" then
230     printLog("Status: "..msg_body)
231   elseif msg_type=="ERROR" then
232     printLog("Error: "..msg_body)  
233   elseif msg_type=="SENSOR" then
234     printLog("SENSOR: "..msg_body)  
235     sens = split(msg_body,",")
236     sensor = {}
237     idx = 1
238     sensor_type = nil
239     sensor_id = web_devid
240     for i,rec in ipairs(sens) do
241       recrd=split(rec,'=')
242       key=recrd[1] or ''
243       value=recrd[2] or ''
244       if value then
245         if key=="TYPE" then
246           sensor_type=value
247         elseif key=="ID" then
248           sensor_id=value
249         else
250           sensor[key]=value
251         end
252       end
253     end
254     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
255       local record = {}
256       for k,v in pairs(sensor) do
257         storeRecord(sensor_id,sensor_type,k,v)
258         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
259         if web_url then 
260           submitValue(sensor_type,sensor_id,k,v)
261         end  
262         if mqtt_client then
263           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
264             function (name) 
265               if name=="dev" then
266                 return web_devid
267               elseif name=="type" then
268                 return sensor_type
269               elseif name=="id" then
270                 return sensor_id
271               elseif name=="param" then
272                 return k
273               else
274                 return '{'..name..'}'
275               end      
276             end)
277           if not mqtt_client:socket() then
278             mqtt_client:reconnect()
279           end  
280           mqtt_client:publish(mqtt_path,v,0,false)
281           mqtt_client:loop()
282         end  
283       end
284     else
285       printLog("Cannot parse sensor input: "..msg_body)
286     end
287   elseif msg_type=="ALARM" then
288     printLog("ALARM: "..msg_body)  
289     sens = split(msg_body,",")
290     sensor = {}
291     idx = 1
292     sensor_type = nil
293     sensor_id = web_devid
294     mqtt_param = {}
295     for i,rec in ipairs(sens) do
296       recrd=split(rec,'=')
297       key=recrd[1] or ''
298       value=recrd[2] or ''
299       if value then
300         if key=="TYPE" then
301           alarm_type=value
302         elseif key=="ID" then
303           alarm_id=value
304         end
305       end
306     end
307     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
308       if mqtt_client then
309         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
310           function (name) 
311             if name=="dev" then
312               return web_devid
313             elseif name=="type" then
314               return sensor_type
315             elseif name=="id" then
316               return sensor_id
317             else
318               return '{'..name..'}'
319             end      
320           end)
321         if not mqtt_client:socket() then
322           mqtt_client:reconnect()
323         end  
324         mqtt_client:publish(mqtt_path,msg_body,0,false)
325         mqtt_client:loop()
326       end
327       if alarm_exec then
328         command=alarm_exec..
329           " \""..string.gsub(alarm_type,"\"","\\\"")..
330           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
331           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
332         capture(command)
333       end
334     else
335       printLog("Cannot parse alarm input: "..msg_body)
336     end
337   end
338
339 end
340
341 getConfig(arg[1])
342
343 signal.signal(signal.SIGTERM, function(signum)
344
345   printLog("Terminating...")
346   local pids = get_children()
347   for k,v in pairs(pids) do
348     printLog("Terminating subprocess "..tostring(v).."...")
349     signal.kill(v,signal.SIGTERM)
350   end
351   printLog("Exiting...")
352   os.exit(0)
353
354 end)
355
356 if backlogdb or logdb then
357   local dbdriver = require "luasql.sqlite3"
358   env = assert(dbdriver.sqlite3())
359 end
360
361 if backlogdb then
362   if not file_exists(backlogdb) then
363     touch(backlogdb)
364   end  
365   local backlog_con = assert(env:connect(backlogdb))
366   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
367   backlog_con:close()
368 end
369
370 if logdb then
371   if not file_exists(logdb) then
372     touch(logdb)
373   end  
374   local log_con = assert(env:connect(logdb))
375   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
376   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
377   log_con:close()
378 end
379
380 if web_url then
381   http = require("socket.http")
382   http.TIMEOUT = web_timeout
383 end
384
385 if mqtt_host then
386   MQTT = require "mosquitto"
387   mqtt_client = MQTT.new(mqtt_id)
388   if mqtt_user then
389     mqtt_client:login_set(mqtt_user, mqtt_passwd)
390   end
391   mqtt_client:connect(mqtt_host,mqtt_port)
392 end
393
394 if serial_port then
395   serialin=io.open(serial_port,"r")
396 elseif input_file == "-" then
397   serialin=io.stdin;
398 elseif input_file then
399   serialin=io.open(input_file,"r")
400 elseif input_exec then
401   serialin=io.popen(input_exec,"r")
402 else
403   printLog("No input selected")
404   return
405 end  
406
407 serialin:setvbuf('no')
408
409 records = {}
410
411 while 1 do
412
413   line=serialin:read("*l")
414
415   if line == nil then
416     break
417   end
418
419 --  pcall(function ()
420   
421     printLog("Received: "..line)
422     if startswith(line,'{') then
423       processJson(line)
424     else
425       processLine(line)
426     end
427
428     if touch_file then
429       pcall(function () 
430         touch(touch_file) 
431       end)
432     end  
433
434     if dump_file then
435       pcall(function ()
436         local f = io.open(dump_file,"w")
437         io.output(f)
438         io.write(json.encode(records))
439         io.close(f)
440       end)  
441     end
442 --  end)
443     
444 end