Корректная обработка ошибок при неправильном имени канала.
[vpproxy.git] / vphttp.py
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3 '''
4 VPProxy: HTTP/HLS Stream to HTTP Multiplexing Proxy
5
6 Based on AceProxy (https://github.com/ValdikSS/AceProxy) design
7 '''
8 import traceback
9 import gevent
10 import gevent.monkey
11 # Monkeypatching and all the stuff
12 gevent.monkey.patch_all()
13 # Startup delay for daemon restart
14 gevent.sleep(5)
15 import glob
16 import os
17 import signal
18 import sys
19 import logging
20 import psutil
21 import BaseHTTPServer
22 import SocketServer
23 from socket import error as SocketException
24 from socket import SHUT_RDWR
25 import urllib2
26 import hashlib
27 import vpconfig
28 from vpconfig import VPConfig
29 import vlcclient
30 import gc
31 import plugins.modules.ipaddr as ipaddr
32 from clientcounter import ClientCounter
33 from plugins.modules.PluginInterface import VPProxyPlugin
34 try:
35     import pwd
36     import grp
37 except ImportError:
38     pass
39
40
41
42 class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
43
44     requestlist = []
45
46     def handle_one_request(self):
47         '''
48         Add request to requestlist, handle request and remove from the list
49         '''
50         HTTPHandler.requestlist.append(self)
51         BaseHTTPServer.BaseHTTPRequestHandler.handle_one_request(self)
52         HTTPHandler.requestlist.remove(self)
53
54     def closeConnection(self):
55         '''
56         Disconnecting client
57         '''
58         if self.clientconnected:
59             self.clientconnected = False
60             try:
61                 self.wfile.close()
62                 self.rfile.close()
63                 self.connection.shutdown(SHUT_RDWR)
64             except:
65                 pass
66
67     def dieWithError(self, errorcode=500):
68         '''
69         Close connection with error
70         '''
71         logging.warning("Dying with error")
72         if self.clientconnected:
73             self.send_error(errorcode)
74             self.end_headers()
75             self.closeConnection()
76
77     def proxyReadWrite(self):
78         '''
79         Read video stream and send it to client
80         '''
81         logger = logging.getLogger('http_proxyReadWrite')
82         logger.debug("Started")
83
84         self.vlcstate = True
85         self.streamstate = True
86
87         try:
88             while True:
89
90                 if not self.clientconnected:
91                     logger.debug("Client is not connected, terminating")
92                     break
93
94                 data = self.video.read(4096)
95                 if data and self.clientconnected:
96                     self.wfile.write(data)
97                 else:
98                     logger.warning("Video connection closed")
99                     break
100
101         except SocketException:
102             # Video connection dropped
103             logger.warning("Video connection dropped")
104         finally:
105             self.video.close()
106             self.closeConnection()
107
108     def hangDetector(self):
109         '''
110         Detect client disconnection while in the middle of something
111         or just normal connection close.
112         '''
113         logger = logging.getLogger('http_hangDetector')
114         try:
115             while True:
116                 if not self.rfile.read():
117                     break
118         except:
119             pass
120         finally:
121             self.clientconnected = False
122             logger.debug("Client disconnected")
123             try:
124                 self.requestgreenlet.kill()
125             except:
126                 pass
127             finally:
128                 gevent.sleep()
129             return
130
131     def do_HEAD(self):
132         return self.do_GET(headers_only=True)
133
134     def do_GET(self, headers_only=False):
135         '''
136         GET request handler
137         '''
138         logger = logging.getLogger('http_HTTPHandler')
139         self.clientconnected = True
140         # Don't wait videodestroydelay if error happened
141         self.errorhappened = True
142         # Headers sent flag for fake headers UAs
143         self.headerssent = False
144         # Current greenlet
145         self.requestgreenlet = gevent.getcurrent()
146         # Connected client IP address
147         self.clientip = self.request.getpeername()[0]
148
149         if VPConfig.firewall:
150             # If firewall enabled
151             self.clientinrange = any(map(lambda i: ipaddr.IPAddress(self.clientip) \
152                                 in ipaddr.IPNetwork(i), VPConfig.firewallnetranges))
153
154             if (VPConfig.firewallblacklistmode and self.clientinrange) or \
155                 (not VPConfig.firewallblacklistmode and not self.clientinrange):
156                     logger.info('Dropping connection from ' + self.clientip + ' due to ' + \
157                                 'firewall rules')
158                     self.dieWithError(403)  # 403 Forbidden
159                     return
160
161         logger.info("Accepted connection from " + self.clientip + " path " + self.path)
162
163         try:
164             self.splittedpath = self.path.split('/')
165             self.reqtype = self.splittedpath[1].lower()
166             # If first parameter is 'pid' or 'torrent' or it should be handled
167             # by plugin
168             if not (self.reqtype in ('get','mp4','ogg','ogv') or self.reqtype in VPStuff.pluginshandlers):
169                 self.dieWithError(400)  # 400 Bad Request
170                 return
171         except IndexError:
172             self.dieWithError(400)  # 400 Bad Request
173             return
174
175         # Handle request with plugin handler
176         if self.reqtype in VPStuff.pluginshandlers:
177             try:
178                 VPStuff.pluginshandlers.get(self.reqtype).handle(self)
179             except Exception as e:
180                 logger.error('Plugin exception: ' + repr(e))
181                 logger.error(traceback.format_exc())
182                 self.dieWithError()
183             finally:
184                 self.closeConnection()
185                 return
186         self.handleRequest(headers_only)
187
188     def handleRequest(self, headers_only):
189
190         # Limit concurrent connections
191         if 0 < VPConfig.maxconns <= VPStuff.clientcounter.total:
192             logger.debug("Maximum connections reached, can't serve this")
193             self.dieWithError(503)  # 503 Service Unavailable
194             return
195
196         # Pretend to work fine with Fake UAs or HEAD request.
197         useragent = self.headers.get('User-Agent')
198         logger.debug("HTTP User Agent:"+useragent)
199         fakeua = useragent and useragent in VPConfig.fakeuas
200         if headers_only or fakeua:
201             if fakeua:
202                 logger.debug("Got fake UA: " + self.headers.get('User-Agent'))
203             # Return 200 and exit
204             self.send_response(200)
205             self.send_header("Content-Type", "video/mpeg")
206             self.end_headers()
207             self.closeConnection()
208             return
209
210         self.path_unquoted = urllib2.unquote('/'.join(self.splittedpath[2:]))
211         # Make list with parameters
212         self.params = list()
213         for i in xrange(3, 8):
214             try:
215                 self.params.append(int(self.splittedpath[i]))
216             except (IndexError, ValueError):
217                 self.params.append('0')
218
219         # Adding client to clientcounter
220         clients = VPStuff.clientcounter.add(self.reqtype+'\\'+self.path_unquoted, self.clientip)
221         # If we are the one client, but sucessfully got vp instance from clientcounter,
222         # then somebody is waiting in the videodestroydelay state
223
224         # Check if we are first client
225         if VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted)==1:
226             logger.debug("First client, should create VLC session")
227             shouldcreatevp = True
228         else:
229             logger.debug("Can reuse existing session")
230             shouldcreatevp = False
231
232         self.vlcid = hashlib.md5(self.reqtype+'\\'+self.path_unquoted).hexdigest()
233
234         # Send fake headers if this User-Agent is in fakeheaderuas tuple
235         if fakeua:
236             logger.debug(
237                 "Sending fake headers for " + useragent)
238             self.send_response(200)
239             self.send_header('Cache-Control','no-cache, no-store, must-revalidate');
240             self.send_header('Pragma','no-cache');
241             if self.reqtype in ("ogg","ogv"):
242                 self.send_header("Content-Type", "video/ogg")
243             else:
244                 self.send_header("Content-Type", "video/mpeg")
245             self.end_headers()
246             # Do not send real headers at all
247             self.headerssent = True
248
249         try:
250             self.hanggreenlet = gevent.spawn(self.hangDetector)
251             logger.debug("hangDetector spawned")
252             gevent.sleep()
253
254             # Getting URL
255             self.errorhappened = False
256
257             if shouldcreatevp:
258                 logger.debug("Got url " + self.path_unquoted)
259                 # Force ffmpeg demuxing if set in config
260                 if VPConfig.vlcforceffmpeg:
261                     self.vlcprefix = 'http/ffmpeg://'
262                 else:
263                     self.vlcprefix = ''
264
265                 logger.info("Starting broadcasting "+self.path)                    
266                 VPStuff.vlcclient.startBroadcast(
267                     self.vlcid, self.vlcprefix + self.path_unquoted, VPConfig.vlcmux, VPConfig.vlcpreaccess, self.reqtype)
268                 # Sleep a bit, because sometimes VLC doesn't open port in
269                 # time
270                 gevent.sleep(0.5)
271
272             # Building new VLC url
273             self.url = 'http://' + VPConfig.vlchost + \
274                 ':' + str(VPConfig.vlcoutport) + '/' + self.vlcid
275             logger.debug("VLC url " + self.url)
276
277             # Sending client headers to videostream
278             self.video = urllib2.Request(self.url)
279             for key in self.headers.dict:
280                 self.video.add_header(key, self.headers.dict[key])
281
282             self.video = urllib2.urlopen(self.video)
283
284             # Sending videostream headers to client
285             if not self.headerssent:
286                 self.send_response(self.video.getcode())
287                 if self.video.info().dict.has_key('connection'):
288                     del self.video.info().dict['connection']
289                 if self.video.info().dict.has_key('server'):
290                     del self.video.info().dict['server']
291                 if self.video.info().dict.has_key('transfer-encoding'):
292                     del self.video.info().dict['transfer-encoding']
293                 if self.video.info().dict.has_key('content-type'):
294                     del self.video.info().dict['content-type']
295                 if self.video.info().dict.has_key('keep-alive'):
296                     del self.video.info().dict['keep-alive']
297
298                 for key in self.video.info().dict:
299                     self.send_header(key, self.video.info().dict[key])
300
301                 self.send_header('Cache-Control','no-cache, no-store, must-revalidate');
302                 self.send_header('Pragma','no-cache');
303
304                 if self.reqtype=="ogg":
305                     self.send_header("Content-Type", "video/ogg")
306                 else:
307                     self.send_header("Content-Type", "video/mpeg")
308
309                 # End headers. Next goes video data
310                 self.end_headers()
311                 logger.debug("Headers sent")
312
313             # Run proxyReadWrite
314             self.proxyReadWrite()
315
316             # Waiting until hangDetector is joined
317             self.hanggreenlet.join()
318             logger.debug("Request handler finished")
319
320         except (vpclient.VPException, vlcclient.VlcException, urllib2.URLError) as e:
321             logger.error("Exception: " + repr(e))
322             self.errorhappened = True
323             self.dieWithError()
324         except gevent.GreenletExit:
325             # hangDetector told us about client disconnection
326             pass
327         except Exception:
328             # Unknown exception
329             logger.error(traceback.format_exc())
330             self.errorhappened = True
331             self.dieWithError()
332         finally:
333             logger.debug("END REQUEST")
334             logger.info("Closed connection from " + self.clientip + " path " + self.path)
335             VPStuff.clientcounter.delete(self.reqtype+'\\'+self.path_unquoted, self.clientip)
336             if not VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted):
337                 try:
338                     logger.debug("That was the last client, destroying VPClient")
339                     logger.info("Stopping broadcasting " + self.path)
340                     VPStuff.vlcclient.stopBroadcast(self.vlcid)
341                 except:
342                     pass
343                 self.vp.destroy()
344             if not self.headerssent:
345                 logger.error("Problem receiving video stream, no headers!")
346                 if VPStuff.clientcounter.total == 0:
347                     logger.error("Probably VLC hang")
348                     VPStuff.vlc.kill()
349
350 class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
351
352     def handle_error(self, request, client_address):
353         # Do not print HTTP tracebacks
354         pass
355
356
357 class VPStuff(object):
358     '''
359     Inter-class interaction class
360     '''
361     vlcclient=None
362
363 # taken from http://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python
364 def drop_privileges(uid_name, gid_name='nogroup'):
365
366     # Get the uid/gid from the name
367     running_uid = pwd.getpwnam(uid_name).pw_uid
368     running_uid_home = pwd.getpwnam(uid_name).pw_dir
369     running_gid = grp.getgrnam(gid_name).gr_gid
370
371     # Remove group privileges
372     os.setgroups([])
373
374     # Try setting the new uid/gid
375     os.setgid(running_gid)
376     os.setuid(running_uid)
377
378     # Ensure a very conservative umask
379     old_umask = os.umask(077)
380
381     if os.getuid() == running_uid and os.getgid() == running_gid:
382         # could be useful
383         os.environ['HOME'] = running_uid_home
384         return True
385     return False
386
387 logging.basicConfig(
388     filename=VPConfig.logpath + 'vphttp.log' if VPConfig.loggingtoafile else None,
389     format='%(asctime)s %(levelname)s %(name)s: %(message)s', datefmt='%d.%m.%Y %H:%M:%S', level=VPConfig.debug)
390 logger = logging.getLogger('INIT')
391
392 # Loading plugins
393 # Trying to change dir (would fail in freezed state)
394 try:
395     os.chdir(os.path.dirname(os.path.realpath(__file__)))
396 except:
397     pass
398 # Creating dict of handlers
399 VPStuff.pluginshandlers = dict()
400 # And a list with plugin instances
401 VPStuff.pluginlist = list()
402 pluginsmatch = glob.glob('plugins/*_plugin.py')
403 sys.path.insert(0, 'plugins')
404 pluginslist = [os.path.splitext(os.path.basename(x))[0] for x in pluginsmatch]
405 for i in pluginslist:
406     plugin = __import__(i)
407     plugname = i.split('_')[0].capitalize()
408     try:
409         plugininstance = getattr(plugin, plugname)(VPConfig, VPStuff)
410     except Exception as e:
411         logger.error("Cannot load plugin " + plugname + ": " + repr(e))
412         continue
413     logger.debug('Plugin loaded: ' + plugname)
414     for j in plugininstance.handlers:
415         logger.info("Registering handler '" + j +"'")
416         VPStuff.pluginshandlers[j] = plugininstance
417     VPStuff.pluginlist.append(plugininstance)
418
419 # Check whether we can bind to the defined port safely
420 if os.getuid() != 0 and VPConfig.httpport <= 1024:
421     logger.error("Cannot bind to port " + str(VPConfig.httpport) + " without root privileges")
422     sys.exit(1)
423
424 server = HTTPServer((VPConfig.httphost, VPConfig.httpport), HTTPHandler)
425 logger = logging.getLogger('HTTP')
426
427 # Dropping root privileges if needed
428 if VPConfig.vpproxyuser and os.getuid() == 0:
429     if drop_privileges(VPConfig.vpproxyuser):
430         logger.info("Dropped privileges to user " + VPConfig.vpproxyuser)
431     else:
432         logger.error("Cannot drop privileges to user " + VPConfig.vpproxyuser)
433         sys.exit(1)
434
435 # Creating ClientCounter
436 VPStuff.clientcounter = ClientCounter()
437
438 DEVNULL = open(os.devnull, 'wb')
439
440 # Spawning procedures
441 def spawnVLC(cmd, delay = 0):
442     try:
443         VPStuff.vlc = psutil.Popen(cmd) #, stdout=DEVNULL, stderr=DEVNULL)
444         gevent.sleep(delay)
445         return True
446     except:
447         return False
448
449 def connectVLC():
450     try:
451         VPStuff.vlcclient = vlcclient.VlcClient(
452             host=VPConfig.vlchost, port=VPConfig.vlcport, password=VPConfig.vlcpass,
453             out_port=VPConfig.vlcoutport)
454         return True
455     except vlcclient.VlcException as e:
456         return False
457
458 def isRunning(process):
459     if psutil.version_info[0] >= 2:
460         if process.is_running() and process.status() != psutil.STATUS_ZOMBIE:
461             return True
462     else:  # for older versions of psutil
463         if process.is_running() and process.status != psutil.STATUS_ZOMBIE:
464             return True
465     return False
466
467 def findProcess(name):
468     for proc in psutil.process_iter():
469         try:
470             pinfo = proc.as_dict(attrs=['pid', 'name'])
471             if pinfo['name'] == name:
472                 return pinfo['pid']
473         except psutil.AccessDenied:
474             # System process
475             pass
476         except psutil.NoSuchProcess:
477             # Process terminated
478             pass
479     return None
480
481 def clean_proc():
482     # Trying to close all spawned processes gracefully
483     if isRunning(VPStuff.vlc):
484         if VPStuff.vlcclient:
485             VPStuff.vlcclient.destroy()
486         gevent.sleep(1)
487     if isRunning(VPStuff.vlc):
488         # or not :)
489         VPStuff.vlc.kill()
490
491 # This is what we call to stop the server completely
492 def shutdown(signum = 0, frame = 0):
493     logger.info("Stopping server...")
494     # Closing all client connections
495     for connection in server.RequestHandlerClass.requestlist:
496         try:
497             # Set errorhappened to prevent waiting for videodestroydelay
498             connection.errorhappened = True
499             connection.closeConnection()
500         except:
501             logger.warning("Cannot kill a connection!")
502     clean_proc()
503     server.server_close()
504     sys.exit()
505
506 def _reloadconfig(signum=None, frame=None):
507     '''
508     Reload configuration file.
509     SIGHUP handler.
510     '''
511     global VPConfig
512
513     logger = logging.getLogger('reloadconfig')
514     reload(vpconfig)
515     from vpconfig import VPConfig
516     logger.info('Config reloaded')
517
518 # setting signal handlers
519 try:
520     gevent.signal(signal.SIGHUP, _reloadconfig)
521     gevent.signal(signal.SIGTERM, shutdown)
522 except AttributeError:
523     pass
524
525 name = 'vlc'
526 VPStuff.vlcProc = VPConfig.vlccmd.split()
527 if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
528     logger.info("VLC spawned with pid " + str(VPStuff.vlc.pid))
529 else:
530     logger.error('Cannot spawn or connect to VLC!')
531     clean_proc()
532     sys.exit(1)
533
534 try:
535     logger.info("Using gevent %s" % gevent.__version__)
536     logger.info("Using psutil %s" % psutil.__version__)
537     logger.info("Using VLC %s" % VPStuff.vlcclient._vlcver)
538     logger.info("Server started.")
539     while True:
540         if not isRunning(VPStuff.vlc):
541             del VPStuff.vlc
542             if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
543                 logger.info("VLC died, respawned it with pid " + str(VPStuff.vlc.pid))
544             else:
545                 logger.error("Cannot spawn VLC!")
546                 clean_proc()
547                 sys.exit(1)
548         # Return to our server tasks
549         server.handle_request()
550 except (KeyboardInterrupt, SystemExit):
551     shutdown()