bda34851d4be98e4ef5b7df5b6ef2d710e5e44ee
[vpproxy.git] / vphttp.py
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3 '''
4 VPProxy: HTTP/HLS Stream to HTTP Multiplexing Proxy
5
6 Based on AceProxy (https://github.com/ValdikSS/AceProxy) design
7 '''
8 import traceback
9 import gevent
10 import gevent.monkey
11 # Monkeypatching and all the stuff
12 gevent.monkey.patch_all()
13 # Startup delay for daemon restart
14 gevent.sleep(5)
15 import glob
16 import os
17 import signal
18 import sys
19 import logging
20 import psutil
21 import BaseHTTPServer
22 import SocketServer
23 from socket import error as SocketException
24 from socket import SHUT_RDWR
25 import urllib2
26 import hashlib
27 import vpconfig
28 from vpconfig import VPConfig
29 import vlcclient
30 import plugins.modules.ipaddr as ipaddr
31 from clientcounter import ClientCounter
32 from plugins.modules.PluginInterface import VPProxyPlugin
33 try:
34     import pwd
35     import grp
36 except ImportError:
37     pass
38
39
40
class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    '''
    Handler for a single client connection.

    Requests whose first path component is registered in
    VPStuff.pluginshandlers are delegated to that plugin; 'get', 'mp4' and
    'ogg' requests are served by relaying a VLC HTTP output to the client.
    '''

    # Handlers that are currently inside handle_one_request()
    requestlist = []

    def handle_one_request(self):
        '''
        Add request to requestlist, handle request and remove from the list
        '''
        HTTPHandler.requestlist.append(self)
        try:
            BaseHTTPServer.BaseHTTPRequestHandler.handle_one_request(self)
        finally:
            # Unregister even if the request handler raised, otherwise the
            # finished handler would stay in requestlist forever
            HTTPHandler.requestlist.remove(self)

    def closeConnection(self):
        '''
        Disconnecting client
        '''
        if self.clientconnected:
            self.clientconnected = False
            try:
                self.wfile.close()
                self.rfile.close()
                self.connection.shutdown(SHUT_RDWR)
            except:
                # Best effort: the peer may already be gone
                pass

    def dieWithError(self, errorcode=500):
        '''
        Close connection with error

        errorcode is the HTTP status sent to the client (default 500).
        Nothing is sent if the client is already marked as disconnected.
        '''
        logging.warning("Dying with error")
        if self.clientconnected:
            try:
                # send_error() writes a complete reply (status line, headers
                # and body), so no extra end_headers() call is needed
                self.send_error(errorcode)
            finally:
                # Drop the client even if writing the error reply failed
                self.closeConnection()

    def proxyReadWrite(self):
        '''
        Read video stream and send it to client

        Copies 4096-byte chunks from self.video to self.wfile until the
        source returns no data, the client is marked as disconnected or a
        socket error occurs. self.video and the client connection are
        always closed afterwards.
        '''
        logger = logging.getLogger('http_proxyReadWrite')
        logger.debug("Started")

        self.vlcstate = True
        self.streamstate = True

        try:
            while True:

                if not self.clientconnected:
                    logger.debug("Client is not connected, terminating")
                    break

                data = self.video.read(4096)
                if data and self.clientconnected:
                    self.wfile.write(data)
                else:
                    logger.warning("Video connection closed")
                    break

        except SocketException:
            # Video connection dropped
            logger.warning("Video connection dropped")
        finally:
            self.video.close()
            self.closeConnection()

    def hangDetector(self):
        '''
        Detect client disconnection while in the middle of something
        or just normal connection close.

        Runs in its own greenlet: reads from the client socket until the
        read returns nothing or fails, then marks the client as
        disconnected and kills the greenlet that is serving the request.
        '''
        logger = logging.getLogger('http_hangDetector')
        try:
            while True:
                if not self.rfile.read():
                    break
        except:
            # Any read failure is treated like a disconnect
            pass
        finally:
            self.clientconnected = False
            logger.debug("Client disconnected")
            try:
                self.requestgreenlet.kill()
            except:
                pass
            finally:
                # Yield so the killed request greenlet can run
                gevent.sleep()
            return

    def do_HEAD(self):
        '''
        HEAD request handler, same as GET but only headers are sent
        '''
        return self.do_GET(headers_only=True)

    def do_GET(self, headers_only=False):
        '''
        GET request handler

        Applies the firewall rules, validates the request type and then
        hands the request either to the matching plugin or to
        handleRequest().
        '''
        logger = logging.getLogger('http_HTTPHandler')
        self.clientconnected = True
        # Don't wait videodestroydelay if error happened
        self.errorhappened = True
        # Headers sent flag for fake headers UAs
        self.headerssent = False
        # Current greenlet
        self.requestgreenlet = gevent.getcurrent()
        # Connected client IP address
        self.clientip = self.request.getpeername()[0]

        if VPConfig.firewall:
            # If firewall enabled
            clientaddr = ipaddr.IPAddress(self.clientip)
            self.clientinrange = any(
                clientaddr in ipaddr.IPNetwork(netrange)
                for netrange in VPConfig.firewallnetranges)

            # Blacklist mode rejects listed clients, whitelist mode rejects
            # everybody else
            if (VPConfig.firewallblacklistmode and self.clientinrange) or \
                    (not VPConfig.firewallblacklistmode and not self.clientinrange):
                logger.info('Dropping connection from ' + self.clientip + ' due to ' + \
                            'firewall rules')
                self.dieWithError(403)  # 403 Forbidden
                return

        logger.info("Accepted connection from " + self.clientip + " path " + self.path)

        self.splittedpath = self.path.split('/')
        try:
            self.reqtype = self.splittedpath[1].lower()
        except IndexError:
            self.dieWithError(400)  # 400 Bad Request
            return

        # First path component must be 'get', 'mp4', 'ogg' or a name
        # registered by a plugin
        if not (self.reqtype in ('get', 'mp4', 'ogg') or self.reqtype in VPStuff.pluginshandlers):
            self.dieWithError(400)  # 400 Bad Request
            return

        # Handle request with plugin handler
        if self.reqtype in VPStuff.pluginshandlers:
            try:
                VPStuff.pluginshandlers.get(self.reqtype).handle(self)
            except Exception as e:
                logger.error('Plugin exception: ' + repr(e))
                logger.error(traceback.format_exc())
                self.dieWithError()
            finally:
                # The return inside finally deliberately discards anything
                # still propagating from the plugin
                self.closeConnection()
                return
        self.handleRequest(headers_only)

    def handleRequest(self, headers_only):
        '''
        Serve a 'get'/'mp4'/'ogg' request by relaying a VLC broadcast

        headers_only is True for HEAD requests; those (and clients whose
        User-Agent is listed in VPConfig.fakeuas) only get a 200 reply
        with a video Content-Type and no stream.
        '''
        # Explicit logger instead of relying on a module-level global
        logger = logging.getLogger('http_HTTPHandler')

        # Limit concurrent connections
        if 0 < VPConfig.maxconns <= VPStuff.clientcounter.total:
            logger.debug("Maximum connections reached, can't serve this")
            self.dieWithError(503)  # 503 Service Unavailable
            return

        # Pretend to work fine with Fake UAs or HEAD request.
        useragent = self.headers.get('User-Agent')
        # The header may be absent, so do not concatenate it directly
        logger.debug("HTTP User Agent:" + str(useragent))
        fakeua = useragent and useragent in VPConfig.fakeuas
        if headers_only or fakeua:
            if fakeua:
                logger.debug("Got fake UA: " + useragent)
            # Return 200 and exit
            self.send_response(200)
            self.send_header("Content-Type", "video/mpeg")
            self.end_headers()
            self.closeConnection()
            return

        self.path_unquoted = urllib2.unquote('/'.join(self.splittedpath[2:]))
        # Make list with parameters
        self.params = list()
        for i in xrange(3, 8):
            try:
                self.params.append(int(self.splittedpath[i]))
            except (IndexError, ValueError):
                self.params.append('0')

        # Key identifying this stream in the client counter
        streamkey = self.reqtype + '\\' + self.path_unquoted

        # Adding client to clientcounter
        VPStuff.clientcounter.add(streamkey, self.clientip)

        # Check if we are first client
        if VPStuff.clientcounter.get(streamkey) == 1:
            logger.debug("First client, should create VLC session")
            shouldcreatevp = True
        else:
            logger.debug("Can reuse existing session")
            shouldcreatevp = False

        self.vlcid = hashlib.md5(streamkey).hexdigest()

        # Fake-UA clients have already been answered above, so real headers
        # are always sent below unless self.headerssent was set elsewhere

        try:
            self.hanggreenlet = gevent.spawn(self.hangDetector)
            logger.debug("hangDetector spawned")
            gevent.sleep()

            # Getting URL
            self.errorhappened = False

            if shouldcreatevp:
                logger.debug("Got url " + self.path_unquoted)
                # Force ffmpeg demuxing if set in config
                if VPConfig.vlcforceffmpeg:
                    self.vlcprefix = 'http/ffmpeg://'
                else:
                    self.vlcprefix = ''

                logger.info("Starting broadcasting " + self.path)
                VPStuff.vlcclient.startBroadcast(
                    self.vlcid, self.vlcprefix + self.path_unquoted, VPConfig.vlcmux, VPConfig.vlcpreaccess, self.reqtype)
                # Sleep a bit, because sometimes VLC doesn't open port in
                # time
                gevent.sleep(0.5)

            # Building new VLC url
            self.url = 'http://' + VPConfig.vlchost + \
                ':' + str(VPConfig.vlcoutport) + '/' + self.vlcid
            logger.debug("VLC url " + self.url)

            # Sending client headers to videostream
            self.video = urllib2.Request(self.url)
            for key in self.headers.dict:
                self.video.add_header(key, self.headers.dict[key])

            self.video = urllib2.urlopen(self.video)

            # Sending videostream headers to client
            if not self.headerssent:
                self.send_response(self.video.getcode())
                videoheaders = self.video.info().dict
                # These headers are not forwarded; Content-Type is replaced
                # below
                for dropped in ('connection', 'server', 'transfer-encoding',
                                'content-type', 'keep-alive'):
                    videoheaders.pop(dropped, None)

                for key in videoheaders:
                    self.send_header(key, videoheaders[key])

                if self.reqtype == "ogg":
                    self.send_header("Content-Type", "video/ogg")
                else:
                    self.send_header("Content-Type", "video/mpeg")

                # End headers. Next goes video data
                self.end_headers()
                logger.debug("Headers sent")

            # Run proxyReadWrite
            self.proxyReadWrite()

            # Waiting until hangDetector is joined
            self.hanggreenlet.join()
            logger.debug("Request handler finished")

        except (vlcclient.VlcException, urllib2.URLError) as e:
            logger.error("Exception: " + repr(e))
            self.errorhappened = True
            self.dieWithError()
        except gevent.GreenletExit:
            # hangDetector told us about client disconnection
            pass
        except Exception:
            # Unknown exception
            logger.error(traceback.format_exc())
            self.errorhappened = True
            self.dieWithError()
        finally:
            logger.debug("END REQUEST")
            logger.info("Closed connection from " + self.clientip + " path " + self.path)
            VPStuff.clientcounter.delete(streamkey, self.clientip)
            if not VPStuff.clientcounter.get(streamkey):
                try:
                    logger.debug("That was the last client, destroying VPClient")
                    logger.info("Stopping broadcasting " + self.path)
                    VPStuff.vlcclient.stopBroadcast(self.vlcid)
                except:
                    # Best effort: the broadcast may already be gone
                    pass
                # self.vp is never assigned in this class; only destroy it
                # when something else attached one to the handler
                vp = getattr(self, 'vp', None)
                if vp is not None:
                    vp.destroy()
338
339
class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    '''
    HTTP server combining ThreadingMixIn with the stock HTTPServer
    '''

    def handle_error(self, request, client_address):
        '''
        Ignore request errors, so that no HTTP tracebacks are printed
        '''
        return None
345
346
class VPStuff(object):
    '''
    Shared state holder used by the handler, the plugins and the startup code.

    Besides vlcclient, the rest of this file attaches pluginshandlers,
    pluginlist, clientcounter, vlc and vlcProc to this class at runtime.
    '''
    # VLC control client; stays None until connectVLC() succeeds
    vlcclient = None
352
# taken from http://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python
def drop_privileges(uid_name, gid_name='nogroup'):
    '''
    Switch the process to the given user and group.

    uid_name -- name of the user to run as
    gid_name -- name of the group to run as (default 'nogroup')

    Returns True if the process runs with the requested uid and gid
    afterwards, False otherwise. KeyError is raised when the user or group
    name is unknown; errors from the os.set* calls propagate.
    '''
    # Get the uid/gid from the name (single passwd lookup)
    user_entry = pwd.getpwnam(uid_name)
    running_uid = user_entry.pw_uid
    running_uid_home = user_entry.pw_dir
    running_gid = grp.getgrnam(gid_name).gr_gid

    # Remove group privileges
    os.setgroups([])

    # Try setting the new uid/gid (gid first, while we may still change it)
    os.setgid(running_gid)
    os.setuid(running_uid)

    # Ensure a very conservative umask
    os.umask(0o077)

    if os.getuid() == running_uid and os.getgid() == running_gid:
        # could be useful
        os.environ['HOME'] = running_uid_home
        return True
    return False
376
# Log to <logpath>vphttp.log when file logging is enabled, otherwise use the
# logging module's default destination
logging.basicConfig(
    filename=VPConfig.logpath + 'vphttp.log' if VPConfig.loggingtoafile else None,
    format='%(asctime)s %(levelname)s %(name)s: %(message)s', datefmt='%d.%m.%Y %H:%M:%S', level=VPConfig.debug)
logger = logging.getLogger('INIT')

# Loading plugins
# Trying to change dir (would fail in freezed state)
try:
    os.chdir(os.path.dirname(os.path.realpath(__file__)))
except Exception:
    pass
# Creating dict of handlers
VPStuff.pluginshandlers = dict()
# And a list with plugin instances
VPStuff.pluginlist = list()
pluginsmatch = glob.glob('plugins/*_plugin.py')
sys.path.insert(0, 'plugins')
pluginslist = [os.path.splitext(os.path.basename(x))[0] for x in pluginsmatch]
for i in pluginslist:
    # 'foo_plugin' module is expected to define class 'Foo'
    plugname = i.split('_')[0].capitalize()
    try:
        # Import inside the try block too, so a plugin that fails to import
        # is skipped like one that fails to initialise
        plugin = __import__(i)
        plugininstance = getattr(plugin, plugname)(VPConfig, VPStuff)
    except Exception as e:
        logger.error("Cannot load plugin " + plugname + ": " + repr(e))
        continue
    logger.debug('Plugin loaded: ' + plugname)
    # Every handler name the plugin announces is routed to this instance
    for j in plugininstance.handlers:
        VPStuff.pluginshandlers[j] = plugininstance
    VPStuff.pluginlist.append(plugininstance)
407
# Check whether we can bind to the defined port safely.
# Only ports below 1024 are privileged; os.getuid is probed first because it
# is not available on every platform
if hasattr(os, 'getuid') and os.getuid() != 0 and VPConfig.httpport < 1024:
    logger.error("Cannot bind to port " + str(VPConfig.httpport) + " without root privileges")
    sys.exit(1)

server = HTTPServer((VPConfig.httphost, VPConfig.httpport), HTTPHandler)
logger = logging.getLogger('HTTP')

# Dropping root privileges if needed (after the listening socket is bound)
if VPConfig.vpproxyuser and hasattr(os, 'getuid') and os.getuid() == 0:
    if drop_privileges(VPConfig.vpproxyuser):
        logger.info("Dropped privileges to user " + VPConfig.vpproxyuser)
    else:
        logger.error("Cannot drop privileges to user " + VPConfig.vpproxyuser)
        sys.exit(1)

# Creating ClientCounter
VPStuff.clientcounter = ClientCounter()

# Sink for child process output (currently only referenced from a
# commented-out argument in spawnVLC)
DEVNULL = open(os.devnull, 'wb')
428
429 # Spawning procedures
def spawnVLC(cmd, delay = 0):
    '''
    Start the VLC process and store it in VPStuff.vlc.

    cmd   -- command line as an argument list
    delay -- seconds to sleep after spawning

    Returns True on success, False if spawning failed (the reason is
    logged). KeyboardInterrupt and SystemExit are not swallowed.
    '''
    try:
        VPStuff.vlc = psutil.Popen(cmd) #, stdout=DEVNULL, stderr=DEVNULL)
        gevent.sleep(delay)
        return True
    except Exception as e:
        logging.getLogger('spawnVLC').error("Cannot spawn VLC: " + repr(e))
        return False
437
def connectVLC():
    '''
    Create the VLC control client and store it in VPStuff.vlcclient.

    Returns True on success, False if vlcclient raised VlcException.
    '''
    try:
        client = vlcclient.VlcClient(
            host=VPConfig.vlchost,
            port=VPConfig.vlcport,
            password=VPConfig.vlcpass,
            out_port=VPConfig.vlcoutport)
    except vlcclient.VlcException:
        return False
    VPStuff.vlcclient = client
    return True
446
def isRunning(process):
    '''
    Return True if the psutil process is running and is not a zombie.
    '''
    # status became a method in psutil 2; older versions expose an attribute
    status_is_method = psutil.version_info[0] >= 2
    if not process.is_running():
        return False
    current = process.status() if status_is_method else process.status
    return current != psutil.STATUS_ZOMBIE
455
def findProcess(name):
    '''
    Return the pid of the first process whose name equals name, or None.
    '''
    for candidate in psutil.process_iter():
        try:
            details = candidate.as_dict(attrs=['pid', 'name'])
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # System process, or process terminated meanwhile
            continue
        if details['name'] == name:
            return details['pid']
    return None
469
def clean_proc():
    '''
    Stop the spawned VLC process, gracefully if possible.

    Does nothing when no VLC process is recorded in VPStuff.vlc (spawning
    failed, or the attribute was removed before a respawn).
    '''
    vlc = getattr(VPStuff, 'vlc', None)
    if vlc is None:
        return
    # Trying to close all spawned processes gracefully
    if isRunning(vlc):
        if VPStuff.vlcclient:
            try:
                VPStuff.vlcclient.destroy()
            except Exception as e:
                # Fall through to kill() below instead of leaving VLC behind
                logging.getLogger('clean_proc').warning(
                    "Cannot shut down VLC gracefully: " + repr(e))
        gevent.sleep(1)
    if isRunning(vlc):
        # or not :)
        vlc.kill()
479
# This is what we call to stop the server completely
def shutdown(signum = 0, frame = 0):
    '''
    Disconnect all clients, stop VLC, close the server socket and exit.

    signum and frame are accepted so the function can be used as a signal
    handler; they are not used.
    '''
    logger.info("Stopping server...")
    # Closing all client connections.
    # Iterate over a snapshot: handlers remove themselves from requestlist
    # as they finish, which would make a live iteration skip entries
    for connection in list(server.RequestHandlerClass.requestlist):
        try:
            # Set errorhappened to prevent waiting for videodestroydelay
            connection.errorhappened = True
            connection.closeConnection()
        except Exception:
            logger.warning("Cannot kill a connection!")
    clean_proc()
    server.server_close()
    sys.exit()
494
def _reloadconfig(signum=None, frame=None):
    '''
    SIGHUP handler: re-read vpconfig and rebind the module-level VPConfig.
    '''
    global VPConfig

    log = logging.getLogger('reloadconfig')
    reload(vpconfig)
    # Pick up the class object created by the reloaded module
    VPConfig = vpconfig.VPConfig
    log.info('Config reloaded')
506
# setting signal handlers
# SIGHUP reloads the configuration, SIGTERM stops the server.
# AttributeError is ignored - presumably for platforms where signal.SIGHUP
# is missing (TODO confirm)
try:
    gevent.signal(signal.SIGHUP, _reloadconfig)
    gevent.signal(signal.SIGTERM, shutdown)
except AttributeError:
    pass

name = 'vlc'
# VLC command line from the config, split on whitespace into an argument list
VPStuff.vlcProc = VPConfig.vlccmd.split()
# Start VLC and connect to it; without VLC the proxy cannot work, so give up
if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
    logger.info("VLC spawned with pid " + str(VPStuff.vlc.pid))
else:
    logger.error('Cannot spawn or connect to VLC!')
    clean_proc()
    sys.exit(1)

try:
    logger.info("Using gevent %s" % gevent.__version__)
    logger.info("Using psutil %s" % psutil.__version__)
    logger.info("Using VLC %s" % VPStuff.vlcclient._vlcver)
    logger.info("Server started.")
    # Main loop: make sure VLC is alive, then serve one request.
    # The VLC check only happens between calls to handle_request()
    while True:
        if not isRunning(VPStuff.vlc):
            # NOTE(review): after this del, a failed respawn leaves no
            # VPStuff.vlc attribute for clean_proc() to inspect - confirm
            # clean_proc() tolerates that
            del VPStuff.vlc
            if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
                logger.info("VLC died, respawned it with pid " + str(VPStuff.vlc.pid))
            else:
                logger.error("Cannot spawn VLC!")
                clean_proc()
                sys.exit(1)
        # Return to our server tasks
        server.handle_request()
except (KeyboardInterrupt, SystemExit):
    # Ctrl-C or sys.exit() from inside the loop: run the full shutdown path
    shutdown()