import { Socket } from 'net'
import path from 'path'
import { promisify } from 'util'

import btc from 'better-try-catch'
import JsonStream from './json-stream'
// import logger from '../../uci-logger/src/logger'
import logger from '@uci-utils/logger'

// Module level logger. It is replaced every time a SocketConsumer is constructed,
// so it carries the metadata of the most recently created consumer.
let log = {}

// TODO change default pipe dir for windows and mac os
const DEFAULT_PIPE_DIR = process.env.SOCKETS_DIR || '/tmp/UCI'
const DEFAULT_SOCKET_NAME = 'uci-sock'
// how often (ms) the ready flag is polled while waiting for the handshake packet
const HANDSHAKE_POLL_MS = 30
// how long (ms) send() waits for a reply before resolving with an error
const REPLY_TIMEOUT_MS = 10000

/**
 * Socket Consumer - connects to UCI TCP or Named Pipe Sockets and communicates with uci packets.
 * Extends {@link https://nodejs.org/api/net.html#net_class_net_socket | nodejs net.Socket}
 * @extends Socket
 */
class SocketConsumer extends Socket {
  /**
   * Creates a consumer. Nothing is connected until {@link SocketConsumer#connect} is called.
   *
   * @param {object} [opts={}] consumer options (the passed object is not modified)
   * @param {string} [opts.id] consumer id, falls back to opts.name and then to a timestamp based id
   * @param {string} [opts.name] used as the id when opts.id is not given
   * @param {string|boolean} [opts.path] named pipe path. `true` selects the default pipe, a bare
   *   file name is placed in the default pipe directory. When absent a TCP connection is used.
   * @param {string} [opts.host='127.0.0.1'] TCP host, only defaulted when opts.path is not set
   * @param {number} [opts.port=8080] TCP port, only defaulted when opts.path is not set
   * @param {boolean} [opts.keepAlive=true] must be set to false to explicitly disable
   * @param {number} [opts.timeout=60] initial connect timeout in seconds
   * @param {number} [opts.wait=2] delay in seconds before a reconnect attempt
   */
  constructor(opts = {}) {
    super()
    // work on a shallow copy so the defaults filled in below do not leak into the caller's object
    opts = Object.assign({}, opts)
    // the id must exist before the logger is created, otherwise the logger is tagged with undefined
    this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
    log = logger({
      file: 'src/consumer.js',
      class: 'Consumer',
      name: 'socket',
      id: this.id
    })
    if (!opts.path) {
      if (!opts.host) log.warn({ opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'})
      opts.host = opts.host || '127.0.0.1'
      opts.port = opts.port || 8080
    } else {
      if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR, DEFAULT_SOCKET_NAME)
      if (path.dirname(opts.path) === '.') opts.path = path.join(DEFAULT_PIPE_DIR, opts.path)
    }
    this.opts = opts
    // default is keepAlive true, must set to false to explicitly disable
    // if keepAlive is true then the consumer also reconnects when the server ends the connection
    this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
    this._ready = false
    // NOTE(review): net.Socket appears to keep its own `timeout` property (see socket.setTimeout);
    // the name is kept here because it is part of this class's existing interface - confirm.
    this.timeout = opts.timeout || 60 // initial connect timeout in secs and then rejects
    this.wait = opts.wait || 2 // secs to wait before a reconnect attempt
    this.stream = new JsonStream()
    // listeners registered by _listen(), remembered so that only those are removed on reconnect
    this._packetListeners = null
    // bind to class for other class functions
    this.connect = this.connect.bind(this)
    this.close = promisify(this.end).bind(this)
    this.__ready = this.__ready.bind(this)
  }

  /**
   * Connects to the socket described by the constructor options and waits for the ready handshake.
   *
   * Connection errors trigger a new attempt every `wait` seconds. If no connection is established
   * within `timeout` seconds the promise rejects, the socket is destroyed and retrying stops.
   * After a successful first connect later disconnects are retried in the background and
   * announced with the 'reconnected' event; 'ready' is emitted with false/true on error/EISCONN.
   *
   * @returns {Promise<string>} resolves with the value from the handshake wait ('ready')
   * @throws {Error} (as rejection) when the connection or the handshake times out
   */
  async connect() {
    return new Promise((resolve, reject) => {
      let initial = true // true until the first handshake has completed
      let abandoned = false // set once the initial connect timed out
      let reconTimeout

      // this is only for the initial connection
      const initTimeout = setTimeout(() => {
        abandoned = true
        clearTimeout(reconTimeout)
        this._unlisten()
        this.destroy()
        log.fatal({ opts: this.opts }, `unable to connect in ${this.timeout}s`)
        const error = new Error(`unable to connect to socket server in ${this.timeout}secs`)
        error.opts = this.opts // kept for callers that inspected the rejected object
        reject(error)
      }, this.timeout * 1000)

      // one attempt: (re)issue the connect request on the underlying net.Socket
      const attempt = () => {
        if (this.opts.host === '127.0.0.1' || this.opts.host === 'localhost') {
          log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead')
        }
        log.info({ opts: this.opts, msg:`attempting to connect ${this.id} to socket`})
        super.connect(this.opts)
      }

      // schedules a single reconnect attempt, replacing any attempt that is already pending
      const reconnect = () => {
        clearTimeout(reconTimeout)
        reconTimeout = setTimeout(() => {
          // only the packet listeners of this class are dropped; listeners added by end users,
          // pending reply listeners of send() and whatever net.Socket registered stay in place
          this._unlisten()
          this.destroy()
          attempt()
        }, this.wait * 1000)
      }

      // runs on every successful connect, the first one as well as reconnects
      const onConnect = async () => {
        if (abandoned) return
        clearTimeout(initTimeout)
        clearTimeout(reconTimeout)
        this._listen()
        if (initial) {
          log.info({ opts: this.opts, msg:'initial connect waiting for socket ready handshake'})
        } else {
          log.info({msg:'reconnected waiting for socket ready handshake'})
        }
        this.setKeepAlive(this.keepAlive, 3000)
        // isReady works in milliseconds while this.timeout is kept in seconds
        const [err, res] = await btc(isReady).bind(this)(
          this.__ready,
          HANDSHAKE_POLL_MS,
          this.timeout * 1000
        )
        if (err) {
          // without a handshake the socket is not usable, so do not announce it as connected
          if (initial) reject(err)
          else log.warn({ error: err }, 'no ready handshake received after reconnect')
          return
        }
        // TODO authenticate here by encrypting a payload with private key and sending that.
        // await btc(authenticate)
        if (initial) {
          initial = false
          log.info('handshake to socket done, authenticating')
          this.emit('connected') // for end users to take action
          resolve(res)
        } else {
          log.info('rehandshake done, reauthenticating')
          this.emit('reconnected') // for end users to take action
        }
      }

      const onError = err => {
        if (abandoned) return
        if (err.code !== 'EISCONN') {
          this._ready = false
          this.emit('ready', false)
          log.warn({ error: err.code }, `connect error ${err.code}, attempting reconnect`)
          reconnect()
        } else {
          this._ready = true
          this.emit('ready', true)
          log.info('reconnected to socket, ready to go again')
        }
      }

      const onEnd = () => {
        if (abandoned) return
        log.warn('socket (server) terminated unexpectantly')
        this._ready = false
        log.info('keep alive was set, so waiting on server to come online for reconnect')
        // routed through the error handler which schedules the reconnect
        this.emit('error', { code: 'DISCONNECTED' })
      }

      this.on('connect', onConnect)
      this.on('error', onError)
      // only attempt reconnect on server end if keepAlive is set, which it is by default
      if (this.keepAlive) this.on('end', onEnd)

      attempt() // initial connect request
    })
  }

  /**
   * Sends a packet and waits for the reply carrying the same header id.
   * The passed packet is not modified; a shallow copy receives the `_header`.
   *
   * @param {object} ipacket packet to send
   * @returns {Promise<object>} the value returned by the registered packet processor for the
   *   reply (or the raw reply when the processor returns nothing), or an `{ error }` object when
   *   the consumer is not ready, the packet can not be serialized or written, the processor
   *   throws, or no reply arrives within 10 seconds. The promise never rejects.
   */
  async send(ipacket) {
    if (!this._ready) return { error: 'socket consumer not connected, aborting send' }
    // need to avoid mutation for different consumers using same packet instance
    const packet = Object.assign({}, ipacket)
    packet._header = {
      // need this for when multiple sends for different consumers use same packet instance
      id: Math.random()
        .toString()
        .slice(2),
      sender: { name: this.name, instanceID: this.id },
      path: this.opts.path,
      port: this.opts.port,
      host: this.opts.host
    }
    const [err, serialized] = await btc(this.stream.serialize)(packet)
    if (err) return { error: 'unable to serialize packet for sending', packet: packet }

    return new Promise(resolve => {
      const replyEvent = packet._header.id

      const onReply = async reply => {
        clearTimeout(timer)
        let res
        try {
          res = await this._packetProcess(reply)
        } catch (processErr) {
          log.warn({ error: processErr }, 'consumer packet processing function failed')
          resolve({ error: 'consumer packet processing function failed', packet: reply })
          return
        }
        if (!res) {
          // if packetProcess was not promise returning (or returned nothing) hand back the raw reply
          res = reply
          log.warn('consumer function was not promise returning further processing may be out of sequence')
        }
        resolve(res)
      }

      const timer = setTimeout(() => {
        // drop the reply listener so unanswered sends do not pile up listeners
        this.removeListener(replyEvent, onReply)
        resolve({ error: 'no response from socket in 10sec' })
      }, REPLY_TIMEOUT_MS)

      // listen before writing so a reply can never be missed
      this.once(replyEvent, onReply)
      this.__write(serialized).catch(writeErr => {
        clearTimeout(timer)
        this.removeListener(replyEvent, onReply)
        log.warn({ error: writeErr }, 'unable to write packet to socket')
        resolve({ error: 'unable to write packet to socket', packet: packet })
      })
    })
  }

  // TODO register user alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chunks)
  // TODO register authentication function (set up default)

  /**
   * Replaces the function that processes reply packets and pushed packets.
   *
   * @param {function} func called with the incoming packet, expected to return a promise
   */
  registerPacketProcessor(func) {
    this._packetProcess = func
  }

  // PRIVATE METHODS

  /**
   * Writes serialized data to the socket, waiting for 'drain' when the write buffer is full.
   * No timeout is applied here; send() limits the overall wait.
   *
   * @param {*} packet serialized packet as produced by the stream's serialize function
   * @returns {Promise<string>} resolves with a confirmation message once written
   */
  async __write(packet) {
    return new Promise(resolve => {
      const cb = () => resolve('packet written to consumer side socket stream ')
      if (!super.write(packet)) {
        this.once('drain', cb)
      } else {
        process.nextTick(cb)
      }
    })
  }

  /**
   * @returns {boolean} true once the handshake packet was received on the current connection
   */
  __ready() {
    return this._ready
  }

  /**
   * Wires the socket to the json stream and starts dispatching incoming packets.
   * Safe to call on every (re)connect: listeners from a previous call are removed first.
   */
  async _listen() {
    this._unlisten()
    log.info('listening for incoming packets from socket')

    // listen for pushed packets
    const onPushed = async packet => {
      // TODO do some extra security here?
      const res = await this._packetProcess(packet)
      if (!res) {
        // if process was not promise returning then res will be undefined
        log.warn('consumer packet processing function was not promise returning')
      }
    }

    // packets decoded by the json stream
    const onMessage = packet => {
      log.debug('incoming packet from socket',packet)
      if (packet._handshake) {
        this._ready = true
        return
      }
      if (!packet._header) {
        // nothing to route on; emitting would throw on the missing header
        log.warn('incoming packet has no _header, ignoring')
        return
      }
      // TODO send back ack with consumer ID and authorization and wait
      // when authorized drop through here to emit
      this.emit(packet._header.id, packet)
    }

    // raw socket chunks are handed to the json stream
    const onData = this.stream.onData

    this.on('pushed', onPushed)
    this.on('data', onData)
    this.stream.on('message', onMessage)
    this._packetListeners = { onPushed, onData, onMessage }
  }

  /**
   * Removes exactly the listeners that _listen() registered, leaving all others untouched.
   */
  _unlisten() {
    const registered = this._packetListeners
    if (!registered) return
    this.removeListener('pushed', registered.onPushed)
    this.removeListener('data', registered.onData)
    this.stream.removeListener('message', registered.onMessage)
    this._packetListeners = null
  }

  /**
   * Default packet processor, just a simple console logger. Ignores any cmd: prop.
   * Replace it with registerPacketProcessor(func).
   *
   * @param {object} packet incoming packet
   */
  _packetProcess(packet) {
    console.log('default consumer processor -- log packet from socket to console')
    console.log('replace by calling .registerPacketProcessor(func) with your function')
    console.dir(packet)
  }
} // end class

export default SocketConsumer

// Helper Functions

/**
 * Polls a ready function until it returns a truthy value.
 *
 * @param {function} ready function returning truthy when ready
 * @param {number} [wait=30] polling interval in milliseconds
 * @param {number} [timeout=1000] accumulated wait in milliseconds after which polling gives up
 * @returns {Promise<string>} resolves with 'ready', rejects with an Error on timeout
 */
function isReady(ready, wait = 30, timeout = 1000) {
  let time = 0
  return new Promise((resolve, reject) => {
    const waitReady = () => {
      if (time > timeout) {
        reject(new Error(`timeout waiting for socket ready handshake - ${timeout}ms`))
        return
      }
      if (ready()) {
        resolve('ready')
        return
      }
      log.info(`waiting ${wait}ms for handshake`)
      time += wait
      setTimeout(waitReady, wait)
    }
    waitReady()
  })
}