From: Roman Bazalevsky Date: Mon, 2 Nov 2015 20:12:14 +0000 (+0300) Subject: Переход на асинхронное закрытие каналов с таймаутом, чтобы при обрыве соединения... X-Git-Url: https://git.rvb.name/vpproxy.git/commitdiff_plain/1aebc74d991c35b7eb82395434d543d44f536089 Переход на асинхронное закрытие каналов с таймаутом, чтобы при обрыве соединения не тратить время на восстановление. Заодно предположительно устранена проблема с неотвечающим при одновременных запросах VLC. --- diff --git a/clientcounter.py b/clientcounter.py index bcfd0b2..a106d35 100644 --- a/clientcounter.py +++ b/clientcounter.py @@ -2,37 +2,54 @@ Simple Client Counter for VLC VLM ''' +import gevent +import logging class ClientCounter(object): def __init__(self): self.clients = dict() + self._lock = gevent.coros.RLock() self.total = 0 def get(self, id): return self.clients.get(id, (False,))[0] - def add(self, id, ip): + def add(self, id, ip, unic): + self._lock.acquire() + logger = logging.getLogger('clientcounter_Add') if self.clients.has_key(id): self.clients[id][0] += 1 - self.clients[id][1].append(ip) + self.clients[id][1].append([ip,unic]) + logger.info('counter for %s incremented to %s' % (id,self.clients[id][0])) else: - self.clients[id] = [1, [ip]] + self.clients[id] = [1, [[ip,unic]]] + logger.info('counter for %s started (1)' % id) self.total += 1 + logger.info('total count = %s' % self.total) + + self._lock.release() return self.clients[id][0] - def delete(self, id, ip): + def delete(self, id, ip, unic): + self._lock.acquire() + logger = logging.getLogger('clientcounter_Del') if self.clients.has_key(id): self.total -= 1 + logger.info('total count = %s' % self.total) if self.clients[id][0] == 1: del self.clients[id] + logger.info('counter for %s decremented to zero' % id ) + self._lock.release() return False else: self.clients[id][0] -= 1 - self.clients[id][1].remove(ip) + self.clients[id][1].remove([ip,unic]) + logger.info('counter for %s decremented to %s' % (id,self.clients[id][0])) else: + self._lock.release() return False + self._lock.release() return self.clients[id][0] - diff --git a/vlcclient/vlcclient.py b/vlcclient/vlcclient.py index 69d9fb9..80fc0f7 100644 --- a/vlcclient/vlcclient.py +++ b/vlcclient/vlcclient.py @@ -8,7 +8,8 @@ import gevent.coros import telnetlib import logging from vlcmessages import * - +import time +from pprint import pprint class VlcException(Exception): @@ -51,10 +52,13 @@ class VlcClient(object): # Logger logger = logging.getLogger('VlcClient_init') + # Streams + self.streams=dict() + # Making connection try: self._socket = telnetlib.Telnet(host, port, connect_timeout) - logger.debug("Successfully connected with VLC socket!") + logger.info("Successfully connected with VLC socket!") except Exception as e: raise VlcException( "Socket creation error! VLC is not running? ERROR: " + repr(e)) @@ -89,7 +93,7 @@ class VlcClient(object): # If socket is still alive (connected) if self._socket: try: - logger.debug("Destroying VlcClient...") + logger.info("Destroying VlcClient...") self._write(VlcMessage.request.SHUTDOWN) # Set shuttingDown flag for recvData self._shuttingDown.set() @@ -107,7 +111,7 @@ class VlcClient(object): try: # Write message - logger.debug('VLC command: ' + message) + logger.info('VLC command: ' + message) self._socket.write(message + "\r\n") except EOFError as e: raise VlcException("Vlc Write error! ERROR: " + repr(e)) @@ -133,6 +137,10 @@ class VlcClient(object): msg = VlcMessage.request.startBroadcast(stream_name, input, self._out_port, muxer, pre_access, qtype) self._write(msg) else: + if stream_name not in self.streams: + self._resultlock.release() + logger.error("Attempting to delete not existing stream %s" % stream_name) + return self._write(VlcMessage.request.stopBroadcast(stream_name)) try: @@ -145,12 +153,18 @@ class VlcClient(object): logger.error(broadcast + " result timeout") raise VlcException(broadcast + " result timeout") finally: + logger.info("working with %s stream: %s" % (stream_name,broadcast)) + if brtype == True: + self.streams[stream_name]=time.time() + else: + del self.streams[stream_name] self._resultlock.release() + logger.info("worked with %s stream: %s" % (stream_name,broadcast)) if brtype == True: - logger.debug("Broadcast started") + logger.info("Broadcast started") else: - logger.debug("Broadcast stopped") + logger.info("Broadcast stopped") def startBroadcast(self, stream_name, input, muxer='ts', pre_access='', qtype='default'): logger = logging.getLogger("VlcClient_startBroadcast") @@ -160,6 +174,20 @@ class VlcClient(object): def stopBroadcast(self, stream_name): return self._broadcast(False, stream_name) + def mark(self,stream_name): + self.streams[stream_name]=time.time() + + def clean_streams(self,timeout=15): + self._resultlock.acquire() + to_stop=set() + for stream,lasttime in self.streams.iteritems(): + print stream,lasttime + if time.time()-lasttime>timeout: + to_stop.add(stream) + for stream in to_stop: + self.stopBroadcast(stream) + self._resultlock.release() + def pauseBroadcast(self, stream_name): return self._write(VlcMessage.request.pauseBroadcast(stream_name)) @@ -220,10 +248,10 @@ class VlcClient(object): # Do not move this before error handlers! elif self._recvbuffer.startswith(VlcMessage.response.STARTOK): # Broadcast started - logger.debug("Broadcast started") + logger.info("Broadcast started") self._result.set(True) elif self._recvbuffer.startswith(VlcMessage.response.STOPOK): # Broadcast stopped - logger.debug("Broadcast stopped") + logger.info("Broadcast stopped") self._result.set(True) diff --git a/vphttp.py b/vphttp.py index 0c09c52..7ed52f7 100644 --- a/vphttp.py +++ b/vphttp.py @@ -37,7 +37,9 @@ try: except ImportError: pass +import uuid +from apscheduler.schedulers.background import BackgroundScheduler class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler): @@ -86,11 +88,12 @@ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler): try: while True: - + if not self.clientconnected: logger.debug("Client is not connected, terminating") break + VPStuff.vlcclient.mark(self.vlcid) data = self.video.read(4096) if data and self.clientconnected: self.wfile.write(data) @@ -186,6 +189,8 @@ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler): self.handleRequest(headers_only) def handleRequest(self, headers_only): + + self.unic = uuid.uuid4() # Limit concurrent connections if 0 < VPConfig.maxconns <= VPStuff.clientcounter.total: @@ -217,7 +222,7 @@ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler): self.params.append('0') # Adding client to clientcounter - clients = VPStuff.clientcounter.add(self.reqtype+'\\'+self.path_unquoted, self.clientip) + clients = VPStuff.clientcounter.add(self.reqtype+'\\'+self.path_unquoted, self.clientip, self.unic) # If we are the one client, but sucessfully got vp instance from clientcounter, # then somebody is waiting in the videodestroydelay state @@ -309,6 +314,7 @@ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler): # End headers. Next goes video data self.end_headers() logger.debug("Headers sent") + self.headerssent = True # Run proxyReadWrite self.proxyReadWrite() @@ -332,20 +338,20 @@ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler): finally: logger.debug("END REQUEST") logger.info("Closed connection from " + self.clientip + " path " + self.path) - VPStuff.clientcounter.delete(self.reqtype+'\\'+self.path_unquoted, self.clientip) - if not VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted): - try: - logger.debug("That was the last client, destroying VPClient") - logger.info("Stopping broadcasting " + self.path) - VPStuff.vlcclient.stopBroadcast(self.vlcid) - except: - pass - self.vp.destroy() - if not self.headerssent: - logger.error("Problem receiving video stream, no headers!") - if VPStuff.clientcounter.total == 0: - logger.error("Probably VLC hang") - VPStuff.vlc.kill() + VPStuff.clientcounter.delete(self.reqtype+'\\'+self.path_unquoted, self.clientip, self.unic) +# if not VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted): +# try: +# logger.debug("That was the last client, destroying VPClient") +# logger.info("Stopping broadcasting " + self.path) +# VPStuff.vlcclient.stopBroadcast(self.vlcid) +# except vlcclient.VlcException: +# logger.error("VLC connection problem, %s client(s)!" % VPStuff.clientcounter.total) +# if VPStuff.clientcounter.total == 0: +# logger.error("Probably VLC hang") +# VPStuff.vlc.kill() +# except: +# pass + self.vp.destroy() class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): @@ -515,6 +521,14 @@ def _reloadconfig(signum=None, frame=None): from vpconfig import VPConfig logger.info('Config reloaded') +sched = BackgroundScheduler() +sched.start() + +def clean_streams(): + VPStuff.vlcclient.clean_streams(15) + +job = sched.add_job(clean_streams, 'interval', seconds=15) + # setting signal handlers try: gevent.signal(signal.SIGHUP, _reloadconfig) @@ -533,7 +547,7 @@ else: try: logger.info("Using gevent %s" % gevent.__version__) - logger.info("Using psutil %s" % psutil.__version__) + logger.info("Usig psutil %s" % psutil.__version__) logger.info("Using VLC %s" % VPStuff.vlcclient._vlcver) logger.info("Server started.") while True: diff --git a/xmlparse.py b/xmlparse.py index bc4c3cf..0e039f8 100644 --- a/xmlparse.py +++ b/xmlparse.py @@ -2,7 +2,6 @@ import xml.dom.minidom import sys,os -from pprint import pprint from plugins.modules.PlaylistGenerator import PlaylistGenerator tvguide_url="http://www.teleguide.info/download/new3/jtv.zip"