Переход на асинхронное закрытие каналов с таймаутом, чтобы при обрыве соединения...
[vpproxy.git] / vphttp.py
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3 '''
4 VPProxy: HTTP/HLS Stream to HTTP Multiplexing Proxy
5
6 Based on AceProxy (https://github.com/ValdikSS/AceProxy) design
7 '''
8 import traceback
9 import gevent
10 import gevent.monkey
11 # Monkeypatching and all the stuff
12 gevent.monkey.patch_all()
13 # Startup delay for daemon restart
14 gevent.sleep(5)
15 import glob
16 import os
17 import signal
18 import sys
19 import logging
20 import psutil
21 import BaseHTTPServer
22 import SocketServer
23 from socket import error as SocketException
24 from socket import SHUT_RDWR
25 import urllib2
26 import hashlib
27 import vpconfig
28 from vpconfig import VPConfig
29 import vlcclient
30 import gc
31 import plugins.modules.ipaddr as ipaddr
32 from clientcounter import ClientCounter
33 from plugins.modules.PluginInterface import VPProxyPlugin
34 try:
35     import pwd
36     import grp
37 except ImportError:
38     pass
39
40 import uuid
41
42 from apscheduler.schedulers.background import BackgroundScheduler
43
44 class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
45
46     requestlist = []
47
48     def handle_one_request(self):
49         '''
50         Add request to requestlist, handle request and remove from the list
51         '''
52         HTTPHandler.requestlist.append(self)
53         BaseHTTPServer.BaseHTTPRequestHandler.handle_one_request(self)
54         HTTPHandler.requestlist.remove(self)
55
56     def closeConnection(self):
57         '''
58         Disconnecting client
59         '''
60         if self.clientconnected:
61             self.clientconnected = False
62             try:
63                 self.wfile.close()
64                 self.rfile.close()
65                 self.connection.shutdown(SHUT_RDWR)
66             except:
67                 pass
68
69     def dieWithError(self, errorcode=500):
70         '''
71         Close connection with error
72         '''
73         logging.warning("Dying with error")
74         if self.clientconnected:
75             self.send_error(errorcode)
76             self.end_headers()
77             self.closeConnection()
78
79     def proxyReadWrite(self):
80         '''
81         Read video stream and send it to client
82         '''
83         logger = logging.getLogger('http_proxyReadWrite')
84         logger.debug("Started")
85
86         self.vlcstate = True
87         self.streamstate = True
88
89         try:
90             while True:
91             
92                 if not self.clientconnected:
93                     logger.debug("Client is not connected, terminating")
94                     break
95
96                 VPStuff.vlcclient.mark(self.vlcid)
97                 data = self.video.read(4096)
98                 if data and self.clientconnected:
99                     self.wfile.write(data)
100                 else:
101                     logger.warning("Video connection closed")
102                     break
103
104         except SocketException:
105             # Video connection dropped
106             logger.warning("Video connection dropped")
107         finally:
108             self.video.close()
109             self.closeConnection()
110
111     def hangDetector(self):
112         '''
113         Detect client disconnection while in the middle of something
114         or just normal connection close.
115         '''
116         logger = logging.getLogger('http_hangDetector')
117         try:
118             while True:
119                 if not self.rfile.read():
120                     break
121         except:
122             pass
123         finally:
124             self.clientconnected = False
125             logger.debug("Client disconnected")
126             try:
127                 self.requestgreenlet.kill()
128             except:
129                 pass
130             finally:
131                 gevent.sleep()
132             return
133
134     def do_HEAD(self):
135         return self.do_GET(headers_only=True)
136
137     def do_GET(self, headers_only=False):
138         '''
139         GET request handler
140         '''
141         logger = logging.getLogger('http_HTTPHandler')
142         self.clientconnected = True
143         # Don't wait videodestroydelay if error happened
144         self.errorhappened = True
145         # Headers sent flag for fake headers UAs
146         self.headerssent = False
147         # Current greenlet
148         self.requestgreenlet = gevent.getcurrent()
149         # Connected client IP address
150         self.clientip = self.request.getpeername()[0]
151
152         if VPConfig.firewall:
153             # If firewall enabled
154             self.clientinrange = any(map(lambda i: ipaddr.IPAddress(self.clientip) \
155                                 in ipaddr.IPNetwork(i), VPConfig.firewallnetranges))
156
157             if (VPConfig.firewallblacklistmode and self.clientinrange) or \
158                 (not VPConfig.firewallblacklistmode and not self.clientinrange):
159                     logger.info('Dropping connection from ' + self.clientip + ' due to ' + \
160                                 'firewall rules')
161                     self.dieWithError(403)  # 403 Forbidden
162                     return
163
164         logger.info("Accepted connection from " + self.clientip + " path " + self.path)
165
166         try:
167             self.splittedpath = self.path.split('/')
168             self.reqtype = self.splittedpath[1].lower()
169             # If first parameter is 'pid' or 'torrent' or it should be handled
170             # by plugin
171             if not (self.reqtype in ('get','mp4','ogg','ogv') or self.reqtype in VPStuff.pluginshandlers):
172                 self.dieWithError(400)  # 400 Bad Request
173                 return
174         except IndexError:
175             self.dieWithError(400)  # 400 Bad Request
176             return
177
178         # Handle request with plugin handler
179         if self.reqtype in VPStuff.pluginshandlers:
180             try:
181                 VPStuff.pluginshandlers.get(self.reqtype).handle(self)
182             except Exception as e:
183                 logger.error('Plugin exception: ' + repr(e))
184                 logger.error(traceback.format_exc())
185                 self.dieWithError()
186             finally:
187                 self.closeConnection()
188                 return
189         self.handleRequest(headers_only)
190
191     def handleRequest(self, headers_only):
192       
193         self.unic = uuid.uuid4()  
194
195         # Limit concurrent connections
196         if 0 < VPConfig.maxconns <= VPStuff.clientcounter.total:
197             logger.debug("Maximum connections reached, can't serve this")
198             self.dieWithError(503)  # 503 Service Unavailable
199             return
200
201         # Pretend to work fine with Fake UAs or HEAD request.
202         useragent = self.headers.get('User-Agent')
203         logger.debug("HTTP User Agent:"+useragent)
204         fakeua = useragent and useragent in VPConfig.fakeuas
205         if headers_only or fakeua:
206             if fakeua:
207                 logger.debug("Got fake UA: " + self.headers.get('User-Agent'))
208             # Return 200 and exit
209             self.send_response(200)
210             self.send_header("Content-Type", "video/mpeg")
211             self.end_headers()
212             self.closeConnection()
213             return
214
215         self.path_unquoted = urllib2.unquote('/'.join(self.splittedpath[2:]))
216         # Make list with parameters
217         self.params = list()
218         for i in xrange(3, 8):
219             try:
220                 self.params.append(int(self.splittedpath[i]))
221             except (IndexError, ValueError):
222                 self.params.append('0')
223
224         # Adding client to clientcounter
225         clients = VPStuff.clientcounter.add(self.reqtype+'\\'+self.path_unquoted, self.clientip, self.unic)
226         # If we are the one client, but sucessfully got vp instance from clientcounter,
227         # then somebody is waiting in the videodestroydelay state
228
229         # Check if we are first client
230         if VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted)==1:
231             logger.debug("First client, should create VLC session")
232             shouldcreatevp = True
233         else:
234             logger.debug("Can reuse existing session")
235             shouldcreatevp = False
236
237         self.vlcid = hashlib.md5(self.reqtype+'\\'+self.path_unquoted).hexdigest()
238
239         # Send fake headers if this User-Agent is in fakeheaderuas tuple
240         if fakeua:
241             logger.debug(
242                 "Sending fake headers for " + useragent)
243             self.send_response(200)
244             self.send_header('Cache-Control','no-cache, no-store, must-revalidate');
245             self.send_header('Pragma','no-cache');
246             if self.reqtype in ("ogg","ogv"):
247                 self.send_header("Content-Type", "video/ogg")
248             else:
249                 self.send_header("Content-Type", "video/mpeg")
250             self.end_headers()
251             # Do not send real headers at all
252             self.headerssent = True
253
254         try:
255             self.hanggreenlet = gevent.spawn(self.hangDetector)
256             logger.debug("hangDetector spawned")
257             gevent.sleep()
258
259             # Getting URL
260             self.errorhappened = False
261
262             if shouldcreatevp:
263                 logger.debug("Got url " + self.path_unquoted)
264                 # Force ffmpeg demuxing if set in config
265                 if VPConfig.vlcforceffmpeg:
266                     self.vlcprefix = 'http/ffmpeg://'
267                 else:
268                     self.vlcprefix = ''
269
270                 logger.info("Starting broadcasting "+self.path)                    
271                 VPStuff.vlcclient.startBroadcast(
272                     self.vlcid, self.vlcprefix + self.path_unquoted, VPConfig.vlcmux, VPConfig.vlcpreaccess, self.reqtype)
273                 # Sleep a bit, because sometimes VLC doesn't open port in
274                 # time
275                 gevent.sleep(0.5)
276
277             # Building new VLC url
278             self.url = 'http://' + VPConfig.vlchost + \
279                 ':' + str(VPConfig.vlcoutport) + '/' + self.vlcid
280             logger.debug("VLC url " + self.url)
281
282             # Sending client headers to videostream
283             self.video = urllib2.Request(self.url)
284             for key in self.headers.dict:
285                 self.video.add_header(key, self.headers.dict[key])
286
287             self.video = urllib2.urlopen(self.video)
288
289             # Sending videostream headers to client
290             if not self.headerssent:
291                 self.send_response(self.video.getcode())
292                 if self.video.info().dict.has_key('connection'):
293                     del self.video.info().dict['connection']
294                 if self.video.info().dict.has_key('server'):
295                     del self.video.info().dict['server']
296                 if self.video.info().dict.has_key('transfer-encoding'):
297                     del self.video.info().dict['transfer-encoding']
298                 if self.video.info().dict.has_key('content-type'):
299                     del self.video.info().dict['content-type']
300                 if self.video.info().dict.has_key('keep-alive'):
301                     del self.video.info().dict['keep-alive']
302
303                 for key in self.video.info().dict:
304                     self.send_header(key, self.video.info().dict[key])
305
306                 self.send_header('Cache-Control','no-cache, no-store, must-revalidate');
307                 self.send_header('Pragma','no-cache');
308
309                 if self.reqtype=="ogg":
310                     self.send_header("Content-Type", "video/ogg")
311                 else:
312                     self.send_header("Content-Type", "video/mpeg")
313
314                 # End headers. Next goes video data
315                 self.end_headers()
316                 logger.debug("Headers sent")
317                 self.headerssent = True
318
319             # Run proxyReadWrite
320             self.proxyReadWrite()
321
322             # Waiting until hangDetector is joined
323             self.hanggreenlet.join()
324             logger.debug("Request handler finished")
325
326         except (vpclient.VPException, vlcclient.VlcException, urllib2.URLError) as e:
327             logger.error("Exception: " + repr(e))
328             self.errorhappened = True
329             self.dieWithError()
330         except gevent.GreenletExit:
331             # hangDetector told us about client disconnection
332             pass
333         except Exception:
334             # Unknown exception
335             logger.error(traceback.format_exc())
336             self.errorhappened = True
337             self.dieWithError()
338         finally:
339             logger.debug("END REQUEST")
340             logger.info("Closed connection from " + self.clientip + " path " + self.path)
341             VPStuff.clientcounter.delete(self.reqtype+'\\'+self.path_unquoted, self.clientip, self.unic)
342 #            if not VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted):
343 #                try:
344 #                    logger.debug("That was the last client, destroying VPClient")
345 #                    logger.info("Stopping broadcasting " + self.path)
346 #                    VPStuff.vlcclient.stopBroadcast(self.vlcid)
347 #                except vlcclient.VlcException:    
348 #                  logger.error("VLC connection problem, %s client(s)!" % VPStuff.clientcounter.total)
349 #                  if VPStuff.clientcounter.total == 0:
350 #                      logger.error("Probably VLC hang")
351 #                      VPStuff.vlc.kill()
352 #                except:
353 #                    pass
354             self.vp.destroy()
355
356 class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
357
358     def handle_error(self, request, client_address):
359         # Do not print HTTP tracebacks
360         pass
361
362
363 class VPStuff(object):
364     '''
365     Inter-class interaction class
366     '''
367     vlcclient=None
368
369 # taken from http://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python
370 def drop_privileges(uid_name, gid_name='nogroup'):
371
372     # Get the uid/gid from the name
373     running_uid = pwd.getpwnam(uid_name).pw_uid
374     running_uid_home = pwd.getpwnam(uid_name).pw_dir
375     running_gid = grp.getgrnam(gid_name).gr_gid
376
377     # Remove group privileges
378     os.setgroups([])
379
380     # Try setting the new uid/gid
381     os.setgid(running_gid)
382     os.setuid(running_uid)
383
384     # Ensure a very conservative umask
385     old_umask = os.umask(077)
386
387     if os.getuid() == running_uid and os.getgid() == running_gid:
388         # could be useful
389         os.environ['HOME'] = running_uid_home
390         return True
391     return False
392
393 logging.basicConfig(
394     filename=VPConfig.logpath + 'vphttp.log' if VPConfig.loggingtoafile else None,
395     format='%(asctime)s %(levelname)s %(name)s: %(message)s', datefmt='%d.%m.%Y %H:%M:%S', level=VPConfig.debug)
396 logger = logging.getLogger('INIT')
397
398 # Loading plugins
399 # Trying to change dir (would fail in freezed state)
400 try:
401     os.chdir(os.path.dirname(os.path.realpath(__file__)))
402 except:
403     pass
404 # Creating dict of handlers
405 VPStuff.pluginshandlers = dict()
406 # And a list with plugin instances
407 VPStuff.pluginlist = list()
408 pluginsmatch = glob.glob('plugins/*_plugin.py')
409 sys.path.insert(0, 'plugins')
410 pluginslist = [os.path.splitext(os.path.basename(x))[0] for x in pluginsmatch]
411 for i in pluginslist:
412     plugin = __import__(i)
413     plugname = i.split('_')[0].capitalize()
414     try:
415         plugininstance = getattr(plugin, plugname)(VPConfig, VPStuff)
416     except Exception as e:
417         logger.error("Cannot load plugin " + plugname + ": " + repr(e))
418         continue
419     logger.debug('Plugin loaded: ' + plugname)
420     for j in plugininstance.handlers:
421         logger.info("Registering handler '" + j +"'")
422         VPStuff.pluginshandlers[j] = plugininstance
423     VPStuff.pluginlist.append(plugininstance)
424
425 # Check whether we can bind to the defined port safely
426 if os.getuid() != 0 and VPConfig.httpport <= 1024:
427     logger.error("Cannot bind to port " + str(VPConfig.httpport) + " without root privileges")
428     sys.exit(1)
429
430 server = HTTPServer((VPConfig.httphost, VPConfig.httpport), HTTPHandler)
431 logger = logging.getLogger('HTTP')
432
433 # Dropping root privileges if needed
434 if VPConfig.vpproxyuser and os.getuid() == 0:
435     if drop_privileges(VPConfig.vpproxyuser):
436         logger.info("Dropped privileges to user " + VPConfig.vpproxyuser)
437     else:
438         logger.error("Cannot drop privileges to user " + VPConfig.vpproxyuser)
439         sys.exit(1)
440
441 # Creating ClientCounter
442 VPStuff.clientcounter = ClientCounter()
443
444 DEVNULL = open(os.devnull, 'wb')
445
446 # Spawning procedures
447 def spawnVLC(cmd, delay = 0):
448     try:
449         VPStuff.vlc = psutil.Popen(cmd) #, stdout=DEVNULL, stderr=DEVNULL)
450         gevent.sleep(delay)
451         return True
452     except:
453         return False
454
455 def connectVLC():
456     try:
457         VPStuff.vlcclient = vlcclient.VlcClient(
458             host=VPConfig.vlchost, port=VPConfig.vlcport, password=VPConfig.vlcpass,
459             out_port=VPConfig.vlcoutport)
460         return True
461     except vlcclient.VlcException as e:
462         return False
463
464 def isRunning(process):
465     if psutil.version_info[0] >= 2:
466         if process.is_running() and process.status() != psutil.STATUS_ZOMBIE:
467             return True
468     else:  # for older versions of psutil
469         if process.is_running() and process.status != psutil.STATUS_ZOMBIE:
470             return True
471     return False
472
473 def findProcess(name):
474     for proc in psutil.process_iter():
475         try:
476             pinfo = proc.as_dict(attrs=['pid', 'name'])
477             if pinfo['name'] == name:
478                 return pinfo['pid']
479         except psutil.AccessDenied:
480             # System process
481             pass
482         except psutil.NoSuchProcess:
483             # Process terminated
484             pass
485     return None
486
487 def clean_proc():
488     # Trying to close all spawned processes gracefully
489     if isRunning(VPStuff.vlc):
490         if VPStuff.vlcclient:
491             VPStuff.vlcclient.destroy()
492         gevent.sleep(1)
493     if isRunning(VPStuff.vlc):
494         # or not :)
495         VPStuff.vlc.kill()
496
497 # This is what we call to stop the server completely
498 def shutdown(signum = 0, frame = 0):
499     logger.info("Stopping server...")
500     # Closing all client connections
501     for connection in server.RequestHandlerClass.requestlist:
502         try:
503             # Set errorhappened to prevent waiting for videodestroydelay
504             connection.errorhappened = True
505             connection.closeConnection()
506         except:
507             logger.warning("Cannot kill a connection!")
508     clean_proc()
509     server.server_close()
510     sys.exit()
511
512 def _reloadconfig(signum=None, frame=None):
513     '''
514     Reload configuration file.
515     SIGHUP handler.
516     '''
517     global VPConfig
518
519     logger = logging.getLogger('reloadconfig')
520     reload(vpconfig)
521     from vpconfig import VPConfig
522     logger.info('Config reloaded')
523
524 sched = BackgroundScheduler()
525 sched.start()
526
527 def clean_streams():
528   VPStuff.vlcclient.clean_streams(15)
529
530 job = sched.add_job(clean_streams, 'interval', seconds=15)
531
532 # setting signal handlers
533 try:
534     gevent.signal(signal.SIGHUP, _reloadconfig)
535     gevent.signal(signal.SIGTERM, shutdown)
536 except AttributeError:
537     pass
538
539 name = 'vlc'
540 VPStuff.vlcProc = VPConfig.vlccmd.split()
541 if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
542     logger.info("VLC spawned with pid " + str(VPStuff.vlc.pid))
543 else:
544     logger.error('Cannot spawn or connect to VLC!')
545     clean_proc()
546     sys.exit(1)
547
548 try:
549     logger.info("Using gevent %s" % gevent.__version__)
550     logger.info("Usig psutil %s" % psutil.__version__)
551     logger.info("Using VLC %s" % VPStuff.vlcclient._vlcver)
552     logger.info("Server started.")
553     while True:
554         if not isRunning(VPStuff.vlc):
555             del VPStuff.vlc
556             if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
557                 logger.info("VLC died, respawned it with pid " + str(VPStuff.vlc.pid))
558             else:
559                 logger.error("Cannot spawn VLC!")
560                 clean_proc()
561                 sys.exit(1)
562         # Return to our server tasks
563         server.handle_request()
564 except (KeyboardInterrupt, SystemExit):
565     shutdown()