6108bd81055d9b089e885c9e4380e4ffe331c6b9
[vpproxy.git] / vphttp.py
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3 '''
4 VPProxy: HTTP/HLS Stream to HTTP Multiplexing Proxy
5
6 Based on AceProxy (https://github.com/ValdikSS/AceProxy) design
7 '''
8 import traceback
9 import gevent
10 import gevent.monkey
11 # Monkeypatching and all the stuff
12 gevent.monkey.patch_all()
13 # Startup delay for daemon restart
14 gevent.sleep(5)
15 import glob
16 import os
17 import signal
18 import sys
19 import logging
20 import psutil
21 import BaseHTTPServer
22 import SocketServer
23 from socket import error as SocketException
24 from socket import SHUT_RDWR
25 import urllib2
26 import hashlib
27 import vpconfig
28 from vpconfig import VPConfig
29 import vlcclient
30 import plugins.modules.ipaddr as ipaddr
31 from clientcounter import ClientCounter
32 from plugins.modules.PluginInterface import VPProxyPlugin
33 try:
34     import pwd
35     import grp
36 except ImportError:
37     pass
38
39
40
41 class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
42
43     requestlist = []
44
45     def handle_one_request(self):
46         '''
47         Add request to requestlist, handle request and remove from the list
48         '''
49         HTTPHandler.requestlist.append(self)
50         BaseHTTPServer.BaseHTTPRequestHandler.handle_one_request(self)
51         HTTPHandler.requestlist.remove(self)
52
53     def closeConnection(self):
54         '''
55         Disconnecting client
56         '''
57         if self.clientconnected:
58             self.clientconnected = False
59             try:
60                 self.wfile.close()
61                 self.rfile.close()
62                 self.connection.shutdown(SHUT_RDWR)
63             except:
64                 pass
65
66     def dieWithError(self, errorcode=500):
67         '''
68         Close connection with error
69         '''
70         logging.warning("Dying with error")
71         if self.clientconnected:
72             self.send_error(errorcode)
73             self.end_headers()
74             self.closeConnection()
75
76     def proxyReadWrite(self):
77         '''
78         Read video stream and send it to client
79         '''
80         logger = logging.getLogger('http_proxyReadWrite')
81         logger.debug("Started")
82
83         self.vlcstate = True
84         self.streamstate = True
85
86         try:
87             while True:
88
89                 if not self.clientconnected:
90                     logger.debug("Client is not connected, terminating")
91                     break
92
93                 data = self.video.read(4096)
94                 if data and self.clientconnected:
95                     self.wfile.write(data)
96                 else:
97                     logger.warning("Video connection closed")
98                     break
99
100         except SocketException:
101             # Video connection dropped
102             logger.warning("Video connection dropped")
103         finally:
104             self.video.close()
105             self.closeConnection()
106
107     def hangDetector(self):
108         '''
109         Detect client disconnection while in the middle of something
110         or just normal connection close.
111         '''
112         logger = logging.getLogger('http_hangDetector')
113         try:
114             while True:
115                 if not self.rfile.read():
116                     break
117         except:
118             pass
119         finally:
120             self.clientconnected = False
121             logger.debug("Client disconnected")
122             try:
123                 self.requestgreenlet.kill()
124             except:
125                 pass
126             finally:
127                 gevent.sleep()
128             return
129
130     def do_HEAD(self):
131         return self.do_GET(headers_only=True)
132
133     def do_GET(self, headers_only=False):
134         '''
135         GET request handler
136         '''
137         logger = logging.getLogger('http_HTTPHandler')
138         self.clientconnected = True
139         # Don't wait videodestroydelay if error happened
140         self.errorhappened = True
141         # Headers sent flag for fake headers UAs
142         self.headerssent = False
143         # Current greenlet
144         self.requestgreenlet = gevent.getcurrent()
145         # Connected client IP address
146         self.clientip = self.request.getpeername()[0]
147
148         if VPConfig.firewall:
149             # If firewall enabled
150             self.clientinrange = any(map(lambda i: ipaddr.IPAddress(self.clientip) \
151                                 in ipaddr.IPNetwork(i), VPConfig.firewallnetranges))
152
153             if (VPConfig.firewallblacklistmode and self.clientinrange) or \
154                 (not VPConfig.firewallblacklistmode and not self.clientinrange):
155                     logger.info('Dropping connection from ' + self.clientip + ' due to ' + \
156                                 'firewall rules')
157                     self.dieWithError(403)  # 403 Forbidden
158                     return
159
160         logger.info("Accepted connection from " + self.clientip + " path " + self.path)
161
162         try:
163             self.splittedpath = self.path.split('/')
164             self.reqtype = self.splittedpath[1].lower()
165             # If first parameter is 'pid' or 'torrent' or it should be handled
166             # by plugin
167             if not (self.reqtype in ('get','mp4','webm') or self.reqtype in VPStuff.pluginshandlers):
168                 self.dieWithError(400)  # 400 Bad Request
169                 return
170         except IndexError:
171             self.dieWithError(400)  # 400 Bad Request
172             return
173
174         # Handle request with plugin handler
175         if self.reqtype in VPStuff.pluginshandlers:
176             try:
177                 VPStuff.pluginshandlers.get(self.reqtype).handle(self)
178             except Exception as e:
179                 logger.error('Plugin exception: ' + repr(e))
180                 logger.error(traceback.format_exc())
181                 self.dieWithError()
182             finally:
183                 self.closeConnection()
184                 return
185         self.handleRequest(headers_only)
186
187     def handleRequest(self, headers_only):
188
189         # Limit concurrent connections
190         print VPStuff.clientcounter.total
191         if 0 < VPConfig.maxconns <= VPStuff.clientcounter.total:
192             logger.debug("Maximum connections reached, can't serve this")
193             self.dieWithError(503)  # 503 Service Unavailable
194             return
195
196         # Pretend to work fine with Fake UAs or HEAD request.
197         useragent = self.headers.get('User-Agent')
198         logger.debug("HTTP User Agent:"+useragent)
199         fakeua = useragent and useragent in VPConfig.fakeuas
200         if headers_only or fakeua:
201             if fakeua:
202                 logger.debug("Got fake UA: " + self.headers.get('User-Agent'))
203             # Return 200 and exit
204             self.send_response(200)
205             self.send_header("Content-Type", "video/mpeg")
206             self.end_headers()
207             self.closeConnection()
208             return
209
210         self.path_unquoted = urllib2.unquote('/'.join(self.splittedpath[2:]))
211         # Make list with parameters
212         self.params = list()
213         for i in xrange(3, 8):
214             try:
215                 self.params.append(int(self.splittedpath[i]))
216             except (IndexError, ValueError):
217                 self.params.append('0')
218
219         # Adding client to clientcounter
220         clients = VPStuff.clientcounter.add(self.reqtype+'\\'+self.path_unquoted, self.clientip)
221         # If we are the one client, but sucessfully got vp instance from clientcounter,
222         # then somebody is waiting in the videodestroydelay state
223
224         # Check if we are first client
225         if VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted)==1:
226             logger.debug("First client, should create VLC session")
227             shouldcreatevp = True
228         else:
229             logger.debug("Can reuse existing session")
230             shouldcreatevp = False
231
232         self.vlcid = hashlib.md5(self.reqtype+'\\'+self.path_unquoted).hexdigest()
233
234         # Send fake headers if this User-Agent is in fakeheaderuas tuple
235         if fakeua:
236             logger.debug(
237                 "Sending fake headers for " + useragent)
238             self.send_response(200)
239             self.send_header("Content-Type", "video/mpeg")
240             self.end_headers()
241             # Do not send real headers at all
242             self.headerssent = True
243
244         try:
245             self.hanggreenlet = gevent.spawn(self.hangDetector)
246             logger.debug("hangDetector spawned")
247             gevent.sleep()
248
249             # Getting URL
250             self.errorhappened = False
251
252             if shouldcreatevp:
253                 logger.debug("Got url " + self.path_unquoted)
254                 # Force ffmpeg demuxing if set in config
255                 if VPConfig.vlcforceffmpeg:
256                     self.vlcprefix = 'http/ffmpeg://'
257                 else:
258                     self.vlcprefix = ''
259
260                 logger.info("Starting broadcasting "+self.path)                    
261                 VPStuff.vlcclient.startBroadcast(
262                     self.vlcid, self.vlcprefix + self.path_unquoted, VPConfig.vlcmux, VPConfig.vlcpreaccess, self.reqtype)
263                 # Sleep a bit, because sometimes VLC doesn't open port in
264                 # time
265                 gevent.sleep(0.5)
266
267             # Building new VLC url
268             self.url = 'http://' + VPConfig.vlchost + \
269                 ':' + str(VPConfig.vlcoutport) + '/' + self.vlcid
270             logger.debug("VLC url " + self.url)
271
272             # Sending client headers to videostream
273             self.video = urllib2.Request(self.url)
274             for key in self.headers.dict:
275                 self.video.add_header(key, self.headers.dict[key])
276
277             self.video = urllib2.urlopen(self.video)
278
279             # Sending videostream headers to client
280             if not self.headerssent:
281                 self.send_response(self.video.getcode())
282                 if self.video.info().dict.has_key('connection'):
283                     del self.video.info().dict['connection']
284                 if self.video.info().dict.has_key('server'):
285                     del self.video.info().dict['server']
286                 if self.video.info().dict.has_key('transfer-encoding'):
287                     del self.video.info().dict['transfer-encoding']
288                 if self.video.info().dict.has_key('content-type'):
289                     del self.video.info().dict['content-type']
290                 if self.video.info().dict.has_key('keep-alive'):
291                     del self.video.info().dict['keep-alive']
292
293                 for key in self.video.info().dict:
294                     self.send_header(key, self.video.info().dict[key])
295                 self.send_header("Content-Type", "video/mpeg")
296                 # End headers. Next goes video data
297                 self.end_headers()
298                 logger.debug("Headers sent")
299
300             # Run proxyReadWrite
301             self.proxyReadWrite()
302
303             # Waiting until hangDetector is joined
304             self.hanggreenlet.join()
305             logger.debug("Request handler finished")
306
307         except (vpclient.VPException, vlcclient.VlcException, urllib2.URLError) as e:
308             logger.error("Exception: " + repr(e))
309             self.errorhappened = True
310             self.dieWithError()
311         except gevent.GreenletExit:
312             # hangDetector told us about client disconnection
313             pass
314         except Exception:
315             # Unknown exception
316             logger.error(traceback.format_exc())
317             self.errorhappened = True
318             self.dieWithError()
319         finally:
320             logger.debug("END REQUEST")
321             logger.info("Closed connection from " + self.clientip + " path " + self.path)
322             VPStuff.clientcounter.delete(self.reqtype+'\\'+self.path_unquoted, self.clientip)
323             if not VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted):
324                 try:
325                     logger.debug("That was the last client, destroying VPClient")
326                     logger.info("Stopping broadcasting " + self.path)
327                     VPStuff.vlcclient.stopBroadcast(self.vlcid)
328                 except:
329                     pass
330                 self.vp.destroy()
331
332
333 class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
334
335     def handle_error(self, request, client_address):
336         # Do not print HTTP tracebacks
337         pass
338
339
340 class VPStuff(object):
341     '''
342     Inter-class interaction class
343     '''
344     vlcclient=None
345
346 # taken from http://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python
347 def drop_privileges(uid_name, gid_name='nogroup'):
348
349     # Get the uid/gid from the name
350     running_uid = pwd.getpwnam(uid_name).pw_uid
351     running_uid_home = pwd.getpwnam(uid_name).pw_dir
352     running_gid = grp.getgrnam(gid_name).gr_gid
353
354     # Remove group privileges
355     os.setgroups([])
356
357     # Try setting the new uid/gid
358     os.setgid(running_gid)
359     os.setuid(running_uid)
360
361     # Ensure a very conservative umask
362     old_umask = os.umask(077)
363
364     if os.getuid() == running_uid and os.getgid() == running_gid:
365         # could be useful
366         os.environ['HOME'] = running_uid_home
367         return True
368     return False
369
370 logging.basicConfig(
371     filename=VPConfig.logpath + 'vphttp.log' if VPConfig.loggingtoafile else None,
372     format='%(asctime)s %(levelname)s %(name)s: %(message)s', datefmt='%d.%m.%Y %H:%M:%S', level=VPConfig.debug)
373 logger = logging.getLogger('INIT')
374
375 # Loading plugins
376 # Trying to change dir (would fail in freezed state)
377 try:
378     os.chdir(os.path.dirname(os.path.realpath(__file__)))
379 except:
380     pass
381 # Creating dict of handlers
382 VPStuff.pluginshandlers = dict()
383 # And a list with plugin instances
384 VPStuff.pluginlist = list()
385 pluginsmatch = glob.glob('plugins/*_plugin.py')
386 sys.path.insert(0, 'plugins')
387 pluginslist = [os.path.splitext(os.path.basename(x))[0] for x in pluginsmatch]
388 for i in pluginslist:
389     plugin = __import__(i)
390     plugname = i.split('_')[0].capitalize()
391     try:
392         plugininstance = getattr(plugin, plugname)(VPConfig, VPStuff)
393     except Exception as e:
394         logger.error("Cannot load plugin " + plugname + ": " + repr(e))
395         continue
396     logger.debug('Plugin loaded: ' + plugname)
397     for j in plugininstance.handlers:
398         VPStuff.pluginshandlers[j] = plugininstance
399     VPStuff.pluginlist.append(plugininstance)
400
401 # Check whether we can bind to the defined port safely
402 if os.getuid() != 0 and VPConfig.httpport <= 1024:
403     logger.error("Cannot bind to port " + str(VPConfig.httpport) + " without root privileges")
404     sys.exit(1)
405
406 server = HTTPServer((VPConfig.httphost, VPConfig.httpport), HTTPHandler)
407 logger = logging.getLogger('HTTP')
408
409 # Dropping root privileges if needed
410 if VPConfig.vpproxyuser and os.getuid() == 0:
411     if drop_privileges(VPConfig.vpproxyuser):
412         logger.info("Dropped privileges to user " + VPConfig.vpproxyuser)
413     else:
414         logger.error("Cannot drop privileges to user " + VPConfig.vpproxyuser)
415         sys.exit(1)
416
417 # Creating ClientCounter
418 VPStuff.clientcounter = ClientCounter()
419
420 DEVNULL = open(os.devnull, 'wb')
421
422 # Spawning procedures
423 def spawnVLC(cmd, delay = 0):
424     try:
425         VPStuff.vlc = psutil.Popen(cmd) #, stdout=DEVNULL, stderr=DEVNULL)
426         gevent.sleep(delay)
427         return True
428     except:
429         return False
430
431 def connectVLC():
432     try:
433         VPStuff.vlcclient = vlcclient.VlcClient(
434             host=VPConfig.vlchost, port=VPConfig.vlcport, password=VPConfig.vlcpass,
435             out_port=VPConfig.vlcoutport)
436         return True
437     except vlcclient.VlcException as e:
438         return False
439
440 def isRunning(process):
441     if psutil.version_info[0] >= 2:
442         if process.is_running() and process.status() != psutil.STATUS_ZOMBIE:
443             return True
444     else:  # for older versions of psutil
445         if process.is_running() and process.status != psutil.STATUS_ZOMBIE:
446             return True
447     return False
448
449 def findProcess(name):
450     for proc in psutil.process_iter():
451         try:
452             pinfo = proc.as_dict(attrs=['pid', 'name'])
453             if pinfo['name'] == name:
454                 return pinfo['pid']
455         except psutil.AccessDenied:
456             # System process
457             pass
458         except psutil.NoSuchProcess:
459             # Process terminated
460             pass
461     return None
462
463 def clean_proc():
464     # Trying to close all spawned processes gracefully
465     if isRunning(VPStuff.vlc):
466         if VPStuff.vlcclient:
467             VPStuff.vlcclient.destroy()
468         gevent.sleep(1)
469     if isRunning(VPStuff.vlc):
470         # or not :)
471         VPStuff.vlc.kill()
472
473 # This is what we call to stop the server completely
474 def shutdown(signum = 0, frame = 0):
475     logger.info("Stopping server...")
476     # Closing all client connections
477     for connection in server.RequestHandlerClass.requestlist:
478         try:
479             # Set errorhappened to prevent waiting for videodestroydelay
480             connection.errorhappened = True
481             connection.closeConnection()
482         except:
483             logger.warning("Cannot kill a connection!")
484     clean_proc()
485     server.server_close()
486     sys.exit()
487
488 def _reloadconfig(signum=None, frame=None):
489     '''
490     Reload configuration file.
491     SIGHUP handler.
492     '''
493     global VPConfig
494
495     logger = logging.getLogger('reloadconfig')
496     reload(vpconfig)
497     from vpconfig import VPConfig
498     logger.info('Config reloaded')
499
500 # setting signal handlers
501 try:
502     gevent.signal(signal.SIGHUP, _reloadconfig)
503     gevent.signal(signal.SIGTERM, shutdown)
504 except AttributeError:
505     pass
506
507 name = 'vlc'
508 VPStuff.vlcProc = VPConfig.vlccmd.split()
509 if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
510     logger.info("VLC spawned with pid " + str(VPStuff.vlc.pid))
511 else:
512     logger.error('Cannot spawn or connect to VLC!')
513     clean_proc()
514     sys.exit(1)
515
516 try:
517     logger.info("Using gevent %s" % gevent.__version__)
518     logger.info("Using psutil %s" % psutil.__version__)
519     logger.info("Using VLC %s" % VPStuff.vlcclient._vlcver)
520     logger.info("Server started.")
521     while True:
522         if not isRunning(VPStuff.vlc):
523             del VPStuff.vlc
524             if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
525                 logger.info("VLC died, respawned it with pid " + str(VPStuff.vlc.pid))
526             else:
527                 logger.error("Cannot spawn VLC!")
528                 clean_proc()
529                 sys.exit(1)
530         # Return to our server tasks
531         server.handle_request()
532 except (KeyboardInterrupt, SystemExit):
533     shutdown()