Переход с webm на ogg по причинам несовместимости с новыми версиями ffmpeg.
[vpproxy.git] / vphttp.py
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3 '''
4 VPProxy: HTTP/HLS Stream to HTTP Multiplexing Proxy
5
6 Based on AceProxy (https://github.com/ValdikSS/AceProxy) design
7 '''
8 import traceback
9 import gevent
10 import gevent.monkey
11 # Monkeypatching and all the stuff
12 gevent.monkey.patch_all()
13 # Startup delay for daemon restart
14 gevent.sleep(5)
15 import glob
16 import os
17 import signal
18 import sys
19 import logging
20 import psutil
21 import BaseHTTPServer
22 import SocketServer
23 from socket import error as SocketException
24 from socket import SHUT_RDWR
25 import urllib2
26 import hashlib
27 import vpconfig
28 from vpconfig import VPConfig
29 import vlcclient
30 import plugins.modules.ipaddr as ipaddr
31 from clientcounter import ClientCounter
32 from plugins.modules.PluginInterface import VPProxyPlugin
33 try:
34     import pwd
35     import grp
36 except ImportError:
37     pass
38
39
40
41 class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
42
43     requestlist = []
44
45     def handle_one_request(self):
46         '''
47         Add request to requestlist, handle request and remove from the list
48         '''
49         HTTPHandler.requestlist.append(self)
50         BaseHTTPServer.BaseHTTPRequestHandler.handle_one_request(self)
51         HTTPHandler.requestlist.remove(self)
52
53     def closeConnection(self):
54         '''
55         Disconnecting client
56         '''
57         if self.clientconnected:
58             self.clientconnected = False
59             try:
60                 self.wfile.close()
61                 self.rfile.close()
62                 self.connection.shutdown(SHUT_RDWR)
63             except:
64                 pass
65
66     def dieWithError(self, errorcode=500):
67         '''
68         Close connection with error
69         '''
70         logging.warning("Dying with error")
71         if self.clientconnected:
72             self.send_error(errorcode)
73             self.end_headers()
74             self.closeConnection()
75
76     def proxyReadWrite(self):
77         '''
78         Read video stream and send it to client
79         '''
80         logger = logging.getLogger('http_proxyReadWrite')
81         logger.debug("Started")
82
83         self.vlcstate = True
84         self.streamstate = True
85
86         try:
87             while True:
88
89                 if not self.clientconnected:
90                     logger.debug("Client is not connected, terminating")
91                     break
92
93                 data = self.video.read(4096)
94                 if data and self.clientconnected:
95                     self.wfile.write(data)
96                 else:
97                     logger.warning("Video connection closed")
98                     break
99
100         except SocketException:
101             # Video connection dropped
102             logger.warning("Video connection dropped")
103         finally:
104             self.video.close()
105             self.closeConnection()
106
107     def hangDetector(self):
108         '''
109         Detect client disconnection while in the middle of something
110         or just normal connection close.
111         '''
112         logger = logging.getLogger('http_hangDetector')
113         try:
114             while True:
115                 if not self.rfile.read():
116                     break
117         except:
118             pass
119         finally:
120             self.clientconnected = False
121             logger.debug("Client disconnected")
122             try:
123                 self.requestgreenlet.kill()
124             except:
125                 pass
126             finally:
127                 gevent.sleep()
128             return
129
130     def do_HEAD(self):
131         return self.do_GET(headers_only=True)
132
133     def do_GET(self, headers_only=False):
134         '''
135         GET request handler
136         '''
137         logger = logging.getLogger('http_HTTPHandler')
138         self.clientconnected = True
139         # Don't wait videodestroydelay if error happened
140         self.errorhappened = True
141         # Headers sent flag for fake headers UAs
142         self.headerssent = False
143         # Current greenlet
144         self.requestgreenlet = gevent.getcurrent()
145         # Connected client IP address
146         self.clientip = self.request.getpeername()[0]
147
148         if VPConfig.firewall:
149             # If firewall enabled
150             self.clientinrange = any(map(lambda i: ipaddr.IPAddress(self.clientip) \
151                                 in ipaddr.IPNetwork(i), VPConfig.firewallnetranges))
152
153             if (VPConfig.firewallblacklistmode and self.clientinrange) or \
154                 (not VPConfig.firewallblacklistmode and not self.clientinrange):
155                     logger.info('Dropping connection from ' + self.clientip + ' due to ' + \
156                                 'firewall rules')
157                     self.dieWithError(403)  # 403 Forbidden
158                     return
159
160         logger.info("Accepted connection from " + self.clientip + " path " + self.path)
161
162         try:
163             self.splittedpath = self.path.split('/')
164             self.reqtype = self.splittedpath[1].lower()
165             # If first parameter is 'pid' or 'torrent' or it should be handled
166             # by plugin
167             if not (self.reqtype in ('get','mp4','ogg') or self.reqtype in VPStuff.pluginshandlers):
168                 self.dieWithError(400)  # 400 Bad Request
169                 return
170         except IndexError:
171             self.dieWithError(400)  # 400 Bad Request
172             return
173
174         # Handle request with plugin handler
175         if self.reqtype in VPStuff.pluginshandlers:
176             try:
177                 VPStuff.pluginshandlers.get(self.reqtype).handle(self)
178             except Exception as e:
179                 logger.error('Plugin exception: ' + repr(e))
180                 logger.error(traceback.format_exc())
181                 self.dieWithError()
182             finally:
183                 self.closeConnection()
184                 return
185         self.handleRequest(headers_only)
186
187     def handleRequest(self, headers_only):
188
189         # Limit concurrent connections
190         print VPStuff.clientcounter.total
191         if 0 < VPConfig.maxconns <= VPStuff.clientcounter.total:
192             logger.debug("Maximum connections reached, can't serve this")
193             self.dieWithError(503)  # 503 Service Unavailable
194             return
195
196         # Pretend to work fine with Fake UAs or HEAD request.
197         useragent = self.headers.get('User-Agent')
198         logger.debug("HTTP User Agent:"+useragent)
199         fakeua = useragent and useragent in VPConfig.fakeuas
200         if headers_only or fakeua:
201             if fakeua:
202                 logger.debug("Got fake UA: " + self.headers.get('User-Agent'))
203             # Return 200 and exit
204             self.send_response(200)
205             self.send_header("Content-Type", "video/mpeg")
206             self.end_headers()
207             self.closeConnection()
208             return
209
210         self.path_unquoted = urllib2.unquote('/'.join(self.splittedpath[2:]))
211         # Make list with parameters
212         self.params = list()
213         for i in xrange(3, 8):
214             try:
215                 self.params.append(int(self.splittedpath[i]))
216             except (IndexError, ValueError):
217                 self.params.append('0')
218
219         # Adding client to clientcounter
220         clients = VPStuff.clientcounter.add(self.reqtype+'\\'+self.path_unquoted, self.clientip)
221         # If we are the one client, but sucessfully got vp instance from clientcounter,
222         # then somebody is waiting in the videodestroydelay state
223
224         # Check if we are first client
225         if VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted)==1:
226             logger.debug("First client, should create VLC session")
227             shouldcreatevp = True
228         else:
229             logger.debug("Can reuse existing session")
230             shouldcreatevp = False
231
232         self.vlcid = hashlib.md5(self.reqtype+'\\'+self.path_unquoted).hexdigest()
233
234         # Send fake headers if this User-Agent is in fakeheaderuas tuple
235         if fakeua:
236             logger.debug(
237                 "Sending fake headers for " + useragent)
238             self.send_response(200)
239             if self.reqtype=="ogg":
240                 self.send_header("Content-Type", "video/ogg")
241             else:
242                 self.send_header("Content-Type", "video/mpeg")
243             self.end_headers()
244             # Do not send real headers at all
245             self.headerssent = True
246
247         try:
248             self.hanggreenlet = gevent.spawn(self.hangDetector)
249             logger.debug("hangDetector spawned")
250             gevent.sleep()
251
252             # Getting URL
253             self.errorhappened = False
254
255             if shouldcreatevp:
256                 logger.debug("Got url " + self.path_unquoted)
257                 # Force ffmpeg demuxing if set in config
258                 if VPConfig.vlcforceffmpeg:
259                     self.vlcprefix = 'http/ffmpeg://'
260                 else:
261                     self.vlcprefix = ''
262
263                 logger.info("Starting broadcasting "+self.path)                    
264                 VPStuff.vlcclient.startBroadcast(
265                     self.vlcid, self.vlcprefix + self.path_unquoted, VPConfig.vlcmux, VPConfig.vlcpreaccess, self.reqtype)
266                 # Sleep a bit, because sometimes VLC doesn't open port in
267                 # time
268                 gevent.sleep(0.5)
269
270             # Building new VLC url
271             self.url = 'http://' + VPConfig.vlchost + \
272                 ':' + str(VPConfig.vlcoutport) + '/' + self.vlcid
273             logger.debug("VLC url " + self.url)
274
275             # Sending client headers to videostream
276             self.video = urllib2.Request(self.url)
277             for key in self.headers.dict:
278                 self.video.add_header(key, self.headers.dict[key])
279
280             self.video = urllib2.urlopen(self.video)
281
282             # Sending videostream headers to client
283             if not self.headerssent:
284                 self.send_response(self.video.getcode())
285                 if self.video.info().dict.has_key('connection'):
286                     del self.video.info().dict['connection']
287                 if self.video.info().dict.has_key('server'):
288                     del self.video.info().dict['server']
289                 if self.video.info().dict.has_key('transfer-encoding'):
290                     del self.video.info().dict['transfer-encoding']
291                 if self.video.info().dict.has_key('content-type'):
292                     del self.video.info().dict['content-type']
293                 if self.video.info().dict.has_key('keep-alive'):
294                     del self.video.info().dict['keep-alive']
295
296                 for key in self.video.info().dict:
297                     self.send_header(key, self.video.info().dict[key])
298
299                 if self.reqtype=="ogg":
300                     self.send_header("Content-Type", "video/ogg")
301                 else:
302                     self.send_header("Content-Type", "video/mpeg")
303
304                 # End headers. Next goes video data
305                 self.end_headers()
306                 logger.debug("Headers sent")
307
308             # Run proxyReadWrite
309             self.proxyReadWrite()
310
311             # Waiting until hangDetector is joined
312             self.hanggreenlet.join()
313             logger.debug("Request handler finished")
314
315         except (vpclient.VPException, vlcclient.VlcException, urllib2.URLError) as e:
316             logger.error("Exception: " + repr(e))
317             self.errorhappened = True
318             self.dieWithError()
319         except gevent.GreenletExit:
320             # hangDetector told us about client disconnection
321             pass
322         except Exception:
323             # Unknown exception
324             logger.error(traceback.format_exc())
325             self.errorhappened = True
326             self.dieWithError()
327         finally:
328             logger.debug("END REQUEST")
329             logger.info("Closed connection from " + self.clientip + " path " + self.path)
330             VPStuff.clientcounter.delete(self.reqtype+'\\'+self.path_unquoted, self.clientip)
331             if not VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted):
332                 try:
333                     logger.debug("That was the last client, destroying VPClient")
334                     logger.info("Stopping broadcasting " + self.path)
335                     VPStuff.vlcclient.stopBroadcast(self.vlcid)
336                 except:
337                     pass
338                 self.vp.destroy()
339
340
341 class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
342
343     def handle_error(self, request, client_address):
344         # Do not print HTTP tracebacks
345         pass
346
347
348 class VPStuff(object):
349     '''
350     Inter-class interaction class
351     '''
352     vlcclient=None
353
354 # taken from http://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python
355 def drop_privileges(uid_name, gid_name='nogroup'):
356
357     # Get the uid/gid from the name
358     running_uid = pwd.getpwnam(uid_name).pw_uid
359     running_uid_home = pwd.getpwnam(uid_name).pw_dir
360     running_gid = grp.getgrnam(gid_name).gr_gid
361
362     # Remove group privileges
363     os.setgroups([])
364
365     # Try setting the new uid/gid
366     os.setgid(running_gid)
367     os.setuid(running_uid)
368
369     # Ensure a very conservative umask
370     old_umask = os.umask(077)
371
372     if os.getuid() == running_uid and os.getgid() == running_gid:
373         # could be useful
374         os.environ['HOME'] = running_uid_home
375         return True
376     return False
377
378 logging.basicConfig(
379     filename=VPConfig.logpath + 'vphttp.log' if VPConfig.loggingtoafile else None,
380     format='%(asctime)s %(levelname)s %(name)s: %(message)s', datefmt='%d.%m.%Y %H:%M:%S', level=VPConfig.debug)
381 logger = logging.getLogger('INIT')
382
383 # Loading plugins
384 # Trying to change dir (would fail in freezed state)
385 try:
386     os.chdir(os.path.dirname(os.path.realpath(__file__)))
387 except:
388     pass
389 # Creating dict of handlers
390 VPStuff.pluginshandlers = dict()
391 # And a list with plugin instances
392 VPStuff.pluginlist = list()
393 pluginsmatch = glob.glob('plugins/*_plugin.py')
394 sys.path.insert(0, 'plugins')
395 pluginslist = [os.path.splitext(os.path.basename(x))[0] for x in pluginsmatch]
396 for i in pluginslist:
397     plugin = __import__(i)
398     plugname = i.split('_')[0].capitalize()
399     try:
400         plugininstance = getattr(plugin, plugname)(VPConfig, VPStuff)
401     except Exception as e:
402         logger.error("Cannot load plugin " + plugname + ": " + repr(e))
403         continue
404     logger.debug('Plugin loaded: ' + plugname)
405     for j in plugininstance.handlers:
406         VPStuff.pluginshandlers[j] = plugininstance
407     VPStuff.pluginlist.append(plugininstance)
408
409 # Check whether we can bind to the defined port safely
410 if os.getuid() != 0 and VPConfig.httpport <= 1024:
411     logger.error("Cannot bind to port " + str(VPConfig.httpport) + " without root privileges")
412     sys.exit(1)
413
414 server = HTTPServer((VPConfig.httphost, VPConfig.httpport), HTTPHandler)
415 logger = logging.getLogger('HTTP')
416
417 # Dropping root privileges if needed
418 if VPConfig.vpproxyuser and os.getuid() == 0:
419     if drop_privileges(VPConfig.vpproxyuser):
420         logger.info("Dropped privileges to user " + VPConfig.vpproxyuser)
421     else:
422         logger.error("Cannot drop privileges to user " + VPConfig.vpproxyuser)
423         sys.exit(1)
424
425 # Creating ClientCounter
426 VPStuff.clientcounter = ClientCounter()
427
428 DEVNULL = open(os.devnull, 'wb')
429
430 # Spawning procedures
431 def spawnVLC(cmd, delay = 0):
432     try:
433         VPStuff.vlc = psutil.Popen(cmd) #, stdout=DEVNULL, stderr=DEVNULL)
434         gevent.sleep(delay)
435         return True
436     except:
437         return False
438
439 def connectVLC():
440     try:
441         VPStuff.vlcclient = vlcclient.VlcClient(
442             host=VPConfig.vlchost, port=VPConfig.vlcport, password=VPConfig.vlcpass,
443             out_port=VPConfig.vlcoutport)
444         return True
445     except vlcclient.VlcException as e:
446         return False
447
448 def isRunning(process):
449     if psutil.version_info[0] >= 2:
450         if process.is_running() and process.status() != psutil.STATUS_ZOMBIE:
451             return True
452     else:  # for older versions of psutil
453         if process.is_running() and process.status != psutil.STATUS_ZOMBIE:
454             return True
455     return False
456
457 def findProcess(name):
458     for proc in psutil.process_iter():
459         try:
460             pinfo = proc.as_dict(attrs=['pid', 'name'])
461             if pinfo['name'] == name:
462                 return pinfo['pid']
463         except psutil.AccessDenied:
464             # System process
465             pass
466         except psutil.NoSuchProcess:
467             # Process terminated
468             pass
469     return None
470
471 def clean_proc():
472     # Trying to close all spawned processes gracefully
473     if isRunning(VPStuff.vlc):
474         if VPStuff.vlcclient:
475             VPStuff.vlcclient.destroy()
476         gevent.sleep(1)
477     if isRunning(VPStuff.vlc):
478         # or not :)
479         VPStuff.vlc.kill()
480
481 # This is what we call to stop the server completely
482 def shutdown(signum = 0, frame = 0):
483     logger.info("Stopping server...")
484     # Closing all client connections
485     for connection in server.RequestHandlerClass.requestlist:
486         try:
487             # Set errorhappened to prevent waiting for videodestroydelay
488             connection.errorhappened = True
489             connection.closeConnection()
490         except:
491             logger.warning("Cannot kill a connection!")
492     clean_proc()
493     server.server_close()
494     sys.exit()
495
496 def _reloadconfig(signum=None, frame=None):
497     '''
498     Reload configuration file.
499     SIGHUP handler.
500     '''
501     global VPConfig
502
503     logger = logging.getLogger('reloadconfig')
504     reload(vpconfig)
505     from vpconfig import VPConfig
506     logger.info('Config reloaded')
507
508 # setting signal handlers
509 try:
510     gevent.signal(signal.SIGHUP, _reloadconfig)
511     gevent.signal(signal.SIGTERM, shutdown)
512 except AttributeError:
513     pass
514
515 name = 'vlc'
516 VPStuff.vlcProc = VPConfig.vlccmd.split()
517 if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
518     logger.info("VLC spawned with pid " + str(VPStuff.vlc.pid))
519 else:
520     logger.error('Cannot spawn or connect to VLC!')
521     clean_proc()
522     sys.exit(1)
523
524 try:
525     logger.info("Using gevent %s" % gevent.__version__)
526     logger.info("Using psutil %s" % psutil.__version__)
527     logger.info("Using VLC %s" % VPStuff.vlcclient._vlcver)
528     logger.info("Server started.")
529     while True:
530         if not isRunning(VPStuff.vlc):
531             del VPStuff.vlc
532             if spawnVLC(VPStuff.vlcProc, VPConfig.vlcspawntimeout) and connectVLC():
533                 logger.info("VLC died, respawned it with pid " + str(VPStuff.vlc.pid))
534             else:
535                 logger.error("Cannot spawn VLC!")
536                 clean_proc()
537                 sys.exit(1)
538         # Return to our server tasks
539         server.handle_request()
540 except (KeyboardInterrupt, SystemExit):
541     shutdown()