0.2.26
refactor the connection events, status is now log, connection event is now connection:socket or connection:consumer 'online' to just 'connected'
This commit is contained in:
parent
dcd178de7b
commit
ba112c484e
3 changed files with 53 additions and 57 deletions
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "@uci/socket",
|
"name": "@uci/socket",
|
||||||
"version": "0.2.25",
|
"version": "0.2.26",
|
||||||
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
|
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
|
||||||
"main": "src",
|
"main": "src",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|
|
@ -28,13 +28,13 @@ class SocketConsumer extends Socket {
|
||||||
|
|
||||||
constructor(opts = {}) {
|
constructor(opts = {}) {
|
||||||
super()
|
super()
|
||||||
|
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
|
||||||
log = logger({
|
log = logger({
|
||||||
file: 'src/consumer.js',
|
file: 'src/consumer.js',
|
||||||
class: 'Consumer',
|
class: 'Consumer',
|
||||||
name: 'socket',
|
name: 'socket',
|
||||||
id: this.id
|
id: this.id
|
||||||
})
|
})
|
||||||
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
|
|
||||||
if (!opts.path) {
|
if (!opts.path) {
|
||||||
if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'})
|
if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'})
|
||||||
opts.host = opts.host || '127.0.0.1'
|
opts.host = opts.host || '127.0.0.1'
|
||||||
|
@ -52,7 +52,6 @@ class SocketConsumer extends Socket {
|
||||||
this.pingFailedTimeout = opts.pingFailedTimeout * 1000 || 10000
|
this.pingFailedTimeout = opts.pingFailedTimeout * 1000 || 10000
|
||||||
this.reconnectLimit = opts.reconnectLimit || 0
|
this.reconnectLimit = opts.reconnectLimit || 0
|
||||||
this.retryWait = opts.retryWait==null ? 5000 : opts.retryWait * 1000
|
this.retryWait = opts.retryWait==null ? 5000 : opts.retryWait * 1000
|
||||||
console.log(this.retryWait)
|
|
||||||
this.heartBeat = !!process.env.HEARTBEAT || opts.heartBeat
|
this.heartBeat = !!process.env.HEARTBEAT || opts.heartBeat
|
||||||
this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
|
this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
|
||||||
this.stream = new JsonStream()
|
this.stream = new JsonStream()
|
||||||
|
@ -75,24 +74,23 @@ class SocketConsumer extends Socket {
|
||||||
this._connection = state || this._connection
|
this._connection = state || this._connection
|
||||||
let opts = {state:this._connection, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}
|
let opts = {state:this._connection, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}
|
||||||
Object.assign(opts,moreOpts)
|
Object.assign(opts,moreOpts)
|
||||||
this.emit('connection',opts)
|
this.emit('connection:socket',opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
status(level='debug', msg) {
|
log(level='debug', msg) {
|
||||||
let opts = {level:level, msg:msg, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected, state:this._connection}
|
let opts = {level:level, msg:msg, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected, state:this._connection}
|
||||||
if (typeof msg !== 'string') Object.assign(opts,msg)
|
if (typeof msg !== 'string') Object.assign(opts,msg)
|
||||||
this.emit('status',opts)
|
this.emit('log',opts)
|
||||||
log[level](opts)
|
log[level](opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect(timeout=0) {
|
async connect(timeout=0) {
|
||||||
console.log('=================================================connect=========================================================================================')
|
|
||||||
this.initTimeout = timeout > 4 ? timeout * 1000 : this.initTimeout
|
this.initTimeout = timeout > 4 ? timeout * 1000 : this.initTimeout
|
||||||
this.notify('starting')
|
this.notify('starting')
|
||||||
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') {
|
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') {
|
||||||
let msg ='tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'
|
let msg ='tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'
|
||||||
log.warn({method:'connect', msg:'msg'})
|
log.warn({method:'connect', msg:'msg'})
|
||||||
this.status('warn',msg)
|
this.log('warn',msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
this.once('error', errorHandler.bind(this))
|
this.once('error', errorHandler.bind(this))
|
||||||
|
@ -100,7 +98,7 @@ class SocketConsumer extends Socket {
|
||||||
|
|
||||||
super.connect(this.opts)
|
super.connect(this.opts)
|
||||||
|
|
||||||
// returns promise for initial connection with reject on timeout
|
// returns promise for initial connection when initTimeout is not zero with reject on timeout
|
||||||
if (this._first && this.initTimeout) {
|
if (this._first && this.initTimeout) {
|
||||||
|
|
||||||
let initTimeout = {}
|
let initTimeout = {}
|
||||||
|
@ -111,22 +109,23 @@ class SocketConsumer extends Socket {
|
||||||
initTimeout = setTimeout(async () => {
|
initTimeout = setTimeout(async () => {
|
||||||
this.disconnect()
|
this.disconnect()
|
||||||
let msg=`unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`
|
let msg=`unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`
|
||||||
this.status('fatal',msg)
|
this.log('fatal',msg)
|
||||||
this.notify('failed',{timeout:this.initTimeout, msg:msg})
|
this.notify('failed',{timeout:this.initTimeout, msg:msg})
|
||||||
reject({ error:msg, opts: this.opts})
|
reject({ error:msg, opts: this.opts})
|
||||||
}
|
}
|
||||||
, this.initTimeout)
|
, this.initTimeout)
|
||||||
|
|
||||||
const successHandler = (ev) => {
|
const successHandler = (ev) => {
|
||||||
if (ev.state === 'online') {
|
console.log('initial success', ev.state)
|
||||||
|
if (ev.state === 'connected') {
|
||||||
clearTimeout(initTimeout)
|
clearTimeout(initTimeout)
|
||||||
this.removeListener('connection',successHandler)
|
this.removeListener('connection',successHandler)
|
||||||
this.status('info','initial connection successfull')
|
this.log('info','initial connection successfull')
|
||||||
resolve({opts: this.opts, msg: 'initial connection successfull'})
|
resolve({opts: this.opts, msg: 'initial connection successfull'})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.on('connection',successHandler)
|
this.on('connection:socket',successHandler)
|
||||||
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -156,15 +155,14 @@ class SocketConsumer extends Socket {
|
||||||
this._first = false
|
this._first = false
|
||||||
if ( this.reconnectLimit === 0 || this._reconnectCount < this.reconnectLimit ) {
|
if ( this.reconnectLimit === 0 || this._reconnectCount < this.reconnectLimit ) {
|
||||||
this._reconnectCount += 1
|
this._reconnectCount += 1
|
||||||
this.status('warn',`Attempting a reconnect: attempt ${this._reconnectCount} of ${this.reconnectLimit ||'unlimited'}`)
|
this.log('warn',`Attempting a reconnect: attempt ${this._reconnectCount} of ${this.reconnectLimit ||'unlimited'}`)
|
||||||
this.connect()
|
this.connect()
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
this.status('fatal',`${this.reconnectLimit} reconnect attempts exceeded no more tries`)
|
this.log('fatal',`${this.reconnectLimit} reconnect attempts exceeded no more tries`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async send(ipacket) {
|
async send(ipacket) {
|
||||||
return new Promise(async resolve => {
|
return new Promise(async resolve => {
|
||||||
if (!this.active) {
|
if (!this.active) {
|
||||||
|
@ -271,12 +269,12 @@ async function connectHandler () {
|
||||||
this._doneAuthenticate = setTimeout(() =>{
|
this._doneAuthenticate = setTimeout(() =>{
|
||||||
let msg ='authentication not completed in 3 secs, forcing reconnect attempt'
|
let msg ='authentication not completed in 3 secs, forcing reconnect attempt'
|
||||||
this.notify('failed',{msg:msg})
|
this.notify('failed',{msg:msg})
|
||||||
this.status('warn',msg)
|
this.log('warn',msg)
|
||||||
this.reconnect()
|
this.reconnect()
|
||||||
},3000)
|
},3000)
|
||||||
this.on('data', this.stream.onData)
|
this.on('data', this.stream.onData)
|
||||||
this.stream.on('message', handshake.bind(this))
|
this.stream.on('message', handshake.bind(this))
|
||||||
this.status('info','in process of connecting waiting for socket handshake/authenticate')
|
this.log('info','in process of connecting waiting for socket handshake/authenticate')
|
||||||
this.notify('connecting')
|
this.notify('connecting')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,7 +282,6 @@ async function handshake (packet) {
|
||||||
if (packet._handshake) {
|
if (packet._handshake) {
|
||||||
this._connected = true
|
this._connected = true
|
||||||
this.notify('handshake')
|
this.notify('handshake')
|
||||||
this.status('info','connection handshaking')
|
|
||||||
let authPacket = this._authenticate() || {}
|
let authPacket = this._authenticate() || {}
|
||||||
authPacket._authenticate = true
|
authPacket._authenticate = true
|
||||||
authPacket.clientName = this.id
|
authPacket.clientName = this.id
|
||||||
|
@ -294,20 +291,18 @@ async function handshake (packet) {
|
||||||
if (res.error) {
|
if (res.error) {
|
||||||
let msg =`error during authentication ${res.error}, attempting reconnect in ${this.retryWait/1000}s to see if error clears`
|
let msg =`error during authentication ${res.error}, attempting reconnect in ${this.retryWait/1000}s to see if error clears`
|
||||||
this.notify('error',{msg:msg})
|
this.notify('error',{msg:msg})
|
||||||
this.status('error',msg)
|
this.log('error',msg)
|
||||||
this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait)
|
this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait)
|
||||||
} else {
|
} else {
|
||||||
if (!res.authenticated) {
|
if (!res.authenticated) {
|
||||||
let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect`
|
let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect`
|
||||||
this.notify('failed',{msg:msg})
|
this.notify('failed',{msg:msg})
|
||||||
this.status('fatal',msg)
|
|
||||||
this.disconnect()
|
this.disconnect()
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
this._authenticated = res.authenticated
|
this._authenticated = res.authenticated
|
||||||
let msg ='authentication succeeded connection ready'
|
let msg ='authentication succeeded connection ready'
|
||||||
this.status('info',msg)
|
this.notify('connected',{msg:msg})
|
||||||
this.notify('online',{msg:msg})
|
|
||||||
this._reconnectCount = 0
|
this._reconnectCount = 0
|
||||||
this.stream.on('message', messageHandler.bind(this)) // reset default message handler
|
this.stream.on('message', messageHandler.bind(this)) // reset default message handler
|
||||||
this.on('pushed', pushHandler.bind(this) )
|
this.on('pushed', pushHandler.bind(this) )
|
||||||
|
@ -326,7 +321,7 @@ async function handshake (packet) {
|
||||||
function messageHandler(packet) {
|
function messageHandler(packet) {
|
||||||
if (packet._header.id !== 'ping') { // ping has it's own listner
|
if (packet._header.id !== 'ping') { // ping has it's own listner
|
||||||
let obj = { msg:'incoming packet from socket sever',packet:packet}
|
let obj = { msg:'incoming packet from socket sever',packet:packet}
|
||||||
this.status('trace',obj)
|
this.log('trace',obj)
|
||||||
}
|
}
|
||||||
this.emit(packet._header.id, packet)
|
this.emit(packet._header.id, packet)
|
||||||
}
|
}
|
||||||
|
@ -334,9 +329,9 @@ function messageHandler(packet) {
|
||||||
// assume all errors are fatal and the socket needs to be disconnected/reconnected
|
// assume all errors are fatal and the socket needs to be disconnected/reconnected
|
||||||
async function errorHandler (err) {
|
async function errorHandler (err) {
|
||||||
this.disconnect()
|
this.disconnect()
|
||||||
let msg = {error:err, msg:`error, socket has been disconnected, trying reconnect in ${this.retryWait/1000} secs`}
|
let msg = `error, socket has been disconnected, trying reconnect in ${this.retryWait/1000} secs`
|
||||||
this.notify('error',{error:err,msg:msg})
|
this.notify('error',{error:err,msg:msg})
|
||||||
this.status('error',msg)
|
this.log('error',msg)
|
||||||
this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait)
|
this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -344,7 +339,7 @@ async function errorHandler (err) {
|
||||||
async function pushHandler (packet) {
|
async function pushHandler (packet) {
|
||||||
// TODO do some extra security here?
|
// TODO do some extra security here?
|
||||||
let msg = {msg:'packed was pushed from socket sever, processing', packet:packet}
|
let msg = {msg:'packed was pushed from socket sever, processing', packet:packet}
|
||||||
this.status('trace',msg)
|
this.log('trace',msg)
|
||||||
let res = await this._packetProcess(packet)
|
let res = await this._packetProcess(packet)
|
||||||
if (!res) {
|
if (!res) {
|
||||||
// if process was not promise returning then res will be undefined
|
// if process was not promise returning then res will be undefined
|
||||||
|
@ -366,23 +361,23 @@ async function pingFailedHandler () {
|
||||||
this.removeAllListeners('ping')
|
this.removeAllListeners('ping')
|
||||||
this.on('ping',pingHandler.bind(this))
|
this.on('ping',pingHandler.bind(this))
|
||||||
let msg = 'ping has been received again, back to normal connection'
|
let msg = 'ping has been received again, back to normal connection'
|
||||||
this.notify('online',{msg:msg})
|
this.notify('connected',{msg:msg})
|
||||||
this.status('info',msg)
|
this.log('info',msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
function monitorPing () {
|
function monitorPing () {
|
||||||
this._ping = setTimeout( () => {
|
this._ping = setTimeout( () => {
|
||||||
this.removeAllListeners('ping')
|
this.removeAllListeners('ping')
|
||||||
let msg = `ping failed waiting ${this.pingFailedTimeout/1000} secs to come back online before forced retry`
|
let msg = `ping failed waiting ${this.pingFailedTimeout/1000} secs to before forced reconnect`
|
||||||
this.notify('offline',{msg:msg})
|
this.notify('offline',{msg:msg})
|
||||||
this.status('warn',msg)
|
this.log('warn',msg)
|
||||||
this.on('ping',pingFailedHandler.bind(this))
|
this.on('ping',pingFailedHandler.bind(this))
|
||||||
this._pingFailed = setTimeout (() => {
|
this._pingFailed = setTimeout (() => {
|
||||||
this.removeAllListeners('ping')
|
this.removeAllListeners('ping')
|
||||||
let msg =`no ping received for ${this.pingFailedTimeout/1000} secs, force disconnect/reconnect`
|
let msg =`no ping received for ${this.pingFailedTimeout/1000} secs, force disconnect/reconnect`
|
||||||
this.notify('failed',{msg:msg})
|
this.notify('failed',{msg:msg})
|
||||||
this.status('warn',msg)
|
this.log('warn',msg)
|
||||||
this.reconnect()
|
this.reconnect()
|
||||||
// this.emit('error',{code:'PING_FAILED'})
|
// this.emit('error',{code:'PING_FAILED'})
|
||||||
}, this.pingFailedTimeout)
|
}, this.pingFailedTimeout)
|
||||||
|
|
|
@ -131,19 +131,20 @@ export default function socketClass(Server) {
|
||||||
this.errors.push(err)
|
this.errors.push(err)
|
||||||
if(this.errorCount>2 && this.errorCount<6) {
|
if(this.errorCount>2 && this.errorCount<6) {
|
||||||
let errors= {level:'warn',msg:'something bad maybe going on, 3 errors', errors:this.errors}
|
let errors= {level:'warn',msg:'something bad maybe going on, 3 errors', errors:this.errors}
|
||||||
this.emit('status', errors)
|
this.emit('log', errors)
|
||||||
log.error(errors)
|
log.error(errors)
|
||||||
}
|
}
|
||||||
if(this.errorCount>5) {
|
if(this.errorCount>5) {
|
||||||
let errors = {level:'fatal',msg:'something fatal is going on, 6 errors', errors:this.errors}
|
let errors = {level:'fatal',msg:'something fatal is going on, 6 errors', errors:this.errors}
|
||||||
log.fatal(errors)
|
log.fatal(errors)
|
||||||
this.listening=false
|
this.listening=false
|
||||||
this.emit('status', errors)
|
this.emit('log', errors)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
log.info({method:'create', line:54, msg:'socket server created and listening at', address:this.address()})
|
let obj = {method:'create', line:54, msg:'socket server created and listening at', address:this.address()}
|
||||||
|
log.info(obj)
|
||||||
this.on('connection', this._connectionHandler.bind(this))
|
this.on('connection', this._connectionHandler.bind(this))
|
||||||
this.emit('status',{active:this.active})
|
this.emit('log:',)
|
||||||
resolve(`socket ready and listening at ${this.address().address}:${this.address().port}`)
|
resolve(`socket ready and listening at ${this.address().address}:${this.address().port}`)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -234,36 +235,37 @@ export default function socketClass(Server) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO won't need this if moving to a Map
|
// TODO won't need this if moving to a Map
|
||||||
getClientIndex(id) {
|
getClientIndex(sid) {
|
||||||
return this.clients.findIndex(client => {return client.id === id })
|
return this.clients.findIndex(client => {return client.sid === sid })
|
||||||
}
|
}
|
||||||
|
|
||||||
getClient(id) {
|
getClient(sid) {
|
||||||
return this.clients[this._getClientIndex(id)]
|
return this.clients[this._getClientIndex(sid)]
|
||||||
}
|
}
|
||||||
|
|
||||||
removeClient (id) {
|
removeClient (sid) {
|
||||||
let index = this.getClientIndex(id)
|
let index = this.getClientIndex(sid)
|
||||||
let client=this.clients[index]
|
let client=this.clients[index]
|
||||||
this.emit('status',{level:'info', msg:'a consumer disconnected', name:client.name, id:client.id})
|
this.emit('log',{level:'info', msg:'a consumer disconnected', name:client.name, id:client.id})
|
||||||
|
this.emit('connection:consumer',{state:'disconnected', msg:'a consumer disconnected', name:client.name, id:client.id})
|
||||||
client.removeAllListeners()
|
client.removeAllListeners()
|
||||||
client.stream.removeAllListeners()
|
client.stream.removeAllListeners()
|
||||||
this.clients.splice(index,1)
|
this.clients.splice(index,1)
|
||||||
log.warn({msg:'client removed from tracking',id:id, curClientCount:this.clients.length})
|
log.warn({msg:'consumer removed from tracking',sid:sid, curClientCount:this.clients.length})
|
||||||
}
|
}
|
||||||
|
|
||||||
async authenticateClient(client) {
|
async authenticateClient(client) {
|
||||||
return new Promise(async (resolve, reject) => {
|
return new Promise(async (resolve, reject) => {
|
||||||
// when consumer gets the handshake they must follow with authentication
|
// when consumer gets the handshake they must follow with authentication
|
||||||
client.stream.on('message', authenticate.bind(this,client))
|
client.stream.on('message', authenticate.bind(this,client))
|
||||||
let [err] = await btc(this._send)(client,{_handshake: true, id:client.id})
|
let [err] = await btc(this._send)(client,{_handshake: true, sid:client.sid})
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error({msg:'error in handshake send', error:err})
|
log.error({msg:'error in handshake send', error:err})
|
||||||
reject(err)
|
reject(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
async function authenticate (client,packet) {
|
async function authenticate (client,packet) {
|
||||||
log.debug({msg:`authentication packet from client ${client.id}`, packet:packet})
|
log.debug({msg:`authentication packet from client ${client.name}:${client.id}:${client.sid}`, packet:packet})
|
||||||
client.stream.removeAllListeners('message')
|
client.stream.removeAllListeners('message')
|
||||||
if (!packet._authenticate) reject('first client packet was not authentication')
|
if (!packet._authenticate) reject('first client packet was not authentication')
|
||||||
else {
|
else {
|
||||||
|
@ -275,7 +277,7 @@ export default function socketClass(Server) {
|
||||||
log.debug({msg:'sending authorization result to client', packet:packet})
|
log.debug({msg:'sending authorization result to client', packet:packet})
|
||||||
await this._send(client,packet) // send either way
|
await this._send(client,packet) // send either way
|
||||||
if (err && !this.allowAnonymous) {
|
if (err && !this.allowAnonymous) {
|
||||||
log.info({msg:'client authentication failed', client:client.name, client_id:client.id, reason:err})
|
log.info({msg:'client authentication failed', client:client.name, client_sid:client.sid, reason:err})
|
||||||
reject(packet.reason)
|
reject(packet.reason)
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -311,13 +313,6 @@ export default function socketClass(Server) {
|
||||||
consumer.stream = stream
|
consumer.stream = stream
|
||||||
|
|
||||||
// add listeners
|
// add listeners
|
||||||
const clientCloseHandler = (id) => {
|
|
||||||
log.warn({msg:'client connection closed during listen,',id:id})
|
|
||||||
this.removeClient(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
consumer.on('close', clientCloseHandler.bind(this,consumer.id) )
|
|
||||||
|
|
||||||
consumer.on('error', (err) => {
|
consumer.on('error', (err) => {
|
||||||
log.error({msg:'client connection error',error:err})
|
log.error({msg:'client connection error',error:err})
|
||||||
// TODO do more handling than just logging
|
// TODO do more handling than just logging
|
||||||
|
@ -341,12 +336,17 @@ export default function socketClass(Server) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// authenticated consumer add to list of clients
|
// authenticated consumer, add to list of clients
|
||||||
consumer.id = ++this.nextClientID // server assigned ID
|
consumer.sid = ++this.nextClientID // server assigned ID
|
||||||
consumer.socketSide = true
|
consumer.socketSide = true
|
||||||
consumer.authenticated = true
|
consumer.authenticated = true
|
||||||
this.clients.push(consumer) // add current consumer to clients
|
this.clients.push(consumer) // add current consumer to clients
|
||||||
consumer.setKeepAlive(this.keepAlive,30)
|
consumer.setKeepAlive(this.keepAlive,30)
|
||||||
|
const clientCloseHandler = (sid) => {
|
||||||
|
log.warn({msg:'consumer connection was closed',sid:sid})
|
||||||
|
this.removeClient(sid)
|
||||||
|
}
|
||||||
|
consumer.on('close', clientCloseHandler.bind(this,consumer.sid))
|
||||||
log.debug({method:'_listen', line:364, msg:'new consumer connected/authenticated', cname:consumer.name, cid:consumer.id, totalConsumers:this.clients.length})
|
log.debug({method:'_listen', line:364, msg:'new consumer connected/authenticated', cname:consumer.name, cid:consumer.id, totalConsumers:this.clients.length})
|
||||||
// all's set enable main incoming message processor
|
// all's set enable main incoming message processor
|
||||||
stream.on('message', messageProcess.bind(this, consumer))
|
stream.on('message', messageProcess.bind(this, consumer))
|
||||||
|
@ -357,7 +357,8 @@ export default function socketClass(Server) {
|
||||||
this._send(consumer,this.opts.conPacket) // send a packet command on to consumer on connection
|
this._send(consumer,this.opts.conPacket) // send a packet command on to consumer on connection
|
||||||
}
|
}
|
||||||
|
|
||||||
this.emit('status',{level:'info', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id})
|
this.emit('log',{level:'info', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id})
|
||||||
|
this.emit('connection:consumer',{state:'connected', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id})
|
||||||
|
|
||||||
// that's it. Connection is active
|
// that's it. Connection is active
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue