1) MQTT-интерфейс к Asterisk для отслеживания состояний звонков
authorRoman Bazalevsky <rvb@rvb.name>
Fri, 23 Sep 2016 19:20:58 +0000 (22:20 +0300)
committerRoman Bazalevsky <rvb@rvb.name>
Fri, 23 Sep 2016 19:20:58 +0000 (22:20 +0300)
2) MQTT-интерфейс к PulseAudio (работает в две стороны)

mqtt-agi/mqtt [new file with mode: 0755]
mqtt-agi/mqtt.cfg [new file with mode: 0644]
mqtt-mpd/mqmpd.cfg~ [deleted file]
mqtt-pulse/DEADJOE [new file with mode: 0644]
mqtt-pulse/mqtt.cfg [new file with mode: 0644]
mqtt-pulse/mqtt_pulse.py [new file with mode: 0755]
mqtt-pulse/mqtt_pulse.pyc [new file with mode: 0644]
sensors-postprocess/openhab-db.conf [new file with mode: 0644]

diff --git a/mqtt-agi/mqtt b/mqtt-agi/mqtt
new file mode 100755 (executable)
index 0000000..6f69c7e
--- /dev/null
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+
+#
+# Usage:
+# AGI(/etc/asterisk/agi/mqtt,/etc/asterisk/agi/mqtt.cfg,calls/missed)
+# or, to override the extension:
+# AGI(/etc/asterisk/agi/mqtt,/etc/asterisk/agi/mqtt.cfg,calls/missed,+43123456789)
+#
+
+import sys
+from ConfigParser import ConfigParser
+import paho.mqtt.client as paho
+
+import json
+
+from pprint import pprint
+
+conffile, topic = sys.argv[1:3]
+
+config = ConfigParser()
+config.add_section('mqtt')
+# set defaults for anonymous auth
+config.set('mqtt', 'username', '')
+config.set('mqtt', 'password', '')
+config.set('mqtt', 'port', '1883')
+config.read(conffile)
+
+mqtt_server = config.get('mqtt', 'server')
+mqtt_port = config.getint('mqtt', 'port')
+mqtt_username = config.get('mqtt', 'username')
+mqtt_password = config.get('mqtt', 'password')
+
+agi = []
+while 1:
+    line = sys.stdin.readline()
+    if not line or line == "\n":
+        break
+    agi.append(line)
+
+agi = dict([line.rstrip('\n').replace('agi_', '', 1).split(': ', 1) for line in agi])
+
+if len(sys.argv) > 3:
+    agi['extension'] = sys.argv[3]
+
+def agi_exit(rc, *args):
+    if rc != 0:
+        print "VERBOSE rc=%s %s" % (rc, args)
+    sys.exit(rc)
+
+def on_connect(mosq, rc, *args):
+    if rc != 0:
+        agi_exit(1, "Connection failed: %d" % rc)
+
+def on_publish(mosq, *args):
+    # done
+    agi_exit(0)
+
+client = paho.Client('agi')
+client.username_pw_set(mqtt_username, mqtt_password)
+client.connect(mqtt_server, port=mqtt_port)
+client.on_connect = on_connect
+client.on_publish = on_publish
+
+client.publish(topic, payload=json.dumps(agi))
+client.loop()
+agi_exit(1, "Message publish timed out")
diff --git a/mqtt-agi/mqtt.cfg b/mqtt-agi/mqtt.cfg
new file mode 100644 (file)
index 0000000..c8e33a0
--- /dev/null
@@ -0,0 +1,4 @@
+[mqtt]
+server = localhost
+username = asterisk
+password = mypassword
diff --git a/mqtt-mpd/mqmpd.cfg~ b/mqtt-mpd/mqmpd.cfg~
deleted file mode 100644 (file)
index 51dec1e..0000000
+++ /dev/null
@@ -1,8 +0,0 @@
-mqhost=localhost
-mqport=1883
-mquser="mpd"
-mqpassword="wsufytcvtldtltv"
-mqtopic="mpd/out"
-mqcmd="mpd/in"
-hosts=("estia" "nefele" "orpheus")
-passwd=("gdetotamdaleko" "11093008" "malenkayakorobka")
diff --git a/mqtt-pulse/DEADJOE b/mqtt-pulse/DEADJOE
new file mode 100644 (file)
index 0000000..7069859
--- /dev/null
@@ -0,0 +1,147 @@
+
+*** These modified files were found in JOE when it aborted on Fri Sep 23 20:31:28 2016
+*** JOE was aborted because the terminal closed
+
+*** Файл '(БезИмени)'
+mqtt_pulse.py
+
+*** Файл 'mqtt_pulse.py'
+#!/usr/bin/python
+
+from pulsectl import Pulse,PulseLoopStop
+
+import threading
+
+sink_name=None
+muted=False
+volume=None
+
+paLock1=threading.RLock()
+paLock2=threading.RLock()
+stopFlag=False
+
+def Init():
+  global pulse
+  pulse = Pulse("mqtt-pa")
+
+def Reconnect():
+  pulse.close()
+  Init()
+
+def GetDefaultOut():
+  return pulse.server_info().default_sink_name
+
+def GetDefaultSink():
+  sinkname=GetDefaultOut()
+  for sink in pulse.sink_list():
+    if sink.name==sinkname:
+      return sink
+
+def GetDefaultVolume():
+  return pulse.volume_get_all_chans(GetDefaultSink())
+
+def IsDefaultMuted():
+  return GetDefaultSink().mute<>0
+  
+def MuteDefault(mute = True):
+  return pulse.mute(GetDefaultSink(),mute)
+
+def EventListener(callback):
+  pulse.event_mask_set('all')
+  pulse.event_callback_set(callback)
+  pulse.event_listen()
+
+def EventProcess(ev):
+  raise PulseLoopStop
+
+def AcquirePALock():
+  paLock1.acquire()
+  
+  pulse.event_listen_stop()
+  paLock2.acquire()
+
+def ReleasePALock():
+  paLock2.release()
+  paLock1.release()
+
+def StateProcess():
+  global sink_name,muted,volume
+  try:
+    tname=threading.current_thread().name
+#    print tname+">trying to aquire lock"
+    AcquirePALock()
+#    print tname+">lock aquired"
+    current_sink=GetDefaultOut()
+    current_vol=round(GetDefaultVolume(),2)
+    current_muted=IsDefaultMuted()
+    if current_sink<>sink_name:
+      sink_name=current_sink
+      print tname+">sink: "+sink_name
+    if current_vol<>volume:
+      volume=current_vol
+      print tname+">volume: "+str(volume)
+    if current_muted<>muted:
+      muted=current_muted
+      print tname+">muted: "+str(muted)
+  finally:
+    ReleasePALock()
+#    print tname+">lock released"
+
+def PAListener():
+  while not stopFlag:    
+#    print "entering wait loop"
+    EventListener(EventProcess)
+#    print "event or break happened"
+    StateProcess()
+
+def RunBackground(process):
+  stopFlag=False
+  thread = threading.Thread(target=process,name="Background")
+  thread.start()
+
+def StopBackground():
+  global stopFlag
+  stopFlag=True
+  pulse.event_listen_stop()
+
+def CommandGetDefaultOut():
+  try:
+    tname=threading.current_thread().name
+#    print tname+">trying to aquire lock"
+    AcquirePALock()
+#    print tname+">lock aquired"
+    result=GetDefaultOut()
+  finally:
+    ReleasePALock()
+#    print tname+">lock released"
+  return result
+
+def CommandGetDefaultVolume():
+  try:
+    tname=threading.current_thread().name
+#    print tname+">trying to aquire lock"
+    AcquirePALock()
+#    print tname+">lock aquired"
+    result=GetDefaultVolume()
+  finally:
+    ReleasePALock()
+#    print tname+">lock released"
+  return result
+
+def CommandIsDefaultMuted():
+  try:
+    tname=threading.current_thread().name
+#    print tname+">trying to aquire lock"
+    AcquirePALock()
+#    print tname+">lock aquired"
+    result=IsDefaultMuted()
+  finally:
+    ReleasePALock()
+#    print tname+">lock released"
+  return result
+  
+*** Файл '* Startup Log *'
+Processing '/etc/joe/joerc'...
+Processing '/etc/joe/ftyperc'...
+Finished processing /etc/joe/ftyperc
+Finished processing /etc/joe/joerc
diff --git a/mqtt-pulse/mqtt.cfg b/mqtt-pulse/mqtt.cfg
new file mode 100644 (file)
index 0000000..6cbf04c
--- /dev/null
@@ -0,0 +1,7 @@
+[mqtt]
+server = localhost
+username = pulse
+password = mypassword
+in = pulse/host/in
+out = pulse/host/out
+
diff --git a/mqtt-pulse/mqtt_pulse.py b/mqtt-pulse/mqtt_pulse.py
new file mode 100755 (executable)
index 0000000..9faa0d7
--- /dev/null
@@ -0,0 +1,281 @@
+#!/usr/bin/python
+
+from pulsectl import Pulse,PulseLoopStop
+
+import threading
+from time import sleep
+
+import sys
+from ConfigParser import ConfigParser
+import paho.mqtt.client as paho
+
+# ====================== PulseAudio-related part ========================
+
+sink_name=None
+muted=None
+volume=None
+
+callback_changed=None
+
+paLock1=threading.RLock()
+paLock2=threading.RLock()
+stopFlag=False
+
+debug=False
+
+def Init():
+  global pulse
+  pulse = Pulse("mqtt-pa")
+
+def Reconnect():
+  pulse.close()
+  Init()
+
+def GetDefaultOut():
+  return pulse.server_info().default_sink_name
+
+def GetDefaultSink():
+  sinkname=GetDefaultOut()
+  for sink in pulse.sink_list():
+    if sink.name==sinkname:
+      return sink
+
+def GetDefaultVolume():
+  return pulse.volume_get_all_chans(GetDefaultSink())
+
+def SetDefaultVolume(volume):
+  pulse.volume_set_all_chans(GetDefaultSink(), volume)
+
+def IsDefaultMuted():
+  return GetDefaultSink().mute<>0
+  
+def MuteDefault(mute = True):
+  return pulse.mute(GetDefaultSink(),mute)
+
+def AcquirePALock():
+  tname=threading.current_thread().name
+  if debug:
+    print tname," aquiring action..."
+  paLock1.acquire()
+  if debug:
+    print tname," aquired action..."
+  pulse.event_listen_stop()
+  if debug:
+    print tname," event_listener stop command sent..."
+    print tname," aquiring loop..."
+  paLock2.acquire()
+  if debug:
+    print tname," aquired loop..."
+
+def ReleasePALock():
+  tname=threading.current_thread().name
+  if debug:
+    print tname," releasing loop..."
+  paLock2.release()
+  if debug:
+    print tname," released loop..."
+    print tname," releasing action..."
+  paLock1.release()
+  if debug:
+    print tname," released action..."
+
+def AquireLoopLock():
+  tname=threading.current_thread().name
+  if debug:
+    print tname," aquiring loop..."
+  paLock2.acquire()
+  if debug:
+    print tname," aquired loop..."
+
+def ReleaseLoopLock():
+  tname=threading.current_thread().name
+  if debug:
+    print tname," releasing loop..."
+  paLock2.release()
+  if debug:
+    print tname," released loop..."
+
+def EventListener(callback):
+  pulse.event_mask_set('all')
+  pulse.event_callback_set(callback)
+  AquireLoopLock()
+  try:
+    pulse.event_listen()
+  finally:
+    ReleaseLoopLock()
+
+def EventProcess(ev):
+  raise PulseLoopStop
+
+def StateProcess():
+  global sink_name,muted,volume
+  AcquirePALock()
+  try:
+    tname=threading.current_thread().name
+    current_sink=GetDefaultOut()
+    current_vol=round(GetDefaultVolume(),2)
+    current_muted="ON" if IsDefaultMuted() else "OFF"
+    if current_sink<>sink_name:
+      sink_name=current_sink
+      if debug:
+        print 'sink='+sink_name
+      if callback_changed:
+        if debug:
+          print 'callback...'
+        callback_changed('sink',sink_name)
+    if current_vol<>volume:
+      volume=current_vol
+      if debug:
+        print 'volume='+str(volume)
+      if callback_changed:
+        if debug:
+          print 'callback...'
+        callback_changed('volume',str(volume))
+    if current_muted<>muted:
+      muted=current_muted
+      if debug:
+        print 'muted='+muted
+      if callback_changed:
+        if debug:
+          print 'callback...'
+        callback_changed('muted',muted)
+  finally:
+    ReleasePALock()
+
+def PAListener():
+  while not stopFlag:    
+    EventListener(EventProcess)
+    StateProcess()
+
+def RunBackground(process):
+  stopFlag=False
+  thread = threading.Thread(target=process,name="Background")
+  thread.start()
+
+def StopBackground():
+  global stopFlag
+  stopFlag=True
+  pulse.event_listen_stop()
+
+def CommandGetDefaultOut():
+  AcquirePALock()
+  try:
+    result=GetDefaultOut()
+  finally:
+    ReleasePALock()
+  return result
+
+def CommandGetDefaultVolume():
+  AcquirePALock()
+  try:
+    result=GetDefaultVolume()
+  finally:
+    ReleasePALock()
+  return result
+
+def CommandIsDefaultMuted():
+  AcquirePALock()
+  try:
+    result=IsDefaultMuted()
+  finally:
+    ReleasePALock()
+  return result
+
+def CommandSetDefaultVolume(volume):
+  AcquirePALock()
+  try:
+    SetDefaultVolume(volume)    
+  finally:
+    ReleasePALock()
+
+def CommandMuteDefault(mute=True):
+  AcquirePALock()
+  try:
+    MuteDefault(mute)    
+  finally:
+    ReleasePALock()
+
+# ====================== MQTT-related part ========================
+
+lockMQTT = threading.RLock()
+
+def on_message(mosq, obj, msg):
+  if debug:
+    print("Received: " + msg.topic + " " + str(msg.payload))  
+  try:
+    subtopic=msg.topic[len(mqtt_topic_in)+1:]
+    payload=msg.payload
+    if subtopic=="volume":
+      CommandSetDefaultVolume(float(payload))
+    elif subtopic=="muted":
+      if payload=="ON":
+        payload=True
+      elif payload=="OFF":
+        payload=False
+      else:
+        payload=int(payload)  
+      CommandMuteDefault(payload)
+    else:
+      if debug:
+        print "Unknown command"  
+  except:
+    if debug:
+      print "Command failed"    
+  
+def InitMQTT():
+
+  global client,mqtt_topic_in,mqtt_topic_out
+
+  conffile = sys.argv[1]
+
+  config = ConfigParser()
+  config.add_section('mqtt')
+  # set defaults for anonymous auth
+  config.set('mqtt', 'username', '')
+  config.set('mqtt', 'password', '')
+  config.set('mqtt', 'port', '1883')
+  config.set('mqtt', 'in', 'pulse/in')
+  config.set('mqtt', 'out', 'pulse/out')
+  config.read(conffile)
+
+  mqtt_server = config.get('mqtt', 'server')
+  mqtt_port = config.getint('mqtt', 'port')
+  mqtt_username = config.get('mqtt', 'username')
+  mqtt_password = config.get('mqtt', 'password')
+  mqtt_topic_in = config.get('mqtt', 'in')
+  mqtt_topic_out = config.get('mqtt', 'out')
+
+  client = paho.Client('pulse')
+  client.username_pw_set(mqtt_username, mqtt_password)
+  client.on_message=on_message
+  client.connect(mqtt_server, port=mqtt_port)
+  client.subscribe(mqtt_topic_in+'/#',1)
+
+def MQTTCallback(param,value):
+
+  lockMQTT.acquire()
+  try:
+    client.publish(mqtt_topic_out+'/'+param, payload=value)
+    if debug:
+      print "Sent "+param+"="+value
+  finally:
+    lockMQTT.release()
+
+def StartPulseListener():
+  global callback_changed
+  Init()
+  InitMQTT()
+  callback_changed=MQTTCallback
+  StateProcess()
+  RunBackground(PAListener)
+
+StartPulseListener()
+
+try:
+  while True:
+    try:
+      client.loop()
+    except KeyboardInterrupt:
+      break
+finally:
+  StopBackground()    
diff --git a/mqtt-pulse/mqtt_pulse.pyc b/mqtt-pulse/mqtt_pulse.pyc
new file mode 100644 (file)
index 0000000..af1559e
Binary files /dev/null and b/mqtt-pulse/mqtt_pulse.pyc differ
diff --git a/sensors-postprocess/openhab-db.conf b/sensors-postprocess/openhab-db.conf
new file mode 100644 (file)
index 0000000..1a1c175
--- /dev/null
@@ -0,0 +1,10 @@
+[mysql]
+host=localhost
+user=openhab
+passwd=mypassword
+db=openhab
+[openhab]
+template=Sensor%
+[filter]
+window=5
+threshold=10