From 072dd25dc48163588cd6a9fe61784a6b02e8dc42 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Fri, 26 Apr 2019 10:14:57 -0700 Subject: [PATCH] 0.2.12 update deps, clean up logging, merged in tlc branch, TLS not implemented! but had many other changes to merge --- package.json | 6 +++--- src/consumer.js | 34 +++++++++++++++++----------------- src/json-stream.js | 2 -- src/socket-class.js | 42 +++++++++++++++--------------------------- 4 files changed, 35 insertions(+), 49 deletions(-) diff --git a/package.json b/package.json index fe7b95c..6741546 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.11", + "version": "0.2.12", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { @@ -47,10 +47,10 @@ "nodemon": "^1.18.6" }, "dependencies": { - "@uci-utils/logger": "0.0.13", + "@uci-utils/logger": "0.0.14", "better-try-catch": "^0.6.2", "clone": "^2.1.2", "death": "^1.1.0", - "make-dir": "^2.0.0" + "make-dir": "^3.0.0" } } diff --git a/src/consumer.js b/src/consumer.js index 64473d1..a314e42 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -35,7 +35,7 @@ class SocketConsumer extends Socket { }) this.id = opts.id || opts.name || 'socket:' + new Date().getTime() if (!opts.path) { - if(!opts.host) log.warn({ opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'}) + if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'}) opts.host = opts.host || '127.0.0.1' opts.port = opts.port || 8080 } else { @@ -66,7 +66,7 @@ class SocketConsumer extends Socket { // this is only for initial connection const initTimeout = setTimeout(() => { - log.fatal({ opts: this.opts }, `unable to connect in ${this.timeout}s`) + log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to connect in ${this.timeout}s`}) reject( { opts: this.opts }, `unable to connect to socket server in ${this.timeout}secs` @@ -77,12 +77,12 @@ class SocketConsumer extends Socket { this.once('connect', async () => { clearTimeout(initTimeout) this._listen() - log.info({ opts: this.opts, msg:'initial connect waiting for socket ready handshake'}) + log.debug({method:'connect', line:80, opts: this.opts, msg:'initial connect waiting for socket ready handshake'}) this.setKeepAlive(this.keepAlive, 3000) let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout) if (err) reject(err) initial = false - log.info('handshake to socket done, authenticating') + log.debug({method:'connect', line:85, msg:'handshake to socket done, TODO authenticating'}) // TODO authenticate here by encrypting a payload with private key and sending that. // await btc(authenticate) this.emit('connected') // for end users to take action @@ -104,20 +104,21 @@ class SocketConsumer extends Socket { // connection function that sets listeners and deals with reconnect const connect = () => { if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') - log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead') + log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'}) if(!initial) { this.once('connect', async () => { clearTimeout(reconTimeout) this._listen() - log.info({msg:'reconnected waiting for socket ready handshake'}) + log.debug({method:'connect', line:113, msg:'reconnected waiting for socket ready handshake'}) this.setKeepAlive(this.keepAlive, 3000) let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout) if (err) reject(err) - log.info('rehandshake done, reauthenticating') + log.debug({method:'connect', line:69, msg:'rehandshake done, reauthenticating'}) // TODO authenticate here by encrypting a payload with private key and sending that. // await btc(authenticate) - this.emit('reconnected') // for end users to take action + this.emit('connected') + this.emit('reconnected') // emit also reconnected for special end user action resolve(res) }) } @@ -126,27 +127,26 @@ class SocketConsumer extends Socket { if (err.code !== 'EISCONN') { this._ready = false this.emit('ready', false) - log.warn({ error: err.code }, `connect error ${err.code}, attempting reconnect`) + log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect`}) reconnect() } else { this._ready = true this.emit('ready', true) - log.info('reconnected to socket, ready to go again') + log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'}) } }) if (this.keepAlive) { // only attempt reconnect is keepAlive is set which it is by default this.on('end', async () => { - log.warn('socket (server) terminated unexpectantly') + log.error({method:'connect', line:142, msg:'socket (server) terminated unexpectantly, keepalive set, wait for server to come online'}) this._ready = false - log.info('keep alive was set, so waiting on server to come online for reconnect') this.emit('error', { code: 'DISCONNECTED' }) }) } // attempt connection - log.info({ opts: this.opts, msg:`attempting to connect ${this.id} to socket`}) + log.debug({method:'connect', line:149, opts: this.opts, msg:`attempting to connect ${this.id} to socket`}) super.connect(this.opts) } // end connect function @@ -177,7 +177,7 @@ class SocketConsumer extends Socket { let res = await this._packetProcess(reply) if (!res) { // if packetProcess was not promise res = reply - log.warn('consumer function was not promise returning further processing may be out of sequence') + log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) } resolve(res) }) //end listener @@ -210,14 +210,14 @@ class SocketConsumer extends Socket { } async _listen() { - log.info('listening for incoming packets from socket') + log.debug('listening for incoming packets from socket') // listen for pushed packets this.on('pushed', async function(packet) { // TODO do some extra security here? let res = await this._packetProcess(packet) if (!res) { // if process was not promise returning then res will be undefined - log.warn('consumer packet processing function was not promise returning') + log.debug('consumer packet processing function was not promise returning') } }) // listen on socket stream @@ -257,7 +257,7 @@ function isReady(ready, wait = 30, timeout = 1000) { `timeout waiting for socket ready handshake - ${timeout}ms` ) if (ready()) return resolve('ready') - log.info(`waiting ${wait}ms for handshake`) + log.debug({function:'isReady', line:261, msg:`waiting ${wait}ms for handshake`}) time += wait setTimeout(waitReady, wait) })() diff --git a/src/json-stream.js b/src/json-stream.js index 74c7d92..d907677 100644 --- a/src/json-stream.js +++ b/src/json-stream.js @@ -21,7 +21,6 @@ class JsonStream extends EventEmitter { } onData(data) { - // console.log('a chunk arrived', data) data = decoder.write(data) try { this._handleData(data) @@ -37,7 +36,6 @@ class JsonStream extends EventEmitter { let [err2, length] = btc(Buffer.byteLength)(messageData, 'utf8') if (err2) reject(err2) let data = length + this._delimeter + messageData - // console.log('serialized',data) resolve(data) }) } diff --git a/src/socket-class.js b/src/socket-class.js index 05360a7..be4f9d6 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -81,7 +81,7 @@ export default function socketClass(Server) { return new Promise(async (resolve, reject) => { // set up a couple ways to gracefully destroy socket process is killed/aborted _ON_DEATH(async () => { - log.info('\nhe\'s dead jim') + log.error({method:'create', line:84, msg:'\nhe\'s dead jim'}) await this._destroy() }) process.once('SIGUSR2', async () => { @@ -96,27 +96,21 @@ export default function socketClass(Server) { // if TCP socket should already be dead let [err, res] = await btc(promisify(fileDelete))(this.opts.path) if (!err) { - log.info( - { res: res, socket: this.opts.path }, - 'socket already exists.....deleted' - ) + log.debug({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'}) return await this._listen(this.opts) } - log.fatal( - { err: err }, - 'error deleting socket. Can not establish a socket' - ) + log.error({method:'create', line:102, err: err, msg:'error deleting socket. Can not establish a socket'}) return err } } if (err.code === 'EACCES') { - log.info({ socket: this.opts.path, msg:'directory does not exist...creating'}) + log.debug({method:'create', line:107, socket: this.opts.path, msg:'directory does not exist...creating'}) await mkdir(path.dirname(this.opts.path)) - log.info({ socket: this.opts.path, msg:'directory created'}) + log.debug({method:'create', line:109, socket: this.opts.path, msg:'directory created'}) return await this._listen(this.opts) } // otherwise fatally exit - log.info(err, 'error creating socket') + log.error({method:'create', line:113, err:err, msg:'error creating socket'}) reject(err) }) @@ -145,10 +139,7 @@ export default function socketClass(Server) { */ async push(packet, id) { packet._header = { id: id || 'pushed' } - log.info( - { opts: this.opts, packet: packet }, - 'pushing a packet to all connected consumers' - ) + log.debug({method:'push', line:142, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) this.clients.forEach(async client => { if (client.writable) { let [err, ser] = await btc(client.stream.serialize)(packet) @@ -173,14 +164,11 @@ export default function socketClass(Server) { let send = this._send.bind(socket) if (this.clientTracking) this.clients.push(socket) // TODO add 'close' listener to socket to remove from this.clients - log.info('new consumer connecting') - log.info(await send(await stream.serialize({ _handshake: true }))) + log.debug({method:'_listen', line:167, msg:'new consumer connecting'}) + log.debug(await send(await stream.serialize({ _handshake: true }))) if (this.opts.conPacket) { this.opts.conPacket._header = { id: 'pushed' } - log.info( - { conPacket: this.opts.conPacket }, - 'pushing a preset command to just connected consumer' - ) + log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'}) send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection } socket.on('data', stream.onData) @@ -188,7 +176,7 @@ export default function socketClass(Server) { stream.on('message', messageProcess.bind(this, socket)) async function messageProcess(client, packet) { - log.info({ packet: packet }, 'incoming packet on socket side') + log.debug({method:'_listen', line:179, packet: packet, msg:'incoming packet on socket side'}) let res = {} if (this.clientTracking && packet.clientID) { client.ID = packet.clientID @@ -217,18 +205,18 @@ export default function socketClass(Server) { err: err, _header: { id: res._header.id } }) - log.info(await send(ser)) + await send(ser) } // end process message }) // end connecttion consumer - log.info({ opts: this.opts }, 'socket created and listening') + log.info({method:'_listen', line:211, opts: this.opt, msg:'socket created and listening'}) return res }) // end super listen callback } // end listen async _destroy() { - log.info('closing down socket') + log.debug({method:'_destroy', line:217, msg:'closing down socket'}) await this.close() - log.info('all connections closed....exiting') + log.debug({method:'_destroy', line:219, msg:'all connections closed....exiting'}) process.exit() }