Транзакционная работа с БД для избежания блокировок.
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4 local signal = require("posix.signal")
5
6 require "wm_util"
7
8 function getConfig(configname)
9
10   local command,f
11
12   local uci=require("uci")
13   local cur=uci.cursor()
14   local config
15   if configname then
16     config=configname
17   else
18     config="weathermon"
19   end
20
21   web_url  = cur.get(config,"web","url")
22   web_user = cur.get(config,"web","user")
23   web_timeout = cur.get(config,"web","timeout")
24   web_pass = cur.get(config,"web","password")
25  
26   if not web_timeout then
27     web_timeout = 10
28   end
29
30   web_devid = get_devid(config)
31
32   logging = cur.get(config,"logging","enabled") 
33   touch_file = cur.get(config,"logging","touch_file") 
34
35   backlogdb = cur.get(config,"process","backlogdb")
36   logdb = cur.get(config,"process","logdb")
37
38   serial_port = cur.get(config,"serial","port")
39   serial_baud = cur.get(config,"serial","baud")
40
41   input_file = cur.get(config,"input","file")
42   input_exec = cur.get(config,"input","exec")
43   alarm_exec = cur.get(config,"alarm","exec")
44
45   if serial_port then
46
47     command = "stty -F  "..serial_port.." "..serial_baud
48     capture(command)
49
50   end
51
52   mqtt_host = cur.get(config,"mqtt","host")
53   mqtt_port = cur.get(config,"mqtt","port")
54   mqtt_id = cur.get(config,"mqtt","id")
55   mqtt_topic = cur.get(config,"mqtt","topic")
56   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
57
58   mqtt_user = cur.get(config,"mqtt","user")
59   mqtt_passwd = cur.get(config,"mqtt","password")
60
61   if mqtt_host and not mqtt_id then
62     mqtt_id="weather-"..web_devid
63   end
64
65   if mqtt_host and not mqtt_port then
66     mqtt_port = 1883
67   end
68
69   if mqtt_host and not mqtt_topic then
70     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
71   end
72
73   if mqtt_host and not mqtt_alarm_topic then
74     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
75   end
76
77   dump_file = cur.get(config,"process","dump_file")
78
79 end
80
81 function printLog(str)
82   if logging=="on" then
83     capture("logger -t weathermon "..str)
84     print(str)  
85   elseif logging=="syslog" then
86     capture("logger -t weathermon "..str)
87   elseif logging=="stdout" then 
88     print(str)  
89   end 
90 end
91
92 function submitValue(type,id,param,val)
93
94   val = tonumber(val)
95
96   if web_url and val then
97
98     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
99
100     if web_user then
101       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
102     end
103
104     local result,code = http.request (url)
105
106     if code ~= 200 and backlog_con then
107       printLog("writing record to backlog...")
108       backlog_con:execute('BEGIN TRANSACTION')
109       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
110       backlog_con:execute('COMMIT')
111     end
112
113   end
114   
115   if logdb then
116     log_con:execute('BEGIN TRANSACTION')
117     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
118     log_con:execute('COMMIT')
119   end
120
121 end
122
123 function storeRecord(id,sensor,param,value) 
124
125   if not records[id] then
126     records[id] = {}
127   end  
128   
129   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
130   
131   if not records[id][sensor] then
132     records[id][sensor] = {}
133   end
134   
135   records[id][sensor][param] = value
136
137 end
138
139 function processJson(str)
140
141   msg=json.decode(str)
142
143   sensor={}
144
145   for key,value in pairs(msg) do
146     if value then
147       if key=="model" or key=="device" then
148         sensor_type=value
149       elseif key=="id" then
150         sensor_id=value
151       elseif key=='time' then
152         sensor_time=value
153       else
154         sensor[key]=value
155       end
156     end
157   end
158
159   if not sensor_id then
160     sensor_id = web_devid
161   end
162
163   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
164     local record = {}
165     for k,v in pairs(sensor) do
166       storeRecord(sensor_id,sensor_type,k,v)
167       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
168       if web_url then
169         submitValue(sensor_type,sensor_id,k,v)
170       end  
171       if mqtt_client then
172         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
173           function (name) 
174             if name=="dev" then
175               return mqtt_encode(web_devid)
176             elseif name=="type" then
177               return mqtt_encode(sensor_type)
178             elseif name=="id" then
179               return mqtt_encode(sensor_id)
180             elseif name=="param" then
181               return k
182             else
183               return '{'..name..'}'
184             end      
185           end)
186         if not mqtt_client:socket() then
187           mqtt_client:reconnect()
188         end  
189         mqtt_client:publish(mqtt_path,v,0,false)
190         mqtt_client:loop()
191       end  
192     end
193   else
194     printLog("Cannot parse sensor input: "..str)
195   end
196
197 end
198
199 function processLine(str)
200
201   msg=split(line,':')
202   msg_type=msg[1] or ''
203   msg_body=msg[2] or ''
204   if msg_type=="STATUS" then
205     printLog("Status: "..msg_body)
206   elseif msg_type=="ERROR" then
207     printLog("Error: "..msg_body)  
208   elseif msg_type=="SENSOR" then
209     printLog("SENSOR: "..msg_body)  
210     sens = split(msg_body,",")
211     sensor = {}
212     idx = 1
213     sensor_type = nil
214     sensor_id = web_devid
215     for i,rec in ipairs(sens) do
216       recrd=split(rec,'=')
217       key=recrd[1] or ''
218       value=recrd[2] or ''
219       if value then
220         if key=="TYPE" then
221           sensor_type=value
222         elseif key=="ID" then
223           sensor_id=value
224         else
225           sensor[key]=value
226         end
227       end
228     end
229     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
230       local record = {}
231       for k,v in pairs(sensor) do
232         storeRecord(sensor_id,sensor_type,k,v)
233         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
234         if web_url then 
235           submitValue(sensor_type,sensor_id,k,v)
236         end  
237         if mqtt_client then
238           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
239             function (name) 
240               if name=="dev" then
241                 return web_devid
242               elseif name=="type" then
243                 return sensor_type
244               elseif name=="id" then
245                 return sensor_id
246               elseif name=="param" then
247                 return k
248               else
249                 return '{'..name..'}'
250               end      
251             end)
252           if not mqtt_client:socket() then
253             mqtt_client:reconnect()
254           end  
255           mqtt_client:publish(mqtt_path,v,0,false)
256           mqtt_client:loop()
257         end  
258       end
259     else
260       printLog("Cannot parse sensor input: "..msg_body)
261     end
262   elseif msg_type=="ALARM" then
263     printLog("ALARM: "..msg_body)  
264     sens = split(msg_body,",")
265     sensor = {}
266     idx = 1
267     sensor_type = nil
268     sensor_id = web_devid
269     mqtt_param = {}
270     for i,rec in ipairs(sens) do
271       recrd=split(rec,'=')
272       key=recrd[1] or ''
273       value=recrd[2] or ''
274       if value then
275         if key=="TYPE" then
276           alarm_type=value
277         elseif key=="ID" then
278           alarm_id=value
279         end
280       end
281     end
282     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
283       if mqtt_client then
284         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
285           function (name) 
286             if name=="dev" then
287               return web_devid
288             elseif name=="type" then
289               return sensor_type
290             elseif name=="id" then
291               return sensor_id
292             else
293               return '{'..name..'}'
294             end      
295           end)
296         if not mqtt_client:socket() then
297           mqtt_client:reconnect()
298         end  
299         mqtt_client:publish(mqtt_path,msg_body,0,false)
300         mqtt_client:loop()
301       end
302       if alarm_exec then
303         command=alarm_exec..
304           " \""..string.gsub(alarm_type,"\"","\\\"")..
305           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
306           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
307         capture(command)
308       end
309     else
310       printLog("Cannot parse alarm input: "..msg_body)
311     end
312   end
313
314 end
315
316 getConfig(arg[1])
317
318 signal.signal(signal.SIGTERM, function(signum)
319
320   printLog("Terminating...")
321   local pids = get_children()
322   for k,v in pairs(pids) do
323     printLog("Terminating subprocess "..tostring(v).."...")
324     signal.kill(v,signal.SIGTERM)
325   end
326   printLog("Exiting...")
327   os.exit(0)
328
329 end)
330
331 if backlogdb or logdb then
332   local dbdriver = require "luasql.sqlite3"
333   env = assert(dbdriver.sqlite3())
334 end
335
336 if backlogdb then
337   if not file_exists(backlogdb) then
338     touch(backlogdb)
339   end  
340   backlog_con = assert(env:connect(backlogdb))
341   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
342 end
343
344 if logdb then
345   if not file_exists(logdb) then
346     touch(logdb)
347   end  
348   log_con = assert(env:connect(logdb))
349   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
350   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
351 end
352
353 if web_url then
354   http = require("socket.http")
355   http.TIMEOUT = web_timeout
356 end
357
358 if mqtt_host then
359   MQTT = require "mosquitto"
360   mqtt_client = MQTT.new(mqtt_id)
361   if mqtt_user then
362     mqtt_client:login_set(mqtt_user, mqtt_passwd)
363   end
364   mqtt_client:connect(mqtt_host,mqtt_port)
365 end
366
367 if serial_port then
368   serialin=io.open(serial_port,"r")
369 elseif input_file == "-" then
370   serialin=io.stdin;
371 elseif input_file then
372   serialin=io.open(input_file,"r")
373 elseif input_exec then
374   serialin=io.popen(input_exec,"r")
375 else
376   printLog("No input selected")
377   return
378 end  
379
380 serialin:setvbuf('no')
381
382 records = {}
383
384 while 1 do
385
386   line=serialin:read("*l")
387
388   if line == nil then
389     break
390   end
391
392   pcall(function ()
393   
394     printLog("Received: "..line)
395     if startswith(line,'{') then
396       processJson(line)
397     else
398       processLine(line)
399     end
400
401     if touch_file then
402       pcall(function () 
403         touch(touch_file) 
404       end)
405     end  
406
407     if dump_file then
408       pcall(function ()
409         local f = io.open(dump_file,"w")
410         io.output(f)
411         io.write(json.encode(records))
412         io.close(f)
413       end)  
414     end
415   end)
416     
417 end