// Direct External Dependencies
// none

// UCI dependencies
// UCI communication transport modules
// TODO change to dynamic import so a module loads only if that socket type is requested
import Socket from '@uci/socket' // tcp or named pipe
import MQTT from '@uci/mqtt' // requires broker
import WebSocket from '@uci/websocket' // server only - client is for web browser only
// UCI helpers
import { bindFuncs } from '@uci/utils/src/function'
// UCI logger
import logger from '@uci/logger'
let log = {} // declare module wide log to be set during construction

// Community dependencies
import EventEmitter from 'events'
import pSettle from 'p-settle'

// Internal dependencies
import { processor, commands, namespaces } from './processing'

// Useful Constants
const SOCKET_INFO_KEYS = ['name', 'type', 'transport']
const TRANSLATE = {
  n: 'Named Pipe',
  t: 'TCP',
  s: 'Socket',
  c: 'Consumer',
  m: 'MQTT',
  w: 'Web Socket'
}

/**
 * @class Base
 * @description
 * An inter-process inter-machine multi socket communication class.
 * It is extended to derive many other UCI classes, thus the name "Base".
 * The class itself is extended from the {@link https://nodejs.org/api/events.html#events_class_eventemitter nodejs EventEmitter} to support in-process communication as well.
 *
 * @extends EventEmitter
 * @param {Object} opts hash of options
 * @param {String} [opts.id='uci-base:'+timestamp] an id for this process/instance, used for logging
 * @param {String} [opts.desc] additional description for humans, can be logged.
 * @param {String | Boolean} [opts.path] either full path to where socket should be created or 'true' (which uses default path)
 * @param {string} [opts.sockets] comma delimited string of sockets to be created, each socket of the form [name]#[c or s]>[n,t,m or w]
 * @param {Boolean} [opts.useRootNS=false] include the "root" of the created base instance as part of the namespace in which packet cmd's try to call a corresponding function
 * @param {Object} [opts.(name of socket)] options for a particular socket created by opts.sockets. see UCI guide {@link ?content=guide#sockets|creating sockets}
 * @property {Object} socket collection (hash keyed by name) of all sockets created and used by base instance as per opts.sockets or addSocket method
 *
 * @example
 * import Base from '@uci/base'
 * // options object example: creates a server socket of each transport using its defaults
 * // and a tcp consumer socket that connects to a tcp socket on another host 'somehost' at port 8888
 * const opts = {
 *   id: 'mybase',
 *   sockets: 'us#s>n,tc#c>t,ts#s>t,mqtts#s>m,webs#s>w',
 *   tc: { host: 'somehost', port: 8888 }
 * }
 * let mybaseprocess = new Base(opts)
 */
class Base extends EventEmitter {
  constructor(opts = {}) {
    super()
    this.id = opts.id || opts.name || 'uci-base:' + new Date().getTime()
    log = logger({ name: 'base', id: this.id })
    this.desc = opts.desc // additional details for humans
    this.socket = {} // holds all the various communication sockets
    this._started = false // flag to know when instance has been initialized
    this._processors = { _default: processor }
    this._defaultCmds = commands
    this._namespaces = namespaces
    if (opts.useRootNS) {
      // add root of instance to checking for command functions - not recommended!
      this._namespaces.s.splice(-1, 0, null)
      this._namespaces.c.splice(-1, 0, null)
    }
    this.bindFuncs = bindFuncs
    // predefined sockets:
    // comma delimited list, each entry of the form '[name]#[c or s]>[n,t,m or w]'
    if (opts.sockets) {
      opts.sockets.split(/[,|\s]+/).forEach(socketStr => {
        let socket = {}
        socketStr.split(/[>#]+/).forEach((prop, index) => {
          socket[SOCKET_INFO_KEYS[index]] = prop
        })
        this.addSocket(
          socket.name,
          socket.type,
          socket.transport,
          opts[socket.name]
        )
      })
    }
  } // end constructor

  /*
   * --------------------------------
   * CLASS METHODS
   * --------------------------------
   */

  // PUBLIC METHODS

  /**
   * initialize the instance with the set options. This must be called to initialize all sockets and connections
   * @async
   * @public
   * @required
   * @returns {Promise<Array>} list of { name, err } entries, one per socket that failed to initialize
   */
  async init() {
    let sockets = []
    let initSockets = []
    for (let name of Object.keys(this.socket)) {
      initSockets.push(this._initSocket(name))
      sockets.push(name)
    }
    return pSettle(initSockets).then(res => {
      log.info(
        { sockets: res },
        'response from initializing sockets via instance options'
      )
      let err = []
      res.forEach((p, index) => {
        if (p.isRejected) {
          err.push({ name: sockets[index], err: p.reason })
        }
      })
      this._started = true
      return err
      // TODO if a websocket server was working then push status
      // TODO if no mqtt broker then attempt to start one
    })
  } // init

  /**
   * addSocket - Add a socket at runtime as opposed to via the sockets option at creation
   *
   * @param {string} name Name of socket (usually something short but unique)
   * @param {string} [type=c] consumer/client 'c' or socket/server 's'
   * @param {string} [transport=n] transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket
   * @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc)
   *
   * @returns {Promise<string|undefined>} confirmation string if added before init; nothing if added (and initialized) after init
   */
  async addSocket(name, type = 'c', transport = 'n', options = {}) {
    log.info(
      { socketName: name, type: type, transport: transport, options: options },
      `adding socket ${name}`
    )
    options.id = this.id + ':' + name
    switch (transport) {
      case 'n':
        options.path = options.path || true
      // falls through
      case 't':
        this.socket[name] = new Socket[TRANSLATE[type]](options)
        break
      case 'm':
        if (type === 'p') type = 'c'
        options.connect = options.connect || {}
        options.connect.connectTimeout = options.connect.connectTimeout || 5000
        this.socket[name] = new MQTT(options)
        break
      case 'w':
        if (type === 's') this.socket[name] = new WebSocket(options)
        else
          log.warn(
            { name: name, type: type, transport: transport },
            'Web socket not created: Consumer/Client Web Socket not supported'
          )
        break
    }
    // nothing was created (unsupported type/transport), so there is no socket to configure
    if (!this.socket[name]) return `socket ${name} not added`
    this.socket[name].name = name
    this.socket[name].type = type
    this.socket[name].transport = transport
    this.socket[name]._packetProcess = this._packetProcess.bind(this, name)
    if (this._started) log.info(await this._initSocket(name))
    else return `socket ${name} added`
  }

  /**
   * removeSocket - TODO not available
   *
   * @param {string} name name of socket as created
   * @returns {String | Object } success string or error object
   */
  async removeSocket(name) {
    //TODO
  }

  /**
   * send - send a packet via the named consumer socket, or via every consumer socket if no name is given
   *
   * @param {String} name name of socket for send (must be a consumer otherwise use push for server)
   * @param {Object} packet
   *
   * @returns {Promise<*>} the socket's reply, an array of replies when several consumers are used, or an error object if the named socket does not exist
   */
  async send(name, packet) {
    if (typeof name !== 'string') {
      // called as send(packet): send via all consumer sockets
      packet = name
      let sends = []
      for (let sockName of Object.keys(this.socket)) {
        if (this.socket[sockName].type === 'c') {
          sends.push(this.socket[sockName].send.bind(this.socket[sockName]))
        }
      }
      if (sends.length === 1) return sends[0](packet)
      return Promise.all(
        sends.map(send => {
          return send(packet)
        })
      )
    } else {
      if (this.socket[name]) return await this.socket[name].send(packet)
      else return { error: `no consumer socket of name ${name}` }
    }
  }

  async push(packet) {
    let broadcast = []
    for (let name of Object.keys(this.socket)) {
      if (this.socket[name].type === 's') {
        broadcast.push(this.socket[name].push.bind(this.socket[name]))
      }
    }
    return Promise.all(
      broadcast.map(push => {
        return push(packet)
      })
    )
  }

  async sendTransport(packet, transport) {
    let sends = []
    for (let name of Object.keys(this.socket)) {
      if (this.socket[name].type === 'c') {
        if (this.socket[name].transport === transport) {
          sends.push(this.socket[name].send.bind(this.socket[name]))
        }
      }
    }
    if (sends.length === 1) return sends[0](packet)
    return Promise.all(
      sends.map(send => {
        return send(packet)
      })
    )
  }

  // async sendMQTT(topic, packet) {return this.socket.mqtt.send(topic, packet)}
  async sendTCP(packet) {
    return this.sendTransport(packet, 't')
  }

  async sendIPC(packet) {
    return this.sendTransport(packet, 'n')
  }

  getSocket(name) {
    return this.socket[name]
  }

  // returns the first packet whose sender has the given name, or {} if none matches
  getPacketByName(name, packets) {
    if (!packets.length) packets = [packets]
    return packets.find(packet => packet._header.sender.name === name) || {}
  }

  // TODO confirm Object.assign will be ok as it is not a deep copy
  amendConsumerProcessing(funcs, trans) {
    if (trans) {
      if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] = {}
      Object.assign(this._defaultCmds.c[trans], funcs)
    }
    Object.assign(this._defaultCmds.c, funcs)
  }

  amendSocketProcessing(funcs, trans) {
    if (trans) {
      if (!this._defaultCmds.s[trans]) this._defaultCmds.s[trans] = {}
      Object.assign(this._defaultCmds.s[trans], funcs)
    }
    Object.assign(this._defaultCmds.s, funcs)
  }

  // use s: and c: keys TODO need to change this
  addNamedProcessing(name, funcs, type) {
    if (!this._cmds) this._cmds = {} // _cmds is not created in the constructor
    if (!this._cmds[name]) this._cmds[name] = {}
    if (type) {
      if (!this._cmds[name][type]) this._cmds[name][type] = {}
      Object.assign(this._cmds[name][type], funcs)
    } else {
      Object.assign(this._cmds[name], funcs)
    }
  }

  // func should take and return a packet
  // beforeSendHook (func,type,transport){} // TODO
  // afterReceiveHook(func,type,transport){} // TODO
  // afterProcessHook(func,type,transport){} // TODO

  // here you can add namespaced functions for packet commands
  consumersProcessor(func) {
    for (let name of Object.keys(this.socket)) {
      if (this.socket[name].type === 'c') {
        this.socketNameProcessor(func, name)
      }
    }
  }

  socketsProcessor(func) {
    for (let name of Object.keys(this.socket)) {
      if (this.socket[name].type === 's') {
        this.socketNameProcessor(func, name)
      }
    }
  }

  // NOTE: assumes this._processors[socket_name] already exists;
  // the constructor registers only '_default'
  socketNameProcessor(func, socket_name) {
    socket_name = socket_name || '_default'
    this._processors[socket_name]._process = func
  }

  addNamespace(space, type, trans) {
    if (trans) return this._namespaces[type + trans].unshift(space)
    else return this._namespaces[type].unshift(space)
  }

  /*
   *
   * Private Methods
   *
   */

  async _initSocket(name) {
    let socket = this.socket[name]
    let init
    // bind so the socket method keeps its own `this` when called below
    if (socket.type === 's' && socket.transport !== 'm') {
      init = socket.create.bind(socket)
    } else {
      init = socket.connect.bind(socket)
    }
    log.info(`initializing socket ${name}, ${socket.type}, ${socket.transport}`)
    if (this._started)
      return `socket ${name} added and initialized, ${await init()}`
    else return init()
  }

  // getter for socket transport
  _transport(name) {
    return this.socket[name].transport
  }

  // getter for socket type
  _type(name) {
    return this.socket[name].type
  }

  _getTransportNamespaces(socket) {
    return this._namespaces[this._type(socket) + this._transport(socket)]
  }

  // returns the first function found for cmd when searched within each namespace in order
  _getCmdFuncNamespace(cmd, namespaces) {
    let cmd_func = null
    namespaces.some(namespace => {
      namespace = namespace ? namespace + '.' + cmd : cmd
      cmd_func = this._getCmdFunc(namespace)
      if (cmd_func) return true
    })
    return cmd_func
  }

  // takes command and returns corresponding function in a hash
  _getCmdFunc(cmd, obj) {
    if (typeof cmd === 'string') {
      if (!obj) obj = this
      cmd = cmd.split(/[.:/]+/)
    }
    const prop = cmd.shift()
    if (cmd.length === 0) return obj[prop]
    if (!obj[prop]) return null
    return this._getCmdFunc(cmd, obj[prop])
  }

  async _callCmdFunc(packet, socket) {
    // try the namespaces specific to this socket's type and transport first
    let cmd_func = this._getCmdFuncNamespace(
      packet.cmd,
      this._namespaces[this._type(socket) + this._transport(socket)]
    )
    if (cmd_func) return await cmd_func.bind(this)(packet)
    // then fall back to the namespaces for the socket type alone
    cmd_func = this._getCmdFuncNamespace(
      packet.cmd,
      this._namespaces[this._type(socket)]
    )
    if (cmd_func) return await cmd_func.bind(this)(packet)
    return 'failed'
  }

  /*
   **********default packet processor for all sockets
   */
  async _packetProcess(socket_name, packet) {
    // resolve the processor key: packet override, then a processor registered
    // under the socket's name, then the default
    let processor =
      packet._processor ||
      (this._processors[socket_name] ? socket_name : '_default')
    return await this._processors[processor].bind(this)(
      packet,
      socket_name,
      this._processors[processor]
    )
  }
} // end Base Class

export default Base