From: Roman Bazalevsky Date: Fri, 23 Sep 2016 19:20:58 +0000 (+0300) Subject: 1) MQTT-интерфейс к Asterisk для отслеживания состояний звонков X-Git-Url: https://git.rvb.name/openhab-process.git/commitdiff_plain/dec6ca23b56d7ce8f5e535c421c540029c104297 1) MQTT-интерфейс к Asterisk для отслеживания состояний звонков 2) MQTT-интерфейс к PulseAudio (работает в две стороны) --- diff --git a/mqtt-agi/mqtt b/mqtt-agi/mqtt new file mode 100755 index 0000000..6f69c7e --- /dev/null +++ b/mqtt-agi/mqtt @@ -0,0 +1,66 @@ +#!/usr/bin/env python + +# +# Usage: +# AGI(/etc/asterisk/agi/mqtt,/etc/asterisk/agi/mqtt.cfg,calls/missed) +# or, to override the extension: +# AGI(/etc/asterisk/agi/mqtt,/etc/asterisk/agi/mqtt.cfg,calls/missed,+43123456789) +# + +import sys +from ConfigParser import ConfigParser +import paho.mqtt.client as paho + +import json + +from pprint import pprint + +conffile, topic = sys.argv[1:3] + +config = ConfigParser() +config.add_section('mqtt') +# set defaults for anonymous auth +config.set('mqtt', 'username', '') +config.set('mqtt', 'password', '') +config.set('mqtt', 'port', '1883') +config.read(conffile) + +mqtt_server = config.get('mqtt', 'server') +mqtt_port = config.getint('mqtt', 'port') +mqtt_username = config.get('mqtt', 'username') +mqtt_password = config.get('mqtt', 'password') + +agi = [] +while 1: + line = sys.stdin.readline() + if not line or line == "\n": + break + agi.append(line) + +agi = dict([line.rstrip('\n').replace('agi_', '', 1).split(': ', 1) for line in agi]) + +if len(sys.argv) > 3: + agi['extension'] = sys.argv[3] + +def agi_exit(rc, *args): + if rc != 0: + print "VERBOSE rc=%s %s" % (rc, args) + sys.exit(rc) + +def on_connect(mosq, rc, *args): + if rc != 0: + agi_exit(1, "Connection failed: %d" % rc) + +def on_publish(mosq, *args): + # done + agi_exit(0) + +client = paho.Client('agi') +client.username_pw_set(mqtt_username, mqtt_password) +client.connect(mqtt_server, port=mqtt_port) +client.on_connect = on_connect +client.on_publish = on_publish + +client.publish(topic, payload=json.dumps(agi)) +client.loop() +agi_exit(1, "Message publish timed out") diff --git a/mqtt-agi/mqtt.cfg b/mqtt-agi/mqtt.cfg new file mode 100644 index 0000000..c8e33a0 --- /dev/null +++ b/mqtt-agi/mqtt.cfg @@ -0,0 +1,4 @@ +[mqtt] +server = localhost +username = asterisk +password = mypassword diff --git a/mqtt-mpd/mqmpd.cfg~ b/mqtt-mpd/mqmpd.cfg~ deleted file mode 100644 index 51dec1e..0000000 --- a/mqtt-mpd/mqmpd.cfg~ +++ /dev/null @@ -1,8 +0,0 @@ -mqhost=localhost -mqport=1883 -mquser="mpd" -mqpassword="wsufytcvtldtltv" -mqtopic="mpd/out" -mqcmd="mpd/in" -hosts=("estia" "nefele" "orpheus") -passwd=("gdetotamdaleko" "11093008" "malenkayakorobka") diff --git a/mqtt-pulse/DEADJOE b/mqtt-pulse/DEADJOE new file mode 100644 index 0000000..7069859 --- /dev/null +++ b/mqtt-pulse/DEADJOE @@ -0,0 +1,147 @@ + +*** These modified files were found in JOE when it aborted on Fri Sep 23 20:31:28 2016 +*** JOE was aborted because the terminal closed + +*** Файл '(БезИмени)' +mqtt_pulse.py + +*** Файл 'mqtt_pulse.py' +#!/usr/bin/python + +from pulsectl import Pulse,PulseLoopStop + +import threading + +sink_name=None +muted=False +volume=None + +paLock1=threading.RLock() +paLock2=threading.RLock() +stopFlag=False + +def Init(): + global pulse + pulse = Pulse("mqtt-pa") + +def Reconnect(): + pulse.close() + Init() + +def GetDefaultOut(): + return pulse.server_info().default_sink_name + +def GetDefaultSink(): + sinkname=GetDefaultOut() + for sink in pulse.sink_list(): + if sink.name==sinkname: + return sink + +def GetDefaultVolume(): + return pulse.volume_get_all_chans(GetDefaultSink()) + +def IsDefaultMuted(): + return GetDefaultSink().mute<>0 + +def MuteDefault(mute = True): + return pulse.mute(GetDefaultSink(),mute) + +def EventListener(callback): + pulse.event_mask_set('all') + pulse.event_callback_set(callback) + pulse.event_listen() + +def EventProcess(ev): + raise PulseLoopStop + +def AcquirePALock(): + paLock1.acquire() + + pulse.event_listen_stop() + paLock2.acquire() + +def ReleasePALock(): + paLock2.release() + paLock1.release() + +def StateProcess(): + global sink_name,muted,volume + try: + tname=threading.current_thread().name +# print tname+">trying to aquire lock" + AcquirePALock() +# print tname+">lock aquired" + current_sink=GetDefaultOut() + current_vol=round(GetDefaultVolume(),2) + current_muted=IsDefaultMuted() + if current_sink<>sink_name: + sink_name=current_sink + print tname+">sink: "+sink_name + if current_vol<>volume: + volume=current_vol + print tname+">volume: "+str(volume) + if current_muted<>muted: + muted=current_muted + print tname+">muted: "+str(muted) + finally: + ReleasePALock() +# print tname+">lock released" + +def PAListener(): + while not stopFlag: +# print "entering wait loop" + EventListener(EventProcess) +# print "event or break happened" + StateProcess() + +def RunBackground(process): + stopFlag=False + thread = threading.Thread(target=process,name="Background") + thread.start() + +def StopBackground(): + global stopFlag + stopFlag=True + pulse.event_listen_stop() + +def CommandGetDefaultOut(): + try: + tname=threading.current_thread().name +# print tname+">trying to aquire lock" + AcquirePALock() +# print tname+">lock aquired" + result=GetDefaultOut() + finally: + ReleasePALock() +# print tname+">lock released" + return result + +def CommandGetDefaultVolume(): + try: + tname=threading.current_thread().name +# print tname+">trying to aquire lock" + AcquirePALock() +# print tname+">lock aquired" + result=GetDefaultVolume() + finally: + ReleasePALock() +# print tname+">lock released" + return result + +def CommandIsDefaultMuted(): + try: + tname=threading.current_thread().name +# print tname+">trying to aquire lock" + AcquirePALock() +# print tname+">lock aquired" + result=IsDefaultMuted() + finally: + ReleasePALock() +# print tname+">lock released" + return result + +*** Файл '* Startup Log *' +Processing '/etc/joe/joerc'... +Processing '/etc/joe/ftyperc'... +Finished processing /etc/joe/ftyperc +Finished processing /etc/joe/joerc diff --git a/mqtt-pulse/mqtt.cfg b/mqtt-pulse/mqtt.cfg new file mode 100644 index 0000000..6cbf04c --- /dev/null +++ b/mqtt-pulse/mqtt.cfg @@ -0,0 +1,7 @@ +[mqtt] +server = localhost +username = pulse +password = mypassword +in = pulse/host/in +out = pulse/host/out + diff --git a/mqtt-pulse/mqtt_pulse.py b/mqtt-pulse/mqtt_pulse.py new file mode 100755 index 0000000..9faa0d7 --- /dev/null +++ b/mqtt-pulse/mqtt_pulse.py @@ -0,0 +1,281 @@ +#!/usr/bin/python + +from pulsectl import Pulse,PulseLoopStop + +import threading +from time import sleep + +import sys +from ConfigParser import ConfigParser +import paho.mqtt.client as paho + +# ====================== PulseAudio-related part ======================== + +sink_name=None +muted=None +volume=None + +callback_changed=None + +paLock1=threading.RLock() +paLock2=threading.RLock() +stopFlag=False + +debug=False + +def Init(): + global pulse + pulse = Pulse("mqtt-pa") + +def Reconnect(): + pulse.close() + Init() + +def GetDefaultOut(): + return pulse.server_info().default_sink_name + +def GetDefaultSink(): + sinkname=GetDefaultOut() + for sink in pulse.sink_list(): + if sink.name==sinkname: + return sink + +def GetDefaultVolume(): + return pulse.volume_get_all_chans(GetDefaultSink()) + +def SetDefaultVolume(volume): + pulse.volume_set_all_chans(GetDefaultSink(), volume) + +def IsDefaultMuted(): + return GetDefaultSink().mute<>0 + +def MuteDefault(mute = True): + return pulse.mute(GetDefaultSink(),mute) + +def AcquirePALock(): + tname=threading.current_thread().name + if debug: + print tname," aquiring action..." + paLock1.acquire() + if debug: + print tname," aquired action..." + pulse.event_listen_stop() + if debug: + print tname," event_listener stop command sent..." + print tname," aquiring loop..." + paLock2.acquire() + if debug: + print tname," aquired loop..." + +def ReleasePALock(): + tname=threading.current_thread().name + if debug: + print tname," releasing loop..." + paLock2.release() + if debug: + print tname," released loop..." + print tname," releasing action..." + paLock1.release() + if debug: + print tname," released action..." + +def AquireLoopLock(): + tname=threading.current_thread().name + if debug: + print tname," aquiring loop..." + paLock2.acquire() + if debug: + print tname," aquired loop..." + +def ReleaseLoopLock(): + tname=threading.current_thread().name + if debug: + print tname," releasing loop..." + paLock2.release() + if debug: + print tname," released loop..." + +def EventListener(callback): + pulse.event_mask_set('all') + pulse.event_callback_set(callback) + AquireLoopLock() + try: + pulse.event_listen() + finally: + ReleaseLoopLock() + +def EventProcess(ev): + raise PulseLoopStop + +def StateProcess(): + global sink_name,muted,volume + AcquirePALock() + try: + tname=threading.current_thread().name + current_sink=GetDefaultOut() + current_vol=round(GetDefaultVolume(),2) + current_muted="ON" if IsDefaultMuted() else "OFF" + if current_sink<>sink_name: + sink_name=current_sink + if debug: + print 'sink='+sink_name + if callback_changed: + if debug: + print 'callback...' + callback_changed('sink',sink_name) + if current_vol<>volume: + volume=current_vol + if debug: + print 'volume='+str(volume) + if callback_changed: + if debug: + print 'callback...' + callback_changed('volume',str(volume)) + if current_muted<>muted: + muted=current_muted + if debug: + print 'muted='+muted + if callback_changed: + if debug: + print 'callback...' + callback_changed('muted',muted) + finally: + ReleasePALock() + +def PAListener(): + while not stopFlag: + EventListener(EventProcess) + StateProcess() + +def RunBackground(process): + stopFlag=False + thread = threading.Thread(target=process,name="Background") + thread.start() + +def StopBackground(): + global stopFlag + stopFlag=True + pulse.event_listen_stop() + +def CommandGetDefaultOut(): + AcquirePALock() + try: + result=GetDefaultOut() + finally: + ReleasePALock() + return result + +def CommandGetDefaultVolume(): + AcquirePALock() + try: + result=GetDefaultVolume() + finally: + ReleasePALock() + return result + +def CommandIsDefaultMuted(): + AcquirePALock() + try: + result=IsDefaultMuted() + finally: + ReleasePALock() + return result + +def CommandSetDefaultVolume(volume): + AcquirePALock() + try: + SetDefaultVolume(volume) + finally: + ReleasePALock() + +def CommandMuteDefault(mute=True): + AcquirePALock() + try: + MuteDefault(mute) + finally: + ReleasePALock() + +# ====================== MQTT-related part ======================== + +lockMQTT = threading.RLock() + +def on_message(mosq, obj, msg): + if debug: + print("Received: " + msg.topic + " " + str(msg.payload)) + try: + subtopic=msg.topic[len(mqtt_topic_in)+1:] + payload=msg.payload + if subtopic=="volume": + CommandSetDefaultVolume(float(payload)) + elif subtopic=="muted": + if payload=="ON": + payload=True + elif payload=="OFF": + payload=False + else: + payload=int(payload) + CommandMuteDefault(payload) + else: + if debug: + print "Unknown command" + except: + if debug: + print "Command failed" + +def InitMQTT(): + + global client,mqtt_topic_in,mqtt_topic_out + + conffile = sys.argv[1] + + config = ConfigParser() + config.add_section('mqtt') + # set defaults for anonymous auth + config.set('mqtt', 'username', '') + config.set('mqtt', 'password', '') + config.set('mqtt', 'port', '1883') + config.set('mqtt', 'in', 'pulse/in') + config.set('mqtt', 'out', 'pulse/out') + config.read(conffile) + + mqtt_server = config.get('mqtt', 'server') + mqtt_port = config.getint('mqtt', 'port') + mqtt_username = config.get('mqtt', 'username') + mqtt_password = config.get('mqtt', 'password') + mqtt_topic_in = config.get('mqtt', 'in') + mqtt_topic_out = config.get('mqtt', 'out') + + client = paho.Client('pulse') + client.username_pw_set(mqtt_username, mqtt_password) + client.on_message=on_message + client.connect(mqtt_server, port=mqtt_port) + client.subscribe(mqtt_topic_in+'/#',1) + +def MQTTCallback(param,value): + + lockMQTT.acquire() + try: + client.publish(mqtt_topic_out+'/'+param, payload=value) + if debug: + print "Sent "+param+"="+value + finally: + lockMQTT.release() + +def StartPulseListener(): + global callback_changed + Init() + InitMQTT() + callback_changed=MQTTCallback + StateProcess() + RunBackground(PAListener) + +StartPulseListener() + +try: + while True: + try: + client.loop() + except KeyboardInterrupt: + break +finally: + StopBackground() diff --git a/mqtt-pulse/mqtt_pulse.pyc b/mqtt-pulse/mqtt_pulse.pyc new file mode 100644 index 0000000..af1559e Binary files /dev/null and b/mqtt-pulse/mqtt_pulse.pyc differ diff --git a/sensors-postprocess/openhab-db.conf b/sensors-postprocess/openhab-db.conf new file mode 100644 index 0000000..1a1c175 --- /dev/null +++ b/sensors-postprocess/openhab-db.conf @@ -0,0 +1,10 @@ +[mysql] +host=localhost +user=openhab +passwd=mypassword +db=openhab +[openhab] +template=Sensor% +[filter] +window=5 +threshold=10