Переход на mosquitto
[weathermon.git] / weathermon-mqtt
1 #!/usr/bin/python -u
2
3 import paho.mqtt.client as paho
4 import sys
5 from ConfigParser import ConfigParser
6 import MySQLdb
7 from pprint import pprint
8 import json
9 from dateutil.parser import parser
10 tparser=parser()
11
12 def on_message(mosq, obj, msg):
13   topic=msg.topic
14   payload=json.loads(msg.payload)
15   timestamp=tparser.parse(payload['Time'])
16   for sensor_type in payload:
17     if sensor_type != 'Time' and sensor_type != 'TempUnit':
18       sensor_data=payload[sensor_type]
19       for param in sensor_data:
20         try:
21           value=sensor_data[param]
22           try:
23             c = database.cursor()
24             c.execute('CALL meteo.submit_mqtt(%s,%s,%s,%s,NULL)', (topic,sensor_type,param,value))
25             database.commit()
26             print topic,sensor_type,param,value
27           except:
28             print "Failed to submit data"
29         except:
30           None    
31
32 def Topics():
33
34   c = database.cursor()
35   c.execute(
36     '''
37     select topic from mqtt_topics where topic<>""
38     '''
39   )
40
41   topics=c.fetchall()
42   return topics
43   
44   
45 def Init():
46
47   global client,database
48
49   conffile = sys.argv[1]
50
51   config = ConfigParser()
52   config.add_section('mqtt')
53   # set defaults for anonymous auth
54   config.set('mqtt', 'username', '')
55   config.set('mqtt', 'password', '')
56   config.set('mqtt', 'port', '1883')
57   config.read(conffile)
58
59   mqtt_server = config.get('mqtt', 'server')
60   mqtt_port = config.getint('mqtt', 'port')
61   mqtt_username = config.get('mqtt', 'username')
62   mqtt_password = config.get('mqtt', 'password')
63
64   mysql_server = config.get('mysql', 'server')
65   mysql_username = config.get('mysql','username')
66   mysql_password = config.get('mysql','password')
67   mysql_db = config.get('mysql','db')  
68   
69   client = paho.Client('weather')
70   client.username_pw_set(mqtt_username, mqtt_password)
71   client.on_message=on_message
72   client.connect(mqtt_server, port=mqtt_port)
73
74   database = MySQLdb.connect(host=mysql_server,user=mysql_username,passwd=mysql_password,db=mysql_db,use_unicode=True,connect_timeout=10)
75   database.set_character_set('utf8')
76   c = database.cursor()
77   c.execute('SET NAMES utf8;')
78
79   topics=[]
80   for topic in Topics():
81     topics.append((topic[0].encode('UTF-8'),1))
82   
83   client.subscribe(topics)
84
85 Init()
86
87 try:
88   while True:
89     try:
90       client.loop()
91     except:
92       break
93 finally:
94   exit()