Всегда пытаемся создать таблицы при старте - поможет от пустого файла в бэкапе и...
[weathermon.git] / bin / weathermon
1 #!/usr/bin/lua
2
3 local json = require("json")
4 local socket = require("socket")
5
6 local http = require("socket.http")
7
8 require "wm_util"
9
10 function getConfig(configname)
11
12   local command,f
13
14   local uci=require("uci")
15   local cur=uci.cursor()
16   local config
17   if configname then
18     config=configname
19   else
20     config="weathermon"
21   end
22
23   web_url  = cur.get(config,"web","url")
24   web_user = cur.get(config,"web","user")
25   web_timeout = cur.get(config,"web","timeout")
26   web_pass = cur.get(config,"web","password")
27   web_devid = cur.get(config,"web","devid")
28
29   web_iface = cur.get(config,"web","iface")
30  
31   if not web_timeout then
32     web_timeout = 10
33   end
34
35   if web_iface then
36   
37     command = '/sbin/ifconfig '..web_iface..' | grep \'\\<inet\\>\' | sed -n \'1p\' | tr -s \' \' | cut -d \' \' -f3 | cut -d \':\' -f2'
38     f=io.popen(command,'r')
39     ip_addr=f:read()
40   
41   end
42
43   if not web_devid then
44   
45     if web_iface then
46       io.input("/sys/class/net/"..web_iface.."/address")
47     else
48       io.input("/sys/class/net/eth0/address")
49     end
50
51     local mac = io.read("*line")
52     mac = mac:gsub(":","")
53     mac = mac:upper()
54
55     web_devid = mac
56
57   end
58
59   logging = cur.get(config,"logging","enabled") 
60   touch_file = cur.get(config,"logging","touch_file") 
61
62   backlogdb = cur.get(config,"process","backlogdb")
63   logdb = cur.get(config,"process","logdb")
64
65   serial_port = cur.get(config,"serial","port")
66   serial_baud = cur.get(config,"serial","baud")
67
68   input_file = cur.get(config,"input","file")
69   input_exec = cur.get(config,"input","exec")
70   alarm_exec = cur.get(config,"alarm","exec")
71
72   if serial_port then
73
74     command = "stty -F  "..serial_port.." "..serial_baud
75     capture(command)
76
77   end
78
79   mqtt_host = cur.get(config,"mqtt","host")
80   mqtt_port = cur.get(config,"mqtt","port")
81   mqtt_id = cur.get(config,"mqtt","id")
82   mqtt_topic = cur.get(config,"mqtt","topic")
83   mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
84
85   mqtt_user = cur.get(config,"mqtt","user")
86   mqtt_passwd = cur.get(config,"mqtt","password")
87
88   if mqtt_host and not mqtt_id then
89     mqtt_id="weather-"..web_devid
90   end
91
92   if mqtt_host and not mqtt_port then
93     mqtt_port = 1883
94   end
95
96   if mqtt_host and not mqtt_topic then
97     mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
98   end
99
100   if mqtt_host and not mqtt_alarm_topic then
101     mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
102   end
103
104   dump_file = cur.get(config,"process","dump_file")
105
106 end
107
108 function printLog(str)
109   if logging=="on" then
110     capture("logger -t weathermon "..str)
111     print(str)  
112   elseif logging=="syslog" then
113     capture("logger -t weathermon "..str)
114   elseif logging=="stdout" then 
115     print(str)  
116   end 
117 end
118
119 function submitValue(type,id,param,val)
120
121   val = tonumber(val)
122
123   if web_url and val then
124
125     local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
126
127     if web_user then
128       url = url:gsub("//","//"..web_user..":"..web_pass.."@",1)
129     end
130
131     local result,code = http.request ({
132       url=url, create=function()
133         local req_sock = socket.tcp()
134         req_sock:settimeout(web_timeout)
135         return req_sock
136       end})
137
138     if code ~= 200 and backlog_con then
139       printLog("writing record to backlog...")
140       backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
141     end
142
143   end
144   
145   if logdb then
146     log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
147   end
148
149   if touch_file then
150     touch(touch_file)
151   end  
152   
153 end
154
155 function storeRecord(id,sensor,param,value) 
156
157   if not records[id] then
158     records[id] = {}
159   end  
160   
161   records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
162   
163   if not records[id][sensor] then
164     records[id][sensor] = {}
165   end
166   
167   records[id][sensor][param] = value
168
169 end
170
171 function processJson(str)
172
173   msg=json.decode(str)
174
175   sensor={}
176
177   for key,value in pairs(msg) do
178     if value then
179       if key=="model" or key=="device" then
180         sensor_type=value
181       elseif key=="id" then
182         sensor_id=value
183       elseif key=='time' then
184         sensor_time=value
185       else
186         sensor[key]=value
187       end
188     end
189   end
190
191   if not sensor_id then
192     sensor_id = web_devid
193   end
194
195   if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
196     if next(sensor)==nil then
197       sensor["command"]="alarm"
198     end
199     local record = {}
200     for k,v in pairs(sensor) do
201       storeRecord(sensor_id,sensor_type,k,v)
202       printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
203       submitValue(sensor_type,sensor_id,k,v)
204       if mqtt_client then
205         mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
206           function (name) 
207             if name=="dev" then
208               return mqtt_encode(web_devid)
209             elseif name=="type" then
210               return mqtt_encode(sensor_type)
211             elseif name=="id" then
212               return mqtt_encode(sensor_id)
213             elseif name=="param" then
214               return k
215             else
216               return '{'..name..'}'
217             end      
218           end)
219         mqtt_client:connect(mqtt_host,mqtt_port)
220         mqtt_client:publish(mqtt_path,v)
221         mqtt_client:disconnect()
222       end  
223     end
224   else
225     printLog("Cannot parse sensor input: "..str)
226   end
227
228 end
229
230 function processLine(str)
231
232   msg=split(line,':')
233   msg_type=msg[1] or ''
234   msg_body=msg[2] or ''
235   if msg_type=="STATUS" then
236     printLog("Status: "..msg_body)
237   elseif msg_type=="ERROR" then
238     printLog("Error: "..msg_body)  
239   elseif msg_type=="SENSOR" then
240     printLog("SENSOR: "..msg_body)  
241     sens = split(msg_body,",")
242     sensor = {}
243     idx = 1
244     sensor_type = nil
245     sensor_id = web_devid
246     for i,rec in ipairs(sens) do
247       recrd=split(rec,'=')
248       key=recrd[1] or ''
249       value=recrd[2] or ''
250       if value then
251         if key=="TYPE" then
252           sensor_type=value
253         elseif key=="ID" then
254           sensor_id=value
255         else
256           sensor[key]=value
257         end
258       end
259     end
260     if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
261       local record = {}
262       for k,v in pairs(sensor) do
263         storeRecord(sensor_id,sensor_type,k,v)
264         printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
265         submitValue(sensor_type,sensor_id,k,v)
266         if mqtt_client then
267           mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
268             function (name) 
269               if name=="dev" then
270                 return web_devid
271               elseif name=="type" then
272                 return sensor_type
273               elseif name=="id" then
274                 return sensor_id
275               elseif name=="param" then
276                 return k
277               else
278                 return '{'..name..'}'
279               end      
280             end)
281           mqtt_client:connect(mqtt_host,mqtt_port)
282           mqtt_client:publish(mqtt_path,v)
283           mqtt_client:disconnect()
284         end  
285       end
286     else
287       printLog("Cannot parse sensor input: "..msg_body)
288     end
289   elseif msg_type=="ALARM" then
290     printLog("ALARM: "..msg_body)  
291     sens = split(msg_body,",")
292     sensor = {}
293     idx = 1
294     sensor_type = nil
295     sensor_id = web_devid
296     mqtt_param = {}
297     for i,rec in ipairs(sens) do
298       recrd=split(rec,'=')
299       key=recrd[1] or ''
300       value=recrd[2] or ''
301       if value then
302         if key=="TYPE" then
303           alarm_type=value
304         elseif key=="ID" then
305           alarm_id=value
306         end
307       end
308     end
309     if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
310       if mqtt_client then
311         mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
312           function (name) 
313             if name=="dev" then
314               return web_devid
315             elseif name=="type" then
316               return sensor_type
317             elseif name=="id" then
318               return sensor_id
319             else
320               return '{'..name..'}'
321             end      
322           end)
323         mqtt_client:connect(mqtt_host,mqtt_port)
324         mqtt_client:publish(mqtt_path,msg_body)
325         mqtt_client:disconnect()
326       end
327       if alarm_exec then
328         command=alarm_exec..
329           " \""..string.gsub(alarm_type,"\"","\\\"")..
330           "\" \""..string.gsub(alarm_id,"\"","\\\"")..
331           "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
332         capture(command)
333       end
334     else
335       printLog("Cannot parse alarm input: "..msg_body)
336     end
337   end
338
339 end
340
341 getConfig(arg[1])
342
343 if backlogdb or logdb then
344   local dbdriver = require "luasql.sqlite3"
345   env = assert(dbdriver.sqlite3())
346 end
347
348 if backlogdb then
349   if not file_exists(backlogdb) then
350     touch(backlogdb)
351   end  
352   backlog_con = assert(env:connect(backlogdb))
353   backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
354 end
355
356 if logdb then
357   if not file_exists(logdb) then
358     touch(logdb)
359   end  
360   log_con = assert(env:connect(logdb))
361   log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
362   log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
363 end
364
365 if mqtt_host then
366   MQTT = require "mosquitto"
367   mqtt_client = MQTT.new(mqtt_id)
368   if mqtt_user then
369     mqtt_client:login_set(mqtt_user, mqtt_passwd)
370   end
371 end
372
373 if serial_port then
374   serialin=io.open(serial_port,"r")
375 elseif input_file == "-" then
376   serialin=io.stdin;
377 elseif input_file then
378   serialin=io.open(input_file,"r")
379 elseif input_exec then
380   serialin=io.popen(input_exec,"r")
381 else
382   printLog("No input selected")
383   return
384 end  
385
386 serialin:setvbuf('no')
387
388 records = {}
389
390 while 1 do
391
392   line=serialin:read("*l")
393
394   if line == nil then
395     break
396   end
397
398   pcall(function ()
399   
400     printLog("Received: "..line)
401     if startswith(line,'{') then
402       processJson(line)
403     else
404       processLine(line)
405     end
406
407     if dump_file then
408       local f = io.open(dump_file,"w")
409       io.output(f)
410       io.write(json.encode(records))
411       io.close(f)
412     end
413   end)
414     
415 end