import { Server as WSS } from 'ws'
import { Server } from 'http'
import btc from 'better-try-catch'
import pReflect from 'p-reflect'
import { promisify } from 'util'
import _ON_DEATH from 'death' //this is intentionally ugly
import clone from 'clone'

import logger from '@uci-utils/logger'
// module-level logger, (re)assigned by the Socket constructor
let log = {}

/**
 * Socket - an http server with an attached websocket server that tracks
 * connected consumers, authenticates them, replies to their packets and can
 * push packets to them.
 * @extends Server
 */
class Socket extends Server {
  /**
   * constructor - stores the options and sets up the instance state, no
   * listening is started here (see create)
   *
   * @param {object} [opts={}] options, also passed to the http Server
   *   constructor and later to listen()
   * @param {string} [opts.host='0.0.0.0']
   * @param {number} [opts.port=8090]
   * @param {string} [opts.id] instance id, falls back to opts.name then to a timestamp based id
   * @param {string[]} [opts.tokens] tokens accepted by the default validator
   * @param {boolean} [opts.allowAnonymous] allow consumers without a valid token
   * @param {number|boolean} [opts.pingInterval] seconds between pings (default 5), false disables pinging
   * @param {object[]} [opts.conPackets] packets sent to each consumer on connection
   * @param {object} [opts.conPacket] single on connection packet
   */
  constructor(opts = {}) {
    super(opts)
    opts.host = opts.host || '0.0.0.0'
    opts.port = opts.port || 8090
    this.id = opts.id || opts.name || 'Websocket:' + new Date().getTime()
    this.opts = opts // for use to recover from selected errors
    this.wss = {} // web socket instance goes here
    this.allowAnonymous = (!opts.tokens || !!process.env.UCI_ANON || opts.allowAnonymous) ? true : false
    this.tokens = opts.tokens || []
    this.pingInterval = opts.pingInterval === false ? opts.pingInterval : (opts.pingInterval * 1000 || 5000)
    this.nextconsumerID = 0 // incrementer for default initial consumer ID
    this.consumers = new Map()
    // no on connection packet configured must give an empty list, not [undefined]
    this.conPackets = opts.conPackets || (opts.conPacket ? [opts.conPacket] : [])
    this.errors = []
    this.errorCount = 0
    this.create = this.create.bind(this)
    this._destroy = this._destroy.bind(this)
    this._listening = false
    this.authenticateconsumer = this.authenticateconsumer.bind(this)
    this._authenticate = this._authenticate.bind(this)
    // from here on close() takes no callback and returns a promise
    this.close = promisify(this.close).bind(this)
    log = logger({
      package: '@uci/websocket',
      file: 'src/socket.js',
      class: 'Socket',
      name: 'websocket',
      id: this.id
    })
  } // end constructor

  /**
   * active - true while the server is listening for consumers
   * @returns {boolean}
   */
  get active() { return this._listening }

  /**
   * create - starts listening with the constructor options and attaches the
   * websocket server once the http server is listening
   *
   * @returns {Promise<string>} resolves with a ready message, rejects with
   *   the error if the http server errors before it is listening
   */
  async create() {
    this.emit('socket', {state:'creating', msg:'creating socket for consumers to connect'})
    return new Promise((resolve, reject) => {
      _ON_DEATH(async () => {
        log.error({method:'create', line:51, msg:'\nhe\'s dead jim'})
        await this._destroy()
      })
      // kills nodemon properly
      process.once('SIGUSR2', async () => {
        await this._destroy()
        process.kill(process.pid, 'SIGUSR2')
      })

      const onStartupError = err => {
        log.error({method:'creaete', line:48, err:err, msg:'http server error'})
        reject(err)
      }
      this.once('error', onStartupError)

      this.once('listening', () => {
        // startup is over, runtime errors are handled by the counter below
        this.removeListener('error', onStartupError)
        log.info({method:'create', line:54, msg:'websocket server created and listening at', address:this.address()})

        this.on('error', err => {
          this.errorCount += 1
          this.errors.push(err)
          if (this.errorCount > 2) {
            this.emit('log', {level:'warn', msg:'something bad maybe going on, 3 errors', errors:this.errors})
            this.emit('socket', {state:'error', msg:'2 to 5 socket errors', errors:this.errors})
          }
          if (this.errorCount > 5) {
            const errors = {level:'fatal', msg:'something fatal is going on, 6 errors', errors:this.errors}
            log.fatal(errors)
            this._listening = false
            // close is promisified in the constructor, so use the promise instead of a callback
            this.close()
              .catch(closeErr => {
                log.error({method:'create', msg:'error closing http server', error:closeErr})
              })
              .then(() => {
                this.emit('socket', {state:'offline', msg:'too many socket errors no longer listening for consumers to connect'})
              })
            this.emit('log', {active:this.active})
            this.emit('log', errors)
          }
        })

        this.wss = new WSS({server:this})
        this.wss.on('error', err => { this.emit('error', err) }) // bubble up errors
        // the handler is async, catch its rejections so they are never unhandled
        this.wss.on('connection', consumer => {
          this._connectionHandler(consumer).catch(err => {
            log.error({method:'_connectionHandler', msg:'error while setting up consumer connection', error:err})
          })
        })
        this._listening = true

        const address = this.address()
        const isPath = typeof address === 'string'
        const msg = `socket ready and listening ${isPath ? `at ${address}` : `on port ${address.port}`}`
        this.emit('log', {active:this.active})
        this.emit('socket', {state:'listening', msg:msg})
        resolve(`websocket ready and listening at ${isPath ? address : `${address.address}:${address.port}`}`)
      })

      super.listen(this.opts)
    })
  } // end create

  /**
   * registerPacketProcessor - replaces the default (echo) packet processor
   * @param {function} func called with a clone of each incoming packet, bound to this instance
   */
  registerPacketProcessor(func) {
    this._packetProcess = func.bind(this)
  }

  /**
   * addTokens - adds accepted tokens, turns anonymous access off when any token is set
   * @param {string|string[]} tokens array or comma separated string of tokens
   */
  addTokens(tokens) {
    if (typeof tokens === 'string') {
      tokens = tokens.split(',')
    }
    this.tokens = this.tokens.concat(tokens)
    if (this.tokens.length > 0) this.allowAnonymous = false
  }

  /**
   * removeTokens - removes accepted tokens, turns anonymous access on when none are left
   * @param {string|string[]} tokens array or comma separated string of tokens, 'all' removes every token
   */
  removeTokens(tokens) {
    if (typeof tokens === 'string') {
      if (tokens === 'all') {
        this.tokens = []
        this.allowAnonymous = true
        return
      }
      tokens = tokens.split(',')
    }
    this.tokens = this.tokens.filter(token => !tokens.includes(token))
    if (this.tokens.length === 0) {
      log.warn({msg:'all tokens have been removed, switching to allow anonymous connections'})
      this.allowAnonymous = true
    }
  }

  /**
   * registerTokenValidator - replaces the default token validator and turns anonymous access off
   * @param {function} func called with the token
   */
  registerTokenValidator(func) {
    this.allowAnonymous = false
    this._validateToken = func
  }

  /**
   * registerAuthenticator - replaces the default authenticator and turns anonymous access off
   * @param {function} func called with the authentication packet
   */
  registerAuthenticator(func) {
    this.allowAnonymous = false
    this._authenticate = func
  }

  /**
   * push - sends a packet to all or to selected connected consumers and
   * emits a 'log' event with the per consumer results
   *
   * @param {Object} [packet={}] packet to send, its _header is overwritten
   * @param {Object} [opts={}]
   * @param {string} [opts.packetId='pushed'] id placed in the packet header
   * @param {*} [opts.consumer] single target, takes precedence over opts.consumers
   * @param {Array} [opts.consumers] targets, each either a name/id value or
   *   an object with sid, name or id
   */
  async push(packet = {}, opts = {}) {
    if (this.consumers.size === 0) {
      this.emit('log', {level:'debug', msg:'no connected consumers packet push ignored', packet:packet})
      return
    }
    packet._header = {id: opts.packetId || 'pushed'}

    // only compare defined values, undefined === undefined must not select a consumer
    const isTarget = (sid, consumer, target) => {
      if (target === undefined || target === null) return false
      const data = consumer.data || {}
      const name = data.name || consumer.name
      const id = data.id || consumer.id
      if (typeof target !== 'object') return target === name || target === id || target === sid
      return (target.sid != null && target.sid === sid) ||
        (target.name != null && target.name === name) ||
        (target.id != null && target.id === id)
    }

    let consumers = []
    if (opts.consumers || opts.consumer) {
      const targets = opts.consumer ? [opts.consumer] : [].concat(opts.consumers)
      consumers = Array.from(this.consumers)
        .filter(([sid, consumer]) => targets.some(target => isTarget(sid, consumer, target)))
        .map(([, consumer]) => consumer)
    } else consumers = Array.from(this.consumers.values())

    consumers = consumers.filter(consumer => consumer.writable || consumer.readyState === 1)
    const res = await Promise.all(consumers.map(consumer => pReflect(this._send(consumer, packet))))

    // res is index aligned with consumers, so split it without filtering first
    const success = []
    const errors = []
    res.forEach((result, index) => {
      const name = consumers[index].name
      if (result.isFulfilled) success.push([name, result.value])
      else errors.push([name, result.reason])
    })

    this.emit('log', {
      level: errors.length ? 'error' : packet._header.id === 'ping' ? 'trace' : 'debug',
      msg: 'packet was pushed',
      socket: this.name || this.id,
      errors: errors,
      packet: packet,
      success: success,
      headerId: packet._header.id
    })
  }

  /**
   * removeconsumer - stops pinging a consumer, removes its listeners and
   * drops it from tracking, does nothing when the id is not tracked
   * @param {number} id server assigned consumer id
   */
  removeconsumer(id) {
    const consumer = this.consumers.get(id)
    if (!consumer) return // already removed
    this.emit('connection:consumer', {state:'disconnected', name:consumer.name})
    clearInterval(consumer._ping)
    consumer.removeAllListeners()
    log.warn({msg:`consumer ${id}:${consumer.name} removed from server tracking`, id:id, name:consumer.name, curconsumerCount:this.consumers.size})
    this.consumers.delete(id)
  }

  /**
   * authenticateconsumer - sends the handshake packet and treats the first
   * message from the consumer as its authentication packet, the result is
   * always sent back to the consumer
   *
   * @param {object} consumer websocket of the connecting consumer
   * @returns {Promise} resolves with the authentication result ('anonymous'
   *   when anonymous access is allowed), rejects with the reason otherwise
   */
  async authenticateconsumer(consumer) {
    return new Promise((resolve, reject) => {
      const authenticate = async message => {
        try {
          const [parseErr, packet] = btc(JSON.parse)(message)
          if (parseErr) return reject('unable to parse authentication packet from consumer')
          log.debug({msg:`authentication packet from consumer ${consumer.id}`, packet:packet})
          if (!packet || !packet._authenticate) return reject('first consumer packet was not authentication')

          const [err, res] = await btc(this._authenticate)(packet)
          consumer.authenticated = this.allowAnonymous ? 'anonymous' : (err ? false : res)
          consumer.name = packet.consumerName
          packet.authenticated = consumer.authenticated
          if (err && !this.allowAnonymous) packet.reason = err
          log.debug({msg:'sending authorization result to consumer', packet:packet})
          await this._send(consumer, packet) // send either way
          if (packet.reason) {
            log.info({msg:'consumer authentication failed', consumer:consumer.name, consumer_id:consumer.id, reason:err})
            reject(packet.reason)
          } else {
            log.info({msg:'consumer authenticated successfuly', consumer:consumer.name, consumer_id:consumer.id})
            if (this.allowAnonymous) log.warn({msg:'web consumer connected anonymously', consumer:consumer.name, consumer_id:consumer.id})
            resolve(consumer.authenticated)
          }
        } catch (error) {
          // e.g. the result could not be sent, never leave the promise pending
          reject(error)
        }
      }

      // when consumer gets the handshake they must follow with authentication
      // only the first message is the authentication packet, so listen once
      consumer.once('message', authenticate)
      this._send(consumer, {_handshake: true, id:consumer.id}).catch(err => {
        log.error({msg:'error in handshake send', error:err})
        consumer.removeListener('message', authenticate)
        reject(err)
      })
    })
  }

  // private methods

  /**
   * _validateToken - default validator
   * @param {string} token
   * @returns {boolean} true when the token is one of this.tokens
   */
  _validateToken(token) {
    if (token) return this.tokens.includes(token)
    return false
  }

  /**
   * _authenticate - default authenticator, reject value should be the
   * reason which is returned to consumer
   * @param {object} packet authentication packet, packet.token is validated
   * @returns {Promise<boolean>} resolves true, rejects with 'invalid token'
   */
  async _authenticate(packet) {
    if (!this._validateToken(packet.token)) return Promise.reject('invalid token')
    return true
  }

  /**
   * _connectionHandler - tracks a new consumer, authenticates it, sends the
   * on connection packets, starts pinging and replies to incoming packets
   * @param {object} consumer websocket of the connecting consumer
   */
  async _connectionHandler(consumer) {
    log.debug({method:'_connectionHandler', line:76, msg: 'new web consumer connecting'})
    consumer.id = ++this.nextconsumerID // server assigned ID
    const id = consumer.id
    this.consumers.set(id, consumer)
    consumer.authenticated = false
    consumer.connected = true

    // add listeners
    consumer.on('close', () => {
      log.warn({msg:'consumer connection closed during listen,', id:id})
      this.removeconsumer(id)
    })
    consumer.on('error', (err) => {
      log.error({msg:'consumer connection error during listen', error:err})
      // TODO do more handling than just logging
    })

    const [authErr] = await btc(this.authenticateconsumer)(consumer)
    if (authErr && !this.allowAnonymous) {
      consumer.close() // abort new connection consumer, cleanup, remove listeners
      return
    }
    // consumer went away while authenticating, nothing left to set up
    if (!this.consumers.has(id)) return

    for (const packet of this.conPackets) {
      if (!packet) continue
      packet._header = {type:'on connection packet', id: 'pushed'}
      await this._send(consumer, packet) // send a packet command on to consumer on connection
    }
    this.emit('connection:consumer', {state:'connected', name:consumer.name, id:consumer.id})

    // pingInterval === false disables pinging, setInterval(fn,false) would fire continuously
    if (this.pingInterval) {
      consumer._ping = setInterval(() => {
        try {
          consumer.ping(JSON.stringify({pingInterval:this.pingInterval}))
        } catch (err) {
          log.error({msg:'error pinging consumer', consumer_id:id, error:err})
          return
        }
        this._send(consumer, {_header:{id:'ping'}, pingInterval:this.pingInterval}).catch(err => {
          log.error({msg:'error sending ping packet to consumer', consumer_id:id, error:err})
        })
      }, this.pingInterval)
    }

    const messageProcess = async strPacket => {
      log.debug({method:'_listen', line:76, packet: strPacket, msg:' incoming packet from web consumer'})
      let res = {}
      const [parseErr, packet] = btc(JSON.parse)(strPacket)
      if (parseErr) {
        res = { error: 'Could not JSON parse packet', packet: String(strPacket), _header: {} }
      } else {
        log.debug({method:'_listen', line:266, packet:packet, msg:'parsed packet'})
        try {
          res = (await this._packetProcess(clone(packet))) || {}
        } catch (error) {
          res = { error: `packet processing failed: ${(error && error.message) || error}`, packet: packet }
        }
        if (typeof res !== 'object' || Object.keys(res).length === 0)
          res = { error: 'consumer packet command function likely did not return a promise', packet: packet }
        if (packet) {
          res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing
          delete packet._header // remove before adding to response header as request
        } else res._header = {}
        res._header.request = clone(packet, false)
      }
      res._header.responder = { name: this.name, instanceID: this.id }
      res._header.socket = this.address()
      if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply'
      const [sendErr] = await btc(this._send)(consumer, res)
      if (sendErr) log.error({msg:sendErr, error:sendErr})
    } // end message process
    consumer.on('message', messageProcess)
  } // end connection Handler

  /**
   * _destroy - terminates all consumer sockets, closes the http server and
   * exits the process
   */
  async _destroy() {
    log.debug({method:'_destroy', line:302, msg:'closing down http server and attached sockets', port:this.opts.port})
    this.consumers.forEach(consumer => {
      consumer.terminate()
      consumer.emit('close')
    })
    try {
      await this.close()
    } catch (err) {
      // e.g. server was not running, still exit below
      log.error({method:'_destroy', msg:'error closing http server', error:err})
    }
    log.debug({method:'_listen', line:105, msg:'all connections closed, all consumer sockets deleted....exiting'})
    process.exit()
  }

  /**
   * _packetProcess - default packet process, just a simple echo - replace
   * via registerPacketProcessor
   * @param {object} packet
   * @returns {Promise<object>} the same packet
   */
  async _packetProcess(packet) {
    return packet
  }

  /**
   * _send - serializes a packet to JSON and sends it to a consumer
   * @param {object} consumer websocket to send on
   * @param {object} packet
   * @returns {Promise<string>} resolves with a success message, rejects with
   *   a string reason (or the thrown error if send throws synchronously)
   */
  async _send(consumer, packet) {
    log.debug({msg:`sending to client:${consumer.id}:${consumer.name}`, packet:packet})
    // plain (non async) executor so a synchronous throw from send() rejects the promise
    return new Promise((resolve, reject) => {
      const [err, message] = btc(JSON.stringify)(packet)
      if (err) return reject('unable to serialze the packet')
      consumer.send(message, (sendErr) => {
        if (!sendErr) resolve('packet written to socket stream successfully')
        else reject(`error sending: ${sendErr}`)
      })
    })
  }
} // end class

export default Socket