diff --git a/package.json b/package.json index d9d235a..a3db197 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.14", + "version": "0.2.16", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { diff --git a/src/consumer.js b/src/consumer.js index 2189da7..3154ce9 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -49,94 +49,88 @@ class SocketConsumer extends Socket { // if keepAlive is true then consumer will also be reconnecting consumer this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this._ready = false - this.timeout = opts.timeout || process.env.UCI_CONNECT_TIMEOUT || 120 // initial connect timeout in secs and then rejects - this.wait = opts.wait || 2 this.stream = new JsonStream() // bind to class for other class functions this.connect = this.connect.bind(this) this.close = promisify(this.end).bind(this) this.__ready = this.__ready.bind(this) + this._conAttempt = 0 + this._reconnect = false // this._write = this._write.bind(this) } async connect() { return new Promise((resolve, reject) => { - let initial = true + let initTimeout = {} + // let endlessAttempts = {} - // this is only for initial connection - const initTimeout = setTimeout(() => { - log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to connect in ${this.timeout}s`}) - reject({ opts: this.opts },`unable to connect to socket server in ${this.timeout}secs`) - }, this.timeout * 1000) + if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') + log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'}) + console.log('first connnect attempt for', this.opts.name) - this.once('connect', async () => { - clearTimeout(initTimeout) - this._listen() - log.debug({method:'connect', line:80, opts: this.opts, msg:'initial connect waiting for socket ready handshake'}) - this.setKeepAlive(this.keepAlive, 3000) - let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout) - if (err) reject(err) - initial = false - log.debug({method:'connect', line:85, msg:'handshake to socket done, TODO authenticating'}) - // TODO authenticate here by encrypting a payload with private key and sending that. - // await btc(authenticate) - this.emit('connected') // for end users to take action - resolve(res) - }) - - let reconTimeout - // function that sets a reconnect timeout - const reconnect = () => { - reconTimeout = setTimeout(() => { - this.removeAllListeners() - this.stream.removeAllListeners() - this.destroy() - connect() - }, this.wait * 1000) + // this is only for initial drop dead connection failure + if (this.opts.initTimeout) { + initTimeout = setTimeout(() => { + log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.opts.initTimeout}s no more attempts!`}) + reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.timeout}secs, giving up no more attempts`}) + } + , this.opts.initTimeout * 1000) } - // connection function that sets listeners and deals with reconnect - const connect = () => { - if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') - log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'}) + const _connect = () => { - if(!initial) { - this.once('connect', async () => { - clearTimeout(reconTimeout) - this._listen() - log.debug({method:'connect', line:113, msg:'reconnected waiting for socket ready handshake'}) - this.setKeepAlive(this.keepAlive, 3000) - let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout) - if (err) reject(err) - log.debug({method:'connect', line:69, msg:'rehandshake done, reauthenticating'}) - // TODO authenticate here by encrypting a payload with private key and sending that. - // await btc(authenticate) - this.emit('connected') + this.once('connect', async () => { + console.log(`${this.opts.name} - has ${this._reconnect ? 're':''}CONNECTED after ${this._conAttempt} attempts`) + if (!this._reconnect) clearTimeout(initTimeout) + // clearInterval(endlessAttempts) + this._listen() + log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) + this.setKeepAlive(this.keepAlive, 3000) + let [err, res] = await btc(isReady).bind(this)(this.__ready,this.hsInterval,this.hsTimeout) + if (err) { + log.error(`${this.opts.name} - could not complete handshake after ${this._reconnect ? 're':''}connect attempt ${this._conAttempt}`) + if (!this._reconnect) reject(err) + this.emit('error', new Error('HANDSHAKE')) + } else { + log.debug({method:'connect', line:69, msg:'handshake done, authenticating'}) + this.send({cmd:'auth', id:this.opts.clientID, name:this.opts.clientNamem, key:'somebigsecret'}) + // TODO authenticate here by encrypting private key. + if (!this._reconnect) { + resolve(res) + this.emit('connected') + } this.emit('reconnected') // emit also reconnected for special end user action - resolve(res) - }) - } + this._ready = true + this._conAttempts = 0 + this._reconnect = true + } + }) this.on('error', async err => { + console.log('error emitted', this.opts.name, err.code) if (err.code !== 'EISCONN') { this._ready = false this.emit('ready', false) log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect`}) - reconnect() - } - else { + this.removeAllListeners() + this.stream.removeAllListeners() + this.destroy() + this._conAttempt += 1 + console.log(`${this.opts.name} - ${this._reconnect ? 're':''}connection attempt ${this._conAttempt}`) + _connect() + } else { this._ready = true this.emit('ready', true) log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'}) } }) - if (this.keepAlive) { // only attempt reconnect is keepAlive is set which it is by default + if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default this.on('end', async () => { - log.error({method:'connect', line:142, msg:'socket (server) terminated unexpectantly, keepalive set, wait for server to come online'}) + console.log({method:'connect', line:142, msg:'socket (server) terminated unexpectantly, keepalive set, wait for server to come online'}) this._ready = false this.emit('error', { code: 'DISCONNECTED' }) }) @@ -144,11 +138,11 @@ class SocketConsumer extends Socket { // attempt connection log.debug({method:'connect', line:149, opts: this.opts, msg:`attempting to connect ${this.id} to socket`}) - super.connect(this.opts) + super.connect(this.opts) // ###call nodejs net connect here### } // end connect function - connect() // initial connect request + _connect() // initial connect request to start the ball rolling }) //end promise } @@ -245,7 +239,7 @@ export default SocketConsumer // Helper Functions // wait until a passed ready function returns true -function isReady(ready, wait = 30, timeout = 1000) { +function isReady(ready, wait = 100, timeout = 2000) { let time = 0 return new Promise((resolve, reject) => { (function waitReady() {