Compare commits

...

1 Commits
master ... tls

Author SHA1 Message Date
David Kebler cec399ecc5 0.2.12 update deps, clean up logging 2019-04-26 10:15:11 -07:00
4 changed files with 35 additions and 49 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@uci/socket",
"version": "0.2.11",
"version": "0.2.12",
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
"main": "src",
"scripts": {
@ -47,10 +47,10 @@
"nodemon": "^1.18.6"
},
"dependencies": {
"@uci-utils/logger": "0.0.13",
"@uci-utils/logger": "0.0.14",
"better-try-catch": "^0.6.2",
"clone": "^2.1.2",
"death": "^1.1.0",
"make-dir": "^2.0.0"
"make-dir": "^3.0.0"
}
}

View File

@ -35,7 +35,7 @@ class SocketConsumer extends Socket {
})
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
if (!opts.path) {
if(!opts.host) log.warn({ opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'})
if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'})
opts.host = opts.host || '127.0.0.1'
opts.port = opts.port || 8080
} else {
@ -66,7 +66,7 @@ class SocketConsumer extends Socket {
// this is only for initial connection
const initTimeout = setTimeout(() => {
log.fatal({ opts: this.opts }, `unable to connect in ${this.timeout}s`)
log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to connect in ${this.timeout}s`})
reject(
{ opts: this.opts },
`unable to connect to socket server in ${this.timeout}secs`
@ -77,12 +77,12 @@ class SocketConsumer extends Socket {
this.once('connect', async () => {
clearTimeout(initTimeout)
this._listen()
log.info({ opts: this.opts, msg:'initial connect waiting for socket ready handshake'})
log.debug({method:'connect', line:80, opts: this.opts, msg:'initial connect waiting for socket ready handshake'})
this.setKeepAlive(this.keepAlive, 3000)
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
if (err) reject(err)
initial = false
log.info('handshake to socket done, authenticating')
log.debug({method:'connect', line:85, msg:'handshake to socket done, TODO authenticating'})
// TODO authenticate here by encrypting a payload with private key and sending that.
// await btc(authenticate)
this.emit('connected') // for end users to take action
@ -104,20 +104,21 @@ class SocketConsumer extends Socket {
// connection function that sets listeners and deals with reconnect
const connect = () => {
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost')
log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead')
log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'})
if(!initial) {
this.once('connect', async () => {
clearTimeout(reconTimeout)
this._listen()
log.info({msg:'reconnected waiting for socket ready handshake'})
log.debug({method:'connect', line:113, msg:'reconnected waiting for socket ready handshake'})
this.setKeepAlive(this.keepAlive, 3000)
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
if (err) reject(err)
log.info('rehandshake done, reauthenticating')
log.debug({method:'connect', line:69, msg:'rehandshake done, reauthenticating'})
// TODO authenticate here by encrypting a payload with private key and sending that.
// await btc(authenticate)
this.emit('reconnected') // for end users to take action
this.emit('connected')
this.emit('reconnected') // emit also reconnected for special end user action
resolve(res)
})
}
@ -126,27 +127,26 @@ class SocketConsumer extends Socket {
if (err.code !== 'EISCONN') {
this._ready = false
this.emit('ready', false)
log.warn({ error: err.code }, `connect error ${err.code}, attempting reconnect`)
log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect`})
reconnect()
}
else {
this._ready = true
this.emit('ready', true)
log.info('reconnected to socket, ready to go again')
log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'})
}
})
if (this.keepAlive) { // only attempt reconnect is keepAlive is set which it is by default
this.on('end', async () => {
log.warn('socket (server) terminated unexpectantly')
log.error({method:'connect', line:142, msg:'socket (server) terminated unexpectantly, keepalive set, wait for server to come online'})
this._ready = false
log.info('keep alive was set, so waiting on server to come online for reconnect')
this.emit('error', { code: 'DISCONNECTED' })
})
}
// attempt connection
log.info({ opts: this.opts, msg:`attempting to connect ${this.id} to socket`})
log.debug({method:'connect', line:149, opts: this.opts, msg:`attempting to connect ${this.id} to socket`})
super.connect(this.opts)
} // end connect function
@ -177,7 +177,7 @@ class SocketConsumer extends Socket {
let res = await this._packetProcess(reply)
if (!res) { // if packetProcess was not promise
res = reply
log.warn('consumer function was not promise returning further processing may be out of sequence')
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
}
resolve(res)
}) //end listener
@ -210,14 +210,14 @@ class SocketConsumer extends Socket {
}
async _listen() {
log.info('listening for incoming packets from socket')
log.debug('listening for incoming packets from socket')
// listen for pushed packets
this.on('pushed', async function(packet) {
// TODO do some extra security here?
let res = await this._packetProcess(packet)
if (!res) {
// if process was not promise returning then res will be undefined
log.warn('consumer packet processing function was not promise returning')
log.debug('consumer packet processing function was not promise returning')
}
})
// listen on socket stream
@ -257,7 +257,7 @@ function isReady(ready, wait = 30, timeout = 1000) {
`timeout waiting for socket ready handshake - ${timeout}ms`
)
if (ready()) return resolve('ready')
log.info(`waiting ${wait}ms for handshake`)
log.debug({function:'isReady', line:261, msg:`waiting ${wait}ms for handshake`})
time += wait
setTimeout(waitReady, wait)
})()

View File

@ -21,7 +21,6 @@ class JsonStream extends EventEmitter {
}
onData(data) {
// console.log('a chunk arrived', data)
data = decoder.write(data)
try {
this._handleData(data)
@ -37,7 +36,6 @@ class JsonStream extends EventEmitter {
let [err2, length] = btc(Buffer.byteLength)(messageData, 'utf8')
if (err2) reject(err2)
let data = length + this._delimeter + messageData
// console.log('serialized',data)
resolve(data)
})
}

View File

@ -81,7 +81,7 @@ export default function socketClass(Server) {
return new Promise(async (resolve, reject) => {
// set up a couple ways to gracefully destroy socket process is killed/aborted
_ON_DEATH(async () => {
log.info('\nhe\'s dead jim')
log.error({method:'create', line:84, msg:'\nhe\'s dead jim'})
await this._destroy()
})
process.once('SIGUSR2', async () => {
@ -96,27 +96,21 @@ export default function socketClass(Server) {
// if TCP socket should already be dead
let [err, res] = await btc(promisify(fileDelete))(this.opts.path)
if (!err) {
log.info(
{ res: res, socket: this.opts.path },
'socket already exists.....deleted'
)
log.debug({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'})
return await this._listen(this.opts)
}
log.fatal(
{ err: err },
'error deleting socket. Can not establish a socket'
)
log.error({method:'create', line:102, err: err, msg:'error deleting socket. Can not establish a socket'})
return err
}
}
if (err.code === 'EACCES') {
log.info({ socket: this.opts.path, msg:'directory does not exist...creating'})
log.debug({method:'create', line:107, socket: this.opts.path, msg:'directory does not exist...creating'})
await mkdir(path.dirname(this.opts.path))
log.info({ socket: this.opts.path, msg:'directory created'})
log.debug({method:'create', line:109, socket: this.opts.path, msg:'directory created'})
return await this._listen(this.opts)
}
// otherwise fatally exit
log.info(err, 'error creating socket')
log.error({method:'create', line:113, err:err, msg:'error creating socket'})
reject(err)
})
@ -145,10 +139,7 @@ export default function socketClass(Server) {
*/
async push(packet, id) {
packet._header = { id: id || 'pushed' }
log.info(
{ opts: this.opts, packet: packet },
'pushing a packet to all connected consumers'
)
log.debug({method:'push', line:142, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
this.clients.forEach(async client => {
if (client.writable) {
let [err, ser] = await btc(client.stream.serialize)(packet)
@ -173,14 +164,11 @@ export default function socketClass(Server) {
let send = this._send.bind(socket)
if (this.clientTracking) this.clients.push(socket)
// TODO add 'close' listener to socket to remove from this.clients
log.info('new consumer connecting')
log.info(await send(await stream.serialize({ _handshake: true })))
log.debug({method:'_listen', line:167, msg:'new consumer connecting'})
log.debug(await send(await stream.serialize({ _handshake: true })))
if (this.opts.conPacket) {
this.opts.conPacket._header = { id: 'pushed' }
log.info(
{ conPacket: this.opts.conPacket },
'pushing a preset command to just connected consumer'
)
log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'})
send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection
}
socket.on('data', stream.onData)
@ -188,7 +176,7 @@ export default function socketClass(Server) {
stream.on('message', messageProcess.bind(this, socket))
async function messageProcess(client, packet) {
log.info({ packet: packet }, 'incoming packet on socket side')
log.debug({method:'_listen', line:179, packet: packet, msg:'incoming packet on socket side'})
let res = {}
if (this.clientTracking && packet.clientID) {
client.ID = packet.clientID
@ -217,18 +205,18 @@ export default function socketClass(Server) {
err: err,
_header: { id: res._header.id }
})
log.info(await send(ser))
await send(ser)
} // end process message
}) // end connecttion consumer
log.info({ opts: this.opts }, 'socket created and listening')
log.info({method:'_listen', line:211, opts: this.opt, msg:'socket created and listening'})
return res
}) // end super listen callback
} // end listen
async _destroy() {
log.info('closing down socket')
log.debug({method:'_destroy', line:217, msg:'closing down socket'})
await this.close()
log.info('all connections closed....exiting')
log.debug({method:'_destroy', line:219, msg:'all connections closed....exiting'})
process.exit()
}