Переделана обработка сообщений MQTT в связи с переходом на прошивку Sonoff-Tasmota...
authorRoman Bazalevsky <rvb@rvb.name>
Sat, 25 Mar 2017 20:56:25 +0000 (23:56 +0300)
committerRoman Bazalevsky <rvb@rvb.name>
Sat, 25 Mar 2017 20:56:25 +0000 (23:56 +0300)
сенсоров и поддержкой более чем одного сенсора на устройстве.

filter_meteo.py
filter_sensors.py [deleted file]
weathermon-mqtt
weathermon.lua~ [deleted file]
weathermon_cron

index d4f0e7db8666ea770341255a90137f3ca20e8f15..fc497db694e18e8954dba99651f9f8b46e37cd37 100755 (executable)
@@ -35,6 +35,10 @@ def Yesterday():
   dt = Today()
   return dt - datetime.timedelta(days=1)
 
+def Prehistoric():
+  dt = datetime.date(2000,01,01)
+  return dt
+
 def GetData(sid,pid,fromDate=Yesterday(),toDate=Today()):
   if database:
     c = database.cursor()
@@ -55,7 +59,9 @@ def FixRecord(id,value):
 
 def ProcessTable(sid,pid):
 
-  if not current:
+  if process_all:
+    data=GetData(sid,pid,Prehistoric(),Today())
+  elif not current:
     data=GetData(sid,pid)
   else:
     data=GetData(sid,pid,Today(),Tomorrow())  
@@ -90,6 +96,10 @@ if len(sys.argv)==2 and sys.argv[1]=='current':
 else:
   current=False
 
+if len(sys.argv)==2 and sys.argv[1]=='all':
+  process_all=True
+else:
+  process_all=False
 
 try:
 
diff --git a/filter_sensors.py b/filter_sensors.py
deleted file mode 100644 (file)
index 6a0a11c..0000000
+++ /dev/null
@@ -1,131 +0,0 @@
-#!/usr/bin/python
-
-import MySQLdb
-import ConfigParser
-import sys
-
-from pprint import pprint
-import datetime
-
-import numpy as np
-
-import scipy.signal
-
-global database
-
-def GetTables(name):
-  if database:
-    c = database.cursor()
-    c.execute("SELECT * FROM Items WHERE ItemName like %s",[name])
-    return c.fetchall()
-  else:
-    print "No connection to DB"
-    exit()
-
-def Today():
-  dt = datetime.datetime.now()
-  d_truncated = datetime.date(dt.year, dt.month, dt.day)
-  return d_truncated
-  
-def Tomorrow():
-  dt = Today()
-  return dt + datetime.timedelta(days=1)
-
-def Yesterday():
-  dt = Today()
-  return dt - datetime.timedelta(days=1)
-
-def GetData(id,fromDate=Yesterday(),toDate=Today()):
-  if database:
-    c = database.cursor()
-    c.execute("SELECT * FROM Item"+str(id).strip()+" WHERE Time>=%s AND Time<%s",[fromDate.strftime('%Y-%m-%d %H:%M:%S'),toDate.strftime('%Y-%m-%d %H:%M:%S')])
-    return c.fetchall()
-  else:
-    print "No connection to DB"
-    exit()
-
-def FixRecord(id,date,value):
-  if database:
-    c = database.cursor()
-    command="UPDATE Item"+str(id).strip()+" SET Value={} WHERE Time='{}'".format(value,date.strftime('%Y-%m-%d %H:%M:%S'))
-    print command
-    c.execute(command)
-  else:
-    print "No connection to DB"
-    exit()
-
-def ProcessTable(id):
-
-  if not current:
-    data=GetData(id)
-  else:
-    data=GetData(id,Today(),Tomorrow())  
-  sTime=[]
-  sValue=[]
-  for rec in data:
-    sTime.append(rec[0])
-    sValue.append(rec[1])
-  sValue=np.array(sValue)
-
-  sValueFilt=scipy.signal.medfilt(sValue,5)
-
-  sValueDiff=abs(sValue-sValueFilt)
-  
-  avg=np.mean(sValueDiff)
-
-  for i in range(0,len(sTime)-1):
-    if sValueDiff[i]>avg*filterThreshold:
-      print "fixing %s : %5.2f %5.2f %5.2f" % (sTime[i],sValue[i],sValueFilt[i],sValueDiff[i])
-      FixRecord(id,sTime[i],sValueFilt[i])      
-
-  database.commit()
-
-if len(sys.argv)==2 and sys.argv[1]=='current':
-  current=True
-else:
-  current=False
-
-
-try:
-
-  cfg = ConfigParser.RawConfigParser(allow_no_value=True)
-  cfg.readfp(open('/etc/openhab-db.conf'))
-  dbhost = cfg.get("mysql","host")
-  dbuser = cfg.get("mysql","user")
-  dbpasswd = cfg.get("mysql","passwd")
-  dbdb = cfg.get("mysql","db")
-
-  itemTemplate = cfg.get("openhab","template")
-  
-  filterWindow = int(cfg.get("filter","window"))
-  filterThreshold = float(cfg.get("filter","threshold"))
-   
-except:
-
-  print "Error reading configuration file"
-  exit()
-
-try:
-
-  database = MySQLdb.connect(host=dbhost,user=dbuser,passwd=dbpasswd,db=dbdb,use_unicode=True)
-  database.set_character_set('utf8')
-  c = database.cursor()
-  c.execute('SET NAMES utf8;')
-
-  print "Connected..."
-
-except:
-
-  print "Error connecting database"
-  exit()
-
-tables = GetTables(itemTemplate)
-
-for id,name in tables:
-
-  print "Processing: "+name
-
-  ProcessTable(id)
-
-print "Processed "
-  
\ No newline at end of file
index 6c5fd378d949bcefddb8194820febff88f6e44fe..1d96410178be7c4a5817876d79ffcd0d9088e46e 100755 (executable)
@@ -5,17 +5,26 @@ import sys
 from ConfigParser import ConfigParser
 import MySQLdb
 from pprint import pprint
+import json
+from dateutil.parser import parser
+tparser=parser()
 
 def on_message(mosq, obj, msg):
   topic=msg.topic
-  payload=msg.payload
-  try:
-    c = database.cursor()
-    c.execute('CALL meteo.submit_mqtt(%s,%s,NULL)', (topic,payload))
-    database.commit()
-    print topic,payload
-  except:
-    print "Failed to submit data"  
+  payload=json.loads(msg.payload)
+  timestamp=tparser.parse(payload['Time'])
+  for sensor_type in payload:
+    if sensor_type != 'Time':
+      sensor_data=payload[sensor_type]
+      for param in sensor_data:
+        try:
+          value=sensor_data[param]
+          c = database.cursor()
+          c.execute('CALL meteo.submit_mqtt(%s,%s,%s,%s,NULL)', (topic,sensor_type,param,value))
+          database.commit()
+          print topic,sensor_type,param,value
+        except:
+          print "Failed to submit data"  
 
 def Topics():
 
diff --git a/weathermon.lua~ b/weathermon.lua~
deleted file mode 100755 (executable)
index f2b584b..0000000
+++ /dev/null
@@ -1,273 +0,0 @@
-#!/usr/bin/lua
-
-function getConfig()
-
-  local uci=require("uci")
-  local cur=uci.cursor()
-  local config="weathermon"
-
-  web_url  = cur.get(config,"web","url")
-  web_user = cur.get(config,"web","user")
-  web_pass = cur.get(config,"web","password")
-  web_devid = cur.get(config,"web","devid")
-
-  web_iface = cur.get(config,"web","iface")
-
-  if web_iface then
-  
-    command = '/sbin/ifconfig '..web_iface..' | grep \'\\<inet\\>\' | sed -n \'1p\' | tr -s \' \' | cut -d \' \' -f3 | cut -d \':\' -f2'
-    f=io.popen(command,'r')
-    ip_addr=f:read()
-  
-  end
-
-  if not web_devid then
-  
-    if web_iface then
-      io.input("/sys/class/net/"..web_iface.."/address")
-    else
-      io.input("/sys/class/net/eth0/address")
-    end
-
-    mac = io.read("*line")
-    mac = mac:gsub(":","")
-    mac = mac:upper()
-
-    web_devid = mac
-
-  end
-
-  logging = cur.get(config,"logging","enabled") 
-
-  serial_port = cur.get(config,"serial","port")
-  serial_baud = cur.get(config,"serial","baud")
-
-  input_file = cur.get(config,"input","file")
-  input_exec = cur.get(config,"input","exec")
-  alarm_exec = cur.get(config,"alarm","exec")
-
-  if serial_port then
-
-    command = "stty -F  "..serial_port.." "..serial_baud
-    os.execute(command)
-
-  end
-
-  mqtt_host = cur.get(config,"mqtt","host")
-  mqtt_port = cur.get(config,"mqtt","port")
-  mqtt_id = cur.get(config,"mqtt","id")
-  mqtt_topic = cur.get(config,"mqtt","topic")
-  mqtt_alarm_topic = cur.get(config,"mqtt","alarm_topic")
-
-  mqtt_user = cur.get(config,"mqtt","user")
-  mqtt_passwd = cur.get(config,"mqtt","password")
-
-  if mqtt_host and not mqtt_id then
-    mqtt_id="weather-"..web_devid
-  end
-
-  if mqtt_host and not mqtt_port then
-    mqtt_port = 1883
-  end
-
-  if mqtt_host and not mqtt_topic then
-    mqtt_topic = 'weathermon/{dev}/{type}/{id}/{param}'
-  end
-
-  if mqtt_host and not mqtt_alarm_topic then
-    mqtt_alarm_topic = 'alarm/{dev}/{type}/{id}'
-  end
-
-end
-
-require "socket"
-
-function sleep(sec)
-  socket.select(nil, nil, sec)
-end
-
-function splitStr(str,char)
-
-  local res = {}
-  local idx = 1
-
-
-  while str:len()>0 do
-    pos = str:find(char); 
-    if pos == nil then
-      res[idx]=str
-      str=""
-    else
-      res[idx]=str:sub(1,pos-1)
-      idx=idx+1
-      str=str:sub(pos+1)
-    end
-  end
-
-  return res
-
-end
-
-function printLog(str)
-  print(str)
-  if logging=="on" then
-    os.execute("logger -t weathermon "..str)
-  end 
-end
-
-function submitValue(type,id,param,val)
-
-  url = web_url.."?stype="..type.."&sid="..id.."&param="..param.."&value="..val
-
-  command = "curl"
-
-  if web_iface then
-    command = command.." --interface "..ip_addr
-  end
-
-  if web_user then
-    command = command.." -u "..web_user..":"..web_pass
-  end
-
-  command = command.." \""..url.."\""
-
-  os.execute(command)
-  print()
-
-end
-
-function processLine(str)
-
-  msg=splitStr(line,':')
-  msg_type=msg[1] or ''
-  msg_body=msg[2] or ''
-  if msg_type=="STATUS" then
-    printLog("Status: "..msg_body)
-  elseif msg_type=="ERROR" then
-    printLog("Error: "..msg_body)  
-  elseif msg_type=="SENSOR" then
-    printLog("SENSOR: "..msg_body)  
-    sens = splitStr(msg_body,",")
-    sensor = {}
-    idx = 1
-    sensor_type = nil
-    sensor_id = web_devid
-    for i,rec in ipairs(sens) do
-      recrd=splitStr(rec,'=')
-      key=recrd[1] or ''
-      value=recrd[2] or ''
-      if value then
-        if key=="TYPE" then
-          sensor_type=value
-        elseif key=="ID" then
-          sensor_id=value
-        else
-          sensor[key]=value
-        end
-      end
-    end
-    if not (sensor_type==nil or sensor_id==nil or sensor_type=='' or sensor_id=='') then
-      for k,v in pairs(sensor) do
-        printLog("Type = "..sensor_type..", ID = "..sensor_id..", Param = "..k..", Value = "..v)
-        submitValue(sensor_type,sensor_id,k,v)
-        if mqtt_client then
-          mqtt_path=string.gsub(mqtt_topic,"{(.-)}", 
-            function (name) 
-              if name=="dev" then
-                return web_devid
-              elseif name=="type" then
-                return sensor_type
-              elseif name=="id" then
-                return sensor_id
-              elseif name=="param" then
-                return k
-              else
-                return '{'..name..'}'
-              end      
-            end)
-          mqtt_client:publish(mqtt_path,v)
-        end  
-      end
-    else
-      printLog("Cannot parse sensor input: "..msg_body)
-    end
-  elseif msg_type=="ALARM" then
-    printLog("ALARM: "..msg_body)  
-    sens = splitStr(msg_body,",")
-    sensor = {}
-    idx = 1
-    sensor_type = nil
-    sensor_id = web_devid
-    mqtt_param = {}
-    for i,rec in ipairs(sens) do
-      recrd=splitStr(rec,'=')
-      key=recrd[1] or ''
-      value=recrd[2] or ''
-      if value then
-        if key=="TYPE" then
-          alarm_type=value
-        elseif key=="ID" then
-          alarm_id=value
-        end
-      end
-    end
-    if not (alarm_type==nil or alarm_id==nil or alarm_type=='' or alarm_id=='') then
-      if mqtt_client then
-        mqtt_path=string.gsub(mqtt_alarm_topic,"{(.-)}", 
-          function (name) 
-            if name=="dev" then
-              return web_devid
-            elseif name=="type" then
-              return sensor_type
-            elseif name=="id" then
-              return sensor_id
-            else
-              return '{'..name..'}'
-            end      
-          end)
-        mqtt_client:publish(mqtt_path,msg_body)
-      end
-      if alarm_exec then
-        command=alarm_exec..
-          " \""..string.gsub(alarm_type,"\"","\\\"")..
-          "\" \""..string.gsub(alarm_id,"\"","\\\"")..
-          "\" \""..string.gsub(msg_body,"\"","\\\"").."\""
-        os.execute(command)
-      end
-    else
-      printLog("Cannot parse alarm input: "..msg_body)
-    end
-  end
-
-end
-
-getConfig()
-
-if mqtt_host then
-  MQTT = require "paho.mqtt"
-  mqtt_client = MQTT.client.create(mqtt_host, mqtt_port)
-  if mqtt_user then
-    mqtt_client:auth(mqtt_user, mqtt_passwd)
-  end
-  mqtt_client:connect(mqtt_id)
-  json = require( "json" )
-end
-
-if serial_port then
-  serialin=io.open(serial_port,"r")
-elseif input_file == "-" then
-  serialin=io.stdin;
-elseif input_file then
-  serialin=io.open(input_file,"r")
-elseif input_exec then
-  serialin=io.popen(input_exec,"r")
-else
-  printLog("No input selected")
-  return
-end  
-while 1 do
-  line=serialin:read()
-  print(line)
-  processLine(line)
-end
-
index 30e21d85d9700c2bda63367544ce7f8d07f49f95..bc4eb88504a0d981f90d203c6d299c01c14d5a71 100755 (executable)
@@ -12,6 +12,8 @@ import ConfigParser
 
 import uuid
 
+debug = True
+
 def print_log(str):
   global logging,debug
   if debug>0: