Добавлен юнит для systemd
[vpproxy.git] / vphttp.py
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3 '''
4 VPProxy: HTTP/HLS Stream to HTTP Multiplexing Proxy
5
6 Based on AceProxy (https://github.com/ValdikSS/AceProxy) design
7 '''
8 import traceback
9 import gevent
10 import gevent.monkey
11 # Monkeypatching and all the stuff
12 gevent.monkey.patch_all()
13 # Startup delay for daemon restart
14 gevent.sleep(5)
15 import glob
16 import os
17 import signal
18 import sys
19 import logging
20 import psutil
21 import BaseHTTPServer
22 import SocketServer
23 from socket import error as SocketException
24 from socket import SHUT_RDWR
25 import urllib2
26 import hashlib
27 import vpconfig
28 from vpconfig import VPConfig
29 import vlcclient
30 import gc
31 import plugins.modules.ipaddr as ipaddr
32 from clientcounter import ClientCounter
33 from plugins.modules.PluginInterface import VPProxyPlugin
34 try:
35     import pwd
36     import grp
37 except ImportError:
38     pass
39
40 import uuid
41
42 from apscheduler.schedulers.background import BackgroundScheduler
43
44 class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
45
46     requestlist = []
47
48     def handle_one_request(self):
49         '''
50         Add request to requestlist, handle request and remove from the list
51         '''
52         HTTPHandler.requestlist.append(self)
53         BaseHTTPServer.BaseHTTPRequestHandler.handle_one_request(self)
54         HTTPHandler.requestlist.remove(self)
55
56     def closeConnection(self):
57         '''
58         Disconnecting client
59         '''
60         if self.clientconnected:
61             self.clientconnected = False
62             try:
63                 self.wfile.close()
64                 self.rfile.close()
65                 self.connection.shutdown(SHUT_RDWR)
66             except:
67                 pass
68
69     def dieWithError(self, errorcode=500):
70         '''
71         Close connection with error
72         '''
73         logging.warning("Dying with error")
74         if self.clientconnected:
75             self.send_error(errorcode)
76             self.end_headers()
77             self.closeConnection()
78
79     def proxyReadWrite(self):
80         '''
81         Read video stream and send it to client
82         '''
83         logger = logging.getLogger('http_proxyReadWrite')
84         logger.debug("Started")
85
86         self.vlcstate = True
87         self.streamstate = True
88
89         try:
90             while True:
91             
92                 if not self.clientconnected:
93                     logger.debug("Client is not connected, terminating")
94                     break
95
96                 VPStuff.vlcclient.mark(self.vlcid)
97                 data = self.video.read(4096)
98                 if data and self.clientconnected:
99                     self.wfile.write(data)
100                 else:
101                     logger.warning("Video connection closed")
102                     break
103
104         except SocketException:
105             # Video connection dropped
106             logger.warning("Video connection dropped")
107         finally:
108             self.video.close()
109             self.closeConnection()
110
111     def hangDetector(self):
112         '''
113         Detect client disconnection while in the middle of something
114         or just normal connection close.
115         '''
116         logger = logging.getLogger('http_hangDetector')
117         try:
118             while True:
119                 if not self.rfile.read():
120                     break
121         except:
122             pass
123         finally:
124             self.clientconnected = False
125             logger.debug("Client disconnected")
126             try:
127                 self.requestgreenlet.kill()
128             except:
129                 pass
130             finally:
131                 gevent.sleep()
132             return
133
134     def do_HEAD(self):
135         return self.do_GET(headers_only=True)
136
137     def do_GET(self, headers_only=False):
138         '''
139         GET request handler
140         '''
141
142         logger = logging.getLogger('http_HTTPHandler')
143         self.clientconnected = True
144         # Don't wait videodestroydelay if error happened
145         self.errorhappened = True
146         # Headers sent flag for fake headers UAs
147         self.headerssent = False
148         # Current greenlet
149         self.requestgreenlet = gevent.getcurrent()
150         # Connected client IP address
151         self.clientip = self.request.getpeername()[0]
152
153         req_headers = self.headers 
154         self.client_data = { 
155           'ip': self.clientip, 
156           'forwarded-for': req_headers.get('X-Forwarded-For'),
157           'client-agent': req_headers.get('User-Agent'),
158           'uuid': uuid.uuid4()
159           }
160           
161         if VPConfig.firewall:
162             # If firewall enabled
163             self.clientinrange = any(map(lambda i: ipaddr.IPAddress(self.clientip) \
164                                 in ipaddr.IPNetwork(i), VPConfig.firewallnetranges))
165
166             if (VPConfig.firewallblacklistmode and self.clientinrange) or \
167                 (not VPConfig.firewallblacklistmode and not self.clientinrange):
168                     logger.info('Dropping connection from ' + self.clientip + ' due to ' + \
169                                 'firewall rules')
170                     self.dieWithError(403)  # 403 Forbidden
171                     return
172
173         logger.info("Accepted connection from " + self.clientip + " path " + self.path)
174
175         try:
176             self.splittedpath = self.path.split('/')
177             self.reqtype = self.splittedpath[1].lower()
178             # If first parameter is 'pid' or 'torrent' or it should be handled
179             # by plugin
180             if not (self.reqtype in ('get','mp4','ogg','ogv') or self.reqtype in VPStuff.pluginshandlers):
181                 self.dieWithError(400)  # 400 Bad Request
182                 return
183         except IndexError:
184             self.dieWithError(400)  # 400 Bad Request
185             return
186
187         # Handle request with plugin handler
188         if self.reqtype in VPStuff.pluginshandlers:
189             try:
190                 VPStuff.pluginshandlers.get(self.reqtype).handle(self)
191             except Exception as e:
192                 logger.error('Plugin exception: ' + repr(e))
193                 logger.error(traceback.format_exc())
194                 self.dieWithError()
195             finally:
196                 self.closeConnection()
197                 return
198         self.handleRequest(headers_only)
199
200     def handleRequest(self, headers_only):
201       
202         # Limit concurrent connections
203         if 0 < VPConfig.maxconns <= VPStuff.clientcounter.total:
204             logger.debug("Maximum connections reached, can't serve this")
205             self.dieWithError(503)  # 503 Service Unavailable
206             return
207
208         # Pretend to work fine with Fake UAs or HEAD request.
209         useragent = self.headers.get('User-Agent')
210         logger.debug("HTTP User Agent:"+useragent)
211         fakeua = useragent and useragent in VPConfig.fakeuas
212         if headers_only or fakeua:
213             if fakeua:
214                 logger.debug("Got fake UA: " + self.headers.get('User-Agent'))
215             # Return 200 and exit
216             self.send_response(200)
217             self.send_header("Content-Type", "video/mpeg")
218             self.end_headers()
219             self.closeConnection()
220             return
221
222         self.path_unquoted = urllib2.unquote('/'.join(self.splittedpath[2:]))
223         # Make list with parameters
224         self.params = list()
225         for i in xrange(3, 8):
226             try:
227                 self.params.append(int(self.splittedpath[i]))
228             except (IndexError, ValueError):
229                 self.params.append('0')
230
231         # Adding client to clientcounter
232         clients = VPStuff.clientcounter.add(self.reqtype+'/'+self.path_unquoted, self.client_data)
233         # If we are the one client, but sucessfully got vp instance from clientcounter,
234         # then somebody is waiting in the videodestroydelay state
235
236         # Check if we are first client
237
238         self.vlcid = hashlib.md5(self.reqtype+'/'+self.path_unquoted).hexdigest()
239
240         try:
241             if not VPStuff.vlcclient.check_stream(self.vlcid):
242                 logger.debug("First client, should create VLC session")
243                 shouldcreatevp = True
244             else:
245                 logger.debug("Can reuse existing session")
246                 shouldcreatevp = False
247         except Exception as e:
248             logger.error('Plugin exception: ' + repr(e))
249             logger.error(traceback.format_exc())
250             self.dieWithError()            
251
252         # Send fake headers if this User-Agent is in fakeheaderuas tuple
253         if fakeua:
254             logger.debug(
255                 "Sending fake headers for " + useragent)
256             self.send_response(200)
257             self.send_header('Cache-Control','no-cache, no-store, must-revalidate');
258             self.send_header('Pragma','no-cache');
259             if self.reqtype in ("ogg","ogv"):
260                 self.send_header("Content-Type", "video/ogg")
261             else:
262                 self.send_header("Content-Type", "video/mpeg")
263             self.end_headers()
264             # Do not send real headers at all
265             self.headerssent = True
266
267         try:
268             self.hanggreenlet = gevent.spawn(self.hangDetector)
269             logger.debug("hangDetector spawned")
270             gevent.sleep()
271
272             # Getting URL
273             self.errorhappened = False
274
275             if shouldcreatevp:
276                 logger.debug("Got url " + self.path_unquoted)
277                 # Force ffmpeg demuxing if set in config
278                 if VPConfig.vlcforceffmpeg:
279                     self.vlcprefix = 'http/ffmpeg://'
280                 else:
281                     self.vlcprefix = ''
282
283                 logger.info("Starting broadcasting "+self.path)                    
284                 VPStuff.vlcclient.startBroadcast(
285                     self.vlcid, self.vlcprefix + self.path_unquoted, VPConfig.vlcmux, VPConfig.vlcpreaccess, self.reqtype)
286                 # Sleep a bit, because sometimes VLC doesn't open port in
287                 # time
288                 gevent.sleep(0.5)
289
290             # Building new VLC url
291             self.url = 'http://' + VPConfig.vlchost + \
292                 ':' + str(VPConfig.vlcoutport) + '/' + self.vlcid
293             logger.debug("VLC url " + self.url)
294
295             # Sending client headers to videostream
296             self.video = urllib2.Request(self.url)
297             for key in self.headers.dict:
298                 self.video.add_header(key, self.headers.dict[key])
299
300             self.video = urllib2.urlopen(self.video)
301
302             # Sending videostream headers to client
303             if not self.headerssent:
304                 self.send_response(self.video.getcode())
305                 if self.video.info().dict.has_key('connection'):
306                     del self.video.info().dict['connection']
307                 if self.video.info().dict.has_key('server'):
308                     del self.video.info().dict['server']
309                 if self.video.info().dict.has_key('transfer-encoding'):
310                     del self.video.info().dict['transfer-encoding']
311                 if self.video.info().dict.has_key('content-type'):
312                     del self.video.info().dict['content-type']
313                 if self.video.info().dict.has_key('keep-alive'):
314                     del self.video.info().dict['keep-alive']
315
316                 for key in self.video.info().dict:
317                     self.send_header(key, self.video.info().dict[key])
318
319                 self.send_header('Cache-Control','no-cache, no-store, must-revalidate');
320                 self.send_header('Pragma','no-cache');
321
322                 if self.reqtype=="ogg":
323                     self.send_header("Content-Type", "video/ogg")
324                 else:
325                     self.send_header("Content-Type", "video/mpeg")
326
327                 # End headers. Next goes video data
328                 self.end_headers()
329                 logger.debug("Headers sent")
330                 self.headerssent = True
331
332             # Run proxyReadWrite
333             self.proxyReadWrite()
334
335             # Waiting until hangDetector is joined
336             self.hanggreenlet.join()
337             logger.debug("Request handler finished")
338         except (vlcclient.VlcException) as e:
339             logger.error("Exception: " + repr(e))
340             VPStuff.vlcerrors = VPStuff.vlcerrors + 1
341             logger.error("%s error(s) communicating VLC")
342             self.errorhappened = True
343             self.dieWithError()            
344         except (vpclient.VPException, vlcclient.VlcException, urllib2.URLError) as e:
345             logger.error("Exception: " + repr(e))
346             self.errorhappened = True
347             self.dieWithError()
348         except gevent.GreenletExit:
349             # hangDetector told us about client disconnection
350             pass
351         except Exception:
352             # Unknown exception
353             logger.error(traceback.format_exc())
354             self.errorhappened = True
355             self.dieWithError()
356         finally:
357             logger.debug("END REQUEST")
358             logger.info("Closed connection from " + self.clientip + " path " + self.path)
359             VPStuff.clientcounter.delete(self.reqtype+'/'+self.path_unquoted, self.client_data)
360             self.vp.destroy()
361
362 class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
363
364     def handle_error(self, request, client_address):
365         # Do not print HTTP tracebacks
366         pass
367
368
369 class VPStuff(object):
370     '''
371     Inter-class interaction class
372     '''
373     vlcclient=None
374     vlcerrors=0
375
376 # taken from http://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python
377 def drop_privileges(uid_name, gid_name='nogroup'):
378
379     # Get the uid/gid from the name
380     running_uid = pwd.getpwnam(uid_name).pw_uid
381     running_uid_home = pwd.getpwnam(uid_name).pw_dir
382     running_gid = grp.getgrnam(gid_name).gr_gid
383
384     # Remove group privileges
385     os.setgroups([])
386
387     # Try setting the new uid/gid
388     os.setgid(running_gid)
389     os.setuid(running_uid)
390
391     # Ensure a very conservative umask
392     old_umask = os.umask(077)
393
394     if os.getuid() == running_uid and os.getgid() == running_gid:
395         # could be useful
396         os.environ['HOME'] = running_uid_home
397         return True
398     return False
399
400 logging.basicConfig(
401     filename=VPConfig.logpath + 'vphttp.log' if VPConfig.loggingtoafile else None,
402     format='%(asctime)s %(levelname)s %(name)s: %(message)s', datefmt='%d.%m.%Y %H:%M:%S', level=VPConfig.debug)
403 logger = logging.getLogger('INIT')
404
405 # Loading plugins
406 # Trying to change dir (would fail in freezed state)
407 try:
408     os.chdir(os.path.dirname(os.path.realpath(__file__)))
409 except:
410     pass
411 # Creating dict of handlers
412 VPStuff.pluginshandlers = dict()
413 # And a list with plugin instances
414 VPStuff.pluginlist = list()
415 pluginsmatch = glob.glob('plugins/*_plugin.py')
416 sys.path.insert(0, 'plugins')
417 pluginslist = [os.path.splitext(os.path.basename(x))[0] for x in pluginsmatch]
418 for i in pluginslist:
419     plugin = __import__(i)
420     plugname = i.split('_')[0].capitalize()
421     try:
422         plugininstance = getattr(plugin, plugname)(VPConfig, VPStuff)
423     except Exception as e:
424         logger.error("Cannot load plugin " + plugname + ": " + repr(e))
425         continue
426     logger.debug('Plugin loaded: ' + plugname)
427     for j in plugininstance.handlers:
428         logger.info("Registering handler '" + j +"'")
429         VPStuff.pluginshandlers[j] = plugininstance
430     VPStuff.pluginlist.append(plugininstance)
431
432 # Check whether we can bind to the defined port safely
433 if os.getuid() != 0 and VPConfig.httpport <= 1024:
434     logger.error("Cannot bind to port " + str(VPConfig.httpport) + " without root privileges")
435     sys.exit(1)
436
437 server = HTTPServer((VPConfig.httphost, VPConfig.httpport), HTTPHandler)
438 logger = logging.getLogger('HTTP')
439
440 # Dropping root privileges if needed
441 if VPConfig.vpproxyuser and os.getuid() == 0:
442     if drop_privileges(VPConfig.vpproxyuser):
443         logger.info("Dropped privileges to user " + VPConfig.vpproxyuser)
444     else:
445         logger.error("Cannot drop privileges to user " + VPConfig.vpproxyuser)
446         sys.exit(1)
447
448 # Creating ClientCounter
449 VPStuff.clientcounter = ClientCounter()
450
451 DEVNULL = open(os.devnull, 'wb')
452
453 # Spawning procedures
454 def spawnVLC(cmd, delay = 0):
455     try:
456         VPStuff.vlc = psutil.Popen(cmd) #, stdout=DEVNULL, stderr=DEVNULL)
457         VPStuff.vlcerrors = 0
458         gevent.sleep(delay)
459         return True
460     except:
461         return False
462
463 def connectVLC():
464     try:
465         VPStuff.vlcclient = vlcclient.VlcClient(
466             host=VPConfig.vlchost, port=VPConfig.vlcport, password=VPConfig.vlcpass,
467             out_port=VPConfig.vlcoutport)
468         return True
469     except vlcclient.VlcException as e:
470         return False
471
472 def isRunning(process):
473     if psutil.version_info[0] >= 2:
474         if process.is_running() and process.status() != psutil.STATUS_ZOMBIE:
475             return True
476     else:  # for older versions of psutil
477         if process.is_running() and process.status != psutil.STATUS_ZOMBIE:
478             return True
479     return False
480
481 def findProcess(name):
482     for proc in psutil.process_iter():
483         try:
484             pinfo = proc.as_dict(attrs=['pid', 'name'])
485             if pinfo['name'] == name:
486                 return pinfo['pid']
487         except psutil.AccessDenied:
488             # System process
489             pass
490         except psutil.NoSuchProcess:
491             # Process terminated
492             pass
493     return None
494
495 def clean_proc():
496     # Trying to close all spawned processes gracefully
497     if isRunning(VPStuff.vlc):
498         if VPStuff.vlcclient:
499             VPStuff.vlcclient.destroy()
500         gevent.sleep(1)
501     if isRunning(VPStuff.vlc):
502         # or not :)
503         VPStuff.vlc.terminate()
504         gevent.sleep(1)
505         if isRunning(VPStuff.vlc):
506             VPStuff.vlc.kill()
507     del VPStuff.vlc
508
509 def restartVLC(cmd, delay = 0):
510     clean_proc()
511     if spawnVLC(cmd, delay):
512         if connectVLC():
513             return True
514     return False
515
516 # This is what we call to stop the server completely
517 def shutdown(signum = 0, frame = 0):
518     logger.info("Stopping server...")
519     # Closing all client connections
520     for connection in server.RequestHandlerClass.requestlist:
521         try:
522             # Set errorhappened to prevent waiting for videodestroydelay
523             connection.errorhappened = True
524             connection.closeConnection()
525         except:
526             logger.warning("Cannot kill a connection!")
527     clean_proc()
528     server.server_close()
529     sys.exit()
530
531 def _reloadconfig(signum=None, frame=None):
532     '''
533     Reload configuration file.
534     SIGHUP handler.
535     '''
536     global VPConfig
537
538     logger = logging.getLogger('reloadconfig')
539     reload(vpconfig)
540     from vpconfig import VPConfig
541     logger.info('Config reloaded')
542
543 sched = BackgroundScheduler()
544 sched.start()
545
546 def clean_streams():
547   if VPStuff.vlcclient:
548     VPStuff.vlcclient.clean_streams(VPConfig.videodestroydelay)
549
550 job = sched.add_job(clean_streams, 'interval', seconds=15)
551
552 # setting signal handlers
553 try:
554     gevent.signal(signal.SIGHUP, _reloadconfig)
555     gevent.signal(signal.SIGTERM, shutdown)
556 except AttributeError:
557     pass
558
559 name = 'vlc'
560 VPStuff.vlcProc = VPConfig.vlccmd.split()
561 if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
562     logger.info("VLC spawned with pid " + str(VPStuff.vlc.pid))
563 else:
564     logger.error('Cannot spawn or connect to VLC!')
565     clean_proc()
566     sys.exit(1)
567
568 try:
569     logger.info("Using gevent %s" % gevent.__version__)
570     logger.info("Usig psutil %s" % psutil.__version__)
571     logger.info("Using VLC %s" % VPStuff.vlcclient._vlcver)
572     logger.info("Server started.")
573     while True:
574
575         if not isRunning(VPStuff.vlc):
576
577             del VPStuff.vlc
578             if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
579                 logger.info("VLC died, respawned it with pid " + str(VPStuff.vlc.pid))
580             else:
581                 logger.error("Cannot spawn VLC!")
582                 clean_proc()
583                 sys.exit(1)
584
585         # Return to our server tasks
586         server.handle_request()
587
588         if VPStuff.vlcerrors>5:
589             if restartVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout):
590                 logger.info("VLC hung, respawned it with pid " + str(VPStuff.vlc.pid))
591             else:
592                 logger.error("Cannot spawn VLC!")
593                 clean_proc()
594                 sys.exit(1)
595                                                                 
596 except (KeyboardInterrupt, SystemExit):
597     sched.shutdown()
598     shutdown()