import { Socket } from 'net' import path from 'path' import { promisify } from 'util' import pause from 'delay' import btc from 'better-try-catch' import JsonStream from './json-stream' // import logger from '../../uci-logger/src/logger' import logger from '@uci-utils/logger' let log = {} // TODO change default pipe dir for windows and mac os const DEFAULT_PIPE_DIR = process.env.SOCKETS_DIR || '/tmp/UCI' const DEFAULT_SOCKET_NAME = 'uci-sock' /** * Socket Consumer - connects to UCI TCP or Named Pipe Sockets and coummunicates with uci packet.
* Extends {@link https://nodejs.org/api/net.html#net_class_net_socket | nodejs net.Socket} * @extends Socket */ class SocketConsumer extends Socket { /** * constructor - Description * * @param {object} [opts={}] test */ constructor(opts = {}) { super() log = logger({ file: 'src/consumer.js', class: 'Consumer', name: 'socket', id: this.id }) this.id = opts.id || opts.name || 'socket:' + new Date().getTime() if (!opts.path) { if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'}) opts.host = opts.host || '127.0.0.1' opts.port = opts.port || 8080 } else { if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR, DEFAULT_SOCKET_NAME) if (path.dirname(opts.path) === '.') opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) } this.opts = opts // default is keepAlive true, must set to false to explicitly disable // if keepAlive is true then consumer will also be reconnecting consumer this.initTimeout = opts.initTimeout * 1000 || 60000 this.retryWait = opts.retryWait * 1000 || 3000 this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this._ready = false this.stream = new JsonStream() // bind to class for other class functions this.connect = this.connect.bind(this) this.close = promisify(this.end).bind(this) // this.__ready = this.__ready.bind(this) this._conAttempt = 1 this._aborted = false this._reconnect = false // this._write = this._write.bind(this) } async connect() { return new Promise((resolve, reject) => { if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'}) log.debug('first connnect attempt for', this.opts.name) let initTimeout = setTimeout(() => { this.removeAllListeners() log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.initTimeout/1000} secs no more attempts!`}) this.removeAllListeners() this.stream.removeAllListeners() this.destroy() reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`}) } , this.initTimeout) const initialHandshake = async (packet) => { if (packet._handshake) { log.debug({method:'connect', line:87, msg:'handshake received ready for communication'}) this.stream.removeAllListeners('message') this.removeListener('error',initialErrorHandler) clearTimeout(initTimeout) this._listen() // setup for active connection this._ready = true this.emit('connection','connected') resolve('initial connection successful') } } const initialConnectHandler = async () => { this.on('data', this.stream.onData) this.stream.on('message', initialHandshake.bind(this)) log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) } const initialErrorHandler = async (err) => { log.debug({method:'connect', line:101, error:err, msg:`error during initial connect, trying again in ${this.retryWait/1000} secs`}) await pause(this.retryWait) super.connect(this.opts) } this.once('connect', initialConnectHandler) this.on('error', initialErrorHandler) super.connect(this.opts) }) // end initial promise } async send(ipacket) { return new Promise(async resolve => { if (!this._ready) resolve({ error: 'socket consumer not connected, aborting send' }) let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) packet._header = { id: Math.random() .toString() .slice(2), // need this for when multiple sends for different consumers use same packet instanceack sender: { name: this.name, instanceID: this.id }, path: this.opts.path, port: this.opts.port, host: this.opts.host } let [err, res] = await btc(this.stream.serialize)(packet) if (err) resolve({error: 'unable to serialize packet for sending',packet: packet}) await this.__write(res) this.once(packet._header.id, async function(reply) { let res = await this._packetProcess(reply) if (!res) { // if packetProcess was not promise res = reply log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) } resolve(res) // resolves processed packet not return packet }) //end listener }) } // TODO register user alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) registerPacketProcessor(func) { this._packetProcess = func } // PRIVATE METHODS // set up incoming message listening and error/reonnect handling async _listen() { // Define Handlers and Other Functions const reconnectHandler = () => { this.stream.once('message', handshake.bind(this)) log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) } const handshake = async (packet) => { if (packet._handshake) { log.debug({method:'connect', line:87, msg:'handshake received ready for communication'}) this.stream.on('message', messageProcess.bind(this)) if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default this.on('ping',pingHandler) this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled } this._ready = true this.emit('connection','reconnected') } } const errorHandler = async (err) => { log.debug({msg:'connection error emitted ', error:err}) this._ready = false if (err.code !== 'EISCONN') { this.emit('connection', err) log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect after ${this.retryWait/1000} secs`}) await pause(this.retryWait) this.stream.removeAllListeners('message') // remove regular message handler in prep for reconnect this.removeAllListeners('connect') this.once('connect',reconnectHandler) super.connect(this.opts) } else { this._ready = true this.emit('connection','reconnected') log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'}) } } async function messageProcess(packet) { log.debug('incoming packet from socket sever',packet) this.emit(packet._header.id, packet) } const pushHandler = async (packet) => { // TODO do some extra security here? log.debug('packed was pushed from socket sever, processing',packet) let res = await this._packetProcess(packet) if (!res) { // if process was not promise returning then res will be undefined log.debug('consumer packet processing function was not promise returning') } } const pingHandler = async (packet) => { clearTimeout(pingTimeout) log.info({method:'connect', line:191, msg:'received ping, restting timeout'}) this._pingTimeout= packet.pingInterval + 1000 monitorPing.call(this) } let pingTimeout = {} function monitorPing () { pingTimeout = setTimeout( () => { log.error({method:'connect', line:142, msg:'socket (server) not availabe'}) this.removeAllListeners('ping') this._ready = false this.emit('error', { code: 'PING-FAILED' }) },this._pingTimeout) } // Start Message Listening and Error/Reconnect Handling log.debug('listening for incoming packets from socket') this.stream.on('message', messageProcess.bind(this)) this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled this.on('pushed', pushHandler ) this.on('error', errorHandler) if (this.keepAlive) { // keepAlive also activates ping Monitor this.on('ping',pingHandler) } } async __write(packet) { // timeout already set if sockect can't be drained in 10 secs return new Promise(resolve => { const cb = () => resolve('packet written to consumer side socket stream ') if (!super.write(packet)) { this.once('drain', cb) } else { process.nextTick(cb) } }) } // default packet process just a simple console logger. ignores any cmd: prop _packetProcess(packet) { console.log('default consumer processor -- log packet from socket to console') console.log('replace by calling .registerPacketProcessor(func) with your function') console.dir(packet) } } // end class export default SocketConsumer // _removeListeners(events) { // if (!events) this.removeAllListeners() // else { // if (typeof events === 'string') this.removeAllListeners(events) // else events.map(event => this.removeAllListeners(event)) // } // } // Helper Functions // wait until a passed ready function returns true // function isReady(ready, wait = 100, timeout = 2000) { // let time = 0 // return new Promise((resolve, reject) => { // (function waitReady() { // if (time > timeout) // return reject( // `timeout waiting for socket ready handshake - ${timeout}ms` // ) // if (ready()) return resolve('ready') // log.debug({function:'isReady', line:261, msg:`waiting ${wait}ms for handshake`}) // time += wait // setTimeout(waitReady, wait) // })() // }) // }