X-Git-Url: https://git.rvb.name/weathermon.git/blobdiff_plain/3a48d89e76fd2ab87dabf08632474c529cf77ad4..817101e336c2e0812345e8583aed2dae3007c5be:/weathermon diff --git a/weathermon b/weathermon index 8c94dd0..19688cb 100755 --- a/weathermon +++ b/weathermon @@ -2,9 +2,11 @@ import serial -from os import listdir +from os import listdir,system from os.path import isfile, join +from pprint import pprint + from termios import tcflush, TCIOFLUSH from time import sleep,time @@ -22,20 +24,28 @@ from urllib import urlencode from StringIO import StringIO searchpath = '/dev/serial/by-id/' -baud = 9600 -timeout = 5 +path = None +baud = None +timeout = None + +proc = None external_submit_interval = 320 +owm_submit_interval = 320 expire_interval = 1200 submit_time = time() -submit_queue = {} +submit_time_owm = time() import MySQLdb import ConfigParser +import thread +from threading import Timer + def find_port(): global serial_num + global path files = listdir(searchpath) for f in files: @@ -45,22 +55,71 @@ def find_port(): def open_port(path): - ser = serial.Serial(path,baud,timeout) + global proc, debug + + if debug>0: + print "Opening path "+path + + if path == "-": + return sys.stdin + + if path[0] == "=": + import subprocess + sleep(3) + command=path[1:] + try: + command,args=command.split(' ',1) + proc = subprocess.Popen([command,args],stdout=subprocess.PIPE) + except: + proc = subprocess.Popen([command],stdout=subprocess.PIPE) + import subprocess + return proc.stdout + + ser = serial.Serial(path,baud,timeout=timeout) if ser.portstr: tcflush(ser,TCIOFLUSH); return ser def read_port(ser): - line = ser.readline() - return line.strip() + try: + timeout_timer = Timer(timeout, thread.interrupt_main) + timeout_timer.start() + + line='' + while line=='': + line = ser.readline() + line = line.strip() + + return line + + except KeyboardInterrupt: + return "<>" + finally: + timeout_timer.cancel() def read_loop(ser,callback): + global proc + while True: try: line=read_port(ser) + if line=="<>": + if debug>0: + print "Reopening port..." + print line + ser.close() + if proc: + try: + if debug>0: + print "Terminating process..." + proc.terminate() + sleep(5) + finally: + None + ser=open_port(path) if line: callback(line) except KeyboardInterrupt: @@ -68,17 +127,42 @@ def read_loop(ser,callback): finally: None -def submit_narodmon(queue): +def print_log(str): + global logging + if debug>0: + print str + if logging == "on": + system("logger -t weathermon \""+str+"\"") + +def submit_narodmon(): - param = { 'ID':"{:X}".format(getnode())} + param = { 'ID':devid } - for sensor in queue: - value = submit_queue[sensor]['val'] - timestamp = submit_queue[sensor]['timestamp'] - digest = md5(sensor).hexdigest()[:18] - param[digest] = value; + c = database.cursor() + c.execute( + ''' + select nm_id,value from + ( + SELECT sensor_id,parameter_id,max(timestamp) timestamp,round(avg(value),1) value FROM meteo.sensor_values + where + timestamp>=date_add(now(), INTERVAL -300 SECOND) + group by sensor_id,parameter_id + ) v,meteo.ext_sensors e + where v.sensor_id=e.sensor_id and v.parameter_id=e.param_id + and nm_id is not null + ''' + ) + + queue=c.fetchall() + + if debug>1: + pprint(queue) + + for (sensor,value) in queue: + param[sensor] = value - print param + if debug>1: + pprint (param) url = "http://narodmon.ru/post.php" @@ -97,31 +181,49 @@ def submit_narodmon(queue): response_value = response_buffer.getvalue() - print 'Content: ', response_value + print_log('Narodmon response: '+response_value) return True except: - + exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_tb(exc_traceback, limit=1, file=sys.stdout) traceback.print_exception(exc_type, exc_value, exc_traceback, limit=2, file=sys.stdout) return False -def submit_owm(queue): +def submit_owm(): url = "http://openweathermap.org/data/post" params = {'name':owm_station, 'lat':owm_lat, 'long':owm_lon} - try: + c = database.cursor() + c.execute( + ''' + select owm_id,value from + ( + SELECT sensor_id,parameter_id,max(timestamp) timestamp,round(avg(value),1) value FROM meteo.sensor_values + where + timestamp>=date_add(now(), INTERVAL -300 SECOND) + group by sensor_id,parameter_id + ) v,meteo.ext_sensors e + where v.sensor_id=e.sensor_id and v.parameter_id=e.param_id + and owm_id is not null + ''' + ) + + queue=c.fetchall() + + if debug>1: + pprint(queue) + + for (sensor,value) in queue: + params[sensor]=value + if debug>1: + pprint (params) - try: - params['temp'] = queue[owm_temp]['val'] - params['pressure'] = queue[owm_pres]['val'] - params['humidity'] = queue[owm_humi]['val'] - except: - return False + try: response_buffer = StringIO() curl = pycurl.Curl() @@ -137,7 +239,7 @@ def submit_owm(queue): response_value = response_buffer.getvalue() - print 'Content: ', response_value + print_log('Openweathermap response: '+response_value) return True @@ -149,39 +251,32 @@ def submit_owm(queue): limit=2, file=sys.stdout) return False -def purge_queue(): - global submit_queue - clean = [] - for key in submit_queue: - if submit_queue[key]['timestamp'] < time()-expire_interval: - print "Expired value for "+key - clean.append(key) - for i in clean: - del submit_queue[i] - def submit_data(sensor_type,sensor_id,sensor_param,param_value): global submit_time - global submit_queue + global submit_time_owm + global external_submit_interval + global owm_submit_interval c = database.cursor() - c.execute('CALL meteo.submit_value(%s,%s,%s,%s)', (sensor_type,sensor_id,sensor_param,param_value)) + c.execute('CALL meteo.submit_value(%s,%s,%s,%s,NULL)', (sensor_type,sensor_id,sensor_param,param_value)) database.commit() - submit_queue[sensor_type+'.'+sensor_id+'.'+sensor_param]={'val':param_value,'timestamp':time()} - if time()>submit_time+external_submit_interval: - if submit_narodmon(submit_queue): - if owmuser: - submit_owm(submit_queue) - print 'Purging queue...' - submit_time=time() - purge_queue() - + if narmon=='on' and time()>submit_time+external_submit_interval: + submit_narodmon() + submit_time=time() + if owmuser and time()>submit_time_owm+owm_submit_interval: + submit_owm() + submit_time_owm=time() def process_str(str): + print_log("Received: "+str) try: msg_type, msg_body = str.split(':') + except: + return + try: if msg_type == 'STATUS': - print 'Status: ', msg_body + print_log('Status: '+msg_body) elif msg_type == 'ERROR': - print 'Error: ', msg_body + print_log('Error: '+ msg_body) elif msg_type == 'SENSOR': sens = msg_body.split(',') sensor = {} @@ -189,80 +284,163 @@ def process_str(str): sensor_id = None for rec in sens: key,value = rec.split('=') - if key == 'TYPE': - sensor_type = value - elif key == 'ID': - sensor_id = value - else: - sensor[key] = value + value=value.strip() + if len(value)>0: + if key == 'TYPE': + sensor_type = value + elif key == 'ID': + sensor_id = value + else: + sensor[key] = value if sensor_type: if not sensor_id: - sensor_id='DEFAULT'; + sensor_id=devid; for key in sensor: - print 'Type = ', sensor_type, ', ID = ', sensor_id, ', Param = ', key, ', Value = ', sensor[key] - submit_data(sensor_type,sensor_id,key,sensor[key]) + if sensor[key]: + print_log('Type = '+sensor_type+', ID = '+sensor_id+', Param = '+key+', Value = '+sensor[key]) + submit_data(sensor_type,sensor_id,key,sensor[key]) + else: + print_log('Error: got empty parameter value for '+sensor_type+'.'+sensor_id+'.'+key) + elif msg_type == "ALARM": + alarm = msg_body.split(',') + device_type = None + device_id = None + for rec in alarm: + key,value = rec.split('=') + value=value.strip() + if len(value)>0: + if key == 'TYPE': + device_type = value + if key == 'ID': + device_id = value + if device_type: + if not device_id: + device_id=devid; + print_log("Alarm: Type = "+device_type+", ID = "+device_id) + if alarm_script: + try: + proc = subprocess.Popen([alarm_script,device_type,device_id,msg_body]) + except: + print_log("Failed to execute alarm script") except: - print 'Exception processing...' + print_log('Exception processing...') + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_tb(exc_traceback, limit=1, file=sys.stdout) + traceback.print_exception(exc_type, exc_value, exc_traceback, + limit=5, file=sys.stdout) try: database.close() except: None reconnect() -def print_str(str): - print str - def weather_mon(): - path = find_port() + global path + + if path is None: + path = find_port() ser = open_port(path) read_loop(ser,process_str) def reconnect(): - - try: - global database - database = MySQLdb.connect(host=dbhost,user=dbuser,passwd=dbpasswd,use_unicode=True,connect_timeout=10) - database.set_character_set('utf8') - c = database.cursor() - c.execute('SET NAMES utf8;') - print "Database connected..." - - except: + connected = False + + while not connected: + + try: + + global database + database = MySQLdb.connect(host=dbhost,user=dbuser,passwd=dbpasswd,use_unicode=True,connect_timeout=10) + database.set_character_set('utf8') + c = database.cursor() + c.execute('SET NAMES utf8;') + print_log("Database connected...") + connected = True + + except: - print "Error connecting database" - sleep(30) + print_log("Error connecting database") + sleep(30) def main(): weather_mon() -try: - - cfg = ConfigParser.RawConfigParser(allow_no_value=True) - cfg.readfp(open('/etc/weathermon.conf')) - dbhost = cfg.get("mysql","host") - dbuser = cfg.get("mysql","user") - dbpasswd = cfg.get("mysql","passwd") - serialnum = cfg.get("serial","id") - owmuser = cfg.get("openweathermap","user") - owmpasswd = cfg.get("openweathermap",'passwd') - if owmuser: - owm_temp = cfg.get("openweathermap",'temp') - owm_pres = cfg.get("openweathermap",'pres') - owm_humi = cfg.get("openweathermap",'humi') - owm_lat = cfg.get("openweathermap",'lat') - owm_lon = cfg.get("openweathermap",'lon') - owm_station = cfg.get("openweathermap",'station') - reconnect() +def init(): + + global dbhost,dbuser,dbpasswd,path,serialnum,logging,debug; + global timeout,baud,narmon,devid; + global owmuser,owmpasswd,owm_temp,owm_pres,owm_humi,owm_lat,owm_lon,owm_station; + global alarm_script; + + try: + + cfg = ConfigParser.RawConfigParser(allow_no_value=True) + cfg.readfp(open('/etc/weathermon.conf')) + dbhost = cfg.get("mysql","host") + dbuser = cfg.get("mysql","user") + dbpasswd = cfg.get("mysql","passwd") + try: + debug = cfg.get("logging","debug") + except: + debug = 0 + try: + path = cfg.get("serial","port"); + except: + path = None + try: + serialnum = cfg.get("serial","id") + except: + serialnum = None + try: + logging = cfg.get("logging","enabled") + except: + logging = None + try: + timeout = int(cfg.get("serial","timeout")) + except: + timeout = 120 + try: + baud = int(cfg.get("serial","baud")) + except: + baud = 57600 + try: + narmon = cfg.get("narodmon","enable") + except: + narmon = 'off' + try: + devid = cfg.get("narodmon","devid") + except: + devid = "{:0>12X}".format(getnode()) + try: + owmuser = cfg.get("openweathermap","user") + owmpasswd = cfg.get("openweathermap",'passwd') + except: + owmuser = None + if owmuser: + owm_temp = cfg.get("openweathermap",'temp') + owm_pres = cfg.get("openweathermap",'pres') + owm_humi = cfg.get("openweathermap",'humi') + owm_lat = cfg.get("openweathermap",'lat') + owm_lon = cfg.get("openweathermap",'lon') + owm_station = cfg.get("openweathermap",'station') + try: + alarm_script = cfg.get("alarm","exec") + import subprocess + except: + alarm_script = None + + reconnect() -except: + except: - print "Cannot intialize system" - exit() + print_log("Cannot intialize system") + exit() if __name__ == "__main__": import sys reload(sys) sys.setdefaultencoding('utf-8') + init() main()