Обработка ошибок обрыва соединения.
[voicecontrol.git] / voicecontrol
1 #!/usr/bin/env python3
2
3 import websockets,asyncio
4 import sys
5 from pyaudio import PyAudio, Stream, paInt16
6 from contextlib import asynccontextmanager, contextmanager, AsyncExitStack, ExitStack
7 from typing import AsyncGenerator, Generator
8
9 from urllib.parse import urlencode, quote
10 import urllib3, base64, json
11
12 import configparser 
13 from os.path import expanduser
14 from streamp3 import MP3Decoder
15
16 from time import time, sleep
17
18 import webrtcvad
19
20 @contextmanager
21 def _pyaudio() -> Generator[PyAudio, None, None]:
22     p = PyAudio()
23     try:
24         yield p
25     finally:
26         print('Terminating PyAudio object')
27         p.terminate()
28
29 @contextmanager
30 def _pyaudio_open_stream(p: PyAudio, *args, **kwargs) -> Generator[Stream, None, None]:
31     s = p.open(*args, **kwargs)
32     try:
33         yield s
34     finally:
35         print('Closing PyAudio Stream')
36         s.close()
37
38 @asynccontextmanager
39 async def _polite_websocket(ws: websockets.WebSocketClientProtocol) -> AsyncGenerator[websockets.WebSocketClientProtocol, None]:
40     try:
41         yield ws
42     finally:
43         print('Terminating connection')
44         await ws.send('{"eof" : 1}')
45         print(await ws.recv())
46
47 def SkipSource(source,seconds):
48   global config
49   try:
50     if config["debug"]:
51       print("Skipping: ", seconds)
52     bufs = int((seconds)*source._rate/source._frames_per_buffer)
53     for i in range(bufs):
54       buffer = source.read(source._frames_per_buffer)
55   except KeyboardInterrupt:
56     raise
57   except:
58     pass
59
60 def Silence(speaker, seconds):
61   buf = bytes(speaker._frames_per_buffer)
62   bufs = int((seconds)*speaker._rate/speaker._frames_per_buffer)
63   for i in range(bufs):
64     speaker.write(buf)
65
66 def PlayBack(pyaud, text, mic = None):
67   global config, last_time
68   
69   http = urllib3.PoolManager()
70
71   playback_url = config["tts_url"]
72   playback_param = config["tts_param"]
73
74   if playback_url and text:
75
76     try:
77
78       if playback_param:
79         url = playback_url.format(urlencode({playback_param:text}))
80       else:
81         url = playback_url+quote(text)  
82
83       req = http.request('GET', url, preload_content=False)
84       decoder = MP3Decoder(req)
85
86       speaker = pyaud.open(output=True, format=paInt16, channels=decoder.num_channels, rate=decoder.sample_rate)
87       Silence(speaker, 0.3) 
88
89       for chunk in decoder:
90         speaker.write(chunk)
91
92       sleep(0.1)
93       speaker.stop_stream()
94       speaker.close()
95
96       elapsed = time() - last_time
97       last_time = time()
98
99       if mic:
100         SkipSource(mic, elapsed + 0.5)
101
102       return elapsed
103
104     except KeyboardInterrupt:
105       raise
106
107     except:
108       pass
109
110   else:
111     return 0
112
113 def RunCommand(command, pyaud, mic = None):
114
115   global config
116   
117   http = urllib3.PoolManager()
118
119   command_url = config["command_url"]
120   reply_url = config["reply_url"]
121   command_user = config["api_user"]
122   command_pwd = config["api_pwd"]
123   api_attempts = config["api_attempts"]
124
125   if command_url:
126     try:
127       if config["debug"]:
128         print('Preparing command')
129       if command_user:
130         my_headers = urllib3.util.make_headers(basic_auth=command_user+':'+command_pwd)
131       else:
132         my_headers = urllib3.util.make_headers()  
133       my_headers['Content-Type']='text/plain'
134       my_headers['Accept']='apllication/json'
135       if config["debug"]:
136         print('Sending command')
137       sent = False
138       for i in range(api_attempts):
139         try:
140           http.request('POST',command_url,headers=my_headers,body=command.encode('UTF-8'))
141           sent = True
142           break
143         except Exception as e:
144           print('Exception: '+str(e))
145           sleep(0.5)
146       if sent:
147         if config["debug"]:
148           print('Command sent')
149         if reply_url:
150           sleep(0.5)
151           res="NULL"
152           for i in range(api_attempts):
153             try:
154               if command_user:
155                 my_headers = urllib3.util.make_headers(basic_auth=command_user+':'+command_pwd)
156               else:
157                 my_headers = urllib3.util.make_headers()  
158               req=http.request('GET',reply_url,headers=my_headers).data
159               res = json.loads(req)['state'].strip()
160               if config["debug"]:
161                 print(res)
162               if not(res == 'NULL'):
163                 break
164               sleep(1)  
165             except KeyboardInterrupt:
166               raise
167             except Exception as e:
168               print('Exception: '+str(e))
169               sleep(1)
170           if res and not(res=="NULL"):
171             PlayBack(pyaud, res, mic=mic)
172           elif res=="NULL":
173             PlayBack(pyaud, "Сервер не ответил", mic=mic)  
174         if command_user:
175           my_headers = urllib3.util.make_headers(basic_auth=command_user+':'+command_pwd)
176         else:
177           my_headers = urllib3.util.make_headers()  
178         my_headers['Content-Type']='text/plain'
179         my_headers['Accept']='apllication/json'
180         command=""
181         http.request('POST',command_url, headers=my_headers, body=command.encode('UTF-8'))
182       else:
183         PlayBack(pyaud, "Сервер недоступен", mic=mic)
184     except KeyboardInterrupt:
185       raise
186     except Exception as e:
187       try:
188         print('Exception: '+str(e))
189         http.request('POST',command_url, headers=my_headers, body="")
190       except:  
191         pass
192
193 async def ListenPhrase(mic, server):
194   global config,last_time, vad
195
196   frame = 30/1000 # 30 ms
197   pause = 2
198   sz = int(mic._rate*frame)
199   sp = int(pause/frame)
200
201   try:
202
203     phrase = ""
204     voice = False
205
206     while not phrase:
207       data = mic.read(sz)
208       if len(data) == 0:
209         break
210       vd = vad.is_speech(data, mic._rate)
211       if vd and not voice:
212         voice = True
213         if config["debug"]:
214           print("+", end="")
215         cnt = 0
216       if voice and not vd:
217         cnt = cnt + 1
218         if cnt > sp:
219           cnt = 0
220           voice = False
221           if config["debug"]:
222             print("-")
223       if voice:
224         print("*",end="")
225         await server.send(data)
226         datatxt = await server.recv()
227         data = json.loads(datatxt)
228         try:
229           phrase = data["text"]
230           confidence = min(map(lambda x: x["conf"], data["result"]))
231         except:
232           pass  
233   
234     last_time = time()
235
236     return phrase, confidence
237
238   except KeyboardInterrupt:
239     raise
240   except websockets.exceptions.ConnectionClosedError:
241     raise  
242   except:
243     return '',0
244
245
246 async def main_loop(uri):
247
248   global config, last_time
249
250   keyphrase = config["keyphrase"]
251   confidence_treshold = config["confidence_treshold"]
252   rec_attempts = config["rec_attempts"]
253   commands = config["commands"]
254
255   
256   with ExitStack() as audio_stack:
257     p = audio_stack.enter_context(_pyaudio())
258     s = audio_stack.enter_context(_pyaudio_open_stream(p,
259             format = paInt16, 
260             channels = 1,
261             rate = 16000,
262             input = True, 
263             frames_per_buffer = 2000))
264
265     while True:
266       try:    
267         async with AsyncExitStack() as web_stack:
268           ws = await web_stack.enter_async_context(websockets.connect(uri))
269           print('Type Ctrl-C to exit')
270           phrases = [] + config["commands"]
271           phrases.append(config["keyphrase"])
272           phrases = json.dumps(phrases, ensure_ascii=False)
273           await ws.send('{"config" : { "phrase_list" : '+phrases+', "sample_rate" : 16000.0}}')
274
275           ws = await web_stack.enter_async_context(_polite_websocket(ws))
276           while True:
277             phrase, confidence = await ListenPhrase(s, ws)
278             if config["debug"]:
279               print(phrase,confidence)
280             if phrase == keyphrase and confidence>=confidence_treshold :
281               PlayBack(p, "Я жду команду", mic=s)
282               command = ""
283   
284               for i in range(rec_attempts):
285                 phrase, confidence = await ListenPhrase(s, ws)
286                 if config["debug"]:
287                   print(phrase,confidence)
288                 if confidence > confidence_treshold:
289                   if (not commands) or (phrase in commands):
290                     if config["debug"]:
291                       print("Command: ", phrase)
292                     command = phrase
293                     RunCommand(command, p, s)
294                     break
295                   else:
296                     PlayBack(p, "Не знаю такой команды: "+phrase, mic=s)
297                 else:
298                   PlayBack(p, "Не поняла, слишком неразборчиво", mic=s)
299
300               if not command:
301                 PlayBack(p, "Так команду и не поняла...", mic=s)
302       except KeyboardInterrupt:
303         raise
304       except Exception as e:
305         print('Exception: '+str(e))
306         pass
307              
308 def get_config(path):
309   
310   config = configparser.ConfigParser()
311   config.read(path)
312   
313   try:  
314     keyphrase = config['vosk']['keyphrase']
315   except:
316     print ("Обязательный параметр - ключевое слово - не задан!")
317     raise
318
319   try:  
320     rec_attempts = int(config['vosk']['attempts'])
321   except:
322     rec_attempts = 4
323
324   try:  
325     confidence_treshold = float(config['vosk']['confidence_treshold'])
326   except:
327     confidence_treshold = 0.4
328
329   try:
330     vosk_server = config['vosk']['server']
331   except:
332     print ("Обязательный параметр - сервер распознавания - не задан!")
333     raise
334
335   try:
336     command_file=config['commands']['command_file']
337     with open(command_file) as file:
338       commands = file.read().splitlines()
339   except:
340     commands = None
341
342   try:
343     tts_url=config['rest']['tts_url']
344   except:
345     tts_url = None
346
347   try:
348     tts_param=config['rest']['tts_param']
349   except:
350     tts_param = None
351
352   try:
353     api_attempts=int(config['rest']['attempts'])
354   except:
355     api_attempts = 2
356
357   try:  
358     api_user=config['rest']['api_user']
359     api_pwd=config['rest']['api_pwd']
360   except:
361     api_user = None
362     api_pwd = None
363
364   try:
365     command_url=config['rest']['command_url']
366   except:
367     command_url = None
368
369   try:  
370     reply_url=config['rest']['reply_url']
371   except:
372     reply_url = None  
373
374   try:  
375     vad_mode=config['vad']['agressive']
376   except:
377     vad_mode = 3
378
379   try:
380     debug = (config['system']['debug'].lower() == "true")
381   except:
382     debug = False  
383
384   if command_file:
385     with open(command_file) as file:
386       commands = file.read().splitlines()
387
388   return {
389       "asr_server": vosk_server,
390       "keyphrase": keyphrase,
391       "rec_attempts": rec_attempts,
392       "confidence_treshold": confidence_treshold,
393       "tts_url": tts_url,
394       "tts_param": tts_param,
395       "api_attempts": api_attempts,
396       "api_user": api_user,
397       "api_pwd": api_pwd,
398       "command_url": command_url,
399       "reply_url": reply_url,
400       "debug": debug,
401       "commands": commands,
402       "vad_mode": vad_mode
403     }
404
405
406 if len(sys.argv) == 2:
407     conf_file = sys.argv[1]
408 else:
409     conf_file = expanduser("~")+"/.config/voicecontrol.ini"
410
411 config = get_config(conf_file)
412
413 server = config['asr_server']
414
415 vad = webrtcvad.Vad(config['vad_mode'])
416
417 while True:
418
419   try:
420
421     loop = asyncio.get_event_loop()
422     loop.run_until_complete(
423         main_loop(f'ws://' + server))
424
425   except (Exception, KeyboardInterrupt) as e:
426     loop.run_until_complete(
427       loop.shutdown_asyncgens())
428     if isinstance(e, KeyboardInterrupt):
429       loop.stop()
430       print('Bye')
431       exit(0)
432     else:
433       print(f'Oops! {e}')
434       print('Restarting process...')
435       sleep(10)