// Direct External Dependencies
// none

// UCI dependencies
// UCI communication transport communication modules
// TODO change to dynamic import so loads only if that socket type is requested
import Socket from '@uci/socket' // tcp or named pipe
import MQTT from '@uci/mqtt' // requires broker
import WebSocket from '@uci/websocket' // server only - client is for web browser only
// UCI helpers
import { bindFuncs } from '@uci-utils/bind-funcs'
// UCI logger
import logger from '@uci-utils/logger'
let log = {} // declare module wide log to be set during construction

// Community dependencies
import to from 'await-to-js'
import EventEmitter from 'events'
// import pSettle from 'p-settle'
// import pReflect from 'p-reflect'

// Internal dependencies
import { processor, defaultCmds, namespaces } from './processing'

// Useful Constants
const SOCKET_INFO_KEYS = ['name', 'type', 'transport']
const TRANSLATE = {
  n: 'Named Pipe',
  t: 'TCP',
  s: 'Socket',
  c: 'Consumer',
  m: 'MQTT',
  w: 'Web Socket'
}

/**
 * @class Base
 * @description
 * An inter-process inter-machine multi socket communication class.
 * It is extended to derive many other UCI classes thus the name "Base"
 * The class itself is extended from the {@link https://nodejs.org/api/events.html#events_class_eventemitter nodejs EventEmitter } to support in process communication as well
 *
 * @extends EventEmitter
 * @param {Object} opts hash of options
 * @param {String} [opts.id='uci-base:'+timestamp] an id for this process/instance used for logging (opts.name is used as a fallback)
 * @param {String} [opts.desc] additional description for humans, can be logged.
 * @param {String | Boolean} [opts.path] either full path to where socket should be created or 'true' (which uses default path).
 *   NOTE(review): not read by Base itself in this file; presumably belongs in the per-socket options - confirm
 * @param {String} [opts.sockets] comma delimited string of sockets to be created, each socket of the form [name]#[c or s]>[n,t,m or w]
 * @param {Boolean} [opts.useRootNS=false] include the "root" of the created base instance as part of the namespace for which packet cmd's try to call corresponding function
 * @param {Object} [opts.(name of socket)] options for a particular socket created by opts.sockets. see UCI guide {@link ?content=guide#sockets|creating sockets}
 * @property {Object} _socket collection (keyed by socket name) of all sockets created and used by base instance as per opts.sockets or addSocket method
 *
 * @example
 * import Base from '@uci/base'
 * // options object example, created is a socket of each transport using their defaults
 * // and a tcp consumer socket that connects to a tcp socket on another host 'somehost' at port 8888
 * const opts = { id: 'mybase', sockets: 'us#s>n,tc#c>t,ts#s>t,mqtts#s>m,webs#s>w', tc: { host: 'somehost', port: 8888 } }
 * let mybaseprocess = new Base(opts)
 */
class Base extends EventEmitter {
  constructor(opts = {}) {
    super()
    this.id = opts.id || opts.name || 'uci-base:' + new Date().getTime()
    log = logger({ name: 'base', id: this.id })
    this.desc = opts.desc // additional details for humans
    this._socket = {} // holds all the various communication sockets
    this._started = false // flag to know when instance has been initialized
    this._processors = { _default: processor }
    // _c and _s are the default namespaces
    // NOTE(review): `namespaces` and `defaultCmds` are module-level objects imported from
    // ./processing and are stored here by reference, so the splice below, addNamespace() and
    // amendCommands() mutate state that is presumably shared by every Base instance - confirm intent
    this._namespaces = namespaces
    this._c = defaultCmds.c
    this._s = defaultCmds.s
    if (opts.useRootNS) {
      // add root of instance to checking for command functions - not recommended!
      this._namespaces.s.splice(-1, 0, null)
      this._namespaces.c.splice(-1, 0, null)
    }
    // method that will bind a whole object tree of functions
    this.bindFuncs = bindFuncs
    // predefined sockets:
    // comma delimited list, each item of the form 'name#type>transport'
    if (opts.sockets) {
      opts.sockets
        .split(/[,|\s]+/)
        .filter(Boolean) // leading/trailing delimiters produce empty tokens, skip them
        .forEach(socketStr => {
          const socket = {}
          socketStr.split(/[>#]+/).forEach((prop, index) => {
            socket[SOCKET_INFO_KEYS[index]] = prop
          })
          // addSocket is async and can not be awaited in a constructor,
          // so make sure a rejection is at least logged instead of going unhandled
          this.addSocket(socket.name, socket.type, socket.transport, opts[socket.name]).catch(err => {
            log.warn({ socketName: socket.name, error: err, method: 'constructor', msg: `unable to add socket ${socket.name}` })
          })
        })
    }
  } // end constructor

  /*
   * --------------------------------
   * CLASS METHODS
   * --------------------------------
   */

  // PUBLIC METHODS

  /**
   * initialize the instance with the set options. This must be called to initialize all sockets and connections.
   * A socket whose initialization fails is reported via an 'error' event and removed.
   * Note: as with any EventEmitter, emitting 'error' with no listener attached throws.
   * @async
   * @public
   * @required
   * @returns {Promise<Object>} {results, errors} both keyed by socket name, errors is false when there were none
   */
  async init() {
    const results = {}
    const errors = {}
    // run one socket initializer, recording the outcome rather than rejecting
    const settle = async socket => {
      try {
        results[socket.name] = await socket.init()
      } catch (error) {
        this.emit('error', { msg: 'socket init error', error: error }) // emit an error here, remove socket
        const res = await this.removeSocket(socket.name)
        errors[socket.name] = { error: error, remove: res }
      }
    }
    // before the instance is started _initSocket only returns {name, init} descriptors
    const sockets = Object.keys(this._socket).map(name => this._initSocket(name))
    await Promise.all(sockets.map(settle))
    this._started = true
    return { results: results, errors: Object.keys(errors).length === 0 ? false : errors }
  }

  /**
   * addSocket - Add a socket at runtime as opposed to via the sockets option at creation
   *
   * @param {string} name Name of socket (usually something short but unique)
   * @param {string} [type=c] consumer/client 'c' or socket/server 's'
   * @param {string} [transport=n] transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket
   * @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc).
   *   This object is amended in place (id, and path/connect depending on transport)
   *
   * @returns {Promise<String|Object>} confirmation string (or the initialization result when the instance
   *   has already been started), or an {error} object when the type/transport combination is not supported
   */
  async addSocket(name, type = 'c', transport = 'n', options = {}) {
    if (!options) options = {} // a null value bypasses the default parameter
    log.debug({ socketName: name, type: type, transport: transport, options: options, method: 'addSocket', line: 147, msg: `adding socket ${name}` })
    options.id = this.id + ':' + name
    let socket
    switch (transport) {
      case 'n':
        options.path = options.path || true
      // falls through
      case 't': {
        const Transport = Socket[TRANSLATE[type]]
        if (typeof Transport === 'function') socket = new Transport(options)
        break
      }
      case 'm':
        if (type === 'p') type = 'c'
        options.connect = options.connect || {}
        if (options.host) options.connect.host = options.host
        if (options.port) options.connect.port = options.port
        options.connect.connectTimeout = options.connect.connectTimeout || 5000
        socket = new MQTT(options)
        break
      case 'w':
        if (type === 's') socket = new WebSocket(options)
        else log.warn({ name: name, type: type, transport: transport, method: 'addSocket', line: 167, msg: 'Web socket not created Consumer/Client Web Socket not supported' })
        break
    }
    if (!socket) {
      // nothing was created (unsupported type and/or transport), report instead of dereferencing undefined
      const msg = `socket ${name} not added, unsupported type '${type}' and/or transport '${transport}'`
      log.warn({ name: name, type: type, transport: transport, method: 'addSocket', msg: msg })
      return { error: msg }
    }
    this._socket[name] = socket
    socket.name = name
    socket.type = type
    socket.transport = transport
    socket._packetProcess = this._packetProcess.bind(this, name)
    if (type === 'c') {
      // bubble up events from client sockets
      socket.on('status', ev => {
        ev.socketName = name
        this.emit('status', ev)
      })
      socket.on('pushed', packet => {
        if (packet && packet._header) packet._header.socketName = name
        this.emit('pushed', packet)
      })
    }
    // do this as .then promise then addSocket doesn't need to be async before init
    if (this._started) return await this._initSocket(name)
    return `socket ${name} added`
  }

  /**
   * removeSocket - close a socket/consumer and remove it from the instance.
   * Emits a 'warn' event once the socket has been removed.
   *
   * @param {string} name name of socket as created
   * @returns {Promise<String|Object>} 'success', or an object describing the problem
   *   (unknown socket name, or a close error other than ERR_SERVER_NOT_RUNNING - the socket is removed regardless)
   */
  async removeSocket(name) {
    // NOTE: uci consumers have .end renamed as .close to match socket method for convenience
    const socket = this._socket[name]
    if (!socket) return { error: `no socket of name ${name}` }
    const [err] = await to(Promise.resolve(socket.close()))
    let closeError
    // err is null when the close succeeded, only then is there nothing to inspect
    if (err && err.code !== 'ERR_SERVER_NOT_RUNNING') {
      closeError = { socket: socket.name, error: err, msg: 'socket/consumer closed with errors, but removed' }
    }
    this.emit('warn', { msg: `socket ${name} has been removed`, socket: socket.opts })
    delete this._socket[name]
    return closeError ? closeError : 'success'
  }

  /**
   * getSocket - access a created socket
   *
   * @param {string} [name] name of socket
   * @returns {Object} the named socket (undefined if unknown) or, without a name, the hash of all sockets
   */
  getSocket(name) {
    if (name) return this._socket[name]
    return this._socket
  }

  /**
   * send - send a packet via a named socket or, when no name is given, via every consumer socket.
   * A socket's beforeSend hook (if any) is applied to the packet first.
   *
   * @param {String} [name] name of socket for send (must be a consumer otherwise use push for server)
   *   if the first argument is not a string it is taken as the packet
   * @param {Object} packet
   *
   * @returns {Promise<Object|Array>} result of the send, an array of results when more than one
   *   (or no) consumer was sent to, or an {error} object if the named socket does not exist
   */
  async send(name, packet) {
    if (typeof name === 'string') {
      const socket = this._socket[name]
      if (!socket) return { error: `no consumer socket of name ${name}` }
      if (socket.beforeSend) packet = await socket.beforeSend.call(this, packet)
      log.debug({ msg: 'single socket hooked packet to send', name: name, packet: packet, method: 'send', line: 230 })
      return await socket.send(packet)
    }
    packet = name
    const sends = []
    for (const socketName of Object.keys(this._socket)) {
      const socket = this._socket[socketName]
      if (socket.type !== 'c') continue
      // each hook gets its own shallow copy so one socket's hook can't affect another's packet
      const hookedPacket = socket.beforeSend
        ? await socket.beforeSend.call(this, Object.assign({}, packet))
        : packet
      log.debug({ msg: 'after hook, sending packet', name: socketName, packet: hookedPacket, method: 'send', line: 235 })
      sends.push(socket.send.bind(socket, hookedPacket))
    }
    if (sends.length === 1) return await sends[0]()
    return Promise.all(sends.map(send => send()))
  }

  /**
   * push - push a packet from every server ('s') socket.
   * A socket's beforeSend hook (if any) is applied to a copy of the packet first (called with a second argument of true).
   *
   * @param {Object} packet
   * @returns {Promise<Array>} results of each socket's push
   */
  async push(packet) {
    // TODO set like send to accept a name
    const broadcast = []
    for (const socketName of Object.keys(this._socket)) {
      const socket = this._socket[socketName]
      if (socket.type !== 's') continue
      const hookedPacket = socket.beforeSend
        ? await socket.beforeSend.call(this, Object.assign({}, packet), true)
        : packet
      log.debug({ msg: 'hooked packet to push', name: socketName, packet: hookedPacket, method: 'push', line: 243 })
      broadcast.push(socket.push.bind(socket, hookedPacket))
    }
    return Promise.all(broadcast.map(push => push()))
  }

  // TODO make push version of all this sends
  // TODO accept alt transport string i.e. t or TCP
  /**
   * sendTransport - send a packet via every consumer socket of the given transport (no beforeSend hooks are applied)
   *
   * @param {Object} packet
   * @param {string} transport transport key as stored on the socket (n, t, m or w)
   * @returns {Promise<Object|Array>} single result when exactly one socket matched, otherwise an array of results
   */
  async sendTransport(packet, transport) {
    const sends = Object.keys(this._socket)
      .map(name => this._socket[name])
      .filter(socket => socket.type === 'c' && socket.transport === transport)
      .map(socket => socket.send.bind(socket))
    if (sends.length === 1) return sends[0](packet)
    return Promise.all(sends.map(send => send(packet)))
  }

  // async sendMQTT(topic, packet) {return this._socket.mqtt.send(topic, packet)}
  // send via all tcp consumers
  async sendTCP(packet) {
    return this.sendTransport(packet, 't')
  }

  // TODO change this to PIPE
  // send via all named pipe consumers
  async sendIPC(packet) {
    return this.sendTransport(packet, 'n')
  }
  // TODO add sendMQTT, sendWS

  // add a listener (or remove all listeners when fn is 'stop') for an event on all server sockets
  socketsListen(event, fn) {
    this._eventListen('s', event, fn)
  }

  // add a listener (or remove all listeners when fn is 'stop') for an event on all consumer sockets
  consumersListen(event, fn) {
    this._eventListen('c', event, fn)
  }

  /**
   * getPacketByName - find the packet whose _header.sender.name matches
   *
   * @param {string} name sender name to look for
   * @param {Object|Object[]} packets a single packet or an array of packets
   * @returns {Object} the first matching packet or an empty object if none matches
   */
  getPacketByName(name, packets) {
    const list = Array.isArray(packets) ? packets : [packets]
    const found = list.find(
      packet => packet && packet._header && packet._header.sender && packet._header.sender.name === name
    )
    return found || {}
  }

  /**
   * addNamespace - add a set of functions to a class prop/space and then register it here,
   * the space is put at the front of the namespace list so it is checked first
   *
   * @param {string} space name of the property on the instance holding the functions
   * @param {string} [type=s] 'c' or 's'; any other value is taken to be the transport
   * @param {string} [trans] transport (see _validateTransport for accepted aliases)
   * @returns {number} new length of the amended namespace list
   */
  addNamespace(space, type, trans) {
    // type is optional: anything other than 'c' or 's' in its position is the transport
    if (type !== 'c' && type !== 's') {
      trans = type
      type = 's'
    }
    trans = this._validateTransport(trans)
    const key = trans ? type + trans : type
    if (!this._namespaces[key]) this._namespaces[key] = []
    return this._namespaces[key].unshift(space)
  }

  // TODO confirm Object.assign will be ok as it is not a deep copy
  /**
   * amendCommands - merge functions into the command hash for a type/transport, i.e. this['_' + type + trans]
   *
   * @param {Object} funcs hash of command functions
   * @param {string} [trans] transport, or 'c'/'s' in which case it is taken as the type
   * @param {string} [type=s] 'c' or 's'
   */
  amendCommands(funcs, trans, type) {
    if (trans === 'c' || trans === 's') {
      type = trans
      trans = ''
    }
    if (!type) type = 's' // default applies whether or not a transport was given
    trans = this._validateTransport(trans)
    const key = '_' + type + trans
    if (!this[key]) this[key] = {}
    Object.assign(this[key], funcs)
    log.debug({ msg: 'amended namespace', default_key: key, functions: this[key] })
  }

  amendConsumerCommands(funcs, trans) {
    this.amendCommands(funcs, trans, 'c')
  }

  amendSocketCommands(funcs, trans) {
    this.amendCommands(funcs, trans)
  }

  // func should take and return a packet.
  // opts: {name} for a single socket, or {type}, {trans} and/or {all} to select several (see _packetHook)
  beforeSendHook(func, opts) {
    this._packetHook('beforeSend', func, opts)
  }

  beforeProcessHook(func, opts) {
    this._packetHook('beforeProcess', func, opts)
  }

  afterProcessHook(func, opts) {
    this._packetHook('afterProcess', func, opts)
  }

  // A Big Hammer - use only if necessary - default with hooks should suffice
  // these three will pre-empt default processor to be called in ._packetProcess

  // add and override default processor for ALL consumers, i.e. packets returning from a send or arrived from a push
  consumersProcessor(func) {
    for (const name of Object.keys(this._socket)) {
      if (this._socket[name].type === 'c') this.altProcessor(func, name)
    }
  }

  // add and override default processor for ALL sockets, i.e. packets coming in from consumer send
  socketsProcessor(func) {
    for (const name of Object.keys(this._socket)) {
      if (this._socket[name].type === 's') this.altProcessor(func, name)
    }
  }

  // add and override a processor for a particular socket/consumer to list of processors
  // if no socket name given it will replace the default processor in _processors from processing.js
  altProcessor(func, socket_name) {
    socket_name = socket_name || '_default'
    this._processors[socket_name] = func
  }

  //======================================================
  /*
   *
   * Private Methods
   *
   */

  /**
   * set a hook function on one socket (opts.name) or on every socket matching
   * opts.type, opts.trans or opts.all
   *
   * @param {string} hook property name of the hook: beforeSend, beforeProcess or afterProcess
   * @param {Function} func hook function
   * @param {Object} [opts={}] {name, type, trans, all}
   */
  _packetHook(hook, func, opts = {}) {
    log.debug({ msg: 'hooking a socket(s)', method: '_packetHook', line: 334, hook: hook, function: func, options: opts })
    const { name, type, trans, all } = opts || {}
    if (name) {
      if (this._socket[name]) this._socket[name][hook] = func
      else log.warn({ msg: `no socket of name ${name} to hook`, method: '_packetHook', hook: hook })
      return
    }
    log.debug({ msg: 'sockets available to hook', method: '_packetHook', line: 338, sockets: Object.keys(this._socket) })
    for (const socketName of Object.keys(this._socket)) {
      const socket = this._socket[socketName]
      if (all || socket.type === type || socket.transport === trans) socket[hook] = func
      if (socket[hook]) log.debug({ msg: 'hooked socket', method: '_packetHook', line: 343, name: socketName, type: socket.type, trans: socket.transport })
    }
  }

  /*
   **********default packet processor for all sockets
   * this can be hooked or replaced all together
   */
  /**
   * process an incoming packet: beforeProcess hook -> processor -> afterProcess hook.
   * Bound (with the socket name) onto each socket in addSocket.
   *
   * @param {string} socket_name name of the socket the packet arrived on
   * @param {Object} packet incoming packet
   * @returns {Promise<Object>} processed packet with the incoming header re-applied,
   *   or the incoming packet with an error prop if the beforeProcess hook rejected
   */
  async _packetProcess(socket_name, packet) {
    log.debug({ socket: socket_name, packet: packet, method: '_packetProcess', line: 393, msg: 'processing incoming packet' })
    const header = packet._header ? packet._header : {} // retain header
    const socket = this._socket[socket_name]
    if (socket.beforeProcess) {
      const [err, hooked] = await to(Promise.resolve(socket.beforeProcess.call(this, packet)))
      if (err) {
        // hook has forced an abort to processing
        log.warn({ socket: socket_name, packet: packet, error: err, method: '_packetProcess', msg: 'beforeProcess hook aborted processing' })
        packet.error = err
        return packet
      }
      packet = hooked
    }
    // the processor can be set via the incoming packet
    // otherwise if one is set on the socket or the default found in processing.js
    // TODO Try each "available" packet processor in some order if fails try next one before trying the default
    // the key arrives in a packet, so only accept processors that were actually registered (own props)
    let key = packet._processor
    const registered = key && Object.prototype.hasOwnProperty.call(this._processors, key) && typeof this._processors[key] === 'function'
    if (!registered) key = this._processors[socket_name] ? socket_name : '_default'
    let res = (await this._processors[key].call(this, packet, socket_name)) || packet // processor didn't return a packet then return the packet sent
    log.debug({ socket: socket_name, response: res, msg: 'processed packet ready for hook' })
    if (socket.afterProcess) res = await socket.afterProcess.call(this, res)
    log.debug({ socket: socket_name, response: res, msg: 'packet after hook complete ready for return' })
    res._header = Object.assign(header, res._header) // re-apply header in case hooks or processor mangled or removed it
    return res
  }

  /**
   * prepare (instance not yet started) or run (instance started) the initialization of a socket.
   * Server sockets, except mqtt ones, are initialized with .create all others with .connect
   *
   * @param {string} name name of socket
   * @returns {Object|Promise<String|Object>} {name, init} before the instance is started, otherwise a promise
   *   of a confirmation string or, on failure (also emitted as 'error'), an object with msg and error
   */
  _initSocket(name) {
    const socket = this._socket[name]
    const method = socket.type === 's' && socket.transport !== 'm' ? 'create' : 'connect'
    // call through the socket so the method keeps the socket as its `this`
    const init = (...args) => socket[method](...args)
    log.info({ msg: `initializing socket ${name}, ${socket.type}, ${socket.transport}` })
    if (!this._started) return { name: name, init: init }
    return init()
      .then(res => `socket ${name} added and initialzed, ${res}`)
      .catch(err => {
        this.emit('error', { msg: 'failed initialization', error: err, socket: socket, code: 'SOCKET_INIT' })
        return { msg: `socket ${name} failed initialization`, error: err }
      })
  }

  // all sockets are emitters. Adds a listener to all sockets of a type with given event.
  // now sockets can emit locally processed events
  // passing 'stop' as fn removes all listeners of that event instead
  _eventListen(type, event, fn) {
    for (const name of Object.keys(this._socket)) {
      const socket = this._socket[name]
      if (socket.type !== type) continue
      if (fn === 'stop') socket.removeAllListeners(event)
      else {
        log.debug({ socket: name, type: type, event: event, msg: 'adding listener to socket' })
        socket.on(event, fn)
      }
    }
  }

  /**
   * normalize a transport alias to its single letter key
   *
   * @param {string} trans transport or alias
   * @param {string} [type=s] socket type
   * @returns {string} n, t, m or w, or an empty string if not valid
   */
  _validateTransport(trans, type = 's') {
    const valids = {
      w: 'w',
      web: 'w',
      n: 'n',
      named: 'n',
      unix: 'n',
      pipe: 'n',
      t: 't',
      tcp: 't',
      net: 't',
      network: 't',
      m: 'm',
      mqtt: 'm'
    }
    trans = valids[trans] || ''
    // NOTE(review): this blanks 'w' for every type except 'c', yet addSocket only supports
    // web sockets of type 's' and no caller in this file passes type - condition looks inverted, confirm
    if (type !== 'c' && trans === 'w') trans = ''
    return trans
  }

  //getter for socket transport
  _transport(name) {
    return this._socket[name].transport
  }

  //getter for socket type
  _type(name) {
    return this._socket[name].type
  }

  // namespace list specific to the type and transport of the named socket
  _getTransportNamespaces(socket) {
    return this._namespaces[this._type(socket) + this._transport(socket)]
  }

  /**
   * look up a command function by trying each namespace in order
   *
   * @param {string} cmd command from a packet
   * @param {Array} namespaces list of namespace prefixes, a falsy entry means no prefix (root of instance)
   * @returns {*} first truthy lookup result or null
   */
  _getCmdFuncNamespace(cmd, namespaces) {
    if (!Array.isArray(namespaces)) return null // no namespaces registered for that type/transport
    for (const namespace of namespaces) {
      const cmd_func = this._getCmdFunc(namespace ? namespace + '.' + cmd : cmd)
      if (cmd_func) return cmd_func
    }
    return null
  }

  /**
   * takes command and returns corresponding function in a hash by walking down the object tree
   *
   * @param {string|string[]} cmd command path, a string is split on '.', ':' and '/'
   * @param {Object} [obj=this] hash to walk
   * @returns {*} the value found at the end of the path, null if an intermediate level is missing or cmd is unusable
   */
  _getCmdFunc(cmd, obj) {
    let path
    if (typeof cmd === 'string') path = cmd.split(/[.:/]+/)
    else if (Array.isArray(cmd)) path = cmd.slice() // copy, walking consumes the path
    else return null
    let target = obj || this
    while (path.length > 1) {
      const prop = path.shift()
      if (!target[prop]) return null
      log.debug({ length: path.length, cmd: path, prop: prop, objprop: target[prop], method: '_getCmdFunc', msg: 'command to corresponding function in a hash' })
      target = target[prop]
    }
    return target[path[0]]
  }

  /**
   * primary function to find a function to call based on packet cmd.
   * The type+transport namespaces of the socket are tried first, then the type only namespaces.
   *
   * @param {Object} packet packet with a cmd prop
   * @param {string} socket name of the socket the packet arrived on
   * @returns {Promise<*>} result of the command function (called with the instance as this) or 'failed' if none found
   */
  async _callCmdFunc(packet, socket) {
    let cmd_func = this._getCmdFuncNamespace(packet.cmd, this._getTransportNamespaces(socket))
    if (typeof cmd_func === 'function') return await cmd_func.call(this, packet)
    cmd_func = this._getCmdFuncNamespace(packet.cmd, this._namespaces[this._type(socket)])
    if (typeof cmd_func === 'function') return await cmd_func.call(this, packet)
    return 'failed'
  }
} // end Base Class

export default Base