Исправлена ошибка с непредумышленным установлением статуса retained для сообщений.
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4 local signal = require("posix.signal")
5
6 require "wm_util"
7
8 function getConfig(configname)
9
10   local command,f
11
12   local uci=require("uci")
13   local cur=uci.cursor()
14   local config
15   if configname then
16     config=configname
17   else
18     config="weathermon"
19   end
20
21   web_url  = cur.get(config,"web","url")
22   web_user = cur.get(config,"web","user")
23   web_timeout = cur.get(config,"web","timeout")
24   web_pass = cur.get(config,"web","password")
25  
26   if not web_timeout then
27     web_timeout = 10
28   end
29
30   web_devid = get_devid(config)
31
32   logging = cur.get(config,"logging","enabled") 
33   touch_file = cur.get(config,"logging","touch_file") 
34
35   backlogdb = cur.get(config,"process","backlogdb")
36   logdb = cur.get(config,"process","logdb")
37
38   serial_port = cur.get(config,"serial","port")
39   serial_baud = cur.get(config,"serial","baud")
40
41   input_file = cur.get(config,"input","file")
42   input_exec = cur.get(config,"input","exec")
43   alarm_exec = cur.get(config,"alarm","exec")
44
45   if serial_port then
46
47     command = "stty -F  "..serial_port.." "..serial_baud
48     capture(command)
49
50   end
51
52   mqtt_host = cur.get(config,"mqtt","host")
53   mqtt_port = cur.get(config,"mqtt","port")
54   mqtt_id = cur.get(config,"mqtt","id")
55   mqtt_topic = cur.get(config,"mqtt","topic")
56   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
57
58   mqtt_user = cur.get(config,"mqtt","user")
59   mqtt_passwd = cur.get(config,"mqtt","password")
60
61   if mqtt_host and not mqtt_id then
62     mqtt_id="weather-"..web_devid
63   end
64
65   if mqtt_host and not mqtt_port then
66     mqtt_port = 1883
67   end
68
69   if mqtt_host and not mqtt_topic then
70     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
71   end
72
73   if mqtt_host and not mqtt_alarm_topic then
74     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
75   end
76
77   dump_file = cur.get(config,"process","dump_file")
78
79 end
80
81 function printLog(str)
82   if logging=="on" then
83     capture("logger -t weathermon "..str)
84     print(str)  
85   elseif logging=="syslog" then
86     capture("logger -t weathermon "..str)
87   elseif logging=="stdout" then 
88     print(str)  
89   end 
90 end
91
92 function submitValue(type,id,param,val)
93
94   val = tonumber(val)
95
96   if web_url and val then
97
98     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
99
100     if web_user then
101       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
102     end
103
104     local result,code = http.request (url)
105
106     if code ~= 200 and backlog_con then
107       printLog("writing record to backlog...")
108       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
109     end
110
111   end
112   
113   if logdb then
114     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
115   end
116
117 end
118
119 function storeRecord(id,sensor,param,value) 
120
121   if not records[id] then
122     records[id] = {}
123   end  
124   
125   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
126   
127   if not records[id][sensor] then
128     records[id][sensor] = {}
129   end
130   
131   records[id][sensor][param] = value
132
133 end
134
135 function processJson(str)
136
137   msg=json.decode(str)
138
139   sensor={}
140
141   for key,value in pairs(msg) do
142     if value then
143       if key=="model" or key=="device" then
144         sensor_type=value
145       elseif key=="id" then
146         sensor_id=value
147       elseif key=='time' then
148         sensor_time=value
149       else
150         sensor[key]=value
151       end
152     end
153   end
154
155   if not sensor_id then
156     sensor_id = web_devid
157   end
158
159   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
160     if next(sensor)==nil then
161       sensor["command"]="alarm"
162     end
163     local record = {}
164     for k,v in pairs(sensor) do
165       storeRecord(sensor_id,sensor_type,k,v)
166       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
167       if web_url then
168         submitValue(sensor_type,sensor_id,k,v)
169       end  
170       if mqtt_client then
171         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
172           function (name) 
173             if name=="dev" then
174               return mqtt_encode(web_devid)
175             elseif name=="type" then
176               return mqtt_encode(sensor_type)
177             elseif name=="id" then
178               return mqtt_encode(sensor_id)
179             elseif name=="param" then
180               return k
181             else
182               return '{'..name..'}'
183             end      
184           end)
185         if not mqtt_client:socket() then
186           mqtt_client:reconnect()
187         end  
188         mqtt_client:publish(mqtt_path,v,0,false)
189         mqtt_client:loop()
190       end  
191     end
192   else
193     printLog("Cannot parse sensor input: "..str)
194   end
195
196 end
197
198 function processLine(str)
199
200   msg=split(line,':')
201   msg_type=msg[1] or ''
202   msg_body=msg[2] or ''
203   if msg_type=="STATUS" then
204     printLog("Status: "..msg_body)
205   elseif msg_type=="ERROR" then
206     printLog("Error: "..msg_body)  
207   elseif msg_type=="SENSOR" then
208     printLog("SENSOR: "..msg_body)  
209     sens = split(msg_body,",")
210     sensor = {}
211     idx = 1
212     sensor_type = nil
213     sensor_id = web_devid
214     for i,rec in ipairs(sens) do
215       recrd=split(rec,'=')
216       key=recrd[1] or ''
217       value=recrd[2] or ''
218       if value then
219         if key=="TYPE" then
220           sensor_type=value
221         elseif key=="ID" then
222           sensor_id=value
223         else
224           sensor[key]=value
225         end
226       end
227     end
228     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
229       local record = {}
230       for k,v in pairs(sensor) do
231         storeRecord(sensor_id,sensor_type,k,v)
232         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
233         if web_url then 
234           submitValue(sensor_type,sensor_id,k,v)
235         end  
236         if mqtt_client then
237           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
238             function (name) 
239               if name=="dev" then
240                 return web_devid
241               elseif name=="type" then
242                 return sensor_type
243               elseif name=="id" then
244                 return sensor_id
245               elseif name=="param" then
246                 return k
247               else
248                 return '{'..name..'}'
249               end      
250             end)
251           mqtt_client:publish(mqtt_path,v,0,false)
252           mqtt_client:loop()
253         end  
254       end
255     else
256       printLog("Cannot parse sensor input: "..msg_body)
257     end
258   elseif msg_type=="ALARM" then
259     printLog("ALARM: "..msg_body)  
260     sens = split(msg_body,",")
261     sensor = {}
262     idx = 1
263     sensor_type = nil
264     sensor_id = web_devid
265     mqtt_param = {}
266     for i,rec in ipairs(sens) do
267       recrd=split(rec,'=')
268       key=recrd[1] or ''
269       value=recrd[2] or ''
270       if value then
271         if key=="TYPE" then
272           alarm_type=value
273         elseif key=="ID" then
274           alarm_id=value
275         end
276       end
277     end
278     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
279       if mqtt_client then
280         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
281           function (name) 
282             if name=="dev" then
283               return web_devid
284             elseif name=="type" then
285               return sensor_type
286             elseif name=="id" then
287               return sensor_id
288             else
289               return '{'..name..'}'
290             end      
291           end)
292         mqtt_client:publish(mqtt_path,msg_body,0,false)
293         mqtt_client:loop()
294       end
295       if alarm_exec then
296         command=alarm_exec..
297           " \""..string.gsub(alarm_type,"\"","\\\"")..
298           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
299           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
300         capture(command)
301       end
302     else
303       printLog("Cannot parse alarm input: "..msg_body)
304     end
305   end
306
307 end
308
309 getConfig(arg[1])
310
311 signal.signal(signal.SIGTERM, function(signum)
312
313   printLog("Terminating...")
314   local pids = get_children()
315   for k,v in pairs(pids) do
316     printLog("Terminating subprocess "..tostring(v).."...")
317     signal.kill(v,signal.SIGTERM)
318   end
319   printLog("Exiting...")
320   os.exit(0)
321
322 end)
323
324 if backlogdb or logdb then
325   local dbdriver = require "luasql.sqlite3"
326   env = assert(dbdriver.sqlite3())
327 end
328
329 if backlogdb then
330   if not file_exists(backlogdb) then
331     touch(backlogdb)
332   end  
333   backlog_con = assert(env:connect(backlogdb))
334   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
335 end
336
337 if logdb then
338   if not file_exists(logdb) then
339     touch(logdb)
340   end  
341   log_con = assert(env:connect(logdb))
342   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
343   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
344 end
345
346 if web_url then
347   http = require("socket.http")
348   http.TIMEOUT = web_timeout
349 end
350
351 if mqtt_host then
352   MQTT = require "mosquitto"
353   mqtt_client = MQTT.new(mqtt_id)
354   if mqtt_user then
355     mqtt_client:login_set(mqtt_user, mqtt_passwd)
356   end
357   mqtt_client:connect(mqtt_host,mqtt_port)
358 end
359
360 if serial_port then
361   serialin=io.open(serial_port,"r")
362 elseif input_file == "-" then
363   serialin=io.stdin;
364 elseif input_file then
365   serialin=io.open(input_file,"r")
366 elseif input_exec then
367   serialin=io.popen(input_exec,"r")
368 else
369   printLog("No input selected")
370   return
371 end  
372
373 serialin:setvbuf('no')
374
375 records = {}
376
377 while 1 do
378
379   line=serialin:read("*l")
380
381   if line == nil then
382     break
383   end
384
385   pcall(function ()
386   
387     printLog("Received: "..line)
388     if startswith(line,'{') then
389       processJson(line)
390     else
391       processLine(line)
392     end
393
394     if touch_file then
395       pcall(function () 
396         touch(touch_file) 
397       end)
398     end  
399
400     if dump_file then
401       pcall(function ()
402         local f = io.open(dump_file,"w")
403         io.output(f)
404         io.write(json.encode(records))
405         io.close(f)
406       end)  
407     end
408   end)
409     
410 end