#!/usr/bin/python
"""Bridge between the PulseAudio default sink and an MQTT broker.

The first part of the file wraps pulsectl calls for the default sink and
provides the two-lock protocol used to interleave commands with the
blocking PulseAudio event loop.
"""
import sys
import threading
from time import sleep

try:
    from ConfigParser import ConfigParser  # Python 2 module name
except ImportError:
    from configparser import ConfigParser  # Python 3 module name

import paho.mqtt.client as paho
from pulsectl import Pulse, PulseLoopStop

# ====================== PulseAudio-related part ========================

# Last observed state of the default sink; None until first observed.
sink_name = None
muted = None
volume = None
# Optional callable invoked as callback_changed(name, value) on a change.
callback_changed = None
# paLock1 ("action") serialises commands; paLock2 ("loop") is held while
# the PulseAudio event loop is running.
paLock1 = threading.RLock()
paLock2 = threading.RLock()
# Set to True to ask the background listener to finish.
stopFlag = False
# Set to True to print trace messages to stdout.
debug = False


def Init():
    """Create the global ``pulse`` client, registered as "mqtt-pa"."""
    global pulse
    pulse = Pulse("mqtt-pa")


def Reconnect():
    """Close the current ``pulse`` client and create a new one."""
    pulse.close()
    Init()


def GetDefaultOut():
    """Return the name of the server's default sink."""
    return pulse.server_info().default_sink_name


def GetDefaultSink():
    """Return the sink object whose name is the default sink name.

    Returns None when no sink in the sink list has that name.
    """
    sinkname = GetDefaultOut()
    for sink in pulse.sink_list():
        if sink.name == sinkname:
            return sink
    return None


def GetDefaultVolume():
    """Return the all-channel volume of the default sink."""
    return pulse.volume_get_all_chans(GetDefaultSink())


def SetDefaultVolume(volume):
    """Set the default sink volume; *volume* is divided by 100 first."""
    pulse.volume_set_all_chans(GetDefaultSink(), volume / 100.0)
    if debug:
        print("volume set %s" % volume)


def IsDefaultMuted():
    """Return True when the default sink's mute attribute is non-zero."""
    return GetDefaultSink().mute != 0


def MuteDefault(mute=True):
    """Mute (or, with a false *mute*, unmute) the default sink."""
    pulse.mute(GetDefaultSink(), mute)
    if debug:
        print("mute set %s" % mute)


def _TraceLock(message):
    """Print *message* prefixed by the current thread name when debugging."""
    if debug:
        # Same output as the former ``print name, message`` statement.
        print("%s %s" % (threading.current_thread().name, message))


def AcquirePALock():
    """Take the action lock, stop the event loop, then take the loop lock.

    If stopping the listener or taking the loop lock fails, the action
    lock is released again before the exception propagates, so a failed
    acquire never leaves the action lock held.
    """
    _TraceLock(" aquiring action...")
    paLock1.acquire()
    try:
        _TraceLock(" aquired action...")
        # Ask the thread blocked in the event loop to return so that it
        # gives up the loop lock.
        pulse.event_listen_stop()
        _TraceLock(" event_listener stop command sent...")
        _TraceLock(" aquiring loop...")
        paLock2.acquire()
    except BaseException:
        paLock1.release()
        raise
    _TraceLock(" aquired loop...")


def ReleasePALock():
    """Release the loop lock and then the action lock.

    The action lock is released even if releasing the loop lock raises.
    """
    try:
        _TraceLock(" releasing loop...")
        paLock2.release()
        _TraceLock(" released loop...")
        _TraceLock(" releasing action...")
    finally:
        paLock1.release()
    _TraceLock(" released action...")


def AquireLoopLock():
    """Take the loop lock only (used around the event loop itself)."""
    _TraceLock(" aquiring loop...")
    paLock2.acquire()
    _TraceLock(" aquired loop...")


def ReleaseLoopLock():
    """Release the loop lock taken by AquireLoopLock."""
    _TraceLock(" releasing loop...")
    paLock2.release()
    _TraceLock(" released loop...")
def EventListener(callback):
    """Run the PulseAudio event loop with *callback* while holding the loop lock.

    Subscribes to all event types, installs *callback* and blocks in
    ``pulse.event_listen()``; the loop lock is released when that returns
    or raises.
    """
    pulse.event_mask_set('all')
    pulse.event_callback_set(callback)
    AquireLoopLock()
    try:
        pulse.event_listen()
    finally:
        ReleaseLoopLock()


def EventProcess(ev):
    """Event callback: raise PulseLoopStop for every event *ev*."""
    raise PulseLoopStop


def _NotifyChange(name, text):
    """Trace a changed value and pass it to callback_changed, if one is set."""
    if debug:
        print(name + '=' + text)
    if callback_changed:
        if debug:
            print('callback...')
        callback_changed(name, text)


def StateProcess():
    """Read sink name, volume and mute state and report whichever changed.

    All three values are read first, under the PA locks; each one that
    differs from the stored module-level value is stored and then
    reported as 'sink', 'volume' (stringified percentage rounded to two
    decimals) or 'muted' ("ON"/"OFF").
    """
    global sink_name, muted, volume
    AcquirePALock()
    try:
        current_sink = GetDefaultOut()
        current_vol = round(GetDefaultVolume() * 100, 2)
        current_muted = "ON" if IsDefaultMuted() else "OFF"
        if current_sink != sink_name:
            sink_name = current_sink
            _NotifyChange('sink', sink_name)
        if current_vol != volume:
            volume = current_vol
            _NotifyChange('volume', str(volume))
        if current_muted != muted:
            muted = current_muted
            _NotifyChange('muted', muted)
    finally:
        ReleasePALock()


def PAListener():
    """Alternate between waiting for an event and publishing state until stopFlag is set."""
    while not stopFlag:
        EventListener(EventProcess)
        StateProcess()


def RunBackground(process):
    """Clear stopFlag and run *process* in a new thread named "Background"."""
    # Without this declaration the assignment below only created a local
    # variable and the module-level flag was never reset.
    global stopFlag
    stopFlag = False
    thread = threading.Thread(target=process, name="Background")
    thread.start()


def StopBackground():
    """Set stopFlag and ask the event loop to stop."""
    global stopFlag
    stopFlag = True
    pulse.event_listen_stop()


def CommandGetDefaultOut():
    """Return GetDefaultOut() while holding the PA locks."""
    AcquirePALock()
    try:
        return GetDefaultOut()
    finally:
        ReleasePALock()


def CommandGetDefaultVolume():
    """Return GetDefaultVolume() while holding the PA locks."""
    AcquirePALock()
    try:
        return GetDefaultVolume()
    finally:
        ReleasePALock()


def CommandIsDefaultMuted():
    """Return IsDefaultMuted() while holding the PA locks."""
    AcquirePALock()
    try:
        return IsDefaultMuted()
    finally:
        ReleasePALock()


def CommandSetDefaultVolume(volume):
    """Call SetDefaultVolume(volume) while holding the PA locks."""
    AcquirePALock()
    try:
        SetDefaultVolume(volume)
    finally:
        ReleasePALock()


def CommandMuteDefault(mute=True):
    """Call MuteDefault(mute) while holding the PA locks."""
    AcquirePALock()
    try:
        MuteDefault(mute)
    finally:
        ReleasePALock()


# ====================== MQTT-related part ========================

lockMQTT = threading.RLock()


def _PayloadText(payload):
    """Return *payload* as text, decoding it as UTF-8 when it is bytes."""
    if isinstance(payload, bytes):
        return payload.decode('utf-8')
    return payload


def on_message(mosq, obj, msg):
    """Handle an incoming MQTT message as a volume or mute command.

    The part of the topic after ``mqtt_topic_in + '/'`` selects the
    command: "volume" takes a number, "muted" takes "ON", "OFF" or an
    integer. Failures are deliberately not propagated; they are only
    reported when debug is on.
    """
    if debug:
        print("Received: " + msg.topic + " " + str(msg.payload))
    try:
        subtopic = msg.topic[len(mqtt_topic_in) + 1:]
        if subtopic == "volume":
            CommandSetDefaultVolume(float(_PayloadText(msg.payload)))
        elif subtopic == "muted":
            # Decode first: a bytes payload never compares equal to "ON".
            payload = _PayloadText(msg.payload)
            if payload == "ON":
                payload = True
            elif payload == "OFF":
                payload = False
            else:
                payload = int(payload)
            CommandMuteDefault(payload)
        else:
            if debug:
                print("Unknown command")
    except Exception:
        # Best effort, but let KeyboardInterrupt/SystemExit through.
        if debug:
            print("Command failed")


def InitMQTT():
    """Read the config file named by argv[1], then connect and subscribe.

    Sets the globals ``client``, ``mqtt_topic_in`` and ``mqtt_topic_out``.
    The [mqtt] section must provide ``server``; ``username``,
    ``password``, ``port``, ``in`` and ``out`` have defaults. Exits with
    a message when the argument is missing or the file cannot be read.
    """
    global client, mqtt_topic_in, mqtt_topic_out
    if len(sys.argv) < 2:
        sys.exit("usage: %s CONFIG_FILE" % sys.argv[0])
    conffile = sys.argv[1]

    config = ConfigParser()
    config.add_section('mqtt')
    # set defaults for anonymous auth
    config.set('mqtt', 'username', '')
    config.set('mqtt', 'password', '')
    config.set('mqtt', 'port', '1883')
    config.set('mqtt', 'in', 'pulse/in')
    config.set('mqtt', 'out', 'pulse/out')
    # read() skips unreadable files and returns the names it parsed.
    if not config.read(conffile):
        sys.exit("cannot read config file: %s" % conffile)

    mqtt_server = config.get('mqtt', 'server')
    mqtt_port = config.getint('mqtt', 'port')
    mqtt_username = config.get('mqtt', 'username')
    mqtt_password = config.get('mqtt', 'password')
    mqtt_topic_in = config.get('mqtt', 'in')
    mqtt_topic_out = config.get('mqtt', 'out')

    client = paho.Client('pulse')
    client.username_pw_set(mqtt_username, mqtt_password)
    client.on_message = on_message
    client.connect(mqtt_server, port=mqtt_port)
    client.subscribe(mqtt_topic_in + '/#', 1)


def MQTTCallback(param, value):
    """Publish *value* to ``mqtt_topic_out/param`` while holding lockMQTT."""
    with lockMQTT:
        client.publish(mqtt_topic_out + '/' + param, payload=value)
        if debug:
            print("Sent " + param + "=" + value)


def StartPulseListener():
    """Connect to PulseAudio and MQTT, publish the state, start the listener."""
    global callback_changed
    Init()
    InitMQTT()
    callback_changed = MQTTCallback
    StateProcess()
    RunBackground(PAListener)


StartPulseListener()
try:
    # NOTE(review): the return code of client.loop() is ignored, so a lost
    # broker connection is not detected here - confirm whether reconnect
    # handling is wanted.
    while True:
        try:
            client.loop()
        except KeyboardInterrupt:
            break
finally:
    StopBackground()