// node modules import { unlink as fileDelete } from 'fs' import { promisify } from 'util' import path from 'path' // npmjs modules import mkdir from 'make-dir' import btc from 'better-try-catch' import _ON_DEATH from 'death' //this is intentionally ugly import JSONStream from './json-stream' import clone from 'clone' // uci modules import logger from '@uci-utils/logger' let log = {} // must declare here and set later for module wide access export default function socketClass(Server) { // TODO change default pipe dir depending on OS linux,windows,mac /** @constant {String} DEFAULT_PIPE_DIR * @description SOCKETS_DIR environment variable or '/tmp/UCI' */ const DEFAULT_PIPE_DIR = process.env.SOCKETS_DIR || '/tmp/UCI' /** @constant {String} DEFAULT_SOCKET_NAME * @description for named pipe 'uci-sock' if not set in options */ const DEFAULT_SOCKET_NAME = 'uci-sock' /** * UCI Socket - class used to create a socket (server) that supports passing json packets * supports both named pipes and tcp sockets * also supports push of packets to all connected consumers (clients) * is extended from {@link https://nodejs.org/api/net.html#net_class_net_server | nodejs net.Server } * @extends Server */ return class Socket extends Server { /** * UCI Socket class constructor * @param {Object} opts hash of options * @param {String} options.host a tcp host name nornally not used as 0.0.0.0 is set by default * @param {String} options.port a tcp * @param {String | Boolean} options.path xeither full path to where socket should be created or if just 'true' then use default * @param {Boolean} options.clientTracking track connected clients for push notifications - default: true * @param {Object} options.conPacket A json operson's property * */ constructor(opts = {}) { super(opts) delete opts.key delete opts.cert this.id = opts.id || opts.name || 'socket:' + new Date().getTime() if (!opts.path) { opts.host = opts.host || '0.0.0.0' opts.port = opts.port || 8080 } else { if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR, DEFAULT_SOCKET_NAME) if (path.dirname(opts.path) === '.') // relative path sent opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) } this.clientTracking = opts.clientTracking || true this.clients = [] // track consumers (i.e. clients) this.opts = opts // for use to recover from selected errors //self bindings this._listen = this._listen.bind(this) this.create = this.create.bind(this) this.close = promisify(this.close).bind(this) log = logger({ file: 'src/socket.js', class: 'Socket', name: 'socket', id: this.id }) } // end constructor /** * create - Description * * @returns {type} Description */ async create() { return new Promise(async (resolve, reject) => { // set up a couple ways to gracefully destroy socket process is killed/aborted _ON_DEATH(async () => { log.info('\nhe\'s dead jim') await this._destroy() }) process.once('SIGUSR2', async () => { await this._destroy process.kill(process.pid, 'SIGUSR2') }) this.once('error', async err => { // recover from socket file that was not removed if (err.code === 'EADDRINUSE') { if (this.opts.path) { // if TCP socket should already be dead let [err, res] = await btc(promisify(fileDelete))(this.opts.path) if (!err) { log.info( { res: res, socket: this.opts.path }, 'socket already exists.....deleted' ) return await this._listen(this.opts) } log.fatal( { err: err }, 'error deleting socket. Can not establish a socket' ) return err } } if (err.code === 'EACCES') { log.info({ socket: this.opts.path, msg:'directory does not exist...creating'}) await mkdir(path.dirname(this.opts.path)) log.info({ socket: this.opts.path, msg:'directory created'}) return await this._listen(this.opts) } // otherwise fatally exit log.info(err, 'error creating socket') reject(err) }) let [err, res] = await btc(this._listen)(this.opts) if (err) reject(err) resolve(res) }) // end creeate promise } // end create /** * registerPacketProcessor - Description * @public * @param {func} Description * */ registerPacketProcessor(func) { this._packetProcess = func } /** * push - pushes a supplied UCI object packet to all connected clients * * @param {object} packet Description * @param {string} id the header id string of the pushed packet, default: 'pushed' * */ async push(packet, id) { packet._header = { id: id || 'pushed' } log.info( { opts: this.opts, packet: packet }, 'pushing a packet to all connected consumers' ) this.clients.forEach(async client => { if (client.writable) { let [err, ser] = await btc(client.stream.serialize)(packet) if (err) ser = await client.stream.serialize({ error: 'was not able to serialze the res packet', err: err, _header: { id: packet._header.id } }) if (!id || id === client.ID) await this._send.bind(client)(ser) } }) } async _listen(opts) { super.listen(opts, async (err, res) => { if (err) return err // this gets called for each client connection and is unique to each this.on('connection', async socket => { const stream = new JSONStream() socket.stream = stream // need this to track clients let send = this._send.bind(socket) if (this.clientTracking) this.clients.push(socket) // TODO add 'close' listener to socket to remove from this.clients log.info('new consumer connecting') log.info(await send(await stream.serialize({ _handshake: true }))) if (this.opts.conPacket) { this.opts.conPacket._header = { id: 'pushed' } log.info( { conPacket: this.opts.conPacket }, 'pushing a preset command to just connected consumer' ) send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection } socket.on('data', stream.onData) // TODO need to start error listener for stream so errors can be processed stream.on('message', messageProcess.bind(this, socket)) async function messageProcess(client, packet) { log.info({ packet: packet }, 'incoming packet on socket side') let res = {} if (this.clientTracking && packet.clientID) { client.ID = packet.clientID res.cmd = 'ackID' } else { res = (await this._packetProcess(clone(packet))) || {} if (Object.keys(res).length === 0) res = { error: 'socket packet command function likely did not return a promise', packet: packet } } if (packet) { res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing delete packet._header // remove before adding to response header as request } else res._header = {} res._header.request = clone(packet, false) res._header.responder = { name: this.name, instanceID: this.id } res._header.socket = this.address() if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' let [err, ser] = await btc(stream.serialize)(res) if (err) ser = await stream.serialize({ error: 'was not able to serialze the res packet', err: err, _header: { id: res._header.id } }) log.info(await send(ser)) } // end process message }) // end connecttion consumer log.info({ opts: this.opts }, 'socket created and listening') return res }) // end super listen callback } // end listen async _destroy() { log.info('closing down socket') await this.close() log.info('all connections closed....exiting') process.exit() } // default packet process, just a simple echo async _packetProcess(packet) { return new Promise(resolve => { resolve(packet) }) } // must have a consumer socket bound to use async _send(packet) { // timeout already set if sockect can't be drained in 10 secs return new Promise(resolve => { const cb = () => resolve('packet written to socket stream') if (!this.write(packet)) { this.once('drain', cb) } else { process.nextTick(cb) } }) } } // end class } // end function makeSocketClass