+ def mark(self,stream_name):
+ self.streams[stream_name]=time.time()
+
+ def clean_streams(self,timeout=15):
+ self._resultlock.acquire()
+ to_stop=set()
+ for stream,lasttime in self.streams.iteritems():
+ if time.time()-lasttime>timeout:
+ to_stop.add(stream)
+ for stream in to_stop:
+ try:
+ self.stopBroadcast(stream)
+ except:
+ pass
+ self._resultlock.release()
+
+ def check_stream(self,stream_name):
+ if stream_name in self.streams:
+ self.streams[stream_name]=time.time()
+ return True
+ else:
+ return False
+