+#!/usr/bin/python
+
+from pulsectl import Pulse,PulseLoopStop
+
+import threading
+from time import sleep
+
+import sys
+from ConfigParser import ConfigParser
+import paho.mqtt.client as paho
+
+# ====================== PulseAudio-related part ========================
+
+sink_name=None
+muted=None
+volume=None
+
+callback_changed=None
+
+paLock1=threading.RLock()
+paLock2=threading.RLock()
+stopFlag=False
+
+debug=False
+
+def Init():
+ global pulse
+ pulse = Pulse("mqtt-pa")
+
+def Reconnect():
+ pulse.close()
+ Init()
+
+def GetDefaultOut():
+ return pulse.server_info().default_sink_name
+
+def GetDefaultSink():
+ sinkname=GetDefaultOut()
+ for sink in pulse.sink_list():
+ if sink.name==sinkname:
+ return sink
+
+def GetDefaultVolume():
+ return pulse.volume_get_all_chans(GetDefaultSink())
+
+def SetDefaultVolume(volume):
+ pulse.volume_set_all_chans(GetDefaultSink(), volume)
+
+def IsDefaultMuted():
+ return GetDefaultSink().mute<>0
+
+def MuteDefault(mute = True):
+ return pulse.mute(GetDefaultSink(),mute)
+
+def AcquirePALock():
+ tname=threading.current_thread().name
+ if debug:
+ print tname," aquiring action..."
+ paLock1.acquire()
+ if debug:
+ print tname," aquired action..."
+ pulse.event_listen_stop()
+ if debug:
+ print tname," event_listener stop command sent..."
+ print tname," aquiring loop..."
+ paLock2.acquire()
+ if debug:
+ print tname," aquired loop..."
+
+def ReleasePALock():
+ tname=threading.current_thread().name
+ if debug:
+ print tname," releasing loop..."
+ paLock2.release()
+ if debug:
+ print tname," released loop..."
+ print tname," releasing action..."
+ paLock1.release()
+ if debug:
+ print tname," released action..."
+
+def AquireLoopLock():
+ tname=threading.current_thread().name
+ if debug:
+ print tname," aquiring loop..."
+ paLock2.acquire()
+ if debug:
+ print tname," aquired loop..."
+
+def ReleaseLoopLock():
+ tname=threading.current_thread().name
+ if debug:
+ print tname," releasing loop..."
+ paLock2.release()
+ if debug:
+ print tname," released loop..."
+
+def EventListener(callback):
+ pulse.event_mask_set('all')
+ pulse.event_callback_set(callback)
+ AquireLoopLock()
+ try:
+ pulse.event_listen()
+ finally:
+ ReleaseLoopLock()
+
+def EventProcess(ev):
+ raise PulseLoopStop
+
+def StateProcess():
+ global sink_name,muted,volume
+ AcquirePALock()
+ try:
+ tname=threading.current_thread().name
+ current_sink=GetDefaultOut()
+ current_vol=round(GetDefaultVolume(),2)
+ current_muted="ON" if IsDefaultMuted() else "OFF"
+ if current_sink<>sink_name:
+ sink_name=current_sink
+ if debug:
+ print 'sink='+sink_name
+ if callback_changed:
+ if debug:
+ print 'callback...'
+ callback_changed('sink',sink_name)
+ if current_vol<>volume:
+ volume=current_vol
+ if debug:
+ print 'volume='+str(volume)
+ if callback_changed:
+ if debug:
+ print 'callback...'
+ callback_changed('volume',str(volume))
+ if current_muted<>muted:
+ muted=current_muted
+ if debug:
+ print 'muted='+muted
+ if callback_changed:
+ if debug:
+ print 'callback...'
+ callback_changed('muted',muted)
+ finally:
+ ReleasePALock()
+
+def PAListener():
+ while not stopFlag:
+ EventListener(EventProcess)
+ StateProcess()
+
+def RunBackground(process):
+ stopFlag=False
+ thread = threading.Thread(target=process,name="Background")
+ thread.start()
+
+def StopBackground():
+ global stopFlag
+ stopFlag=True
+ pulse.event_listen_stop()
+
+def CommandGetDefaultOut():
+ AcquirePALock()
+ try:
+ result=GetDefaultOut()
+ finally:
+ ReleasePALock()
+ return result
+
+def CommandGetDefaultVolume():
+ AcquirePALock()
+ try:
+ result=GetDefaultVolume()
+ finally:
+ ReleasePALock()
+ return result
+
+def CommandIsDefaultMuted():
+ AcquirePALock()
+ try:
+ result=IsDefaultMuted()
+ finally:
+ ReleasePALock()
+ return result
+
+def CommandSetDefaultVolume(volume):
+ AcquirePALock()
+ try:
+ SetDefaultVolume(volume)
+ finally:
+ ReleasePALock()
+
+def CommandMuteDefault(mute=True):
+ AcquirePALock()
+ try:
+ MuteDefault(mute)
+ finally:
+ ReleasePALock()
+
+# ====================== MQTT-related part ========================
+
+lockMQTT = threading.RLock()
+
+def on_message(mosq, obj, msg):
+ if debug:
+ print("Received: " + msg.topic + " " + str(msg.payload))
+ try:
+ subtopic=msg.topic[len(mqtt_topic_in)+1:]
+ payload=msg.payload
+ if subtopic=="volume":
+ CommandSetDefaultVolume(float(payload))
+ elif subtopic=="muted":
+ if payload=="ON":
+ payload=True
+ elif payload=="OFF":
+ payload=False
+ else:
+ payload=int(payload)
+ CommandMuteDefault(payload)
+ else:
+ if debug:
+ print "Unknown command"
+ except:
+ if debug:
+ print "Command failed"
+
+def InitMQTT():
+
+ global client,mqtt_topic_in,mqtt_topic_out
+
+ conffile = sys.argv[1]
+
+ config = ConfigParser()
+ config.add_section('mqtt')
+ # set defaults for anonymous auth
+ config.set('mqtt', 'username', '')
+ config.set('mqtt', 'password', '')
+ config.set('mqtt', 'port', '1883')
+ config.set('mqtt', 'in', 'pulse/in')
+ config.set('mqtt', 'out', 'pulse/out')
+ config.read(conffile)
+
+ mqtt_server = config.get('mqtt', 'server')
+ mqtt_port = config.getint('mqtt', 'port')
+ mqtt_username = config.get('mqtt', 'username')
+ mqtt_password = config.get('mqtt', 'password')
+ mqtt_topic_in = config.get('mqtt', 'in')
+ mqtt_topic_out = config.get('mqtt', 'out')
+
+ client = paho.Client('pulse')
+ client.username_pw_set(mqtt_username, mqtt_password)
+ client.on_message=on_message
+ client.connect(mqtt_server, port=mqtt_port)
+ client.subscribe(mqtt_topic_in+'/#',1)
+
+def MQTTCallback(param,value):
+
+ lockMQTT.acquire()
+ try:
+ client.publish(mqtt_topic_out+'/'+param, payload=value)
+ if debug:
+ print "Sent "+param+"="+value
+ finally:
+ lockMQTT.release()
+
+def StartPulseListener():
+ global callback_changed
+ Init()
+ InitMQTT()
+ callback_changed=MQTTCallback
+ StateProcess()
+ RunBackground(PAListener)
+
+StartPulseListener()
+
+try:
+ while True:
+ try:
+ client.loop()
+ except KeyboardInterrupt:
+ break
+finally:
+ StopBackground()