В связи с миграцией на OpenHAB2 с намертво прибитыми слайдерами 0..100 выполнен анало...
author Roman Bazalevsky <rvb@rvb.name>
Wed, 19 Oct 2016 14:06:47 +0000 (17:06 +0300)
committer Roman Bazalevsky <rvb@rvb.name>
Wed, 19 Oct 2016 14:06:47 +0000 (17:06 +0300)
mqtt-pulse/mqtt-pulse [new file with mode: 0755]

diff --git a/mqtt-pulse/mqtt-pulse b/mqtt-pulse/mqtt-pulse
new file mode 100755 (executable)
index 0000000..54586d9
--- /dev/null
@@ -0,0 +1,285 @@
+#!/usr/bin/python
+
+from pulsectl import Pulse,PulseLoopStop
+
+import threading
+from time import sleep
+
+import sys
+from ConfigParser import ConfigParser
+import paho.mqtt.client as paho
+
+# ====================== PulseAudio-related part ========================
+
+# Last state seen by StateProcess(); all three stay None until its first run.
+sink_name = muted = volume = None
+
+# Optional hook, invoked as callback_changed(name, value) when a value changes.
+callback_changed = None
+
+# paLock1 is taken first by AcquirePALock(); paLock2 is held by EventListener()
+# for as long as pulse.event_listen() is running.
+paLock1 = threading.RLock()
+paLock2 = threading.RLock()
+
+# Checked by PAListener() on every iteration; set by StopBackground().
+stopFlag = False
+
+# When true, the functions in this file print trace messages.
+debug = False
+
+def Init():
+  """Create a Pulse object named "mqtt-pa" and store it in the global `pulse`."""
+  global pulse
+  connection = Pulse("mqtt-pa")
+  pulse = connection
+
+def Reconnect():
+  """Close the current global `pulse` object, then build a new one via Init()."""
+  stale = pulse
+  stale.close()
+  Init()
+
+def GetDefaultOut():
+  """Return `default_sink_name` from the server info reported by `pulse`."""
+  info = pulse.server_info()
+  return info.default_sink_name
+
+def GetDefaultSink():
+  """Return the first sink whose name equals GetDefaultOut().
+
+  Returns None when no entry of pulse.sink_list() has that name.
+  """
+  wanted = GetDefaultOut()
+  matches = (candidate for candidate in pulse.sink_list() if candidate.name == wanted)
+  return next(matches, None)
+
+def GetDefaultVolume():
+  """Return pulse.volume_get_all_chans() for the default sink."""
+  query = pulse.volume_get_all_chans
+  return query(GetDefaultSink())
+
+def SetDefaultVolume(volume):
+  """Set the volume of all channels of the default sink.
+
+  `volume` is divided by 100.0 before it is passed to pulsectl, so a
+  caller-side value of 100 becomes 1.0.
+  """
+  target = GetDefaultSink()
+  pulse.volume_set_all_chans(target, volume/100.0)
+  if not debug:
+    return
+  print "volume set %s" % volume
+
+def IsDefaultMuted():
+  """Return True when the default sink's `mute` attribute is non-zero."""
+  sink = GetDefaultSink()
+  return sink.mute != 0
+  
+def MuteDefault(mute = True):
+  """Pass `mute` to pulse.mute() for the default sink."""
+  target = GetDefaultSink()
+  pulse.mute(target, mute)
+  if not debug:
+    return
+  print "mute set %s" % mute
+
+def AcquirePALock():
+  """Take paLock1, ask pulsectl to stop listening, then take paLock2.
+
+  EventListener() holds paLock2 while pulse.event_listen() runs, so the
+  stop request is what lets the second acquire succeed.
+  """
+  who=threading.current_thread().name
+  if debug:
+    print who," aquiring action..."
+  paLock1.acquire()
+  if debug:
+    print who," aquired action..."
+  pulse.event_listen_stop()
+  if debug:
+    print who," event_listener stop command sent..."
+  # Same trace messages and paLock2.acquire() as the inline original.
+  AquireLoopLock()
+
+def ReleasePALock():
+  """Release paLock2 and then paLock1, the reverse of AcquirePALock()."""
+  # Same trace messages and paLock2.release() as the inline original.
+  ReleaseLoopLock()
+  who=threading.current_thread().name
+  if debug:
+    print who," releasing action..."
+  paLock1.release()
+  if debug:
+    print who," released action..."
+
+def AquireLoopLock():
+  """Acquire paLock2, printing trace lines around it when `debug` is set."""
+  who = threading.current_thread().name
+  if debug:
+    print who," aquiring loop..."
+  paLock2.acquire()
+  if debug:
+    print who," aquired loop..."
+
+def ReleaseLoopLock():
+  """Release paLock2, printing trace lines around it when `debug` is set."""
+  who = threading.current_thread().name
+  if debug:
+    print who," releasing loop..."
+  paLock2.release()
+  if debug:
+    print who," released loop..."
+
+def EventListener(callback):
+  """Listen for PulseAudio events, holding paLock2 for the whole listen.
+
+  Subscribes to the 'all' event mask, installs `callback` as the event
+  callback, and calls pulse.event_listen(). paLock2 is released in the
+  `finally` clause, so it is freed even if event_listen() raises.
+  """
+  pulse.event_mask_set('all')
+  pulse.event_callback_set(callback)
+  # The lock is taken outside the try block, so a failed acquire is never released.
+  AquireLoopLock()
+  try:
+    pulse.event_listen()
+  finally:
+    ReleaseLoopLock()
+
+def EventProcess(ev):
+  """Event callback that ignores `ev` and always raises PulseLoopStop.
+
+  PAListener() installs it through EventListener() and calls
+  StateProcess() once the listen call has returned.
+  """
+  raise PulseLoopStop()
+
+def _AnnounceChange(key, text):
+  """Trace one changed value and pass it to callback_changed, if one is set."""
+  if debug:
+    print key+'='+text
+  if callback_changed:
+    if debug:
+      print 'callback...'
+    callback_changed(key, text)
+
+def StateProcess():
+  """Read sink name, volume and mute state; report whichever changed.
+
+  The whole read runs between AcquirePALock() and ReleasePALock(). Volume
+  is stored as GetDefaultVolume()*100 rounded to two decimals, and mute
+  state as the string "ON" or "OFF". Changes are reported in the order
+  sink, volume, muted.
+  """
+  global sink_name,muted,volume
+  AcquirePALock()
+  try:
+    seen_sink=GetDefaultOut()
+    seen_volume=round(GetDefaultVolume()*100,2)
+    if IsDefaultMuted():
+      seen_muted="ON"
+    else:
+      seen_muted="OFF"
+    if seen_sink!=sink_name:
+      sink_name=seen_sink
+      _AnnounceChange('sink',sink_name)
+    if seen_volume!=volume:
+      volume=seen_volume
+      _AnnounceChange('volume',str(volume))
+    if seen_muted!=muted:
+      muted=seen_muted
+      _AnnounceChange('muted',muted)
+  finally:
+    ReleasePALock()
+
+def PAListener():
+  """Alternate one listen and one StateProcess() call until stopFlag is set.
+
+  stopFlag is only checked at the top of each iteration.
+  """
+  while True:
+    if stopFlag:
+      break
+    EventListener(EventProcess)
+    StateProcess()
+
+def RunBackground(process):
+  """Reset the stop flag and start `process` on a thread named "Background".
+
+  process -- callable taking no arguments; it becomes the thread target.
+  """
+  # Without this declaration the assignment below only bound a local name
+  # and the module-level flag read by PAListener() was never reset.
+  global stopFlag
+  stopFlag=False
+  thread = threading.Thread(target=process,name="Background")
+  thread.start()
+
+def StopBackground():
+  """Set stopFlag and ask pulsectl to stop listening.
+
+  The flag is set before the stop request, so PAListener() sees it on its
+  next loop check.
+  """
+  global stopFlag
+  stopFlag=True
+  pulse.event_listen_stop()
+
+def CommandGetDefaultOut():
+  """Return GetDefaultOut(), called between AcquirePALock() and ReleasePALock()."""
+  AcquirePALock()
+  try:
+    return GetDefaultOut()
+  finally:
+    ReleasePALock()
+
+def CommandGetDefaultVolume():
+  """Return GetDefaultVolume(), called between AcquirePALock() and ReleasePALock()."""
+  AcquirePALock()
+  try:
+    return GetDefaultVolume()
+  finally:
+    ReleasePALock()
+
+def CommandIsDefaultMuted():
+  """Return IsDefaultMuted(), called between AcquirePALock() and ReleasePALock()."""
+  AcquirePALock()
+  try:
+    return IsDefaultMuted()
+  finally:
+    ReleasePALock()
+
+def CommandSetDefaultVolume(volume):
+  """Call SetDefaultVolume(volume) between AcquirePALock() and ReleasePALock().
+
+  The locks are released in `finally`, so they are freed even if the set fails.
+  """
+  AcquirePALock()
+  try:
+    SetDefaultVolume(volume)
+  finally:
+    ReleasePALock()
+
+def CommandMuteDefault(mute=True):
+  """Call MuteDefault(mute) between AcquirePALock() and ReleasePALock().
+
+  The locks are released in `finally`, so they are freed even if the call fails.
+  """
+  AcquirePALock()
+  try:
+    MuteDefault(mute)
+  finally:
+    ReleasePALock()
+
+# ====================== MQTT-related part ========================
+
+# Held by MQTTCallback() around each client.publish() call.
+lockMQTT = threading.RLock()
+
+def on_message(mosq, obj, msg):
+  """Handle one incoming MQTT message as a volume or mute command.
+
+  The subtopic is whatever follows mqtt_topic_in plus one separator
+  character in msg.topic:
+    "volume" -- payload is parsed with float() and passed to
+                CommandSetDefaultVolume().
+    "muted"  -- "ON" maps to True, "OFF" to False, anything else is parsed
+                with int(); the result goes to CommandMuteDefault().
+  Any other subtopic is ignored. `mosq` and `obj` are not used.
+
+  Failures are deliberately best-effort: they are reported only when
+  `debug` is set and never propagate to the MQTT loop.
+  """
+  if debug:
+    print("Received: " + msg.topic + " " + str(msg.payload))
+  try:
+    subtopic=msg.topic[len(mqtt_topic_in)+1:]
+    payload=msg.payload
+    if subtopic=="volume":
+      CommandSetDefaultVolume(float(payload))
+    elif subtopic=="muted":
+      if payload=="ON":
+        payload=True
+      elif payload=="OFF":
+        payload=False
+      else:
+        payload=int(payload)
+      CommandMuteDefault(payload)
+    else:
+      if debug:
+        print "Unknown command"
+  except Exception:
+    # A bare "except:" also caught KeyboardInterrupt and SystemExit, so a
+    # Ctrl-C arriving while a command was being handled was silently lost.
+    if debug:
+      print "Command failed"
+  
+def InitMQTT():
+  """Read the [mqtt] settings, then connect and subscribe the paho client.
+
+  The config file path is taken from sys.argv[1]. Defaults are set for
+  every option except 'server' before the file is read. Sets the globals
+  `client`, `mqtt_topic_in` and `mqtt_topic_out`, and subscribes to
+  "<in topic>/#" with QoS 1.
+  """
+
+  global client,mqtt_topic_in,mqtt_topic_out
+
+  conffile = sys.argv[1]
+
+  config = ConfigParser()
+  config.add_section('mqtt')
+  # Defaults are set before read(), so values in the file replace them;
+  # the empty credentials are the defaults for anonymous auth.
+  fallbacks = (
+    ('username', ''),
+    ('password', ''),
+    ('port', '1883'),
+    ('in', 'pulse/in'),
+    ('out', 'pulse/out'),
+  )
+  for option, fallback in fallbacks:
+    config.set('mqtt', option, fallback)
+  config.read(conffile)
+
+  host = config.get('mqtt', 'server')
+  port = config.getint('mqtt', 'port')
+  user = config.get('mqtt', 'username')
+  secret = config.get('mqtt', 'password')
+  mqtt_topic_in = config.get('mqtt', 'in')
+  mqtt_topic_out = config.get('mqtt', 'out')
+
+  client = paho.Client('pulse')
+  client.username_pw_set(user, secret)
+  client.on_message = on_message
+  client.connect(host, port=port)
+  client.subscribe(mqtt_topic_in+'/#', 1)
+
+def MQTTCallback(param,value):
+  """Publish `value` to "<mqtt_topic_out>/<param>" while holding lockMQTT."""
+  with lockMQTT:
+    topic = mqtt_topic_out+'/'+param
+    client.publish(topic, payload=value)
+    if debug:
+      print "Sent "+param+"="+value
+
+def StartPulseListener():
+  """Set up PulseAudio and MQTT, publish the initial state, start listening.
+
+  The callback is installed before the first StateProcess() call, so that
+  call already reports the initial values through MQTTCallback.
+  """
+  global callback_changed
+  for setup in (Init, InitMQTT):
+    setup()
+  callback_changed = MQTTCallback
+  StateProcess()
+  RunBackground(PAListener)
+
+StartPulseListener()
+
+# Pump the MQTT client until Ctrl-C; the listener thread is always asked to
+# stop on the way out, whatever ended the loop.
+try:
+  while True:
+    client.loop()
+except KeyboardInterrupt:
+  pass
+finally:
+  StopBackground()