Source code for pynuance.websocket

"""Module defining abstractWebsocket class"""

import asyncio
import base64
import binascii
import email
import hashlib
import hmac
import json
import os
import datetime
import urllib.parse

import aiohttp
try:
    from aiohttp import websocket
except ImportError:
    from aiohttp import _ws_impl as websocket


# GUID fixed by the WebSocket protocol (RFC 6455) for the opening handshake.
# _handshake() appends it to the Sec-WebSocket-Key sent by the client, hashes
# the result with SHA-1, base64-encodes the digest and compares that value
# with the Sec-WebSocket-Accept header returned by the server.
WS_KEY = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def connection_handshake(client, transaction_id=123, audio_id=456):
    """Send the Nuance message sequence that opens an audio query.

    Used for STT and NLU audio. Three JSON messages are sent, in this order,
    through ``client.send_message``:

    1. ``query_parameter`` declaring the ``AUDIO_INFO`` parameter (type
       ``audio``) for ``transaction_id`` and tying it to ``audio_id``;
    2. ``query_end`` for ``transaction_id``;
    3. ``audio`` for ``audio_id``.

    Args:
        client: Any object exposing ``send_message(dict)``.
        transaction_id: Identifier placed in the ``query_parameter`` and
            ``query_end`` messages. Defaults to 123, the value that used to
            be hard-coded.
        audio_id: Identifier placed in the ``query_parameter`` and ``audio``
            messages. Defaults to 456, the value that used to be hard-coded.
    """
    messages = (
        {
            'message': 'query_parameter',
            'transaction_id': transaction_id,
            'parameter_name': 'AUDIO_INFO',
            'parameter_type': 'audio',
            'audio_id': audio_id,
        },
        {
            'message': 'query_end',
            'transaction_id': transaction_id,
        },
        {
            'message': 'audio',
            'audio_id': audio_id,
        },
    )
    for message in messages:
        client.send_message(message)
class AbstractWebsocketConnection(object):  # pylint: disable=R0801
    """Base WebSocket connection used to talk to the Nuance servers.

    The class stores the connection state and provides the send / receive /
    close helpers plus the two handshake helpers. Subclasses implement
    ``connect()``: they issue the HTTP upgrade request, drive
    ``_handle_response_101()`` with ``yield from`` when the status is not
    101, and finally call ``_handshake()``.
    """

    # Message kinds returned as the first item of the receive() tuple.
    MSG_JSON = 1
    MSG_AUDIO = 2

    def __init__(self, url, logger):
        """Store the endpoint URL and the logger; no I/O is performed."""
        self.url = url
        self.logger = logger
        # The four attributes below stay None until _handshake() succeeds.
        self.connection = None
        self.response = None
        self.stream = None
        self.writer = None

    @asyncio.coroutine
    def connect(self, app_id, app_key, use_plaintext=True):
        """Connect to the websocket.

        Must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    @asyncio.coroutine
    def receive(self):
        """Read the next message from the server (coroutine).

        Returns:
            ``(MSG_JSON, obj)`` where ``obj`` is the JSON-decoded payload
            when the message type is 1, ``(MSG_AUDIO, data)`` with the raw
            payload for every other message type.
        """
        wsmsg = yield from self.stream.read()
        # NOTE(review): 1 is presumably aiohttp's "text frame" message type;
        # confirm against the aiohttp version this module is pinned to.
        if wsmsg.tp == 1:
            return (self.MSG_JSON, json.loads(wsmsg.data))
        return (self.MSG_AUDIO, wsmsg.data)

    def send_message(self, msg):
        """Log ``msg`` at debug level and send it to the server as JSON."""
        self.logger.debug(msg)
        self.writer.send(json.dumps(msg))

    def send_audio(self, audio):
        """Send ``audio`` to the server as a binary message."""
        self.writer.send(audio, binary=True)

    def close(self):
        """Close the writer, the response and the connection, in that order.

        Parts that were never set (``connect()`` not called or failed before
        the handshake completed) are skipped, so calling ``close()`` on an
        unconnected object is a no-op instead of an ``AttributeError``.
        """
        for part in (self.writer, self.response, self.connection):
            if part is not None:
                part.close()

    @staticmethod
    @asyncio.coroutine
    def _handle_response_101(response):
        """Raise the error describing a failed upgrade (coroutine).

        Despite its name this helper is meant for responses whose status is
        NOT 101. It reads the response body, so it is a coroutine and must be
        driven with ``yield from``; merely calling it does nothing. Every
        branch raises, so it never returns normally.

        Args:
            response: the HTTP response received for the upgrade request.

        Raises:
            RuntimeError: status 401 (authorization failure) or 5xx (server
                error); the message carries status, headers and body.
            ValueError: invalid ``upgrade`` header, invalid ``connection``
                header, or any other unexpected status.
        """
        parts = ["%s %s\n" % (response.status, response.reason)]
        parts.extend('%s: %s\n' % (key, val)
                     for (key, val) in response.headers.items())
        body = yield from response.read()
        parts.append('\n%s' % body.decode('utf-8'))
        info = "".join(parts)

        if response.status == 401:
            raise RuntimeError("Authorization failure:\n%s" % info)
        elif response.status >= 500 and response.status < 600:
            raise RuntimeError("Server error:\n%s" % info)
        elif response.headers.get('upgrade', '').lower() != 'websocket':
            raise ValueError("Handshake error - Invalid upgrade header")
        elif response.headers.get('connection', '').lower() != 'upgrade':
            raise ValueError("Handshake error - Invalid connection header")
        else:
            raise ValueError("Handshake error: Invalid response status:\n%s" % info)

    def _handshake(self, response, sec_key):
        """Validate the server challenge and switch to the WebSocket protocol.

        Args:
            response: the HTTP response received for the upgrade request.
            sec_key: bytes sent in the ``SEC-WEBSOCKET-KEY`` request header.

        Raises:
            ValueError: the ``sec-websocket-accept`` header does not equal
                ``base64(sha1(sec_key + WS_KEY))``.
        """
        # Using WS_KEY in handshake
        key = response.headers.get('sec-websocket-accept', '').encode()
        match = base64.b64encode(hashlib.sha1(sec_key + WS_KEY).digest())
        if key != match:
            raise ValueError("Handshake error - Invalid challenge response")

        # switch to websocket protocol
        self.connection = response.connection
        self.stream = self.connection.reader.set_parser(websocket.WebSocketParser)
        self.writer = websocket.WebSocketWriter(self.connection.writer)
        self.response = response
class BadWebsocketConnection(AbstractWebsocketConnection):
    """WebSocket connection that always authenticates with the plaintext key.

    Unlike ``WebsocketConnection`` there is no HMAC-signed mode and no retry:
    ``connect()`` accepts ``use_plaintext`` for signature compatibility but
    ignores it.
    """

    def __init__(self, url, logger):
        AbstractWebsocketConnection.__init__(self, url, logger)

    @asyncio.coroutine
    def connect(self, app_id, app_key, use_plaintext=True):
        """Connect to the server (coroutine).

        Args:
            app_id: application identifier sent as the ``app_id`` parameter.
            app_key: application key as bytes; it is hex-encoded and sent as
                the ``app_key`` parameter.
            use_plaintext: ignored, the key is always sent in plaintext mode.

        Raises:
            RuntimeError, ValueError: the server did not answer with a valid
                101 upgrade (see ``_handle_response_101`` / ``_handshake``).
        """
        sec_key = base64.b64encode(os.urandom(16))

        params = {
            'app_id': app_id,
            'algorithm': 'key',
            'app_key': binascii.hexlify(app_key),
        }
        headers = {
            'UPGRADE': 'WebSocket',
            'CONNECTION': 'Upgrade',
            'SEC-WEBSOCKET-VERSION': '13',
            'SEC-WEBSOCKET-KEY': sec_key.decode(),
        }
        response = yield from aiohttp.request(
            'get', self.url + '?' + urllib.parse.urlencode(params),
            headers=headers)

        if response.status != 101:
            # The helper reads the response body and is therefore a
            # generator: it has to be driven with ``yield from``. A plain
            # call only creates the generator, so the real error was never
            # raised and _handshake() reported a misleading challenge error.
            yield from self._handle_response_101(response)

        self._handshake(response, sec_key)
class WebsocketConnection(AbstractWebsocketConnection):
    """Websocket client supporting plaintext-key and HMAC-SHA-256 auth."""

    def __init__(self, url, logger):
        AbstractWebsocketConnection.__init__(self, url, logger)

    @asyncio.coroutine
    def connect(self, app_id, app_key, use_plaintext=True):
        """Connect to the websocket (coroutine).

        With ``use_plaintext`` the hex-encoded key is sent as is. Otherwise
        the request is signed with the current UTC time; if the server
        answers 401, the server time is taken from the ``Date`` header (or,
        when that header is absent, from the first 19 bytes of the body,
        parsed as ``%Y-%m-%dT%H:%M:%S``) and the request is retried once,
        signed with that time.

        Args:
            app_id: application identifier (str).
            app_key: application key (bytes).
            use_plaintext: send the key itself instead of an HMAC signature.

        Raises:
            RuntimeError, ValueError: the server did not answer with a valid
                101 upgrade (see ``_handle_response_101`` / ``_handshake``).
        """
        # Imported explicitly: the module only does ``import email``, which
        # does not guarantee that the ``email.utils`` submodule is loaded.
        from email.utils import parsedate_to_datetime

        date = datetime.datetime.utcnow()
        sec_key = base64.b64encode(os.urandom(16))

        if use_plaintext:
            params = {
                'app_id': app_id,
                'algorithm': 'key',
                'app_key': binascii.hexlify(app_key),
            }
        else:
            params = self._signed_params(date, app_key, app_id)

        response = yield from self._upgrade_request(params, sec_key)

        if response.status == 401 and not use_plaintext:
            if 'Date' in response.headers:
                server_date = parsedate_to_datetime(response.headers['Date'])
                if server_date.tzinfo is not None:
                    # Convert to naive UTC so it can be compared with ``date``.
                    server_date = (server_date - server_date.utcoffset()).replace(tzinfo=None)
                # The body was not consumed on this path: close the rejected
                # response so its connection is not left dangling.
                response.close()
            else:
                raw_date = yield from response.read()
                server_date = datetime.datetime.strptime(
                    raw_date[:19].decode('ascii'), "%Y-%m-%dT%H:%M:%S")

            # Sign again using the clock difference observed with the server.
            date_delta = server_date - date
            self.logger.info("Retrying authorization (delta=%s)", date_delta)
            params = self._signed_params(date + date_delta, app_key, app_id)
            response = yield from self._upgrade_request(params, sec_key)

        if response.status != 101:
            # The helper is a generator (it reads the response body); without
            # ``yield from`` it never runs and the real error is lost.
            yield from self._handle_response_101(response)

        self._handshake(response, sec_key)

    def _upgrade_request(self, params, sec_key):
        """Return the (not yet awaited) GET request asking for the upgrade."""
        return aiohttp.request(
            'get', self.url + '?' + urllib.parse.urlencode(params),
            headers={
                'UPGRADE': 'WebSocket',
                'CONNECTION': 'Upgrade',
                'SEC-WEBSOCKET-VERSION': '13',
                'SEC-WEBSOCKET-KEY': sec_key.decode(),
            })

    def _signed_params(self, date, app_key, app_id):
        """Build the HMAC-SHA-256 query parameters for timestamp ``date``."""
        datestr = date.replace(microsecond=0).isoformat()
        return {
            'date': datestr,
            'app_id': app_id,
            'algorithm': 'HMAC-SHA-256',
            'signature': self.sign_credentials(datestr, app_key, app_id),
        }

    @staticmethod
    def sign_credentials(datestr, app_key, app_id):
        """Return the hex HMAC-SHA-256 signature of the credentials.

        The signed value is ``datestr`` (ASCII), one space, then ``app_id``
        (UTF-8); ``app_key`` (bytes) is the HMAC key.
        """
        value = datestr.encode('ascii') + b' ' + app_id.encode('utf-8')
        return hmac.new(app_key, value, hashlib.sha256).hexdigest()