bd9ca80200d5e226cddfce4c1c212d98463f8aa8
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4 local signal = require("posix.signal")
5
6 require "wm_util"
7
8 function getConfig(configname)
9
10   local command,f
11
12   local uci=require("uci")
13   local cur=uci.cursor()
14   local config
15   if configname then
16     config=configname
17   else
18     config="weathermon"
19   end
20
21   web_url  = cur.get(config,"web","url")
22   web_user = cur.get(config,"web","user")
23   web_timeout = cur.get(config,"web","timeout")
24   web_pass = cur.get(config,"web","password")
25  
26   if not web_timeout then
27     web_timeout = 10
28   end
29
30   web_devid = get_devid(config)
31
32   logging = cur.get(config,"logging","enabled") 
33   touch_file = cur.get(config,"logging","touch_file") 
34
35   backlogdb = cur.get(config,"process","backlogdb")
36   logdb = cur.get(config,"process","logdb")
37
38   serial_port = cur.get(config,"serial","port")
39   serial_baud = cur.get(config,"serial","baud")
40
41   input_file = cur.get(config,"input","file")
42   input_exec = cur.get(config,"input","exec")
43   alarm_exec = cur.get(config,"alarm","exec")
44
45   if serial_port then
46
47     command = "stty -F  "..serial_port.." "..serial_baud
48     capture(command)
49
50   end
51
52   mqtt_host = cur.get(config,"mqtt","host")
53   mqtt_port = cur.get(config,"mqtt","port")
54   mqtt_id = cur.get(config,"mqtt","id")
55   mqtt_topic = cur.get(config,"mqtt","topic")
56   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
57
58   mqtt_user = cur.get(config,"mqtt","user")
59   mqtt_passwd = cur.get(config,"mqtt","password")
60
61   if mqtt_host and not mqtt_id then
62     mqtt_id="weather-"..web_devid
63   end
64
65   if mqtt_host and not mqtt_port then
66     mqtt_port = 1883
67   end
68
69   if mqtt_host and not mqtt_topic then
70     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
71   end
72
73   if mqtt_host and not mqtt_alarm_topic then
74     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
75   end
76
77   dump_file = cur.get(config,"process","dump_file")
78
79 end
80
81 function printLog(str)
82   if logging=="on" then
83     capture("logger -t weathermon "..str)
84     print(str)  
85   elseif logging=="syslog" then
86     capture("logger -t weathermon "..str)
87   elseif logging=="stdout" then 
88     print(str)  
89   end 
90 end
91
92 function submitValue(type,id,param,val)
93
94   val = tonumber(val)
95
96   if web_url and val then
97
98     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
99
100     if web_user then
101       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
102     end
103
104     local result,code = http.request (url)
105
106     if code ~= 200 and backlog_con then
107       printLog("writing record to backlog...")
108       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
109     end
110
111   end
112   
113   if logdb then
114     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
115   end
116
117 end
118
119 function storeRecord(id,sensor,param,value) 
120
121   if not records[id] then
122     records[id] = {}
123   end  
124   
125   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
126   
127   if not records[id][sensor] then
128     records[id][sensor] = {}
129   end
130   
131   records[id][sensor][param] = value
132
133 end
134
135 function processJson(str)
136
137   msg=json.decode(str)
138
139   sensor={}
140
141   for key,value in pairs(msg) do
142     if value then
143       if key=="model" or key=="device" then
144         sensor_type=value
145       elseif key=="id" then
146         sensor_id=value
147       elseif key=='time' then
148         sensor_time=value
149       else
150         sensor[key]=value
151       end
152     end
153   end
154
155   if not sensor_id then
156     sensor_id = web_devid
157   end
158
159   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
160     local record = {}
161     for k,v in pairs(sensor) do
162       storeRecord(sensor_id,sensor_type,k,v)
163       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
164       if web_url then
165         submitValue(sensor_type,sensor_id,k,v)
166       end  
167       if mqtt_client then
168         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
169           function (name) 
170             if name=="dev" then
171               return mqtt_encode(web_devid)
172             elseif name=="type" then
173               return mqtt_encode(sensor_type)
174             elseif name=="id" then
175               return mqtt_encode(sensor_id)
176             elseif name=="param" then
177               return k
178             else
179               return '{'..name..'}'
180             end      
181           end)
182         if not mqtt_client:socket() then
183           mqtt_client:reconnect()
184         end  
185         mqtt_client:publish(mqtt_path,v,0,false)
186         mqtt_client:loop()
187       end  
188     end
189   else
190     printLog("Cannot parse sensor input: "..str)
191   end
192
193 end
194
195 function processLine(str)
196
197   msg=split(line,':')
198   msg_type=msg[1] or ''
199   msg_body=msg[2] or ''
200   if msg_type=="STATUS" then
201     printLog("Status: "..msg_body)
202   elseif msg_type=="ERROR" then
203     printLog("Error: "..msg_body)  
204   elseif msg_type=="SENSOR" then
205     printLog("SENSOR: "..msg_body)  
206     sens = split(msg_body,",")
207     sensor = {}
208     idx = 1
209     sensor_type = nil
210     sensor_id = web_devid
211     for i,rec in ipairs(sens) do
212       recrd=split(rec,'=')
213       key=recrd[1] or ''
214       value=recrd[2] or ''
215       if value then
216         if key=="TYPE" then
217           sensor_type=value
218         elseif key=="ID" then
219           sensor_id=value
220         else
221           sensor[key]=value
222         end
223       end
224     end
225     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
226       local record = {}
227       for k,v in pairs(sensor) do
228         storeRecord(sensor_id,sensor_type,k,v)
229         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
230         if web_url then 
231           submitValue(sensor_type,sensor_id,k,v)
232         end  
233         if mqtt_client then
234           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
235             function (name) 
236               if name=="dev" then
237                 return web_devid
238               elseif name=="type" then
239                 return sensor_type
240               elseif name=="id" then
241                 return sensor_id
242               elseif name=="param" then
243                 return k
244               else
245                 return '{'..name..'}'
246               end      
247             end)
248           if not mqtt_client:socket() then
249             mqtt_client:reconnect()
250           end  
251           mqtt_client:publish(mqtt_path,v,0,false)
252           mqtt_client:loop()
253         end  
254       end
255     else
256       printLog("Cannot parse sensor input: "..msg_body)
257     end
258   elseif msg_type=="ALARM" then
259     printLog("ALARM: "..msg_body)  
260     sens = split(msg_body,",")
261     sensor = {}
262     idx = 1
263     sensor_type = nil
264     sensor_id = web_devid
265     mqtt_param = {}
266     for i,rec in ipairs(sens) do
267       recrd=split(rec,'=')
268       key=recrd[1] or ''
269       value=recrd[2] or ''
270       if value then
271         if key=="TYPE" then
272           alarm_type=value
273         elseif key=="ID" then
274           alarm_id=value
275         end
276       end
277     end
278     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
279       if mqtt_client then
280         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
281           function (name) 
282             if name=="dev" then
283               return web_devid
284             elseif name=="type" then
285               return sensor_type
286             elseif name=="id" then
287               return sensor_id
288             else
289               return '{'..name..'}'
290             end      
291           end)
292         if not mqtt_client:socket() then
293           mqtt_client:reconnect()
294         end  
295         mqtt_client:publish(mqtt_path,msg_body,0,false)
296         mqtt_client:loop()
297       end
298       if alarm_exec then
299         command=alarm_exec..
300           " \""..string.gsub(alarm_type,"\"","\\\"")..
301           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
302           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
303         capture(command)
304       end
305     else
306       printLog("Cannot parse alarm input: "..msg_body)
307     end
308   end
309
310 end
311
312 getConfig(arg[1])
313
314 signal.signal(signal.SIGTERM, function(signum)
315
316   printLog("Terminating...")
317   local pids = get_children()
318   for k,v in pairs(pids) do
319     printLog("Terminating subprocess "..tostring(v).."...")
320     signal.kill(v,signal.SIGTERM)
321   end
322   printLog("Exiting...")
323   os.exit(0)
324
325 end)
326
327 if backlogdb or logdb then
328   local dbdriver = require "luasql.sqlite3"
329   env = assert(dbdriver.sqlite3())
330 end
331
332 if backlogdb then
333   if not file_exists(backlogdb) then
334     touch(backlogdb)
335   end  
336   backlog_con = assert(env:connect(backlogdb))
337   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
338 end
339
340 if logdb then
341   if not file_exists(logdb) then
342     touch(logdb)
343   end  
344   log_con = assert(env:connect(logdb))
345   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
346   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
347 end
348
349 if web_url then
350   http = require("socket.http")
351   http.TIMEOUT = web_timeout
352 end
353
354 if mqtt_host then
355   MQTT = require "mosquitto"
356   mqtt_client = MQTT.new(mqtt_id)
357   if mqtt_user then
358     mqtt_client:login_set(mqtt_user, mqtt_passwd)
359   end
360   mqtt_client:connect(mqtt_host,mqtt_port)
361 end
362
363 if serial_port then
364   serialin=io.open(serial_port,"r")
365 elseif input_file == "-" then
366   serialin=io.stdin;
367 elseif input_file then
368   serialin=io.open(input_file,"r")
369 elseif input_exec then
370   serialin=io.popen(input_exec,"r")
371 else
372   printLog("No input selected")
373   return
374 end  
375
376 serialin:setvbuf('no')
377
378 records = {}
379
380 while 1 do
381
382   line=serialin:read("*l")
383
384   if line == nil then
385     break
386   end
387
388   pcall(function ()
389   
390     printLog("Received: "..line)
391     if startswith(line,'{') then
392       processJson(line)
393     else
394       processLine(line)
395     end
396
397     if touch_file then
398       pcall(function () 
399         touch(touch_file) 
400       end)
401     end  
402
403     if dump_file then
404       pcall(function ()
405         local f = io.open(dump_file,"w")
406         io.output(f)
407         io.write(json.encode(records))
408         io.close(f)
409       end)  
410     end
411
412   end)
413     
414 end