From: Roman Bazalevsky Date: Fri, 23 Sep 2016 15:07:02 +0000 (+0300) Subject: Наборчик для общения между несколькими mpd и openhab через mqtt. X-Git-Url: https://git.rvb.name/openhab-process.git/commitdiff_plain/0d4dd2a297fe44d645e64a1e5d48c1f37294c571?ds=sidebyside Наборчик для общения между несколькими mpd и openhab через mqtt. --- diff --git a/filter_sensors.py b/filter_sensors.py deleted file mode 100644 index 8a5c0b9..0000000 --- a/filter_sensors.py +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/python - -import MySQLdb -import ConfigParser - -from pprint import pprint -import datetime - -import numpy as np - -import scipy.signal - -global database - -def GetTables(name): - if database: - c = database.cursor() - c.execute("SELECT * FROM Items WHERE ItemName like %s",[name]) - return c.fetchall() - else: - print "No connection to DB" - exit() - -def Today(): - dt = datetime.datetime.now() - d_truncated = datetime.date(dt.year, dt.month, dt.day) - return d_truncated - -def Tomorrow(): - dt = Today() - return dt + datetime.timedelta(days=1) - -def Yesterday(): - dt = Today() - return dt - datetime.timedelta(days=1) - -def GetData(id,fromDate=Yesterday(),toDate=Today()): - if database: - c = database.cursor() - c.execute("SELECT * FROM Item"+str(id).strip()+" WHERE Time>=%s AND Time<%s",[fromDate.strftime('%Y-%m-%d %H:%M:%S'),toDate.strftime('%Y-%m-%d %H:%M:%S')]) - return c.fetchall() - else: - print "No connection to DB" - exit() - -def FixRecord(id,date,value): - if database: - c = database.cursor() - command="UPDATE Item"+str(id).strip()+" SET Value={} WHERE Time='{}'".format(value,date.strftime('%Y-%m-%d %H:%M:%S')) - print command - c.execute(command) - else: - print "No connection to DB" - exit() - -try: - - cfg = ConfigParser.RawConfigParser(allow_no_value=True) - cfg.readfp(open('/etc/openhab-db.conf')) - dbhost = cfg.get("mysql","host") - dbuser = cfg.get("mysql","user") - dbpasswd = cfg.get("mysql","passwd") - dbdb = cfg.get("mysql","db") - - itemTemplate = cfg.get("openhab","template") - - filterWindow = int(cfg.get("filter","window")) - filterThreshold = float(cfg.get("filter","threshold")) - -except: - - print "Error reading configuration file" - exit() - -try: - - database = MySQLdb.connect(host=dbhost,user=dbuser,passwd=dbpasswd,db=dbdb,use_unicode=True) - database.set_character_set('utf8') - c = database.cursor() - c.execute('SET NAMES utf8;') - - print "Connected..." - -except: - - print "Error connecting database" - exit() - -tables = GetTables(itemTemplate) - -for id,name in tables: - - print "Processing: "+name - - data=GetData(id) - sTime=[] - sValue=[] - for rec in data: - sTime.append(rec[0]) - sValue.append(rec[1]) - sValue=np.array(sValue) - - sValueFilt=scipy.signal.medfilt(sValue,5) - - sValueDiff=abs(sValue-sValueFilt) - - avg=np.mean(sValueDiff) - - for i in range(0,len(sTime)-1): - if sValueDiff[i]>avg*filterThreshold: - print "fixing %s : %5.2f %5.2f %5.2f" % (sTime[i],sValue[i],sValueFilt[i],sValueDiff[i]) - FixRecord(id,sTime[i],sValueFilt[i]) - - database.commit() - -print "Processed " - \ No newline at end of file diff --git a/mqtt-mpd/mqmpd b/mqtt-mpd/mqmpd new file mode 100755 index 0000000..9867146 --- /dev/null +++ b/mqtt-mpd/mqmpd @@ -0,0 +1,112 @@ +#!/bin/bash + +if [ "$1" = "stop" ] +then + + for i in `pgrep -f "/bin/bash /opt/mqtt-mpd/mqmpdj"` + do + killtree $i 9 + done + + exit 0 + +fi + +. /opt/mqtt-mpd/mqmpd.cfg + +# запускаем процессы, мониторящие состояние + +i=0 +while [ "x${hosts[i]}" != "x" ] +do + + /opt/mqtt-mpd/mqmpdj ${hosts[i]} ${passwd[i]} & + + i=$(( $i + 1 )) + +done + +# сами остаемся слушать команды + +mosquitto_sub -v -h $mqhost -p $mqport -t "$mqcmd/#" -u $mquser -P $mqpassword | while read line +do + + template="s!$mqcmd/!!" + addr=`echo $line | cut -f 1 -d " " | sed $template` + arg=`echo $line | cut -f 2 -d " "` + echo $addr $arg + + host=`echo $addr | cut -f 1 -d "/"` + cmd=`echo $addr | cut -f 2 -d "/"` + + i=0 + while [ "x${hosts[i]}" != "x" ] + do + if [ "${hosts[i]}" = "$host" ] + then + mpdpass=${passwd[i]} + if [ "$mpdpass" = "-" ] + then + mpccmd="mpc -h $host" + else + mpccmd="mpc -h $host -P $mpdpass" + fi + break + fi + i=$(( $i + 1 )) + done + + arg=`printf "%q" $arg` + if [[ "$arg" =~ [A-Za-z0-9]+ ]] + then + + case $cmd in + + volume) + reply=( `$mpccmd volume`) + currvolume=`echo ${reply[1]} | sed 's/%//'` + if [ "$currvolume" ] + then + if [ "$arg" -gt "$currvolume" ] + then + delta=$(( $arg - $currvolume )) + $mpccmd volume +$delta + fi + if [ "$arg" -lt "$currvolume" ] + then + delta=$(( $currvolume - $arg )) + $mpccmd volume -$delta + fi + fi + ;; + status) + case $arg in + playing) + $mpccmd play + ;; + stopped) + $mpccmd stop + ;; + paused) + $mpccmd pause + ;; + *) + ;; + esac + ;; + repeat|random|single|consume) + $mpccmd $cmd $arg + ;; + *) + echo "Неизвестная команда $cmd" + ;; + + esac + + else + + echo "Недопустимый аргумент $arg" + + fi + +done diff --git a/mqtt-mpd/mqmpd.cfg b/mqtt-mpd/mqmpd.cfg new file mode 100644 index 0000000..65cf9dc --- /dev/null +++ b/mqtt-mpd/mqmpd.cfg @@ -0,0 +1,8 @@ +mqhost=localhost +mqport=1883 +mquser="mpd" +mqpassword="mypassword" +mqtopic="mpd/out" +mqcmd="mpd/in" +hosts=("host1" "host2" "host3") +passwd=("password1" "-" "password3") diff --git a/mqtt-mpd/mqmpd.cfg~ b/mqtt-mpd/mqmpd.cfg~ new file mode 100644 index 0000000..51dec1e --- /dev/null +++ b/mqtt-mpd/mqmpd.cfg~ @@ -0,0 +1,8 @@ +mqhost=localhost +mqport=1883 +mquser="mpd" +mqpassword="wsufytcvtldtltv" +mqtopic="mpd/out" +mqcmd="mpd/in" +hosts=("estia" "nefele" "orpheus") +passwd=("gdetotamdaleko" "11093008" "malenkayakorobka") diff --git a/mqtt-mpd/mqmpdj b/mqtt-mpd/mqmpdj new file mode 100755 index 0000000..70603f2 --- /dev/null +++ b/mqtt-mpd/mqmpdj @@ -0,0 +1,77 @@ +#!/bin/bash + +. /opt/mqtt-mpd/mqmpd.cfg + +host=$1 +passwd=$2 + +if [ "$passwd" = "-" ] +then + + command="mpc -h $host" + +else + + command="mpc -h $host -P $passwd" + +fi + +while : +do + $command idle + if [ $? = 0 ] + then +# current=`mpc -h $host -P $passwd current | sed 's/"/\"/g'` +# volume=`mpc -h $host -P $passwd volume | cut -d: -f2 | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//'` + + mapfile -t array < <( $command ) + if [ ${#array[@]} = 3 ] + then + current=${array[0]} + str2=( ${array[1]}) + str3=( ${array[2]}) + status=`echo ${str2[0]} | sed 's/^\[//' | sed 's/\]$//'` + current_track_num=`echo ${str2[1]} | cut -d '/' -f 1 | sed 's/^#//'` + tracks_in_list=`echo ${str2[1]} | cut -d '/' -f 2` + current_pos=`echo ${str2[2]} | cut -d '/' -f 1` + track_length=`echo ${str2[2]} | cut -d '/' -f 2` + volume=`echo ${str3[1]} | sed 's/%//'` + repeat=${str3[3]} + random=${str3[5]} + single=${str3[7]} + consume=${str3[9]} + else + current='-' + status='stopped' + current_track_num='0' + tracks_in_list='0' + current_pos='-' + track_length='-' + str3=( ${array[0]}) + volume=`echo ${str3[1]} | sed 's/%//'` + if [ "$volume" = "n/a" ] + then + volume="0" + fi + repeat=${str3[3]} + random=${str3[5]} + single=${str3[7]} + consume=${str3[9]} + fi + + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/nowplaying -m "$current" -u $mquser -P $mqpassword + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/volume -m "$volume" -u $mquser -P $mqpassword + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/status -m "$status" -u $mquser -P $mqpassword + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/currentnum -m "$current_track_num" -u $mquser -P $mqpassword + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/tracks -m "$tracks_in_list" -u $mquser -P $mqpassword + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/currentpos -m "$current_pos" -u $mquser -P $mqpassword + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/tracklen -m "$track_length" -u $mquser -P $mqpassword + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/repeat -m "$repeat" -u $mquser -P $mqpassword + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/random -m "$random" -u $mquser -P $mqpassword + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/single -m "$single" -u $mquser -P $mqpassword + mosquitto_pub -h $mqhost -p $mqport -t $mqtopic/$host/consume -m "$consume" -u $mquser -P $mqpassword + + else + sleep 30 + fi +done diff --git a/sensors-postprocess/filter_sensors.py b/sensors-postprocess/filter_sensors.py new file mode 100644 index 0000000..6a0a11c --- /dev/null +++ b/sensors-postprocess/filter_sensors.py @@ -0,0 +1,131 @@ +#!/usr/bin/python + +import MySQLdb +import ConfigParser +import sys + +from pprint import pprint +import datetime + +import numpy as np + +import scipy.signal + +global database + +def GetTables(name): + if database: + c = database.cursor() + c.execute("SELECT * FROM Items WHERE ItemName like %s",[name]) + return c.fetchall() + else: + print "No connection to DB" + exit() + +def Today(): + dt = datetime.datetime.now() + d_truncated = datetime.date(dt.year, dt.month, dt.day) + return d_truncated + +def Tomorrow(): + dt = Today() + return dt + datetime.timedelta(days=1) + +def Yesterday(): + dt = Today() + return dt - datetime.timedelta(days=1) + +def GetData(id,fromDate=Yesterday(),toDate=Today()): + if database: + c = database.cursor() + c.execute("SELECT * FROM Item"+str(id).strip()+" WHERE Time>=%s AND Time<%s",[fromDate.strftime('%Y-%m-%d %H:%M:%S'),toDate.strftime('%Y-%m-%d %H:%M:%S')]) + return c.fetchall() + else: + print "No connection to DB" + exit() + +def FixRecord(id,date,value): + if database: + c = database.cursor() + command="UPDATE Item"+str(id).strip()+" SET Value={} WHERE Time='{}'".format(value,date.strftime('%Y-%m-%d %H:%M:%S')) + print command + c.execute(command) + else: + print "No connection to DB" + exit() + +def ProcessTable(id): + + if not current: + data=GetData(id) + else: + data=GetData(id,Today(),Tomorrow()) + sTime=[] + sValue=[] + for rec in data: + sTime.append(rec[0]) + sValue.append(rec[1]) + sValue=np.array(sValue) + + sValueFilt=scipy.signal.medfilt(sValue,5) + + sValueDiff=abs(sValue-sValueFilt) + + avg=np.mean(sValueDiff) + + for i in range(0,len(sTime)-1): + if sValueDiff[i]>avg*filterThreshold: + print "fixing %s : %5.2f %5.2f %5.2f" % (sTime[i],sValue[i],sValueFilt[i],sValueDiff[i]) + FixRecord(id,sTime[i],sValueFilt[i]) + + database.commit() + +if len(sys.argv)==2 and sys.argv[1]=='current': + current=True +else: + current=False + + +try: + + cfg = ConfigParser.RawConfigParser(allow_no_value=True) + cfg.readfp(open('/etc/openhab-db.conf')) + dbhost = cfg.get("mysql","host") + dbuser = cfg.get("mysql","user") + dbpasswd = cfg.get("mysql","passwd") + dbdb = cfg.get("mysql","db") + + itemTemplate = cfg.get("openhab","template") + + filterWindow = int(cfg.get("filter","window")) + filterThreshold = float(cfg.get("filter","threshold")) + +except: + + print "Error reading configuration file" + exit() + +try: + + database = MySQLdb.connect(host=dbhost,user=dbuser,passwd=dbpasswd,db=dbdb,use_unicode=True) + database.set_character_set('utf8') + c = database.cursor() + c.execute('SET NAMES utf8;') + + print "Connected..." + +except: + + print "Error connecting database" + exit() + +tables = GetTables(itemTemplate) + +for id,name in tables: + + print "Processing: "+name + + ProcessTable(id) + +print "Processed " + \ No newline at end of file