3 local json = require("json")
4 local signal = require("posix.signal")
-- Reads the daemon's settings through a UCI cursor and stores them in
-- variables that the rest of the script consults (web_url, mqtt_topic, ...).
-- @param configname not referenced on the lines shown here; presumably it
--        selects the value of `config` in the skipped lines -- TODO confirm.
-- NOTE(review): the leading numbers jump (8, 12, 13, 21, ...), so this listing
-- omits lines; the assignment of `config`, the bodies of the `if` statements
-- and the closing `end`s are not visible.
-- NOTE(review): none of the assignments below carries `local` on the lines
-- shown, so they create globals unless declared in the skipped lines.
8 function getConfig(configname)
12 local uci=require("uci")
13 local cur=uci.cursor()
-- Section "web": endpoint URL, credentials and request timeout.
-- NOTE(review): `cur.get` is called with `.`, so the cursor is not passed as
-- the first argument -- confirm the uci binding supports this call form.
21 web_url = cur.get(config,"web","url")
22 web_user = cur.get(config,"web","user")
23 web_timeout = cur.get(config,"web","timeout")
24 web_pass = cur.get(config,"web","password")
-- Fallback for a missing timeout; the default value itself is on a skipped line.
26 if not web_timeout then
-- Device id comes from the helper get_devid (defined outside this listing).
30 web_devid = get_devid(config)
-- Section "logging": `enabled` is later compared against "syslog"/"stdout".
32 logging = cur.get(config,"logging","enabled")
33 touch_file = cur.get(config,"logging","touch_file")
-- Section "process": paths of the backlog and log SQLite databases.
35 backlogdb = cur.get(config,"process","backlogdb")
36 logdb = cur.get(config,"process","logdb")
-- Section "serial": device path and baud rate.
38 serial_port = cur.get(config,"serial","port")
39 serial_baud = cur.get(config,"serial","baud")
-- Alternative input sources and the alarm hook command.
41 input_file = cur.get(config,"input","file")
42 input_exec = cur.get(config,"input","exec")
43 alarm_exec = cur.get(config,"alarm","exec")
-- Builds an stty command line for the serial port; the guard around it and the
-- call that runs it are not visible here.
-- NOTE(review): this concatenation raises an error if serial_port or
-- serial_baud is nil, unless a skipped guard prevents reaching it -- confirm.
47 command = "stty -F "..serial_port.." "..serial_baud
-- Section "mqtt": broker address, client id, topic templates and credentials.
52 mqtt_host = cur.get(config,"mqtt","host")
53 mqtt_port = cur.get(config,"mqtt","port")
54 mqtt_id = cur.get(config,"mqtt","id")
55 mqtt_topic = cur.get(config,"mqtt","topic")
56 mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
58 mqtt_user = cur.get(config,"mqtt","user")
59 mqtt_passwd = cur.get(config,"mqtt","password")
-- Defaults applied only when an MQTT host is configured.
-- Client id defaults to "weather-" followed by the device id.
61 if mqtt_host and not mqtt_id then
62 mqtt_id="weather-"..web_devid
-- Default port: the value is assigned on a skipped line.
65 if mqtt_host and not mqtt_port then
-- Default topic templates; the {dev}/{type}/{id}/{param} placeholders are
-- substituted when a message is published.
69 if mqtt_host and not mqtt_topic then
70 mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
73 if mqtt_host and not mqtt_alarm_topic then
74 mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
-- Optional path of the file that receives the JSON dump of `records`.
77 dump_file = cur.get(config,"process","dump_file")
-- Emits a log message to the destination selected by the `logging` setting.
-- @param str message text; appended as-is to the `logger` command line.
-- NOTE(review): the condition of the first branch and the body of the
-- "stdout" branch are on lines omitted from this listing.
-- NOTE(review): `str` is concatenated unquoted; if `capture` hands the string
-- to a shell, metacharacters coming from sensor input would be interpreted --
-- confirm how `capture` executes it.
81 function printLog(str)
83 capture("logger -t weathermon "..str)
-- "syslog": same logger invocation with the tag "weathermon".
85 elseif logging=="syslog" then
86 capture("logger -t weathermon "..str)
87 elseif logging=="stdout" then
-- Replaces a SQLite database file with a fresh backup copy of itself; the
-- script calls this after a commit fails with "database is locked".
-- @param file path of the database; placed inside double quotes in both
--        shell commands below.
-- NOTE(review): lines 93 and 97+ of the original are omitted, so any guard
-- around the body is not visible.
92 function unlock_db(file)
94 print("Unlocking DB "..file)
-- Open the source read-only and write a backup to /tmp/weathermon.db.
95 os.execute("sqlite3 -readonly \""..file.."\" \".backup /tmp/weathermon.db\"")
-- Move the backup over the original path.
96 os.execute("mv /tmp/weathermon.db \""..file.."\"")
-- Reports one sensor reading to the web endpoint and records it in the
-- SQLite databases.
-- @param type  sensor type (note: shadows the builtin `type` inside this function)
-- @param id    sensor id
-- @param param parameter name
-- @param val   reading; formatted with %f in the SQL below, so it must be a
--              number or a numeric string
-- NOTE(review): this listing omits lines (the leading numbers jump), so some
-- guards, `end`s and the action of the last `if` are not visible.
-- NOTE(review): the INSERT statements interpolate id/type/param with '%s'
-- without escaping quotes -- confirm these values can never contain "'".
100 function submitValue(type,id,param,val)
104 if web_url and val then
-- Query string carries stype, sid, param and value. The separator before
-- "param" was a mis-encoded character, which kept the server from receiving
-- a `param` argument at all; it is restored to "&param=".
106 local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."&param="..url_encode(param).."&value="..url_encode(val)
-- Insert "user:password@" after the first "//". A function replacement is
-- used so that a '%' in the credentials is taken literally instead of being
-- parsed as a gsub replacement escape.
109 url = url:gsub("//",function() return "//"..web_user..":"..web_pass.."@" end,1)
112 local result,code = http.request (url)
-- Any status other than 200 queues the reading in the backlog database.
114 if code ~= 200 and backlogdb then
115 printLog("writing record to backlog...")
116 local backlog_con = assert(env:connect(backlogdb))
117 backlog_con:execute('BEGIN TRANSACTION')
118 backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
119 local n,err = backlog_con:execute('COMMIT TRANSACTION')
-- A locked database is rebuilt from a backup copy via unlock_db.
122 if err == "LuaSQL: database is locked" then
123 unlock_db(backlogdb);
-- Same insert into the `log` table of the log database; the guard for this
-- part is on an omitted line.
132 local log_con = assert(env:connect(logdb))
133 log_con:execute('BEGIN TRANSACTION')
134 log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val))
135 local n,err = log_con:execute('COMMIT TRANSACTION')
140 if err == "LuaSQL: database is locked" then
-- Stores the latest value of one parameter in the nested `records` table,
-- laid out as records[id][sensor][param] = value.
-- @param id     sensor id (first-level key)
-- @param sensor sensor type (second-level key)
-- @param param  parameter name (third-level key)
-- @param value  reading to remember
-- NOTE(review): the body that initialises records[id] and the closing `end`s
-- are on lines omitted from this listing.
148 function storeRecord(id,sensor,param,value)
150 if not records[id] then
-- Stamp the sensor entry with the current local time, ISO-8601 style.
154 records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S")
-- Create the per-sensor sub-table on first use.
156 if not records[id][sensor] then
157 records[id][sensor] = {}
160 records[id][sensor][param] = value
-- Handles one input line that holds a JSON object: collects sensor type, id
-- and parameters from its keys, then stores, logs, submits and publishes each
-- parameter.
-- @param str the raw input line (echoed in the error log when parsing fails)
-- NOTE(review): this listing omits lines; `msg` is presumably the decoded
-- JSON table and `sensor` the table of remaining key/value pairs -- TODO
-- confirm against the skipped lines.
164 function processJson(str)
170 for key,value in pairs(msg) do
-- "model"/"device", "id" and "time" are treated specially; the branch bodies
-- are not visible here.
172 if key=="model" or key=="device" then
174 elseif key=="id" then
176 elseif key=='time' then
-- A message without an id is attributed to this device.
184 if not sensor_id then
185 sensor_id = web_devid
-- Proceed only when both the type and the id are non-empty.
188 if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
190 for k,v in pairs(sensor) do
191 storeRecord(sensor_id,sensor_type,k,v)
-- NOTE(review): `..v..` raises an error if v is a boolean or a table, which
-- JSON input can contain -- confirm the skipped lines filter such values.
192 printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"")
194 submitValue(sensor_type,sensor_id,k,v)
-- Expand the {dev}/{type}/{id}/{param} placeholders of the topic template;
-- an unrecognised placeholder is left unchanged.
197 mqtt_path=string.gsub(mqtt_topic,"{(.-)}",
200 return mqtt_encode(web_devid)
201 elseif name=="type" then
202 return mqtt_encode(sensor_type)
203 elseif name=="id" then
204 return mqtt_encode(sensor_id)
205 elseif name=="param" then
208 return '{'..name..'}'
-- Reconnect when the client reports no socket, then publish with QoS 0 and
-- retain=false.
211 if not mqtt_client:socket() then
212 mqtt_client:reconnect()
214 mqtt_client:publish(mqtt_path,v,0,false)
219 printLog("Cannot parse sensor input: "..str)
-- Handles one non-JSON input line and dispatches on its message type:
-- STATUS and ERROR are only logged, SENSOR readings are stored, submitted and
-- published, ALARM messages are published and passed to the alarm command.
-- @param str the raw input line
-- NOTE(review): this listing omits lines; how `msg` is split from `str` and
-- how each "KEY=value" record is parsed is not visible -- TODO confirm.
224 function processLine(str)
-- Missing fields default to the empty string.
227 msg_type=msg[1] or ''
228 msg_body=msg[2] or ''
229 if msg_type=="STATUS" then
230 printLog("Status: "..msg_body)
231 elseif msg_type=="ERROR" then
232 printLog("Error: "..msg_body)
233 elseif msg_type=="SENSOR" then
234 printLog("SENSOR: "..msg_body)
-- The body is a comma-separated list of records.
235 sens = split(msg_body,",")
-- The id defaults to this device; an "ID" record is handled in the loop below.
239 sensor_id = web_devid
240 for i,rec in ipairs(sens) do
247 elseif key=="ID" then
-- Proceed only when both the type and the id are non-empty.
254 if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
256 for k,v in pairs(sensor) do
257 storeRecord(sensor_id,sensor_type,k,v)
258 printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
260 submitValue(sensor_type,sensor_id,k,v)
-- Expand the placeholders of the sensor topic template; an unrecognised
-- placeholder is left unchanged.
263 mqtt_path=string.gsub(mqtt_topic,"{(.-)}",
267 elseif name=="type" then
269 elseif name=="id" then
271 elseif name=="param" then
274 return '{'..name..'}'
-- Reconnect when the client reports no socket, then publish with QoS 0 and
-- retain=false.
277 if not mqtt_client:socket() then
278 mqtt_client:reconnect()
280 mqtt_client:publish(mqtt_path,v,0,false)
285 printLog("Cannot parse sensor input: "..msg_body)
287 elseif msg_type=="ALARM" then
288 printLog("ALARM: "..msg_body)
289 sens = split(msg_body,",")
-- NOTE(review): this sets sensor_id, while the check below tests alarm_id;
-- confirm the skipped lines assign alarm_id.
293 sensor_id = web_devid
295 for i,rec in ipairs(sens) do
302 elseif key=="ID" then
307 if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
-- Alarm topic template; the whole message body is the payload.
309 mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}",
313 elseif name=="type" then
315 elseif name=="id" then
318 return '{'..name..'}'
321 if not mqtt_client:socket() then
322 mqtt_client:reconnect()
324 mqtt_client:publish(mqtt_path,msg_body,0,false)
-- Arguments appended to a command string: type, id and body, each wrapped in
-- double quotes with embedded double quotes backslash-escaped.
-- NOTE(review): only '"' is escaped; characters such as '$' or '`' remain
-- active inside double quotes if the string is run by a shell -- confirm.
329 " \""..string.gsub(alarm_type,"\"","\\\"")..
330 "\" \""..string.gsub(alarm_id,"\"","\\\"")..
331 "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
335 printLog("Cannot parse alarm input: "..msg_body)
-- SIGTERM handler: logs the shutdown, forwards SIGTERM to every pid returned
-- by get_children(), then logs "Exiting...".
-- NOTE(review): the lines that close the loop and the handler (and any exit
-- call) are omitted from this listing.
343 signal.signal(signal.SIGTERM, function(signum)
345 printLog("Terminating...")
346 local pids = get_children()
347 for k,v in pairs(pids) do
348 printLog("Terminating subprocess "..tostring(v).."...")
349 signal.kill(v,signal.SIGTERM)
351 printLog("Exiting...")
-- Script body: prepares the SQLite databases, the HTTP and MQTT clients,
-- selects the input stream, then reads and processes it line by line.
-- NOTE(review): this listing omits lines, so several guards, loop headers and
-- `end`s are not visible.

-- The SQLite driver is loaded only when at least one database is configured.
356 if backlogdb or logdb then
357 local dbdriver = require "luasql.sqlite3"
358 env = assert(dbdriver.sqlite3())
-- Create the backlog database with its `queue` table when the file is absent.
362 if not file_exists(backlogdb) then
365 local backlog_con = assert(env:connect(backlogdb))
366 backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
-- Create the log database with its `log` table and lookup index when absent.
371 if not file_exists(logdb) then
374 local log_con = assert(env:connect(logdb))
375 log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)")
376 log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)")
-- HTTP client used by submitValue; the timeout comes from the "web" settings.
381 http = require("socket.http")
382 http.TIMEOUT = web_timeout
-- MQTT client; credentials are set before connecting to the broker.
386 MQTT = require "mosquitto"
387 mqtt_client = MQTT.new(mqtt_id)
389 mqtt_client:login_set(mqtt_user, mqtt_passwd)
391 mqtt_client:connect(mqtt_host,mqtt_port)
-- Input selection: serial port, "-" (handling is on an omitted line), a
-- file, or the output of a command; otherwise log that nothing is selected.
395 serialin=io.open(serial_port,"r")
396 elseif input_file == "-" then
398 elseif input_file then
399 serialin=io.open(input_file,"r")
400 elseif input_exec then
401 serialin=io.popen(input_exec,"r")
403 printLog("No input selected")
-- Turn off buffering on the input handle.
407 serialin:setvbuf('no')
-- Read one line at a time.
413 line=serialin:read("*l")
421 printLog("Received: "..line)
-- A line starting with '{' is handled differently; presumably it goes to
-- processJson and the rest to processLine -- the calls are on omitted lines.
422 if startswith(line,'{') then
-- Dump the collected `records` table as JSON.
-- NOTE(review): io.write writes to the default output file, not to `f`;
-- confirm the omitted line between these two redirects output to `f`.
436 local f = io.open(dump_file,"w")
438 io.write(json.encode(records))