diff --git a/.npmignore b/.npmignore index 93bc8e2..125fbc3 100644 --- a/.npmignore +++ b/.npmignore @@ -7,3 +7,4 @@ yarn.lock travis.yml .eslintrc.js archive/ +docs/ diff --git a/examples/client.js b/examples/client.js index 9cfd424..7ec8093 100644 --- a/examples/client.js +++ b/examples/client.js @@ -15,12 +15,11 @@ client.registerPacketProcessor(process) ; (async () => { - - // await Promise.all([client1.connect(),client2.connect()]) await client.connect() + console.log('sending packet ', packet) console.log('=========\n',await client.send(packet)) - client.end() + // client.end() })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/examples/server.js b/examples/server.js index 55f6c01..1256ae0 100644 --- a/examples/server.js +++ b/examples/server.js @@ -1,22 +1,22 @@ -import { Socket as uSocket, sSocket} from '../src' -import { fs } from 'mz' +import { Socket as uSocket} from '../src' +// import { fs } from 'mz' // made key cert into module that also uses environment variables -const TLS = process.env.TLS || false -const TLS_DIR = process.env.TLS_DIR || '/opt/certs' -const TLS_NAME = process.env.TLD_NAME || 'wc.kebler.net' -const TLS_KEY_PATH = process.env.TLS_KEY_PATH || `${TLS_DIR}/${TLS_NAME}.key` -const TLS_CRT_PATH = process.env.TLS_CRT_PATH || `${TLS_DIR}/${TLS_NAME}.crt` +// const TLS = process.env.TLS || false +// const TLS_DIR = process.env.TLS_DIR || '/opt/certs' +// const TLS_NAME = process.env.TLD_NAME || 'wc.kebler.net' +// const TLS_KEY_PATH = process.env.TLS_KEY_PATH || `${TLS_DIR}/${TLS_NAME}.key` +// const TLS_CRT_PATH = process.env.TLS_CRT_PATH || `${TLS_DIR}/${TLS_NAME}.crt` let Socket = uSocket ; (async () => { // TODO dynamic import - if(TLS_KEY_PATH && TLS_CRT_PATH && TLS) { - Socket = sSocket - console.log('using TLS') - } + // if(TLS_KEY_PATH && TLS_CRT_PATH && TLS) { + // Socket = sSocket + // console.log('using TLS') + // } class Test extends Socket { constructor(opts) { @@ -44,22 +44,23 @@ let Socket = uSocket } - const options = { - tls: TLS, - key: await fs.readFile(TLS_KEY_PATH), - cert: await fs.readFile(TLS_CRT_PATH), - // This is necessary only if using client certificate authentication. - // requestCert: true, - // This is necessary only if the client uses a self-signed certificate. - // ca: [ fs.readFileSync('client-cert.pem') ] - } + // const options = { + // tls: TLS, + // key: await fs.readFile(TLS_KEY_PATH), + // cert: await fs.readFile(TLS_CRT_PATH), + // // This is necessary only if using client certificate authentication. + // // requestCert: true, + // // This is necessary only if the client uses a self-signed certificate. + // // ca: [ fs.readFileSync('client-cert.pem') ] + // } - options.path = true + let options = {path:true} // let test = new Test() let test = new Test(options) await test.create() + console.log('ready') })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/package.json b/package.json index cd078e7..fe7b95c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.10", + "version": "0.2.11", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { @@ -8,10 +8,10 @@ "test": "mocha -r esm --timeout 10000 test/*.test.mjs", "testlog": "UCI_DEV=true mocha -r esm --timeout 10000 test/*.test.mjs", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true", - "s": "UCI_DEV=true node -r esm examples/server", + "s": "UCI_ENV=dev node -r esm examples/server", "sp": "UCI_DEV=true node -r esm examples/server-push", "devs": "SOCKETS_DIR=/opt/sockets UCI_DEV=true ./node_modules/.bin/nodemon -r esm-e mjs examples/server", - "c": "UCI_DEV=true node -r esm examples/client", + "c": "UCI_ENV=dev node -r esm examples/client", "cp": "UCI_DEV=true node -r esm examples/client-push", "devc": "SOCKETS_DIR=/opt/sockets UCI_DEV=true node -r esm examples/client", "c2": "node -r esm examples/client2" diff --git a/src/consumer.js b/src/consumer.js index 6b33e22..64473d1 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -45,9 +45,11 @@ class SocketConsumer extends Socket { opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) } this.opts = opts + // default is keepAlive true, must set to false to explicitly disable + // if keepAlive is true then consumer will also be reconnecting consumer this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this._ready = false - this.timeout = opts.timeout || 300 // 5 minutes and then rejects + this.timeout = opts.timeout || 60 // initial connect timeout in secs and then rejects this.wait = opts.wait || 2 this.stream = new JsonStream() // bind to class for other class functions @@ -59,22 +61,11 @@ class SocketConsumer extends Socket { async connect() { return new Promise((resolve, reject) => { - const connect = () => { - if (this.opts.host === '127.0.0.1') - log.warn( - 'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead' - ) - log.info( - { opts: this.opts }, - `attempting to connect ${this.id} to socket` - ) - super.connect(this.opts) - } - let reconnect = {} + let initial = true + // this is only for initial connection - const timeout = setTimeout(() => { - clearTimeout(reconnect) + const initTimeout = setTimeout(() => { log.fatal({ opts: this.opts }, `unable to connect in ${this.timeout}s`) reject( { opts: this.opts }, @@ -82,47 +73,83 @@ class SocketConsumer extends Socket { ) }, this.timeout * 1000) + this.once('connect', async () => { - clearTimeout(timeout) + clearTimeout(initTimeout) this._listen() - log.info( - { opts: this.opts }, - 'connected waiting for socket ready handshake' - ) - this.setKeepAlive(this.keepAlive, 100) - let [err, res] = await btc(isReady).bind(this)( - this.__ready, - this.wait, - this.timeout - ) + log.info({ opts: this.opts, msg:'initial connect waiting for socket ready handshake'}) + this.setKeepAlive(this.keepAlive, 3000) + let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout) if (err) reject(err) - log.info('handshake done, authenticating') + initial = false + log.info('handshake to socket done, authenticating') // TODO authenticate here by encrypting a payload with private key and sending that. // await btc(authenticate) + this.emit('connected') // for end users to take action resolve(res) }) - this.on('error', async err => { - log.warn({ error: err.code }, `connect error ${err.code}`) - if (err.code === 'EISCONN') { - return resolve('ready') - } - - reconnect = setTimeout(() => { + let reconTimeout + // function that sets a reconnect timeout + const reconnect = () => { + reconTimeout = setTimeout(() => { + this.removeAllListeners() + this.stream.removeAllListeners() + this.destroy() connect() }, this.wait * 1000) - }) + } - this.on('end', async () => { - log.warn('socket (server) terminated unexpectantly') - if (this.keepAlive) { - log.info( - 'keep alive was set, so waiting on server to come online for reconnect' - ) - this.destroy() - this.emit('error', { code: 'DISCONNECTED' }) + + // connection function that sets listeners and deals with reconnect + const connect = () => { + if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') + log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead') + + if(!initial) { + this.once('connect', async () => { + clearTimeout(reconTimeout) + this._listen() + log.info({msg:'reconnected waiting for socket ready handshake'}) + this.setKeepAlive(this.keepAlive, 3000) + let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout) + if (err) reject(err) + log.info('rehandshake done, reauthenticating') + // TODO authenticate here by encrypting a payload with private key and sending that. + // await btc(authenticate) + this.emit('reconnected') // for end users to take action + resolve(res) + }) } - }) + + this.on('error', async err => { + if (err.code !== 'EISCONN') { + this._ready = false + this.emit('ready', false) + log.warn({ error: err.code }, `connect error ${err.code}, attempting reconnect`) + reconnect() + } + else { + this._ready = true + this.emit('ready', true) + log.info('reconnected to socket, ready to go again') + } + }) + + if (this.keepAlive) { // only attempt reconnect is keepAlive is set which it is by default + this.on('end', async () => { + log.warn('socket (server) terminated unexpectantly') + this._ready = false + log.info('keep alive was set, so waiting on server to come online for reconnect') + this.emit('error', { code: 'DISCONNECTED' }) + }) + } + + // attempt connection + log.info({ opts: this.opts, msg:`attempting to connect ${this.id} to socket`}) + super.connect(this.opts) + + } // end connect function connect() // initial connect request }) //end promise @@ -130,11 +157,9 @@ class SocketConsumer extends Socket { async send(ipacket) { return new Promise(async resolve => { - // need this for when multiple sends for different consumers use same packet instance - let packet = Object.assign({}, ipacket) - setTimeout(() => { - resolve({ error: 'no response from socket in 10sec' }) - }, 10000) + if (!this._ready) resolve({ error: 'socket consumer not connected, aborting send' }) + let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance + setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) packet._header = { id: Math.random() .toString() @@ -146,24 +171,20 @@ class SocketConsumer extends Socket { } let [err, res] = await btc(this.stream.serialize)(packet) if (err) - resolve({ - error: 'unable to serialize packet for sending', - packet: packet - }) + resolve({error: 'unable to serialize packet for sending',packet: packet}) await this.__write(res) this.once(packet._header.id, async function(reply) { let res = await this._packetProcess(reply) - if (!res) { - // if process was not promise returning like just logged to console + if (!res) { // if packetProcess was not promise res = reply - // log.warn('consumer function was not promise returning further processing may be out of sequence') + log.warn('consumer function was not promise returning further processing may be out of sequence') } resolve(res) }) //end listener }) } - // TODO register alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) + // TODO register user alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) // TODO register authenciation function (set up default) registerPacketProcessor(func) { @@ -195,7 +216,7 @@ class SocketConsumer extends Socket { // TODO do some extra security here? let res = await this._packetProcess(packet) if (!res) { - // if process was not promise returning like just logged to console + // if process was not promise returning then res will be undefined log.warn('consumer packet processing function was not promise returning') } }) @@ -204,7 +225,7 @@ class SocketConsumer extends Socket { this.stream.on('message', messageProcess.bind(this)) async function messageProcess(packet) { - // console.log('incoming packet from socket',packet) + log.debug('incoming packet from socket',packet) if (packet._handshake) { this._ready = true return @@ -217,9 +238,8 @@ class SocketConsumer extends Socket { // default packet process just a simple console logger. ignores any cmd: prop _packetProcess(packet) { - console.log( - 'default consumer processor -- log packet from socket to console' - ) + console.log('default consumer processor -- log packet from socket to console') + console.log('replace by calling .registerPacketProcessor(func) with your function') console.dir(packet) } } // end class diff --git a/src/socket-class.js b/src/socket-class.js index 8d7985c..05360a7 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -239,7 +239,7 @@ export default function socketClass(Server) { }) } - // must have a consumer socket bound to use + // consumer send, must have a consumer socket bound to use async _send(packet) { // timeout already set if sockect can't be drained in 10 secs return new Promise(resolve => {