0.2.12 update deps, clean up logging
parent
81bb898ab4
commit
cec399ecc5
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "@uci/socket",
|
"name": "@uci/socket",
|
||||||
"version": "0.2.11",
|
"version": "0.2.12",
|
||||||
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
|
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
|
||||||
"main": "src",
|
"main": "src",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
@ -47,10 +47,10 @@
|
||||||
"nodemon": "^1.18.6"
|
"nodemon": "^1.18.6"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@uci-utils/logger": "0.0.13",
|
"@uci-utils/logger": "0.0.14",
|
||||||
"better-try-catch": "^0.6.2",
|
"better-try-catch": "^0.6.2",
|
||||||
"clone": "^2.1.2",
|
"clone": "^2.1.2",
|
||||||
"death": "^1.1.0",
|
"death": "^1.1.0",
|
||||||
"make-dir": "^2.0.0"
|
"make-dir": "^3.0.0"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,7 @@ class SocketConsumer extends Socket {
|
||||||
})
|
})
|
||||||
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
|
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
|
||||||
if (!opts.path) {
|
if (!opts.path) {
|
||||||
if(!opts.host) log.warn({ opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'})
|
if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'})
|
||||||
opts.host = opts.host || '127.0.0.1'
|
opts.host = opts.host || '127.0.0.1'
|
||||||
opts.port = opts.port || 8080
|
opts.port = opts.port || 8080
|
||||||
} else {
|
} else {
|
||||||
|
@ -66,7 +66,7 @@ class SocketConsumer extends Socket {
|
||||||
|
|
||||||
// this is only for initial connection
|
// this is only for initial connection
|
||||||
const initTimeout = setTimeout(() => {
|
const initTimeout = setTimeout(() => {
|
||||||
log.fatal({ opts: this.opts }, `unable to connect in ${this.timeout}s`)
|
log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to connect in ${this.timeout}s`})
|
||||||
reject(
|
reject(
|
||||||
{ opts: this.opts },
|
{ opts: this.opts },
|
||||||
`unable to connect to socket server in ${this.timeout}secs`
|
`unable to connect to socket server in ${this.timeout}secs`
|
||||||
|
@ -77,12 +77,12 @@ class SocketConsumer extends Socket {
|
||||||
this.once('connect', async () => {
|
this.once('connect', async () => {
|
||||||
clearTimeout(initTimeout)
|
clearTimeout(initTimeout)
|
||||||
this._listen()
|
this._listen()
|
||||||
log.info({ opts: this.opts, msg:'initial connect waiting for socket ready handshake'})
|
log.debug({method:'connect', line:80, opts: this.opts, msg:'initial connect waiting for socket ready handshake'})
|
||||||
this.setKeepAlive(this.keepAlive, 3000)
|
this.setKeepAlive(this.keepAlive, 3000)
|
||||||
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
|
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
|
||||||
if (err) reject(err)
|
if (err) reject(err)
|
||||||
initial = false
|
initial = false
|
||||||
log.info('handshake to socket done, authenticating')
|
log.debug({method:'connect', line:85, msg:'handshake to socket done, TODO authenticating'})
|
||||||
// TODO authenticate here by encrypting a payload with private key and sending that.
|
// TODO authenticate here by encrypting a payload with private key and sending that.
|
||||||
// await btc(authenticate)
|
// await btc(authenticate)
|
||||||
this.emit('connected') // for end users to take action
|
this.emit('connected') // for end users to take action
|
||||||
|
@ -104,20 +104,21 @@ class SocketConsumer extends Socket {
|
||||||
// connection function that sets listeners and deals with reconnect
|
// connection function that sets listeners and deals with reconnect
|
||||||
const connect = () => {
|
const connect = () => {
|
||||||
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost')
|
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost')
|
||||||
log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead')
|
log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'})
|
||||||
|
|
||||||
if(!initial) {
|
if(!initial) {
|
||||||
this.once('connect', async () => {
|
this.once('connect', async () => {
|
||||||
clearTimeout(reconTimeout)
|
clearTimeout(reconTimeout)
|
||||||
this._listen()
|
this._listen()
|
||||||
log.info({msg:'reconnected waiting for socket ready handshake'})
|
log.debug({method:'connect', line:113, msg:'reconnected waiting for socket ready handshake'})
|
||||||
this.setKeepAlive(this.keepAlive, 3000)
|
this.setKeepAlive(this.keepAlive, 3000)
|
||||||
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
|
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
|
||||||
if (err) reject(err)
|
if (err) reject(err)
|
||||||
log.info('rehandshake done, reauthenticating')
|
log.debug({method:'connect', line:69, msg:'rehandshake done, reauthenticating'})
|
||||||
// TODO authenticate here by encrypting a payload with private key and sending that.
|
// TODO authenticate here by encrypting a payload with private key and sending that.
|
||||||
// await btc(authenticate)
|
// await btc(authenticate)
|
||||||
this.emit('reconnected') // for end users to take action
|
this.emit('connected')
|
||||||
|
this.emit('reconnected') // emit also reconnected for special end user action
|
||||||
resolve(res)
|
resolve(res)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -126,27 +127,26 @@ class SocketConsumer extends Socket {
|
||||||
if (err.code !== 'EISCONN') {
|
if (err.code !== 'EISCONN') {
|
||||||
this._ready = false
|
this._ready = false
|
||||||
this.emit('ready', false)
|
this.emit('ready', false)
|
||||||
log.warn({ error: err.code }, `connect error ${err.code}, attempting reconnect`)
|
log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect`})
|
||||||
reconnect()
|
reconnect()
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
this._ready = true
|
this._ready = true
|
||||||
this.emit('ready', true)
|
this.emit('ready', true)
|
||||||
log.info('reconnected to socket, ready to go again')
|
log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
if (this.keepAlive) { // only attempt reconnect is keepAlive is set which it is by default
|
if (this.keepAlive) { // only attempt reconnect is keepAlive is set which it is by default
|
||||||
this.on('end', async () => {
|
this.on('end', async () => {
|
||||||
log.warn('socket (server) terminated unexpectantly')
|
log.error({method:'connect', line:142, msg:'socket (server) terminated unexpectantly, keepalive set, wait for server to come online'})
|
||||||
this._ready = false
|
this._ready = false
|
||||||
log.info('keep alive was set, so waiting on server to come online for reconnect')
|
|
||||||
this.emit('error', { code: 'DISCONNECTED' })
|
this.emit('error', { code: 'DISCONNECTED' })
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// attempt connection
|
// attempt connection
|
||||||
log.info({ opts: this.opts, msg:`attempting to connect ${this.id} to socket`})
|
log.debug({method:'connect', line:149, opts: this.opts, msg:`attempting to connect ${this.id} to socket`})
|
||||||
super.connect(this.opts)
|
super.connect(this.opts)
|
||||||
|
|
||||||
} // end connect function
|
} // end connect function
|
||||||
|
@ -177,7 +177,7 @@ class SocketConsumer extends Socket {
|
||||||
let res = await this._packetProcess(reply)
|
let res = await this._packetProcess(reply)
|
||||||
if (!res) { // if packetProcess was not promise
|
if (!res) { // if packetProcess was not promise
|
||||||
res = reply
|
res = reply
|
||||||
log.warn('consumer function was not promise returning further processing may be out of sequence')
|
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
|
||||||
}
|
}
|
||||||
resolve(res)
|
resolve(res)
|
||||||
}) //end listener
|
}) //end listener
|
||||||
|
@ -210,14 +210,14 @@ class SocketConsumer extends Socket {
|
||||||
}
|
}
|
||||||
|
|
||||||
async _listen() {
|
async _listen() {
|
||||||
log.info('listening for incoming packets from socket')
|
log.debug('listening for incoming packets from socket')
|
||||||
// listen for pushed packets
|
// listen for pushed packets
|
||||||
this.on('pushed', async function(packet) {
|
this.on('pushed', async function(packet) {
|
||||||
// TODO do some extra security here?
|
// TODO do some extra security here?
|
||||||
let res = await this._packetProcess(packet)
|
let res = await this._packetProcess(packet)
|
||||||
if (!res) {
|
if (!res) {
|
||||||
// if process was not promise returning then res will be undefined
|
// if process was not promise returning then res will be undefined
|
||||||
log.warn('consumer packet processing function was not promise returning')
|
log.debug('consumer packet processing function was not promise returning')
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
// listen on socket stream
|
// listen on socket stream
|
||||||
|
@ -257,7 +257,7 @@ function isReady(ready, wait = 30, timeout = 1000) {
|
||||||
`timeout waiting for socket ready handshake - ${timeout}ms`
|
`timeout waiting for socket ready handshake - ${timeout}ms`
|
||||||
)
|
)
|
||||||
if (ready()) return resolve('ready')
|
if (ready()) return resolve('ready')
|
||||||
log.info(`waiting ${wait}ms for handshake`)
|
log.debug({function:'isReady', line:261, msg:`waiting ${wait}ms for handshake`})
|
||||||
time += wait
|
time += wait
|
||||||
setTimeout(waitReady, wait)
|
setTimeout(waitReady, wait)
|
||||||
})()
|
})()
|
||||||
|
|
|
@ -21,7 +21,6 @@ class JsonStream extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
onData(data) {
|
onData(data) {
|
||||||
// console.log('a chunk arrived', data)
|
|
||||||
data = decoder.write(data)
|
data = decoder.write(data)
|
||||||
try {
|
try {
|
||||||
this._handleData(data)
|
this._handleData(data)
|
||||||
|
@ -37,7 +36,6 @@ class JsonStream extends EventEmitter {
|
||||||
let [err2, length] = btc(Buffer.byteLength)(messageData, 'utf8')
|
let [err2, length] = btc(Buffer.byteLength)(messageData, 'utf8')
|
||||||
if (err2) reject(err2)
|
if (err2) reject(err2)
|
||||||
let data = length + this._delimeter + messageData
|
let data = length + this._delimeter + messageData
|
||||||
// console.log('serialized',data)
|
|
||||||
resolve(data)
|
resolve(data)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,7 @@ export default function socketClass(Server) {
|
||||||
return new Promise(async (resolve, reject) => {
|
return new Promise(async (resolve, reject) => {
|
||||||
// set up a couple ways to gracefully destroy socket process is killed/aborted
|
// set up a couple ways to gracefully destroy socket process is killed/aborted
|
||||||
_ON_DEATH(async () => {
|
_ON_DEATH(async () => {
|
||||||
log.info('\nhe\'s dead jim')
|
log.error({method:'create', line:84, msg:'\nhe\'s dead jim'})
|
||||||
await this._destroy()
|
await this._destroy()
|
||||||
})
|
})
|
||||||
process.once('SIGUSR2', async () => {
|
process.once('SIGUSR2', async () => {
|
||||||
|
@ -96,27 +96,21 @@ export default function socketClass(Server) {
|
||||||
// if TCP socket should already be dead
|
// if TCP socket should already be dead
|
||||||
let [err, res] = await btc(promisify(fileDelete))(this.opts.path)
|
let [err, res] = await btc(promisify(fileDelete))(this.opts.path)
|
||||||
if (!err) {
|
if (!err) {
|
||||||
log.info(
|
log.debug({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'})
|
||||||
{ res: res, socket: this.opts.path },
|
|
||||||
'socket already exists.....deleted'
|
|
||||||
)
|
|
||||||
return await this._listen(this.opts)
|
return await this._listen(this.opts)
|
||||||
}
|
}
|
||||||
log.fatal(
|
log.error({method:'create', line:102, err: err, msg:'error deleting socket. Can not establish a socket'})
|
||||||
{ err: err },
|
|
||||||
'error deleting socket. Can not establish a socket'
|
|
||||||
)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (err.code === 'EACCES') {
|
if (err.code === 'EACCES') {
|
||||||
log.info({ socket: this.opts.path, msg:'directory does not exist...creating'})
|
log.debug({method:'create', line:107, socket: this.opts.path, msg:'directory does not exist...creating'})
|
||||||
await mkdir(path.dirname(this.opts.path))
|
await mkdir(path.dirname(this.opts.path))
|
||||||
log.info({ socket: this.opts.path, msg:'directory created'})
|
log.debug({method:'create', line:109, socket: this.opts.path, msg:'directory created'})
|
||||||
return await this._listen(this.opts)
|
return await this._listen(this.opts)
|
||||||
}
|
}
|
||||||
// otherwise fatally exit
|
// otherwise fatally exit
|
||||||
log.info(err, 'error creating socket')
|
log.error({method:'create', line:113, err:err, msg:'error creating socket'})
|
||||||
reject(err)
|
reject(err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -145,10 +139,7 @@ export default function socketClass(Server) {
|
||||||
*/
|
*/
|
||||||
async push(packet, id) {
|
async push(packet, id) {
|
||||||
packet._header = { id: id || 'pushed' }
|
packet._header = { id: id || 'pushed' }
|
||||||
log.info(
|
log.debug({method:'push', line:142, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
|
||||||
{ opts: this.opts, packet: packet },
|
|
||||||
'pushing a packet to all connected consumers'
|
|
||||||
)
|
|
||||||
this.clients.forEach(async client => {
|
this.clients.forEach(async client => {
|
||||||
if (client.writable) {
|
if (client.writable) {
|
||||||
let [err, ser] = await btc(client.stream.serialize)(packet)
|
let [err, ser] = await btc(client.stream.serialize)(packet)
|
||||||
|
@ -173,14 +164,11 @@ export default function socketClass(Server) {
|
||||||
let send = this._send.bind(socket)
|
let send = this._send.bind(socket)
|
||||||
if (this.clientTracking) this.clients.push(socket)
|
if (this.clientTracking) this.clients.push(socket)
|
||||||
// TODO add 'close' listener to socket to remove from this.clients
|
// TODO add 'close' listener to socket to remove from this.clients
|
||||||
log.info('new consumer connecting')
|
log.debug({method:'_listen', line:167, msg:'new consumer connecting'})
|
||||||
log.info(await send(await stream.serialize({ _handshake: true })))
|
log.debug(await send(await stream.serialize({ _handshake: true })))
|
||||||
if (this.opts.conPacket) {
|
if (this.opts.conPacket) {
|
||||||
this.opts.conPacket._header = { id: 'pushed' }
|
this.opts.conPacket._header = { id: 'pushed' }
|
||||||
log.info(
|
log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'})
|
||||||
{ conPacket: this.opts.conPacket },
|
|
||||||
'pushing a preset command to just connected consumer'
|
|
||||||
)
|
|
||||||
send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection
|
send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection
|
||||||
}
|
}
|
||||||
socket.on('data', stream.onData)
|
socket.on('data', stream.onData)
|
||||||
|
@ -188,7 +176,7 @@ export default function socketClass(Server) {
|
||||||
stream.on('message', messageProcess.bind(this, socket))
|
stream.on('message', messageProcess.bind(this, socket))
|
||||||
|
|
||||||
async function messageProcess(client, packet) {
|
async function messageProcess(client, packet) {
|
||||||
log.info({ packet: packet }, 'incoming packet on socket side')
|
log.debug({method:'_listen', line:179, packet: packet, msg:'incoming packet on socket side'})
|
||||||
let res = {}
|
let res = {}
|
||||||
if (this.clientTracking && packet.clientID) {
|
if (this.clientTracking && packet.clientID) {
|
||||||
client.ID = packet.clientID
|
client.ID = packet.clientID
|
||||||
|
@ -217,18 +205,18 @@ export default function socketClass(Server) {
|
||||||
err: err,
|
err: err,
|
||||||
_header: { id: res._header.id }
|
_header: { id: res._header.id }
|
||||||
})
|
})
|
||||||
log.info(await send(ser))
|
await send(ser)
|
||||||
} // end process message
|
} // end process message
|
||||||
}) // end connecttion consumer
|
}) // end connecttion consumer
|
||||||
log.info({ opts: this.opts }, 'socket created and listening')
|
log.info({method:'_listen', line:211, opts: this.opt, msg:'socket created and listening'})
|
||||||
return res
|
return res
|
||||||
}) // end super listen callback
|
}) // end super listen callback
|
||||||
} // end listen
|
} // end listen
|
||||||
|
|
||||||
async _destroy() {
|
async _destroy() {
|
||||||
log.info('closing down socket')
|
log.debug({method:'_destroy', line:217, msg:'closing down socket'})
|
||||||
await this.close()
|
await this.close()
|
||||||
log.info('all connections closed....exiting')
|
log.debug({method:'_destroy', line:219, msg:'all connections closed....exiting'})
|
||||||
process.exit()
|
process.exit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue