From 3299e9125ea804aa26b274f850ba29e57bf98770 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Fri, 23 Aug 2019 15:48:39 -0700 Subject: [PATCH] refactored _send in socket class added authentifcation with default being a token attach any passed client name to server side client socket --- examples/client.js | 46 +++++++--- examples/server.js | 53 ++++++++---- src/consumer.js | 179 +++++++++++++++++++++----------------- src/socket-class.js | 207 ++++++++++++++++++++++++++++---------------- 4 files changed, 300 insertions(+), 185 deletions(-) diff --git a/examples/client.js b/examples/client.js index eddfcb8..3d11ea2 100644 --- a/examples/client.js +++ b/examples/client.js @@ -5,29 +5,51 @@ import btc from 'better-try-catch' const client= new Consumer({path:true, name:'example-consumer', initTimeout:30 }) // This is your client handler object waiting on a message to do something -const process = async (packet) => { - console.log('packet being processed at socket') +async function processor (packet) { + // console.log('packet being processed at socket', packet) if (packet.cmd) { if (this[packet.cmd]) return await this[packet.cmd](packet) - else return {error: 'command has no processing function', packet: packet } - } return {error: 'no command in packet', packet: packet } + else { + console.log('no processing function for command', packet.cmd) + return {error: 'command has no processing function', packet: packet } + } + } + console.log('no command in packet', packet) + return {error: 'no command in packet', packet: packet } } -client.pushed = async (packet) => { +client.registerPacketProcessor(processor) + +client.reply = (packet) => { return new Promise(resolve => { - console.log('pushed packet/n',packet) + console.log('generic reply command from server\n',packet.data) resolve() }) } -client.registerPacketProcessor(process) +client.onconnect = (packet) => { + return new Promise(resolve => { + console.log('on connect server sent command processed here\n',packet.data) + resolve() + }) +} + +client.pushed = (packet) => { + return new Promise(resolve => { + console.log('pushed packet\n',packet.status) + resolve() + }) +} + + client.on('connection', event => { console.log('============ connection update ============') console.log(event.id) console.log(event.msg) - console.log(`${event.ready ? 'Consumer is connected' : 'Consumer is disconnected'}`) + console.log(`Consumer is ${event.connected ? 'connected' : 'disconnected'}`) + console.log(`Authenticated? ${event.authenticated || false}`) console.log('======================================') }) @@ -39,11 +61,11 @@ client.on('connection', event => { if (err) { console.log('error', err) } else { - console.log('ready now?',client.ready) - console.log('connect reponse',res) + console.log(res) let packet = {name: 'client', cmd:'doit', data:'sent by client'} - console.log('sending packet ', packet) - console.log('=========\n',await client.send(packet)) + console.log('=============sending a test packet=========\n', packet) + console.log('can also await relay of any send too for processing in sync manner\n',await client.send(packet)) + console.log('===============') } })().catch(err => { diff --git a/examples/server.js b/examples/server.js index 944e9b2..cfd4c66 100644 --- a/examples/server.js +++ b/examples/server.js @@ -14,18 +14,11 @@ class Test extends Socket { super(opts) } - async _packetProcess(packet) { - console.log('packet being processed at socket') - if (packet.cmd) return await this[packet.cmd](packet.data,packet.name) - return {error: 'no command in packet', packet: packet } - } - - async doit(data,name) { + async doit(packet) { return new Promise(resolve => { let res = {} - console.log('data sent to doit = ', data) + console.log('command doit sent with data = ', packet.data) res.status ='success' - res.name = name res.cmd = 'reply' res.data = 'this would be response data from socket doit function' resolve(res) @@ -34,7 +27,6 @@ class Test extends Socket { } - // const options = { // tls: TLS, // key: await fs.readFile(TLS_KEY_PATH), @@ -45,13 +37,26 @@ class Test extends Socket { // // ca: [ fs.readFileSync('client-cert.pem') ] // } -let options = {path:true, conPacket:{onconnect:'this is a packet sent to consumer as soon as it connects'}} - - -// let test = new Test() +let options = {path:true, tokens:['test'], conPacket:{cmd:'onconnect', data:'this is a packet data sent consumer after handshake/authentification'}} let test = new Test(options) - ; +async function processor (packet) { + // console.log('packet being processed at socket', packet) + if (packet.cmd) { + if (this[packet.cmd]) return await this[packet.cmd](packet) + else { + console.log('no processing function for command', packet.cmd) + return {error: 'command has no processing function', packet: packet } + } + } + console.log('no command in packet', packet) + return {error: 'no command in packet', packet: packet } +} + +test.registerPacketProcessor(processor) + + +; (async () => { // TODO dynamic import // if(TLS_KEY_PATH && TLS_CRT_PATH && TLS) { @@ -63,10 +68,20 @@ let test = new Test(options) await test.create() let count = 0 - setInterval( () => { - count++ - test.push({cmd:'pushed', count:count, status:'some pushed data'}) - },3000) + // const push = setInterval( () => { + // count++ + // test.push({cmd:'pushed', count:count, status:`some pushed data ${count}`}) + // if (count >3) { + // clearInterval(push) + // test.push({cmd:'pushed',status:'now will simulate server going offline by stopping to send pingfor 10 seconds'}) + // test.disablePing() + // setTimeout( () => { + // test.enablePing() + // },10000) + // + // } + // },3000) + // setTimeout( () => { // console.log('closing server') diff --git a/src/consumer.js b/src/consumer.js index f158233..9e37d0a 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -49,9 +49,10 @@ class SocketConsumer extends Socket { // default is keepAlive true, must set to false to explicitly disable // if keepAlive is true then consumer will also be reconnecting consumer this.initTimeout = opts.initTimeout * 1000 || 60000 - this.retryWait = opts.retryWait * 1000 || 3000 + this.retryWait = opts.retryWait * 1000 || 5000 this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true - this._ready = false + this._connected = false + this._authenticated = false this.stream = new JsonStream() // bind to class for other class functions this.connect = this.connect.bind(this) @@ -60,10 +61,10 @@ class SocketConsumer extends Socket { this._conAttempt = 1 this._aborted = false this._reconnect = false - // this._write = this._write.bind(this) + // this._packetProcess = this._packetProcess.bind(this) } - get ready() { return this._ready} + get connected() { return this._connected} async connect() { return new Promise((resolve, reject) => { @@ -74,33 +75,46 @@ class SocketConsumer extends Socket { log.debug('first connnect attempt for', this.opts.name) this.emit('connection',{msg:'attempting initial connection', id:this.id, opts:this.opts, ready:false}) - let initTimeout = setTimeout(() => { - this.emit('connection',{msg:'initial connection timed out', id:this.id, timeout:this.initTimeout, wait:this.retryWait, opts:this.opts, ready:false}) - this.removeAllListeners() - log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.initTimeout/1000} secs no more attempts!`}) - this.stream.removeAllListeners() - this.destroy() - reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`}) + let initTimeout = {} + if (this.initTimeout > 499) { + initTimeout = setTimeout(() => { + this.emit('connection',{msg:'initial connection timed out', id:this.id, timeout:this.initTimeout, wait:this.retryWait, opts:this.opts, ready:false}) + this.removeAllListeners() + log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.initTimeout/1000} secs no more attempts!`}) + this.stream.removeAllListeners() + this.destroy() + reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`}) + } + , this.initTimeout) } - , this.initTimeout) - const initialHandshake = async (packet) => { + if (packet._handshake) { - log.debug({method:'connect', line:87, msg:'handshake received ready for communication'}) - this.stream.removeAllListeners('message') - this.removeListener('error',initialErrorHandler) clearTimeout(initTimeout) - this._listen() // setup for active connection - this._ready = true - this.emit('connection',{msg:'initial connection succesfull', id:this.id, opts:this.opts, ready:true}) - resolve('initial connection successful') + this._connected = true + let authPacket = this._authenticate() + authPacket._authenticate = true + authPacket.clientName = this.id + let res = (await this._authenticateSend(authPacket)) || {} + console.log('auth response', res) + // if ((await this.send(authPacket)).authenticated) authenticated() + if (!res.authenticated) reject('unable to authenticate') + else { + this._authenticated = res.authenticated + this.removeListener('error',initialErrorHandler) + this._listen() // setup for active connection + log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'}) + this.emit('connection',{msg:'authentication succesfull', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + resolve('initial connection successful') + } } } + const initialConnectHandler = async () => { this.on('data', this.stream.onData) - this.stream.on('message', initialHandshake.bind(this)) + this.stream.once('message', initialHandshake.bind(this)) log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) } @@ -120,7 +134,10 @@ class SocketConsumer extends Socket { async send(ipacket) { return new Promise(async resolve => { - if (!this._ready) resolve({ error: 'socket consumer not connected, aborting send' }) + if (!this._connected) { + resolve({ error: 'socket consumer not connected, aborting send' }) + return + } let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) packet._header = { @@ -136,7 +153,9 @@ class SocketConsumer extends Socket { if (err) resolve({error: 'unable to serialize packet for sending',packet: packet}) await this.__write(res) + console.log(packet._header) this.once(packet._header.id, async function(reply) { + console.log('returning response', reply) let res = await this._packetProcess(reply) if (!res) { // if packetProcess was not promise res = reply @@ -154,6 +173,26 @@ class SocketConsumer extends Socket { } // PRIVATE METHODS + _authenticate () { + return { token:'test'} + } + + async _authenticateSend (authPacket) { + return new Promise(async resolve => { + setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) + let [err, res] = await btc(this.stream.serialize)(authPacket) + if (err) + resolve({error: 'unable to serialize packet for sending',packet: authPacket}) + this.stream.on('message',(res) => { + this.stream.removeAllListeners('message') + resolve(res) + + }) + await this.__write(res) + + + }) + } // set up incoming message listening and error/reonnect handling async _listen() { @@ -161,43 +200,46 @@ class SocketConsumer extends Socket { const reconnectHandler = () => { this.stream.once('message', handshake.bind(this)) log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) + this.emit('connection',{msg:'attemping reconnect', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) } const handshake = async (packet) => { if (packet._handshake) { - log.debug({method:'connect', line:87, msg:'handshake received ready for communication'}) - this.stream.on('message', messageProcess.bind(this)) - if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default - this.on('ping',pingHandler) - this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled + this._connected = true + let authPacket = this._authenticate() + authPacket._authenticate = true + authPacket.clientName = this.id + let res = (await this._authenticateSend(authPacket)) || {} + if (!res.authenticated) { + this.emit('connection',{msg:'authentification failed', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected, res:res}) + + } + else { + this._authenticated = res.authenticated + log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'}) + this.emit('connection',{msg:'authentication successful', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default + this.on('ping',pingHandler) + this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled + } + this.stream.on('message', messageHandler.bind(this)) // reset default message handler + this.emit('connection',{msg:'reconnected', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) } - this._ready = true - this.emit('connection','reconnected') } } const errorHandler = async (err) => { log.debug({msg:'connection error emitted ', error:err}) - this._ready = false - if (err.code !== 'EISCONN') { - this.emit('connection', err) - log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect after ${this.retryWait/1000} secs`}) - await pause(this.retryWait) - this.stream.removeAllListeners('message') // remove regular message handler in prep for reconnect - this.removeAllListeners('connect') - this.once('connect',reconnectHandler) - super.connect(this.opts) - } - else { - this._ready = true - this.emit('connection','reconnected') - log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'}) - } - } - - async function messageProcess(packet) { - log.debug('incoming packet from socket sever',packet) - this.emit(packet._header.id, packet) + this._connected = false + this._authenticated = false + this.emit('connection',{msg:'connection(socket) error', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect after ${this.retryWait/1000} secs`}) + await pause(this.retryWait) + this.stream.removeAllListeners('message') // remove regular message handler in prep for reconnect + this.removeAllListeners('connect') + this.removeAllListeners('ping') + this.once('connect',reconnectHandler) + super.connect(this.opts) } const pushHandler = async (packet) => { @@ -212,7 +254,7 @@ class SocketConsumer extends Socket { const pingHandler = async (packet) => { clearTimeout(pingTimeout) - log.info({method:'connect', line:191, msg:'received ping, restting timeout'}) + log.debug({method:'connect', line:191, msg:'received ping, restting timeout'}) this._pingTimeout= packet.pingInterval + 1000 monitorPing.call(this) } @@ -222,14 +264,20 @@ class SocketConsumer extends Socket { pingTimeout = setTimeout( () => { log.error({method:'connect', line:142, msg:'socket (server) not availabe'}) this.removeAllListeners('ping') - this._ready = false + this._connected = false this.emit('error', { code: 'PING-FAILED' }) },this._pingTimeout) } + // general handler + function messageHandler(packet) { + log.debug('incoming packet from socket sever',packet) + this.emit(packet._header.id, packet) + } + // Start Message Listening and Error/Reconnect Handling log.debug('listening for incoming packets from socket') - this.stream.on('message', messageProcess.bind(this)) + this.stream.on('message', messageHandler.bind(this)) this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled this.on('pushed', pushHandler ) this.on('error', errorHandler) @@ -262,30 +310,3 @@ class SocketConsumer extends Socket { } // end class export default SocketConsumer - - -// _removeListeners(events) { -// if (!events) this.removeAllListeners() -// else { -// if (typeof events === 'string') this.removeAllListeners(events) -// else events.map(event => this.removeAllListeners(event)) -// } -// } - -// Helper Functions -// wait until a passed ready function returns true -// function isReady(ready, wait = 100, timeout = 2000) { -// let time = 0 -// return new Promise((resolve, reject) => { -// (function waitReady() { -// if (time > timeout) -// return reject( -// `timeout waiting for socket ready handshake - ${timeout}ms` -// ) -// if (ready()) return resolve('ready') -// log.debug({function:'isReady', line:261, msg:`waiting ${wait}ms for handshake`}) -// time += wait -// setTimeout(waitReady, wait) -// })() -// }) -// } diff --git a/src/socket-class.js b/src/socket-class.js index 6c03470..ac99d3b 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -57,15 +57,19 @@ export default function socketClass(Server) { if (path.dirname(opts.path) === '.') // relative path sent opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) } + this.allowAnonymous = process.env.UCI_ANON || opts.allowAnonymous || false + this.tokens = opts.tokens || [] this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this.pingInterval = opts.pingInterval === false ? opts.pingInterval : (opts.pingInterval * 1000 || 5000) - this.clientTracking = opts.clientTracking || true + // this.clientTracking = opts.clientTracking || true this.clients = [] // track consumers (i.e. clients) - this.nextClientID = 0 // incrementer for simple client ID + this.nextClientID = 0 // incrementer for default initial client ID this.opts = opts // for use to recover from selected errors //self bindings this._listen = this._listen.bind(this) this.create = this.create.bind(this) + this.authenticateClient = this.authenticateClient.bind(this) + this._authenticate = this._authenticate.bind(this) this.close = promisify(this.close).bind(this) log = logger({ file: 'src/socket.js', @@ -119,11 +123,7 @@ export default function socketClass(Server) { let [err, res] = await btc(this._listen)(this.opts) if (err) reject(err) - if (this.pingInterval) { - this._ping = setInterval( async () =>{ - if (this.clients.length > 0) this.push({pingInterval:this.pingInterval},'ping') - },this.pingInterval) - } + this.enablePing() resolve(res) }) // end creeate promise } // end create @@ -138,6 +138,18 @@ export default function socketClass(Server) { this._packetProcess = func } + enablePing () { + if (this.pingInterval > 499) { + this._ping = setInterval( async () =>{ + if (this.clients.length > 0) this.push({pingInterval:this.pingInterval},'ping') + },this.pingInterval) + } + } + + disablePing() { + clearInterval(this._ping) + } + /** * push - pushes a supplied UCI object packet to all connected clients * @@ -147,18 +159,12 @@ export default function socketClass(Server) { */ async push(packet={},id) { packet._header = {id: id || 'pushed'} - if (this.clients.length >0) { + if (this.clients.length > 0) { log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) this.clients.forEach(async client => { if (client.writable) { - let [err, ser] = await btc(client.stream.serialize)(packet) - if (err) - ser = await client.stream.serialize({ - error: 'was not able to serialze the res packet', - err: err, - _header: { id: packet._header.id } - }) - await this._send.bind(client)(ser) + let [err] = await btc(this._send)(client,packet) + if (err) log.error({msg:err, error:err}) } }) } else { @@ -166,68 +172,113 @@ export default function socketClass(Server) { } } + removeClient (id) { + let index = this.clients.findIndex(client => {return client.id === id }) + let client = this.clients[index] + client.removeAllListeners() + client.stream.removeAllListeners() + this.clients.splice(index,1) + log.warn({msg:'client removed from tracking',id:id, curClientCount:this.clients.length}) + } + + async authenticateClient(client) { + // let server = this + return new Promise(async function(resolve, reject) { + // when consumer gets the handshake they must follow with authentication + client.stream.on('message', authenticate.bind(this,client)) + let [err] = await btc(this._send)(client,{_handshake: true, id:client.id}) + if (err) { + log.error({msg:'error in handshake send', error:err}) + reject(err) + } + + async function authenticate (client,packet) { + log.info({msg:`authentication packet from client ${client.id}`, packet:packet}) + client.stream.removeAllListeners('message') + if (!packet._authenticate) reject('first client packet was not authentication') + else { + let [err, res] = await btc(this._authenticate)(packet) + client.authenticated = this.allowAnonymous ? 'anonymous' : (err || res) + client.name = packet.clientName + packet.authenticated = client.authenticated + await this._send(client,packet) // send either way + if (err && !this.allowAnonymous) reject(client.authenticated) + else resolve(client.authenticated) + } + } + }.bind(this)) + } + + // private methods + + // default authenticator + async _authenticate (packet) { + if (this.tokens.indexOf(packet.token) === -1) return Promise.reject(false) + return true + } + async _listen(opts) { return super.listen(opts, async (err, res) => { if (err) return Promise.reject(err) // this gets called for each client connection and is unique to each this.on('connection', async socket => { - const send = this._send.bind(socket) - const stream = new JSONStream() - socket.stream = stream // need this to track clients - if (this.clientTracking) { - this.nextClientID +=1 - socket.id = this.nextClientID - this.clients.push(socket) - } - // TODO add 'close' listener to socket to remove from this.clients log.debug({method:'_listen', line:167, msg:'new consumer connecting'}) - log.debug(await send(await stream.serialize({ _handshake: true }))) - if (this.opts.conPacket) { - this.opts.conPacket._header = { id: 'pushed' } - log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'}) - send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection - } - + socket.id = ++this.nextClientID // server assigned ID + socket.authenticated = false + this.clients.push(socket) // add client to list + const stream = new JSONStream() + socket.stream = stream socket.setKeepAlive(this.keepAlive,3000) - // add error and close listeners on connection + // add listeners + const clientCloseHandler = (id) => { + log.warn({msg:'client connection closed during listen,',id:id}) + this.removeClient(id) + } + + socket.on('close', clientCloseHandler.bind(this,socket.id) ) + socket.on('error', (err) => { log.error({msg:'client connection error during listen',error:err}) + // TODO do more handling than just logging }) - const clientCloseHandler = (id, msg) => { - console.log('client that closed', id, msg, - ) - this.clients.splice(this.clients.findIndex(client => {return client.id === id }),1) - console.log('number of active clients',this.clients.length) - log.warn({msg:'client connection closed during listen',id:this.id, error:msg}) - } - console.log('binding on this client', socket.id) - socket.on('close', clientCloseHandler.bind(this,socket.id) ) + socket.on('data', stream.onData) // send data to stream.on('error', (err) => { log.error({msg:'client-socket stream error during listen',error:err}) + // TODO do more handling than just logging }) - socket.on('data', stream.onData) - // TODO need to start error listener for stream so errors can be processed + let [err] = await btc(this.authenticateClient)(socket) + if (!this.allowAnonymous) { + if (err) { + socket.end()// abort new connection socket, cleanup, remove listeners + this.removeClient(socket.id) + return + } + } + + // all's set main message processor stream.on('message', messageProcess.bind(this, socket)) + if (this.opts.conPacket) { + this.opts.conPacket._header = { id: 'pushed' } + log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'}) + this._send(socket,this.opts.conPacket) // send a packet command on to consumer on connection + } + + // that's it. Connection is active + async function messageProcess(client, packet) { - log.debug({method:'_listen', line:179, packet: packet, msg:'incoming packet on socket side'}) - let res = {} - if (this.clientTracking && packet.clientID) { - client.ID = packet.clientID - res.cmd = 'ackID' - } else { - res = (await this._packetProcess(clone(packet))) || {} - if (Object.keys(res).length === 0) - res = { - error: + log.info({method:'_listen', line:179, packet: packet, client:client.name, msg:'incoming packet on socket side'}) + let res = (await this._packetProcess(clone(packet))) || {} + if (Object.keys(res).length === 0) + res = { + error: 'socket packet command function likely did not return a promise', - packet: packet - } - } + packet: packet + } if (packet) { res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing delete packet._header // remove before adding to response header as request @@ -236,44 +287,50 @@ export default function socketClass(Server) { res._header.responder = { name: this.name, instanceID: this.id } res._header.socket = this.address() if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' - let [err, ser] = await btc(stream.serialize)(res) - if (err) - ser = await stream.serialize({ - error: 'was not able to serialze the res packet', - err: err, - _header: { id: res._header.id } - }) - await send(ser) - } // end process message + let [err] = await btc(this._send)(client,res) + if (err) log.error({msg:err, error:err}) + } // end message process + + }) // end connecttion handler - log.info({method:'_listen', line:255, opts: this.opt, msg:'socket created and listening'}) + + log.info({method:'_listen', line:255, opts: this.opt, msg:'socket server created and listening'}) return res + }) // end super listen callback + } // end listen + // call when socket server is going down async _destroy() { - log.debug({method:'_destroy', line:217, msg:'closing down socket'}) + log.fatal({method:'_destroy', line:217, msg:'closing down socket server'}) + // this.push() clearInterval(this._ping) await this.close() + this.clients.forEach(client => { + client.removeAllListeners() + client.stream.removeAllListeners() + }) log.debug({method:'_destroy', line:219, msg:'all connections closed....exiting'}) process.exit() } - // default packet process, just a simple echo + // default packet process, just a simple echo, override with registerPacketProcessor async _packetProcess(packet) { return new Promise(resolve => { resolve(packet) }) } - // consumer send, must have a consumer socket bound to use - async _send(packet) { - // timeout already set if sockect can't be drained in 10 secs - return new Promise(resolve => { + async _send(client, packet) { + log.debug({msg:`sending to client:${client.id}`, packet:packet}) + return new Promise(async (resolve, reject) => { + let [err,ser] = await btc(client.stream.serialize)(packet) + if (err) reject('unable to serialze the packet') const cb = () => resolve('packet written to socket stream') - if (!this.write(packet)) { - this.once('drain', cb) + if (!client.write(ser)) { + client.once('drain', cb) } else { process.nextTick(cb) }