0.2.21 add ability to set the default return command as option defaultReturnCmd

add consumer-connection emits
change name in connection handler to consumer from socket
add getClientIndex and getClient to api methods
This commit is contained in:
David Kebler 2019-09-13 18:59:22 -07:00
parent 5f28baaa74
commit 7502902a51
3 changed files with 58 additions and 29 deletions

View file

@ -1,6 +1,6 @@
{
"name": "@uci/socket",
"version": "0.2.20",
"version": "0.2.21",
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
"main": "src",
"scripts": {

View file

@ -72,7 +72,7 @@ class SocketConsumer extends Socket {
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost')
log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'})
log.debug('first connnect attempt for', this.opts.name)
log.debug('first connnect attempt for', this.opts.id)
this.emit('status',{level:'info', msg:'attempting initial connection', id:this.id, opts:this.opts, ready:false})
let initTimeout = {}
@ -107,6 +107,8 @@ class SocketConsumer extends Socket {
this._listen() // setup for active connection
log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'})
this.emit('status',{level:'info', msg:'authentication succesfull', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected})
this.emit('consumer-connection',{state:'connected',name:this.id})
if (this.opts.conPacket) (this.send(this.conPacket))
resolve('initial connection successful')
}
}
@ -229,6 +231,8 @@ class SocketConsumer extends Socket {
}
this.stream.on('message', messageHandler.bind(this)) // reset default message handler
this.emit('status',{level:'info', msg:'reconnected', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected})
this.emit('consumer-connection', {state:'reconnected', name:this.id})
if (this.opts.conPacket) (this.send(this.conPacket))
}
}
}
@ -239,6 +243,7 @@ class SocketConsumer extends Socket {
this._authenticated = false
this.emit('status',{level:'error', msg:'connection error', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected})
log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect after ${this.retryWait/1000} secs`})
this.emit('consumer-connection', {state:'disconnected', name:this.id})
await pause(this.retryWait)
this.stream.removeAllListeners('message') // remove regular message handler in prep for reconnect
this.removeAllListeners('connect')
@ -259,7 +264,7 @@ class SocketConsumer extends Socket {
const pingHandler = async (packet) => {
clearTimeout(pingTimeout)
log.debug({method:'connect', line:191, msg:'received ping, restting timeout'})
log.trace({method:'connect', line:191, msg:'received ping, restting timeout'})
this._pingTimeout= packet.pingInterval + 1000
monitorPing.call(this)
}
@ -276,7 +281,7 @@ class SocketConsumer extends Socket {
// general handler
function messageHandler(packet) {
log.debug('incoming packet from socket sever',packet)
if (packet._header.id !== 'ping') log.debug('incoming packet from socket sever',packet)
this.emit(packet._header.id, packet)
}

View file

@ -57,12 +57,12 @@ export default function socketClass(Server) {
if (path.dirname(opts.path) === '.') // relative path sent
opts.path = path.join(DEFAULT_PIPE_DIR, opts.path)
}
this.defaultReturnCmd = opts.defaultReturnCmd
this.allowAnonymous = (!opts.tokens || !!process.env.UCI_ANON || opts.allowAnonymous) ? true : false
this.tokens = opts.tokens || []
this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
this.pingInterval = opts.pingInterval === false ? opts.pingInterval : (opts.pingInterval * 1000 || 5000)
// this.clientTracking = opts.clientTracking || true
this.clients = [] // track consumers (i.e. clients)
this.clients = [] // track consumers (i.e. clients) TODO use a Map
this.nextClientID = 0 // incrementer for default initial client ID
this.opts = opts // for use to recover from selected errors
this.errorCount = 0
@ -126,8 +126,16 @@ export default function socketClass(Server) {
this.on('error', err => {
this.errorCount +=1 // log errors here
this.errors.push(err)
if(this.errorCount>2) this.emit('warn', {msg:'something bad maybe going on, 3 errors', errors:this.errors})
if(this.errorCount>5) this.emit('fatal', {msg:'something fatal is going on, 6 errors', errors:this.errors})
if(this.errorCount>2 && this.errorCount<6) {
let errors= {level:'warning',msg:'something bad maybe going on, 3 errors', errors:this.errors}
this.emit('status', errors)
log.error(errors)
}
if(this.errorCount>5) {
let errors = {level:'fatal',msg:'something fatal is going on, 6 errors', errors:this.errors}
log.fatal(errors)
this.emit('status', errors)
}
})
log.info({method:'create', line:54, msg:'socket server created and listening at', address:this.address()})
this.on('connection', this._connectionHandler.bind(this))
@ -207,7 +215,8 @@ export default function socketClass(Server) {
async push(packet={},id) {
packet._header = {id: id || 'pushed'}
if (this.clients.length > 0) {
log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
log.trace({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
// TODO should do a map and single promise
this.clients.forEach(async client => {
if (client.writable) {
let [err] = await btc(this._send)(client,packet)
@ -219,9 +228,19 @@ export default function socketClass(Server) {
}
}
// TODO won't need this if moving to a Map
getClientIndex(id) {
return this.clients.findIndex(client => {return client.id === id })
}
getClient(id) {
return this.clients[this._getClientIndex(id)]
}
removeClient (id) {
let index = this.clients.findIndex(client => {return client.id === id })
let client = this.clients[index]
let index = this.getClientIndex(id)
let client=this.clients[index]
this.emit('consumer-connection',{state:'disconnected', id:client.id, name:client.name, socketSide:true})
client.removeAllListeners()
client.stream.removeAllListeners()
this.clients.splice(index,1)
@ -281,14 +300,16 @@ export default function socketClass(Server) {
}
async _connectionHandler(socket) { // this gets called for each client connection and is unique to each
log.debug({method:'_listen', line:167, msg:'new consumer connecting'})
socket.id = ++this.nextClientID // server assigned ID
socket.authenticated = false
this.clients.push(socket) // add client to list
async _connectionHandler(consumer) { // this gets called for each client connection and is unique to each
consumer.id = ++this.nextClientID // server assigned ID
log.debug({method:'_listen', line:167, msg:'new consumer connecting', id:consumer.id, totalConsumers:this.clients.length})
consumer.socketSide = true
consumer.authenticated = false
this.clients.push(consumer) // add client to list
const stream = new JSONStream()
socket.stream = stream
socket.setKeepAlive(this.keepAlive,3000)
consumer.stream = stream
console.log('new consumer connecting', consumer.id, this.clients.length)
consumer.setKeepAlive(this.keepAlive,3000)
// add listeners
const clientCloseHandler = (id) => {
@ -296,38 +317,41 @@ export default function socketClass(Server) {
this.removeClient(id)
}
socket.on('close', clientCloseHandler.bind(this,socket.id) )
consumer.on('close', clientCloseHandler.bind(this,consumer.id) )
socket.on('error', (err) => {
consumer.on('error', (err) => {
log.error({msg:'client connection error during listen',error:err})
// TODO do more handling than just logging
})
socket.on('data', stream.onData) // send data to
consumer.on('data', stream.onData) // send data to
stream.on('error', (err) => {
log.error({msg:'client-socket stream error during listen',error:err})
log.error({msg:'consumer stream error during listen',error:err})
// TODO do more handling than just logging
})
let [err] = await btc(this.authenticateClient)(socket)
let [err] = await btc(this.authenticateClient)(consumer)
if (!this.allowAnonymous) {
if (err) {
socket.end()// abort new connection socket, cleanup, remove listeners
this.removeClient(socket.id)
consumer.end()// abort new connection consumer, cleanup, remove listeners
this.removeClient(consumer.id)
return
}
}
// all's set main message processor
stream.on('message', messageProcess.bind(this, socket))
stream.on('message', messageProcess.bind(this, consumer))
if (this.opts.conPacket) {
this.opts.conPacket._header = { id: 'pushed' }
log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'})
this._send(socket,this.opts.conPacket) // send a packet command on to consumer on connection
this._send(consumer,this.opts.conPacket) // send a packet command on to consumer on connection
}
this.emit('consumer-connection',{state:'connected', id:consumer.id, name:consumer.name, socketSide:true})
this.emit('status',{level:'info', msg:'a consumer connected', name:consumer.name, id:consumer.id})
// that's it. Connection is active
async function messageProcess(client, packet) {
@ -346,7 +370,7 @@ export default function socketClass(Server) {
res._header.request = clone(packet, false)
res._header.responder = { name: this.name, instanceID: this.id }
res._header.socket = this.address()
if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply'
if (!res.cmd) res.cmd = this.defaultReturnCmd || 'reply' // by default return command is 'reply'
let [err] = await btc(this._send)(client,res)
if (err) log.error({msg:err, error:err})
} // end message process
@ -375,7 +399,7 @@ export default function socketClass(Server) {
}
async _send(client, packet) {
log.debug({msg:`sending to client:${client.id}`, packet:packet})
log.trace({msg:`sending to client:${client.id}`, packet:packet})
return new Promise(async (resolve, reject) => {
let [err,ser] = await btc(client.stream.serialize)(packet)
if (err) reject('unable to serialze the packet')