import WebSocket from 'ws' import btc from 'better-try-catch' import _ON_DEATH from 'death' //this is intentionally ugly import clone from 'clone' import logger from '@uci-utils/logger' let log = {} /** * Socket - a websocket that supports uci packets * @class WebSocket * @extends ws.server * */ class Socket extends WebSocket.Server { constructor(opts = {}) { opts.host = opts.host || '0.0.0.0' opts.port = opts.port || 8090 opts.clientTracking = opts.clientTracking || true super(opts) this.id = opts.id || opts.name || 'Websocket:' + new Date().getTime() this.opts = opts // for use to recover from selected errors //self bindings this._listen = this._listen.bind(this) this.create = this.create.bind(this) log = logger({ file: 'src/socket.js', class: 'Socket', name: 'websocket', id: this.id }) } // end constructor async create() { return new Promise((resolve, reject) => { _ON_DEATH(async () => { log.info('\nhe\'s dead jim') await this._destroy() }) process.once('SIGUSR2', async () => { await this._destroy process.kill(process.pid, 'SIGUSR2') }) this.on('error', async err => { log.fatal(err, 'socket server error') console.error(err, 'socket server error') reject(err) }) this.on('listening', async () => { this._listen() log.info('websocket server created and listening at', this.address()) resolve( `websocket ready and listening at ${this.address().address}:${ this.address().port }` ) }) }) } // end create registerPacketProcessor(func) { this._packetProcess = func } _listen() { this.on('connection', async (socket, req) => { let send = this._send.bind(socket) log.info({ req: req }, 'new consumer connecting') socket.address = req.remoteAddress socket.on('message', messageProcess.bind(this, socket)) async function messageProcess(client, strPacket) { log.info({ packet: strPacket }, ' incoming packet on web socket side') let res = {} let [err, packet] = btc(JSON.parse)(strPacket) log.info('packet', err, packet) if (err) { res = { error: `Could not parse JSON: ${packet}` } } else { if (packet.clientID) { client.ID = packet.clientID res.cmd = 'ackID' } else { res = (await this._packetProcess(clone(packet))) || {} if (Object.keys(res).length === 0) res = { error: 'socket packet command function likely did not return a promise', packet: packet } } } if (packet) { res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing delete packet._header // remove before adding to response header as request } else res._header = {} res._header.request = clone(packet, false) res._header.responder = { name: this.name, instanceID: this.id } res._header.socket = this.address() if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' log.info({ packet: res }, await send(res)) } }) // end connected consumer log.info('socket created') } // end listen async _destroy() { log.info('closing down socket') await this.close() log.info('all connections closed....exiting') process.exit() } // default packet process, just a simple echo - replace async _packetProcess(packet) { return new Promise(resolve => { resolve(packet) }) } async push(packet, id) { packet._header = { id: id || 'pushed' } this.clients.forEach(async client => { if (client.readyState === WebSocket.OPEN) { if (!id || id === client.ID) await this._send.bind(client)(packet) } }) } // must have a consumer socket instance bound to call this function async _send(packet) { return new Promise((resolve, reject) => { if (this.readyState !== 1) reject(`Connection not Ready, CODE:${this.readyState}`) let [err, message] = btc(JSON.stringify)(packet) if (err) reject(`Could not JSON stringify: ${packet}`) this.send(message) resolve('sent packet') }) } } // end class export default Socket