Избегаем "залипания" последнего сообщения в MQTT.
[openhab-process.git] / mqtt-pulse / mqtt_pulse.py
1 #!/usr/bin/python
2
3 from pulsectl import Pulse,PulseLoopStop
4
5 import threading
6 from time import sleep
7
8 import sys
9 from ConfigParser import ConfigParser
10 import paho.mqtt.client as paho
11
12 # ====================== PulseAudio-related part ========================
13
14 sink_name=None
15 muted=None
16 volume=None
17
18 callback_changed=None
19
20 paLock1=threading.RLock()
21 paLock2=threading.RLock()
22 stopFlag=False
23
24 debug=False
25
26 def Init():
27   global pulse
28   pulse = Pulse("mqtt-pa")
29
30 def Reconnect():
31   pulse.close()
32   Init()
33
34 def GetDefaultOut():
35   return pulse.server_info().default_sink_name
36
37 def GetDefaultSink():
38   sinkname=GetDefaultOut()
39   for sink in pulse.sink_list():
40     if sink.name==sinkname:
41       return sink
42
43 def GetDefaultVolume():
44   return pulse.volume_get_all_chans(GetDefaultSink())
45
46 def SetDefaultVolume(volume):
47   pulse.volume_set_all_chans(GetDefaultSink(), volume)
48
49 def IsDefaultMuted():
50   return GetDefaultSink().mute<>0
51   
52 def MuteDefault(mute = True):
53   return pulse.mute(GetDefaultSink(),mute)
54
55 def AcquirePALock():
56   tname=threading.current_thread().name
57   if debug:
58     print tname," aquiring action..."
59   paLock1.acquire()
60   if debug:
61     print tname," aquired action..."
62   pulse.event_listen_stop()
63   if debug:
64     print tname," event_listener stop command sent..."
65     print tname," aquiring loop..."
66   paLock2.acquire()
67   if debug:
68     print tname," aquired loop..."
69
70 def ReleasePALock():
71   tname=threading.current_thread().name
72   if debug:
73     print tname," releasing loop..."
74   paLock2.release()
75   if debug:
76     print tname," released loop..."
77     print tname," releasing action..."
78   paLock1.release()
79   if debug:
80     print tname," released action..."
81
82 def AquireLoopLock():
83   tname=threading.current_thread().name
84   if debug:
85     print tname," aquiring loop..."
86   paLock2.acquire()
87   if debug:
88     print tname," aquired loop..."
89
90 def ReleaseLoopLock():
91   tname=threading.current_thread().name
92   if debug:
93     print tname," releasing loop..."
94   paLock2.release()
95   if debug:
96     print tname," released loop..."
97
98 def EventListener(callback):
99   pulse.event_mask_set('all')
100   pulse.event_callback_set(callback)
101   AquireLoopLock()
102   try:
103     pulse.event_listen()
104   finally:
105     ReleaseLoopLock()
106
107 def EventProcess(ev):
108   raise PulseLoopStop
109
110 def StateProcess():
111   global sink_name,muted,volume
112   AcquirePALock()
113   try:
114     tname=threading.current_thread().name
115     current_sink=GetDefaultOut()
116     current_vol=round(GetDefaultVolume(),2)
117     current_muted="ON" if IsDefaultMuted() else "OFF"
118     if current_sink<>sink_name:
119       sink_name=current_sink
120       if debug:
121         print 'sink='+sink_name
122       if callback_changed:
123         if debug:
124           print 'callback...'
125         callback_changed('sink',sink_name)
126     if current_vol<>volume:
127       volume=current_vol
128       if debug:
129         print 'volume='+str(volume)
130       if callback_changed:
131         if debug:
132           print 'callback...'
133         callback_changed('volume',str(volume))
134     if current_muted<>muted:
135       muted=current_muted
136       if debug:
137         print 'muted='+muted
138       if callback_changed:
139         if debug:
140           print 'callback...'
141         callback_changed('muted',muted)
142   finally:
143     ReleasePALock()
144
145 def PAListener():
146   while not stopFlag:    
147     EventListener(EventProcess)
148     StateProcess()
149
150 def RunBackground(process):
151   stopFlag=False
152   thread = threading.Thread(target=process,name="Background")
153   thread.start()
154
155 def StopBackground():
156   global stopFlag
157   stopFlag=True
158   pulse.event_listen_stop()
159
160 def CommandGetDefaultOut():
161   AcquirePALock()
162   try:
163     result=GetDefaultOut()
164   finally:
165     ReleasePALock()
166   return result
167
168 def CommandGetDefaultVolume():
169   AcquirePALock()
170   try:
171     result=GetDefaultVolume()
172   finally:
173     ReleasePALock()
174   return result
175
176 def CommandIsDefaultMuted():
177   AcquirePALock()
178   try:
179     result=IsDefaultMuted()
180   finally:
181     ReleasePALock()
182   return result
183
184 def CommandSetDefaultVolume(volume):
185   AcquirePALock()
186   try:
187     SetDefaultVolume(volume)    
188   finally:
189     ReleasePALock()
190
191 def CommandMuteDefault(mute=True):
192   AcquirePALock()
193   try:
194     MuteDefault(mute)    
195   finally:
196     ReleasePALock()
197
198 # ====================== MQTT-related part ========================
199
200 lockMQTT = threading.RLock()
201
202 def on_message(mosq, obj, msg):
203   if debug:
204     print("Received: " + msg.topic + " " + str(msg.payload))  
205   try:
206     subtopic=msg.topic[len(mqtt_topic_in)+1:]
207     payload=msg.payload
208     if subtopic=="volume":
209       CommandSetDefaultVolume(float(payload))
210     elif subtopic=="muted":
211       if payload=="ON":
212         payload=True
213       elif payload=="OFF":
214         payload=False
215       else:
216         payload=int(payload)  
217       CommandMuteDefault(payload)
218     else:
219       if debug:
220         print "Unknown command"  
221   except:
222     if debug:
223       print "Command failed"    
224   
225 def InitMQTT():
226
227   global client,mqtt_topic_in,mqtt_topic_out
228
229   conffile = sys.argv[1]
230
231   config = ConfigParser()
232   config.add_section('mqtt')
233   # set defaults for anonymous auth
234   config.set('mqtt', 'username', '')
235   config.set('mqtt', 'password', '')
236   config.set('mqtt', 'port', '1883')
237   config.set('mqtt', 'in', 'pulse/in')
238   config.set('mqtt', 'out', 'pulse/out')
239   config.read(conffile)
240
241   mqtt_server = config.get('mqtt', 'server')
242   mqtt_port = config.getint('mqtt', 'port')
243   mqtt_username = config.get('mqtt', 'username')
244   mqtt_password = config.get('mqtt', 'password')
245   mqtt_topic_in = config.get('mqtt', 'in')
246   mqtt_topic_out = config.get('mqtt', 'out')
247
248   client = paho.Client('pulse')
249   client.username_pw_set(mqtt_username, mqtt_password)
250   client.on_message=on_message
251   client.connect(mqtt_server, port=mqtt_port)
252   client.subscribe(mqtt_topic_in+'/#',1)
253
254 def MQTTCallback(param,value):
255
256   lockMQTT.acquire()
257   try:
258     client.publish(mqtt_topic_out+'/'+param, payload=value)
259     if debug:
260       print "Sent "+param+"="+value
261   finally:
262     lockMQTT.release()
263
264 def StartPulseListener():
265   global callback_changed
266   Init()
267   InitMQTT()
268   callback_changed=MQTTCallback
269   StateProcess()
270   RunBackground(PAListener)
271
272 StartPulseListener()
273
274 try:
275   while True:
276     try:
277       client.loop()
278     except KeyboardInterrupt:
279       break
280 finally:
281   StopBackground()