Переход на короткоживущие соединения с sqlite - на MT7688 возникают блокировки.
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4 local signal = require("posix.signal")
5
6 require "wm_util"
7
8 function getConfig(configname)
9
10   local command,f
11
12   local uci=require("uci")
13   local cur=uci.cursor()
14   local config
15   if configname then
16     config=configname
17   else
18     config="weathermon"
19   end
20
21   web_url  = cur.get(config,"web","url")
22   web_user = cur.get(config,"web","user")
23   web_timeout = cur.get(config,"web","timeout")
24   web_pass = cur.get(config,"web","password")
25  
26   if not web_timeout then
27     web_timeout = 10
28   end
29
30   web_devid = get_devid(config)
31
32   logging = cur.get(config,"logging","enabled") 
33   touch_file = cur.get(config,"logging","touch_file") 
34
35   backlogdb = cur.get(config,"process","backlogdb")
36   logdb = cur.get(config,"process","logdb")
37
38   serial_port = cur.get(config,"serial","port")
39   serial_baud = cur.get(config,"serial","baud")
40
41   input_file = cur.get(config,"input","file")
42   input_exec = cur.get(config,"input","exec")
43   alarm_exec = cur.get(config,"alarm","exec")
44
45   if serial_port then
46
47     command = "stty -F  "..serial_port.." "..serial_baud
48     capture(command)
49
50   end
51
52   mqtt_host = cur.get(config,"mqtt","host")
53   mqtt_port = cur.get(config,"mqtt","port")
54   mqtt_id = cur.get(config,"mqtt","id")
55   mqtt_topic = cur.get(config,"mqtt","topic")
56   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
57
58   mqtt_user = cur.get(config,"mqtt","user")
59   mqtt_passwd = cur.get(config,"mqtt","password")
60
61   if mqtt_host and not mqtt_id then
62     mqtt_id="weather-"..web_devid
63   end
64
65   if mqtt_host and not mqtt_port then
66     mqtt_port = 1883
67   end
68
69   if mqtt_host and not mqtt_topic then
70     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
71   end
72
73   if mqtt_host and not mqtt_alarm_topic then
74     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
75   end
76
77   dump_file = cur.get(config,"process","dump_file")
78
79 end
80
81 function printLog(str)
82   if logging=="on" then
83     capture("logger -t weathermon "..str)
84     print(str)  
85   elseif logging=="syslog" then
86     capture("logger -t weathermon "..str)
87   elseif logging=="stdout" then 
88     print(str)  
89   end 
90 end
91
92 function submitValue(type,id,param,val)
93
94   val = tonumber(val)
95
96   if web_url and val then
97
98     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
99
100     if web_user then
101       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
102     end
103
104     local result,code = http.request (url)
105
106     if code ~= 200 and backlogdb then
107       printLog("writing record to backlog...")
108       local backlog_con = assert(env:connect(backlogdb))
109       backlog_con:execute('BEGIN TRANSACTION')
110       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
111       backlog_con:execute('COMMIT')
112       backlog_con:close()
113     end
114
115   end
116   
117   if logdb then
118     local log_con = assert(env:connect(logdb))
119     log_con:execute('BEGIN TRANSACTION')
120     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
121     log_con:execute('COMMIT')
122     log_con:close()
123   end
124
125 end
126
127 function storeRecord(id,sensor,param,value) 
128
129   if not records[id] then
130     records[id] = {}
131   end  
132   
133   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
134   
135   if not records[id][sensor] then
136     records[id][sensor] = {}
137   end
138   
139   records[id][sensor][param] = value
140
141 end
142
143 function processJson(str)
144
145   msg=json.decode(str)
146
147   sensor={}
148
149   for key,value in pairs(msg) do
150     if value then
151       if key=="model" or key=="device" then
152         sensor_type=value
153       elseif key=="id" then
154         sensor_id=value
155       elseif key=='time' then
156         sensor_time=value
157       else
158         sensor[key]=value
159       end
160     end
161   end
162
163   if not sensor_id then
164     sensor_id = web_devid
165   end
166
167   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
168     local record = {}
169     for k,v in pairs(sensor) do
170       storeRecord(sensor_id,sensor_type,k,v)
171       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
172       if web_url then
173         submitValue(sensor_type,sensor_id,k,v)
174       end  
175       if mqtt_client then
176         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
177           function (name) 
178             if name=="dev" then
179               return mqtt_encode(web_devid)
180             elseif name=="type" then
181               return mqtt_encode(sensor_type)
182             elseif name=="id" then
183               return mqtt_encode(sensor_id)
184             elseif name=="param" then
185               return k
186             else
187               return '{'..name..'}'
188             end      
189           end)
190         if not mqtt_client:socket() then
191           mqtt_client:reconnect()
192         end  
193         mqtt_client:publish(mqtt_path,v,0,false)
194         mqtt_client:loop()
195       end  
196     end
197   else
198     printLog("Cannot parse sensor input: "..str)
199   end
200
201 end
202
203 function processLine(str)
204
205   msg=split(line,':')
206   msg_type=msg[1] or ''
207   msg_body=msg[2] or ''
208   if msg_type=="STATUS" then
209     printLog("Status: "..msg_body)
210   elseif msg_type=="ERROR" then
211     printLog("Error: "..msg_body)  
212   elseif msg_type=="SENSOR" then
213     printLog("SENSOR: "..msg_body)  
214     sens = split(msg_body,",")
215     sensor = {}
216     idx = 1
217     sensor_type = nil
218     sensor_id = web_devid
219     for i,rec in ipairs(sens) do
220       recrd=split(rec,'=')
221       key=recrd[1] or ''
222       value=recrd[2] or ''
223       if value then
224         if key=="TYPE" then
225           sensor_type=value
226         elseif key=="ID" then
227           sensor_id=value
228         else
229           sensor[key]=value
230         end
231       end
232     end
233     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
234       local record = {}
235       for k,v in pairs(sensor) do
236         storeRecord(sensor_id,sensor_type,k,v)
237         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
238         if web_url then 
239           submitValue(sensor_type,sensor_id,k,v)
240         end  
241         if mqtt_client then
242           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
243             function (name) 
244               if name=="dev" then
245                 return web_devid
246               elseif name=="type" then
247                 return sensor_type
248               elseif name=="id" then
249                 return sensor_id
250               elseif name=="param" then
251                 return k
252               else
253                 return '{'..name..'}'
254               end      
255             end)
256           if not mqtt_client:socket() then
257             mqtt_client:reconnect()
258           end  
259           mqtt_client:publish(mqtt_path,v,0,false)
260           mqtt_client:loop()
261         end  
262       end
263     else
264       printLog("Cannot parse sensor input: "..msg_body)
265     end
266   elseif msg_type=="ALARM" then
267     printLog("ALARM: "..msg_body)  
268     sens = split(msg_body,",")
269     sensor = {}
270     idx = 1
271     sensor_type = nil
272     sensor_id = web_devid
273     mqtt_param = {}
274     for i,rec in ipairs(sens) do
275       recrd=split(rec,'=')
276       key=recrd[1] or ''
277       value=recrd[2] or ''
278       if value then
279         if key=="TYPE" then
280           alarm_type=value
281         elseif key=="ID" then
282           alarm_id=value
283         end
284       end
285     end
286     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
287       if mqtt_client then
288         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
289           function (name) 
290             if name=="dev" then
291               return web_devid
292             elseif name=="type" then
293               return sensor_type
294             elseif name=="id" then
295               return sensor_id
296             else
297               return '{'..name..'}'
298             end      
299           end)
300         if not mqtt_client:socket() then
301           mqtt_client:reconnect()
302         end  
303         mqtt_client:publish(mqtt_path,msg_body,0,false)
304         mqtt_client:loop()
305       end
306       if alarm_exec then
307         command=alarm_exec..
308           " \""..string.gsub(alarm_type,"\"","\\\"")..
309           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
310           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
311         capture(command)
312       end
313     else
314       printLog("Cannot parse alarm input: "..msg_body)
315     end
316   end
317
318 end
319
320 getConfig(arg[1])
321
322 signal.signal(signal.SIGTERM, function(signum)
323
324   printLog("Terminating...")
325   local pids = get_children()
326   for k,v in pairs(pids) do
327     printLog("Terminating subprocess "..tostring(v).."...")
328     signal.kill(v,signal.SIGTERM)
329   end
330   printLog("Exiting...")
331   os.exit(0)
332
333 end)
334
335 if backlogdb or logdb then
336   local dbdriver = require "luasql.sqlite3"
337   env = assert(dbdriver.sqlite3())
338 end
339
340 if backlogdb then
341   if not file_exists(backlogdb) then
342     touch(backlogdb)
343   end  
344   local backlog_con = assert(env:connect(backlogdb))
345   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
346   backlog_con:close()
347 end
348
349 if logdb then
350   if not file_exists(logdb) then
351     touch(logdb)
352   end  
353   local log_con = assert(env:connect(logdb))
354   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
355   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
356   log_con:close()
357 end
358
359 if web_url then
360   http = require("socket.http")
361   http.TIMEOUT = web_timeout
362 end
363
364 if mqtt_host then
365   MQTT = require "mosquitto"
366   mqtt_client = MQTT.new(mqtt_id)
367   if mqtt_user then
368     mqtt_client:login_set(mqtt_user, mqtt_passwd)
369   end
370   mqtt_client:connect(mqtt_host,mqtt_port)
371 end
372
373 if serial_port then
374   serialin=io.open(serial_port,"r")
375 elseif input_file == "-" then
376   serialin=io.stdin;
377 elseif input_file then
378   serialin=io.open(input_file,"r")
379 elseif input_exec then
380   serialin=io.popen(input_exec,"r")
381 else
382   printLog("No input selected")
383   return
384 end  
385
386 serialin:setvbuf('no')
387
388 records = {}
389
390 while 1 do
391
392   line=serialin:read("*l")
393
394   if line == nil then
395     break
396   end
397
398   pcall(function ()
399   
400     printLog("Received: "..line)
401     if startswith(line,'{') then
402       processJson(line)
403     else
404       processLine(line)
405     end
406
407     if touch_file then
408       pcall(function () 
409         touch(touch_file) 
410       end)
411     end  
412
413     if dump_file then
414       pcall(function ()
415         local f = io.open(dump_file,"w")
416         io.output(f)
417         io.write(json.encode(records))
418         io.close(f)
419       end)  
420     end
421   end)
422     
423 end