import { Socket } from 'net' import path from 'path' import { promisify } from 'util' // import pause from 'delay' import btc from 'better-try-catch' import JsonStream from './json-stream' // import logger from '../../uci-logger/src/logger' import logger from '@uci-utils/logger' let log = {} // TODO change default pipe dir for windows and mac os const DEFAULT_PIPE_DIR = process.env.SOCKETS_DIR || '/tmp/UCI' const DEFAULT_SOCKET_NAME = 'uci-sock' /** * Socket Consumer - connects to UCI TCP or Named Pipe Sockets and coummunicates with uci packet.
* Extends {@link https://nodejs.org/api/net.html#net_class_net_socket | nodejs net.Socket} * @extends Socket */ class SocketConsumer extends Socket { /** * constructor - Description * * @param {object} [opts={}] test */ constructor(opts = {}) { super() this.id = opts.id || opts.name || 'socket:' + new Date().getTime() log = logger({ file: 'src/consumer.js', class: 'Consumer', name: 'socket', id: this.id }) if (!opts.path) { if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'}) opts.host = opts.host || '127.0.0.1' opts.port = opts.port || 8080 } else { if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR, DEFAULT_SOCKET_NAME) if (path.dirname(opts.path) === '.') opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) } this.opts = opts this._data = {id:this.id, name:opts.name||this.id} Object.assign(this._data,opts.data||{}) // holds consumer specific data that will be passed to server in header and on connection // default is keepAlive true, must set to false to explicitly disable // if keepAlive is true then consumer will also be reconnecting consumer // initTimeout > 4 means socketInit will return a promise this.initTimeout = opts.initTimeout > 4 ? opts.initTimeout * 1000 : null this.pingFailedTimeout = opts.pingFailedTimeout * 1000 || 5000 this.reconnectLimit = opts.reconnectLimit || 0 this.retryWait = opts.retryWait==null ? 5000 : opts.retryWait * 1000 this.heartBeat = !!process.env.HEARTBEAT || opts.heartBeat this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this.stream = new JsonStream() // bind to class for other class functions this.connect = this.connect.bind(this) this.close = promisify(this.end).bind(this) this._reconnectCount = 0 this._connected = false this._authenticated = false this._connection = 'offline' this._first = true // first connection or not this._pingTimeout // value sent from socket upon connect } get connected() { return this._connected} get active() { return !!this._authenticated } get connection() { return this._connection } notify(state, moreOpts={}) { this._connection = state || this._connection let opts = {state:this._connection, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected} Object.assign(opts,moreOpts) this.emit('connection:socket',opts) } log(level='debug', msg) { let opts = {level:level, msg:msg, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected, state:this._connection} if (typeof msg !== 'string') Object.assign(opts,msg) this.emit('log',opts) log[level](opts) } async connect(timeout=0) { this.initTimeout = timeout > 4 ? timeout * 1000 : this.initTimeout this.notify('starting') if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') { let msg ='tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead' log.warn({method:'connect', msg:'msg'}) this.log('warn',msg) } this.once('error', errorHandler.bind(this)) this.once('connect',connectHandler.bind(this)) super.connect(this.opts) // returns promise for initial connection when initTimeout is not zero with reject on timeout if (this._first && this.initTimeout) { let initTimeout = {} this._first=false return new Promise((resolve, reject) => { initTimeout = setTimeout(async () => { this.disconnect() let msg=`unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts` this.log('fatal',msg) this.notify('failed',{timeout:this.initTimeout, msg:msg}) reject({ error:msg, opts: this.opts}) } , this.initTimeout) const successHandler = (ev) => { if (ev.state === 'connected') { clearTimeout(initTimeout) this.removeListener('connection:socket',successHandler) this.log('info','initial connection successfull') resolve({opts: this.opts, msg: 'initial connection successfull'}) } } this.on('connection:socket',successHandler.bind(this)) }) } this._first=false return 'connection in progress' } // end connect // manual disonnect async disconnect() { clearTimeout(this._errorRetry) this.removeAllListeners('ping') this.removeAllListeners('connect') this.removeAllListeners('error') this.removeAllListeners('data') this.removeAllListeners('pushed') this.stream.removeAllListeners() this._connected=false this._authenticated=false this.notify('disconnected') this._first = true } async reconnect () { if (this._connected) this.disconnect() this._first = false if ( this.reconnectLimit === 0 || this._reconnectCount < this.reconnectLimit ) { this._reconnectCount += 1 this.log('warn',`Attempting a reconnect: attempt ${this._reconnectCount} of ${this.reconnectLimit ||'unlimited'}`) this.connect() } else { this.log('fatal',`${this.reconnectLimit} reconnect attempts exceeded no more tries`) } } async send(ipacket) { return new Promise(async resolve => { if (!this.active) { resolve({ error: 'socket consumer not connected, aborting send' }) } else { let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) packet._header = { id: Math.random() .toString() .slice(2), // need this for when multiple sends for different consumers use same packet instanceack sender: { data: this._data, instanceID: this.id}, path: this.opts.path, port: this.opts.port, host: this.opts.host } let [err, res] = await btc(this.stream.serialize)(packet) if (err) resolve({error: 'unable to serialize packet for sending',packet: packet}) let res2 = await this.__write(res) if (res2.error) resolve(res2) // if no write error then wait for send response this.once(packet._header.id, async function(reply) { let res = await this._packetProcess(reply) if (!res) { // if packetProcess was not promise res = reply log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) } resolve(res) // resolves processed packet not return packet }) //end listener } }) } // TODO register user alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) registerPacketProcessor(func) { this._packetProcess = func } // func should return an object the server expects registerAuthenticator (func) { this._authenticate = func } // stream needs an .onData method and will be stream bound handler for data event resgisterStreamProcessor (func) { this.stream = func } // PRIVATE METHODS // default authentication using a simple token _authenticate () { return { token: process.env.UCI_CLIENT_TOKEN || this.token || 'default' } } async _authenticateSend (authPacket={}) { return new Promise(async resolve => { setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) let [err, serPacket] = await btc(this.stream.serialize)(authPacket) if (err) resolve({error: 'unable to serialize packet for sending',packet: authPacket}) this.stream.once('message',(res) => { resolve(res) }) let res = await this.__write(serPacket) if (res.error) resolve(res) // otherwise wait for message listener above to return }) } async __write(packet) { // timeout already set if sockect can't be drained in 10 secs return new Promise((resolve) => { // most stream write errors will be caught by socket error listener but make sure if (!this.writable) { // stream is writeable as can't catch epipe errors there resolve({error:'socket stream closed can not send packet'}) return } const cb = () => resolve({response:'packet written to consumer side socket stream '}) if (!super.write(packet)) { this.once('drain', cb) } else { process.nextTick(cb) } }) } // default packet process just a simple console logger. ignores any cmd: prop _packetProcess(packet) { console.log('default consumer processor -- log packet from socket to console') console.log('replace by calling .registerPacketProcessor(func) with your function') console.dir(packet) } } // end class export default SocketConsumer // CONNECTION HANDLERS async function connectHandler () { this.removeAllListeners('error') // turn off error handler during handshake this._doneAuthenticate = setTimeout(() =>{ let msg ='authentication not completed in 3 secs, forcing reconnect attempt' this.notify('failed',{msg:msg}) this.log('warn',msg) this.reconnect() },3000) this.on('data', this.stream.onData) this.stream.on('message', handshake.bind(this)) this.log('info','in process of connecting waiting for socket handshake/authenticate') this.notify('connecting') } async function handshake (packet) { if (packet._handshake) { this._connected = true this.notify('handshake') let authPacket = this._authenticate() || {} authPacket._authenticate = true authPacket.data = this._data let res = await this._authenticateSend(authPacket) this.stream.removeAllListeners('message') clearTimeout(this._doneAuthenticate) if (res.error) { let msg =`error during authentication ${res.error}, attempting reconnect in ${this.retryWait/1000}s to see if error clears` this.notify('error',{msg:msg}) this.log('error',msg) this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait) } else { if (!res.authenticated) { let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect` this.notify('failed',{msg:msg}) this.disconnect() } else { this._authenticated = res.authenticated let msg ='authentication succeeded connection ready' this.notify('connected',{msg:msg}) this._reconnectCount = 0 this.stream.on('message', messageHandler.bind(this)) // reset default message handler this.on('pushed', pushHandler.bind(this) ) this.once('error', errorHandler.bind(this)) // listen for errors on authenticated socket if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default this.on('ping',pingHandler.bind(this)) this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled } if (this.opts.conPacket) (this.send(this.conPacket)) } } } } // general message handler handler function messageHandler(packet) { if (packet._header.id !== 'ping') { // ping has it's own listner let obj = { msg:'incoming packet from socket sever',packet:packet} this.log('trace',obj) } this.emit(packet._header.id, packet) } // assume all errors are fatal and the socket needs to be disconnected/reconnected async function errorHandler (err) { this.disconnect() let msg = `error, this consumer is disconnected, trying reconnect in ${this.retryWait/1000} secs` this.notify('error',{error:err,msg:msg}) this.log('error',msg) this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait) } // const pushHandler = async (packet) => { async function pushHandler (packet) { // TODO do some extra security here? let msg = {msg:'packed was pushed from socket sever, processing', packet:packet} this.log('trace',msg) let res = await this._packetProcess(packet) if (!res) { // if process was not promise returning then res will be undefined log.debug('consumer packet processing function was not promise returning') } } // const pingHandler = async (packet) => { async function pingHandler (packet) { if (this.heartBeat) console.log('lub dub') clearTimeout(this._ping) this._pingTimeout= (packet.pingInterval || 5000) + 1000 // set value from server monitorPing.call(this) } async function pingFailedHandler () { clearTimeout(this._pingFailed) clearTimeout(this._ping) // clear any others set this.removeAllListeners('ping') this.on('ping',pingHandler.bind(this)) let msg = 'ping has been received again, back to normal connection' this.notify('connected',{msg:msg}) this.log('info',msg) } function monitorPing () { this._ping = setTimeout( () => { this.removeAllListeners('ping') let msg = `ping failed waiting ${this.pingFailedTimeout/1000} secs to before forced reconnect` this.notify('offline',{msg:msg}) this.log('warn',msg) this.on('ping',pingFailedHandler.bind(this)) this._pingFailed = setTimeout (() => { this.removeAllListeners('ping') let msg =`no ping received for ${this.pingFailedTimeout/1000} secs, force disconnect/reconnect` this.notify('failed',{msg:msg}) this.log('warn',msg) this.reconnect() // this.emit('error',{code:'PING_FAILED'}) }, this.pingFailedTimeout) },this._pingTimeout) }