0.2.24 refactor consumer for more robust error/disconnect handling
add online/offline/pause/resume to json stream removed bad 'connection' emits from socket-class as that event is already used by actual consumer connections.
This commit is contained in:
parent
e29b4ba838
commit
2314670c04
4 changed files with 302 additions and 221 deletions
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "@uci/socket",
|
||||
"version": "0.2.23",
|
||||
"version": "0.2.24",
|
||||
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
|
||||
"main": "src",
|
||||
"scripts": {
|
||||
|
@ -11,6 +11,7 @@
|
|||
"s": "node -r esm examples/server",
|
||||
"devs": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/server",
|
||||
"devs:anon": "UCI_ANON=true npm run devs",
|
||||
"devs:anon:debug": "UCI_LOG_LEVEL=debug npm run devs:anon",
|
||||
"devs:debug": "UCI_LOG_LEVEL=debug npm run devs",
|
||||
"client": "node -r esm examples/client",
|
||||
"devc": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/client",
|
||||
|
|
463
src/consumer.js
463
src/consumer.js
|
@ -1,7 +1,7 @@
|
|||
import { Socket } from 'net'
|
||||
import path from 'path'
|
||||
import { promisify } from 'util'
|
||||
import pause from 'delay'
|
||||
// import pause from 'delay'
|
||||
import btc from 'better-try-catch'
|
||||
import JsonStream from './json-stream'
|
||||
|
||||
|
@ -48,128 +48,152 @@ class SocketConsumer extends Socket {
|
|||
this.opts = opts
|
||||
// default is keepAlive true, must set to false to explicitly disable
|
||||
// if keepAlive is true then consumer will also be reconnecting consumer
|
||||
this.initTimeout = opts.initTimeout==null ? 60000 : opts.initTimeout * 1000
|
||||
this.initTimeout = opts.initTimeout > 4 ? opts.initTimeout * 1000 : null
|
||||
this.pingFailedTimeout = opts.pingFailedTimeout * 1000 || 10000
|
||||
this.reconnectLimit = opts.reconnectLimit || 0
|
||||
this.retryWait = opts.retryWait==null ? 5000 : opts.retryWait * 1000
|
||||
console.log(this.retryWait)
|
||||
this.heartBeat = !!process.env.HEARTBEAT || opts.heartBeat
|
||||
this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
|
||||
this._connected = false
|
||||
this._authenticated = false
|
||||
this.stream = new JsonStream()
|
||||
// bind to class for other class functions
|
||||
this.connect = this.connect.bind(this)
|
||||
this.close = promisify(this.end).bind(this)
|
||||
// this.__ready = this.__ready.bind(this)
|
||||
this._conAttempt = 1
|
||||
this._aborted = false
|
||||
this._reconnect = false
|
||||
this.retryPause = {} // timeout that may need to be cancelled if init timeout throws
|
||||
// this._packetProcess = this._packetProcess.bind(this)
|
||||
this._reconnectCount = 0
|
||||
this._connected = false
|
||||
this._authenticated = false
|
||||
this._connection = 'offline'
|
||||
this._first = true // first connection or not
|
||||
this._pingTimeout // value sent from socket upon connect
|
||||
}
|
||||
|
||||
get connected() { return this._connected}
|
||||
get active() { return !!this._authenticated }
|
||||
get connection() { return this._connection }
|
||||
|
||||
async connect() {
|
||||
return new Promise((resolve, reject) => {
|
||||
notify(state, moreOpts={}) {
|
||||
this._connection = state || this._connection
|
||||
let opts = {state:this._connection, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}
|
||||
Object.assign(opts,moreOpts)
|
||||
this.emit('connection',opts)
|
||||
}
|
||||
|
||||
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost')
|
||||
log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'})
|
||||
status(level='debug', msg) {
|
||||
let opts = {level:level, msg:msg, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected, state:this._connection}
|
||||
if (typeof msg !== 'string') Object.assign(opts,msg)
|
||||
this.emit('status',opts)
|
||||
log[level](opts)
|
||||
}
|
||||
|
||||
log.debug('first connnect attempt for', this.opts.id)
|
||||
this.emit('status',{level:'info', msg:'attempting initial connection', id:this.id, opts:this.opts, ready:false})
|
||||
async connect(timeout=0) {
|
||||
console.log('=================================================connect=========================================================================================')
|
||||
this.initTimeout = timeout > 4 ? timeout * 1000 : this.initTimeout
|
||||
this.notify('starting')
|
||||
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') {
|
||||
let msg ='tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'
|
||||
log.warn({method:'connect', msg:'msg'})
|
||||
this.status('warn',msg)
|
||||
}
|
||||
|
||||
console.log('TIMEOUT IN SOCKE CONNECT',this.initTimeout)
|
||||
this.once('error', errorHandler.bind(this))
|
||||
this.once('connect',connectHandler.bind(this))
|
||||
|
||||
super.connect(this.opts)
|
||||
|
||||
// returns promise for initial connection with reject on timeout
|
||||
if (this._first && this.initTimeout) {
|
||||
|
||||
let initTimeout = {}
|
||||
if (this.initTimeout > 499) {
|
||||
initTimeout = setTimeout(() => {
|
||||
clearTimeout(this.retryPause)
|
||||
this.emit('status',{level:'error', msg:'initial connection timed out', id:this.id, timeout:this.initTimeout, wait:this.retryWait, opts:this.opts, ready:false})
|
||||
this.removeAllListeners()
|
||||
log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.initTimeout/1000} secs no more attempts!`})
|
||||
this.stream.removeAllListeners()
|
||||
reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`})
|
||||
this._first=false
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
initTimeout = setTimeout(async () => {
|
||||
this.disconnect()
|
||||
let msg=`unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`
|
||||
this.status('fatal',msg)
|
||||
this.notify('failed',{timeout:this.initTimeout, msg:msg})
|
||||
reject({ error:msg, opts: this.opts})
|
||||
}
|
||||
, this.initTimeout)
|
||||
}
|
||||
|
||||
const initialHandshake = async (packet) => {
|
||||
|
||||
if (packet._handshake) {
|
||||
clearTimeout(initTimeout)
|
||||
this._connected = true
|
||||
let authPacket = this._authenticate() || {}
|
||||
authPacket._authenticate = true
|
||||
authPacket.clientName = this.id
|
||||
let res = (await this._authenticateSend(authPacket)) || {}
|
||||
if (!res.authenticated) {
|
||||
this.emit('status',{level:'info', msg:`authentication failed: ${res.reason}`, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected})
|
||||
reject('unable to authenticate')
|
||||
}
|
||||
else {
|
||||
this._authenticated = res.authenticated
|
||||
this.removeListener('error',initialErrorHandler)
|
||||
this._listen() // setup for active connection
|
||||
log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'})
|
||||
this.emit('status',{level:'info', msg:'authentication succesfull', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected})
|
||||
this.emit('consumer-connection',{state:'connected',name:this.id})
|
||||
if (this.opts.conPacket) (this.send(this.conPacket))
|
||||
resolve('initial connection successful')
|
||||
const successHandler = (ev) => {
|
||||
if (ev.state === 'online') {
|
||||
clearTimeout(initTimeout)
|
||||
this.removeListener('connection',successHandler)
|
||||
this.status('info','initial connection successfull')
|
||||
resolve({opts: this.opts, msg: 'initial connection successfull'})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.on('connection',successHandler)
|
||||
|
||||
})
|
||||
}
|
||||
this._first=false
|
||||
return 'connection in progress'
|
||||
|
||||
} // end connect
|
||||
|
||||
|
||||
const initialConnectHandler = async () => {
|
||||
this.on('data', this.stream.onData)
|
||||
this.stream.once('message', initialHandshake.bind(this))
|
||||
log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'})
|
||||
this.emit('status',{level:'debug', msg:'consumer connected'})
|
||||
}
|
||||
|
||||
const initialErrorHandler = async (err) => {
|
||||
let msg = {level:'error', method:'connect', line:101, error:err, msg:`error during initial connect, trying again in ${this.retryWait/1000} secs`}
|
||||
log.error(msg)
|
||||
this.emit('status',msg)
|
||||
let connect = () => { super.connect(this.opts)}
|
||||
this.retryPause = setTimeout(connect.bind(this),this.retryWait)
|
||||
}
|
||||
|
||||
|
||||
this.once('connect', initialConnectHandler)
|
||||
this.on('error', initialErrorHandler)
|
||||
super.connect(this.opts)
|
||||
|
||||
}) // end initial promise
|
||||
// manual disonnect
|
||||
async disconnect() {
|
||||
clearTimeout(this._errorRetry)
|
||||
this.removeAllListeners('ping')
|
||||
this.removeAllListeners('connect')
|
||||
this.removeAllListeners('error')
|
||||
this.removeAllListeners('data')
|
||||
this.removeAllListeners('pushed')
|
||||
this.stream.removeAllListeners()
|
||||
this._connected=false
|
||||
this._authenticated=false
|
||||
this.notify('disconnected')
|
||||
this._first = true
|
||||
}
|
||||
|
||||
async reconnect () {
|
||||
if (this._connected) this.disconnect()
|
||||
this._first = false
|
||||
if ( this.reconnectLimit === 0 || this._reconnectCount < this.reconnectLimit ) {
|
||||
this._reconnectCount += 1
|
||||
this.status('warn',`Attempting a reconnect: attempt ${this._reconnectCount} of ${this.reconnectLimit ||'unlimited'}`)
|
||||
this.connect()
|
||||
}
|
||||
else {
|
||||
this.status('fatal',`${this.reconnectLimit} reconnect attempts exceeded no more tries`)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async send(ipacket) {
|
||||
return new Promise(async resolve => {
|
||||
if (!this._connected) {
|
||||
if (!this.active) {
|
||||
resolve({ error: 'socket consumer not connected, aborting send' })
|
||||
}
|
||||
let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance
|
||||
setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000)
|
||||
packet._header = {
|
||||
id: Math.random()
|
||||
.toString()
|
||||
.slice(2), // need this for when multiple sends for different consumers use same packet instanceack
|
||||
sender: { name: this.name, instanceID: this.id },
|
||||
path: this.opts.path,
|
||||
port: this.opts.port,
|
||||
host: this.opts.host
|
||||
}
|
||||
let [err, res] = await btc(this.stream.serialize)(packet)
|
||||
if (err)
|
||||
resolve({error: 'unable to serialize packet for sending',packet: packet})
|
||||
await this.__write(res)
|
||||
this.once(packet._header.id, async function(reply) {
|
||||
let res = await this._packetProcess(reply)
|
||||
if (!res) { // if packetProcess was not promise
|
||||
res = reply
|
||||
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
|
||||
} else {
|
||||
let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance
|
||||
setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000)
|
||||
packet._header = {
|
||||
id: Math.random()
|
||||
.toString()
|
||||
.slice(2), // need this for when multiple sends for different consumers use same packet instanceack
|
||||
sender: { name: this.name, instanceID: this.id },
|
||||
path: this.opts.path,
|
||||
port: this.opts.port,
|
||||
host: this.opts.host
|
||||
}
|
||||
resolve(res) // resolves processed packet not return packet
|
||||
}) //end listener
|
||||
let [err, res] = await btc(this.stream.serialize)(packet)
|
||||
if (err)
|
||||
resolve({error: 'unable to serialize packet for sending',packet: packet})
|
||||
await this.__write(res) // write errors will be caught by socket error listener and result in reconnect
|
||||
this.once(packet._header.id, async function(reply) {
|
||||
let res = await this._packetProcess(reply)
|
||||
if (!res) { // if packetProcess was not promise
|
||||
res = reply
|
||||
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
|
||||
}
|
||||
resolve(res) // resolves processed packet not return packet
|
||||
}) //end listener
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -184,6 +208,12 @@ class SocketConsumer extends Socket {
|
|||
this._authenticate = func
|
||||
}
|
||||
|
||||
// stream needs an .onData method and will be stream bound handler for data event
|
||||
resgisterStreamProcessor (func) {
|
||||
this.stream = func
|
||||
}
|
||||
|
||||
|
||||
// PRIVATE METHODS
|
||||
|
||||
// default authentication using a simple token
|
||||
|
@ -194,123 +224,30 @@ class SocketConsumer extends Socket {
|
|||
async _authenticateSend (authPacket={}) {
|
||||
return new Promise(async resolve => {
|
||||
setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000)
|
||||
let [err, res] = await btc(this.stream.serialize)(authPacket)
|
||||
let [err, serPacket] = await btc(this.stream.serialize)(authPacket)
|
||||
if (err)
|
||||
resolve({error: 'unable to serialize packet for sending',packet: authPacket})
|
||||
this.stream.on('message',(res) => {
|
||||
this.stream.removeAllListeners('message')
|
||||
this.stream.once('message',(res) => {
|
||||
resolve(res)
|
||||
|
||||
})
|
||||
await this.__write(res)
|
||||
|
||||
|
||||
let res = await this.__write(serPacket)
|
||||
if (res.error) resolve(res)
|
||||
// otherwise wait for message listener above to return
|
||||
})
|
||||
}
|
||||
|
||||
// set up incoming message listening and error/reonnect handling
|
||||
async _listen() {
|
||||
// Define Handlers and Other Functions
|
||||
const reconnectHandler = () => {
|
||||
this.stream.once('message', handshake.bind(this))
|
||||
log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'})
|
||||
this.emit('status',{level:'info', msg:'attemping reconnect', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected})
|
||||
}
|
||||
|
||||
const handshake = async (packet) => {
|
||||
if (packet._handshake) {
|
||||
this._connected = true
|
||||
let authPacket = this._authenticate() || {}
|
||||
authPacket._authenticate = true
|
||||
authPacket.clientName = this.id
|
||||
let res = (await this._authenticateSend(authPacket)) || {}
|
||||
if (!res.authenticated) {
|
||||
this.emit('status',{level:'error', msg:`authentication failed: ${res.reason}`, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected})
|
||||
this.emit('error',{code:'authentification failed'})
|
||||
}
|
||||
else {
|
||||
this._authenticated = res.authenticated
|
||||
log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'})
|
||||
this.emit('status',{level:'info', msg:'authentication successful', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected})
|
||||
if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default
|
||||
this.on('ping',pingHandler)
|
||||
this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled
|
||||
}
|
||||
this.stream.on('message', messageHandler.bind(this)) // reset default message handler
|
||||
this.emit('status',{level:'info', msg:'reconnected', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected})
|
||||
this.emit('consumer-connection', {state:'reconnected', name:this.id})
|
||||
if (this.opts.conPacket) (this.send(this.conPacket))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const errorHandler = async (err) => {
|
||||
log.debug({msg:'connection error emitted ', error:err})
|
||||
this._connected = false
|
||||
this._authenticated = false
|
||||
this.emit('status',{level:'error', msg:'connection error', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected})
|
||||
log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect after ${this.retryWait/1000} secs`})
|
||||
this.emit('consumer-connection', {state:'disconnected', name:this.id})
|
||||
await pause(this.retryWait)
|
||||
this.stream.removeAllListeners('message') // remove regular message handler in prep for reconnect
|
||||
this.removeAllListeners('connect')
|
||||
this.removeAllListeners('ping')
|
||||
this.once('connect',reconnectHandler)
|
||||
super.connect(this.opts)
|
||||
}
|
||||
|
||||
const pushHandler = async (packet) => {
|
||||
// TODO do some extra security here?
|
||||
log.debug('packed was pushed from socket sever, processing',packet)
|
||||
let res = await this._packetProcess(packet)
|
||||
if (!res) {
|
||||
// if process was not promise returning then res will be undefined
|
||||
log.debug('consumer packet processing function was not promise returning')
|
||||
}
|
||||
}
|
||||
|
||||
const pingHandler = async (packet) => {
|
||||
clearTimeout(pingTimeout)
|
||||
log.trace({method:'connect', line:191, msg:'received ping, restting timeout'})
|
||||
this._pingTimeout= packet.pingInterval + 1000
|
||||
monitorPing.call(this)
|
||||
}
|
||||
|
||||
let pingTimeout = {}
|
||||
function monitorPing () {
|
||||
pingTimeout = setTimeout( () => {
|
||||
log.error({method:'connect', line:142, msg:'socket (server) not availabe'})
|
||||
this.removeAllListeners('ping')
|
||||
this._connected = false
|
||||
this.emit('error', { code: 'PING-FAILED' })
|
||||
},this._pingTimeout)
|
||||
}
|
||||
|
||||
// general handler
|
||||
function messageHandler(packet) {
|
||||
if (packet._header.id !== 'ping') log.debug('incoming packet from socket sever',packet)
|
||||
this.emit(packet._header.id, packet)
|
||||
}
|
||||
|
||||
// Start Message Listening and Error/Reconnect Handling
|
||||
log.debug('listening for incoming packets from socket')
|
||||
this.stream.on('message', messageHandler.bind(this))
|
||||
this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled
|
||||
this.on('pushed', pushHandler )
|
||||
this.on('error', errorHandler)
|
||||
if (this.keepAlive) { // keepAlive also activates ping Monitor
|
||||
this.on('ping',pingHandler)
|
||||
}
|
||||
}
|
||||
|
||||
async __write(packet) {
|
||||
// timeout already set if sockect can't be drained in 10 secs
|
||||
return new Promise(resolve => {
|
||||
const cb = () => resolve('packet written to consumer side socket stream ')
|
||||
if (!super.write(packet)) {
|
||||
this.once('drain', cb)
|
||||
} else {
|
||||
process.nextTick(cb)
|
||||
try {
|
||||
if (!super.write(packet)) {
|
||||
this.once('drain', cb)
|
||||
} else {
|
||||
process.nextTick(cb)
|
||||
}
|
||||
} catch (err){
|
||||
resolve({error:`error during write to socket - ${err.code}`, err:err})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -325,3 +262,129 @@ class SocketConsumer extends Socket {
|
|||
} // end class
|
||||
|
||||
export default SocketConsumer
|
||||
|
||||
|
||||
// CONNECTION HANDLERS
|
||||
|
||||
async function connectHandler () {
|
||||
this.removeAllListeners('error') // turn off error handler during handshake
|
||||
this._doneAuthenticate = setTimeout(() =>{
|
||||
let msg ='authentication not completed in 3 secs, forcing reconnect attempt'
|
||||
this.notify('failed',{msg:msg})
|
||||
this.status('warn',msg)
|
||||
this.reconnect()
|
||||
},3000)
|
||||
this.on('data', this.stream.onData)
|
||||
this.stream.on('message', handshake.bind(this))
|
||||
this.status('info','in process of connecting waiting for socket handshake/authenticate')
|
||||
this.notify('connecting')
|
||||
}
|
||||
|
||||
async function handshake (packet) {
|
||||
if (packet._handshake) {
|
||||
this._connected = true
|
||||
this.notify('handshake')
|
||||
this.status('info','connection handshaking')
|
||||
let authPacket = this._authenticate() || {}
|
||||
authPacket._authenticate = true
|
||||
authPacket.clientName = this.id
|
||||
let res = await this._authenticateSend(authPacket)
|
||||
this.stream.removeAllListeners('message')
|
||||
clearTimeout(this._doneAuthenticate)
|
||||
if (res.error) {
|
||||
let msg =`error during authentication ${res.error}, attempting reconnect in ${this.retryWait/1000}s to see if error clears`
|
||||
this.notify('error',{msg:msg})
|
||||
this.status('error',msg)
|
||||
this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait)
|
||||
} else {
|
||||
if (!res.authenticated) {
|
||||
let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect`
|
||||
this.notify('failed',{msg:msg})
|
||||
this.status('fatal',msg)
|
||||
this.disconnect()
|
||||
}
|
||||
else {
|
||||
this._authenticated = res.authenticated
|
||||
let msg ='authentication succeeded connection ready'
|
||||
this.status('info',msg)
|
||||
this.notify('online',{msg:msg})
|
||||
this._reconnectCount = 0
|
||||
this.stream.on('message', messageHandler.bind(this)) // reset default message handler
|
||||
this.on('pushed', pushHandler.bind(this) )
|
||||
this.once('error', errorHandler.bind(this)) // listen for errors on authenticated socket
|
||||
if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default
|
||||
this.on('ping',pingHandler.bind(this))
|
||||
this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled
|
||||
}
|
||||
if (this.opts.conPacket) (this.send(this.conPacket))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// general message handler handler
|
||||
function messageHandler(packet) {
|
||||
if (packet._header.id !== 'ping') { // ping has it's own listner
|
||||
let obj = { msg:'incoming packet from socket sever',packet:packet}
|
||||
this.status('trace',obj)
|
||||
}
|
||||
this.emit(packet._header.id, packet)
|
||||
}
|
||||
|
||||
// assume all errors are fatal and the socket needs to be disconnected/reconnected
|
||||
async function errorHandler (err) {
|
||||
this.disconnect()
|
||||
let msg = {error:err, msg:`error, socket has been disconnected, trying reconnect in ${this.retryWait/1000} secs`}
|
||||
this.notify('error',{error:err,msg:msg})
|
||||
this.status('error',msg)
|
||||
this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait)
|
||||
}
|
||||
|
||||
// const pushHandler = async (packet) => {
|
||||
async function pushHandler (packet) {
|
||||
// TODO do some extra security here?
|
||||
let msg = {msg:'packed was pushed from socket sever, processing', packet:packet}
|
||||
this.status('trace',msg)
|
||||
let res = await this._packetProcess(packet)
|
||||
if (!res) {
|
||||
// if process was not promise returning then res will be undefined
|
||||
log.debug('consumer packet processing function was not promise returning')
|
||||
}
|
||||
}
|
||||
|
||||
// const pingHandler = async (packet) => {
|
||||
async function pingHandler (packet) {
|
||||
if (this.heartBeat) console.log('lub dub')
|
||||
clearTimeout(this._ping)
|
||||
this._pingTimeout= (packet.pingInterval || 5000) + 1000 // set value from server
|
||||
monitorPing.call(this)
|
||||
}
|
||||
|
||||
async function pingFailedHandler () {
|
||||
clearTimeout(this._pingFailed)
|
||||
clearTimeout(this._ping) // clear any others set
|
||||
this.removeAllListeners('ping')
|
||||
this.on('ping',pingHandler.bind(this))
|
||||
let msg = 'ping has been received again, back to normal connection'
|
||||
this.notify('online',{msg:msg})
|
||||
this.status('info',msg)
|
||||
}
|
||||
|
||||
|
||||
function monitorPing () {
|
||||
this._ping = setTimeout( () => {
|
||||
this.removeAllListeners('ping')
|
||||
let msg = `ping failed waiting ${this.pingFailedTimeout/1000} secs to come back online before forced retry`
|
||||
this.notify('offline',{msg:msg})
|
||||
this.status('warn',msg)
|
||||
this.on('ping',pingFailedHandler.bind(this))
|
||||
this._pingFailed = setTimeout (() => {
|
||||
this.removeAllListeners('ping')
|
||||
let msg =`no ping received for ${this.pingFailedTimeout/1000} secs, force disconnect/reconnect`
|
||||
this.notify('failed',{msg:msg})
|
||||
this.status('warn',msg)
|
||||
this.reconnect()
|
||||
// this.emit('error',{code:'PING_FAILED'})
|
||||
}, this.pingFailedTimeout)
|
||||
},this._pingTimeout)
|
||||
}
|
||||
|
|
|
@ -18,8 +18,19 @@ class JsonStream extends EventEmitter {
|
|||
this._delimeter = opts.delimiter || '#'
|
||||
this.onData = this.onData.bind(this)
|
||||
this.serialize = this.serialize.bind(this)
|
||||
this._state = 'online'
|
||||
this._queue = []
|
||||
}
|
||||
|
||||
get state () {return this._state}
|
||||
offline () { this._state = 'offline' }
|
||||
pause () {this._state = 'paused'} // queue messages in handler
|
||||
resume () {
|
||||
// emit the messages in the queue
|
||||
this._state='online'
|
||||
}
|
||||
online() {this._state = 'online'}
|
||||
|
||||
onData(data) {
|
||||
data = decoder.write(data)
|
||||
try {
|
||||
|
@ -91,7 +102,11 @@ class JsonStream extends EventEmitter {
|
|||
throw err
|
||||
}
|
||||
message = message || {}
|
||||
this.emit('message', message)
|
||||
// console.log('stream message', message, this._state)
|
||||
if (this._stream ==='pause') {
|
||||
if (message._header.id !== 'ping') this.queue.shift(message)
|
||||
}
|
||||
if(this._state==='online') this.emit('message', message)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -138,7 +138,6 @@ export default function socketClass(Server) {
|
|||
let errors = {level:'fatal',msg:'something fatal is going on, 6 errors', errors:this.errors}
|
||||
log.fatal(errors)
|
||||
this.listening=false
|
||||
this.emit('status', {active:this.active})
|
||||
this.emit('status', errors)
|
||||
}
|
||||
})
|
||||
|
@ -246,7 +245,7 @@ export default function socketClass(Server) {
|
|||
removeClient (id) {
|
||||
let index = this.getClientIndex(id)
|
||||
let client=this.clients[index]
|
||||
this.emit('consumer-connection',{state:'disconnected', id:client.id, name:client.name, socketSide:true})
|
||||
this.emit('status',{level:'info', msg:'a consumer disconnected', name:client.name, id:client.id})
|
||||
client.removeAllListeners()
|
||||
client.stream.removeAllListeners()
|
||||
this.clients.splice(index,1)
|
||||
|
@ -254,8 +253,7 @@ export default function socketClass(Server) {
|
|||
}
|
||||
|
||||
async authenticateClient(client) {
|
||||
// let server = this
|
||||
return new Promise(async function(resolve, reject) {
|
||||
return new Promise(async (resolve, reject) => {
|
||||
// when consumer gets the handshake they must follow with authentication
|
||||
client.stream.on('message', authenticate.bind(this,client))
|
||||
let [err] = await btc(this._send)(client,{_handshake: true, id:client.id})
|
||||
|
@ -281,13 +279,13 @@ export default function socketClass(Server) {
|
|||
reject(packet.reason)
|
||||
}
|
||||
else {
|
||||
log.info({msg:'client authenticated successfuly', client:client.name, client_id:client.id})
|
||||
if (this.allowAnonymous) log.warn({msg:'socket consumer connected anonymously', consumer:client.name, consumer_id:client.id})
|
||||
log.info({msg:'client authenticated successfuly', client:client.name})
|
||||
if (this.allowAnonymous) log.warn({msg:'socket consumer connected anonymously', consumer:client.name})
|
||||
resolve(client.authenticated)
|
||||
}
|
||||
}
|
||||
}
|
||||
}.bind(this))
|
||||
})
|
||||
}
|
||||
|
||||
// private methods
|
||||
|
@ -306,15 +304,11 @@ export default function socketClass(Server) {
|
|||
}
|
||||
|
||||
|
||||
// async _connectionHandler({consumer, server}) { // this gets called for each client connection and is unique to
|
||||
async _connectionHandler(consumer) { // this gets called for each client connection and is unique to each
|
||||
consumer.id = ++this.nextClientID // server assigned ID
|
||||
log.debug({method:'_listen', line:167, msg:'new consumer connecting', id:consumer.id, totalConsumers:this.clients.length})
|
||||
consumer.socketSide = true
|
||||
consumer.authenticated = false
|
||||
this.clients.push(consumer) // add client to list
|
||||
|
||||
const stream = new JSONStream()
|
||||
consumer.stream = stream
|
||||
consumer.setKeepAlive(this.keepAlive,3000)
|
||||
|
||||
// add listeners
|
||||
const clientCloseHandler = (id) => {
|
||||
|
@ -325,7 +319,7 @@ export default function socketClass(Server) {
|
|||
consumer.on('close', clientCloseHandler.bind(this,consumer.id) )
|
||||
|
||||
consumer.on('error', (err) => {
|
||||
log.error({msg:'client connection error during listen',error:err})
|
||||
log.error({msg:'client connection error',error:err})
|
||||
// TODO do more handling than just logging
|
||||
})
|
||||
|
||||
|
@ -336,16 +330,25 @@ export default function socketClass(Server) {
|
|||
// TODO do more handling than just logging
|
||||
})
|
||||
|
||||
consumer.authenticated = true
|
||||
|
||||
let [err] = await btc(this.authenticateClient)(consumer)
|
||||
if (!this.allowAnonymous) {
|
||||
if (err) {
|
||||
consumer.removeAllListeners()
|
||||
consumer.stream.removeAllListeners()
|
||||
consumer.end()// abort new connection consumer, cleanup, remove listeners
|
||||
this.removeClient(consumer.id)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// all's set main message processor
|
||||
// authenticated consumer add to list of clients
|
||||
consumer.id = ++this.nextClientID // server assigned ID
|
||||
consumer.socketSide = true
|
||||
consumer.authenticated = true
|
||||
this.clients.push(consumer) // add current consumer to clients
|
||||
consumer.setKeepAlive(this.keepAlive,30)
|
||||
log.debug({method:'_listen', line:364, msg:'new consumer connected/authenticated', cname:consumer.name, cid:consumer.id, totalConsumers:this.clients.length})
|
||||
// all's set enable main incoming message processor
|
||||
stream.on('message', messageProcess.bind(this, consumer))
|
||||
|
||||
if (this.opts.conPacket) {
|
||||
|
@ -354,8 +357,7 @@ export default function socketClass(Server) {
|
|||
this._send(consumer,this.opts.conPacket) // send a packet command on to consumer on connection
|
||||
}
|
||||
|
||||
this.emit('consumer-connection',{state:'connected', id:consumer.id, name:consumer.name, socketSide:true})
|
||||
this.emit('status',{level:'info', msg:'a consumer connected', name:consumer.name, id:consumer.id})
|
||||
this.emit('status',{level:'info', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id})
|
||||
|
||||
// that's it. Connection is active
|
||||
|
||||
|
|
Loading…
Reference in a new issue