Добавлена опциональная авторизация MQTT
[weathermon.git] / weathermon
index 7b78ca735ec97f0d0aa49415dea047d1297eccba..19688cb9607ed4c95f086eb9d13195353a8b82fa 100755 (executable)
@@ -24,20 +24,24 @@ from urllib import urlencode
 from StringIO import StringIO
 
 searchpath = '/dev/serial/by-id/'
-baud = 57600
 path = None
-timeout = 5
+baud = None  # filled in by init() from [serial] baud, falling back to 57600
+timeout = None  # filled in by init() from [serial] timeout, falling back to 120 (seconds)
+
+proc = None  # subprocess.Popen handle set by open_port() when the port is an "=command"
 
 external_submit_interval = 320
 owm_submit_interval = 320
 expire_interval = 1200
 submit_time = time()
 submit_time_owm = time()
-submit_queue = {}
 
 import MySQLdb
 import ConfigParser
 
+import thread
+from threading import Timer
+
 def find_port():
 
   global serial_num
@@ -51,22 +55,71 @@ def find_port():
 
 def open_port(path):
 
-  ser = serial.Serial(path,baud,timeout)
+  global proc, debug
+
+  if debug>0:
+    print "Opening path "+path
+
+  if path == "-":
+    return sys.stdin
+
+  if path[0] == "=":
+    import subprocess
+    sleep(3)
+    command=path[1:]
+    try:
+      command,args=command.split(' ',1)
+      proc = subprocess.Popen([command,args],stdout=subprocess.PIPE)
+    except:
+      proc = subprocess.Popen([command],stdout=subprocess.PIPE)
+    import subprocess
+    return proc.stdout
+
+  ser = serial.Serial(path,baud,timeout=timeout)
   if ser.portstr:
     tcflush(ser,TCIOFLUSH);
   return ser
   
 def read_port(ser):
 
-  line = ser.readline()
-  return line.strip()
+  try:  # return the next non-empty line, or "<<TIMEOUT>>" when none arrives within `timeout` seconds
+    timeout_timer = Timer(timeout, thread.interrupt_main)  # watchdog: interrupt_main raises KeyboardInterrupt in the main thread
+    timeout_timer.start()
+
+    line=''
+    while line=='':  # keep reading until a line is non-empty after strip()
+      line = ser.readline()
+      line = line.strip()
+      
+    return line
+    
+  except KeyboardInterrupt:  # fired by the watchdog; NOTE(review): a user Ctrl-C during the read presumably lands here too
+    return "<<TIMEOUT>>"  # sentinel that read_loop() compares against
+  finally:
+    timeout_timer.cancel()  # stop the watchdog on every exit path
   
 def read_loop(ser,callback):
 
+  global proc
+
   while True:
   
     try:
       line=read_port(ser)
+      if line=="<<TIMEOUT>>":
+        if debug>0:
+          print "Reopening port..."
+          print line
+        ser.close()
+        if proc:
+          try:
+            if debug>0:
+              print "Terminating process..."
+            proc.terminate()
+            sleep(5)
+          finally:
+            None  
+        ser=open_port(path)
       if line:
         callback(line)
     except KeyboardInterrupt:
@@ -76,23 +129,40 @@ def read_loop(ser,callback):
 
 def print_log(str):
   global logging
-  print str
+  if debug>0:  # echo to stdout only when the debug level is above zero
+    print str
   if logging == "on":
     system("logger -t weathermon \""+str+"\"")
 
-def submit_narodmon(queue):
-
-  param = { 'ID':"{:X}".format(getnode())}
+def submit_narodmon():
 
-  pprint(queue)
+  param = { 'ID':devid }
 
-  for sensor in queue:
-    value = submit_queue[sensor]['val']
-    timestamp = submit_queue[sensor]['timestamp']
-    digest = md5(sensor).hexdigest()[:18]
-    param[digest] = value;
+  c = database.cursor()
+  c.execute(
+    '''
+    select nm_id,value from
+    (
+    SELECT sensor_id,parameter_id,max(timestamp) timestamp,round(avg(value),1) value FROM meteo.sensor_values
+    where 
+    timestamp>=date_add(now(), INTERVAL -300 SECOND)
+    group by sensor_id,parameter_id
+    ) v,meteo.ext_sensors e
+    where v.sensor_id=e.sensor_id and v.parameter_id=e.param_id
+    and nm_id is not null
+    '''
+  )
+
+  queue=c.fetchall()
+
+  if debug>1:
+    pprint(queue)
+
+  for (sensor,value) in queue:
+    param[sensor] = value  
   
-  pprint (param)
+  if debug>1:
+    pprint (param)
 
   url = "http://narodmon.ru/post.php"
 
@@ -116,26 +186,44 @@ def submit_narodmon(queue):
     return True
                                                                                       
   except:
-                                                                                        
+           
     exc_type, exc_value, exc_traceback = sys.exc_info()
     traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
     traceback.print_exception(exc_type, exc_value, exc_traceback,
                               limit=2, file=sys.stdout)  
     return False  
                                       
-def submit_owm(queue):
+def submit_owm():
 
   url = "http://openweathermap.org/data/post"
   params = {'name':owm_station, 'lat':owm_lat, 'long':owm_lon}
 
-  try:
+  c = database.cursor()
+  c.execute(
+    '''
+    select owm_id,value from
+    (
+    SELECT sensor_id,parameter_id,max(timestamp) timestamp,round(avg(value),1) value FROM meteo.sensor_values
+    where 
+    timestamp>=date_add(now(), INTERVAL -300 SECOND)
+    group by sensor_id,parameter_id
+    ) v,meteo.ext_sensors e
+    where v.sensor_id=e.sensor_id and v.parameter_id=e.param_id
+    and owm_id is not null
+    '''
+  )
+
+  queue=c.fetchall()
+
+  if debug>1:
+    pprint(queue)
+
+  for (sensor,value) in queue:
+    params[sensor]=value
+  if debug>1:  
+    pprint (params)
 
-    try:
-      params['temp'] = queue[owm_temp]['val']
-      params['pressure'] = queue[owm_pres]['val']
-      params['humidity'] = queue[owm_humi]['val']
-    except:
-      return False  
+  try:
 
     response_buffer = StringIO()
     curl = pycurl.Curl()
@@ -163,34 +251,20 @@ def submit_owm(queue):
                                   limit=2, file=sys.stdout)
     return False  
 
-def purge_queue():
-  global submit_queue
-  clean = []
-  for key in submit_queue:
-    if submit_queue[key]['timestamp'] < time()-expire_interval:
-      print_log("Expired value for "+key)
-      clean.append(key)
-  for i in clean:
-    del submit_queue[i]    
-  
 def submit_data(sensor_type,sensor_id,sensor_param,param_value):
   global submit_time
   global submit_time_owm
   global external_submit_interval
   global owm_submit_interval
-  global submit_queue
   c = database.cursor()
   c.execute('CALL meteo.submit_value(%s,%s,%s,%s,NULL)', (sensor_type,sensor_id,sensor_param,param_value))
   database.commit()
-  submit_queue[sensor_type+'.'+sensor_id+'.'+sensor_param]={'val':param_value,'timestamp':time()}
-  if time()>submit_time+external_submit_interval:
-    submit_narodmon(submit_queue)
+  if narmon=='on' and time()>submit_time+external_submit_interval:  # narodmon upload only when enabled and the interval has elapsed
+    submit_narodmon()  # reads recent values from the database itself; its True/False result is not checked here
     submit_time=time()
   if owmuser and time()>submit_time_owm+owm_submit_interval:
-    submit_owm(submit_queue)
-    submit_time_owm=time()
-  purge_queue()
-        
+    submit_owm()  # likewise queries the database instead of taking an in-memory queue
+    submit_time_owm=time()        
  
 def process_str(str):
   print_log("Received: "+str)
@@ -210,21 +284,44 @@ def process_str(str):
       sensor_id = None
       for rec in sens:
         key,value = rec.split('=')
-        if key == 'TYPE':
-          sensor_type = value
-        elif key == 'ID':
-          sensor_id = value  
-        else:  
-          sensor[key] = value
+       value=value.strip()
+       if len(value)>0:
+          if key == 'TYPE':
+            sensor_type = value
+          elif key == 'ID':
+            sensor_id = value  
+          else:  
+            sensor[key] = value
       if sensor_type:    
         if not sensor_id:
-          sensor_id='DEFAULT';    
+          sensor_id=devid;    
       for key in sensor:
-       if sensor[key] is not None:
+       if sensor[key]:
           print_log('Type = '+sensor_type+', ID = '+sensor_id+', Param = '+key+', Value = '+sensor[key])
           submit_data(sensor_type,sensor_id,key,sensor[key])
         else:
           print_log('Error: got empty parameter value for '+sensor_type+'.'+sensor_id+'.'+key)
+    elif msg_type == "ALARM":
+      alarm = msg_body.split(',')
+      device_type = None
+      device_id = None
+      for rec in alarm:
+        key,value = rec.split('=')
+        value=value.strip()
+        if len(value)>0:
+          if key == 'TYPE':
+            device_type = value
+          if key == 'ID':
+            device_id = value
+      if device_type:
+        if not device_id:
+          device_id=devid;
+        print_log("Alarm: Type = "+device_type+", ID = "+device_id)
+        if alarm_script:
+          try:
+            proc = subprocess.Popen([alarm_script,device_type,device_id,msg_body])
+          except:
+            print_log("Failed to execute alarm script")          
   except:
     print_log('Exception processing...')
     exc_type, exc_value, exc_traceback = sys.exc_info()
@@ -270,43 +367,80 @@ def reconnect():
 def main():
   weather_mon()
 
-try:
+def init():
+  """Load /etc/weathermon.conf into module globals and call reconnect(); on any error log a message and exit."""
+  global dbhost,dbuser,dbpasswd,path,serialnum,logging,debug;
+  global timeout,baud,narmon,devid,alarm_script,subprocess;
+  global owmuser,owmpasswd,owm_temp,owm_pres,owm_humi,owm_lat,owm_lon,owm_station;
+  debug,logging = 0,None  # print_log() reads both, so define them before anything below can fail
 
-  cfg = ConfigParser.RawConfigParser(allow_no_value=True)
-  cfg.readfp(open('/etc/weathermon.conf'))
-  dbhost = cfg.get("mysql","host")
-  dbuser = cfg.get("mysql","user")
-  dbpasswd = cfg.get("mysql","passwd")
-  try:
-    path = cfg.get("serial","port");
-  except:
-    path = None
-  try:    
-    serialnum = cfg.get("serial","id")
-  except:
-    serialnum = None  
   try:
-    logging = cfg.get("logging","enabled")
-  except:
-    logging = None
-  owmuser = cfg.get("openweathermap","user")
-  owmpasswd = cfg.get("openweathermap",'passwd')
-  if owmuser:
-    owm_temp = cfg.get("openweathermap",'temp')
-    owm_pres = cfg.get("openweathermap",'pres')
-    owm_humi = cfg.get("openweathermap",'humi')
-    owm_lat = cfg.get("openweathermap",'lat')
-    owm_lon = cfg.get("openweathermap",'lon')
-    owm_station = cfg.get("openweathermap",'station')
-  reconnect()
+
+    cfg = ConfigParser.RawConfigParser(allow_no_value=True)
+    cfg.readfp(open('/etc/weathermon.conf'))
+    dbhost = cfg.get("mysql","host")
+    dbuser = cfg.get("mysql","user")
+    dbpasswd = cfg.get("mysql","passwd")
+    try:
+      debug = int(cfg.get("logging","debug"))  # int() so "debug>0" compares numbers; on Python 2 any str compares greater than any int
+    except:
+      debug = 0  
+    try:
+      path = cfg.get("serial","port");
+    except:
+      path = None
+    try:    
+      serialnum = cfg.get("serial","id")
+    except:
+      serialnum = None  
+    try:
+      logging = cfg.get("logging","enabled")
+    except:
+      logging = None
+    try:
+      timeout = int(cfg.get("serial","timeout"))
+    except:
+      timeout = 120
+    try:
+      baud = int(cfg.get("serial","baud"))
+    except:
+      baud = 57600
+    try:
+      narmon = cfg.get("narodmon","enable")
+    except:
+      narmon = 'off'  
+    try:
+      devid = cfg.get("narodmon","devid")
+    except:
+      devid = "{:0>12X}".format(getnode())   
+    try:  
+      owmuser = cfg.get("openweathermap","user")
+      owmpasswd = cfg.get("openweathermap",'passwd')
+    except:
+      owmuser = None  
+    if owmuser:
+      owm_temp = cfg.get("openweathermap",'temp')
+      owm_pres = cfg.get("openweathermap",'pres')
+      owm_humi = cfg.get("openweathermap",'humi')
+      owm_lat = cfg.get("openweathermap",'lat')
+      owm_lon = cfg.get("openweathermap",'lon')
+      owm_station = cfg.get("openweathermap",'station')
+    try:
+      alarm_script = cfg.get("alarm","exec")
+      import subprocess  # bound at module level via the global statement so process_str() can call subprocess.Popen
+    except:
+      alarm_script = None
+        
+    reconnect()
  
-except:
+  except:
 
-  print_log("Cannot intialize system")
-  exit() 
+    print_log("Cannot intialize system")
+    exit() 
   
 if __name__ == "__main__":
   import sys
   reload(sys)
   sys.setdefaultencoding('utf-8')
+  init()
   main()