import WebSocket from 'ws' import btc from 'better-try-catch' import to from 'await-to-js' import { promisify } from 'util' import _ON_DEATH from 'death' //this is intentionally ugly import clone from 'clone' import logger from '@uci-utils/logger' let log = {} /** * Socket - a websocket that supports uci packets * @class WebSocket * @extends ws.server * */ class Socket extends WebSocket.Server { constructor(opts = {}) { opts.host = opts.host || '0.0.0.0' opts.port = opts.port || 8090 opts.clientTracking = opts.clientTracking || true super(opts) this.id = opts.id || opts.name || 'Websocket:' + new Date().getTime() this.opts = opts // for use to recover from selected errors //self bindings this._listen = this._listen.bind(this) this.create = this.create.bind(this) this.close = promisify(this.close).bind(this) log = logger({ file: 'src/socket.js', class: 'Socket', name: 'websocket', id: this.id }) } // end constructor async create() { return new Promise((resolve, reject) => { _ON_DEATH(async () => { log.error({method:'create', line:39, msg:'\nhe\'s dead jim'}) await this._destroy() }) process.once('SIGUSR2', async () => { await this._destroy process.kill(process.pid, 'SIGUSR2') }) this.on('error', async err => { log.error({method:'creaete', line:48, err:err, msg:'socket server error'}) reject(err) }) this.on('listening', async () => { this._listen() log.info({method:'create', line:54, msg:'websocket server created and listening at', address:this.address()}) resolve( `websocket ready and listening at ${this.address().address}:${ this.address().port }` ) }) }) } // end create registerPacketProcessor(func) { this._packetProcess = func } _listen() { this.on('connection', async (socket, req) => { let send = this._send.bind(socket) log.debug({method:'_listen', line:71, req: req, msg: 'new consumer connecting'}) socket.address = req.remoteAddress socket.on('error', (err) => { log.error({msg:'client connection error during listen',error:err}) }) socket.on('close', (msg) => { log.warn({msg:'client connection closed during listen',error:msg}) }) socket.on('message', messageProcess.bind(this, socket)) async function messageProcess(client, strPacket) { log.debug({method:'_listen', line:76, packet: strPacket, msg:' incoming packet on web socket side'}) let res = {} let [err, packet] = btc(JSON.parse)(strPacket) log.debug({method:'_listen', line:79, error:err, packet:packet, msg:'parsed websocket packet'}) if (err) { res = { error: `Could not parse JSON: ${packet}` } } else { if (packet.clientID) { client.ID = packet.clientID res.cmd = 'ackID' } else { res = (await this._packetProcess(clone(packet))) || {} if (Object.keys(res).length === 0) res = { error: 'socket packet command function likely did not return a promise', packet: packet } } } if (packet) { res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing delete packet._header // remove before adding to response header as request } else res._header = {} res._header.request = clone(packet, false) res._header.responder = { name: this.name, instanceID: this.id } res._header.socket = this.address() if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' let sres = await send(res) log.debug({method:'_listen', line:105, packet: res, sendResponse:sres, msg:'packet reply send back to client'}) } }) // end connected consumer log.debug('socket created') } // end listen async _destroy() { log.debug({method:'_listen', line:105, msg:'closing down socket'}) await this.close() log.debug({method:'_listen', line:105, msg:'all connections closed....exiting'}) process.exit() } // default packet process, just a simple echo - replace async _packetProcess(packet) { return new Promise(resolve => { resolve(packet) }) } async push(packet, id) { packet._header = { id: id || 'pushed' } this.clients.forEach(async client => { if (client.readyState === WebSocket.OPEN) { if (!id || id === client.ID) await this._send.bind(client)(packet) } }) } // must have a consumer socket instance bound to call this function async _send(packet) { if (this.readyState !== 1) log.error({method:'_send', line:147, msg:`Connection not Ready, CODE:${this.readyState}`}) else { let [err, message] = btc(JSON.stringify)(packet) if (err) log.error({method:'_send', line:147, msg:`WS Could not JSON stringify: ${packet}`}) else { try { this.send(message) } catch(err) { log.error({method:'_send', line:153, msg:'error sending from ws server', error:err}) } } } } } // end class export default Socket