Все-таки переподключение к брокеру лучше выполнять вручную...
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4
5 require "wm_util"
6
7 function getConfig(configname)
8
9   local command,f
10
11   local uci=require("uci")
12   local cur=uci.cursor()
13   local config
14   if configname then
15     config=configname
16   else
17     config="weathermon"
18   end
19
20   web_url  = cur.get(config,"web","url")
21   web_user = cur.get(config,"web","user")
22   web_timeout = cur.get(config,"web","timeout")
23   web_pass = cur.get(config,"web","password")
24  
25   if not web_timeout then
26     web_timeout = 10
27   end
28
29   web_devid = get_devid(config)
30
31   logging = cur.get(config,"logging","enabled") 
32   touch_file = cur.get(config,"logging","touch_file") 
33
34   backlogdb = cur.get(config,"process","backlogdb")
35   logdb = cur.get(config,"process","logdb")
36
37   serial_port = cur.get(config,"serial","port")
38   serial_baud = cur.get(config,"serial","baud")
39
40   input_file = cur.get(config,"input","file")
41   input_exec = cur.get(config,"input","exec")
42   alarm_exec = cur.get(config,"alarm","exec")
43
44   if serial_port then
45
46     command = "stty -F  "..serial_port.." "..serial_baud
47     capture(command)
48
49   end
50
51   mqtt_host = cur.get(config,"mqtt","host")
52   mqtt_port = cur.get(config,"mqtt","port")
53   mqtt_id = cur.get(config,"mqtt","id")
54   mqtt_topic = cur.get(config,"mqtt","topic")
55   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
56
57   mqtt_user = cur.get(config,"mqtt","user")
58   mqtt_passwd = cur.get(config,"mqtt","password")
59
60   if mqtt_host and not mqtt_id then
61     mqtt_id="weather-"..web_devid
62   end
63
64   if mqtt_host and not mqtt_port then
65     mqtt_port = 1883
66   end
67
68   if mqtt_host and not mqtt_topic then
69     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
70   end
71
72   if mqtt_host and not mqtt_alarm_topic then
73     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
74   end
75
76   dump_file = cur.get(config,"process","dump_file")
77
78 end
79
80 function printLog(str)
81   if logging=="on" then
82     capture("logger -t weathermon "..str)
83     print(str)  
84   elseif logging=="syslog" then
85     capture("logger -t weathermon "..str)
86   elseif logging=="stdout" then 
87     print(str)  
88   end 
89 end
90
91 function submitValue(type,id,param,val)
92
93   val = tonumber(val)
94
95   if web_url and val then
96
97     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
98
99     if web_user then
100       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
101     end
102
103     local result,code = http.request ({
104       url=url, create=function()
105         local req_sock = socket.tcp()
106         req_sock:settimeout(web_timeout)
107         return req_sock
108       end})
109
110     if code ~= 200 and backlog_con then
111       printLog("writing record to backlog...")
112       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
113     end
114
115   end
116   
117   if logdb then
118     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
119   end
120
121 end
122
123 function storeRecord(id,sensor,param,value) 
124
125   if not records[id] then
126     records[id] = {}
127   end  
128   
129   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
130   
131   if not records[id][sensor] then
132     records[id][sensor] = {}
133   end
134   
135   records[id][sensor][param] = value
136
137 end
138
139 function processJson(str)
140
141   msg=json.decode(str)
142
143   sensor={}
144
145   for key,value in pairs(msg) do
146     if value then
147       if key=="model" or key=="device" then
148         sensor_type=value
149       elseif key=="id" then
150         sensor_id=value
151       elseif key=='time' then
152         sensor_time=value
153       else
154         sensor[key]=value
155       end
156     end
157   end
158
159   if not sensor_id then
160     sensor_id = web_devid
161   end
162
163   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
164     if next(sensor)==nil then
165       sensor["command"]="alarm"
166     end
167     local record = {}
168     for k,v in pairs(sensor) do
169       storeRecord(sensor_id,sensor_type,k,v)
170       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
171       if web_url then
172         submitValue(sensor_type,sensor_id,k,v)
173       end  
174       if mqtt_client then
175         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
176           function (name) 
177             if name=="dev" then
178               return mqtt_encode(web_devid)
179             elseif name=="type" then
180               return mqtt_encode(sensor_type)
181             elseif name=="id" then
182               return mqtt_encode(sensor_id)
183             elseif name=="param" then
184               return k
185             else
186               return '{'..name..'}'
187             end      
188           end)
189         if not mqtt_client:socket() then
190           mqtt_client:reconnect()
191         end  
192         mqtt_client:publish(mqtt_path,v)
193         mqtt_client:loop()
194       end  
195     end
196   else
197     printLog("Cannot parse sensor input: "..str)
198   end
199
200 end
201
202 function processLine(str)
203
204   msg=split(line,':')
205   msg_type=msg[1] or ''
206   msg_body=msg[2] or ''
207   if msg_type=="STATUS" then
208     printLog("Status: "..msg_body)
209   elseif msg_type=="ERROR" then
210     printLog("Error: "..msg_body)  
211   elseif msg_type=="SENSOR" then
212     printLog("SENSOR: "..msg_body)  
213     sens = split(msg_body,",")
214     sensor = {}
215     idx = 1
216     sensor_type = nil
217     sensor_id = web_devid
218     for i,rec in ipairs(sens) do
219       recrd=split(rec,'=')
220       key=recrd[1] or ''
221       value=recrd[2] or ''
222       if value then
223         if key=="TYPE" then
224           sensor_type=value
225         elseif key=="ID" then
226           sensor_id=value
227         else
228           sensor[key]=value
229         end
230       end
231     end
232     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
233       local record = {}
234       for k,v in pairs(sensor) do
235         storeRecord(sensor_id,sensor_type,k,v)
236         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
237         if web_url then 
238           submitValue(sensor_type,sensor_id,k,v)
239         end  
240         if mqtt_client then
241           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
242             function (name) 
243               if name=="dev" then
244                 return web_devid
245               elseif name=="type" then
246                 return sensor_type
247               elseif name=="id" then
248                 return sensor_id
249               elseif name=="param" then
250                 return k
251               else
252                 return '{'..name..'}'
253               end      
254             end)
255           mqtt_client:publish(mqtt_path,v)
256           mqtt_client:loop()
257         end  
258       end
259     else
260       printLog("Cannot parse sensor input: "..msg_body)
261     end
262   elseif msg_type=="ALARM" then
263     printLog("ALARM: "..msg_body)  
264     sens = split(msg_body,",")
265     sensor = {}
266     idx = 1
267     sensor_type = nil
268     sensor_id = web_devid
269     mqtt_param = {}
270     for i,rec in ipairs(sens) do
271       recrd=split(rec,'=')
272       key=recrd[1] or ''
273       value=recrd[2] or ''
274       if value then
275         if key=="TYPE" then
276           alarm_type=value
277         elseif key=="ID" then
278           alarm_id=value
279         end
280       end
281     end
282     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
283       if mqtt_client then
284         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
285           function (name) 
286             if name=="dev" then
287               return web_devid
288             elseif name=="type" then
289               return sensor_type
290             elseif name=="id" then
291               return sensor_id
292             else
293               return '{'..name..'}'
294             end      
295           end)
296         mqtt_client:publish(mqtt_path,msg_body)
297         mqtt_client:loop()
298       end
299       if alarm_exec then
300         command=alarm_exec..
301           " \""..string.gsub(alarm_type,"\"","\\\"")..
302           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
303           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
304         capture(command)
305       end
306     else
307       printLog("Cannot parse alarm input: "..msg_body)
308     end
309   end
310
311 end
312
313 getConfig(arg[1])
314
315 if backlogdb or logdb then
316   local dbdriver = require "luasql.sqlite3"
317   env = assert(dbdriver.sqlite3())
318 end
319
320 if backlogdb then
321   if not file_exists(backlogdb) then
322     touch(backlogdb)
323   end  
324   backlog_con = assert(env:connect(backlogdb))
325   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
326 end
327
328 if logdb then
329   if not file_exists(logdb) then
330     touch(logdb)
331   end  
332   log_con = assert(env:connect(logdb))
333   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
334   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
335 end
336
337 if web_url then
338   http = require("socket.http")
339   socket = require("socket")
340 end
341
342 if mqtt_host then
343   MQTT = require "mosquitto"
344   mqtt_client = MQTT.new(mqtt_id)
345   if mqtt_user then
346     mqtt_client:login_set(mqtt_user, mqtt_passwd)
347   end
348   mqtt_client:connect(mqtt_host,mqtt_port)
349 end
350
351 if serial_port then
352   serialin=io.open(serial_port,"r")
353 elseif input_file == "-" then
354   serialin=io.stdin;
355 elseif input_file then
356   serialin=io.open(input_file,"r")
357 elseif input_exec then
358   serialin=io.popen(input_exec,"r")
359 else
360   printLog("No input selected")
361   return
362 end  
363
364 serialin:setvbuf('no')
365
366 records = {}
367
368 while 1 do
369
370   line=serialin:read("*l")
371
372   if line == nil then
373     break
374   end
375
376   pcall(function ()
377   
378     printLog("Received: "..line)
379     if startswith(line,'{') then
380       processJson(line)
381     else
382       processLine(line)
383     end
384
385     if touch_file then
386       pcall(function () 
387         touch(touch_file) 
388       end)
389     end  
390
391     if dump_file then
392       pcall(function ()
393         local f = io.open(dump_file,"w")
394         io.output(f)
395         io.write(json.encode(records))
396         io.close(f)
397       end)  
398     end
399   end)
400     
401 end