Переход на асинхронное закрытие каналов с таймаутом, чтобы при обрыве соединения...
authorRoman Bazalevsky <rvb@rvb.name>
Mon, 2 Nov 2015 20:12:14 +0000 (23:12 +0300)
committerRoman Bazalevsky <rvb@rvb.name>
Mon, 2 Nov 2015 20:12:14 +0000 (23:12 +0300)
clientcounter.py
vlcclient/vlcclient.py
vphttp.py
xmlparse.py

index bcfd0b2eff746eb230f48297f6a1edf2e65d6d8e..a106d35d19733f20402cd093b9430029ad336239 100644 (file)
@@ -2,37 +2,54 @@
 Simple Client Counter for VLC VLM
 '''
 
 Simple Client Counter for VLC VLM
 '''
 
+import gevent
+import logging
 
 class ClientCounter(object):
 
     def __init__(self):
         self.clients = dict()
 
 class ClientCounter(object):
 
     def __init__(self):
         self.clients = dict()
+        self._lock = gevent.coros.RLock()
         self.total = 0
 
     def get(self, id):
         return self.clients.get(id, (False,))[0]
 
         self.total = 0
 
     def get(self, id):
         return self.clients.get(id, (False,))[0]
 
-    def add(self, id, ip):
+    def add(self, id, ip, unic):
+        self._lock.acquire()
+        logger = logging.getLogger('clientcounter_Add')
         if self.clients.has_key(id):
             self.clients[id][0] += 1
         if self.clients.has_key(id):
             self.clients[id][0] += 1
-            self.clients[id][1].append(ip)
+            self.clients[id][1].append([ip,unic])
+            logger.info('counter for %s incremented to %s' % (id,self.clients[id][0]))
         else:
         else:
-            self.clients[id] = [1, [ip]]
+            self.clients[id] = [1, [[ip,unic]]]
+            logger.info('counter for %s started (1)' % id)
 
         self.total += 1
 
         self.total += 1
+        logger.info('total count = %s' % self.total)
+
+        self._lock.release()
         return self.clients[id][0]
 
         return self.clients[id][0]
 
-    def delete(self, id, ip):
+    def delete(self, id, ip, unic):
+        self._lock.acquire()
+        logger = logging.getLogger('clientcounter_Del')
         if self.clients.has_key(id):
             self.total -= 1
         if self.clients.has_key(id):
             self.total -= 1
+            logger.info('total count = %s' % self.total)
             if self.clients[id][0] == 1:
                 del self.clients[id]
             if self.clients[id][0] == 1:
                 del self.clients[id]
+                logger.info('counter for %s decremented to zero' % id )
+                self._lock.release()
                 return False
             else:
                 self.clients[id][0] -= 1
                 return False
             else:
                 self.clients[id][0] -= 1
-                self.clients[id][1].remove(ip)
+                self.clients[id][1].remove([ip,unic])
+                logger.info('counter for %s decremented to %s' % (id,self.clients[id][0]))
         else:
         else:
+            self._lock.release()
             return False
 
             return False
 
+        self._lock.release()
         return self.clients[id][0]
         return self.clients[id][0]
-
index 69d9fb92f00507d2b71604ac833d04a4bf741049..80fc0f7850974d3aea32ffd03107a368d29a8b26 100644 (file)
@@ -8,7 +8,8 @@ import gevent.coros
 import telnetlib
 import logging
 from vlcmessages import *
 import telnetlib
 import logging
 from vlcmessages import *
-
+import time
+from pprint import pprint
 
 class VlcException(Exception):
 
 
 class VlcException(Exception):
 
@@ -51,10 +52,13 @@ class VlcClient(object):
         # Logger
         logger = logging.getLogger('VlcClient_init')
 
         # Logger
         logger = logging.getLogger('VlcClient_init')
 
+        # Streams
+        self.streams=dict()
+
         # Making connection
         try:
             self._socket = telnetlib.Telnet(host, port, connect_timeout)
         # Making connection
         try:
             self._socket = telnetlib.Telnet(host, port, connect_timeout)
-            logger.debug("Successfully connected with VLC socket!")
+            logger.info("Successfully connected with VLC socket!")
         except Exception as e:
             raise VlcException(
                 "Socket creation error! VLC is not running? ERROR: " + repr(e))
         except Exception as e:
             raise VlcException(
                 "Socket creation error! VLC is not running? ERROR: " + repr(e))
@@ -89,7 +93,7 @@ class VlcClient(object):
         # If socket is still alive (connected)
         if self._socket:
             try:
         # If socket is still alive (connected)
         if self._socket:
             try:
-                logger.debug("Destroying VlcClient...")
+                logger.info("Destroying VlcClient...")
                 self._write(VlcMessage.request.SHUTDOWN)
                 # Set shuttingDown flag for recvData
                 self._shuttingDown.set()
                 self._write(VlcMessage.request.SHUTDOWN)
                 # Set shuttingDown flag for recvData
                 self._shuttingDown.set()
@@ -107,7 +111,7 @@ class VlcClient(object):
 
         try:
             # Write message
 
         try:
             # Write message
-            logger.debug('VLC command: ' + message)
+            logger.info('VLC command: ' + message)
             self._socket.write(message + "\r\n")
         except EOFError as e:
             raise VlcException("Vlc Write error! ERROR: " + repr(e))
             self._socket.write(message + "\r\n")
         except EOFError as e:
             raise VlcException("Vlc Write error! ERROR: " + repr(e))
@@ -133,6 +137,10 @@ class VlcClient(object):
             msg = VlcMessage.request.startBroadcast(stream_name, input, self._out_port, muxer, pre_access, qtype)
             self._write(msg)
         else:
             msg = VlcMessage.request.startBroadcast(stream_name, input, self._out_port, muxer, pre_access, qtype)
             self._write(msg)
         else:
+            if stream_name not in self.streams:
+                self._resultlock.release()
+                logger.error("Attempting to delete not existing stream %s" % stream_name)
+                return
             self._write(VlcMessage.request.stopBroadcast(stream_name))
 
         try:
             self._write(VlcMessage.request.stopBroadcast(stream_name))
 
         try:
@@ -145,12 +153,18 @@ class VlcClient(object):
             logger.error(broadcast + " result timeout")
             raise VlcException(broadcast + " result timeout")
         finally:
             logger.error(broadcast + " result timeout")
             raise VlcException(broadcast + " result timeout")
         finally:
+            logger.info("working with %s stream: %s" % (stream_name,broadcast))
+            if brtype == True:
+                self.streams[stream_name]=time.time()
+            else:
+                del self.streams[stream_name]
             self._resultlock.release()
             self._resultlock.release()
+            logger.info("worked with %s stream: %s" % (stream_name,broadcast))
 
         if brtype == True:
 
         if brtype == True:
-            logger.debug("Broadcast started")
+            logger.info("Broadcast started")
         else:
         else:
-            logger.debug("Broadcast stopped")
+            logger.info("Broadcast stopped")
 
     def startBroadcast(self, stream_name, input, muxer='ts', pre_access='', qtype='default'):
         logger = logging.getLogger("VlcClient_startBroadcast")
 
     def startBroadcast(self, stream_name, input, muxer='ts', pre_access='', qtype='default'):
         logger = logging.getLogger("VlcClient_startBroadcast")
@@ -160,6 +174,20 @@ class VlcClient(object):
     def stopBroadcast(self, stream_name):
         return self._broadcast(False, stream_name)
 
     def stopBroadcast(self, stream_name):
         return self._broadcast(False, stream_name)
 
+    def mark(self,stream_name):
+      self.streams[stream_name]=time.time()
+
+    def clean_streams(self,timeout=15):
+      self._resultlock.acquire()
+      to_stop=set()
+      for stream,lasttime in self.streams.iteritems():
+        print stream,lasttime
+        if time.time()-lasttime>timeout:
+          to_stop.add(stream)
+      for stream in to_stop:    
+        self.stopBroadcast(stream)
+      self._resultlock.release()
+
     def pauseBroadcast(self, stream_name):
         return self._write(VlcMessage.request.pauseBroadcast(stream_name))
 
     def pauseBroadcast(self, stream_name):
         return self._write(VlcMessage.request.pauseBroadcast(stream_name))
 
@@ -220,10 +248,10 @@ class VlcClient(object):
                 # Do not move this before error handlers!
                 elif self._recvbuffer.startswith(VlcMessage.response.STARTOK):
                     # Broadcast started
                 # Do not move this before error handlers!
                 elif self._recvbuffer.startswith(VlcMessage.response.STARTOK):
                     # Broadcast started
-                    logger.debug("Broadcast started")
+                    logger.info("Broadcast started")
                     self._result.set(True)
 
                 elif self._recvbuffer.startswith(VlcMessage.response.STOPOK):
                     # Broadcast stopped
                     self._result.set(True)
 
                 elif self._recvbuffer.startswith(VlcMessage.response.STOPOK):
                     # Broadcast stopped
-                    logger.debug("Broadcast stopped")
+                    logger.info("Broadcast stopped")
                     self._result.set(True)
                     self._result.set(True)
index 0c09c5297bb8e040d32460a1201c517f473544ac..7ed52f787296d6cfc382be6e5b4938be851af4b3 100644 (file)
--- a/vphttp.py
+++ b/vphttp.py
@@ -37,7 +37,9 @@ try:
 except ImportError:
     pass
 
 except ImportError:
     pass
 
+import uuid
 
 
+from apscheduler.schedulers.background import BackgroundScheduler
 
 class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
 
 
 class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
 
@@ -86,11 +88,12 @@ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
 
         try:
             while True:
 
         try:
             while True:
-
+            
                 if not self.clientconnected:
                     logger.debug("Client is not connected, terminating")
                     break
 
                 if not self.clientconnected:
                     logger.debug("Client is not connected, terminating")
                     break
 
+                VPStuff.vlcclient.mark(self.vlcid)
                 data = self.video.read(4096)
                 if data and self.clientconnected:
                     self.wfile.write(data)
                 data = self.video.read(4096)
                 if data and self.clientconnected:
                     self.wfile.write(data)
@@ -186,6 +189,8 @@ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
         self.handleRequest(headers_only)
 
     def handleRequest(self, headers_only):
         self.handleRequest(headers_only)
 
     def handleRequest(self, headers_only):
+      
+        self.unic = uuid.uuid4()  
 
         # Limit concurrent connections
         if 0 < VPConfig.maxconns <= VPStuff.clientcounter.total:
 
         # Limit concurrent connections
         if 0 < VPConfig.maxconns <= VPStuff.clientcounter.total:
@@ -217,7 +222,7 @@ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
                 self.params.append('0')
 
         # Adding client to clientcounter
                 self.params.append('0')
 
         # Adding client to clientcounter
-        clients = VPStuff.clientcounter.add(self.reqtype+'\\'+self.path_unquoted, self.clientip)
+        clients = VPStuff.clientcounter.add(self.reqtype+'\\'+self.path_unquoted, self.clientip, self.unic)
         # If we are the one client, but sucessfully got vp instance from clientcounter,
         # then somebody is waiting in the videodestroydelay state
 
         # If we are the one client, but sucessfully got vp instance from clientcounter,
         # then somebody is waiting in the videodestroydelay state
 
@@ -309,6 +314,7 @@ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
                 # End headers. Next goes video data
                 self.end_headers()
                 logger.debug("Headers sent")
                 # End headers. Next goes video data
                 self.end_headers()
                 logger.debug("Headers sent")
+                self.headerssent = True
 
             # Run proxyReadWrite
             self.proxyReadWrite()
 
             # Run proxyReadWrite
             self.proxyReadWrite()
@@ -332,20 +338,20 @@ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
         finally:
             logger.debug("END REQUEST")
             logger.info("Closed connection from " + self.clientip + " path " + self.path)
         finally:
             logger.debug("END REQUEST")
             logger.info("Closed connection from " + self.clientip + " path " + self.path)
-            VPStuff.clientcounter.delete(self.reqtype+'\\'+self.path_unquoted, self.clientip)
-            if not VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted):
-                try:
-                    logger.debug("That was the last client, destroying VPClient")
-                    logger.info("Stopping broadcasting " + self.path)
-                    VPStuff.vlcclient.stopBroadcast(self.vlcid)
-                except:
-                    pass
-                self.vp.destroy()
-            if not self.headerssent:
-                logger.error("Problem receiving video stream, no headers!")
-                if VPStuff.clientcounter.total == 0:
-                    logger.error("Probably VLC hang")
-                    VPStuff.vlc.kill()
+            VPStuff.clientcounter.delete(self.reqtype+'\\'+self.path_unquoted, self.clientip, self.unic)
+#            if not VPStuff.clientcounter.get(self.reqtype+'\\'+self.path_unquoted):
+#                try:
+#                    logger.debug("That was the last client, destroying VPClient")
+#                    logger.info("Stopping broadcasting " + self.path)
+#                    VPStuff.vlcclient.stopBroadcast(self.vlcid)
+#                except vlcclient.VlcException:    
+#                  logger.error("VLC connection problem, %s client(s)!" % VPStuff.clientcounter.total)
+#                  if VPStuff.clientcounter.total == 0:
+#                      logger.error("Probably VLC hang")
+#                      VPStuff.vlc.kill()
+#                except:
+#                    pass
+            self.vp.destroy()
 
 class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
 
 
 class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
 
@@ -515,6 +521,14 @@ def _reloadconfig(signum=None, frame=None):
     from vpconfig import VPConfig
     logger.info('Config reloaded')
 
     from vpconfig import VPConfig
     logger.info('Config reloaded')
 
+sched = BackgroundScheduler()
+sched.start()
+
+def clean_streams():
+  VPStuff.vlcclient.clean_streams(15)
+
+job = sched.add_job(clean_streams, 'interval', seconds=15)
+
 # setting signal handlers
 try:
     gevent.signal(signal.SIGHUP, _reloadconfig)
 # setting signal handlers
 try:
     gevent.signal(signal.SIGHUP, _reloadconfig)
@@ -533,7 +547,7 @@ else:
 
 try:
     logger.info("Using gevent %s" % gevent.__version__)
 
 try:
     logger.info("Using gevent %s" % gevent.__version__)
-    logger.info("Using psutil %s" % psutil.__version__)
+    logger.info("Usig psutil %s" % psutil.__version__)
     logger.info("Using VLC %s" % VPStuff.vlcclient._vlcver)
     logger.info("Server started.")
     while True:
     logger.info("Using VLC %s" % VPStuff.vlcclient._vlcver)
     logger.info("Server started.")
     while True:
index bc4c3cf532d66baf4bb91496e75e1042c304aac5..0e039f8ee9ad77f4e76d2a44f23f48dc211bb4fb 100644 (file)
@@ -2,7 +2,6 @@
 
 import xml.dom.minidom
 import sys,os
 
 import xml.dom.minidom
 import sys,os
-from pprint import pprint
 from plugins.modules.PlaylistGenerator import PlaylistGenerator
 
 tvguide_url="http://www.teleguide.info/download/new3/jtv.zip"
 from plugins.modules.PlaylistGenerator import PlaylistGenerator
 
 tvguide_url="http://www.teleguide.info/download/new3/jtv.zip"