--- /dev/null
+#!/usr/bin/env python
+
+#
+# Usage:
+# AGI(/etc/asterisk/agi/mqtt,/etc/asterisk/agi/mqtt.cfg,calls/missed)
+# or, to override the extension:
+# AGI(/etc/asterisk/agi/mqtt,/etc/asterisk/agi/mqtt.cfg,calls/missed,+43123456789)
+#
+
+import sys
+from ConfigParser import ConfigParser
+import paho.mqtt.client as paho
+
+import json
+
+from pprint import pprint
+
+conffile, topic = sys.argv[1:3]
+
+config = ConfigParser()
+config.add_section('mqtt')
+# set defaults for anonymous auth
+config.set('mqtt', 'username', '')
+config.set('mqtt', 'password', '')
+config.set('mqtt', 'port', '1883')
+config.read(conffile)
+
+mqtt_server = config.get('mqtt', 'server')
+mqtt_port = config.getint('mqtt', 'port')
+mqtt_username = config.get('mqtt', 'username')
+mqtt_password = config.get('mqtt', 'password')
+
+agi = []
+while 1:
+ line = sys.stdin.readline()
+ if not line or line == "\n":
+ break
+ agi.append(line)
+
+agi = dict([line.rstrip('\n').replace('agi_', '', 1).split(': ', 1) for line in agi])
+
+if len(sys.argv) > 3:
+ agi['extension'] = sys.argv[3]
+
+def agi_exit(rc, *args):
+ if rc != 0:
+ print "VERBOSE rc=%s %s" % (rc, args)
+ sys.exit(rc)
+
+def on_connect(mosq, rc, *args):
+ if rc != 0:
+ agi_exit(1, "Connection failed: %d" % rc)
+
+def on_publish(mosq, *args):
+ # done
+ agi_exit(0)
+
+client = paho.Client('agi')
+client.username_pw_set(mqtt_username, mqtt_password)
+client.connect(mqtt_server, port=mqtt_port)
+client.on_connect = on_connect
+client.on_publish = on_publish
+
+client.publish(topic, payload=json.dumps(agi))
+client.loop()
+agi_exit(1, "Message publish timed out")
--- /dev/null
+
+*** These modified files were found in JOE when it aborted on Fri Sep 23 20:31:28 2016
+*** JOE was aborted because the terminal closed
+
+*** Файл '(БезИмени)'
+mqtt_pulse.py
+
+*** Файл 'mqtt_pulse.py'
+#!/usr/bin/python
+
+from pulsectl import Pulse,PulseLoopStop
+
+import threading
+
+sink_name=None
+muted=False
+volume=None
+
+paLock1=threading.RLock()
+paLock2=threading.RLock()
+stopFlag=False
+
+def Init():
+ global pulse
+ pulse = Pulse("mqtt-pa")
+
+def Reconnect():
+ pulse.close()
+ Init()
+
+def GetDefaultOut():
+ return pulse.server_info().default_sink_name
+
+def GetDefaultSink():
+ sinkname=GetDefaultOut()
+ for sink in pulse.sink_list():
+ if sink.name==sinkname:
+ return sink
+
+def GetDefaultVolume():
+ return pulse.volume_get_all_chans(GetDefaultSink())
+
+def IsDefaultMuted():
+ return GetDefaultSink().mute<>0
+
+def MuteDefault(mute = True):
+ return pulse.mute(GetDefaultSink(),mute)
+
+def EventListener(callback):
+ pulse.event_mask_set('all')
+ pulse.event_callback_set(callback)
+ pulse.event_listen()
+
+def EventProcess(ev):
+ raise PulseLoopStop
+
+def AcquirePALock():
+ paLock1.acquire()
+
+ pulse.event_listen_stop()
+ paLock2.acquire()
+
+def ReleasePALock():
+ paLock2.release()
+ paLock1.release()
+
+def StateProcess():
+ global sink_name,muted,volume
+ try:
+ tname=threading.current_thread().name
+# print tname+">trying to aquire lock"
+ AcquirePALock()
+# print tname+">lock aquired"
+ current_sink=GetDefaultOut()
+ current_vol=round(GetDefaultVolume(),2)
+ current_muted=IsDefaultMuted()
+ if current_sink<>sink_name:
+ sink_name=current_sink
+ print tname+">sink: "+sink_name
+ if current_vol<>volume:
+ volume=current_vol
+ print tname+">volume: "+str(volume)
+ if current_muted<>muted:
+ muted=current_muted
+ print tname+">muted: "+str(muted)
+ finally:
+ ReleasePALock()
+# print tname+">lock released"
+
+def PAListener():
+ while not stopFlag:
+# print "entering wait loop"
+ EventListener(EventProcess)
+# print "event or break happened"
+ StateProcess()
+
+def RunBackground(process):
+ stopFlag=False
+ thread = threading.Thread(target=process,name="Background")
+ thread.start()
+
+def StopBackground():
+ global stopFlag
+ stopFlag=True
+ pulse.event_listen_stop()
+
+def CommandGetDefaultOut():
+ try:
+ tname=threading.current_thread().name
+# print tname+">trying to aquire lock"
+ AcquirePALock()
+# print tname+">lock aquired"
+ result=GetDefaultOut()
+ finally:
+ ReleasePALock()
+# print tname+">lock released"
+ return result
+
+def CommandGetDefaultVolume():
+ try:
+ tname=threading.current_thread().name
+# print tname+">trying to aquire lock"
+ AcquirePALock()
+# print tname+">lock aquired"
+ result=GetDefaultVolume()
+ finally:
+ ReleasePALock()
+# print tname+">lock released"
+ return result
+
+def CommandIsDefaultMuted():
+ try:
+ tname=threading.current_thread().name
+# print tname+">trying to aquire lock"
+ AcquirePALock()
+# print tname+">lock aquired"
+ result=IsDefaultMuted()
+ finally:
+ ReleasePALock()
+# print tname+">lock released"
+ return result
+
+*** Файл '* Startup Log *'
+Processing '/etc/joe/joerc'...
+Processing '/etc/joe/ftyperc'...
+Finished processing /etc/joe/ftyperc
+Finished processing /etc/joe/joerc
--- /dev/null
+#!/usr/bin/python
+
+from pulsectl import Pulse,PulseLoopStop
+
+import threading
+from time import sleep
+
+import sys
+from ConfigParser import ConfigParser
+import paho.mqtt.client as paho
+
+# ====================== PulseAudio-related part ========================
+
+sink_name=None
+muted=None
+volume=None
+
+callback_changed=None
+
+paLock1=threading.RLock()
+paLock2=threading.RLock()
+stopFlag=False
+
+debug=False
+
+def Init():
+ global pulse
+ pulse = Pulse("mqtt-pa")
+
+def Reconnect():
+ pulse.close()
+ Init()
+
+def GetDefaultOut():
+ return pulse.server_info().default_sink_name
+
+def GetDefaultSink():
+ sinkname=GetDefaultOut()
+ for sink in pulse.sink_list():
+ if sink.name==sinkname:
+ return sink
+
+def GetDefaultVolume():
+ return pulse.volume_get_all_chans(GetDefaultSink())
+
+def SetDefaultVolume(volume):
+ pulse.volume_set_all_chans(GetDefaultSink(), volume)
+
+def IsDefaultMuted():
+ return GetDefaultSink().mute<>0
+
+def MuteDefault(mute = True):
+ return pulse.mute(GetDefaultSink(),mute)
+
+def AcquirePALock():
+ tname=threading.current_thread().name
+ if debug:
+ print tname," aquiring action..."
+ paLock1.acquire()
+ if debug:
+ print tname," aquired action..."
+ pulse.event_listen_stop()
+ if debug:
+ print tname," event_listener stop command sent..."
+ print tname," aquiring loop..."
+ paLock2.acquire()
+ if debug:
+ print tname," aquired loop..."
+
+def ReleasePALock():
+ tname=threading.current_thread().name
+ if debug:
+ print tname," releasing loop..."
+ paLock2.release()
+ if debug:
+ print tname," released loop..."
+ print tname," releasing action..."
+ paLock1.release()
+ if debug:
+ print tname," released action..."
+
+def AquireLoopLock():
+ tname=threading.current_thread().name
+ if debug:
+ print tname," aquiring loop..."
+ paLock2.acquire()
+ if debug:
+ print tname," aquired loop..."
+
+def ReleaseLoopLock():
+ tname=threading.current_thread().name
+ if debug:
+ print tname," releasing loop..."
+ paLock2.release()
+ if debug:
+ print tname," released loop..."
+
+def EventListener(callback):
+ pulse.event_mask_set('all')
+ pulse.event_callback_set(callback)
+ AquireLoopLock()
+ try:
+ pulse.event_listen()
+ finally:
+ ReleaseLoopLock()
+
+def EventProcess(ev):
+ raise PulseLoopStop
+
+def StateProcess():
+ global sink_name,muted,volume
+ AcquirePALock()
+ try:
+ tname=threading.current_thread().name
+ current_sink=GetDefaultOut()
+ current_vol=round(GetDefaultVolume(),2)
+ current_muted="ON" if IsDefaultMuted() else "OFF"
+ if current_sink<>sink_name:
+ sink_name=current_sink
+ if debug:
+ print 'sink='+sink_name
+ if callback_changed:
+ if debug:
+ print 'callback...'
+ callback_changed('sink',sink_name)
+ if current_vol<>volume:
+ volume=current_vol
+ if debug:
+ print 'volume='+str(volume)
+ if callback_changed:
+ if debug:
+ print 'callback...'
+ callback_changed('volume',str(volume))
+ if current_muted<>muted:
+ muted=current_muted
+ if debug:
+ print 'muted='+muted
+ if callback_changed:
+ if debug:
+ print 'callback...'
+ callback_changed('muted',muted)
+ finally:
+ ReleasePALock()
+
+def PAListener():
+ while not stopFlag:
+ EventListener(EventProcess)
+ StateProcess()
+
+def RunBackground(process):
+ stopFlag=False
+ thread = threading.Thread(target=process,name="Background")
+ thread.start()
+
+def StopBackground():
+ global stopFlag
+ stopFlag=True
+ pulse.event_listen_stop()
+
+def CommandGetDefaultOut():
+ AcquirePALock()
+ try:
+ result=GetDefaultOut()
+ finally:
+ ReleasePALock()
+ return result
+
+def CommandGetDefaultVolume():
+ AcquirePALock()
+ try:
+ result=GetDefaultVolume()
+ finally:
+ ReleasePALock()
+ return result
+
+def CommandIsDefaultMuted():
+ AcquirePALock()
+ try:
+ result=IsDefaultMuted()
+ finally:
+ ReleasePALock()
+ return result
+
+def CommandSetDefaultVolume(volume):
+ AcquirePALock()
+ try:
+ SetDefaultVolume(volume)
+ finally:
+ ReleasePALock()
+
+def CommandMuteDefault(mute=True):
+ AcquirePALock()
+ try:
+ MuteDefault(mute)
+ finally:
+ ReleasePALock()
+
+# ====================== MQTT-related part ========================
+
+lockMQTT = threading.RLock()
+
+def on_message(mosq, obj, msg):
+ if debug:
+ print("Received: " + msg.topic + " " + str(msg.payload))
+ try:
+ subtopic=msg.topic[len(mqtt_topic_in)+1:]
+ payload=msg.payload
+ if subtopic=="volume":
+ CommandSetDefaultVolume(float(payload))
+ elif subtopic=="muted":
+ if payload=="ON":
+ payload=True
+ elif payload=="OFF":
+ payload=False
+ else:
+ payload=int(payload)
+ CommandMuteDefault(payload)
+ else:
+ if debug:
+ print "Unknown command"
+ except:
+ if debug:
+ print "Command failed"
+
+def InitMQTT():
+
+ global client,mqtt_topic_in,mqtt_topic_out
+
+ conffile = sys.argv[1]
+
+ config = ConfigParser()
+ config.add_section('mqtt')
+ # set defaults for anonymous auth
+ config.set('mqtt', 'username', '')
+ config.set('mqtt', 'password', '')
+ config.set('mqtt', 'port', '1883')
+ config.set('mqtt', 'in', 'pulse/in')
+ config.set('mqtt', 'out', 'pulse/out')
+ config.read(conffile)
+
+ mqtt_server = config.get('mqtt', 'server')
+ mqtt_port = config.getint('mqtt', 'port')
+ mqtt_username = config.get('mqtt', 'username')
+ mqtt_password = config.get('mqtt', 'password')
+ mqtt_topic_in = config.get('mqtt', 'in')
+ mqtt_topic_out = config.get('mqtt', 'out')
+
+ client = paho.Client('pulse')
+ client.username_pw_set(mqtt_username, mqtt_password)
+ client.on_message=on_message
+ client.connect(mqtt_server, port=mqtt_port)
+ client.subscribe(mqtt_topic_in+'/#',1)
+
+def MQTTCallback(param,value):
+
+ lockMQTT.acquire()
+ try:
+ client.publish(mqtt_topic_out+'/'+param, payload=value)
+ if debug:
+ print "Sent "+param+"="+value
+ finally:
+ lockMQTT.release()
+
+def StartPulseListener():
+ global callback_changed
+ Init()
+ InitMQTT()
+ callback_changed=MQTTCallback
+ StateProcess()
+ RunBackground(PAListener)
+
+StartPulseListener()
+
+try:
+ while True:
+ try:
+ client.loop()
+ except KeyboardInterrupt:
+ break
+finally:
+ StopBackground()