0.2.12 update deps, clean up logging, merged in tlc branch, TLS not implemented! but had many other changes to merge

master
David Kebler 2019-04-26 10:14:57 -07:00
parent 81bb898ab4
commit 072dd25dc4
4 changed files with 35 additions and 49 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@uci/socket", "name": "@uci/socket",
"version": "0.2.11", "version": "0.2.12",
"description": "JSON packet intra(named)/inter(TCP) host communication over socket", "description": "JSON packet intra(named)/inter(TCP) host communication over socket",
"main": "src", "main": "src",
"scripts": { "scripts": {
@ -47,10 +47,10 @@
"nodemon": "^1.18.6" "nodemon": "^1.18.6"
}, },
"dependencies": { "dependencies": {
"@uci-utils/logger": "0.0.13", "@uci-utils/logger": "0.0.14",
"better-try-catch": "^0.6.2", "better-try-catch": "^0.6.2",
"clone": "^2.1.2", "clone": "^2.1.2",
"death": "^1.1.0", "death": "^1.1.0",
"make-dir": "^2.0.0" "make-dir": "^3.0.0"
} }
} }

View File

@ -35,7 +35,7 @@ class SocketConsumer extends Socket {
}) })
this.id = opts.id || opts.name || 'socket:' + new Date().getTime() this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
if (!opts.path) { if (!opts.path) {
if(!opts.host) log.warn({ opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'}) if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'})
opts.host = opts.host || '127.0.0.1' opts.host = opts.host || '127.0.0.1'
opts.port = opts.port || 8080 opts.port = opts.port || 8080
} else { } else {
@ -66,7 +66,7 @@ class SocketConsumer extends Socket {
// this is only for initial connection // this is only for initial connection
const initTimeout = setTimeout(() => { const initTimeout = setTimeout(() => {
log.fatal({ opts: this.opts }, `unable to connect in ${this.timeout}s`) log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to connect in ${this.timeout}s`})
reject( reject(
{ opts: this.opts }, { opts: this.opts },
`unable to connect to socket server in ${this.timeout}secs` `unable to connect to socket server in ${this.timeout}secs`
@ -77,12 +77,12 @@ class SocketConsumer extends Socket {
this.once('connect', async () => { this.once('connect', async () => {
clearTimeout(initTimeout) clearTimeout(initTimeout)
this._listen() this._listen()
log.info({ opts: this.opts, msg:'initial connect waiting for socket ready handshake'}) log.debug({method:'connect', line:80, opts: this.opts, msg:'initial connect waiting for socket ready handshake'})
this.setKeepAlive(this.keepAlive, 3000) this.setKeepAlive(this.keepAlive, 3000)
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout) let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
if (err) reject(err) if (err) reject(err)
initial = false initial = false
log.info('handshake to socket done, authenticating') log.debug({method:'connect', line:85, msg:'handshake to socket done, TODO authenticating'})
// TODO authenticate here by encrypting a payload with private key and sending that. // TODO authenticate here by encrypting a payload with private key and sending that.
// await btc(authenticate) // await btc(authenticate)
this.emit('connected') // for end users to take action this.emit('connected') // for end users to take action
@ -104,20 +104,21 @@ class SocketConsumer extends Socket {
// connection function that sets listeners and deals with reconnect // connection function that sets listeners and deals with reconnect
const connect = () => { const connect = () => {
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost')
log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead') log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'})
if(!initial) { if(!initial) {
this.once('connect', async () => { this.once('connect', async () => {
clearTimeout(reconTimeout) clearTimeout(reconTimeout)
this._listen() this._listen()
log.info({msg:'reconnected waiting for socket ready handshake'}) log.debug({method:'connect', line:113, msg:'reconnected waiting for socket ready handshake'})
this.setKeepAlive(this.keepAlive, 3000) this.setKeepAlive(this.keepAlive, 3000)
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout) let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
if (err) reject(err) if (err) reject(err)
log.info('rehandshake done, reauthenticating') log.debug({method:'connect', line:69, msg:'rehandshake done, reauthenticating'})
// TODO authenticate here by encrypting a payload with private key and sending that. // TODO authenticate here by encrypting a payload with private key and sending that.
// await btc(authenticate) // await btc(authenticate)
this.emit('reconnected') // for end users to take action this.emit('connected')
this.emit('reconnected') // emit also reconnected for special end user action
resolve(res) resolve(res)
}) })
} }
@ -126,27 +127,26 @@ class SocketConsumer extends Socket {
if (err.code !== 'EISCONN') { if (err.code !== 'EISCONN') {
this._ready = false this._ready = false
this.emit('ready', false) this.emit('ready', false)
log.warn({ error: err.code }, `connect error ${err.code}, attempting reconnect`) log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect`})
reconnect() reconnect()
} }
else { else {
this._ready = true this._ready = true
this.emit('ready', true) this.emit('ready', true)
log.info('reconnected to socket, ready to go again') log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'})
} }
}) })
if (this.keepAlive) { // only attempt reconnect is keepAlive is set which it is by default if (this.keepAlive) { // only attempt reconnect is keepAlive is set which it is by default
this.on('end', async () => { this.on('end', async () => {
log.warn('socket (server) terminated unexpectantly') log.error({method:'connect', line:142, msg:'socket (server) terminated unexpectantly, keepalive set, wait for server to come online'})
this._ready = false this._ready = false
log.info('keep alive was set, so waiting on server to come online for reconnect')
this.emit('error', { code: 'DISCONNECTED' }) this.emit('error', { code: 'DISCONNECTED' })
}) })
} }
// attempt connection // attempt connection
log.info({ opts: this.opts, msg:`attempting to connect ${this.id} to socket`}) log.debug({method:'connect', line:149, opts: this.opts, msg:`attempting to connect ${this.id} to socket`})
super.connect(this.opts) super.connect(this.opts)
} // end connect function } // end connect function
@ -177,7 +177,7 @@ class SocketConsumer extends Socket {
let res = await this._packetProcess(reply) let res = await this._packetProcess(reply)
if (!res) { // if packetProcess was not promise if (!res) { // if packetProcess was not promise
res = reply res = reply
log.warn('consumer function was not promise returning further processing may be out of sequence') log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
} }
resolve(res) resolve(res)
}) //end listener }) //end listener
@ -210,14 +210,14 @@ class SocketConsumer extends Socket {
} }
async _listen() { async _listen() {
log.info('listening for incoming packets from socket') log.debug('listening for incoming packets from socket')
// listen for pushed packets // listen for pushed packets
this.on('pushed', async function(packet) { this.on('pushed', async function(packet) {
// TODO do some extra security here? // TODO do some extra security here?
let res = await this._packetProcess(packet) let res = await this._packetProcess(packet)
if (!res) { if (!res) {
// if process was not promise returning then res will be undefined // if process was not promise returning then res will be undefined
log.warn('consumer packet processing function was not promise returning') log.debug('consumer packet processing function was not promise returning')
} }
}) })
// listen on socket stream // listen on socket stream
@ -257,7 +257,7 @@ function isReady(ready, wait = 30, timeout = 1000) {
`timeout waiting for socket ready handshake - ${timeout}ms` `timeout waiting for socket ready handshake - ${timeout}ms`
) )
if (ready()) return resolve('ready') if (ready()) return resolve('ready')
log.info(`waiting ${wait}ms for handshake`) log.debug({function:'isReady', line:261, msg:`waiting ${wait}ms for handshake`})
time += wait time += wait
setTimeout(waitReady, wait) setTimeout(waitReady, wait)
})() })()

View File

@ -21,7 +21,6 @@ class JsonStream extends EventEmitter {
} }
onData(data) { onData(data) {
// console.log('a chunk arrived', data)
data = decoder.write(data) data = decoder.write(data)
try { try {
this._handleData(data) this._handleData(data)
@ -37,7 +36,6 @@ class JsonStream extends EventEmitter {
let [err2, length] = btc(Buffer.byteLength)(messageData, 'utf8') let [err2, length] = btc(Buffer.byteLength)(messageData, 'utf8')
if (err2) reject(err2) if (err2) reject(err2)
let data = length + this._delimeter + messageData let data = length + this._delimeter + messageData
// console.log('serialized',data)
resolve(data) resolve(data)
}) })
} }

View File

@ -81,7 +81,7 @@ export default function socketClass(Server) {
return new Promise(async (resolve, reject) => { return new Promise(async (resolve, reject) => {
// set up a couple ways to gracefully destroy socket process is killed/aborted // set up a couple ways to gracefully destroy socket process is killed/aborted
_ON_DEATH(async () => { _ON_DEATH(async () => {
log.info('\nhe\'s dead jim') log.error({method:'create', line:84, msg:'\nhe\'s dead jim'})
await this._destroy() await this._destroy()
}) })
process.once('SIGUSR2', async () => { process.once('SIGUSR2', async () => {
@ -96,27 +96,21 @@ export default function socketClass(Server) {
// if TCP socket should already be dead // if TCP socket should already be dead
let [err, res] = await btc(promisify(fileDelete))(this.opts.path) let [err, res] = await btc(promisify(fileDelete))(this.opts.path)
if (!err) { if (!err) {
log.info( log.debug({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'})
{ res: res, socket: this.opts.path },
'socket already exists.....deleted'
)
return await this._listen(this.opts) return await this._listen(this.opts)
} }
log.fatal( log.error({method:'create', line:102, err: err, msg:'error deleting socket. Can not establish a socket'})
{ err: err },
'error deleting socket. Can not establish a socket'
)
return err return err
} }
} }
if (err.code === 'EACCES') { if (err.code === 'EACCES') {
log.info({ socket: this.opts.path, msg:'directory does not exist...creating'}) log.debug({method:'create', line:107, socket: this.opts.path, msg:'directory does not exist...creating'})
await mkdir(path.dirname(this.opts.path)) await mkdir(path.dirname(this.opts.path))
log.info({ socket: this.opts.path, msg:'directory created'}) log.debug({method:'create', line:109, socket: this.opts.path, msg:'directory created'})
return await this._listen(this.opts) return await this._listen(this.opts)
} }
// otherwise fatally exit // otherwise fatally exit
log.info(err, 'error creating socket') log.error({method:'create', line:113, err:err, msg:'error creating socket'})
reject(err) reject(err)
}) })
@ -145,10 +139,7 @@ export default function socketClass(Server) {
*/ */
async push(packet, id) { async push(packet, id) {
packet._header = { id: id || 'pushed' } packet._header = { id: id || 'pushed' }
log.info( log.debug({method:'push', line:142, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
{ opts: this.opts, packet: packet },
'pushing a packet to all connected consumers'
)
this.clients.forEach(async client => { this.clients.forEach(async client => {
if (client.writable) { if (client.writable) {
let [err, ser] = await btc(client.stream.serialize)(packet) let [err, ser] = await btc(client.stream.serialize)(packet)
@ -173,14 +164,11 @@ export default function socketClass(Server) {
let send = this._send.bind(socket) let send = this._send.bind(socket)
if (this.clientTracking) this.clients.push(socket) if (this.clientTracking) this.clients.push(socket)
// TODO add 'close' listener to socket to remove from this.clients // TODO add 'close' listener to socket to remove from this.clients
log.info('new consumer connecting') log.debug({method:'_listen', line:167, msg:'new consumer connecting'})
log.info(await send(await stream.serialize({ _handshake: true }))) log.debug(await send(await stream.serialize({ _handshake: true })))
if (this.opts.conPacket) { if (this.opts.conPacket) {
this.opts.conPacket._header = { id: 'pushed' } this.opts.conPacket._header = { id: 'pushed' }
log.info( log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'})
{ conPacket: this.opts.conPacket },
'pushing a preset command to just connected consumer'
)
send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection
} }
socket.on('data', stream.onData) socket.on('data', stream.onData)
@ -188,7 +176,7 @@ export default function socketClass(Server) {
stream.on('message', messageProcess.bind(this, socket)) stream.on('message', messageProcess.bind(this, socket))
async function messageProcess(client, packet) { async function messageProcess(client, packet) {
log.info({ packet: packet }, 'incoming packet on socket side') log.debug({method:'_listen', line:179, packet: packet, msg:'incoming packet on socket side'})
let res = {} let res = {}
if (this.clientTracking && packet.clientID) { if (this.clientTracking && packet.clientID) {
client.ID = packet.clientID client.ID = packet.clientID
@ -217,18 +205,18 @@ export default function socketClass(Server) {
err: err, err: err,
_header: { id: res._header.id } _header: { id: res._header.id }
}) })
log.info(await send(ser)) await send(ser)
} // end process message } // end process message
}) // end connecttion consumer }) // end connecttion consumer
log.info({ opts: this.opts }, 'socket created and listening') log.info({method:'_listen', line:211, opts: this.opt, msg:'socket created and listening'})
return res return res
}) // end super listen callback }) // end super listen callback
} // end listen } // end listen
async _destroy() { async _destroy() {
log.info('closing down socket') log.debug({method:'_destroy', line:217, msg:'closing down socket'})
await this.close() await this.close()
log.info('all connections closed....exiting') log.debug({method:'_destroy', line:219, msg:'all connections closed....exiting'})
process.exit() process.exit()
} }