X-Git-Url: https://git.rvb.name/weathermon.git/blobdiff_plain/e32107a7fe79ce34f3bdf860410a6d5455efdca7..d563aba7f65b64c452a6e69af6dc92354422bec6:/bin/weathermon diff --git a/bin/weathermon b/bin/weathermon index 03fad42..c789bea 100755 --- a/bin/weathermon +++ b/bin/weathermon @@ -1,9 +1,7 @@ #!/usr/bin/lua local json = require("json") -local socket = require("socket") - -local http = require("socket.http") +local signal = require("posix.signal") require "wm_util" @@ -24,37 +22,12 @@ function getConfig(configname) web_user = cur.get(config,"web","user") web_timeout = cur.get(config,"web","timeout") web_pass = cur.get(config,"web","password") - web_devid = cur.get(config,"web","devid") - - web_iface = cur.get(config,"web","iface") if not web_timeout then web_timeout = 10 end - if web_iface then - - command = '/sbin/ifconfig '..web_iface..' | grep \'\\\' | sed -n \'1p\' | tr -s \' \' | cut -d \' \' -f3 | cut -d \':\' -f2' - f=io.popen(command,'r') - ip_addr=f:read() - - end - - if not web_devid then - - if web_iface then - io.input("/sys/class/net/"..web_iface.."/address") - else - io.input("/sys/class/net/eth0/address") - end - - local mac = io.read("*line") - mac = mac:gsub(":","") - mac = mac:upper() - - web_devid = mac - - end + web_devid = get_devid(config) logging = cur.get(config,"logging","enabled") touch_file = cur.get(config,"logging","touch_file") @@ -116,9 +89,19 @@ function printLog(str) end end +function unlock_db(file) + + print("Unlocking DB "..file) + os.execute("sqlite3 -readonly \""..file.."\" \".backup /tmp/weathermon.db\"") + os.execute("mv /tmp/weathermon.db \""..file.."\"") + +end + function submitValue(type,id,param,val) - if web_url then + val = tonumber(val) + + if web_url and val then local url = web_url.."?stype="..url_encode(type).."&sid="..url_encode(id).."¶m="..url_encode(param).."&value="..url_encode(val) @@ -126,28 +109,34 @@ function submitValue(type,id,param,val) url = url:gsub("//","//"..web_user..":"..web_pass.."@",1) end - local result,code = http.request ({ - url=url, create=function() - local req_sock = socket.tcp() - req_sock:settimeout(web_timeout) - return req_sock - end}) + local result,code = http.request (url) - if code ~= 200 then - print("writing record to backlog...") - backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val)) + if code ~= 200 and backlogdb then + printLog("writing record to backlog...") + local backlog_con = assert(env:connect(backlogdb)) + local n,err = backlog_con:execute(string.format("INSERT INTO queue(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val)) + backlog_con:close() + + if err == "LuaSQL: database is locked" then + unlock_db(backlogdb); + end + end end if logdb then - log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val)) + print(logdb) + local log_con = assert(env:connect(logdb)) + local n,err = log_con:execute(string.format("INSERT INTO log(time_stamp,sensor_id,sensor,param,value) VALUES (datetime('now','localtime'),'%s','%s','%s',%f)",id,type,param,val)) + log_con:close() + + if err == "LuaSQL: database is locked" then + unlock_db(logdb); + end + end - if touch_file then - touch(touch_file) - end - end function storeRecord(id,sensor,param,value) @@ -191,14 +180,13 @@ function processJson(str) end if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then - if next(sensor)==nil then - sensor["command"]="alarm" - end local record = {} for k,v in pairs(sensor) do storeRecord(sensor_id,sensor_type,k,v) printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = \""..v.."\"") - submitValue(sensor_type,sensor_id,k,v) + if web_url then + submitValue(sensor_type,sensor_id,k,v) + end if mqtt_client then mqtt_path=string.gsub(mqtt_topic,"{(.-)}", function (name) @@ -214,9 +202,11 @@ function processJson(str) return '{'..name..'}' end end) - mqtt_client:connect(mqtt_host,mqtt_port) - mqtt_client:publish(mqtt_path,v) - mqtt_client:disconnect() + if not mqtt_client:socket() then + mqtt_client:reconnect() + end + mqtt_client:publish(mqtt_path,v,0,false) + mqtt_client:loop() end end else @@ -260,7 +250,9 @@ function processLine(str) for k,v in pairs(sensor) do storeRecord(sensor_id,sensor_type,k,v) printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v) - submitValue(sensor_type,sensor_id,k,v) + if web_url then + submitValue(sensor_type,sensor_id,k,v) + end if mqtt_client then mqtt_path=string.gsub(mqtt_topic,"{(.-)}", function (name) @@ -276,9 +268,11 @@ function processLine(str) return '{'..name..'}' end end) - mqtt_client:connect(mqtt_host,mqtt_port) - mqtt_client:publish(mqtt_path,v) - mqtt_client:disconnect() + if not mqtt_client:socket() then + mqtt_client:reconnect() + end + mqtt_client:publish(mqtt_path,v,0,false) + mqtt_client:loop() end end else @@ -318,9 +312,11 @@ function processLine(str) return '{'..name..'}' end end) - mqtt_client:connect(mqtt_host,mqtt_port) - mqtt_client:publish(mqtt_path,msg_body) - mqtt_client:disconnect() + if not mqtt_client:socket() then + mqtt_client:reconnect() + end + mqtt_client:publish(mqtt_path,msg_body,0,false) + mqtt_client:loop() end if alarm_exec then command=alarm_exec.. @@ -338,6 +334,19 @@ end getConfig(arg[1]) +signal.signal(signal.SIGTERM, function(signum) + + printLog("Terminating...") + local pids = get_children() + for k,v in pairs(pids) do + printLog("Terminating subprocess "..tostring(v).."...") + signal.kill(v,signal.SIGTERM) + end + printLog("Exiting...") + os.exit(0) + +end) + if backlogdb or logdb then local dbdriver = require "luasql.sqlite3" env = assert(dbdriver.sqlite3()) @@ -346,22 +355,25 @@ end if backlogdb then if not file_exists(backlogdb) then touch(backlogdb) - backlog_con = assert(env:connect(backlogdb)) - backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)") - else - backlog_con = assert(env:connect(backlogdb)) - end + end + local backlog_con = assert(env:connect(backlogdb)) + backlog_con:execute("CREATE TABLE queue(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)") + backlog_con:close() end if logdb then if not file_exists(logdb) then touch(logdb) - log_con = assert(env:connect(logdb)) - log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)") - log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)") - else - log_con = assert(env:connect(logdb)) - end + end + local log_con = assert(env:connect(logdb)) + log_con:execute("CREATE TABLE log(time_stamp datetime,sensor_id varchar(16),sensor varchar(16),param varchar(16),value float)") + log_con:execute("CREATE INDEX log_idx ON log(sensor_id,sensor,param,time_stamp)") + log_con:close() +end + +if web_url then + http = require("socket.http") + http.TIMEOUT = web_timeout end if mqtt_host then @@ -370,6 +382,7 @@ if mqtt_host then if mqtt_user then mqtt_client:login_set(mqtt_user, mqtt_passwd) end + mqtt_client:connect(mqtt_host,mqtt_port) end if serial_port then @@ -385,24 +398,41 @@ else return end +serialin:setvbuf('no') + records = {} while 1 do + line=serialin:read("*l") + if line == nil then break end - printLog("Received: "..line); - if startswith(line,'{') then - processJson(line) - else - processLine(line) - end - if dump_file then - local f = io.open(dump_file,"w") - io.output(f) - io.write(json.encode(records)) - io.close(f) - end +-- pcall(function () + + printLog("Received: "..line) + if startswith(line,'{') then + processJson(line) + else + processLine(line) + end + + if touch_file then + pcall(function () + touch(touch_file) + end) + end + + if dump_file then + pcall(function () + local f = io.open(dump_file,"w") + io.output(f) + io.write(json.encode(records)) + io.close(f) + end) + end +-- end) + end