Добавлена обработка заголовков запроса X-Forwarded...
[vpproxy.git] / vphttp.py
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3 '''
4 VPProxy: HTTP/HLS Stream to HTTP Multiplexing Proxy
5
6 Based on AceProxy (https://github.com/ValdikSS/AceProxy) design
7 '''
8 import traceback
9 import gevent
10 import gevent.monkey
11 # Monkeypatching and all the stuff
12 gevent.monkey.patch_all()
13 # Startup delay for daemon restart
14 gevent.sleep(5)
15 import glob
16 import os
17 import signal
18 import sys
19 import logging
20 import psutil
21 import BaseHTTPServer
22 import SocketServer
23 from socket import error as SocketException
24 from socket import SHUT_RDWR
25 import urllib2
26 import hashlib
27 import vpconfig
28 from vpconfig import VPConfig
29 import vlcclient
30 import gc
31 import plugins.modules.ipaddr as ipaddr
32 from clientcounter import ClientCounter
33 from plugins.modules.PluginInterface import VPProxyPlugin
34 try:
35     import pwd
36     import grp
37 except ImportError:
38     pass
39
40 import uuid
41
42 from apscheduler.schedulers.background import BackgroundScheduler
43
44 class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
45
46     requestlist = []
47
48     def handle_one_request(self):
49         '''
50         Add request to requestlist, handle request and remove from the list
51         '''
52         HTTPHandler.requestlist.append(self)
53         BaseHTTPServer.BaseHTTPRequestHandler.handle_one_request(self)
54         HTTPHandler.requestlist.remove(self)
55
56     def closeConnection(self):
57         '''
58         Disconnecting client
59         '''
60         if self.clientconnected:
61             self.clientconnected = False
62             try:
63                 self.wfile.close()
64                 self.rfile.close()
65                 self.connection.shutdown(SHUT_RDWR)
66             except:
67                 pass
68
69     def dieWithError(self, errorcode=500):
70         '''
71         Close connection with error
72         '''
73         logging.warning("Dying with error")
74         if self.clientconnected:
75             self.send_error(errorcode)
76             self.end_headers()
77             self.closeConnection()
78
79     def proxyReadWrite(self):
80         '''
81         Read video stream and send it to client
82         '''
83         logger = logging.getLogger('http_proxyReadWrite')
84         logger.debug("Started")
85
86         self.vlcstate = True
87         self.streamstate = True
88
89         try:
90             while True:
91             
92                 if not self.clientconnected:
93                     logger.debug("Client is not connected, terminating")
94                     break
95
96                 VPStuff.vlcclient.mark(self.vlcid)
97                 data = self.video.read(4096)
98                 if data and self.clientconnected:
99                     self.wfile.write(data)
100                 else:
101                     logger.warning("Video connection closed")
102                     break
103
104         except SocketException:
105             # Video connection dropped
106             logger.warning("Video connection dropped")
107         finally:
108             self.video.close()
109             self.closeConnection()
110
111     def hangDetector(self):
112         '''
113         Detect client disconnection while in the middle of something
114         or just normal connection close.
115         '''
116         logger = logging.getLogger('http_hangDetector')
117         try:
118             while True:
119                 if not self.rfile.read():
120                     break
121         except:
122             pass
123         finally:
124             self.clientconnected = False
125             logger.debug("Client disconnected")
126             try:
127                 self.requestgreenlet.kill()
128             except:
129                 pass
130             finally:
131                 gevent.sleep()
132             return
133
134     def do_HEAD(self):
135         return self.do_GET(headers_only=True)
136
137     def do_GET(self, headers_only=False):
138         '''
139         GET request handler
140         '''
141
142         logger = logging.getLogger('http_HTTPHandler')
143         self.clientconnected = True
144         # Don't wait videodestroydelay if error happened
145         self.errorhappened = True
146         # Headers sent flag for fake headers UAs
147         self.headerssent = False
148         # Current greenlet
149         self.requestgreenlet = gevent.getcurrent()
150         # Connected client IP address
151         self.clientip = self.request.getpeername()[0]
152
153         req_headers = self.headers 
154         self.client_data = { 
155           'ip': self.clientip, 
156           'forwarded-for': req_headers.get('X-Forwarded-For'),
157           'client-agent': req_headers.get('User-Agent'),
158           'uuid': uuid.uuid4()
159           }
160           
161         if VPConfig.firewall:
162             # If firewall enabled
163             self.clientinrange = any(map(lambda i: ipaddr.IPAddress(self.clientip) \
164                                 in ipaddr.IPNetwork(i), VPConfig.firewallnetranges))
165
166             if (VPConfig.firewallblacklistmode and self.clientinrange) or \
167                 (not VPConfig.firewallblacklistmode and not self.clientinrange):
168                     logger.info('Dropping connection from ' + self.clientip + ' due to ' + \
169                                 'firewall rules')
170                     self.dieWithError(403)  # 403 Forbidden
171                     return
172
173         logger.info("Accepted connection from " + self.clientip + " path " + self.path)
174
175         try:
176             self.splittedpath = self.path.split('/')
177             self.reqtype = self.splittedpath[1].lower()
178             # If first parameter is 'pid' or 'torrent' or it should be handled
179             # by plugin
180             if not (self.reqtype in ('get','mp4','ogg','ogv') or self.reqtype in VPStuff.pluginshandlers):
181                 self.dieWithError(400)  # 400 Bad Request
182                 return
183         except IndexError:
184             self.dieWithError(400)  # 400 Bad Request
185             return
186
187         # Handle request with plugin handler
188         if self.reqtype in VPStuff.pluginshandlers:
189             try:
190                 VPStuff.pluginshandlers.get(self.reqtype).handle(self)
191             except Exception as e:
192                 logger.error('Plugin exception: ' + repr(e))
193                 logger.error(traceback.format_exc())
194                 self.dieWithError()
195             finally:
196                 self.closeConnection()
197                 return
198         self.handleRequest(headers_only)
199
200     def handleRequest(self, headers_only):
201       
202         # Limit concurrent connections
203         if 0 < VPConfig.maxconns <= VPStuff.clientcounter.total:
204             logger.debug("Maximum connections reached, can't serve this")
205             self.dieWithError(503)  # 503 Service Unavailable
206             return
207
208         # Pretend to work fine with Fake UAs or HEAD request.
209         useragent = self.headers.get('User-Agent')
210         logger.debug("HTTP User Agent:"+useragent)
211         fakeua = useragent and useragent in VPConfig.fakeuas
212         if headers_only or fakeua:
213             if fakeua:
214                 logger.debug("Got fake UA: " + self.headers.get('User-Agent'))
215             # Return 200 and exit
216             self.send_response(200)
217             self.send_header("Content-Type", "video/mpeg")
218             self.end_headers()
219             self.closeConnection()
220             return
221
222         self.path_unquoted = urllib2.unquote('/'.join(self.splittedpath[2:]))
223         # Make list with parameters
224         self.params = list()
225         for i in xrange(3, 8):
226             try:
227                 self.params.append(int(self.splittedpath[i]))
228             except (IndexError, ValueError):
229                 self.params.append('0')
230
231         # Adding client to clientcounter
232         clients = VPStuff.clientcounter.add(self.reqtype+'/'+self.path_unquoted, self.client_data)
233         # If we are the one client, but sucessfully got vp instance from clientcounter,
234         # then somebody is waiting in the videodestroydelay state
235
236         # Check if we are first client
237         if VPStuff.clientcounter.get(self.reqtype+'/'+self.path_unquoted)==1:
238             logger.debug("First client, should create VLC session")
239             shouldcreatevp = True
240         else:
241             logger.debug("Can reuse existing session")
242             shouldcreatevp = False
243
244         self.vlcid = hashlib.md5(self.reqtype+'/'+self.path_unquoted).hexdigest()
245
246         # Send fake headers if this User-Agent is in fakeheaderuas tuple
247         if fakeua:
248             logger.debug(
249                 "Sending fake headers for " + useragent)
250             self.send_response(200)
251             self.send_header('Cache-Control','no-cache, no-store, must-revalidate');
252             self.send_header('Pragma','no-cache');
253             if self.reqtype in ("ogg","ogv"):
254                 self.send_header("Content-Type", "video/ogg")
255             else:
256                 self.send_header("Content-Type", "video/mpeg")
257             self.end_headers()
258             # Do not send real headers at all
259             self.headerssent = True
260
261         try:
262             self.hanggreenlet = gevent.spawn(self.hangDetector)
263             logger.debug("hangDetector spawned")
264             gevent.sleep()
265
266             # Getting URL
267             self.errorhappened = False
268
269             if shouldcreatevp:
270                 logger.debug("Got url " + self.path_unquoted)
271                 # Force ffmpeg demuxing if set in config
272                 if VPConfig.vlcforceffmpeg:
273                     self.vlcprefix = 'http/ffmpeg://'
274                 else:
275                     self.vlcprefix = ''
276
277                 logger.info("Starting broadcasting "+self.path)                    
278                 VPStuff.vlcclient.startBroadcast(
279                     self.vlcid, self.vlcprefix + self.path_unquoted, VPConfig.vlcmux, VPConfig.vlcpreaccess, self.reqtype)
280                 # Sleep a bit, because sometimes VLC doesn't open port in
281                 # time
282                 gevent.sleep(0.5)
283
284             # Building new VLC url
285             self.url = 'http://' + VPConfig.vlchost + \
286                 ':' + str(VPConfig.vlcoutport) + '/' + self.vlcid
287             logger.debug("VLC url " + self.url)
288
289             # Sending client headers to videostream
290             self.video = urllib2.Request(self.url)
291             for key in self.headers.dict:
292                 self.video.add_header(key, self.headers.dict[key])
293
294             self.video = urllib2.urlopen(self.video)
295
296             # Sending videostream headers to client
297             if not self.headerssent:
298                 self.send_response(self.video.getcode())
299                 if self.video.info().dict.has_key('connection'):
300                     del self.video.info().dict['connection']
301                 if self.video.info().dict.has_key('server'):
302                     del self.video.info().dict['server']
303                 if self.video.info().dict.has_key('transfer-encoding'):
304                     del self.video.info().dict['transfer-encoding']
305                 if self.video.info().dict.has_key('content-type'):
306                     del self.video.info().dict['content-type']
307                 if self.video.info().dict.has_key('keep-alive'):
308                     del self.video.info().dict['keep-alive']
309
310                 for key in self.video.info().dict:
311                     self.send_header(key, self.video.info().dict[key])
312
313                 self.send_header('Cache-Control','no-cache, no-store, must-revalidate');
314                 self.send_header('Pragma','no-cache');
315
316                 if self.reqtype=="ogg":
317                     self.send_header("Content-Type", "video/ogg")
318                 else:
319                     self.send_header("Content-Type", "video/mpeg")
320
321                 # End headers. Next goes video data
322                 self.end_headers()
323                 logger.debug("Headers sent")
324                 self.headerssent = True
325
326             # Run proxyReadWrite
327             self.proxyReadWrite()
328
329             # Waiting until hangDetector is joined
330             self.hanggreenlet.join()
331             logger.debug("Request handler finished")
332
333         except (vpclient.VPException, vlcclient.VlcException, urllib2.URLError) as e:
334             logger.error("Exception: " + repr(e))
335             self.errorhappened = True
336             self.dieWithError()
337         except gevent.GreenletExit:
338             # hangDetector told us about client disconnection
339             pass
340         except Exception:
341             # Unknown exception
342             logger.error(traceback.format_exc())
343             self.errorhappened = True
344             self.dieWithError()
345         finally:
346             logger.debug("END REQUEST")
347             logger.info("Closed connection from " + self.clientip + " path " + self.path)
348             VPStuff.clientcounter.delete(self.reqtype+'/'+self.path_unquoted, self.client_data)
349             self.vp.destroy()
350
class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    '''
    HTTP server built from ThreadingMixIn that keeps quiet about
    request errors
    '''

    def handle_error(self, request, client_address):
        '''
        Ignore an error raised while handling a request

        Overrides the base class hook so that HTTP tracebacks are not
        printed.
        '''
        return None
356
357
class VPStuff(object):
    '''
    Inter-class interaction class

    Plain namespace: shared objects are stored on it as class attributes
    by the startup code of this module.
    '''
    # VLC control client; stays None until connectVLC() succeeds
    vlcclient = None
363
364 # taken from http://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python
def drop_privileges(uid_name, gid_name='nogroup'):
    '''
    Switch the process to an unprivileged user and group.

    uid_name -- name of the user to run as
    gid_name -- name of the group to run as ('nogroup' by default)

    Clears the supplementary groups, sets the group id and then the user
    id, and restricts the umask to 077. On success HOME is pointed at the
    home directory of the new user.

    Returns True when the process runs with the requested uid and gid
    afterwards, False otherwise. Lookup failures for an unknown user or
    group, and errors of the os.set* calls, are not caught here.
    '''
    # Get the uid/gid from the name (single passwd lookup for uid and home)
    account = pwd.getpwnam(uid_name)
    running_uid = account.pw_uid
    running_uid_home = account.pw_dir
    running_gid = grp.getgrnam(gid_name).gr_gid

    # Remove group privileges
    os.setgroups([])

    # Set the gid first: it can no longer be changed once root is given up
    os.setgid(running_gid)
    os.setuid(running_uid)

    # Ensure a very conservative umask
    os.umask(0o077)

    if os.getuid() == running_uid and os.getgid() == running_gid:
        # could be useful
        os.environ['HOME'] = running_uid_home
        return True
    return False
387
# Log to <logpath>vphttp.log when file logging is enabled; filename=None
# otherwise
logging.basicConfig(
    filename=(VPConfig.logpath + 'vphttp.log') if VPConfig.loggingtoafile else None,
    format='%(asctime)s %(levelname)s %(name)s: %(message)s',
    datefmt='%d.%m.%Y %H:%M:%S',
    level=VPConfig.debug)
logger = logging.getLogger('INIT')

# Loading plugins
# Plugin modules are looked up relative to this file, so try to change
# into its directory first (would fail in freezed state)
try:
    os.chdir(os.path.dirname(os.path.realpath(__file__)))
except:
    pass
# Request type -> plugin instance
VPStuff.pluginshandlers = {}
# Every plugin instance that could be created
VPStuff.pluginlist = []
pluginsmatch = glob.glob('plugins/*_plugin.py')
sys.path.insert(0, 'plugins')
pluginslist = [os.path.splitext(os.path.basename(match))[0] for match in pluginsmatch]
for modname in pluginslist:
    plugin = __import__(modname)
    # foo_plugin.py is expected to define class Foo
    plugname = modname.split('_')[0].capitalize()
    try:
        plugininstance = getattr(plugin, plugname)(VPConfig, VPStuff)
    except Exception as e:
        logger.error("Cannot load plugin " + plugname + ": " + repr(e))
    else:
        logger.debug('Plugin loaded: ' + plugname)
        for handlername in plugininstance.handlers:
            logger.info("Registering handler '" + handlername +"'")
            VPStuff.pluginshandlers[handlername] = plugininstance
        VPStuff.pluginlist.append(plugininstance)
419
# Check whether we can bind to the defined port safely: only ports below
# 1024 require root privileges, port 1024 itself does not
if os.getuid() != 0 and VPConfig.httpport < 1024:
    logger.error("Cannot bind to port " + str(VPConfig.httpport) + " without root privileges")
    sys.exit(1)
424
# Create the listening server before privileges are dropped below
server = HTTPServer((VPConfig.httphost, VPConfig.httpport), HTTPHandler)
logger = logging.getLogger('HTTP')

# Dropping root privileges if needed
if VPConfig.vpproxyuser and os.getuid() == 0:
    if not drop_privileges(VPConfig.vpproxyuser):
        logger.error("Cannot drop privileges to user " + VPConfig.vpproxyuser)
        sys.exit(1)
    logger.info("Dropped privileges to user " + VPConfig.vpproxyuser)

# Creating ClientCounter
VPStuff.clientcounter = ClientCounter()

# Writable handle on the null device; in this file it is only referenced
# by the commented-out output redirection in spawnVLC()
DEVNULL = open(os.devnull, 'wb')
440
441 # Spawning procedures
def spawnVLC(cmd, delay=0):
    '''
    Start VLC and wait for it to come up.

    cmd   -- command line as an argument list
    delay -- seconds to sleep after the process was started

    The process object is stored in VPStuff.vlc. Returns True on success
    and False when starting the process or the wait raised anything.
    '''
    try:
        VPStuff.vlc = psutil.Popen(cmd) #, stdout=DEVNULL, stderr=DEVNULL)
        gevent.sleep(delay)
    except:
        return False
    else:
        return True
449
def connectVLC():
    '''
    Create the VLC client from the VPConfig connection settings.

    On success the client is stored in VPStuff.vlcclient and True is
    returned. A VlcException makes the function return False and leaves
    VPStuff.vlcclient untouched.
    '''
    try:
        client = vlcclient.VlcClient(
            host=VPConfig.vlchost, port=VPConfig.vlcport, password=VPConfig.vlcpass,
            out_port=VPConfig.vlcoutport)
    except vlcclient.VlcException:
        return False
    VPStuff.vlcclient = client
    return True
458
def isRunning(process):
    '''
    Tell whether a psutil process is alive and not a zombie.

    process -- psutil process object

    Returns True or False.
    '''
    # status became a method in psutil 2; older versions expose a property
    status_is_method = psutil.version_info[0] >= 2
    if not process.is_running():
        return False
    status = process.status() if status_is_method else process.status
    return status != psutil.STATUS_ZOMBIE
467
def findProcess(name):
    '''
    Look up a running process by name.

    name -- process name to compare against, exact match

    Returns the pid of the first matching process, or None when there is
    none. Processes that cannot be inspected or that vanished meanwhile
    are skipped.
    '''
    for candidate in psutil.process_iter():
        try:
            details = candidate.as_dict(attrs=['pid', 'name'])
            if details['name'] == name:
                return details['pid']
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # System process, or process terminated
            continue
    return None
481
def clean_proc():
    '''
    Stop the spawned VLC process.

    Asks the VLC client to shut down first and gives the process a second
    to exit; if it is still running afterwards it is killed. Does nothing
    when no VLC process object is stored in VPStuff.vlc, which is the case
    when spawning failed or while a dead process is being replaced.
    '''
    vlcproc = getattr(VPStuff, 'vlc', None)
    if vlcproc is None:
        return

    # Trying to close all spawned processes gracefully
    if isRunning(vlcproc):
        if VPStuff.vlcclient:
            VPStuff.vlcclient.destroy()
        gevent.sleep(1)
    if isRunning(vlcproc):
        # or not :)
        vlcproc.kill()
491
492 # This is what we call to stop the server completely
def shutdown(signum = 0, frame = 0):
    '''
    Stop the server completely.

    signum, frame -- signal handler arguments, unused

    Closes every client connection that is currently being handled, stops
    VLC, closes the listening socket and exits the process through
    sys.exit().
    '''
    logger.info("Stopping server...")
    # Closing all client connections. Iterate over a snapshot: handlers
    # remove themselves from requestlist as soon as their request ends.
    for connection in list(server.RequestHandlerClass.requestlist):
        try:
            # Set errorhappened to prevent waiting for videodestroydelay
            connection.errorhappened = True
            connection.closeConnection()
        except Exception:
            logger.warning("Cannot kill a connection!")
    clean_proc()
    server.server_close()
    sys.exit()
506
def _reloadconfig(signum=None, frame=None):
    '''
    SIGHUP handler: reload the configuration module.

    signum, frame -- signal handler arguments, unused

    Rebinds the module-level VPConfig name to the class of the reloaded
    vpconfig module.
    '''
    global VPConfig

    log = logging.getLogger('reloadconfig')
    VPConfig = reload(vpconfig).VPConfig
    log.info('Config reloaded')
518
sched = BackgroundScheduler()
sched.start()

def clean_streams():
    '''
    Scheduler job: let the VLC client clean up its streams.

    Passes VPConfig.videodestroydelay to the client (presumably the idle
    time after which a stream is dropped -- confirm in vlcclient). Does
    nothing while no VLC client is connected, because the job is scheduled
    before VLC is spawned.
    '''
    client = VPStuff.vlcclient
    if client is None:
        return
    client.clean_streams(VPConfig.videodestroydelay)

job = sched.add_job(clean_streams, 'interval', seconds=15)
526
# Install the signal handlers. An AttributeError (missing signal constant
# or missing gevent.signal) is ignored.
try:
    gevent.signal(signal.SIGHUP, _reloadconfig)
    gevent.signal(signal.SIGTERM, shutdown)
except AttributeError:
    pass

name = 'vlc'
# VLC command line as an argument list; kept for respawning VLC later
VPStuff.vlcProc = VPConfig.vlccmd.split()
if not (spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC()):
    logger.error('Cannot spawn or connect to VLC!')
    clean_proc()
    sys.exit(1)
logger.info("VLC spawned with pid " + str(VPStuff.vlc.pid))
542
# Main loop: keep VLC alive and serve HTTP requests one at a time
try:
    logger.info("Using gevent %s" % gevent.__version__)
    logger.info("Usig psutil %s" % psutil.__version__)
    logger.info("Using VLC %s" % VPStuff.vlcclient._vlcver)
    logger.info("Server started.")
    while True:
        if not isRunning(VPStuff.vlc):
            del VPStuff.vlc
            if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
                logger.info("VLC died, respawned it with pid " + str(VPStuff.vlc.pid))
            else:
                logger.error("Cannot spawn VLC!")
                clean_proc()
                sys.exit(1)
        # Return to our server tasks
        server.handle_request()
except (KeyboardInterrupt, SystemExit) as stop:
    # Remember the exit status requested inside the loop: shutdown() ends
    # with a bare sys.exit(), which would otherwise turn a fatal error
    # into exit status 0
    exitcode = stop.code if isinstance(stop, SystemExit) else None
    sched.shutdown()
    try:
        shutdown()
    except SystemExit:
        if exitcode:
            sys.exit(exitcode)
        raise