From: Roman Bazalevsky Date: Sat, 25 Mar 2017 20:56:25 +0000 (+0300) Subject: Переделана обработка сообщений MQTT в связи с переходом на прошивку Sonoff-Tasmota... X-Git-Url: https://git.rvb.name/weathermon.git/commitdiff_plain/7aaa06f870e4fb4954ec65091cc12f9f0bf58b04?ds=inline Переделана обработка сообщений MQTT в связи с переходом на прошивку Sonoff-Tasmota с JSON-форматом данных от сенсоров и поддержкой более чем одного сенсора на устройстве. --- diff --git a/filter_meteo.py b/filter_meteo.py index d4f0e7d..fc497db 100755 --- a/filter_meteo.py +++ b/filter_meteo.py @@ -35,6 +35,10 @@ def Yesterday(): dt = Today() return dt - datetime.timedelta(days=1) +def Prehistoric(): + dt = datetime.date(2000,01,01) + return dt + def GetData(sid,pid,fromDate=Yesterday(),toDate=Today()): if database: c = database.cursor() @@ -55,7 +59,9 @@ def FixRecord(id,value): def ProcessTable(sid,pid): - if not current: + if process_all: + data=GetData(sid,pid,Prehistoric(),Today()) + elif not current: data=GetData(sid,pid) else: data=GetData(sid,pid,Today(),Tomorrow()) @@ -90,6 +96,10 @@ if len(sys.argv)==2 and sys.argv[1]=='current': else: current=False +if len(sys.argv)==2 and sys.argv[1]=='all': + process_all=True +else: + process_all=False try: diff --git a/filter_sensors.py b/filter_sensors.py deleted file mode 100644 index 6a0a11c..0000000 --- a/filter_sensors.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/python - -import MySQLdb -import ConfigParser -import sys - -from pprint import pprint -import datetime - -import numpy as np - -import scipy.signal - -global database - -def GetTables(name): - if database: - c = database.cursor() - c.execute("SELECT * FROM Items WHERE ItemName like %s",[name]) - return c.fetchall() - else: - print "No connection to DB" - exit() - -def Today(): - dt = datetime.datetime.now() - d_truncated = datetime.date(dt.year, dt.month, dt.day) - return d_truncated - -def Tomorrow(): - dt = Today() - return dt + datetime.timedelta(days=1) - -def Yesterday(): - dt = Today() - return dt - datetime.timedelta(days=1) - -def GetData(id,fromDate=Yesterday(),toDate=Today()): - if database: - c = database.cursor() - c.execute("SELECT * FROM Item"+str(id).strip()+" WHERE Time>=%s AND Time<%s",[fromDate.strftime('%Y-%m-%d %H:%M:%S'),toDate.strftime('%Y-%m-%d %H:%M:%S')]) - return c.fetchall() - else: - print "No connection to DB" - exit() - -def FixRecord(id,date,value): - if database: - c = database.cursor() - command="UPDATE Item"+str(id).strip()+" SET Value={} WHERE Time='{}'".format(value,date.strftime('%Y-%m-%d %H:%M:%S')) - print command - c.execute(command) - else: - print "No connection to DB" - exit() - -def ProcessTable(id): - - if not current: - data=GetData(id) - else: - data=GetData(id,Today(),Tomorrow()) - sTime=[] - sValue=[] - for rec in data: - sTime.append(rec[0]) - sValue.append(rec[1]) - sValue=np.array(sValue) - - sValueFilt=scipy.signal.medfilt(sValue,5) - - sValueDiff=abs(sValue-sValueFilt) - - avg=np.mean(sValueDiff) - - for i in range(0,len(sTime)-1): - if sValueDiff[i]>avg*filterThreshold: - print "fixing %s : %5.2f %5.2f %5.2f" % (sTime[i],sValue[i],sValueFilt[i],sValueDiff[i]) - FixRecord(id,sTime[i],sValueFilt[i]) - - database.commit() - -if len(sys.argv)==2 and sys.argv[1]=='current': - current=True -else: - current=False - - -try: - - cfg = ConfigParser.RawConfigParser(allow_no_value=True) - cfg.readfp(open('/etc/openhab-db.conf')) - dbhost = cfg.get("mysql","host") - dbuser = cfg.get("mysql","user") - dbpasswd = cfg.get("mysql","passwd") - dbdb = cfg.get("mysql","db") - - itemTemplate = cfg.get("openhab","template") - - filterWindow = int(cfg.get("filter","window")) - filterThreshold = float(cfg.get("filter","threshold")) - -except: - - print "Error reading configuration file" - exit() - -try: - - database = MySQLdb.connect(host=dbhost,user=dbuser,passwd=dbpasswd,db=dbdb,use_unicode=True) - database.set_character_set('utf8') - c = database.cursor() - c.execute('SET NAMES utf8;') - - print "Connected..." - -except: - - print "Error connecting database" - exit() - -tables = GetTables(itemTemplate) - -for id,name in tables: - - print "Processing: "+name - - ProcessTable(id) - -print "Processed " - \ No newline at end of file diff --git a/weathermon-mqtt b/weathermon-mqtt index 6c5fd37..1d96410 100755 --- a/weathermon-mqtt +++ b/weathermon-mqtt @@ -5,17 +5,26 @@ import sys from ConfigParser import ConfigParser import MySQLdb from pprint import pprint +import json +from dateutil.parser import parser +tparser=parser() def on_message(mosq, obj, msg): topic=msg.topic - payload=msg.payload - try: - c = database.cursor() - c.execute('CALL meteo.submit_mqtt(%s,%s,NULL)', (topic,payload)) - database.commit() - print topic,payload - except: - print "Failed to submit data" + payload=json.loads(msg.payload) + timestamp=tparser.parse(payload['Time']) + for sensor_type in payload: + if sensor_type != 'Time': + sensor_data=payload[sensor_type] + for param in sensor_data: + try: + value=sensor_data[param] + c = database.cursor() + c.execute('CALL meteo.submit_mqtt(%s,%s,%s,%s,NULL)', (topic,sensor_type,param,value)) + database.commit() + print topic,sensor_type,param,value + except: + print "Failed to submit data" def Topics(): diff --git a/weathermon.lua~ b/weathermon.lua~ deleted file mode 100755 index f2b584b..0000000 --- a/weathermon.lua~ +++ /dev/null @@ -1,273 +0,0 @@ -#!/usr/bin/lua - -function getConfig() - - local uci=require("uci") - local cur=uci.cursor() - local config="weathermon" - - web_url = cur.get(config,"web","url") - web_user = cur.get(config,"web","user") - web_pass = cur.get(config,"web","password") - web_devid = cur.get(config,"web","devid") - - web_iface = cur.get(config,"web","iface") - - if web_iface then - - command = '/sbin/ifconfig '..web_iface..' | grep \'\\\' | sed -n \'1p\' | tr -s \' \' | cut -d \' \' -f3 | cut -d \':\' -f2' - f=io.popen(command,'r') - ip_addr=f:read() - - end - - if not web_devid then - - if web_iface then - io.input("/sys/class/net/"..web_iface.."/address") - else - io.input("/sys/class/net/eth0/address") - end - - mac = io.read("*line") - mac = mac:gsub(":","") - mac = mac:upper() - - web_devid = mac - - end - - logging = cur.get(config,"logging","enabled") - - serial_port = cur.get(config,"serial","port") - serial_baud = cur.get(config,"serial","baud") - - input_file = cur.get(config,"input","file") - input_exec = cur.get(config,"input","exec") - alarm_exec = cur.get(config,"alarm","exec") - - if serial_port then - - command = "stty -F "..serial_port.." "..serial_baud - os.execute(command) - - end - - mqtt_host = cur.get(config,"mqtt","host") - mqtt_port = cur.get(config,"mqtt","port") - mqtt_id = cur.get(config,"mqtt","id") - mqtt_topic = cur.get(config,"mqtt","topic") - mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic") - - mqtt_user = cur.get(config,"mqtt","user") - mqtt_passwd = cur.get(config,"mqtt","password") - - if mqtt_host and not mqtt_id then - mqtt_id="weather-"..web_devid - end - - if mqtt_host and not mqtt_port then - mqtt_port = 1883 - end - - if mqtt_host and not mqtt_topic then - mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}' - end - - if mqtt_host and not mqtt_alarm_topic then - mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}' - end - -end - -require "socket" - -function sleep(sec) - socket.select(nil, nil, sec) -end - -function splitStr(str,char) - - local res = {} - local idx = 1 - - - while str:len()>0 do - pos = str:find(char); - if pos == nil then - res[idx]=str - str="" - else - res[idx]=str:sub(1,pos-1) - idx=idx+1 - str=str:sub(pos+1) - end - end - - return res - -end - -function printLog(str) - print(str) - if logging=="on" then - os.execute("logger -t weathermon "..str) - end -end - -function submitValue(type,id,param,val) - - url = web_url.."?stype="..type.."&sid="..id.."¶m="..param.."&value="..val - - command = "curl" - - if web_iface then - command = command.." --interface "..ip_addr - end - - if web_user then - command = command.." -u "..web_user..":"..web_pass - end - - command = command.." \""..url.."\"" - - os.execute(command) - print() - -end - -function processLine(str) - - msg=splitStr(line,':') - msg_type=msg[1] or '' - msg_body=msg[2] or '' - if msg_type=="STATUS" then - printLog("Status: "..msg_body) - elseif msg_type=="ERROR" then - printLog("Error: "..msg_body) - elseif msg_type=="SENSOR" then - printLog("SENSOR: "..msg_body) - sens = splitStr(msg_body,",") - sensor = {} - idx = 1 - sensor_type = nil - sensor_id = web_devid - for i,rec in ipairs(sens) do - recrd=splitStr(rec,'=') - key=recrd[1] or '' - value=recrd[2] or '' - if value then - if key=="TYPE" then - sensor_type=value - elseif key=="ID" then - sensor_id=value - else - sensor[key]=value - end - end - end - if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then - for k,v in pairs(sensor) do - printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v) - submitValue(sensor_type,sensor_id,k,v) - if mqtt_client then - mqtt_path=string.gsub(mqtt_topic,"{(.-)}", - function (name) - if name=="dev" then - return web_devid - elseif name=="type" then - return sensor_type - elseif name=="id" then - return sensor_id - elseif name=="param" then - return k - else - return '{'..name..'}' - end - end) - mqtt_client:publish(mqtt_path,v) - end - end - else - printLog("Cannot parse sensor input: "..msg_body) - end - elseif msg_type=="ALARM" then - printLog("ALARM: "..msg_body) - sens = splitStr(msg_body,",") - sensor = {} - idx = 1 - sensor_type = nil - sensor_id = web_devid - mqtt_param = {} - for i,rec in ipairs(sens) do - recrd=splitStr(rec,'=') - key=recrd[1] or '' - value=recrd[2] or '' - if value then - if key=="TYPE" then - alarm_type=value - elseif key=="ID" then - alarm_id=value - end - end - end - if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then - if mqtt_client then - mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", - function (name) - if name=="dev" then - return web_devid - elseif name=="type" then - return sensor_type - elseif name=="id" then - return sensor_id - else - return '{'..name..'}' - end - end) - mqtt_client:publish(mqtt_path,msg_body) - end - if alarm_exec then - command=alarm_exec.. - " \""..string.gsub(alarm_type,"\"","\\\"").. - "\" \""..string.gsub(alarm_id,"\"","\\\"").. - "\" \""..string.gsub(msg_body,"\"","\\\"").."\"" - os.execute(command) - end - else - printLog("Cannot parse alarm input: "..msg_body) - end - end - -end - -getConfig() - -if mqtt_host then - MQTT = require "paho.mqtt" - mqtt_client = MQTT.client.create(mqtt_host, mqtt_port) - if mqtt_user then - mqtt_client:auth(mqtt_user, mqtt_passwd) - end - mqtt_client:connect(mqtt_id) - json = require( "json" ) -end - -if serial_port then - serialin=io.open(serial_port,"r") -elseif input_file == "-" then - serialin=io.stdin; -elseif input_file then - serialin=io.open(input_file,"r") -elseif input_exec then - serialin=io.popen(input_exec,"r") -else - printLog("No input selected") - return -end -while 1 do - line=serialin:read() - print(line) - processLine(line) -end - diff --git a/weathermon_cron b/weathermon_cron index 30e21d8..bc4eb88 100755 --- a/weathermon_cron +++ b/weathermon_cron @@ -12,6 +12,8 @@ import ConfigParser import uuid +debug = True + def print_log(str): global logging,debug if debug>0: