diff --git a/package.json b/package.json index 2de1751..15f7afe 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.23", + "version": "0.2.24", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { @@ -11,6 +11,7 @@ "s": "node -r esm examples/server", "devs": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/server", "devs:anon": "UCI_ANON=true npm run devs", + "devs:anon:debug": "UCI_LOG_LEVEL=debug npm run devs:anon", "devs:debug": "UCI_LOG_LEVEL=debug npm run devs", "client": "node -r esm examples/client", "devc": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/client", diff --git a/src/consumer.js b/src/consumer.js index 4f3fe43..a199546 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -1,7 +1,7 @@ import { Socket } from 'net' import path from 'path' import { promisify } from 'util' -import pause from 'delay' +// import pause from 'delay' import btc from 'better-try-catch' import JsonStream from './json-stream' @@ -48,128 +48,152 @@ class SocketConsumer extends Socket { this.opts = opts // default is keepAlive true, must set to false to explicitly disable // if keepAlive is true then consumer will also be reconnecting consumer - this.initTimeout = opts.initTimeout==null ? 60000 : opts.initTimeout * 1000 + this.initTimeout = opts.initTimeout > 4 ? opts.initTimeout * 1000 : null + this.pingFailedTimeout = opts.pingFailedTimeout * 1000 || 10000 + this.reconnectLimit = opts.reconnectLimit || 0 this.retryWait = opts.retryWait==null ? 5000 : opts.retryWait * 1000 + console.log(this.retryWait) + this.heartBeat = !!process.env.HEARTBEAT || opts.heartBeat this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true - this._connected = false - this._authenticated = false this.stream = new JsonStream() // bind to class for other class functions this.connect = this.connect.bind(this) this.close = promisify(this.end).bind(this) - // this.__ready = this.__ready.bind(this) - this._conAttempt = 1 - this._aborted = false - this._reconnect = false - this.retryPause = {} // timeout that may need to be cancelled if init timeout throws - // this._packetProcess = this._packetProcess.bind(this) + this._reconnectCount = 0 + this._connected = false + this._authenticated = false + this._connection = 'offline' + this._first = true // first connection or not + this._pingTimeout // value sent from socket upon connect } get connected() { return this._connected} get active() { return !!this._authenticated } + get connection() { return this._connection } - async connect() { - return new Promise((resolve, reject) => { + notify(state, moreOpts={}) { + this._connection = state || this._connection + let opts = {state:this._connection, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected} + Object.assign(opts,moreOpts) + this.emit('connection',opts) + } - if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') - log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'}) + status(level='debug', msg) { + let opts = {level:level, msg:msg, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected, state:this._connection} + if (typeof msg !== 'string') Object.assign(opts,msg) + this.emit('status',opts) + log[level](opts) + } - log.debug('first connnect attempt for', this.opts.id) - this.emit('status',{level:'info', msg:'attempting initial connection', id:this.id, opts:this.opts, ready:false}) + async connect(timeout=0) { + console.log('=================================================connect=========================================================================================') + this.initTimeout = timeout > 4 ? timeout * 1000 : this.initTimeout + this.notify('starting') + if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') { + let msg ='tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead' + log.warn({method:'connect', msg:'msg'}) + this.status('warn',msg) + } - console.log('TIMEOUT IN SOCKE CONNECT',this.initTimeout) + this.once('error', errorHandler.bind(this)) + this.once('connect',connectHandler.bind(this)) + + super.connect(this.opts) + + // returns promise for initial connection with reject on timeout + if (this._first && this.initTimeout) { let initTimeout = {} - if (this.initTimeout > 499) { - initTimeout = setTimeout(() => { - clearTimeout(this.retryPause) - this.emit('status',{level:'error', msg:'initial connection timed out', id:this.id, timeout:this.initTimeout, wait:this.retryWait, opts:this.opts, ready:false}) - this.removeAllListeners() - log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.initTimeout/1000} secs no more attempts!`}) - this.stream.removeAllListeners() - reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`}) + this._first=false + + return new Promise((resolve, reject) => { + + initTimeout = setTimeout(async () => { + this.disconnect() + let msg=`unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts` + this.status('fatal',msg) + this.notify('failed',{timeout:this.initTimeout, msg:msg}) + reject({ error:msg, opts: this.opts}) } , this.initTimeout) - } - const initialHandshake = async (packet) => { - - if (packet._handshake) { - clearTimeout(initTimeout) - this._connected = true - let authPacket = this._authenticate() || {} - authPacket._authenticate = true - authPacket.clientName = this.id - let res = (await this._authenticateSend(authPacket)) || {} - if (!res.authenticated) { - this.emit('status',{level:'info', msg:`authentication failed: ${res.reason}`, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) - reject('unable to authenticate') - } - else { - this._authenticated = res.authenticated - this.removeListener('error',initialErrorHandler) - this._listen() // setup for active connection - log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'}) - this.emit('status',{level:'info', msg:'authentication succesfull', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) - this.emit('consumer-connection',{state:'connected',name:this.id}) - if (this.opts.conPacket) (this.send(this.conPacket)) - resolve('initial connection successful') + const successHandler = (ev) => { + if (ev.state === 'online') { + clearTimeout(initTimeout) + this.removeListener('connection',successHandler) + this.status('info','initial connection successfull') + resolve({opts: this.opts, msg: 'initial connection successfull'}) } } - } + + this.on('connection',successHandler) + + }) + } + this._first=false + return 'connection in progress' + + } // end connect - const initialConnectHandler = async () => { - this.on('data', this.stream.onData) - this.stream.once('message', initialHandshake.bind(this)) - log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) - this.emit('status',{level:'debug', msg:'consumer connected'}) - } - - const initialErrorHandler = async (err) => { - let msg = {level:'error', method:'connect', line:101, error:err, msg:`error during initial connect, trying again in ${this.retryWait/1000} secs`} - log.error(msg) - this.emit('status',msg) - let connect = () => { super.connect(this.opts)} - this.retryPause = setTimeout(connect.bind(this),this.retryWait) - } - - - this.once('connect', initialConnectHandler) - this.on('error', initialErrorHandler) - super.connect(this.opts) - - }) // end initial promise + // manual disonnect + async disconnect() { + clearTimeout(this._errorRetry) + this.removeAllListeners('ping') + this.removeAllListeners('connect') + this.removeAllListeners('error') + this.removeAllListeners('data') + this.removeAllListeners('pushed') + this.stream.removeAllListeners() + this._connected=false + this._authenticated=false + this.notify('disconnected') + this._first = true } + async reconnect () { + if (this._connected) this.disconnect() + this._first = false + if ( this.reconnectLimit === 0 || this._reconnectCount < this.reconnectLimit ) { + this._reconnectCount += 1 + this.status('warn',`Attempting a reconnect: attempt ${this._reconnectCount} of ${this.reconnectLimit ||'unlimited'}`) + this.connect() + } + else { + this.status('fatal',`${this.reconnectLimit} reconnect attempts exceeded no more tries`) + } + } + + async send(ipacket) { return new Promise(async resolve => { - if (!this._connected) { + if (!this.active) { resolve({ error: 'socket consumer not connected, aborting send' }) - } - let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance - setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) - packet._header = { - id: Math.random() - .toString() - .slice(2), // need this for when multiple sends for different consumers use same packet instanceack - sender: { name: this.name, instanceID: this.id }, - path: this.opts.path, - port: this.opts.port, - host: this.opts.host - } - let [err, res] = await btc(this.stream.serialize)(packet) - if (err) - resolve({error: 'unable to serialize packet for sending',packet: packet}) - await this.__write(res) - this.once(packet._header.id, async function(reply) { - let res = await this._packetProcess(reply) - if (!res) { // if packetProcess was not promise - res = reply - log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) + } else { + let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance + setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) + packet._header = { + id: Math.random() + .toString() + .slice(2), // need this for when multiple sends for different consumers use same packet instanceack + sender: { name: this.name, instanceID: this.id }, + path: this.opts.path, + port: this.opts.port, + host: this.opts.host } - resolve(res) // resolves processed packet not return packet - }) //end listener + let [err, res] = await btc(this.stream.serialize)(packet) + if (err) + resolve({error: 'unable to serialize packet for sending',packet: packet}) + await this.__write(res) // write errors will be caught by socket error listener and result in reconnect + this.once(packet._header.id, async function(reply) { + let res = await this._packetProcess(reply) + if (!res) { // if packetProcess was not promise + res = reply + log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) + } + resolve(res) // resolves processed packet not return packet + }) //end listener + } }) } @@ -184,6 +208,12 @@ class SocketConsumer extends Socket { this._authenticate = func } + // stream needs an .onData method and will be stream bound handler for data event + resgisterStreamProcessor (func) { + this.stream = func + } + + // PRIVATE METHODS // default authentication using a simple token @@ -194,123 +224,30 @@ class SocketConsumer extends Socket { async _authenticateSend (authPacket={}) { return new Promise(async resolve => { setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) - let [err, res] = await btc(this.stream.serialize)(authPacket) + let [err, serPacket] = await btc(this.stream.serialize)(authPacket) if (err) resolve({error: 'unable to serialize packet for sending',packet: authPacket}) - this.stream.on('message',(res) => { - this.stream.removeAllListeners('message') + this.stream.once('message',(res) => { resolve(res) - }) - await this.__write(res) - - + let res = await this.__write(serPacket) + if (res.error) resolve(res) + // otherwise wait for message listener above to return }) } - // set up incoming message listening and error/reonnect handling - async _listen() { - // Define Handlers and Other Functions - const reconnectHandler = () => { - this.stream.once('message', handshake.bind(this)) - log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) - this.emit('status',{level:'info', msg:'attemping reconnect', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) - } - - const handshake = async (packet) => { - if (packet._handshake) { - this._connected = true - let authPacket = this._authenticate() || {} - authPacket._authenticate = true - authPacket.clientName = this.id - let res = (await this._authenticateSend(authPacket)) || {} - if (!res.authenticated) { - this.emit('status',{level:'error', msg:`authentication failed: ${res.reason}`, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) - this.emit('error',{code:'authentification failed'}) - } - else { - this._authenticated = res.authenticated - log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'}) - this.emit('status',{level:'info', msg:'authentication successful', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) - if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default - this.on('ping',pingHandler) - this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled - } - this.stream.on('message', messageHandler.bind(this)) // reset default message handler - this.emit('status',{level:'info', msg:'reconnected', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) - this.emit('consumer-connection', {state:'reconnected', name:this.id}) - if (this.opts.conPacket) (this.send(this.conPacket)) - } - } - } - - const errorHandler = async (err) => { - log.debug({msg:'connection error emitted ', error:err}) - this._connected = false - this._authenticated = false - this.emit('status',{level:'error', msg:'connection error', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) - log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect after ${this.retryWait/1000} secs`}) - this.emit('consumer-connection', {state:'disconnected', name:this.id}) - await pause(this.retryWait) - this.stream.removeAllListeners('message') // remove regular message handler in prep for reconnect - this.removeAllListeners('connect') - this.removeAllListeners('ping') - this.once('connect',reconnectHandler) - super.connect(this.opts) - } - - const pushHandler = async (packet) => { - // TODO do some extra security here? - log.debug('packed was pushed from socket sever, processing',packet) - let res = await this._packetProcess(packet) - if (!res) { - // if process was not promise returning then res will be undefined - log.debug('consumer packet processing function was not promise returning') - } - } - - const pingHandler = async (packet) => { - clearTimeout(pingTimeout) - log.trace({method:'connect', line:191, msg:'received ping, restting timeout'}) - this._pingTimeout= packet.pingInterval + 1000 - monitorPing.call(this) - } - - let pingTimeout = {} - function monitorPing () { - pingTimeout = setTimeout( () => { - log.error({method:'connect', line:142, msg:'socket (server) not availabe'}) - this.removeAllListeners('ping') - this._connected = false - this.emit('error', { code: 'PING-FAILED' }) - },this._pingTimeout) - } - - // general handler - function messageHandler(packet) { - if (packet._header.id !== 'ping') log.debug('incoming packet from socket sever',packet) - this.emit(packet._header.id, packet) - } - - // Start Message Listening and Error/Reconnect Handling - log.debug('listening for incoming packets from socket') - this.stream.on('message', messageHandler.bind(this)) - this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled - this.on('pushed', pushHandler ) - this.on('error', errorHandler) - if (this.keepAlive) { // keepAlive also activates ping Monitor - this.on('ping',pingHandler) - } - } - async __write(packet) { // timeout already set if sockect can't be drained in 10 secs return new Promise(resolve => { const cb = () => resolve('packet written to consumer side socket stream ') - if (!super.write(packet)) { - this.once('drain', cb) - } else { - process.nextTick(cb) + try { + if (!super.write(packet)) { + this.once('drain', cb) + } else { + process.nextTick(cb) + } + } catch (err){ + resolve({error:`error during write to socket - ${err.code}`, err:err}) } }) } @@ -325,3 +262,129 @@ class SocketConsumer extends Socket { } // end class export default SocketConsumer + + +// CONNECTION HANDLERS + +async function connectHandler () { + this.removeAllListeners('error') // turn off error handler during handshake + this._doneAuthenticate = setTimeout(() =>{ + let msg ='authentication not completed in 3 secs, forcing reconnect attempt' + this.notify('failed',{msg:msg}) + this.status('warn',msg) + this.reconnect() + },3000) + this.on('data', this.stream.onData) + this.stream.on('message', handshake.bind(this)) + this.status('info','in process of connecting waiting for socket handshake/authenticate') + this.notify('connecting') +} + +async function handshake (packet) { + if (packet._handshake) { + this._connected = true + this.notify('handshake') + this.status('info','connection handshaking') + let authPacket = this._authenticate() || {} + authPacket._authenticate = true + authPacket.clientName = this.id + let res = await this._authenticateSend(authPacket) + this.stream.removeAllListeners('message') + clearTimeout(this._doneAuthenticate) + if (res.error) { + let msg =`error during authentication ${res.error}, attempting reconnect in ${this.retryWait/1000}s to see if error clears` + this.notify('error',{msg:msg}) + this.status('error',msg) + this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait) + } else { + if (!res.authenticated) { + let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect` + this.notify('failed',{msg:msg}) + this.status('fatal',msg) + this.disconnect() + } + else { + this._authenticated = res.authenticated + let msg ='authentication succeeded connection ready' + this.status('info',msg) + this.notify('online',{msg:msg}) + this._reconnectCount = 0 + this.stream.on('message', messageHandler.bind(this)) // reset default message handler + this.on('pushed', pushHandler.bind(this) ) + this.once('error', errorHandler.bind(this)) // listen for errors on authenticated socket + if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default + this.on('ping',pingHandler.bind(this)) + this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled + } + if (this.opts.conPacket) (this.send(this.conPacket)) + } + } + } +} + +// general message handler handler +function messageHandler(packet) { + if (packet._header.id !== 'ping') { // ping has it's own listner + let obj = { msg:'incoming packet from socket sever',packet:packet} + this.status('trace',obj) + } + this.emit(packet._header.id, packet) +} + +// assume all errors are fatal and the socket needs to be disconnected/reconnected +async function errorHandler (err) { + this.disconnect() + let msg = {error:err, msg:`error, socket has been disconnected, trying reconnect in ${this.retryWait/1000} secs`} + this.notify('error',{error:err,msg:msg}) + this.status('error',msg) + this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait) +} + +// const pushHandler = async (packet) => { +async function pushHandler (packet) { + // TODO do some extra security here? + let msg = {msg:'packed was pushed from socket sever, processing', packet:packet} + this.status('trace',msg) + let res = await this._packetProcess(packet) + if (!res) { + // if process was not promise returning then res will be undefined + log.debug('consumer packet processing function was not promise returning') + } +} + +// const pingHandler = async (packet) => { +async function pingHandler (packet) { + if (this.heartBeat) console.log('lub dub') + clearTimeout(this._ping) + this._pingTimeout= (packet.pingInterval || 5000) + 1000 // set value from server + monitorPing.call(this) +} + +async function pingFailedHandler () { + clearTimeout(this._pingFailed) + clearTimeout(this._ping) // clear any others set + this.removeAllListeners('ping') + this.on('ping',pingHandler.bind(this)) + let msg = 'ping has been received again, back to normal connection' + this.notify('online',{msg:msg}) + this.status('info',msg) +} + + +function monitorPing () { + this._ping = setTimeout( () => { + this.removeAllListeners('ping') + let msg = `ping failed waiting ${this.pingFailedTimeout/1000} secs to come back online before forced retry` + this.notify('offline',{msg:msg}) + this.status('warn',msg) + this.on('ping',pingFailedHandler.bind(this)) + this._pingFailed = setTimeout (() => { + this.removeAllListeners('ping') + let msg =`no ping received for ${this.pingFailedTimeout/1000} secs, force disconnect/reconnect` + this.notify('failed',{msg:msg}) + this.status('warn',msg) + this.reconnect() + // this.emit('error',{code:'PING_FAILED'}) + }, this.pingFailedTimeout) + },this._pingTimeout) +} diff --git a/src/json-stream.js b/src/json-stream.js index c379a98..7b4baf6 100644 --- a/src/json-stream.js +++ b/src/json-stream.js @@ -18,8 +18,19 @@ class JsonStream extends EventEmitter { this._delimeter = opts.delimiter || '#' this.onData = this.onData.bind(this) this.serialize = this.serialize.bind(this) + this._state = 'online' + this._queue = [] } + get state () {return this._state} + offline () { this._state = 'offline' } + pause () {this._state = 'paused'} // queue messages in handler + resume () { + // emit the messages in the queue + this._state='online' + } + online() {this._state = 'online'} + onData(data) { data = decoder.write(data) try { @@ -91,7 +102,11 @@ class JsonStream extends EventEmitter { throw err } message = message || {} - this.emit('message', message) + // console.log('stream message', message, this._state) + if (this._stream ==='pause') { + if (message._header.id !== 'ping') this.queue.shift(message) + } + if(this._state==='online') this.emit('message', message) } } diff --git a/src/socket-class.js b/src/socket-class.js index 48283e9..70da3be 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -138,7 +138,6 @@ export default function socketClass(Server) { let errors = {level:'fatal',msg:'something fatal is going on, 6 errors', errors:this.errors} log.fatal(errors) this.listening=false - this.emit('status', {active:this.active}) this.emit('status', errors) } }) @@ -246,7 +245,7 @@ export default function socketClass(Server) { removeClient (id) { let index = this.getClientIndex(id) let client=this.clients[index] - this.emit('consumer-connection',{state:'disconnected', id:client.id, name:client.name, socketSide:true}) + this.emit('status',{level:'info', msg:'a consumer disconnected', name:client.name, id:client.id}) client.removeAllListeners() client.stream.removeAllListeners() this.clients.splice(index,1) @@ -254,8 +253,7 @@ export default function socketClass(Server) { } async authenticateClient(client) { - // let server = this - return new Promise(async function(resolve, reject) { + return new Promise(async (resolve, reject) => { // when consumer gets the handshake they must follow with authentication client.stream.on('message', authenticate.bind(this,client)) let [err] = await btc(this._send)(client,{_handshake: true, id:client.id}) @@ -281,13 +279,13 @@ export default function socketClass(Server) { reject(packet.reason) } else { - log.info({msg:'client authenticated successfuly', client:client.name, client_id:client.id}) - if (this.allowAnonymous) log.warn({msg:'socket consumer connected anonymously', consumer:client.name, consumer_id:client.id}) + log.info({msg:'client authenticated successfuly', client:client.name}) + if (this.allowAnonymous) log.warn({msg:'socket consumer connected anonymously', consumer:client.name}) resolve(client.authenticated) } } } - }.bind(this)) + }) } // private methods @@ -306,15 +304,11 @@ export default function socketClass(Server) { } + // async _connectionHandler({consumer, server}) { // this gets called for each client connection and is unique to async _connectionHandler(consumer) { // this gets called for each client connection and is unique to each - consumer.id = ++this.nextClientID // server assigned ID - log.debug({method:'_listen', line:167, msg:'new consumer connecting', id:consumer.id, totalConsumers:this.clients.length}) - consumer.socketSide = true - consumer.authenticated = false - this.clients.push(consumer) // add client to list + const stream = new JSONStream() consumer.stream = stream - consumer.setKeepAlive(this.keepAlive,3000) // add listeners const clientCloseHandler = (id) => { @@ -325,7 +319,7 @@ export default function socketClass(Server) { consumer.on('close', clientCloseHandler.bind(this,consumer.id) ) consumer.on('error', (err) => { - log.error({msg:'client connection error during listen',error:err}) + log.error({msg:'client connection error',error:err}) // TODO do more handling than just logging }) @@ -336,16 +330,25 @@ export default function socketClass(Server) { // TODO do more handling than just logging }) + consumer.authenticated = true + let [err] = await btc(this.authenticateClient)(consumer) if (!this.allowAnonymous) { if (err) { + consumer.removeAllListeners() + consumer.stream.removeAllListeners() consumer.end()// abort new connection consumer, cleanup, remove listeners - this.removeClient(consumer.id) return } } - - // all's set main message processor + // authenticated consumer add to list of clients + consumer.id = ++this.nextClientID // server assigned ID + consumer.socketSide = true + consumer.authenticated = true + this.clients.push(consumer) // add current consumer to clients + consumer.setKeepAlive(this.keepAlive,30) + log.debug({method:'_listen', line:364, msg:'new consumer connected/authenticated', cname:consumer.name, cid:consumer.id, totalConsumers:this.clients.length}) + // all's set enable main incoming message processor stream.on('message', messageProcess.bind(this, consumer)) if (this.opts.conPacket) { @@ -354,8 +357,7 @@ export default function socketClass(Server) { this._send(consumer,this.opts.conPacket) // send a packet command on to consumer on connection } - this.emit('consumer-connection',{state:'connected', id:consumer.id, name:consumer.name, socketSide:true}) - this.emit('status',{level:'info', msg:'a consumer connected', name:consumer.name, id:consumer.id}) + this.emit('status',{level:'info', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id}) // that's it. Connection is active