Переход на асинхронное закрытие каналов с таймаутом, чтобы при обрыве соединения...
[vpproxy.git] / vlcclient / vlcclient.py
1 '''
2 Minimal VLC VLM client for AceProxy. Client class.
3 '''
4
5 import gevent
6 import gevent.event
7 import gevent.coros
8 import telnetlib
9 import logging
10 from vlcmessages import *
11 import time
12 from pprint import pprint
13
class VlcException(Exception):
    '''
    Exception type raised by VlcClient.
    '''
20
21
22 class VlcClient(object):
23
24     '''
25     VLC Client class
26     '''
27
28     def __init__(
29         self, host='127.0.0.1', port=4212, password='admin', connect_timeout=10,
30             result_timeout=10, out_port=8081):
31         # Receive buffer
32         self._recvbuffer = None
33         # Output port
34         self._out_port = out_port
35         # VLC socket
36         self._socket = None
37         # Result timeout
38         self._resulttimeout = result_timeout
39         # Shutting down flag
40         self._shuttingDown = gevent.event.Event()
41         # Authentication done event
42         self._auth = gevent.event.AsyncResult()
43         # Request lock
44         self._resultlock = gevent.coros.RLock()
45         # Request result
46         self._result = gevent.event.AsyncResult()
47         # VLC version string
48         self._vlcver = None
49         # Saving password
50         self._password = password
51
52         # Logger
53         logger = logging.getLogger('VlcClient_init')
54
55         # Streams
56         self.streams=dict()
57
58         # Making connection
59         try:
60             self._socket = telnetlib.Telnet(host, port, connect_timeout)
61             logger.info("Successfully connected with VLC socket!")
62         except Exception as e:
63             raise VlcException(
64                 "Socket creation error! VLC is not running? ERROR: " + repr(e))
65
66         # Spawning recvData greenlet
67         gevent.spawn(self._recvData)
68         gevent.sleep()
69
70         # Waiting for authentication event
71         try:
72             if self._auth.get(timeout=self._resulttimeout) == False:
73                 errmsg = "Authentication error"
74                 logger.error(errmsg)
75                 raise VlcException(errmsg)
76         except gevent.Timeout:
77             errmsg = "Authentication timeout"
78             logger.error(errmsg)
79             raise VlcException(errmsg)
80
81     def __del__(self):
82         # Destructor just calls destroy() method
83         self.destroy()
84
85     def destroy(self):
86         # Logger
87         logger = logging.getLogger("VlcClient_destroy")
88
89         if self._shuttingDown.isSet():
90             # Already in the middle of destroying
91             return
92
93         # If socket is still alive (connected)
94         if self._socket:
95             try:
96                 logger.info("Destroying VlcClient...")
97                 self._write(VlcMessage.request.SHUTDOWN)
98                 # Set shuttingDown flag for recvData
99                 self._shuttingDown.set()
100             except:
101                 # Ignore exceptions on destroy
102                 pass
103
104     def _write(self, message):
105
106         logger = logging.getLogger("VlcClient_write")
107
108         # Return if in the middle of destroying
109         if self._shuttingDown.isSet():
110             return
111
112         try:
113             # Write message
114             logger.info('VLC command: ' + message)
115             self._socket.write(message + "\r\n")
116         except EOFError as e:
117             raise VlcException("Vlc Write error! ERROR: " + repr(e))
118
119     def _broadcast(self, brtype, stream_name, input=None, muxer='ts', pre_access='', qtype='default'):
120         if self._shuttingDown.isSet():
121             return
122
123         # Start/stop broadcast with VLC
124         # Logger
125         if brtype == True:
126             broadcast = 'startBroadcast'
127         else:
128             broadcast = 'stopBroadcast'
129
130         logger = logging.getLogger("VlcClient_" + broadcast)
131         # Clear AsyncResult
132         self._result = gevent.event.AsyncResult()
133         # Get lock
134         self._resultlock.acquire()
135         # Write message to VLC socket
136         if brtype == True:
137             msg = VlcMessage.request.startBroadcast(stream_name, input, self._out_port, muxer, pre_access, qtype)
138             self._write(msg)
139         else:
140             if stream_name not in self.streams:
141                 self._resultlock.release()
142                 logger.error("Attempting to delete not existing stream %s" % stream_name)
143                 return
144             self._write(VlcMessage.request.stopBroadcast(stream_name))
145
146         try:
147             gevent.sleep()
148             result = self._result.get(timeout=self._resulttimeout)
149             if result == False:
150                 logger.error(broadcast + " error")
151                 raise VlcException(broadcast + " error")
152         except gevent.Timeout:
153             logger.error(broadcast + " result timeout")
154             raise VlcException(broadcast + " result timeout")
155         finally:
156             logger.info("working with %s stream: %s" % (stream_name,broadcast))
157             if brtype == True:
158                 self.streams[stream_name]=time.time()
159             else:
160                 del self.streams[stream_name]
161             self._resultlock.release()
162             logger.info("worked with %s stream: %s" % (stream_name,broadcast))
163
164         if brtype == True:
165             logger.info("Broadcast started")
166         else:
167             logger.info("Broadcast stopped")
168
169     def startBroadcast(self, stream_name, input, muxer='ts', pre_access='', qtype='default'):
170         logger = logging.getLogger("VlcClient_startBroadcast")
171         logger.debug("Starting broadcast......")
172         return self._broadcast(True, stream_name, input, muxer, pre_access, qtype)
173
174     def stopBroadcast(self, stream_name):
175         return self._broadcast(False, stream_name)
176
177     def mark(self,stream_name):
178       self.streams[stream_name]=time.time()
179
180     def clean_streams(self,timeout=15):
181       self._resultlock.acquire()
182       to_stop=set()
183       for stream,lasttime in self.streams.iteritems():
184         print stream,lasttime
185         if time.time()-lasttime>timeout:
186           to_stop.add(stream)
187       for stream in to_stop:    
188         self.stopBroadcast(stream)
189       self._resultlock.release()
190
191     def pauseBroadcast(self, stream_name):
192         return self._write(VlcMessage.request.pauseBroadcast(stream_name))
193
194     def playBroadcast(self, stream_name):
195         return self._write(VlcMessage.request.playBroadcast(stream_name))
196
    def _recvData(self):
        '''
        Reader loop, run in its own greenlet (spawned from __init__).

        Reads VLC's output line by line and translates known responses into
        the authentication result (self._auth) and the broadcast command
        result (self._result).  Returns when a read fails, when VLC reports
        SHUTDOWN, or when the password is rejected.
        '''
        # Logger
        logger = logging.getLogger("VlcClient_recvData")

        while True:
            # Yield to other greenlets before each read
            gevent.sleep()
            try:
                self._recvbuffer = self._socket.read_until("\n")
                # Stripping "> " from VLC
                # (lstrip removes any leading run of '>' and ' ' characters,
                # not only the exact two-character prompt)
                self._recvbuffer = self._recvbuffer.lstrip("> ")
            except:
                # If something happened during read, abandon reader
                # NOTE(review): bare except - this also catches an exception
                # injected to kill the greenlet, which then ends the loop.
                if not self._shuttingDown.isSet():
                    # Unexpected failure: log it and mark the client as
                    # shutting down so writers stop using the socket
                    logger.error("Exception at socket read")
                    self._shuttingDown.set()
                return

            # Parsing everything only if the string is not empty
            if self._recvbuffer:
                if not self._vlcver:
                    # First line (VLC version)
                    self._vlcver = self._recvbuffer.strip()
                    # Send password here since PASSWORD doesn't have \n
                    self._write(self._password)

                elif self._recvbuffer.startswith(VlcMessage.response.SHUTDOWN):
                    # Exit from this loop
                    logger.debug("Got SHUTDOWN from VLC")
                    return

                elif self._recvbuffer.startswith(VlcMessage.response.WRONGPASS):
                    # Wrong password: report failure to __init__ and stop reading
                    logger.error("Wrong VLC password!")
                    self._auth.set(False)
                    return

                elif self._recvbuffer.startswith(VlcMessage.response.AUTHOK):
                    # Authentication OK: unblocks the wait in __init__
                    logger.info("Authentication successful")
                    self._auth.set(True)

                elif VlcMessage.response.BROADCASTEXISTS in self._recvbuffer:
                    # Broadcast already exists (matched anywhere in the line)
                    logger.error("Broadcast already exists!")
                    self._result.set(False)

                elif VlcMessage.response.STOPERR in self._recvbuffer:
                    # Media unknown (stopping non-existent stream)
                    logger.error("Broadcast does not exist!")
                    self._result.set(False)

                # Do not move this before error handlers!
                elif self._recvbuffer.startswith(VlcMessage.response.STARTOK):
                    # Broadcast started
                    logger.info("Broadcast started")
                    self._result.set(True)

                elif self._recvbuffer.startswith(VlcMessage.response.STOPOK):
                    # Broadcast stopped
                    logger.info("Broadcast stopped")
                    self._result.set(True)