Штатный способ работы с таймаутами работает стабильнее традиционного с функцией откры...
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4
5 require "wm_util"
6
7 function getConfig(configname)
8
9   local command,f
10
11   local uci=require("uci")
12   local cur=uci.cursor()
13   local config
14   if configname then
15     config=configname
16   else
17     config="weathermon"
18   end
19
20   web_url  = cur.get(config,"web","url")
21   web_user = cur.get(config,"web","user")
22   web_timeout = cur.get(config,"web","timeout")
23   web_pass = cur.get(config,"web","password")
24  
25   if not web_timeout then
26     web_timeout = 10
27   end
28
29   web_devid = get_devid(config)
30
31   logging = cur.get(config,"logging","enabled") 
32   touch_file = cur.get(config,"logging","touch_file") 
33
34   backlogdb = cur.get(config,"process","backlogdb")
35   logdb = cur.get(config,"process","logdb")
36
37   serial_port = cur.get(config,"serial","port")
38   serial_baud = cur.get(config,"serial","baud")
39
40   input_file = cur.get(config,"input","file")
41   input_exec = cur.get(config,"input","exec")
42   alarm_exec = cur.get(config,"alarm","exec")
43
44   if serial_port then
45
46     command = "stty -F  "..serial_port.." "..serial_baud
47     capture(command)
48
49   end
50
51   mqtt_host = cur.get(config,"mqtt","host")
52   mqtt_port = cur.get(config,"mqtt","port")
53   mqtt_id = cur.get(config,"mqtt","id")
54   mqtt_topic = cur.get(config,"mqtt","topic")
55   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
56
57   mqtt_user = cur.get(config,"mqtt","user")
58   mqtt_passwd = cur.get(config,"mqtt","password")
59
60   if mqtt_host and not mqtt_id then
61     mqtt_id="weather-"..web_devid
62   end
63
64   if mqtt_host and not mqtt_port then
65     mqtt_port = 1883
66   end
67
68   if mqtt_host and not mqtt_topic then
69     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
70   end
71
72   if mqtt_host and not mqtt_alarm_topic then
73     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
74   end
75
76   dump_file = cur.get(config,"process","dump_file")
77
78 end
79
80 function printLog(str)
81   if logging=="on" then
82     capture("logger -t weathermon "..str)
83     print(str)  
84   elseif logging=="syslog" then
85     capture("logger -t weathermon "..str)
86   elseif logging=="stdout" then 
87     print(str)  
88   end 
89 end
90
91 function submitValue(type,id,param,val)
92
93   val = tonumber(val)
94
95   if web_url and val then
96
97     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
98
99     if web_user then
100       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
101     end
102
103     local result,code = http.request (url)
104
105     if code ~= 200 and backlog_con then
106       printLog("writing record to backlog...")
107       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
108     end
109
110   end
111   
112   if logdb then
113     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
114   end
115
116 end
117
118 function storeRecord(id,sensor,param,value) 
119
120   if not records[id] then
121     records[id] = {}
122   end  
123   
124   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
125   
126   if not records[id][sensor] then
127     records[id][sensor] = {}
128   end
129   
130   records[id][sensor][param] = value
131
132 end
133
134 function processJson(str)
135
136   msg=json.decode(str)
137
138   sensor={}
139
140   for key,value in pairs(msg) do
141     if value then
142       if key=="model" or key=="device" then
143         sensor_type=value
144       elseif key=="id" then
145         sensor_id=value
146       elseif key=='time' then
147         sensor_time=value
148       else
149         sensor[key]=value
150       end
151     end
152   end
153
154   if not sensor_id then
155     sensor_id = web_devid
156   end
157
158   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
159     if next(sensor)==nil then
160       sensor["command"]="alarm"
161     end
162     local record = {}
163     for k,v in pairs(sensor) do
164       storeRecord(sensor_id,sensor_type,k,v)
165       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
166       if web_url then
167         submitValue(sensor_type,sensor_id,k,v)
168       end  
169       if mqtt_client then
170         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
171           function (name) 
172             if name=="dev" then
173               return mqtt_encode(web_devid)
174             elseif name=="type" then
175               return mqtt_encode(sensor_type)
176             elseif name=="id" then
177               return mqtt_encode(sensor_id)
178             elseif name=="param" then
179               return k
180             else
181               return '{'..name..'}'
182             end      
183           end)
184         if not mqtt_client:socket() then
185           mqtt_client:reconnect()
186         end  
187         mqtt_client:publish(mqtt_path,v,0,0)
188         mqtt_client:loop()
189       end  
190     end
191   else
192     printLog("Cannot parse sensor input: "..str)
193   end
194
195 end
196
197 function processLine(str)
198
199   msg=split(line,':')
200   msg_type=msg[1] or ''
201   msg_body=msg[2] or ''
202   if msg_type=="STATUS" then
203     printLog("Status: "..msg_body)
204   elseif msg_type=="ERROR" then
205     printLog("Error: "..msg_body)  
206   elseif msg_type=="SENSOR" then
207     printLog("SENSOR: "..msg_body)  
208     sens = split(msg_body,",")
209     sensor = {}
210     idx = 1
211     sensor_type = nil
212     sensor_id = web_devid
213     for i,rec in ipairs(sens) do
214       recrd=split(rec,'=')
215       key=recrd[1] or ''
216       value=recrd[2] or ''
217       if value then
218         if key=="TYPE" then
219           sensor_type=value
220         elseif key=="ID" then
221           sensor_id=value
222         else
223           sensor[key]=value
224         end
225       end
226     end
227     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
228       local record = {}
229       for k,v in pairs(sensor) do
230         storeRecord(sensor_id,sensor_type,k,v)
231         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
232         if web_url then 
233           submitValue(sensor_type,sensor_id,k,v)
234         end  
235         if mqtt_client then
236           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
237             function (name) 
238               if name=="dev" then
239                 return web_devid
240               elseif name=="type" then
241                 return sensor_type
242               elseif name=="id" then
243                 return sensor_id
244               elseif name=="param" then
245                 return k
246               else
247                 return '{'..name..'}'
248               end      
249             end)
250           mqtt_client:publish(mqtt_path,v,0,0)
251           mqtt_client:loop()
252         end  
253       end
254     else
255       printLog("Cannot parse sensor input: "..msg_body)
256     end
257   elseif msg_type=="ALARM" then
258     printLog("ALARM: "..msg_body)  
259     sens = split(msg_body,",")
260     sensor = {}
261     idx = 1
262     sensor_type = nil
263     sensor_id = web_devid
264     mqtt_param = {}
265     for i,rec in ipairs(sens) do
266       recrd=split(rec,'=')
267       key=recrd[1] or ''
268       value=recrd[2] or ''
269       if value then
270         if key=="TYPE" then
271           alarm_type=value
272         elseif key=="ID" then
273           alarm_id=value
274         end
275       end
276     end
277     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
278       if mqtt_client then
279         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
280           function (name) 
281             if name=="dev" then
282               return web_devid
283             elseif name=="type" then
284               return sensor_type
285             elseif name=="id" then
286               return sensor_id
287             else
288               return '{'..name..'}'
289             end      
290           end)
291         mqtt_client:publish(mqtt_path,msg_body,0,0)
292         mqtt_client:loop()
293       end
294       if alarm_exec then
295         command=alarm_exec..
296           " \""..string.gsub(alarm_type,"\"","\\\"")..
297           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
298           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
299         capture(command)
300       end
301     else
302       printLog("Cannot parse alarm input: "..msg_body)
303     end
304   end
305
306 end
307
308 getConfig(arg[1])
309
310 if backlogdb or logdb then
311   local dbdriver = require "luasql.sqlite3"
312   env = assert(dbdriver.sqlite3())
313 end
314
315 if backlogdb then
316   if not file_exists(backlogdb) then
317     touch(backlogdb)
318   end  
319   backlog_con = assert(env:connect(backlogdb))
320   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
321 end
322
323 if logdb then
324   if not file_exists(logdb) then
325     touch(logdb)
326   end  
327   log_con = assert(env:connect(logdb))
328   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
329   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
330 end
331
332 if web_url then
333   http = require("socket.http")
334   http.TIMEOUT = web_timeout
335 end
336
337 if mqtt_host then
338   MQTT = require "mosquitto"
339   mqtt_client = MQTT.new(mqtt_id)
340   if mqtt_user then
341     mqtt_client:login_set(mqtt_user, mqtt_passwd)
342   end
343   mqtt_client:connect(mqtt_host,mqtt_port)
344 end
345
346 if serial_port then
347   serialin=io.open(serial_port,"r")
348 elseif input_file == "-" then
349   serialin=io.stdin;
350 elseif input_file then
351   serialin=io.open(input_file,"r")
352 elseif input_exec then
353   serialin=io.popen(input_exec,"r")
354 else
355   printLog("No input selected")
356   return
357 end  
358
359 serialin:setvbuf('no')
360
361 records = {}
362
363 while 1 do
364
365   line=serialin:read("*l")
366
367   if line == nil then
368     break
369   end
370
371   pcall(function ()
372   
373     printLog("Received: "..line)
374     if startswith(line,'{') then
375       processJson(line)
376     else
377       processLine(line)
378     end
379
380     if touch_file then
381       pcall(function () 
382         touch(touch_file) 
383       end)
384     end  
385
386     if dump_file then
387       pcall(function ()
388         local f = io.open(dump_file,"w")
389         io.output(f)
390         io.write(json.encode(records))
391         io.close(f)
392       end)  
393     end
394   end)
395     
396 end