Добавлена корректная обработка падения процесса-поставщика данных (в моем случае...
[weathermon.git] / weathermon
1 #!/usr/bin/python
2
3 import serial
4
5 from os import listdir,system
6 from os.path import isfile, join
7
8 from pprint import pprint
9
10 from termios import tcflush, TCIOFLUSH
11
12 from time import sleep,time
13
14 from uuid import getnode
15
16 from hashlib import md5
17
18 import socket
19
20 import sys,traceback
21
22 import pycurl
23 from urllib import urlencode
24 from StringIO import StringIO
25
26 searchpath = '/dev/serial/by-id/'
27 path = None
28 baud = None
29 timeout = None
30
31 proc = None
32
33 external_submit_interval = 320
34 owm_submit_interval = 320
35 expire_interval = 1200
36 submit_time = time()
37 submit_time_owm = time()
38
39 import MySQLdb
40 import ConfigParser
41
42 import thread
43 from threading import Timer
44
45 def find_port():
46
47   global serial_num
48   global path
49
50   files = listdir(searchpath)
51   for f in files:
52     if serialnum in f:
53       return join(searchpath,f)
54   return None
55
56 def open_port(path):
57
58   global proc
59
60   print "Opening path "+path
61
62   if path == "-":
63     return sys.stdin
64
65   if path[0] == "=":
66     import subprocess
67     sleep(3)
68     command=path[1:]
69     try:
70       command,args=command.split(' ',1)
71       proc = subprocess.Popen([command,args],stdout=subprocess.PIPE)
72     except:
73       proc = subprocess.Popen([command],stdout=subprocess.PIPE)
74     import subprocess
75     return proc.stdout
76
77   ser = serial.Serial(path,baud,timeout=timeout)
78   if ser.portstr:
79     tcflush(ser,TCIOFLUSH);
80   return ser
81   
82 def read_port(ser):
83
84   try: 
85     timeout_timer = Timer(timeout, thread.interrupt_main)
86     timeout_timer.start()
87
88     line=''
89     while line=='':
90       line = ser.readline()
91       line = line.strip()
92       
93     return line
94     
95   except KeyboardInterrupt:
96     return "<<TIMEOUT>>"
97   finally:
98     timeout_timer.cancel()
99   
100 def read_loop(ser,callback):
101
102   global proc
103
104   while True:
105   
106     try:
107       line=read_port(ser)
108       if line=="<<TIMEOUT>>":
109         print "Reopening port..."
110         print line
111         ser.close()
112         if proc:
113           try:
114             print "Terminating process..."
115             proc.terminate()
116             sleep(5)
117           finally:
118             None  
119         ser=open_port(path)
120       if line:
121         callback(line)
122     except KeyboardInterrupt:
123       break
124     finally:
125       None
126
127 def print_log(str):
128   global logging
129   print str
130   if logging == "on":
131     system("logger -t weathermon \""+str+"\"")
132
133 def submit_narodmon():
134
135   param = { 'ID':devid }
136
137   c = database.cursor()
138   c.execute(
139     '''
140     select nm_id,value from
141     (
142     SELECT sensor_id,parameter_id,max(timestamp) timestamp,round(avg(value),1) value FROM meteo.sensor_values
143     where 
144     timestamp>=date_add(now(), INTERVAL -300 SECOND)
145     group by sensor_id,parameter_id
146     ) v,meteo.ext_sensors e
147     where v.sensor_id=e.sensor_id and v.parameter_id=e.param_id
148     and nm_id is not null
149     '''
150   )
151
152   queue=c.fetchall()
153
154   pprint(queue)
155
156   for (sensor,value) in queue:
157     param[sensor] = value  
158   pprint (param)
159
160   url = "http://narodmon.ru/post.php"
161
162   try:
163       
164     response_buffer = StringIO()
165     curl = pycurl.Curl()
166                                               
167     curl.setopt(curl.URL, url)
168     curl.setopt(curl.WRITEFUNCTION, response_buffer.write)   
169     curl.setopt(curl.POST, 1)
170     curl.setopt(curl.POSTFIELDS, urlencode(param))
171                                                                   
172     curl.perform()
173     curl.close()  
174                                                                           
175     response_value = response_buffer.getvalue()
176                                                                               
177     print_log('Narodmon response: '+response_value)
178                                                                                   
179     return True
180                                                                                       
181   except:
182            
183     exc_type, exc_value, exc_traceback = sys.exc_info()
184     traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
185     traceback.print_exception(exc_type, exc_value, exc_traceback,
186                               limit=2, file=sys.stdout)  
187     return False  
188                                       
189 def submit_owm():
190
191   url = "http://openweathermap.org/data/post"
192   params = {'name':owm_station, 'lat':owm_lat, 'long':owm_lon}
193
194   c = database.cursor()
195   c.execute(
196     '''
197     select owm_id,value from
198     (
199     SELECT sensor_id,parameter_id,max(timestamp) timestamp,round(avg(value),1) value FROM meteo.sensor_values
200     where 
201     timestamp>=date_add(now(), INTERVAL -300 SECOND)
202     group by sensor_id,parameter_id
203     ) v,meteo.ext_sensors e
204     where v.sensor_id=e.sensor_id and v.parameter_id=e.param_id
205     and owm_id is not null
206     '''
207   )
208
209   queue=c.fetchall()
210
211   pprint(queue)
212
213   for (sensor,value) in queue:
214     params[sensor]=value
215   pprint (params)
216
217   try:
218
219     response_buffer = StringIO()
220     curl = pycurl.Curl()
221
222     curl.setopt(curl.URL, url)
223     curl.setopt(curl.USERPWD, '%s:%s' % (owmuser, owmpasswd))
224     curl.setopt(curl.WRITEFUNCTION, response_buffer.write)
225     curl.setopt(curl.POST, 1)
226     curl.setopt(curl.POSTFIELDS, urlencode(params))
227
228     curl.perform()
229     curl.close()
230     
231     response_value = response_buffer.getvalue()
232     
233     print_log('Openweathermap response: '+response_value)
234   
235     return True
236   
237   except:
238
239     exc_type, exc_value, exc_traceback = sys.exc_info()
240     traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
241     traceback.print_exception(exc_type, exc_value, exc_traceback,
242                                   limit=2, file=sys.stdout)
243     return False  
244
245 def submit_data(sensor_type,sensor_id,sensor_param,param_value):
246   global submit_time
247   global submit_time_owm
248   global external_submit_interval
249   global owm_submit_interval
250   c = database.cursor()
251   c.execute('CALL meteo.submit_value(%s,%s,%s,%s,NULL)', (sensor_type,sensor_id,sensor_param,param_value))
252   database.commit()
253   if narmon=='on' and time()>submit_time+external_submit_interval:
254     submit_narodmon()
255     submit_time=time()
256   if owmuser and time()>submit_time_owm+owm_submit_interval:
257     submit_owm()
258     submit_time_owm=time()        
259  
260 def process_str(str):
261   print_log("Received: "+str)
262   try:
263     msg_type, msg_body = str.split(':')
264   except:
265     return
266   try:  
267     if msg_type == 'STATUS':
268       print_log('Status: '+msg_body)
269     elif msg_type == 'ERROR':
270       print_log('Error: '+ msg_body)
271     elif msg_type == 'SENSOR':
272       sens = msg_body.split(',')
273       sensor = {}
274       sensor_type = None
275       sensor_id = None
276       for rec in sens:
277         key,value = rec.split('=')
278         value=value.strip()
279         if len(value)>0:
280           if key == 'TYPE':
281             sensor_type = value
282           elif key == 'ID':
283             sensor_id = value  
284           else:  
285             sensor[key] = value
286       if sensor_type:    
287         if not sensor_id:
288           sensor_id=devid;    
289       for key in sensor:
290         if sensor[key]:
291           print_log('Type = '+sensor_type+', ID = '+sensor_id+', Param = '+key+', Value = '+sensor[key])
292           submit_data(sensor_type,sensor_id,key,sensor[key])
293         else:
294           print_log('Error: got empty parameter value for '+sensor_type+'.'+sensor_id+'.'+key)
295     elif msg_type == "ALARM":
296       alarm = msg_body.split(',')
297       device_type = None
298       device_id = None
299       for rec in alarm:
300         key,value = rec.split('=')
301         value=value.strip()
302         if len(value)>0:
303           if key == 'TYPE':
304             device_type = value
305           if key == 'ID':
306             device_id = value
307       if device_type:
308         if not device_id:
309           device_id=devid;
310         print_log("Alarm: Type = "+device_type+", ID = "+device_id)
311         if alarm_script:
312           try:
313             proc = subprocess.Popen([alarm_script,device_type,device_id,msg_body])
314           except:
315             print_log("Failed to execute alarm script")          
316   except:
317     print_log('Exception processing...')
318     exc_type, exc_value, exc_traceback = sys.exc_info()
319     traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
320     traceback.print_exception(exc_type, exc_value, exc_traceback,
321                               limit=5, file=sys.stdout)  
322     try:
323       database.close()
324     except:
325       None
326     reconnect()
327
328 def weather_mon():
329
330   global path
331
332   if path is None:
333     path = find_port()
334   ser = open_port(path)
335   read_loop(ser,process_str)
336
337 def reconnect():
338
339   connected = False
340
341   while not connected:            
342
343     try:
344
345       global database
346       database = MySQLdb.connect(host=dbhost,user=dbuser,passwd=dbpasswd,use_unicode=True,connect_timeout=10)
347       database.set_character_set('utf8')
348       c = database.cursor()
349       c.execute('SET NAMES utf8;')
350       print_log("Database connected...")
351       connected = True 
352   
353     except:
354         
355       print_log("Error connecting database")
356       sleep(30)
357       
358 def main():
359   weather_mon()
360
361 def init():
362
363   global dbhost,dbuser,dbpasswd,path,serialnum,logging;
364   global timeout,baud,narmon,devid;
365   global owmuser,owmpasswd,owm_temp,owm_pres,owm_humi,owm_lat,owm_lon,owm_station;
366   global alarm_script;
367
368   try:
369
370     cfg = ConfigParser.RawConfigParser(allow_no_value=True)
371     cfg.readfp(open('/etc/weathermon.conf'))
372     dbhost = cfg.get("mysql","host")
373     dbuser = cfg.get("mysql","user")
374     dbpasswd = cfg.get("mysql","passwd")
375     try:
376       path = cfg.get("serial","port");
377     except:
378       path = None
379     try:    
380       serialnum = cfg.get("serial","id")
381     except:
382       serialnum = None  
383     try:
384       logging = cfg.get("logging","enabled")
385     except:
386       logging = None
387     try:
388       timeout = int(cfg.get("serial","timeout"))
389     except:
390       timeout = 120
391     try:
392       baud = int(cfg.get("serial","baud"))
393     except:
394       baud = 57600
395     try:
396       narmon = cfg.get("narodmon","enable")
397     except:
398       narmon = 'off'  
399     try:
400       devid = cfg.get("narodmon","devid")
401     except:
402       devid = "{:0>12X}".format(getnode())   
403     try:  
404       owmuser = cfg.get("openweathermap","user")
405       owmpasswd = cfg.get("openweathermap",'passwd')
406     except:
407       owmuser = None  
408     if owmuser:
409       owm_temp = cfg.get("openweathermap",'temp')
410       owm_pres = cfg.get("openweathermap",'pres')
411       owm_humi = cfg.get("openweathermap",'humi')
412       owm_lat = cfg.get("openweathermap",'lat')
413       owm_lon = cfg.get("openweathermap",'lon')
414       owm_station = cfg.get("openweathermap",'station')
415     try:
416       alarm_script = cfg.get("alarm","exec")
417       import subprocess
418     except:
419       alarm_script = None
420         
421     reconnect()
422  
423   except:
424
425     print_log("Cannot intialize system")
426     exit() 
427   
428 if __name__ == "__main__":
429   import sys
430   reload(sys)
431   sys.setdefaultencoding('utf-8')
432   init()
433   main()