6980cc835dfc8e2aa6caa9794e282ce3c8ec7443
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4 local socket = require("socket")
5
6 local http = require("socket.http")
7
8 require "wm_util"
9
10 function getConfig(configname)
11
12   local command,f
13
14   local uci=require("uci")
15   local cur=uci.cursor()
16   local config
17   if configname then
18     config=configname
19   else
20     config="weathermon"
21   end
22
23   web_url  = cur.get(config,"web","url")
24   web_user = cur.get(config,"web","user")
25   web_timeout = cur.get(config,"web","timeout")
26   web_pass = cur.get(config,"web","password")
27   web_devid = cur.get(config,"web","devid")
28
29   web_iface = cur.get(config,"web","iface")
30  
31   if not web_timeout then
32     web_timeout = 10
33   end
34
35   if web_iface then
36   
37     command = '/sbin/ifconfig '..web_iface..' | grep \'\\<inet\\>\' | sed -n \'1p\' | tr -s \' \' | cut -d \' \' -f3 | cut -d \':\' -f2'
38     f=io.popen(command,'r')
39     ip_addr=f:read()
40   
41   end
42
43   if not web_devid then
44   
45     if web_iface then
46       io.input("/sys/class/net/"..web_iface.."/address")
47     else
48       io.input("/sys/class/net/eth0/address")
49     end
50
51     local mac = io.read("*line")
52     mac = mac:gsub(":","")
53     mac = mac:upper()
54
55     web_devid = mac
56
57   end
58
59   logging = cur.get(config,"logging","enabled") 
60   touch_file = cur.get(config,"logging","touch_file") 
61
62   backlogdb = cur.get(config,"process","backlogdb")
63   logdb = cur.get(config,"process","logdb")
64
65   serial_port = cur.get(config,"serial","port")
66   serial_baud = cur.get(config,"serial","baud")
67
68   input_file = cur.get(config,"input","file")
69   input_exec = cur.get(config,"input","exec")
70   alarm_exec = cur.get(config,"alarm","exec")
71
72   if serial_port then
73
74     command = "stty -F  "..serial_port.." "..serial_baud
75     capture(command)
76
77   end
78
79   mqtt_host = cur.get(config,"mqtt","host")
80   mqtt_port = cur.get(config,"mqtt","port")
81   mqtt_id = cur.get(config,"mqtt","id")
82   mqtt_topic = cur.get(config,"mqtt","topic")
83   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
84
85   mqtt_user = cur.get(config,"mqtt","user")
86   mqtt_passwd = cur.get(config,"mqtt","password")
87
88   if mqtt_host and not mqtt_id then
89     mqtt_id="weather-"..web_devid
90   end
91
92   if mqtt_host and not mqtt_port then
93     mqtt_port = 1883
94   end
95
96   if mqtt_host and not mqtt_topic then
97     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
98   end
99
100   if mqtt_host and not mqtt_alarm_topic then
101     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
102   end
103
104   dump_file = cur.get(config,"process","dump_file")
105
106 end
107
108 function printLog(str)
109   if logging=="on" then
110     capture("logger -t weathermon "..str)
111     print(str)  
112   elseif logging=="syslog" then
113     capture("logger -t weathermon "..str)
114   elseif logging=="stdout" then 
115     print(str)  
116   end 
117 end
118
119 function submitValue(type,id,param,val)
120
121   if web_url then
122
123     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
124
125     if web_user then
126       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
127     end
128
129     local result,code = http.request ({
130       url=url, create=function()
131         local req_sock = socket.tcp()
132         req_sock:settimeout(web_timeout)
133         return req_sock
134       end})
135
136     if code ~= 200 then
137       print("writing record to backlog...")
138       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
139     end
140
141   end
142   
143   if logdb then
144     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
145   end
146
147   if touch_file then
148     touch(touch_file)
149   end  
150   
151 end
152
153 function storeRecord(id,sensor,param,value) 
154
155   if not records[id] then
156     records[id] = {}
157   end  
158   
159   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
160   
161   if not records[id][sensor] then
162     records[id][sensor] = {}
163   end
164   
165   records[id][sensor][param] = value
166
167 end
168
169 function processJson(str)
170
171   msg=json.decode(str)
172
173   sensor={}
174
175   for key,value in pairs(msg) do
176     if value then
177       if key=="model" or key=="device" then
178         sensor_type=value
179       elseif key=="id" then
180         sensor_id=value
181       elseif key=='time' then
182         sensor_time=value
183       else
184         sensor[key]=value
185       end
186     end
187   end
188
189   if not sensor_id then
190     sensor_id = web_devid
191   end
192
193   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
194     if next(sensor)==nil then
195       sensor["command"]="alarm"
196     end
197     local record = {}
198     for k,v in pairs(sensor) do
199       storeRecord(sensor_id,sensor_type,k,v)
200       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
201       submitValue(sensor_type,sensor_id,k,v)
202       if mqtt_client then
203         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
204           function (name) 
205             if name=="dev" then
206               return mqtt_encode(web_devid)
207             elseif name=="type" then
208               return mqtt_encode(sensor_type)
209             elseif name=="id" then
210               return mqtt_encode(sensor_id)
211             elseif name=="param" then
212               return k
213             else
214               return '{'..name..'}'
215             end      
216           end)
217         mqtt_client:connect(mqtt_host,mqtt_port)
218         mqtt_client:publish(mqtt_path,v)
219         mqtt_client:disconnect()
220       end  
221     end
222   else
223     printLog("Cannot parse sensor input: "..str)
224   end
225
226 end
227
228 function processLine(str)
229
230   msg=split(line,':')
231   msg_type=msg[1] or ''
232   msg_body=msg[2] or ''
233   if msg_type=="STATUS" then
234     printLog("Status: "..msg_body)
235   elseif msg_type=="ERROR" then
236     printLog("Error: "..msg_body)  
237   elseif msg_type=="SENSOR" then
238     printLog("SENSOR: "..msg_body)  
239     sens = split(msg_body,",")
240     sensor = {}
241     idx = 1
242     sensor_type = nil
243     sensor_id = web_devid
244     for i,rec in ipairs(sens) do
245       recrd=split(rec,'=')
246       key=recrd[1] or ''
247       value=recrd[2] or ''
248       if value then
249         if key=="TYPE" then
250           sensor_type=value
251         elseif key=="ID" then
252           sensor_id=value
253         else
254           sensor[key]=value
255         end
256       end
257     end
258     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
259       local record = {}
260       for k,v in pairs(sensor) do
261         storeRecord(sensor_id,sensor_type,k,v)
262         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
263         submitValue(sensor_type,sensor_id,k,v)
264         if mqtt_client then
265           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
266             function (name) 
267               if name=="dev" then
268                 return web_devid
269               elseif name=="type" then
270                 return sensor_type
271               elseif name=="id" then
272                 return sensor_id
273               elseif name=="param" then
274                 return k
275               else
276                 return '{'..name..'}'
277               end      
278             end)
279           mqtt_client:connect(mqtt_host,mqtt_port)
280           mqtt_client:publish(mqtt_path,v)
281           mqtt_client:disconnect()
282         end  
283       end
284     else
285       printLog("Cannot parse sensor input: "..msg_body)
286     end
287   elseif msg_type=="ALARM" then
288     printLog("ALARM: "..msg_body)  
289     sens = split(msg_body,",")
290     sensor = {}
291     idx = 1
292     sensor_type = nil
293     sensor_id = web_devid
294     mqtt_param = {}
295     for i,rec in ipairs(sens) do
296       recrd=split(rec,'=')
297       key=recrd[1] or ''
298       value=recrd[2] or ''
299       if value then
300         if key=="TYPE" then
301           alarm_type=value
302         elseif key=="ID" then
303           alarm_id=value
304         end
305       end
306     end
307     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
308       if mqtt_client then
309         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
310           function (name) 
311             if name=="dev" then
312               return web_devid
313             elseif name=="type" then
314               return sensor_type
315             elseif name=="id" then
316               return sensor_id
317             else
318               return '{'..name..'}'
319             end      
320           end)
321         mqtt_client:connect(mqtt_host,mqtt_port)
322         mqtt_client:publish(mqtt_path,msg_body)
323         mqtt_client:disconnect()
324       end
325       if alarm_exec then
326         command=alarm_exec..
327           " \""..string.gsub(alarm_type,"\"","\\\"")..
328           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
329           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
330         capture(command)
331       end
332     else
333       printLog("Cannot parse alarm input: "..msg_body)
334     end
335   end
336
337 end
338
339 getConfig(arg[1])
340
341 if backlogdb or logdb then
342   local dbdriver = require "luasql.sqlite3"
343   env = assert(dbdriver.sqlite3())
344 end
345
346 if backlogdb then
347   if not file_exists(backlogdb) then
348     touch(backlogdb)
349     backlog_con = assert(env:connect(backlogdb))
350     backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
351   else
352     backlog_con = assert(env:connect(backlogdb))
353   end
354 end
355
356 if logdb then
357   if not file_exists(logdb) then
358     touch(logdb)
359     log_con = assert(env:connect(logdb))
360     log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
361     log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
362   else
363     log_con = assert(env:connect(logdb))
364   end
365 end
366
367 if mqtt_host then
368   MQTT = require "mosquitto"
369   mqtt_client = MQTT.new(mqtt_id)
370   if mqtt_user then
371     mqtt_client:login_set(mqtt_user, mqtt_passwd)
372   end
373 end
374
375 if serial_port then
376   serialin=io.open(serial_port,"r")
377 elseif input_file == "-" then
378   serialin=io.stdin;
379 elseif input_file then
380   serialin=io.open(input_file,"r")
381 elseif input_exec then
382   serialin=io.popen(input_exec,"r")
383 else
384   printLog("No input selected")
385   return
386 end  
387
388 records = {}
389
390 while 1 do
391
392   line=serialin:read("*l")
393
394   if line == nil then
395     break
396   end
397
398   pcall(function ()
399   
400     printLog("Received: "..line)
401     if startswith(line,'{') then
402       processJson(line)
403     else
404       processLine(line)
405     end
406
407     if dump_file then
408       local f = io.open(dump_file,"w")
409       io.output(f)
410       io.write(json.encode(records))
411       io.close(f)
412     end
413   end)
414     
415 end