uci-websocket-client/src/WSConsumer.js

340 lines
13 KiB
JavaScript

// WebSocket is a native global in browser (vanilla) JS
/* globals WebSocket:true */
import btc from 'better-try-catch'
import EventEmitter from 'eventemitter3'
import autoBind from 'auto-bind'
import pause from 'delay'
/**
 * Web Socket Consumer - an in-browser consumer/client that communicates with a
 * UCI websocket server via UCI packets.
 * Extends {@link https://github.com/primus/eventemitter3 event emitter 3}, an in-browser event emitter, and
 * uses the browser's built-in vanilla js global {@link https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket WebSocket client class}.
 *
 * Emitted events:
 *  - `status`: `{level, msg, ...}` diagnostic record for each connection/processing step
 *  - `consumer-connection`: `{state, id}` where state is `connected`, `disconnected` or `reconnected`
 *  - `<packet._header.id>`: every parsed incoming packet that carries a `_header` is re-emitted
 *    under its header id; this is how `send` receives its reply and how `pushed` and `ping`
 *    packets reach their handlers
 *
 * Promises returned by this class reject with plain strings/objects (not Error instances);
 * that is kept as is so existing callers that inspect the rejection value keep working.
 * @extends EventEmitter
 */
class WSConsumer extends EventEmitter {
  /**
   * constructor - creates a consumer; no connection is made until `connect` is called
   *
   * @param {string} url URL of UCI websocket server
   * @param {object} [opts={}] options, kept on the instance as `this.opts`
   * @param {string} [opts.name='browser'] consumer name, sent with the authentication packet and in packet headers
   * @param {string} [opts.id] identifier attached to emitted events, defaults to the consumer name
   * @param {number} [opts.initTimeout=60] seconds before the initial connection is given up; below 0.5 means retry forever
   * @param {number} [opts.retryWait=5] seconds to wait between connection attempts
   * @param {object} [opts.conPacket] packet sent to the server after every successful (re)connection
   */
  constructor (url, opts = {}) {
    super()
    this.name = opts.name || 'browser'
    this.instanceID = new Date().getTime()
    // every status/connection event carries `id`; it was never assigned before so events reported undefined
    this.id = opts.id || this.name
    this.url = url
    this.initTimeout = opts.initTimeout != null ? opts.initTimeout * 1000 : 60000
    this.retryWait = opts.retryWait != null ? opts.retryWait * 1000 : 5000
    this.protocol = opts.protocol // available if needed but not documented
    this.opts = opts
    this._connected = false
    this._authenticated = false
    this._conAttempt = 1
    this._aborted = false
    this._reconnect = false
    // methods are handed around unbound (e.g. btc(this.disconnect)) so they must be bound to the instance
    autoBind(this)
  }

  // TODO refactor like for tcp socket
  // emit connection events
  /**
   * connect - after instantiating, this must be called to connect to a UCI WebSocket server.
   * Any existing socket is closed first. Connection errors are retried every `retryWait`
   * until the handshake/authentication succeeds or `initTimeout` expires.
   * Once connected, `_listen` takes over and handles later reconnects.
   * @required
   *
   * @param {object} [opts={}] optional overrides: `url`, `name`, `initTimeout` (secs), `retryWait` (secs)
   *
   * @returns {Promise<string>} resolves with 'initial connection successful'; rejects when no url is set,
   * when authentication fails, when the previous socket can not be closed or when the initial timeout expires
   */
  async connect (opts = {}) {
    this._connected = false
    this._authenticated = false
    this._conAttempt = 1
    // a previous timed-out connect leaves this set, which would silently disable all retries
    this._aborted = false
    this.url = opts.url || this.url
    this.name = opts.name || this.name
    this.initTimeout = opts.initTimeout != null ? opts.initTimeout * 1000 : this.initTimeout
    this.retryWait = opts.retryWait != null ? opts.retryWait * 1000 : this.retryWait
    return new Promise((resolve, reject) => {
      if (!this.url) {
        reject('no url provided!')
        return // do not go on and open a socket without a url
      }
      this.emit('status', { level: 30, msg: 'attempting an initial connection', id: this.id, opts: this.opts, ready: false })

      let initTimer = null
      if (this.initTimeout > 499) { // if not set above 500 ms then infinite tries
        initTimer = setTimeout(() => {
          this._aborted = true
          // the socket may not exist yet if the previous one is still being closed
          if (this.socket) {
            this.socket.onopen = null // same as 'connect' for regular socket
            this.socket.onerror = null
          }
          this.emit('status', { level: 30, msg: 'initial connection timed out', id: this.id, timeout: this.initTimeout, wait: this.retryWait, opts: this.opts, ready: false })
          reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.initTimeout / 1000} secs, giving up no more attempts` })
        }, this.initTimeout)
      }

      // socket opened: wait for the server's handshake packet
      const initialConnectHandler = async () => {
        this.emit('status', { level: 30, msg: 'initial connect handler', _clear: true })
        this.socket.onmessage = initialHandshake
      }

      // first message from the server: on a handshake packet authenticate, then start listening
      const initialHandshake = async (event) => {
        if (this._aborted) return // timed out already, the promise has been rejected
        this.emit('status', { level: 30, msg: 'initial handshake' })
        const [parseErr, packet] = btc(JSON.parse)(event.data)
        if (parseErr || !packet || !packet._handshake) return
        clearTimeout(initTimer)
        this._connected = true
        this.emit('status', { level: 30, msg: 'connected to server, sending authentification', id: this.id, opts: this.opts, connected: this._authenticated })
        const authPacket = this._authenticate() || {}
        authPacket._authenticate = true
        authPacket.consumerName = this.name
        const [err, res] = await btc(this._authenticateSend)(authPacket)
        if (err) {
          reject(err)
          return // res is undefined here
        }
        if (!res || !res.authenticated) {
          this.emit('status', { level: 60, msg: `authentication failed: ${res ? res.reason : 'empty reply'}`, id: this.id, opts: this.opts, connected: this._authenticated })
          reject('unable to authenticate')
          return
        }
        this._authenticated = res.authenticated
        this.emit('status', { level: 30, success: true, msg: 'authentication succesful', id: this.id, opts: this.opts, connected: this._authenticated })
        this._listen()
        this.emit('consumer-connection', { state: 'connected', id: this.id })
        this._sendConPacket()
        resolve('initial connection successful')
      }

      // error before the connection is established: wait and try again unless timed out
      const initialErrorHandler = async (err) => {
        if (this.socket) {
          this.socket.onopen = null
          this.socket.onerror = null
        }
        if (this._aborted) return
        this.emit('status', { level: 'error', msg: `error during initial connect, trying again in ${this.retryWait / 1000} secs`, err: err, id: this.id, connected: this._authenticated })
        await pause(this.retryWait)
        // the initial timeout may have expired while waiting
        if (!this._aborted) attempt() // reconnect on error
      }

      // close any previous socket, then open a fresh one with the initial handlers
      const attempt = async () => {
        const [err] = await btc(this.disconnect)()
        if (err) {
          // nothing else would ever settle the promise in this case
          clearTimeout(initTimer)
          reject(err)
          return
        }
        if (this._aborted) return
        try {
          this.socket = new WebSocket(this.url) //, this.protocol)
        } catch (socketErr) {
          // the WebSocket constructor throws synchronously, e.g. on a malformed url
          clearTimeout(initTimer)
          reject(socketErr)
          return
        }
        this.socket.onopen = initialConnectHandler // same as 'connect' for regular socket
        this.socket.onerror = initialErrorHandler
      } // end attempt

      attempt() // get the ball rolling
    }) // end promise
  }

  /**
   * disconnect - closes the current socket (if any) and stops ping monitoring
   *
   * @returns {Promise<string>} resolves once the socket is closed (or there was nothing to close),
   * rejects if the socket has not reached the CLOSED state within 5 seconds
   */
  async disconnect () {
    return new Promise((resolve, reject) => {
      clearTimeout(this.pingTimeout)
      this.removeAllListeners('ping')
      const socket = this.socket
      if (!socket) {
        resolve('no socket, nothing to close')
        return
      }
      if (socket.readyState === WebSocket.CLOSED || socket.readyState === WebSocket.CLOSING) {
        this._connected = false
        this._authenticated = false
        this.emit('status', { level: 'trace', msg: 'disconnecting - socket is already closed/closing' })
        resolve('socket already closed')
        return
      }
      socket.close()
      // poll the socket state until it is closed, give up after 5 seconds
      const wait = setInterval(() => {
        if (socket.readyState === WebSocket.CLOSED) {
          clearInterval(wait)
          clearTimeout(timeout)
          this._connected = false
          this._authenticated = false
          this.emit('status', { level: 40, msg: 'Socket been closed/disconnected', connected: this._authenticated })
          resolve('socket is now closed')
        }
      }, 500)
      const timeout = setTimeout(() => {
        clearInterval(wait)
        this.emit('status', { level: 40, msg: 'Unable to disconnect in 5 seconds!' })
        reject('unable to close socket in 5 seconds')
      }, 5000)
    })
  }

  /**
   * _listen - installs the handlers for an authenticated socket: incoming packets are parsed and
   * re-emitted under their `_header.id`, `pushed` packets go to the packet processor, `ping`
   * packets (re)arm a timeout, and a socket error or a missed ping triggers a reconnect
   * (disconnect, wait `retryWait`, new socket, handshake, authenticate, `_listen` again).
   * @private
   */
  _listen () {
    const reconnect = async (reason) => {
      const [err] = await btc(this.disconnect)()
      if (err) {
        this.emit('status', { level: 'fatal', msg: 'unable to close current connection - reconnection attempts aborted' })
        return
      }
      this.emit('status', { level: 'error', msg: `connection failed because ${reason}. attempting reconnect in ${this.retryWait / 1000} secs` })
      this.emit('consumer-connection', { state: 'disconnected', id: this.id })
      await pause(this.retryWait)
      // the next successful handshake calls _listen again which registers fresh handlers
      this.removeListener('pushed', pushedHandler)
      this.removeListener('ping', pingHandler)
      try {
        this.socket = new WebSocket(this.url) //, this.protocol)
      } catch (socketErr) {
        this.emit('status', { level: 'fatal', msg: 'unable to create a new socket - reconnection attempts aborted', err: socketErr })
        return
      }
      this.socket.onopen = connectHandler // same as 'connect' for regular socket
      this.socket.onerror = errorHandler
    } // end reconnect

    const connectHandler = async () => {
      this.emit('status', { level: 30, msg: 'starting reconnect', _clear: true })
      this.socket.onmessage = handshake
    }

    const handshake = async (event) => {
      this.emit('status', { level: 30, msg: 'handshake/authenticate' })
      const [parseErr, packet] = btc(JSON.parse)(event.data)
      if (parseErr || !packet || !packet._handshake) return
      this._connected = true
      this.emit('status', { level: 30, msg: 'connected to server, sending authentification', id: this.id, opts: this.opts, connected: this._authenticated })
      const authPacket = this._authenticate() || {}
      authPacket._authenticate = true
      authPacket.consumerName = this.name
      const [err, res] = await btc(this._authenticateSend)(authPacket)
      if (err) {
        // a browser WebSocket has no emit(), so go to the reconnect logic directly
        reconnect('authentication send failed')
        return
      }
      if (!res || !res.authenticated) {
        this.emit('status', { level: 60, msg: `authentication failed: ${res ? res.reason : 'empty reply'}`, id: this.id, opts: this.opts, connected: this._authenticated })
        reconnect('authentication failed')
        return
      }
      this._authenticated = res.authenticated
      this.emit('status', { level: 30, success: true, msg: 'authentication succesful - reconnected', id: this.id, opts: this.opts, connected: this._authenticated })
      this._listen()
      this.emit('consumer-connection', { state: 'reconnected', id: this.id })
      this._sendConPacket()
    }

    const errorHandler = async () => { // all reconnects go through here
      this.emit('status', { level: 50, msg: 'error with socket connection', _clear: true, connected: false })
      reconnect('emitted connection error') // reconnect on error
    }

    // arm a timer that treats the connection as lost when no further ping arrives in time
    const monitorPing = () => {
      this.pingTimeout = setTimeout(() => {
        this.removeAllListeners('ping')
        this.emit('status', { level: 40, msg: 'failed to receive ping - websocket offline', connected: false })
        reconnect('ping not received in time')
      }, this._pingTimeout)
    }

    const pingHandler = async (packet) => {
      clearTimeout(this.pingTimeout)
      this.emit('status', { level: 'trace', msg: 'received ping - resetting timeout' })
      // allow one extra second on top of the interval announced in the ping packet
      this._pingTimeout = packet.pingInterval + 1000
      monitorPing()
    }

    const packetHandler = (event) => {
      const [err, packet] = btc(JSON.parse)(event.data)
      if (err) {
        this.emit('status', { level: 'error', msg: `Incoming message - could not parse JSON: ${event.data}` })
        return
      }
      if (!packet || !packet._header) return
      if (packet._header.id !== 'ping') this.emit('status', { level: 'debug', msg: `Incoming message - ${packet._header.id}` })
      this.emit(packet._header.id, packet)
    }

    const pushedHandler = async (packet) => {
      // TODO do some extra security here for 'evil' pushed packets
      let res
      try {
        res = await this._packetProcess(packet)
      } catch (err) {
        this.emit('status', { level: 50, msg: `consumer process function ${packet.cmd} failed`, err: err })
        return
      }
      if (!res) {
        this.emit('status', { level: 40, msg: `consumer process function ${packet.cmd} was not promise returning` })
      }
    }

    // set the main message handler and ping handler
    this.on('pushed', pushedHandler)
    this.on('ping', pingHandler)
    this.socket.onmessage = packetHandler
    // without this the initial-connect error handler stays attached and a later error would
    // run the initial handshake again and register a second 'pushed' listener
    this.socket.onerror = errorHandler
  } // end listen

  /**
   * send - sends a packet to the server and waits for the reply carrying the same header id
   *
   * @param {object} packet UCI packet; a fresh `_header` (id, sender, url) is written onto it
   *
   * @returns {Promise<object>} resolves with the reply after it has passed the packet processor
   * (or the raw reply if the processor returned nothing); rejects when not connected, when the
   * packet can not be stringified or sent, or when the packet processor throws
   */
  async send (packet) {
    return new Promise((resolve, reject) => {
      if (!this._connected) {
        reject('Unable to send not connected')
        return
      }
      packet._header = {
        id: Math.random()
          .toString()
          .slice(2), // need this for when multiple sends for different consumers use same packet instance
        sender: { name: this.name, instanceID: this.instanceID },
        url: this.url
      }
      const [err, message] = btc(JSON.stringify)(packet)
      if (err) {
        reject(`Could not JSON stringify: ${packet}`)
        return
      }
      // a throw here (e.g. socket not open) rejects the promise via the executor
      this.socket.send(message)
      this.once(packet._header.id, async (reply) => {
        let res
        try {
          res = await this._packetProcess(reply)
        } catch (processErr) {
          reject(processErr) // otherwise the caller would wait forever
          return
        }
        if (!res) {
          res = reply
          this.emit('status', { level: 40, msg: `consumer process function ${packet.cmd} was not promise returning` })
        }
        resolve(res)
      }) // end reply listener
    })
  }

  /**
   * registerPacketProcessor - attaches the passed packet function as the one to process incoming packets
   * the function must take a uci packet object and return a promise whose resolution should be a uci packet if further communication is desired
   *
   * @param {function} func function to do the incoming packet processing
   *
   */
  registerPacketProcessor (func) {
    this._packetProcess = func
  }

  /**
   * _packetProcess - default packet processor, does nothing but hand the packet back
   * @private
   *
   * @param {object} packet incoming packet
   *
   * @returns {Promise<object>} the unchanged packet
   */
  async _packetProcess (packet) {
    return Promise.resolve(packet)
  }

  /**
   * _authenticate - default authentication using a simple token, taken from
   * `process.env.UCI_CLIENT_TOKEN`, then `this.token`, then the literal 'default'
   * @private
   *
   * @returns {object} `{ token }`, used as the base of the authentication packet
   */
  _authenticate () {
    let envToken
    try {
      // `process` does not exist in a plain browser unless the bundler shims or inlines it
      envToken = process.env.UCI_CLIENT_TOKEN
    } catch (err) {
      envToken = undefined
    }
    return { token: envToken || this.token || 'default' }
  }

  /**
   * _authenticateSend - sends the authentication packet and waits for the server's answer.
   * Replaces the socket's `onmessage` handler; the caller installs the next handler afterwards.
   * @private
   *
   * @param {object} [authPacket={}] packet to send
   *
   * @returns {Promise<object>} resolves with the parsed reply; rejects if the packet can not be
   * stringified or sent, the reply can not be parsed, or nothing arrives within 10 seconds
   */
  async _authenticateSend (authPacket = {}) {
    return new Promise((resolve, reject) => {
      const [err, data] = btc(JSON.stringify)(authPacket)
      if (err) {
        reject('unable to stringify authorization packet')
        return
      }
      const timer = setTimeout(() => { reject({ error: 'no response from socket in 10sec' }) }, 10000)
      this.socket.onmessage = event => {
        clearTimeout(timer) // do not leave a 10 sec timer running after the reply arrived
        const [parseErr, packet] = btc(JSON.parse)(event.data)
        if (parseErr) {
          reject('unable to parse authorization return from server')
          return
        }
        resolve(packet)
      }
      try {
        this.socket.send(data)
      } catch (sendErr) {
        clearTimeout(timer)
        reject(sendErr)
      }
    })
  }

  /**
   * _sendConPacket - sends the optional `opts.conPacket` after a (re)connection; a failure is
   * reported as a status event instead of an unhandled rejection
   * @private
   */
  _sendConPacket () {
    if (!this.opts.conPacket) return
    this.send(this.opts.conPacket).catch(err => {
      this.emit('status', { level: 50, msg: 'sending of connection packet failed', err: err })
    })
  }
} // end Consumer Class
export default WSConsumer