diff --git a/package.json b/package.json index fe5fb20..c2c5a82 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.2", + "version": "0.2.3", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { @@ -48,7 +48,7 @@ "nodemon": "^1.18.6" }, "dependencies": { - "@uci/logger": "0.0.7", + "@uci/logger": "0.0.9", "better-try-catch": "^0.6.2", "clone": "^2.1.2", "death": "^1.1.0", diff --git a/src/index.js b/src/index.js index 1d77f13..af874e5 100644 --- a/src/index.js +++ b/src/index.js @@ -5,4 +5,4 @@ import Consumer from './consumer' export { sSocket as sSocket } export { Socket as Socket } export { Consumer as Consumer } -export default { Socket, sSocket, Consumer } +export default { Socket, sSocket, Consumer } diff --git a/src/sSocket.js b/src/sSocket.js new file mode 100644 index 0000000..e570de1 --- /dev/null +++ b/src/sSocket.js @@ -0,0 +1,6 @@ +// returns an encrypted/secure socket class +import { Server } from 'tls' +import socketClass from './socket-class' + +export default (() => { + return socketClass(Server) })() diff --git a/src/socket-class.js b/src/socket-class.js new file mode 100644 index 0000000..333b1b7 --- /dev/null +++ b/src/socket-class.js @@ -0,0 +1,255 @@ +// node modules +import { unlink as fileDelete } from 'fs' +import { promisify } from 'util' +import path from 'path' +// npmjs modules +import mkdir from 'make-dir' +import btc from 'better-try-catch' +import _ON_DEATH from 'death' //this is intentionally ugly +import JSONStream from './json-stream' +import clone from 'clone' +// uci modules +import logger from '@uci/logger' +let log = {} // must declare here and set later for module wide access + +export default function socketClass(Server) { + +// TODO change default pipe dir depending on OS linux,windows,mac +/** @constant {String} DEFAULT_PIPE_DIR + * @description SOCKETS_DIR environment variable or '/tmp/UCI' + */ + const DEFAULT_PIPE_DIR = process.env.SOCKETS_DIR || '/tmp/UCI' + /** @constant {String} DEFAULT_SOCKET_NAME + * @description for named pipe 'uci-sock' if not set in options */ + const DEFAULT_SOCKET_NAME = 'uci-sock' + + /** + * UCI Socket - class used to create a socket (server) that supports passing json packets + * supports both named pipes and tcp sockets + * also supports push of packets to all connected consumers (clients) + * is extended from {@link https://nodejs.org/api/net.html#net_class_net_server | nodejs net.Server } + * @extends Server + */ + + + return class Socket extends Server { + /** + * UCI Socket class constructor + * @param {Object} opts hash of options + * @param {String} options.host a tcp host name nornally not used as 0.0.0.0 is set by default + * @param {String} options.port a tcp + * @param {String | Boolean} options.path xeither full path to where socket should be created or if just 'true' then use default + * @param {Boolean} options.clientTracking track connected clients for push notifications - default: true + * @param {Object} options.conPacket A json operson's property + * + */ + constructor(opts = {}) { + super(opts) + delete opts.key + delete opts.cert + this.id = opts.id || opts.name || 'socket:' + new Date().getTime() + if (!opts.path) { + opts.host = opts.host || '0.0.0.0' + opts.port = opts.port || 8080 + } else { + if (typeof opts.path === 'boolean') + opts.path = path.join(DEFAULT_PIPE_DIR, DEFAULT_SOCKET_NAME) + if (path.dirname(opts.path) === '.') + opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) + } + this.clientTracking = opts.clientTracking || true + this.clients = [] // track consumers (i.e. clients) + this.opts = opts // for use to recover from selected errors + //self bindings + this._listen = this._listen.bind(this) + this.create = this.create.bind(this) + log = logger({ + file: 'src/socket.js', + class: 'Socket', + name: 'socket', + id: this.id + }) + } // end constructor + + /** + * create - Description + * + * @returns {type} Description + */ + async create() { + return new Promise(async (resolve, reject) => { + // set up a couple ways to gracefully destroy socket process is killed/aborted + _ON_DEATH(async () => { + log.info('\nhe\'s dead jim') + await this._destroy() + }) + process.once('SIGUSR2', async () => { + await this._destroy + process.kill(process.pid, 'SIGUSR2') + }) + + this.once('error', async err => { + // recover from socket file that was not removed + if (err.code === 'EADDRINUSE') { + if (this.opts.path) { + // if TCP socket should already be dead + let [err, res] = await btc(promisify(fileDelete))(this.opts.path) + if (!err) { + log.info( + { res: res, socket: this.opts.path }, + 'socket already exists.....deleted' + ) + return await this._listen(this.opts) + } + log.fatal( + { err: err }, + 'error deleting socket. Can not establish a socket' + ) + return err + } + } + if (err.code === 'EACCES') { + log.info({ socket: this.opts.path, msg:'directory does not exist...creating'}) + await mkdir(path.dirname(this.opts.path)) + log.info({ socket: this.opts.path, msg:'directory created'}) + return await this._listen(this.opts) + } + // otherwise fatally exit + log.info(err, 'error creating socket') + reject(err) + }) + + let [err, res] = await btc(this._listen)(this.opts) + if (err) reject(err) + resolve(res) + }) // end creeate promise + } // end create + + /** + * registerPacketProcessor - Description + * @public + * @param {func} Description + * + */ + registerPacketProcessor(func) { + this._packetProcess = func + } + + /** + * push - pushes a supplied UCI object packet to all connected clients + * + * @param {object} packet Description + * @param {string} id the header id string of the pushed packet, default: 'pushed' + * + */ + async push(packet, id) { + packet._header = { id: id || 'pushed' } + log.info( + { opts: this.opts, packet: packet }, + 'pushing a packet to all connected consumers' + ) + this.clients.forEach(async client => { + if (client.writable) { + let [err, ser] = await btc(client.stream.serialize)(packet) + if (err) + ser = await client.stream.serialize({ + error: 'was not able to serialze the res packet', + err: err, + _header: { id: packet._header.id } + }) + if (!id || id === client.ID) await this._send.bind(client)(ser) + } + }) + } + + async _listen(opts) { + super.listen(opts, async (err, res) => { + if (err) return err + // this gets called for each client connection and is unique to each + this.on('connection', async socket => { + const stream = new JSONStream() + socket.stream = stream // need this to track clients + let send = this._send.bind(socket) + if (this.clientTracking) this.clients.push(socket) + // TODO add 'close' listener to socket to remove from this.clients + log.info('new consumer connecting') + log.info(await send(await stream.serialize({ _handshake: true }))) + if (this.opts.conPacket) { + this.opts.conPacket._header = { id: 'pushed' } + log.info( + { conPacket: this.opts.conPacket }, + 'pushing a preset command to just connected consumer' + ) + send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection + } + socket.on('data', stream.onData) + // TODO need to start error listener for stream so errors can be processed + stream.on('message', messageProcess.bind(this, socket)) + + async function messageProcess(client, packet) { + log.info({ packet: packet }, 'incoming packet on socket side') + let res = {} + if (this.clientTracking && packet.clientID) { + client.ID = packet.clientID + res.cmd = 'ackID' + } else { + res = (await this._packetProcess(clone(packet))) || {} + if (Object.keys(res).length === 0) + res = { + error: + 'socket packet command function likely did not return a promise', + packet: packet + } + } + if (packet) { + res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing + delete packet._header // remove before adding to response header as request + } else res._header = {} + res._header.request = clone(packet, false) + res._header.responder = { name: this.name, instanceID: this.id } + res._header.socket = this.address() + if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' + let [err, ser] = await btc(stream.serialize)(res) + if (err) + ser = await stream.serialize({ + error: 'was not able to serialze the res packet', + err: err, + _header: { id: res._header.id } + }) + log.info(await send(ser)) + } // end process message + }) // end connecttion consumer + log.info({ opts: this.opts }, 'socket created and listening') + return res + }) // end super listen callback + } // end listen + + async _destroy() { + log.info('closing down socket') + await this.close() + log.info('all connections closed....exiting') + process.exit() + } + + // default packet process, just a simple echo + async _packetProcess(packet) { + return new Promise(resolve => { + resolve(packet) + }) + } + + // must have a consumer socket bound to use + async _send(packet) { + // timeout already set if sockect can't be drained in 10 secs + return new Promise(resolve => { + const cb = () => resolve('packet written to socket stream') + if (!this.write(packet)) { + this.once('drain', cb) + } else { + process.nextTick(cb) + } + }) + } + } // end class + +} // end function makeSocketClass