From ba112c484eaf4b04c0d887f5f995e321e5599258 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Thu, 5 Dec 2019 14:42:41 -0800 Subject: [PATCH] 0.2.26 refactor the connection events, status is now log, connection event is now connection:socket or connection:consumer 'online' to just 'connected' --- package.json | 2 +- src/consumer.js | 57 +++++++++++++++++++++------------------------ src/socket-class.js | 51 ++++++++++++++++++++-------------------- 3 files changed, 53 insertions(+), 57 deletions(-) diff --git a/package.json b/package.json index b6075a2..87d5097 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.25", + "version": "0.2.26", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { diff --git a/src/consumer.js b/src/consumer.js index a199546..c52691f 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -28,13 +28,13 @@ class SocketConsumer extends Socket { constructor(opts = {}) { super() + this.id = opts.id || opts.name || 'socket:' + new Date().getTime() log = logger({ file: 'src/consumer.js', class: 'Consumer', name: 'socket', id: this.id }) - this.id = opts.id || opts.name || 'socket:' + new Date().getTime() if (!opts.path) { if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'}) opts.host = opts.host || '127.0.0.1' @@ -52,7 +52,6 @@ class SocketConsumer extends Socket { this.pingFailedTimeout = opts.pingFailedTimeout * 1000 || 10000 this.reconnectLimit = opts.reconnectLimit || 0 this.retryWait = opts.retryWait==null ? 5000 : opts.retryWait * 1000 - console.log(this.retryWait) this.heartBeat = !!process.env.HEARTBEAT || opts.heartBeat this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this.stream = new JsonStream() @@ -75,24 +74,23 @@ class SocketConsumer extends Socket { this._connection = state || this._connection let opts = {state:this._connection, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected} Object.assign(opts,moreOpts) - this.emit('connection',opts) + this.emit('connection:socket',opts) } - status(level='debug', msg) { + log(level='debug', msg) { let opts = {level:level, msg:msg, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected, state:this._connection} if (typeof msg !== 'string') Object.assign(opts,msg) - this.emit('status',opts) + this.emit('log',opts) log[level](opts) } async connect(timeout=0) { - console.log('=================================================connect=========================================================================================') this.initTimeout = timeout > 4 ? timeout * 1000 : this.initTimeout this.notify('starting') if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') { let msg ='tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead' log.warn({method:'connect', msg:'msg'}) - this.status('warn',msg) + this.log('warn',msg) } this.once('error', errorHandler.bind(this)) @@ -100,7 +98,7 @@ class SocketConsumer extends Socket { super.connect(this.opts) - // returns promise for initial connection with reject on timeout + // returns promise for initial connection when initTimeout is not zero with reject on timeout if (this._first && this.initTimeout) { let initTimeout = {} @@ -111,22 +109,23 @@ class SocketConsumer extends Socket { initTimeout = setTimeout(async () => { this.disconnect() let msg=`unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts` - this.status('fatal',msg) + this.log('fatal',msg) this.notify('failed',{timeout:this.initTimeout, msg:msg}) reject({ error:msg, opts: this.opts}) } , this.initTimeout) const successHandler = (ev) => { - if (ev.state === 'online') { + console.log('initial success', ev.state) + if (ev.state === 'connected') { clearTimeout(initTimeout) this.removeListener('connection',successHandler) - this.status('info','initial connection successfull') + this.log('info','initial connection successfull') resolve({opts: this.opts, msg: 'initial connection successfull'}) } } - this.on('connection',successHandler) + this.on('connection:socket',successHandler) }) } @@ -156,15 +155,14 @@ class SocketConsumer extends Socket { this._first = false if ( this.reconnectLimit === 0 || this._reconnectCount < this.reconnectLimit ) { this._reconnectCount += 1 - this.status('warn',`Attempting a reconnect: attempt ${this._reconnectCount} of ${this.reconnectLimit ||'unlimited'}`) + this.log('warn',`Attempting a reconnect: attempt ${this._reconnectCount} of ${this.reconnectLimit ||'unlimited'}`) this.connect() } else { - this.status('fatal',`${this.reconnectLimit} reconnect attempts exceeded no more tries`) + this.log('fatal',`${this.reconnectLimit} reconnect attempts exceeded no more tries`) } } - async send(ipacket) { return new Promise(async resolve => { if (!this.active) { @@ -271,12 +269,12 @@ async function connectHandler () { this._doneAuthenticate = setTimeout(() =>{ let msg ='authentication not completed in 3 secs, forcing reconnect attempt' this.notify('failed',{msg:msg}) - this.status('warn',msg) + this.log('warn',msg) this.reconnect() },3000) this.on('data', this.stream.onData) this.stream.on('message', handshake.bind(this)) - this.status('info','in process of connecting waiting for socket handshake/authenticate') + this.log('info','in process of connecting waiting for socket handshake/authenticate') this.notify('connecting') } @@ -284,7 +282,6 @@ async function handshake (packet) { if (packet._handshake) { this._connected = true this.notify('handshake') - this.status('info','connection handshaking') let authPacket = this._authenticate() || {} authPacket._authenticate = true authPacket.clientName = this.id @@ -294,20 +291,18 @@ async function handshake (packet) { if (res.error) { let msg =`error during authentication ${res.error}, attempting reconnect in ${this.retryWait/1000}s to see if error clears` this.notify('error',{msg:msg}) - this.status('error',msg) + this.log('error',msg) this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait) } else { if (!res.authenticated) { let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect` this.notify('failed',{msg:msg}) - this.status('fatal',msg) this.disconnect() } else { this._authenticated = res.authenticated let msg ='authentication succeeded connection ready' - this.status('info',msg) - this.notify('online',{msg:msg}) + this.notify('connected',{msg:msg}) this._reconnectCount = 0 this.stream.on('message', messageHandler.bind(this)) // reset default message handler this.on('pushed', pushHandler.bind(this) ) @@ -326,7 +321,7 @@ async function handshake (packet) { function messageHandler(packet) { if (packet._header.id !== 'ping') { // ping has it's own listner let obj = { msg:'incoming packet from socket sever',packet:packet} - this.status('trace',obj) + this.log('trace',obj) } this.emit(packet._header.id, packet) } @@ -334,9 +329,9 @@ function messageHandler(packet) { // assume all errors are fatal and the socket needs to be disconnected/reconnected async function errorHandler (err) { this.disconnect() - let msg = {error:err, msg:`error, socket has been disconnected, trying reconnect in ${this.retryWait/1000} secs`} + let msg = `error, socket has been disconnected, trying reconnect in ${this.retryWait/1000} secs` this.notify('error',{error:err,msg:msg}) - this.status('error',msg) + this.log('error',msg) this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait) } @@ -344,7 +339,7 @@ async function errorHandler (err) { async function pushHandler (packet) { // TODO do some extra security here? let msg = {msg:'packed was pushed from socket sever, processing', packet:packet} - this.status('trace',msg) + this.log('trace',msg) let res = await this._packetProcess(packet) if (!res) { // if process was not promise returning then res will be undefined @@ -366,23 +361,23 @@ async function pingFailedHandler () { this.removeAllListeners('ping') this.on('ping',pingHandler.bind(this)) let msg = 'ping has been received again, back to normal connection' - this.notify('online',{msg:msg}) - this.status('info',msg) + this.notify('connected',{msg:msg}) + this.log('info',msg) } function monitorPing () { this._ping = setTimeout( () => { this.removeAllListeners('ping') - let msg = `ping failed waiting ${this.pingFailedTimeout/1000} secs to come back online before forced retry` + let msg = `ping failed waiting ${this.pingFailedTimeout/1000} secs to before forced reconnect` this.notify('offline',{msg:msg}) - this.status('warn',msg) + this.log('warn',msg) this.on('ping',pingFailedHandler.bind(this)) this._pingFailed = setTimeout (() => { this.removeAllListeners('ping') let msg =`no ping received for ${this.pingFailedTimeout/1000} secs, force disconnect/reconnect` this.notify('failed',{msg:msg}) - this.status('warn',msg) + this.log('warn',msg) this.reconnect() // this.emit('error',{code:'PING_FAILED'}) }, this.pingFailedTimeout) diff --git a/src/socket-class.js b/src/socket-class.js index 70da3be..0d30b8c 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -131,19 +131,20 @@ export default function socketClass(Server) { this.errors.push(err) if(this.errorCount>2 && this.errorCount<6) { let errors= {level:'warn',msg:'something bad maybe going on, 3 errors', errors:this.errors} - this.emit('status', errors) + this.emit('log', errors) log.error(errors) } if(this.errorCount>5) { let errors = {level:'fatal',msg:'something fatal is going on, 6 errors', errors:this.errors} log.fatal(errors) this.listening=false - this.emit('status', errors) + this.emit('log', errors) } }) - log.info({method:'create', line:54, msg:'socket server created and listening at', address:this.address()}) + let obj = {method:'create', line:54, msg:'socket server created and listening at', address:this.address()} + log.info(obj) this.on('connection', this._connectionHandler.bind(this)) - this.emit('status',{active:this.active}) + this.emit('log:',) resolve(`socket ready and listening at ${this.address().address}:${this.address().port}`) }) @@ -234,36 +235,37 @@ export default function socketClass(Server) { } // TODO won't need this if moving to a Map - getClientIndex(id) { - return this.clients.findIndex(client => {return client.id === id }) + getClientIndex(sid) { + return this.clients.findIndex(client => {return client.sid === sid }) } - getClient(id) { - return this.clients[this._getClientIndex(id)] + getClient(sid) { + return this.clients[this._getClientIndex(sid)] } - removeClient (id) { - let index = this.getClientIndex(id) + removeClient (sid) { + let index = this.getClientIndex(sid) let client=this.clients[index] - this.emit('status',{level:'info', msg:'a consumer disconnected', name:client.name, id:client.id}) + this.emit('log',{level:'info', msg:'a consumer disconnected', name:client.name, id:client.id}) + this.emit('connection:consumer',{state:'disconnected', msg:'a consumer disconnected', name:client.name, id:client.id}) client.removeAllListeners() client.stream.removeAllListeners() this.clients.splice(index,1) - log.warn({msg:'client removed from tracking',id:id, curClientCount:this.clients.length}) + log.warn({msg:'consumer removed from tracking',sid:sid, curClientCount:this.clients.length}) } async authenticateClient(client) { return new Promise(async (resolve, reject) => { // when consumer gets the handshake they must follow with authentication client.stream.on('message', authenticate.bind(this,client)) - let [err] = await btc(this._send)(client,{_handshake: true, id:client.id}) + let [err] = await btc(this._send)(client,{_handshake: true, sid:client.sid}) if (err) { log.error({msg:'error in handshake send', error:err}) reject(err) } async function authenticate (client,packet) { - log.debug({msg:`authentication packet from client ${client.id}`, packet:packet}) + log.debug({msg:`authentication packet from client ${client.name}:${client.id}:${client.sid}`, packet:packet}) client.stream.removeAllListeners('message') if (!packet._authenticate) reject('first client packet was not authentication') else { @@ -275,7 +277,7 @@ export default function socketClass(Server) { log.debug({msg:'sending authorization result to client', packet:packet}) await this._send(client,packet) // send either way if (err && !this.allowAnonymous) { - log.info({msg:'client authentication failed', client:client.name, client_id:client.id, reason:err}) + log.info({msg:'client authentication failed', client:client.name, client_sid:client.sid, reason:err}) reject(packet.reason) } else { @@ -311,13 +313,6 @@ export default function socketClass(Server) { consumer.stream = stream // add listeners - const clientCloseHandler = (id) => { - log.warn({msg:'client connection closed during listen,',id:id}) - this.removeClient(id) - } - - consumer.on('close', clientCloseHandler.bind(this,consumer.id) ) - consumer.on('error', (err) => { log.error({msg:'client connection error',error:err}) // TODO do more handling than just logging @@ -341,12 +336,17 @@ export default function socketClass(Server) { return } } - // authenticated consumer add to list of clients - consumer.id = ++this.nextClientID // server assigned ID + // authenticated consumer, add to list of clients + consumer.sid = ++this.nextClientID // server assigned ID consumer.socketSide = true consumer.authenticated = true this.clients.push(consumer) // add current consumer to clients consumer.setKeepAlive(this.keepAlive,30) + const clientCloseHandler = (sid) => { + log.warn({msg:'consumer connection was closed',sid:sid}) + this.removeClient(sid) + } + consumer.on('close', clientCloseHandler.bind(this,consumer.sid)) log.debug({method:'_listen', line:364, msg:'new consumer connected/authenticated', cname:consumer.name, cid:consumer.id, totalConsumers:this.clients.length}) // all's set enable main incoming message processor stream.on('message', messageProcess.bind(this, consumer)) @@ -357,7 +357,8 @@ export default function socketClass(Server) { this._send(consumer,this.opts.conPacket) // send a packet command on to consumer on connection } - this.emit('status',{level:'info', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id}) + this.emit('log',{level:'info', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id}) + this.emit('connection:consumer',{state:'connected', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id}) // that's it. Connection is active