Избегаем "залипания" последнего сообщения в MQTT.
[openhab-process.git] / mqtt-pulse / mqtt-pulse
1 #!/usr/bin/python
2
3 from pulsectl import Pulse,PulseLoopStop
4
5 import threading
6 from time import sleep
7
8 import sys
9 from ConfigParser import ConfigParser
10 import paho.mqtt.client as paho
11
12 # ====================== PulseAudio-related part ========================
13
14 sink_name=None
15 muted=None
16 volume=None
17
18 callback_changed=None
19
20 paLock1=threading.RLock()
21 paLock2=threading.RLock()
22 stopFlag=False
23
24 debug=False
25
26 def Init():
27   global pulse
28   pulse = Pulse("mqtt-pa")
29
30 def Reconnect():
31   pulse.close()
32   Init()
33
34 def GetDefaultOut():
35   return pulse.server_info().default_sink_name
36
37 def GetDefaultSink():
38   sinkname=GetDefaultOut()
39   for sink in pulse.sink_list():
40     if sink.name==sinkname:
41       return sink
42
43 def GetDefaultVolume():
44   return pulse.volume_get_all_chans(GetDefaultSink())
45
46 def SetDefaultVolume(volume):
47   pulse.volume_set_all_chans(GetDefaultSink(), volume/100.0)
48   if debug:
49     print "volume set %s" % volume
50
51 def IsDefaultMuted():
52   return GetDefaultSink().mute<>0
53   
54 def MuteDefault(mute = True):
55   pulse.mute(GetDefaultSink(),mute)
56   if debug:
57     print "mute set %s" % mute
58
59 def AcquirePALock():
60   tname=threading.current_thread().name
61   if debug:
62     print tname," aquiring action..."
63   paLock1.acquire()
64   if debug:
65     print tname," aquired action..."
66   pulse.event_listen_stop()
67   if debug:
68     print tname," event_listener stop command sent..."
69     print tname," aquiring loop..."
70   paLock2.acquire()
71   if debug:
72     print tname," aquired loop..."
73
74 def ReleasePALock():
75   tname=threading.current_thread().name
76   if debug:
77     print tname," releasing loop..."
78   paLock2.release()
79   if debug:
80     print tname," released loop..."
81     print tname," releasing action..."
82   paLock1.release()
83   if debug:
84     print tname," released action..."
85
86 def AquireLoopLock():
87   tname=threading.current_thread().name
88   if debug:
89     print tname," aquiring loop..."
90   paLock2.acquire()
91   if debug:
92     print tname," aquired loop..."
93
94 def ReleaseLoopLock():
95   tname=threading.current_thread().name
96   if debug:
97     print tname," releasing loop..."
98   paLock2.release()
99   if debug:
100     print tname," released loop..."
101
102 def EventListener(callback):
103   pulse.event_mask_set('all')
104   pulse.event_callback_set(callback)
105   AquireLoopLock()
106   try:
107     pulse.event_listen()
108   finally:
109     ReleaseLoopLock()
110
111 def EventProcess(ev):
112   raise PulseLoopStop
113
114 def StateProcess():
115   global sink_name,muted,volume
116   AcquirePALock()
117   try:
118     tname=threading.current_thread().name
119     current_sink=GetDefaultOut()
120     current_vol=round(GetDefaultVolume()*100,2)
121     current_muted="ON" if IsDefaultMuted() else "OFF"
122     if current_sink<>sink_name:
123       sink_name=current_sink
124       if debug:
125         print 'sink='+sink_name
126       if callback_changed:
127         if debug:
128           print 'callback...'
129         callback_changed('sink',sink_name)
130     if current_vol<>volume:
131       volume=current_vol
132       if debug:
133         print 'volume='+str(volume)
134       if callback_changed:
135         if debug:
136           print 'callback...'
137         callback_changed('volume',str(volume))
138     if current_muted<>muted:
139       muted=current_muted
140       if debug:
141         print 'muted='+muted
142       if callback_changed:
143         if debug:
144           print 'callback...'
145         callback_changed('muted',muted)
146   finally:
147     ReleasePALock()
148
149 def PAListener():
150   while not stopFlag:    
151     EventListener(EventProcess)
152     StateProcess()
153
154 def RunBackground(process):
155   stopFlag=False
156   thread = threading.Thread(target=process,name="Background")
157   thread.start()
158
159 def StopBackground():
160   global stopFlag
161   stopFlag=True
162   pulse.event_listen_stop()
163
164 def CommandGetDefaultOut():
165   AcquirePALock()
166   try:
167     result=GetDefaultOut()
168   finally:
169     ReleasePALock()
170   return result
171
172 def CommandGetDefaultVolume():
173   AcquirePALock()
174   try:
175     result=GetDefaultVolume()
176   finally:
177     ReleasePALock()
178   return result
179
180 def CommandIsDefaultMuted():
181   AcquirePALock()
182   try:
183     result=IsDefaultMuted()
184   finally:
185     ReleasePALock()
186   return result
187
188 def CommandSetDefaultVolume(volume):
189   AcquirePALock()
190   try:
191     SetDefaultVolume(volume)    
192   finally:
193     ReleasePALock()
194
195 def CommandMuteDefault(mute=True):
196   AcquirePALock()
197   try:
198     MuteDefault(mute)    
199   finally:
200     ReleasePALock()
201
202 # ====================== MQTT-related part ========================
203
204 lockMQTT = threading.RLock()
205
206 def on_message(mosq, obj, msg):
207   if debug:
208     print("Received: " + msg.topic + " " + str(msg.payload))  
209   try:
210     subtopic=msg.topic[len(mqtt_topic_in)+1:]
211     payload=msg.payload
212     if subtopic=="volume":
213       CommandSetDefaultVolume(float(payload))
214     elif subtopic=="muted":
215       if payload=="ON":
216         payload=True
217       elif payload=="OFF":
218         payload=False
219       else:
220         payload=int(payload)  
221       CommandMuteDefault(payload)
222     else:
223       if debug:
224         print "Unknown command"  
225   except:
226     if debug:
227       print "Command failed"    
228   
229 def InitMQTT():
230
231   global client,mqtt_topic_in,mqtt_topic_out
232
233   conffile = sys.argv[1]
234
235   config = ConfigParser()
236   config.add_section('mqtt')
237   # set defaults for anonymous auth
238   config.set('mqtt', 'username', '')
239   config.set('mqtt', 'password', '')
240   config.set('mqtt', 'port', '1883')
241   config.set('mqtt', 'in', 'pulse/in')
242   config.set('mqtt', 'out', 'pulse/out')
243   config.read(conffile)
244
245   mqtt_server = config.get('mqtt', 'server')
246   mqtt_port = config.getint('mqtt', 'port')
247   mqtt_username = config.get('mqtt', 'username')
248   mqtt_password = config.get('mqtt', 'password')
249   mqtt_topic_in = config.get('mqtt', 'in')
250   mqtt_topic_out = config.get('mqtt', 'out')
251
252   client = paho.Client('pulse')
253   client.username_pw_set(mqtt_username, mqtt_password)
254   client.on_message=on_message
255   client.connect(mqtt_server, port=mqtt_port)
256   client.subscribe(mqtt_topic_in+'/#',1)
257
258 def MQTTCallback(param,value):
259
260   lockMQTT.acquire()
261   try:
262     client.publish(mqtt_topic_out+'/'+param, payload=value)
263     if debug:
264       print "Sent "+param+"="+value
265   finally:
266     lockMQTT.release()
267
268 def StartPulseListener():
269   global callback_changed
270   Init()
271   InitMQTT()
272   callback_changed=MQTTCallback
273   StateProcess()
274   RunBackground(PAListener)
275
276 StartPulseListener()
277
278 try:
279   while True:
280     try:
281       client.loop()
282     except KeyboardInterrupt:
283       break
284 finally:
285   StopBackground()