diff --git a/.eslintrc.js b/.eslintrc.js index d4dda45..2bed546 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -9,11 +9,9 @@ module.exports = { "node": true, "mocha": true }, - "parser": "babel-eslint", "parserOptions": { "ecmaVersion": 2017, - "sourceType": "module", - "allowImportExportEverywhere": true + "sourceType": "module" }, "extends": "eslint:recommended", "rules": { diff --git a/examples/client.js b/examples/client.js index 7ec8093..de81557 100644 --- a/examples/client.js +++ b/examples/client.js @@ -1,9 +1,10 @@ import Consumer from '../src/consumer' +import btc from 'better-try-catch' // const client1= new Consumer({name:'example-consumer1' }) -const client= new Consumer({path:true, name:'example-consumer' }) +const client= new Consumer({path:true, name:'example-consumer', initTimeout:30 }) -let packet = {name: 'client', cmd:'doit', data:'data sent by client'} +let packet = {name: 'client', cmd:'doit', data:'sent by client'} // This is your client handler object waiting on a message to do something const process = function (packet) { @@ -16,10 +17,14 @@ client.registerPacketProcessor(process) (async () => { // await Promise.all([client1.connect(),client2.connect()]) - await client.connect() - console.log('sending packet ', packet) - console.log('=========\n',await client.send(packet)) - // client.end() + let [err, res] = await btc(client.connect)() + if (err) { + console.log('error', err) + } else { + console.log('connect reponse',res) + console.log('sending packet ', packet) + console.log('=========\n',await client.send(packet)) + } })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/examples/server.js b/examples/server.js index 1256ae0..a91c0e8 100644 --- a/examples/server.js +++ b/examples/server.js @@ -39,11 +39,11 @@ let Socket = uSocket res.data = 'this would be response data from socket doit function' resolve(res) }) - } } + // const options = { // tls: TLS, // key: await fs.readFile(TLS_KEY_PATH), @@ -60,7 +60,10 @@ let Socket = uSocket // let test = new Test() let test = new Test(options) await test.create() - console.log('ready') + setTimeout( () => { + console.log('closing server') + test._destroy() + },20000) })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/package.json b/package.json index f9d7ba5..95951a3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.17", + "version": "0.2.18", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { @@ -8,12 +8,12 @@ "test": "mocha -r esm --timeout 10000 test/*.test.mjs", "testlog": "UCI_DEV=true mocha -r esm --timeout 10000 test/*.test.mjs", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true", - "s": "UCI_ENV=dev node -r esm examples/server", - "sp": "UCI_DEV=true node -r esm examples/server-push", - "devs": "SOCKETS_DIR=/opt/sockets UCI_DEV=true ./node_modules/.bin/nodemon -r esm-e mjs examples/server", - "c": "UCI_ENV=dev node -r esm examples/client", - "cp": "UCI_DEV=true node -r esm examples/client-push", - "devc": "SOCKETS_DIR=/opt/sockets UCI_DEV=true node -r esm examples/client", + "s": "node -r esm examples/server", + "devs": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/server", + "devs:debug": "UCI_LOG_LEVEL=debug npm run devs", + "client": "=node -r esm examples/client", + "devc": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/client", + "devc:debug": "UCI_LOG_LEVEL=debug npm run devc", "c2": "node -r esm examples/client2" }, "author": "David Kebler", @@ -49,6 +49,7 @@ "better-try-catch": "^0.6.2", "clone": "^2.1.2", "death": "^1.1.0", + "delay": "^4.3.0", "make-dir": "^3.0.0" } } diff --git a/src/consumer.js b/src/consumer.js index 3154ce9..e817bff 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -1,6 +1,7 @@ import { Socket } from 'net' import path from 'path' import { promisify } from 'util' +import pause from 'delay' import btc from 'better-try-catch' import JsonStream from './json-stream' @@ -47,14 +48,17 @@ class SocketConsumer extends Socket { this.opts = opts // default is keepAlive true, must set to false to explicitly disable // if keepAlive is true then consumer will also be reconnecting consumer + this.initTimeout = opts.initTimeout * 1000 || 60000 + this.retryWait = opts.retryWait * 1000 || 3000 this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this._ready = false this.stream = new JsonStream() // bind to class for other class functions this.connect = this.connect.bind(this) this.close = promisify(this.end).bind(this) - this.__ready = this.__ready.bind(this) - this._conAttempt = 0 + // this.__ready = this.__ready.bind(this) + this._conAttempt = 1 + this._aborted = false this._reconnect = false // this._write = this._write.bind(this) } @@ -62,88 +66,53 @@ class SocketConsumer extends Socket { async connect() { return new Promise((resolve, reject) => { - let initTimeout = {} - // let endlessAttempts = {} - if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'}) - console.log('first connnect attempt for', this.opts.name) + log.debug('first connnect attempt for', this.opts.name) - // this is only for initial drop dead connection failure - if (this.opts.initTimeout) { - initTimeout = setTimeout(() => { - log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.opts.initTimeout}s no more attempts!`}) - reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.timeout}secs, giving up no more attempts`}) + let initTimeout = setTimeout(() => { + this.removeAllListeners() + log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.initTimeout/1000} secs no more attempts!`}) + this.removeAllListeners() + this.stream.removeAllListeners() + this.destroy() + reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`}) + } + , this.initTimeout) + + + const initialHandshake = async (packet) => { + if (packet._handshake) { + log.debug({method:'connect', line:87, msg:'handshake received ready for communication'}) + this.stream.removeAllListeners('message') + this.removeListener('error',initialErrorHandler) + clearTimeout(initTimeout) + this._listen() // setup for active connection + this._ready = true + this.emit('connection','connected') + resolve('initial connection successful') } - , this.opts.initTimeout * 1000) } - // connection function that sets listeners and deals with reconnect - const _connect = () => { + const initialConnectHandler = async () => { + this.on('data', this.stream.onData) + this.stream.on('message', initialHandshake.bind(this)) + log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) + } - this.once('connect', async () => { - console.log(`${this.opts.name} - has ${this._reconnect ? 're':''}CONNECTED after ${this._conAttempt} attempts`) - if (!this._reconnect) clearTimeout(initTimeout) - // clearInterval(endlessAttempts) - this._listen() - log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) - this.setKeepAlive(this.keepAlive, 3000) - let [err, res] = await btc(isReady).bind(this)(this.__ready,this.hsInterval,this.hsTimeout) - if (err) { - log.error(`${this.opts.name} - could not complete handshake after ${this._reconnect ? 're':''}connect attempt ${this._conAttempt}`) - if (!this._reconnect) reject(err) - this.emit('error', new Error('HANDSHAKE')) - } else { - log.debug({method:'connect', line:69, msg:'handshake done, authenticating'}) - this.send({cmd:'auth', id:this.opts.clientID, name:this.opts.clientNamem, key:'somebigsecret'}) - // TODO authenticate here by encrypting private key. - if (!this._reconnect) { - resolve(res) - this.emit('connected') - } - this.emit('reconnected') // emit also reconnected for special end user action - this._ready = true - this._conAttempts = 0 - this._reconnect = true - } - }) + const initialErrorHandler = async (err) => { + log.debug({method:'connect', line:101, error:err, msg:`error during initial connect, trying again in ${this.retryWait/1000} secs`}) + await pause(this.retryWait) + super.connect(this.opts) + } - this.on('error', async err => { - console.log('error emitted', this.opts.name, err.code) - if (err.code !== 'EISCONN') { - this._ready = false - this.emit('ready', false) - log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect`}) - this.removeAllListeners() - this.stream.removeAllListeners() - this.destroy() - this._conAttempt += 1 - console.log(`${this.opts.name} - ${this._reconnect ? 're':''}connection attempt ${this._conAttempt}`) - _connect() - } else { - this._ready = true - this.emit('ready', true) - log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'}) - } - }) - if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default - this.on('end', async () => { - console.log({method:'connect', line:142, msg:'socket (server) terminated unexpectantly, keepalive set, wait for server to come online'}) - this._ready = false - this.emit('error', { code: 'DISCONNECTED' }) - }) - } + this.once('connect', initialConnectHandler) + this.on('error', initialErrorHandler) + super.connect(this.opts) - // attempt connection - log.debug({method:'connect', line:149, opts: this.opts, msg:`attempting to connect ${this.id} to socket`}) - super.connect(this.opts) // ###call nodejs net connect here### - - } // end connect function - - _connect() // initial connect request to start the ball rolling - }) //end promise + }) // end initial promise } async send(ipacket) { @@ -170,13 +139,12 @@ class SocketConsumer extends Socket { res = reply log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) } - resolve(res) + resolve(res) // resolves processed packet not return packet }) //end listener }) } // TODO register user alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) - // TODO register authenciation function (set up default) registerPacketProcessor(func) { this._packetProcess = func @@ -184,6 +152,89 @@ class SocketConsumer extends Socket { // PRIVATE METHODS + // set up incoming message listening and error/reonnect handling + async _listen() { + // Define Handlers and Other Functions + const reconnectHandler = () => { + this.stream.once('message', handshake.bind(this)) + log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) + } + + const handshake = async (packet) => { + if (packet._handshake) { + log.debug({method:'connect', line:87, msg:'handshake received ready for communication'}) + this.stream.on('message', messageProcess.bind(this)) + if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default + this.on('ping',pingHandler) + this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled + } + this._ready = true + this.emit('connection','reconnected') + } + } + + const errorHandler = async (err) => { + log.debug({msg:'connection error emitted ', error:err}) + this._ready = false + if (err.code !== 'EISCONN') { + this.emit('connection', err) + log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect after ${this.retryWait/1000} secs`}) + await pause(this.retryWait) + this.stream.removeAllListeners('message') // remove regular message handler in prep for reconnect + this.removeAllListeners('connect') + this.once('connect',reconnectHandler) + super.connect(this.opts) + } + else { + this._ready = true + this.emit('connection','reconnected') + log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'}) + } + } + + async function messageProcess(packet) { + log.debug('incoming packet from socket sever',packet) + this.emit(packet._header.id, packet) + } + + const pushHandler = async (packet) => { + // TODO do some extra security here? + log.debug('packed was pushed from socket sever, processing',packet) + let res = await this._packetProcess(packet) + if (!res) { + // if process was not promise returning then res will be undefined + log.debug('consumer packet processing function was not promise returning') + } + } + + const pingHandler = async (packet) => { + clearTimeout(pingTimeout) + log.info({method:'connect', line:191, msg:'received ping, restting timeout'}) + this._pingTimeout= packet.pingInterval + 1000 + monitorPing.call(this) + } + + let pingTimeout = {} + function monitorPing () { + pingTimeout = setTimeout( () => { + log.error({method:'connect', line:142, msg:'socket (server) not availabe'}) + this.removeAllListeners('ping') + this._ready = false + this.emit('error', { code: 'PING-FAILED' }) + },this._pingTimeout) + } + + // Start Message Listening and Error/Reconnect Handling + log.debug('listening for incoming packets from socket') + this.stream.on('message', messageProcess.bind(this)) + this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled + this.on('pushed', pushHandler ) + this.on('error', errorHandler) + if (this.keepAlive) { // keepAlive also activates ping Monitor + this.on('ping',pingHandler) + } + } + async __write(packet) { // timeout already set if sockect can't be drained in 10 secs return new Promise(resolve => { @@ -196,36 +247,7 @@ class SocketConsumer extends Socket { }) } - __ready() { - return this._ready - } - async _listen() { - log.debug('listening for incoming packets from socket') - // listen for pushed packets - this.on('pushed', async function(packet) { - // TODO do some extra security here? - let res = await this._packetProcess(packet) - if (!res) { - // if process was not promise returning then res will be undefined - log.debug('consumer packet processing function was not promise returning') - } - }) - // listen on socket stream - this.on('data', this.stream.onData) - this.stream.on('message', messageProcess.bind(this)) - - async function messageProcess(packet) { - log.debug('incoming packet from socket',packet) - if (packet._handshake) { - this._ready = true - return - } - // TODO send back ack with consumer ID and authorization and wait - // when authorized drop through here to emit - this.emit(packet._header.id, packet) - } - } // default packet process just a simple console logger. ignores any cmd: prop _packetProcess(packet) { @@ -233,24 +255,34 @@ class SocketConsumer extends Socket { console.log('replace by calling .registerPacketProcessor(func) with your function') console.dir(packet) } + } // end class export default SocketConsumer + +// _removeListeners(events) { +// if (!events) this.removeAllListeners() +// else { +// if (typeof events === 'string') this.removeAllListeners(events) +// else events.map(event => this.removeAllListeners(event)) +// } +// } + // Helper Functions // wait until a passed ready function returns true -function isReady(ready, wait = 100, timeout = 2000) { - let time = 0 - return new Promise((resolve, reject) => { - (function waitReady() { - if (time > timeout) - return reject( - `timeout waiting for socket ready handshake - ${timeout}ms` - ) - if (ready()) return resolve('ready') - log.debug({function:'isReady', line:261, msg:`waiting ${wait}ms for handshake`}) - time += wait - setTimeout(waitReady, wait) - })() - }) -} +// function isReady(ready, wait = 100, timeout = 2000) { +// let time = 0 +// return new Promise((resolve, reject) => { +// (function waitReady() { +// if (time > timeout) +// return reject( +// `timeout waiting for socket ready handshake - ${timeout}ms` +// ) +// if (ready()) return resolve('ready') +// log.debug({function:'isReady', line:261, msg:`waiting ${wait}ms for handshake`}) +// time += wait +// setTimeout(waitReady, wait) +// })() +// }) +// } diff --git a/src/json-stream.js b/src/json-stream.js index d907677..c379a98 100644 --- a/src/json-stream.js +++ b/src/json-stream.js @@ -24,8 +24,9 @@ class JsonStream extends EventEmitter { data = decoder.write(data) try { this._handleData(data) - } catch (e) { - this.emit('error', { error: e }) + } catch (err) { + // emit an error on the socket that handled with other socket errors + this.emit('error', err) } } @@ -51,7 +52,7 @@ class JsonStream extends EventEmitter { if (isNaN(this._contentLength)) { this._contentLength = null this._buffer = '' - var err = new Error( + let err = new Error( 'Invalid content length supplied (' + rawContentLength + ') in: ' + @@ -64,12 +65,12 @@ class JsonStream extends EventEmitter { } } if (this._contentLength != null) { - var length = Buffer.byteLength(this._buffer, 'utf8') + let length = Buffer.byteLength(this._buffer, 'utf8') if (length == this._contentLength) { this._handleMessage(this._buffer) } else if (length > this._contentLength) { - var message = this._buffer.substring(0, this._contentLength) - var rest = this._buffer.substring(this._contentLength) + let message = this._buffer.substring(0, this._contentLength) + let rest = this._buffer.substring(this._contentLength) this._handleMessage(message) this.onData(rest) } @@ -83,7 +84,7 @@ class JsonStream extends EventEmitter { try { message = JSON.parse(data) } catch (e) { - var err = new Error( + let err = new Error( 'Could not parse JSON: ' + e.message + '\nRequest data: ' + data ) err.code = 'E_INVALID_JSON' diff --git a/src/socket-class.js b/src/socket-class.js index cfe5ec8..9e32184 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -57,6 +57,8 @@ export default function socketClass(Server) { if (path.dirname(opts.path) === '.') // relative path sent opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) } + this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true + this.pingInterval = opts.pingInterval === false ? opts.pingInterval : (opts.pingInterval * 1000 || 5000) this.clientTracking = opts.clientTracking || true this.clients = [] // track consumers (i.e. clients) this.opts = opts // for use to recover from selected errors @@ -85,7 +87,7 @@ export default function socketClass(Server) { await this._destroy() }) process.once('SIGUSR2', async () => { - await this._destroy + await this._destroy() process.kill(process.pid, 'SIGUSR2') }) @@ -116,6 +118,11 @@ export default function socketClass(Server) { let [err, res] = await btc(this._listen)(this.opts) if (err) reject(err) + if (this.pingInterval) { + this._ping = setInterval( async () =>{ + if (this.clients) this.push({pingInterval:this.pingInterval},'ping') + },this.pingInterval) + } resolve(res) }) // end creeate promise } // end create @@ -137,9 +144,9 @@ export default function socketClass(Server) { * @param {string} id the header id string of the pushed packet, default: 'pushed' * */ - async push(packet, id) { - packet._header = { id: id || 'pushed' } - log.debug({method:'push', line:142, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) + async push(packet={},id) { + packet._header = {id: id || 'pushed'} + log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) this.clients.forEach(async client => { if (client.writable) { let [err, ser] = await btc(client.stream.serialize)(packet) @@ -149,14 +156,14 @@ export default function socketClass(Server) { err: err, _header: { id: packet._header.id } }) - if (!id || id === client.ID) await this._send.bind(client)(ser) + await this._send.bind(client)(ser) } }) } async _listen(opts) { - super.listen(opts, async (err, res) => { - if (err) return err + return super.listen(opts, async (err, res) => { + if (err) return Promise.reject(err) // this gets called for each client connection and is unique to each this.on('connection', async socket => { const stream = new JSONStream() @@ -172,6 +179,8 @@ export default function socketClass(Server) { send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection } + socket.setKeepAlive(this.keepAlive,3000) + // add error and close listeners on connection socket.on('error', (err) => { log.error({msg:'client connection error during listen',error:err}) @@ -221,14 +230,16 @@ export default function socketClass(Server) { }) await send(ser) } // end process message - }) // end connecttion consumer - log.info({method:'_listen', line:211, opts: this.opt, msg:'socket created and listening'}) + }) // end connecttion handler + + log.info({method:'_listen', line:255, opts: this.opt, msg:'socket created and listening'}) return res }) // end super listen callback } // end listen async _destroy() { log.debug({method:'_destroy', line:217, msg:'closing down socket'}) + clearInterval(this._ping) await this.close() log.debug({method:'_destroy', line:219, msg:'all connections closed....exiting'}) process.exit()