From a2f5003eaf2c21902b091e12dd1a2c9fdbbe89b1 Mon Sep 17 00:00:00 2001 From: Roman Bazalevsky Date: Wed, 19 Oct 2016 17:06:47 +0300 Subject: [PATCH] =?utf8?q?=D0=92=20=D1=81=D0=B2=D1=8F=D0=B7=D0=B8=20=D1=81?= =?utf8?q?=20=D0=BC=D0=B8=D0=B3=D1=80=D0=B0=D1=86=D0=B8=D0=B5=D0=B9=20?= =?utf8?q?=D0=BD=D0=B0=20OpenHAB2=20=D1=81=20=D0=BD=D0=B0=D0=BC=D0=B5?= =?utf8?q?=D1=80=D1=82=D0=B2=D0=BE=20=D0=BF=D1=80=D0=B8=D0=B1=D0=B8=D1=82?= =?utf8?q?=D1=8B=D0=BC=D0=B8=20=D1=81=D0=BB=D0=B0=D0=B9=D0=B4=D0=B5=D1=80?= =?utf8?q?=D0=B0=D0=BC=D0=B8=200..100=20=D0=B2=D1=8B=D0=BF=D0=BE=D0=BB?= =?utf8?q?=D0=BD=D0=B5=D0=BD=20=D0=B0=D0=BD=D0=B0=D0=BB=D0=BE=D0=B3=D0=B8?= =?utf8?q?=D1=87=D0=BD=D1=8B=D0=B9=20=D0=BF=D0=B5=D1=80=D0=B5=D1=85=D0=BE?= =?utf8?q?=D0=B4=20=D0=B8=20=D1=82=D1=83=D1=82.?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- mqtt-pulse/mqtt-pulse | 285 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100755 mqtt-pulse/mqtt-pulse diff --git a/mqtt-pulse/mqtt-pulse b/mqtt-pulse/mqtt-pulse new file mode 100755 index 0000000..54586d9 --- /dev/null +++ b/mqtt-pulse/mqtt-pulse @@ -0,0 +1,285 @@ +#!/usr/bin/python + +from pulsectl import Pulse,PulseLoopStop + +import threading +from time import sleep + +import sys +from ConfigParser import ConfigParser +import paho.mqtt.client as paho + +# ====================== PulseAudio-related part ======================== + +sink_name=None +muted=None +volume=None + +callback_changed=None + +paLock1=threading.RLock() +paLock2=threading.RLock() +stopFlag=False + +debug=False + +def Init(): + global pulse + pulse = Pulse("mqtt-pa") + +def Reconnect(): + pulse.close() + Init() + +def GetDefaultOut(): + return pulse.server_info().default_sink_name + +def GetDefaultSink(): + sinkname=GetDefaultOut() + for sink in pulse.sink_list(): + if sink.name==sinkname: + return sink + +def GetDefaultVolume(): + return pulse.volume_get_all_chans(GetDefaultSink()) + +def SetDefaultVolume(volume): + pulse.volume_set_all_chans(GetDefaultSink(), volume/100.0) + if debug: + print "volume set %s" % volume + +def IsDefaultMuted(): + return GetDefaultSink().mute<>0 + +def MuteDefault(mute = True): + pulse.mute(GetDefaultSink(),mute) + if debug: + print "mute set %s" % mute + +def AcquirePALock(): + tname=threading.current_thread().name + if debug: + print tname," aquiring action..." + paLock1.acquire() + if debug: + print tname," aquired action..." + pulse.event_listen_stop() + if debug: + print tname," event_listener stop command sent..." + print tname," aquiring loop..." + paLock2.acquire() + if debug: + print tname," aquired loop..." + +def ReleasePALock(): + tname=threading.current_thread().name + if debug: + print tname," releasing loop..." + paLock2.release() + if debug: + print tname," released loop..." + print tname," releasing action..." + paLock1.release() + if debug: + print tname," released action..." + +def AquireLoopLock(): + tname=threading.current_thread().name + if debug: + print tname," aquiring loop..." + paLock2.acquire() + if debug: + print tname," aquired loop..." + +def ReleaseLoopLock(): + tname=threading.current_thread().name + if debug: + print tname," releasing loop..." + paLock2.release() + if debug: + print tname," released loop..." + +def EventListener(callback): + pulse.event_mask_set('all') + pulse.event_callback_set(callback) + AquireLoopLock() + try: + pulse.event_listen() + finally: + ReleaseLoopLock() + +def EventProcess(ev): + raise PulseLoopStop + +def StateProcess(): + global sink_name,muted,volume + AcquirePALock() + try: + tname=threading.current_thread().name + current_sink=GetDefaultOut() + current_vol=round(GetDefaultVolume()*100,2) + current_muted="ON" if IsDefaultMuted() else "OFF" + if current_sink<>sink_name: + sink_name=current_sink + if debug: + print 'sink='+sink_name + if callback_changed: + if debug: + print 'callback...' + callback_changed('sink',sink_name) + if current_vol<>volume: + volume=current_vol + if debug: + print 'volume='+str(volume) + if callback_changed: + if debug: + print 'callback...' + callback_changed('volume',str(volume)) + if current_muted<>muted: + muted=current_muted + if debug: + print 'muted='+muted + if callback_changed: + if debug: + print 'callback...' + callback_changed('muted',muted) + finally: + ReleasePALock() + +def PAListener(): + while not stopFlag: + EventListener(EventProcess) + StateProcess() + +def RunBackground(process): + stopFlag=False + thread = threading.Thread(target=process,name="Background") + thread.start() + +def StopBackground(): + global stopFlag + stopFlag=True + pulse.event_listen_stop() + +def CommandGetDefaultOut(): + AcquirePALock() + try: + result=GetDefaultOut() + finally: + ReleasePALock() + return result + +def CommandGetDefaultVolume(): + AcquirePALock() + try: + result=GetDefaultVolume() + finally: + ReleasePALock() + return result + +def CommandIsDefaultMuted(): + AcquirePALock() + try: + result=IsDefaultMuted() + finally: + ReleasePALock() + return result + +def CommandSetDefaultVolume(volume): + AcquirePALock() + try: + SetDefaultVolume(volume) + finally: + ReleasePALock() + +def CommandMuteDefault(mute=True): + AcquirePALock() + try: + MuteDefault(mute) + finally: + ReleasePALock() + +# ====================== MQTT-related part ======================== + +lockMQTT = threading.RLock() + +def on_message(mosq, obj, msg): + if debug: + print("Received: " + msg.topic + " " + str(msg.payload)) + try: + subtopic=msg.topic[len(mqtt_topic_in)+1:] + payload=msg.payload + if subtopic=="volume": + CommandSetDefaultVolume(float(payload)) + elif subtopic=="muted": + if payload=="ON": + payload=True + elif payload=="OFF": + payload=False + else: + payload=int(payload) + CommandMuteDefault(payload) + else: + if debug: + print "Unknown command" + except: + if debug: + print "Command failed" + +def InitMQTT(): + + global client,mqtt_topic_in,mqtt_topic_out + + conffile = sys.argv[1] + + config = ConfigParser() + config.add_section('mqtt') + # set defaults for anonymous auth + config.set('mqtt', 'username', '') + config.set('mqtt', 'password', '') + config.set('mqtt', 'port', '1883') + config.set('mqtt', 'in', 'pulse/in') + config.set('mqtt', 'out', 'pulse/out') + config.read(conffile) + + mqtt_server = config.get('mqtt', 'server') + mqtt_port = config.getint('mqtt', 'port') + mqtt_username = config.get('mqtt', 'username') + mqtt_password = config.get('mqtt', 'password') + mqtt_topic_in = config.get('mqtt', 'in') + mqtt_topic_out = config.get('mqtt', 'out') + + client = paho.Client('pulse') + client.username_pw_set(mqtt_username, mqtt_password) + client.on_message=on_message + client.connect(mqtt_server, port=mqtt_port) + client.subscribe(mqtt_topic_in+'/#',1) + +def MQTTCallback(param,value): + + lockMQTT.acquire() + try: + client.publish(mqtt_topic_out+'/'+param, payload=value) + if debug: + print "Sent "+param+"="+value + finally: + lockMQTT.release() + +def StartPulseListener(): + global callback_changed + Init() + InitMQTT() + callback_changed=MQTTCallback + StateProcess() + RunBackground(PAListener) + +StartPulseListener() + +try: + while True: + try: + client.loop() + except KeyboardInterrupt: + break +finally: + StopBackground() -- 2.34.1