Add client ID to connected client on server and remove client when it closes

This commit is contained in:
David Kebler 2019-08-20 10:52:59 -07:00
parent 3bf375c8fd
commit 375790bfc4
7 changed files with 118 additions and 184 deletions

View file

@ -1,33 +0,0 @@
import Consumer from '../src/consumer'
// const client1= new Consumer({name:'example-consumer1' })
const client= new Consumer({path:true, name:'example-consumer'})
// let packet = {name: 'client', cmd:'doit', data:'data sent by client'}
// This is your client handler object waiting on a message to do something
const process = async function (packet) {
// return new Promise((resolve) => {
// console.log('====== packet pushed from server ======')
// console.dir(packet)
// // setTimeout(resolve('done'),100)
// resolve('done')
// })
console.log('====== packet pushed from server ======')
console.dir(packet)
// setTimeout(resolve('done'),100)
return Promise.resolve('done')
}
client.registerPacketProcessor(process)
;
(async () => {
console.log(await client.connect())
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -4,24 +4,44 @@ import btc from 'better-try-catch'
// const client1= new Consumer({name:'example-consumer1' })
const client= new Consumer({path:true, name:'example-consumer', initTimeout:30 })
let packet = {name: 'client', cmd:'doit', data:'sent by client'}
// This is your client handler object waiting on a message to do something
const process = function (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
const process = async (packet) => {
console.log('packet being processed at socket')
if (packet.cmd) {
if (this[packet.cmd]) return await this[packet.cmd](packet)
else return {error: 'command has no processing function', packet: packet }
} return {error: 'no command in packet', packet: packet }
}
client.pushed = async (packet) => {
return new Promise(resolve => {
console.log('pushed packet/n',packet)
resolve()
})
}
client.registerPacketProcessor(process)
client.on('connection', event => {
console.log('============ connection update ============')
console.log(event.id)
console.log(event.msg)
console.log(`${event.ready ? 'Consumer is connected' : 'Consumer is disconnected'}`)
console.log('======================================')
})
;
(async () => {
console.log('ready at start',client.ready)
// await Promise.all([client1.connect(),client2.connect()])
let [err, res] = await btc(client.connect)()
if (err) {
console.log('error', err)
} else {
console.log('ready now?',client.ready)
console.log('connect reponse',res)
let packet = {name: 'client', cmd:'doit', data:'sent by client'}
console.log('sending packet ', packet)
console.log('=========\n',await client.send(packet))
}

View file

@ -1,37 +0,0 @@
import Consumer from '../src/consumer'
const USOCKET = __dirname + '/sample.sock'
class Client extends Consumer {
constructor(path,opts) {
super(path,opts)
}
async _packetProcess (packet) {
this[packet.cmd](packet)
}
async reply (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
console.log(`Socket replied with data: ${packet.data}`)
}
}
const client1= new Client({path:true,name:'example-consumer1' })
const client2 = new Client({path:true,name:'example-consumer2'})
let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'}
let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
;
(async () => {
await Promise.all([client1.connect(),client2.connect()])
console.log(await Promise.all([client1.send(packet1),client2.send(packet2)]))
client1.end()
client2.end()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -1,42 +0,0 @@
import { Socket } from '../src'
;
(async () => {
class Test extends Socket {
constructor(opts) {
super(opts)
}
async _packetProcess(packet) {
console.log('packet being processed at socket')
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
return {error: 'no command in packet', packet: packet }
}
async doit(data,name) {
return new Promise(resolve => {
let res = {}
console.log('data sent to doit = ', data)
res.status ='success'
res.name = name
res.cmd = 'reply'
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
}
// let test = new Test()
let test = new Test({path:true, conPacket:{onconnect:'this is a packet sent to consumer as soon as it connects'}})
await test.create()
let count = 0
setInterval( () => {
count++
test.push({name:'pushed', count:count, status:'some pushed data'}) },10000)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -1,5 +1,4 @@
import { Socket as uSocket} from '../src'
// import { fs } from 'mz'
// made key cert into module that also uses environment variables
// const TLS = process.env.TLS || false
@ -10,7 +9,49 @@ import { Socket as uSocket} from '../src'
let Socket = uSocket
;
class Test extends Socket {
constructor(opts) {
super(opts)
}
async _packetProcess(packet) {
console.log('packet being processed at socket')
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
return {error: 'no command in packet', packet: packet }
}
async doit(data,name) {
return new Promise(resolve => {
let res = {}
console.log('data sent to doit = ', data)
res.status ='success'
res.name = name
res.cmd = 'reply'
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
}
// const options = {
// tls: TLS,
// key: await fs.readFile(TLS_KEY_PATH),
// cert: await fs.readFile(TLS_CRT_PATH),
// // This is necessary only if using client certificate authentication.
// // requestCert: true,
// // This is necessary only if the client uses a self-signed certificate.
// // ca: [ fs.readFileSync('client-cert.pem') ]
// }
let options = {path:true, conPacket:{onconnect:'this is a packet sent to consumer as soon as it connects'}}
// let test = new Test()
let test = new Test(options)
;
(async () => {
// TODO dynamic import
// if(TLS_KEY_PATH && TLS_CRT_PATH && TLS) {
@ -18,52 +59,19 @@ let Socket = uSocket
// console.log('using TLS')
// }
class Test extends Socket {
constructor(opts) {
super(opts)
}
async _packetProcess(packet) {
console.log('packet being processed at socket')
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
return {error: 'no command in packet', packet: packet }
}
async doit(data,name) {
return new Promise(resolve => {
let res = {}
console.log('data sent to doit = ', data)
res.status ='success'
res.name = name
res.cmd = 'reply'
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
}
// const options = {
// tls: TLS,
// key: await fs.readFile(TLS_KEY_PATH),
// cert: await fs.readFile(TLS_CRT_PATH),
// // This is necessary only if using client certificate authentication.
// // requestCert: true,
// // This is necessary only if the client uses a self-signed certificate.
// // ca: [ fs.readFileSync('client-cert.pem') ]
// }
let options = {path:true}
// let test = new Test()
let test = new Test(options)
await test.create()
setTimeout( () => {
console.log('closing server')
test._destroy()
},20000)
let count = 0
setInterval( () => {
count++
test.push({cmd:'pushed', count:count, status:'some pushed data'})
},3000)
// setTimeout( () => {
// console.log('closing server')
// test._destroy()
// },20000)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View file

@ -63,6 +63,8 @@ class SocketConsumer extends Socket {
// this._write = this._write.bind(this)
}
get ready() { return this._ready}
async connect() {
return new Promise((resolve, reject) => {
@ -70,11 +72,12 @@ class SocketConsumer extends Socket {
log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'})
log.debug('first connnect attempt for', this.opts.name)
this.emit('connection',{msg:'attempting initial connection', id:this.id, opts:this.opts, ready:false})
let initTimeout = setTimeout(() => {
this.emit('connection',{msg:'initial connection timed out', id:this.id, timeout:this.initTimeout, wait:this.retryWait, opts:this.opts, ready:false})
this.removeAllListeners()
log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.initTimeout/1000} secs no more attempts!`})
this.removeAllListeners()
this.stream.removeAllListeners()
this.destroy()
reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`})
@ -90,7 +93,7 @@ class SocketConsumer extends Socket {
clearTimeout(initTimeout)
this._listen() // setup for active connection
this._ready = true
this.emit('connection','connected')
this.emit('connection',{msg:'initial connection succesfull', id:this.id, opts:this.opts, ready:true})
resolve('initial connection successful')
}
}

View file

@ -61,6 +61,7 @@ export default function socketClass(Server) {
this.pingInterval = opts.pingInterval === false ? opts.pingInterval : (opts.pingInterval * 1000 || 5000)
this.clientTracking = opts.clientTracking || true
this.clients = [] // track consumers (i.e. clients)
this.nextClientID = 0 // incrementer for simple client ID
this.opts = opts // for use to recover from selected errors
//self bindings
this._listen = this._listen.bind(this)
@ -120,7 +121,7 @@ export default function socketClass(Server) {
if (err) reject(err)
if (this.pingInterval) {
this._ping = setInterval( async () =>{
if (this.clients) this.push({pingInterval:this.pingInterval},'ping')
if (this.clients.length > 0) this.push({pingInterval:this.pingInterval},'ping')
},this.pingInterval)
}
resolve(res)
@ -146,19 +147,23 @@ export default function socketClass(Server) {
*/
async push(packet={},id) {
packet._header = {id: id || 'pushed'}
log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
this.clients.forEach(async client => {
if (client.writable) {
let [err, ser] = await btc(client.stream.serialize)(packet)
if (err)
ser = await client.stream.serialize({
error: 'was not able to serialze the res packet',
err: err,
_header: { id: packet._header.id }
})
await this._send.bind(client)(ser)
}
})
if (this.clients.length >0) {
log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
this.clients.forEach(async client => {
if (client.writable) {
let [err, ser] = await btc(client.stream.serialize)(packet)
if (err)
ser = await client.stream.serialize({
error: 'was not able to serialze the res packet',
err: err,
_header: { id: packet._header.id }
})
await this._send.bind(client)(ser)
}
})
} else {
log.debug({method:'push', line:165, id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'})
}
}
async _listen(opts) {
@ -166,10 +171,14 @@ export default function socketClass(Server) {
if (err) return Promise.reject(err)
// this gets called for each client connection and is unique to each
this.on('connection', async socket => {
const send = this._send.bind(socket)
const stream = new JSONStream()
socket.stream = stream // need this to track clients
let send = this._send.bind(socket)
if (this.clientTracking) this.clients.push(socket)
if (this.clientTracking) {
this.nextClientID +=1
socket.id = this.nextClientID
this.clients.push(socket)
}
// TODO add 'close' listener to socket to remove from this.clients
log.debug({method:'_listen', line:167, msg:'new consumer connecting'})
log.debug(await send(await stream.serialize({ _handshake: true })))
@ -186,9 +195,15 @@ export default function socketClass(Server) {
log.error({msg:'client connection error during listen',error:err})
})
socket.on('close', (msg) => {
log.warn({msg:'client connection closed during listen',error:msg})
})
const clientCloseHandler = (id, msg) => {
console.log('client that closed', id, msg,
)
this.clients.splice(this.clients.findIndex(client => {return client.id === id }),1)
console.log('number of active clients',this.clients.length)
log.warn({msg:'client connection closed during listen',id:this.id, error:msg})
}
console.log('binding on this client', socket.id)
socket.on('close', clientCloseHandler.bind(this,socket.id) )
stream.on('error', (err) => {
log.error({msg:'client-socket stream error during listen',error:err})