#!/usr/bin/lua local json = require("json") local signal = require("posix.signal") require "wm_util" function getConfig(configname) local command,f local uci=require("uci") local cur=uci.cursor() local config if configname then config=configname else config="weathermon" end web_url = cur.get(config,"web","url") web_user = cur.get(config,"web","user") web_timeout = cur.get(config,"web","timeout") web_pass = cur.get(config,"web","password") if not web_timeout then web_timeout = 10 end web_devid = get_devid(config) logging = cur.get(config,"logging","enabled") touch_file = cur.get(config,"logging","touch_file") backlogdb = cur.get(config,"process","backlogdb") logdb = cur.get(config,"process","logdb") serial_port = cur.get(config,"serial","port") serial_baud = cur.get(config,"serial","baud") input_file = cur.get(config,"input","file") input_exec = cur.get(config,"input","exec") alarm_exec = cur.get(config,"alarm","exec") if serial_port then command = "stty -F "..serial_port.." "..serial_baud capture(command) end mqtt_host = cur.get(config,"mqtt","host") mqtt_port = cur.get(config,"mqtt","port") mqtt_id = cur.get(config,"mqtt","id") mqtt_topic = cur.get(config,"mqtt","topic") mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic") mqtt_user = cur.get(config,"mqtt","user") mqtt_passwd = cur.get(config,"mqtt","password") if mqtt_host and not mqtt_id then mqtt_id="weather-"..web_devid end if mqtt_host and not mqtt_port then mqtt_port = 1883 end if mqtt_host and not mqtt_topic then mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}' end if mqtt_host and not mqtt_alarm_topic then mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}' end dump_file = cur.get(config,"process","dump_file") end function printLog(str) if logging=="on" then capture("logger -t weathermon "..str) print(str) elseif logging=="syslog" then capture("logger -t weathermon "..str) elseif logging=="stdout" then print(str) end end function unlock_db(file) print("Unlocking DB "..file) os.execute("sqlite3 -readonly \""..file.."\" \".backup /tmp/weathermon.db\"") os.execute("mv /tmp/weathermon.db \""..file.."\"") end function submitValue(type,id,param,val) val = tonumber(val) if web_url and val then local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."¶m="..url_encode(param).."&value="..url_encode(val) if web_user then url = url:gsub("//","//"..web_user..":"..web_pass.."@",1) end local result,code = http.request (url) if code ~= 200 and backlogdb then printLog("writing record to backlog...") local backlog_con = assert(env:connect(backlogdb)) local n,err = backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val)) backlog_con:close() if err == "LuaSQL: database is locked" then unlock_db(backlogdb); end end end if logdb then print(logdb) local log_con = assert(env:connect(logdb)) local n,err = log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val)) log_con:close() if err == "LuaSQL: database is locked" then unlock_db(logdb); end end end function storeRecord(id,sensor,param,value) if not records[id] then records[id] = {} end records[id]["timestamp"] = os.date("%Y-%m-%dT%H:%M:%S") if not records[id][sensor] then records[id][sensor] = {} end records[id][sensor][param] = value end function processJson(str) msg=json.decode(str) sensor={} for key,value in pairs(msg) do if value then if key=="model" or key=="device" then sensor_type=value elseif key=="id" then sensor_id=value elseif key=='time' then sensor_time=value else sensor[key]=value end end end if not sensor_id then sensor_id = web_devid end if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then local record = {} for k,v in pairs(sensor) do storeRecord(sensor_id,sensor_type,k,v) printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"") if web_url then submitValue(sensor_type,sensor_id,k,v) end if mqtt_client then mqtt_path=string.gsub(mqtt_topic,"{(.-)}", function (name) if name=="dev" then return mqtt_encode(web_devid) elseif name=="type" then return mqtt_encode(sensor_type) elseif name=="id" then return mqtt_encode(sensor_id) elseif name=="param" then return k else return '{'..name..'}' end end) if not mqtt_client:socket() then mqtt_client:reconnect() end mqtt_client:publish(mqtt_path,v,0,false) mqtt_client:loop() end end else printLog("Cannot parse sensor input: "..str) end end function processLine(str) msg=split(line,':') msg_type=msg[1] or '' msg_body=msg[2] or '' if msg_type=="STATUS" then printLog("Status: "..msg_body) elseif msg_type=="ERROR" then printLog("Error: "..msg_body) elseif msg_type=="SENSOR" then printLog("SENSOR: "..msg_body) sens = split(msg_body,",") sensor = {} idx = 1 sensor_type = nil sensor_id = web_devid for i,rec in ipairs(sens) do recrd=split(rec,'=') key=recrd[1] or '' value=recrd[2] or '' if value then if key=="TYPE" then sensor_type=value elseif key=="ID" then sensor_id=value else sensor[key]=value end end end if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then local record = {} for k,v in pairs(sensor) do storeRecord(sensor_id,sensor_type,k,v) printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v) if web_url then submitValue(sensor_type,sensor_id,k,v) end if mqtt_client then mqtt_path=string.gsub(mqtt_topic,"{(.-)}", function (name) if name=="dev" then return web_devid elseif name=="type" then return sensor_type elseif name=="id" then return sensor_id elseif name=="param" then return k else return '{'..name..'}' end end) if not mqtt_client:socket() then mqtt_client:reconnect() end mqtt_client:publish(mqtt_path,v,0,false) mqtt_client:loop() end end else printLog("Cannot parse sensor input: "..msg_body) end elseif msg_type=="ALARM" then printLog("ALARM: "..msg_body) sens = split(msg_body,",") sensor = {} idx = 1 sensor_type = nil sensor_id = web_devid mqtt_param = {} for i,rec in ipairs(sens) do recrd=split(rec,'=') key=recrd[1] or '' value=recrd[2] or '' if value then if key=="TYPE" then alarm_type=value elseif key=="ID" then alarm_id=value end end end if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then if mqtt_client then mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", function (name) if name=="dev" then return web_devid elseif name=="type" then return sensor_type elseif name=="id" then return sensor_id else return '{'..name..'}' end end) if not mqtt_client:socket() then mqtt_client:reconnect() end mqtt_client:publish(mqtt_path,msg_body,0,false) mqtt_client:loop() end if alarm_exec then command=alarm_exec.. " \""..string.gsub(alarm_type,"\"","\\\"").. "\" \""..string.gsub(alarm_id,"\"","\\\"").. "\" \""..string.gsub(msg_body,"\"","\\\"").."\"" capture(command) end else printLog("Cannot parse alarm input: "..msg_body) end end end getConfig(arg[1]) signal.signal(signal.SIGTERM, function(signum) printLog("Terminating...") local pids = get_children() for k,v in pairs(pids) do printLog("Terminating subprocess "..tostring(v).."...") signal.kill(v,signal.SIGTERM) end printLog("Exiting...") os.exit(0) end) if backlogdb or logdb then local dbdriver = require "luasql.sqlite3" env = assert(dbdriver.sqlite3()) end if backlogdb then if not file_exists(backlogdb) then touch(backlogdb) end local backlog_con = assert(env:connect(backlogdb)) backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)") backlog_con:close() end if logdb then if not file_exists(logdb) then touch(logdb) end local log_con = assert(env:connect(logdb)) log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)") log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)") log_con:close() end if web_url then http = require("socket.http") http.TIMEOUT = web_timeout end if mqtt_host then MQTT = require "mosquitto" mqtt_client = MQTT.new(mqtt_id) if mqtt_user then mqtt_client:login_set(mqtt_user, mqtt_passwd) end mqtt_client:connect(mqtt_host,mqtt_port) end if serial_port then serialin=io.open(serial_port,"r") elseif input_file == "-" then serialin=io.stdin; elseif input_file then serialin=io.open(input_file,"r") elseif input_exec then serialin=io.popen(input_exec,"r") else printLog("No input selected") return end serialin:setvbuf('no') records = {} while 1 do line=serialin:read("*l") if line == nil then break end -- pcall(function () printLog("Received: "..line) if startswith(line,'{') then processJson(line) else processLine(line) end if touch_file then pcall(function () touch(touch_file) end) end if dump_file then pcall(function () local f = io.open(dump_file,"w") io.output(f) io.write(json.encode(records)) io.close(f) end) end -- end) end