Автопереподключение к MQTT и для случая обработки не-JSON (все еще используется на...
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4 local signal = require("posix.signal")
5
6 require "wm_util"
7
8 function getConfig(configname)
9
10   local command,f
11
12   local uci=require("uci")
13   local cur=uci.cursor()
14   local config
15   if configname then
16     config=configname
17   else
18     config="weathermon"
19   end
20
21   web_url  = cur.get(config,"web","url")
22   web_user = cur.get(config,"web","user")
23   web_timeout = cur.get(config,"web","timeout")
24   web_pass = cur.get(config,"web","password")
25  
26   if not web_timeout then
27     web_timeout = 10
28   end
29
30   web_devid = get_devid(config)
31
32   logging = cur.get(config,"logging","enabled") 
33   touch_file = cur.get(config,"logging","touch_file") 
34
35   backlogdb = cur.get(config,"process","backlogdb")
36   logdb = cur.get(config,"process","logdb")
37
38   serial_port = cur.get(config,"serial","port")
39   serial_baud = cur.get(config,"serial","baud")
40
41   input_file = cur.get(config,"input","file")
42   input_exec = cur.get(config,"input","exec")
43   alarm_exec = cur.get(config,"alarm","exec")
44
45   if serial_port then
46
47     command = "stty -F  "..serial_port.." "..serial_baud
48     capture(command)
49
50   end
51
52   mqtt_host = cur.get(config,"mqtt","host")
53   mqtt_port = cur.get(config,"mqtt","port")
54   mqtt_id = cur.get(config,"mqtt","id")
55   mqtt_topic = cur.get(config,"mqtt","topic")
56   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
57
58   mqtt_user = cur.get(config,"mqtt","user")
59   mqtt_passwd = cur.get(config,"mqtt","password")
60
61   if mqtt_host and not mqtt_id then
62     mqtt_id="weather-"..web_devid
63   end
64
65   if mqtt_host and not mqtt_port then
66     mqtt_port = 1883
67   end
68
69   if mqtt_host and not mqtt_topic then
70     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
71   end
72
73   if mqtt_host and not mqtt_alarm_topic then
74     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
75   end
76
77   dump_file = cur.get(config,"process","dump_file")
78
79 end
80
81 function printLog(str)
82   if logging=="on" then
83     capture("logger -t weathermon "..str)
84     print(str)  
85   elseif logging=="syslog" then
86     capture("logger -t weathermon "..str)
87   elseif logging=="stdout" then 
88     print(str)  
89   end 
90 end
91
92 function submitValue(type,id,param,val)
93
94   val = tonumber(val)
95
96   if web_url and val then
97
98     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
99
100     if web_user then
101       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
102     end
103
104     local result,code = http.request (url)
105
106     if code ~= 200 and backlog_con then
107       printLog("writing record to backlog...")
108       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
109     end
110
111   end
112   
113   if logdb then
114     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
115   end
116
117 end
118
119 function storeRecord(id,sensor,param,value) 
120
121   if not records[id] then
122     records[id] = {}
123   end  
124   
125   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
126   
127   if not records[id][sensor] then
128     records[id][sensor] = {}
129   end
130   
131   records[id][sensor][param] = value
132
133 end
134
135 function processJson(str)
136
137   msg=json.decode(str)
138
139   sensor={}
140
141   for key,value in pairs(msg) do
142     if value then
143       if key=="model" or key=="device" then
144         sensor_type=value
145       elseif key=="id" then
146         sensor_id=value
147       elseif key=='time' then
148         sensor_time=value
149       else
150         sensor[key]=value
151       end
152     end
153   end
154
155   if not sensor_id then
156     sensor_id = web_devid
157   end
158
159   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
160     if next(sensor)==nil then
161       sensor["command"]="alarm"
162     end
163     local record = {}
164     for k,v in pairs(sensor) do
165       storeRecord(sensor_id,sensor_type,k,v)
166       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
167       if web_url then
168         submitValue(sensor_type,sensor_id,k,v)
169       end  
170       if mqtt_client then
171         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
172           function (name) 
173             if name=="dev" then
174               return mqtt_encode(web_devid)
175             elseif name=="type" then
176               return mqtt_encode(sensor_type)
177             elseif name=="id" then
178               return mqtt_encode(sensor_id)
179             elseif name=="param" then
180               return k
181             else
182               return '{'..name..'}'
183             end      
184           end)
185         if not mqtt_client:socket() then
186           mqtt_client:reconnect()
187         end  
188         mqtt_client:publish(mqtt_path,v,0,false)
189         mqtt_client:loop()
190       end  
191     end
192   else
193     printLog("Cannot parse sensor input: "..str)
194   end
195
196 end
197
198 function processLine(str)
199
200   msg=split(line,':')
201   msg_type=msg[1] or ''
202   msg_body=msg[2] or ''
203   if msg_type=="STATUS" then
204     printLog("Status: "..msg_body)
205   elseif msg_type=="ERROR" then
206     printLog("Error: "..msg_body)  
207   elseif msg_type=="SENSOR" then
208     printLog("SENSOR: "..msg_body)  
209     sens = split(msg_body,",")
210     sensor = {}
211     idx = 1
212     sensor_type = nil
213     sensor_id = web_devid
214     for i,rec in ipairs(sens) do
215       recrd=split(rec,'=')
216       key=recrd[1] or ''
217       value=recrd[2] or ''
218       if value then
219         if key=="TYPE" then
220           sensor_type=value
221         elseif key=="ID" then
222           sensor_id=value
223         else
224           sensor[key]=value
225         end
226       end
227     end
228     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
229       local record = {}
230       for k,v in pairs(sensor) do
231         storeRecord(sensor_id,sensor_type,k,v)
232         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
233         if web_url then 
234           submitValue(sensor_type,sensor_id,k,v)
235         end  
236         if mqtt_client then
237           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
238             function (name) 
239               if name=="dev" then
240                 return web_devid
241               elseif name=="type" then
242                 return sensor_type
243               elseif name=="id" then
244                 return sensor_id
245               elseif name=="param" then
246                 return k
247               else
248                 return '{'..name..'}'
249               end      
250             end)
251           if not mqtt_client:socket() then
252             mqtt_client:reconnect()
253           end  
254           mqtt_client:publish(mqtt_path,v,0,false)
255           mqtt_client:loop()
256         end  
257       end
258     else
259       printLog("Cannot parse sensor input: "..msg_body)
260     end
261   elseif msg_type=="ALARM" then
262     printLog("ALARM: "..msg_body)  
263     sens = split(msg_body,",")
264     sensor = {}
265     idx = 1
266     sensor_type = nil
267     sensor_id = web_devid
268     mqtt_param = {}
269     for i,rec in ipairs(sens) do
270       recrd=split(rec,'=')
271       key=recrd[1] or ''
272       value=recrd[2] or ''
273       if value then
274         if key=="TYPE" then
275           alarm_type=value
276         elseif key=="ID" then
277           alarm_id=value
278         end
279       end
280     end
281     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
282       if mqtt_client then
283         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
284           function (name) 
285             if name=="dev" then
286               return web_devid
287             elseif name=="type" then
288               return sensor_type
289             elseif name=="id" then
290               return sensor_id
291             else
292               return '{'..name..'}'
293             end      
294           end)
295         if not mqtt_client:socket() then
296           mqtt_client:reconnect()
297         end  
298         mqtt_client:publish(mqtt_path,msg_body,0,false)
299         mqtt_client:loop()
300       end
301       if alarm_exec then
302         command=alarm_exec..
303           " \""..string.gsub(alarm_type,"\"","\\\"")..
304           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
305           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
306         capture(command)
307       end
308     else
309       printLog("Cannot parse alarm input: "..msg_body)
310     end
311   end
312
313 end
314
315 getConfig(arg[1])
316
317 signal.signal(signal.SIGTERM, function(signum)
318
319   printLog("Terminating...")
320   local pids = get_children()
321   for k,v in pairs(pids) do
322     printLog("Terminating subprocess "..tostring(v).."...")
323     signal.kill(v,signal.SIGTERM)
324   end
325   printLog("Exiting...")
326   os.exit(0)
327
328 end)
329
330 if backlogdb or logdb then
331   local dbdriver = require "luasql.sqlite3"
332   env = assert(dbdriver.sqlite3())
333 end
334
335 if backlogdb then
336   if not file_exists(backlogdb) then
337     touch(backlogdb)
338   end  
339   backlog_con = assert(env:connect(backlogdb))
340   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
341 end
342
343 if logdb then
344   if not file_exists(logdb) then
345     touch(logdb)
346   end  
347   log_con = assert(env:connect(logdb))
348   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
349   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
350 end
351
352 if web_url then
353   http = require("socket.http")
354   http.TIMEOUT = web_timeout
355 end
356
357 if mqtt_host then
358   MQTT = require "mosquitto"
359   mqtt_client = MQTT.new(mqtt_id)
360   if mqtt_user then
361     mqtt_client:login_set(mqtt_user, mqtt_passwd)
362   end
363   mqtt_client:connect(mqtt_host,mqtt_port)
364 end
365
366 if serial_port then
367   serialin=io.open(serial_port,"r")
368 elseif input_file == "-" then
369   serialin=io.stdin;
370 elseif input_file then
371   serialin=io.open(input_file,"r")
372 elseif input_exec then
373   serialin=io.popen(input_exec,"r")
374 else
375   printLog("No input selected")
376   return
377 end  
378
379 serialin:setvbuf('no')
380
381 records = {}
382
383 while 1 do
384
385   line=serialin:read("*l")
386
387   if line == nil then
388     break
389   end
390
391   pcall(function ()
392   
393     printLog("Received: "..line)
394     if startswith(line,'{') then
395       processJson(line)
396     else
397       processLine(line)
398     end
399
400     if touch_file then
401       pcall(function () 
402         touch(touch_file) 
403       end)
404     end  
405
406     if dump_file then
407       pcall(function ()
408         local f = io.open(dump_file,"w")
409         io.output(f)
410         io.write(json.encode(records))
411         io.close(f)
412       end)  
413     end
414   end)
415     
416 end