Merge branch 'master' of rvb.name:openhab-process
[openhab-process.git] / mqtt-agi / mqtt-sms
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 #
# Usage:
# AGI(/etc/asterisk/agi/mqtt,/etc/asterisk/agi/mqtt.cfg,<topic>,<base64-sms>)
# All three arguments are required: the config file, the MQTT topic to
# publish to, and the base64-encoded SMS data to decode and forward.
9 #
10
11 import sys
12 reload(sys)
13 sys.setdefaultencoding('utf-8')
14
15 from ConfigParser import ConfigParser
16 import paho.mqtt.client as paho
17 from pymessaging.sms import SmsDeliver
18
19 import json
20
21 from os import getpid,uname
22 from pprint import pprint
23
24 import sqlite3
25 from sqlite3 import Error
26
27 from os import remove,system
28
# MQTT client id of the form "agi-<hostname>-<pid>".
client_name = 'agi-%s-%s' % (uname()[1], getpid())

# Positional arguments: config file path, MQTT topic, base64-encoded SMS data.
conffile, topic, base64 = sys.argv[1:4]

config = ConfigParser()
for section in ('mqtt', 'sms'):
    config.add_section(section)

# Defaults applied before the config file is read, so the file can override
# any of them. Empty username/password are the anonymous-auth defaults.
for section, option, value in (
    ('mqtt', 'username', ''),
    ('mqtt', 'password', ''),
    ('mqtt', 'port', '1883'),
    ('sms', 'tmpfile', '/tmp/sms.db'),
):
    config.set(section, option, value)
config.read(conffile)

# 'server' has no default and must come from the config file.
mqtt_server = config.get('mqtt', 'server')
mqtt_port = config.getint('mqtt', 'port')
mqtt_username = config.get('mqtt', 'username')
mqtt_password = config.get('mqtt', 'password')
sms_db = config.get('sms', 'tmpfile')
48
# Open (or create) the SQLite file that buffers the parts of multipart SMS.
try:
    conn = sqlite3.connect(sms_db)
except Error:
    # Only a sqlite3 failure triggers the reset. A bare except would also
    # catch KeyboardInterrupt/SystemExit and delete buffered message parts.
    try:
        remove(sms_db)
    except OSError:
        # Nothing to delete (or not deletable); let the retry below report
        # the real sqlite3 error instead of masking it.
        pass
    conn = sqlite3.connect(sms_db)
54
# Read the "agi_<key>: <value>" header lines from stdin into a dict keyed
# without the "agi_" prefix. Reading stops at the first empty line or at EOF.
agi = {}
while True:
    line = sys.stdin.readline()
    if not line or line == "\n":
        break
    key, sep, value = line.rstrip('\n').replace('agi_', '', 1).partition(': ')
    if not sep:
        # Not a "key: value" line. Skip it rather than letting one
        # malformed line abort the whole script with a ValueError.
        continue
    agi[key] = value

# Forward the raw encoded SMS argument along with the call variables.
agi['base64'] = base64
65
# Decode the incoming SMS and, for multipart messages, buffer the parts in
# SQLite until all of them have arrived. `full_text` stays empty until a
# complete message (single SMS or fully reassembled multipart) is available.
full_text = ""

# The decoded argument is split on CRLF; the second line is handed to
# SmsDeliver for parsing.
sms = SmsDeliver(base64.decode('base64').split('\r\n')[1])

if 'cnt' in sms.data:
    # Multipart message: record the part count, this part's sequence
    # number and the reference shared by all parts of the same message.
    sms_type = 'sms-multipart'
    part_count = sms.data['cnt']
    part_number = sms.data['seq']
    ref = sms.data['ref']
    agi['part-count'] = part_count
    agi['part-number'] = part_number
    agi['ref'] = ref
else:
    sms_type = 'sms'
agi['sms-type'] = sms_type

sms_text = sms.data['text'].encode('utf-8')
agi['sms_text'] = sms_text

if sms_type == 'sms':
    full_text = sms_text
else:
    cur = conn.cursor()
    cur.execute("create table if not exists sms_parts (ref integer,cnt integer,seq integer,datetime timestamp default current_timestamp,text varchar(255))")
    # The SMS text is untrusted input, so every value is passed as a bound
    # parameter. Interpolating it into the SQL broke on any message
    # containing a single quote and allowed SQL injection. The text is bound
    # as unicode because Python 2's sqlite3 rejects non-ASCII byte strings.
    cur.execute(
        "insert into sms_parts(ref,cnt,seq,text) values(?,?,?,?)",
        (ref, part_count, part_number, sms_text.decode('utf-8')),
    )
    cur.execute("select count(*),cnt from sms_parts where ref=?", (ref,))
    received, cnt = cur.fetchone()
    if received == cnt:
        # All parts are present: join them in sequence order and drop the
        # buffered rows for this reference.
        cur.execute("select text from sms_parts where ref=? order by seq", (ref,))
        full_text = ''.join(rec[0] for rec in cur.fetchall())
        agi["full_text"] = full_text
        cur.execute("delete from sms_parts where ref=?", (ref,))
    conn.commit()
99
# Once a complete message is available, echo it to stdout and store it in
# /tmp/sms.log.
if full_text:
    print(full_text)
    # "w+" truncates, so the file only ever holds the latest message. The
    # context manager closes the handle even if the write fails.
    with open("/tmp/sms.log", "w+") as f1:
        f1.write(full_text.strip().encode('utf-8'))
105   
def agi_exit(rc, *args):
    """Terminate the script with exit status ``rc``.

    For a non-zero ``rc`` a ``VERBOSE`` line containing the status and the
    extra positional arguments (as a tuple) is printed to stdout first.
    Always raises ``SystemExit``.
    """
    failed = rc != 0
    if failed:
        message = "VERBOSE rc=%s %s" % (rc, args)
        print(message)
    sys.exit(rc)
110
def on_connect(mosq, rc, *args):
    """MQTT connect callback: abort the script if the broker refused us.

    paho-mqtt 1.x invokes this as ``(client, userdata, flags, rc)``, so the
    second positional argument is the user data (normally ``None``), not
    the result code. Treating it as the result code made every successful
    connect fail with a ``TypeError`` in the ``%d`` format below. The result
    code is therefore taken from the last positional argument when extra
    arguments are present; a plain ``(mosq, rc)`` call still works.

    Exits through ``agi_exit(1, ...)`` when the result code is non-zero;
    returns ``None`` otherwise.
    """
    result = args[-1] if args else rc
    if result != 0:
        agi_exit(1, "Connection failed: %d" % result)
114
def on_publish(mosq, *args):
    """MQTT publish callback: the message went out, so exit successfully."""
    success = 0
    agi_exit(success)
118
# Connect to the broker with the configured credentials and publish the
# collected variables as one JSON document.
client = paho.Client(client_name)
client.username_pw_set(mqtt_username, mqtt_password)
client.connect(mqtt_server, port=mqtt_port)

# Both callbacks end the process through agi_exit().
client.on_connect = on_connect
client.on_publish = on_publish

payload = json.dumps(agi)
client.publish(topic, payload=payload)

# Run a single network loop iteration. Reaching the line after it means no
# callback has ended the script, which is reported as a timeout.
client.loop()
agi_exit(1, "Message publish timed out")