d16f871c18671c0d0e3bcb8e573889463132cdaf
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4
5 require "wm_util"
6
7 function getConfig(configname)
8
9   local command,f
10
11   local uci=require("uci")
12   local cur=uci.cursor()
13   local config
14   if configname then
15     config=configname
16   else
17     config="weathermon"
18   end
19
20   web_url  = cur.get(config,"web","url")
21   web_user = cur.get(config,"web","user")
22   web_timeout = cur.get(config,"web","timeout")
23   web_pass = cur.get(config,"web","password")
24  
25   if not web_timeout then
26     web_timeout = 10
27   end
28
29   web_devid = get_devid(config)
30
31   logging = cur.get(config,"logging","enabled") 
32   touch_file = cur.get(config,"logging","touch_file") 
33
34   backlogdb = cur.get(config,"process","backlogdb")
35   logdb = cur.get(config,"process","logdb")
36
37   serial_port = cur.get(config,"serial","port")
38   serial_baud = cur.get(config,"serial","baud")
39
40   input_file = cur.get(config,"input","file")
41   input_exec = cur.get(config,"input","exec")
42   alarm_exec = cur.get(config,"alarm","exec")
43
44   if serial_port then
45
46     command = "stty -F  "..serial_port.." "..serial_baud
47     capture(command)
48
49   end
50
51   mqtt_host = cur.get(config,"mqtt","host")
52   mqtt_port = cur.get(config,"mqtt","port")
53   mqtt_id = cur.get(config,"mqtt","id")
54   mqtt_topic = cur.get(config,"mqtt","topic")
55   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
56
57   mqtt_user = cur.get(config,"mqtt","user")
58   mqtt_passwd = cur.get(config,"mqtt","password")
59
60   if mqtt_host and not mqtt_id then
61     mqtt_id="weather-"..web_devid
62   end
63
64   if mqtt_host and not mqtt_port then
65     mqtt_port = 1883
66   end
67
68   if mqtt_host and not mqtt_topic then
69     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
70   end
71
72   if mqtt_host and not mqtt_alarm_topic then
73     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
74   end
75
76   dump_file = cur.get(config,"process","dump_file")
77
78 end
79
80 function printLog(str)
81   if logging=="on" then
82     capture("logger -t weathermon "..str)
83     print(str)  
84   elseif logging=="syslog" then
85     capture("logger -t weathermon "..str)
86   elseif logging=="stdout" then 
87     print(str)  
88   end 
89 end
90
91 function submitValue(type,id,param,val)
92
93   val = tonumber(val)
94
95   if web_url and val then
96
97     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
98
99     if web_user then
100       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
101     end
102
103     local result,code = http.request ({
104       url=url, create=function()
105         local req_sock = socket.tcp()
106         req_sock:settimeout(web_timeout)
107         return req_sock
108       end})
109
110     if code ~= 200 and backlog_con then
111       printLog("writing record to backlog...")
112       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
113     end
114
115   end
116   
117   if logdb then
118     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
119   end
120
121 end
122
123 function storeRecord(id,sensor,param,value) 
124
125   if not records[id] then
126     records[id] = {}
127   end  
128   
129   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
130   
131   if not records[id][sensor] then
132     records[id][sensor] = {}
133   end
134   
135   records[id][sensor][param] = value
136
137 end
138
139 function processJson(str)
140
141   msg=json.decode(str)
142
143   sensor={}
144
145   for key,value in pairs(msg) do
146     if value then
147       if key=="model" or key=="device" then
148         sensor_type=value
149       elseif key=="id" then
150         sensor_id=value
151       elseif key=='time' then
152         sensor_time=value
153       else
154         sensor[key]=value
155       end
156     end
157   end
158
159   if not sensor_id then
160     sensor_id = web_devid
161   end
162
163   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
164     if next(sensor)==nil then
165       sensor["command"]="alarm"
166     end
167     local record = {}
168     for k,v in pairs(sensor) do
169       storeRecord(sensor_id,sensor_type,k,v)
170       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
171       if web_url then
172         submitValue(sensor_type,sensor_id,k,v)
173       end  
174       if mqtt_client then
175         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
176           function (name) 
177             if name=="dev" then
178               return mqtt_encode(web_devid)
179             elseif name=="type" then
180               return mqtt_encode(sensor_type)
181             elseif name=="id" then
182               return mqtt_encode(sensor_id)
183             elseif name=="param" then
184               return k
185             else
186               return '{'..name..'}'
187             end      
188           end)
189         mqtt_client:publish(mqtt_path,v)
190         mqtt_client:loop()
191       end  
192     end
193   else
194     printLog("Cannot parse sensor input: "..str)
195   end
196
197 end
198
199 function processLine(str)
200
201   msg=split(line,':')
202   msg_type=msg[1] or ''
203   msg_body=msg[2] or ''
204   if msg_type=="STATUS" then
205     printLog("Status: "..msg_body)
206   elseif msg_type=="ERROR" then
207     printLog("Error: "..msg_body)  
208   elseif msg_type=="SENSOR" then
209     printLog("SENSOR: "..msg_body)  
210     sens = split(msg_body,",")
211     sensor = {}
212     idx = 1
213     sensor_type = nil
214     sensor_id = web_devid
215     for i,rec in ipairs(sens) do
216       recrd=split(rec,'=')
217       key=recrd[1] or ''
218       value=recrd[2] or ''
219       if value then
220         if key=="TYPE" then
221           sensor_type=value
222         elseif key=="ID" then
223           sensor_id=value
224         else
225           sensor[key]=value
226         end
227       end
228     end
229     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
230       local record = {}
231       for k,v in pairs(sensor) do
232         storeRecord(sensor_id,sensor_type,k,v)
233         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
234         if web_url then 
235           submitValue(sensor_type,sensor_id,k,v)
236         end  
237         if mqtt_client then
238           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
239             function (name) 
240               if name=="dev" then
241                 return web_devid
242               elseif name=="type" then
243                 return sensor_type
244               elseif name=="id" then
245                 return sensor_id
246               elseif name=="param" then
247                 return k
248               else
249                 return '{'..name..'}'
250               end      
251             end)
252           mqtt_client:publish(mqtt_path,v)
253           mqtt_client:loop()
254         end  
255       end
256     else
257       printLog("Cannot parse sensor input: "..msg_body)
258     end
259   elseif msg_type=="ALARM" then
260     printLog("ALARM: "..msg_body)  
261     sens = split(msg_body,",")
262     sensor = {}
263     idx = 1
264     sensor_type = nil
265     sensor_id = web_devid
266     mqtt_param = {}
267     for i,rec in ipairs(sens) do
268       recrd=split(rec,'=')
269       key=recrd[1] or ''
270       value=recrd[2] or ''
271       if value then
272         if key=="TYPE" then
273           alarm_type=value
274         elseif key=="ID" then
275           alarm_id=value
276         end
277       end
278     end
279     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
280       if mqtt_client then
281         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
282           function (name) 
283             if name=="dev" then
284               return web_devid
285             elseif name=="type" then
286               return sensor_type
287             elseif name=="id" then
288               return sensor_id
289             else
290               return '{'..name..'}'
291             end      
292           end)
293         mqtt_client:publish(mqtt_path,msg_body)
294         mqtt_client:loop()
295       end
296       if alarm_exec then
297         command=alarm_exec..
298           " \""..string.gsub(alarm_type,"\"","\\\"")..
299           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
300           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
301         capture(command)
302       end
303     else
304       printLog("Cannot parse alarm input: "..msg_body)
305     end
306   end
307
308 end
309
310 getConfig(arg[1])
311
312 if backlogdb or logdb then
313   local dbdriver = require "luasql.sqlite3"
314   env = assert(dbdriver.sqlite3())
315 end
316
317 if backlogdb then
318   if not file_exists(backlogdb) then
319     touch(backlogdb)
320   end  
321   backlog_con = assert(env:connect(backlogdb))
322   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
323 end
324
325 if logdb then
326   if not file_exists(logdb) then
327     touch(logdb)
328   end  
329   log_con = assert(env:connect(logdb))
330   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
331   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
332 end
333
334 if web_url then
335   http = require("socket.http")
336   socket = require("socket")
337 end
338
339 if mqtt_host then
340   MQTT = require "mosquitto"
341   mqtt_client = MQTT.new(mqtt_id)
342   if mqtt_user then
343     mqtt_client:login_set(mqtt_user, mqtt_passwd)
344   end
345   mqtt_client:connect(mqtt_host,mqtt_port)
346 end
347
348 if serial_port then
349   serialin=io.open(serial_port,"r")
350 elseif input_file == "-" then
351   serialin=io.stdin;
352 elseif input_file then
353   serialin=io.open(input_file,"r")
354 elseif input_exec then
355   serialin=io.popen(input_exec,"r")
356 else
357   printLog("No input selected")
358   return
359 end  
360
361 serialin:setvbuf('no')
362
363 records = {}
364
365 while 1 do
366
367   line=serialin:read("*l")
368
369   if line == nil then
370     break
371   end
372
373   pcall(function ()
374   
375     printLog("Received: "..line)
376     if startswith(line,'{') then
377       processJson(line)
378     else
379       processLine(line)
380     end
381
382     if touch_file then
383       pcall(function () 
384         touch(touch_file) 
385       end)
386     end  
387
388     if dump_file then
389       pcall(function ()
390         local f = io.open(dump_file,"w")
391         io.output(f)
392         io.write(json.encode(records))
393         io.close(f)
394       end)  
395     end
396   end)
397     
398 end