From 375790bfc4106f962adeb8b2edef1f4fd1235ca4 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Tue, 20 Aug 2019 10:52:59 -0700 Subject: [PATCH] Add client ID to connected client on server and remove client when it closes --- examples/client-push.js | 33 ------------- examples/client.js | 30 ++++++++++-- examples/client2.js | 37 --------------- examples/server-push.js | 42 ----------------- examples/server.js | 100 ++++++++++++++++++++++------------------ src/consumer.js | 7 ++- src/socket-class.js | 53 +++++++++++++-------- 7 files changed, 118 insertions(+), 184 deletions(-) delete mode 100644 examples/client-push.js delete mode 100644 examples/client2.js delete mode 100644 examples/server-push.js diff --git a/examples/client-push.js b/examples/client-push.js deleted file mode 100644 index 80296b6..0000000 --- a/examples/client-push.js +++ /dev/null @@ -1,33 +0,0 @@ -import Consumer from '../src/consumer' - -// const client1= new Consumer({name:'example-consumer1' }) -const client= new Consumer({path:true, name:'example-consumer'}) - -// let packet = {name: 'client', cmd:'doit', data:'data sent by client'} - -// This is your client handler object waiting on a message to do something -const process = async function (packet) { - // return new Promise((resolve) => { - // console.log('====== packet pushed from server ======') - // console.dir(packet) - // // setTimeout(resolve('done'),100) - // resolve('done') - // }) - console.log('====== packet pushed from server ======') - console.dir(packet) - // setTimeout(resolve('done'),100) - return Promise.resolve('done') - -} - -client.registerPacketProcessor(process) - -; -(async () => { - - console.log(await client.connect()) - - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/examples/client.js b/examples/client.js index de81557..eddfcb8 100644 --- a/examples/client.js +++ b/examples/client.js @@ -4,24 +4,44 @@ import btc from 'better-try-catch' // const client1= new Consumer({name:'example-consumer1' }) const client= new Consumer({path:true, name:'example-consumer', initTimeout:30 }) -let packet = {name: 'client', cmd:'doit', data:'sent by client'} - // This is your client handler object waiting on a message to do something -const process = function (packet) { - console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`) +const process = async (packet) => { + console.log('packet being processed at socket') + if (packet.cmd) { + if (this[packet.cmd]) return await this[packet.cmd](packet) + else return {error: 'command has no processing function', packet: packet } + } return {error: 'no command in packet', packet: packet } +} + + +client.pushed = async (packet) => { + return new Promise(resolve => { + console.log('pushed packet/n',packet) + resolve() + }) } client.registerPacketProcessor(process) +client.on('connection', event => { + console.log('============ connection update ============') + console.log(event.id) + console.log(event.msg) + console.log(`${event.ready ? 'Consumer is connected' : 'Consumer is disconnected'}`) + console.log('======================================') +}) + ; (async () => { - + console.log('ready at start',client.ready) // await Promise.all([client1.connect(),client2.connect()]) let [err, res] = await btc(client.connect)() if (err) { console.log('error', err) } else { + console.log('ready now?',client.ready) console.log('connect reponse',res) + let packet = {name: 'client', cmd:'doit', data:'sent by client'} console.log('sending packet ', packet) console.log('=========\n',await client.send(packet)) } diff --git a/examples/client2.js b/examples/client2.js deleted file mode 100644 index cb60652..0000000 --- a/examples/client2.js +++ /dev/null @@ -1,37 +0,0 @@ -import Consumer from '../src/consumer' - -const USOCKET = __dirname + '/sample.sock' - - -class Client extends Consumer { - constructor(path,opts) { - super(path,opts) - } - - async _packetProcess (packet) { - this[packet.cmd](packet) - } - - async reply (packet) { - console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`) - console.log(`Socket replied with data: ${packet.data}`) - } - -} - -const client1= new Client({path:true,name:'example-consumer1' }) -const client2 = new Client({path:true,name:'example-consumer2'}) - -let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'} -let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'} - -; -(async () => { - - await Promise.all([client1.connect(),client2.connect()]) - console.log(await Promise.all([client1.send(packet1),client2.send(packet2)])) - client1.end() - client2.end() -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/examples/server-push.js b/examples/server-push.js deleted file mode 100644 index e0f26e7..0000000 --- a/examples/server-push.js +++ /dev/null @@ -1,42 +0,0 @@ -import { Socket } from '../src' - - ; -(async () => { - - class Test extends Socket { - constructor(opts) { - super(opts) - } - - async _packetProcess(packet) { - console.log('packet being processed at socket') - if (packet.cmd) return await this[packet.cmd](packet.data,packet.name) - return {error: 'no command in packet', packet: packet } - } - - async doit(data,name) { - return new Promise(resolve => { - let res = {} - console.log('data sent to doit = ', data) - res.status ='success' - res.name = name - res.cmd = 'reply' - res.data = 'this would be response data from socket doit function' - resolve(res) - }) - - } - - } - - // let test = new Test() - let test = new Test({path:true, conPacket:{onconnect:'this is a packet sent to consumer as soon as it connects'}}) - await test.create() - let count = 0 - setInterval( () => { - count++ - test.push({name:'pushed', count:count, status:'some pushed data'}) },10000) - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/examples/server.js b/examples/server.js index a91c0e8..944e9b2 100644 --- a/examples/server.js +++ b/examples/server.js @@ -1,5 +1,4 @@ import { Socket as uSocket} from '../src' -// import { fs } from 'mz' // made key cert into module that also uses environment variables // const TLS = process.env.TLS || false @@ -10,7 +9,49 @@ import { Socket as uSocket} from '../src' let Socket = uSocket -; +class Test extends Socket { + constructor(opts) { + super(opts) + } + + async _packetProcess(packet) { + console.log('packet being processed at socket') + if (packet.cmd) return await this[packet.cmd](packet.data,packet.name) + return {error: 'no command in packet', packet: packet } + } + + async doit(data,name) { + return new Promise(resolve => { + let res = {} + console.log('data sent to doit = ', data) + res.status ='success' + res.name = name + res.cmd = 'reply' + res.data = 'this would be response data from socket doit function' + resolve(res) + }) + } + +} + + +// const options = { +// tls: TLS, +// key: await fs.readFile(TLS_KEY_PATH), +// cert: await fs.readFile(TLS_CRT_PATH), +// // This is necessary only if using client certificate authentication. +// // requestCert: true, +// // This is necessary only if the client uses a self-signed certificate. +// // ca: [ fs.readFileSync('client-cert.pem') ] +// } + +let options = {path:true, conPacket:{onconnect:'this is a packet sent to consumer as soon as it connects'}} + + +// let test = new Test() +let test = new Test(options) + + ; (async () => { // TODO dynamic import // if(TLS_KEY_PATH && TLS_CRT_PATH && TLS) { @@ -18,52 +59,19 @@ let Socket = uSocket // console.log('using TLS') // } - class Test extends Socket { - constructor(opts) { - super(opts) - } - async _packetProcess(packet) { - console.log('packet being processed at socket') - if (packet.cmd) return await this[packet.cmd](packet.data,packet.name) - return {error: 'no command in packet', packet: packet } - } - - async doit(data,name) { - return new Promise(resolve => { - let res = {} - console.log('data sent to doit = ', data) - res.status ='success' - res.name = name - res.cmd = 'reply' - res.data = 'this would be response data from socket doit function' - resolve(res) - }) - } - - } - - - // const options = { - // tls: TLS, - // key: await fs.readFile(TLS_KEY_PATH), - // cert: await fs.readFile(TLS_CRT_PATH), - // // This is necessary only if using client certificate authentication. - // // requestCert: true, - // // This is necessary only if the client uses a self-signed certificate. - // // ca: [ fs.readFileSync('client-cert.pem') ] - // } - - let options = {path:true} - - - // let test = new Test() - let test = new Test(options) await test.create() - setTimeout( () => { - console.log('closing server') - test._destroy() - },20000) + + let count = 0 + setInterval( () => { + count++ + test.push({cmd:'pushed', count:count, status:'some pushed data'}) + },3000) + + // setTimeout( () => { + // console.log('closing server') + // test._destroy() + // },20000) })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/src/consumer.js b/src/consumer.js index e817bff..f158233 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -63,6 +63,8 @@ class SocketConsumer extends Socket { // this._write = this._write.bind(this) } + get ready() { return this._ready} + async connect() { return new Promise((resolve, reject) => { @@ -70,11 +72,12 @@ class SocketConsumer extends Socket { log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'}) log.debug('first connnect attempt for', this.opts.name) + this.emit('connection',{msg:'attempting initial connection', id:this.id, opts:this.opts, ready:false}) let initTimeout = setTimeout(() => { + this.emit('connection',{msg:'initial connection timed out', id:this.id, timeout:this.initTimeout, wait:this.retryWait, opts:this.opts, ready:false}) this.removeAllListeners() log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.initTimeout/1000} secs no more attempts!`}) - this.removeAllListeners() this.stream.removeAllListeners() this.destroy() reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`}) @@ -90,7 +93,7 @@ class SocketConsumer extends Socket { clearTimeout(initTimeout) this._listen() // setup for active connection this._ready = true - this.emit('connection','connected') + this.emit('connection',{msg:'initial connection succesfull', id:this.id, opts:this.opts, ready:true}) resolve('initial connection successful') } } diff --git a/src/socket-class.js b/src/socket-class.js index 9e32184..6c03470 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -61,6 +61,7 @@ export default function socketClass(Server) { this.pingInterval = opts.pingInterval === false ? opts.pingInterval : (opts.pingInterval * 1000 || 5000) this.clientTracking = opts.clientTracking || true this.clients = [] // track consumers (i.e. clients) + this.nextClientID = 0 // incrementer for simple client ID this.opts = opts // for use to recover from selected errors //self bindings this._listen = this._listen.bind(this) @@ -120,7 +121,7 @@ export default function socketClass(Server) { if (err) reject(err) if (this.pingInterval) { this._ping = setInterval( async () =>{ - if (this.clients) this.push({pingInterval:this.pingInterval},'ping') + if (this.clients.length > 0) this.push({pingInterval:this.pingInterval},'ping') },this.pingInterval) } resolve(res) @@ -146,19 +147,23 @@ export default function socketClass(Server) { */ async push(packet={},id) { packet._header = {id: id || 'pushed'} - log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) - this.clients.forEach(async client => { - if (client.writable) { - let [err, ser] = await btc(client.stream.serialize)(packet) - if (err) - ser = await client.stream.serialize({ - error: 'was not able to serialze the res packet', - err: err, - _header: { id: packet._header.id } - }) - await this._send.bind(client)(ser) - } - }) + if (this.clients.length >0) { + log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) + this.clients.forEach(async client => { + if (client.writable) { + let [err, ser] = await btc(client.stream.serialize)(packet) + if (err) + ser = await client.stream.serialize({ + error: 'was not able to serialze the res packet', + err: err, + _header: { id: packet._header.id } + }) + await this._send.bind(client)(ser) + } + }) + } else { + log.debug({method:'push', line:165, id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'}) + } } async _listen(opts) { @@ -166,10 +171,14 @@ export default function socketClass(Server) { if (err) return Promise.reject(err) // this gets called for each client connection and is unique to each this.on('connection', async socket => { + const send = this._send.bind(socket) const stream = new JSONStream() socket.stream = stream // need this to track clients - let send = this._send.bind(socket) - if (this.clientTracking) this.clients.push(socket) + if (this.clientTracking) { + this.nextClientID +=1 + socket.id = this.nextClientID + this.clients.push(socket) + } // TODO add 'close' listener to socket to remove from this.clients log.debug({method:'_listen', line:167, msg:'new consumer connecting'}) log.debug(await send(await stream.serialize({ _handshake: true }))) @@ -186,9 +195,15 @@ export default function socketClass(Server) { log.error({msg:'client connection error during listen',error:err}) }) - socket.on('close', (msg) => { - log.warn({msg:'client connection closed during listen',error:msg}) - }) + const clientCloseHandler = (id, msg) => { + console.log('client that closed', id, msg, + ) + this.clients.splice(this.clients.findIndex(client => {return client.id === id }),1) + console.log('number of active clients',this.clients.length) + log.warn({msg:'client connection closed during listen',id:this.id, error:msg}) + } + console.log('binding on this client', socket.id) + socket.on('close', clientCloseHandler.bind(this,socket.id) ) stream.on('error', (err) => { log.error({msg:'client-socket stream error during listen',error:err})