From: Roman Bazalevsky Date: Thu, 19 May 2016 17:06:13 +0000 (+0300) Subject: Добавлена корректная обработка падения процесса-поставщика данных (в моем случае... X-Git-Url: https://git.rvb.name/weathermon.git/commitdiff_plain/decb6723ffef77eb4e5ac3ceea675cd7fe0450b0?ds=sidebyside Добавлена корректная обработка падения процесса-поставщика данных (в моем случае - rtl_433-приемника). Обеспечен перезапуск дочернего процесса. --- diff --git a/weathermon b/weathermon new file mode 100755 index 0000000..8470f63 --- /dev/null +++ b/weathermon @@ -0,0 +1,433 @@ +#!/usr/bin/python + +import serial + +from os import listdir,system +from os.path import isfile, join + +from pprint import pprint + +from termios import tcflush, TCIOFLUSH + +from time import sleep,time + +from uuid import getnode + +from hashlib import md5 + +import socket + +import sys,traceback + +import pycurl +from urllib import urlencode +from StringIO import StringIO + +searchpath = '/dev/serial/by-id/' +path = None +baud = None +timeout = None + +proc = None + +external_submit_interval = 320 +owm_submit_interval = 320 +expire_interval = 1200 +submit_time = time() +submit_time_owm = time() + +import MySQLdb +import ConfigParser + +import thread +from threading import Timer + +def find_port(): + + global serial_num + global path + + files = listdir(searchpath) + for f in files: + if serialnum in f: + return join(searchpath,f) + return None + +def open_port(path): + + global proc + + print "Opening path "+path + + if path == "-": + return sys.stdin + + if path[0] == "=": + import subprocess + sleep(3) + command=path[1:] + try: + command,args=command.split(' ',1) + proc = subprocess.Popen([command,args],stdout=subprocess.PIPE) + except: + proc = subprocess.Popen([command],stdout=subprocess.PIPE) + import subprocess + return proc.stdout + + ser = serial.Serial(path,baud,timeout=timeout) + if ser.portstr: + tcflush(ser,TCIOFLUSH); + return ser + +def read_port(ser): + + try: + timeout_timer = Timer(timeout, thread.interrupt_main) + timeout_timer.start() + + line='' + while line=='': + line = ser.readline() + line = line.strip() + + return line + + except KeyboardInterrupt: + return "<>" + finally: + timeout_timer.cancel() + +def read_loop(ser,callback): + + global proc + + while True: + + try: + line=read_port(ser) + if line=="<>": + print "Reopening port..." + print line + ser.close() + if proc: + try: + print "Terminating process..." + proc.terminate() + sleep(5) + finally: + None + ser=open_port(path) + if line: + callback(line) + except KeyboardInterrupt: + break + finally: + None + +def print_log(str): + global logging + print str + if logging == "on": + system("logger -t weathermon \""+str+"\"") + +def submit_narodmon(): + + param = { 'ID':devid } + + c = database.cursor() + c.execute( + ''' + select nm_id,value from + ( + SELECT sensor_id,parameter_id,max(timestamp) timestamp,round(avg(value),1) value FROM meteo.sensor_values + where + timestamp>=date_add(now(), INTERVAL -300 SECOND) + group by sensor_id,parameter_id + ) v,meteo.ext_sensors e + where v.sensor_id=e.sensor_id and v.parameter_id=e.param_id + and nm_id is not null + ''' + ) + + queue=c.fetchall() + + pprint(queue) + + for (sensor,value) in queue: + param[sensor] = value + pprint (param) + + url = "http://narodmon.ru/post.php" + + try: + + response_buffer = StringIO() + curl = pycurl.Curl() + + curl.setopt(curl.URL, url) + curl.setopt(curl.WRITEFUNCTION, response_buffer.write) + curl.setopt(curl.POST, 1) + curl.setopt(curl.POSTFIELDS, urlencode(param)) + + curl.perform() + curl.close() + + response_value = response_buffer.getvalue() + + print_log('Narodmon response: '+response_value) + + return True + + except: + + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_tb(exc_traceback, limit=1, file=sys.stdout) + traceback.print_exception(exc_type, exc_value, exc_traceback, + limit=2, file=sys.stdout) + return False + +def submit_owm(): + + url = "http://openweathermap.org/data/post" + params = {'name':owm_station, 'lat':owm_lat, 'long':owm_lon} + + c = database.cursor() + c.execute( + ''' + select owm_id,value from + ( + SELECT sensor_id,parameter_id,max(timestamp) timestamp,round(avg(value),1) value FROM meteo.sensor_values + where + timestamp>=date_add(now(), INTERVAL -300 SECOND) + group by sensor_id,parameter_id + ) v,meteo.ext_sensors e + where v.sensor_id=e.sensor_id and v.parameter_id=e.param_id + and owm_id is not null + ''' + ) + + queue=c.fetchall() + + pprint(queue) + + for (sensor,value) in queue: + params[sensor]=value + pprint (params) + + try: + + response_buffer = StringIO() + curl = pycurl.Curl() + + curl.setopt(curl.URL, url) + curl.setopt(curl.USERPWD, '%s:%s' % (owmuser, owmpasswd)) + curl.setopt(curl.WRITEFUNCTION, response_buffer.write) + curl.setopt(curl.POST, 1) + curl.setopt(curl.POSTFIELDS, urlencode(params)) + + curl.perform() + curl.close() + + response_value = response_buffer.getvalue() + + print_log('Openweathermap response: '+response_value) + + return True + + except: + + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_tb(exc_traceback, limit=1, file=sys.stdout) + traceback.print_exception(exc_type, exc_value, exc_traceback, + limit=2, file=sys.stdout) + return False + +def submit_data(sensor_type,sensor_id,sensor_param,param_value): + global submit_time + global submit_time_owm + global external_submit_interval + global owm_submit_interval + c = database.cursor() + c.execute('CALL meteo.submit_value(%s,%s,%s,%s,NULL)', (sensor_type,sensor_id,sensor_param,param_value)) + database.commit() + if narmon=='on' and time()>submit_time+external_submit_interval: + submit_narodmon() + submit_time=time() + if owmuser and time()>submit_time_owm+owm_submit_interval: + submit_owm() + submit_time_owm=time() + +def process_str(str): + print_log("Received: "+str) + try: + msg_type, msg_body = str.split(':') + except: + return + try: + if msg_type == 'STATUS': + print_log('Status: '+msg_body) + elif msg_type == 'ERROR': + print_log('Error: '+ msg_body) + elif msg_type == 'SENSOR': + sens = msg_body.split(',') + sensor = {} + sensor_type = None + sensor_id = None + for rec in sens: + key,value = rec.split('=') + value=value.strip() + if len(value)>0: + if key == 'TYPE': + sensor_type = value + elif key == 'ID': + sensor_id = value + else: + sensor[key] = value + if sensor_type: + if not sensor_id: + sensor_id=devid; + for key in sensor: + if sensor[key]: + print_log('Type = '+sensor_type+', ID = '+sensor_id+', Param = '+key+', Value = '+sensor[key]) + submit_data(sensor_type,sensor_id,key,sensor[key]) + else: + print_log('Error: got empty parameter value for '+sensor_type+'.'+sensor_id+'.'+key) + elif msg_type == "ALARM": + alarm = msg_body.split(',') + device_type = None + device_id = None + for rec in alarm: + key,value = rec.split('=') + value=value.strip() + if len(value)>0: + if key == 'TYPE': + device_type = value + if key == 'ID': + device_id = value + if device_type: + if not device_id: + device_id=devid; + print_log("Alarm: Type = "+device_type+", ID = "+device_id) + if alarm_script: + try: + proc = subprocess.Popen([alarm_script,device_type,device_id,msg_body]) + except: + print_log("Failed to execute alarm script") + except: + print_log('Exception processing...') + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_tb(exc_traceback, limit=1, file=sys.stdout) + traceback.print_exception(exc_type, exc_value, exc_traceback, + limit=5, file=sys.stdout) + try: + database.close() + except: + None + reconnect() + +def weather_mon(): + + global path + + if path is None: + path = find_port() + ser = open_port(path) + read_loop(ser,process_str) + +def reconnect(): + + connected = False + + while not connected: + + try: + + global database + database = MySQLdb.connect(host=dbhost,user=dbuser,passwd=dbpasswd,use_unicode=True,connect_timeout=10) + database.set_character_set('utf8') + c = database.cursor() + c.execute('SET NAMES utf8;') + print_log("Database connected...") + connected = True + + except: + + print_log("Error connecting database") + sleep(30) + +def main(): + weather_mon() + +def init(): + + global dbhost,dbuser,dbpasswd,path,serialnum,logging; + global timeout,baud,narmon,devid; + global owmuser,owmpasswd,owm_temp,owm_pres,owm_humi,owm_lat,owm_lon,owm_station; + global alarm_script; + + try: + + cfg = ConfigParser.RawConfigParser(allow_no_value=True) + cfg.readfp(open('/etc/weathermon.conf')) + dbhost = cfg.get("mysql","host") + dbuser = cfg.get("mysql","user") + dbpasswd = cfg.get("mysql","passwd") + try: + path = cfg.get("serial","port"); + except: + path = None + try: + serialnum = cfg.get("serial","id") + except: + serialnum = None + try: + logging = cfg.get("logging","enabled") + except: + logging = None + try: + timeout = int(cfg.get("serial","timeout")) + except: + timeout = 120 + try: + baud = int(cfg.get("serial","baud")) + except: + baud = 57600 + try: + narmon = cfg.get("narodmon","enable") + except: + narmon = 'off' + try: + devid = cfg.get("narodmon","devid") + except: + devid = "{:0>12X}".format(getnode()) + try: + owmuser = cfg.get("openweathermap","user") + owmpasswd = cfg.get("openweathermap",'passwd') + except: + owmuser = None + if owmuser: + owm_temp = cfg.get("openweathermap",'temp') + owm_pres = cfg.get("openweathermap",'pres') + owm_humi = cfg.get("openweathermap",'humi') + owm_lat = cfg.get("openweathermap",'lat') + owm_lon = cfg.get("openweathermap",'lon') + owm_station = cfg.get("openweathermap",'station') + try: + alarm_script = cfg.get("alarm","exec") + import subprocess + except: + alarm_script = None + + reconnect() + + except: + + print_log("Cannot intialize system") + exit() + +if __name__ == "__main__": + import sys + reload(sys) + sys.setdefaultencoding('utf-8') + init() + main() diff --git a/weathermon.uci~ b/weathermon.uci~ deleted file mode 100644 index 490ba91..0000000 --- a/weathermon.uci~ +++ /dev/null @@ -1,12 +0,0 @@ -config internal 'web' - option url http://estia.rvb-home.lan/meteo/send.php - option user meteo - option password somestrictpassword - -config internal 'serial' - option port /dev/ttyUSB0 - option timeout 100 - option baud 9600 - -config internal 'logging' - option enabled on