import { Socket } from 'net' import path from 'path' import { promisify } from 'util' import pause from 'delay' import btc from 'better-try-catch' import JsonStream from './json-stream' // import logger from '../../uci-logger/src/logger' import logger from '@uci-utils/logger' let log = {} // TODO change default pipe dir for windows and mac os const DEFAULT_PIPE_DIR = process.env.SOCKETS_DIR || '/tmp/UCI' const DEFAULT_SOCKET_NAME = 'uci-sock' /** * Socket Consumer - connects to UCI TCP or Named Pipe Sockets and coummunicates with uci packet.
* Extends {@link https://nodejs.org/api/net.html#net_class_net_socket | nodejs net.Socket} * @extends Socket */ class SocketConsumer extends Socket { /** * constructor - Description * * @param {object} [opts={}] test */ constructor(opts = {}) { super() log = logger({ file: 'src/consumer.js', class: 'Consumer', name: 'socket', id: this.id }) this.id = opts.id || opts.name || 'socket:' + new Date().getTime() if (!opts.path) { if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'}) opts.host = opts.host || '127.0.0.1' opts.port = opts.port || 8080 } else { if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR, DEFAULT_SOCKET_NAME) if (path.dirname(opts.path) === '.') opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) } this.opts = opts // default is keepAlive true, must set to false to explicitly disable // if keepAlive is true then consumer will also be reconnecting consumer this.initTimeout = opts.initTimeout==null ? 60000 : opts.initTimeout * 1000 this.retryWait = opts.retryWait==null ? 5000 : opts.retryWait * 1000 this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this._connected = false this._authenticated = false this.stream = new JsonStream() // bind to class for other class functions this.connect = this.connect.bind(this) this.close = promisify(this.end).bind(this) // this.__ready = this.__ready.bind(this) this._conAttempt = 1 this._aborted = false this._reconnect = false this.retryPause = {} // timeout that may need to be cancelled if init timeout throws // this._packetProcess = this._packetProcess.bind(this) } get connected() { return this._connected} get active() { return !!this._authenticated } async connect() { return new Promise((resolve, reject) => { if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'}) log.debug('first connnect attempt for', this.opts.id) this.emit('status',{level:'info', msg:'attempting initial connection', id:this.id, opts:this.opts, ready:false}) console.log('TIMEOUT IN SOCKE CONNECT',this.initTimeout) let initTimeout = {} if (this.initTimeout > 499) { initTimeout = setTimeout(() => { clearTimeout(this.retryPause) this.emit('status',{level:'error', msg:'initial connection timed out', id:this.id, timeout:this.initTimeout, wait:this.retryWait, opts:this.opts, ready:false}) this.removeAllListeners() log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.initTimeout/1000} secs no more attempts!`}) this.stream.removeAllListeners() reject({ opts: this.opts, msg: `unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`}) } , this.initTimeout) } const initialHandshake = async (packet) => { if (packet._handshake) { clearTimeout(initTimeout) this._connected = true let authPacket = this._authenticate() || {} authPacket._authenticate = true authPacket.clientName = this.id let res = (await this._authenticateSend(authPacket)) || {} if (!res.authenticated) { this.emit('status',{level:'info', msg:`authentication failed: ${res.reason}`, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) reject('unable to authenticate') } else { this._authenticated = res.authenticated this.removeListener('error',initialErrorHandler) this._listen() // setup for active connection log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'}) this.emit('status',{level:'info', msg:'authentication succesfull', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) this.emit('consumer-connection',{state:'connected',name:this.id}) if (this.opts.conPacket) (this.send(this.conPacket)) resolve('initial connection successful') } } } const initialConnectHandler = async () => { this.on('data', this.stream.onData) this.stream.once('message', initialHandshake.bind(this)) log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) this.emit('status',{level:'debug', msg:'consumer connected'}) } const initialErrorHandler = async (err) => { let msg = {level:'error', method:'connect', line:101, error:err, msg:`error during initial connect, trying again in ${this.retryWait/1000} secs`} log.error(msg) this.emit('status',msg) let connect = () => { super.connect(this.opts)} this.retryPause = setTimeout(connect.bind(this),this.retryWait) } this.once('connect', initialConnectHandler) this.on('error', initialErrorHandler) super.connect(this.opts) }) // end initial promise } async send(ipacket) { return new Promise(async resolve => { if (!this._connected) { resolve({ error: 'socket consumer not connected, aborting send' }) } let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) packet._header = { id: Math.random() .toString() .slice(2), // need this for when multiple sends for different consumers use same packet instanceack sender: { name: this.name, instanceID: this.id }, path: this.opts.path, port: this.opts.port, host: this.opts.host } let [err, res] = await btc(this.stream.serialize)(packet) if (err) resolve({error: 'unable to serialize packet for sending',packet: packet}) await this.__write(res) this.once(packet._header.id, async function(reply) { let res = await this._packetProcess(reply) if (!res) { // if packetProcess was not promise res = reply log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) } resolve(res) // resolves processed packet not return packet }) //end listener }) } // TODO register user alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) registerPacketProcessor(func) { this._packetProcess = func } // func should return an object the server expects registerAuthenticator (func) { this._authenticate = func } // PRIVATE METHODS // default authentication using a simple token _authenticate () { return { token: process.env.UCI_CLIENT_TOKEN || this.token || 'default' } } async _authenticateSend (authPacket={}) { return new Promise(async resolve => { setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) let [err, res] = await btc(this.stream.serialize)(authPacket) if (err) resolve({error: 'unable to serialize packet for sending',packet: authPacket}) this.stream.on('message',(res) => { this.stream.removeAllListeners('message') resolve(res) }) await this.__write(res) }) } // set up incoming message listening and error/reonnect handling async _listen() { // Define Handlers and Other Functions const reconnectHandler = () => { this.stream.once('message', handshake.bind(this)) log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) this.emit('status',{level:'info', msg:'attemping reconnect', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) } const handshake = async (packet) => { if (packet._handshake) { this._connected = true let authPacket = this._authenticate() || {} authPacket._authenticate = true authPacket.clientName = this.id let res = (await this._authenticateSend(authPacket)) || {} if (!res.authenticated) { this.emit('status',{level:'error', msg:`authentication failed: ${res.reason}`, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) this.emit('error',{code:'authentification failed'}) } else { this._authenticated = res.authenticated log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'}) this.emit('status',{level:'info', msg:'authentication successful', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default this.on('ping',pingHandler) this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled } this.stream.on('message', messageHandler.bind(this)) // reset default message handler this.emit('status',{level:'info', msg:'reconnected', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) this.emit('consumer-connection', {state:'reconnected', name:this.id}) if (this.opts.conPacket) (this.send(this.conPacket)) } } } const errorHandler = async (err) => { log.debug({msg:'connection error emitted ', error:err}) this._connected = false this._authenticated = false this.emit('status',{level:'error', msg:'connection error', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect after ${this.retryWait/1000} secs`}) this.emit('consumer-connection', {state:'disconnected', name:this.id}) await pause(this.retryWait) this.stream.removeAllListeners('message') // remove regular message handler in prep for reconnect this.removeAllListeners('connect') this.removeAllListeners('ping') this.once('connect',reconnectHandler) super.connect(this.opts) } const pushHandler = async (packet) => { // TODO do some extra security here? log.debug('packed was pushed from socket sever, processing',packet) let res = await this._packetProcess(packet) if (!res) { // if process was not promise returning then res will be undefined log.debug('consumer packet processing function was not promise returning') } } const pingHandler = async (packet) => { clearTimeout(pingTimeout) log.trace({method:'connect', line:191, msg:'received ping, restting timeout'}) this._pingTimeout= packet.pingInterval + 1000 monitorPing.call(this) } let pingTimeout = {} function monitorPing () { pingTimeout = setTimeout( () => { log.error({method:'connect', line:142, msg:'socket (server) not availabe'}) this.removeAllListeners('ping') this._connected = false this.emit('error', { code: 'PING-FAILED' }) },this._pingTimeout) } // general handler function messageHandler(packet) { if (packet._header.id !== 'ping') log.debug('incoming packet from socket sever',packet) this.emit(packet._header.id, packet) } // Start Message Listening and Error/Reconnect Handling log.debug('listening for incoming packets from socket') this.stream.on('message', messageHandler.bind(this)) this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled this.on('pushed', pushHandler ) this.on('error', errorHandler) if (this.keepAlive) { // keepAlive also activates ping Monitor this.on('ping',pingHandler) } } async __write(packet) { // timeout already set if sockect can't be drained in 10 secs return new Promise(resolve => { const cb = () => resolve('packet written to consumer side socket stream ') if (!super.write(packet)) { this.once('drain', cb) } else { process.nextTick(cb) } }) } // default packet process just a simple console logger. ignores any cmd: prop _packetProcess(packet) { console.log('default consumer processor -- log packet from socket to console') console.log('replace by calling .registerPacketProcessor(func) with your function') console.dir(packet) } } // end class export default SocketConsumer