/** * UCI Base Module * @module UCI-Base * @description This is the primary module to use in the UCI ecosystem */ // TODO add automated duplex (for consumer will create corresponding socket and send connection info to remote socket) // --------------- UCI dependencies ------------------- // UCI communication transport communication modules // TODO change to dynamic import so loads only if that socket type is requestd import Socket from '@uci/socket' // tcp or named pipe // TDDO import these socket libraries dynamically from per peerDependencies import MQTT from '@uci/mqtt' // requires broker import WebSocket from '@uci/websocket' // server only - client is for web browser only // UCI helpers import { Ready, changed, map } from '@uci-utils/ready' import { bindFuncs } from '@uci-utils/bind-funcs' // UCI logger import logger from '@uci-utils/logger' let log = {} // declare module wide log to be set during construction // ------------------------------------------------- // Community dependencies import to from 'await-to-js' import isPlainObject from 'is-plain-object' import loadYaml from 'load-yaml-file' import { merge } from 'merge-anything' // Nodejs dependencies import EventEmitter from 'events' // Internal dependencies import { cmdProcessor, defaultCmds, namespaces } from './processing' // Constants // const SOCKET_INFO_KEYS = ['name', 'type', 'transport'] const TRANSLATE = { n: 'Named Pipe', t: 'TCP', s: 'Socket', c: 'Consumer', m: 'MQTT', w: 'Web Socket' } /** * @class Base * @description * An inter-process inter-machine multi socket communication class.
* It is extended to derive many other UCI classes thus the name "Base"
* The class itself is extended from the {@link https://nodejs.org/api/events.html#events_class_eventemitter nodejs EventEmitter } to support in process communication as well * * @extends EventEmitter * @param options {object} - object hash of options * @param [options.id=ucit-base+] {string} and id for this process/instance used for logging * @param {String} [options.desc] additional description for humans, can be logged. * @param {String | Boolean} [opts.path=either full path to where socket should be created or 'true' (which uses default path] * @param {string} [opts.sockets] comma delimited strong of sockets to be created each socket of the form [name]#[c or s]>[n,t,m or w] * @param [opts.useRootNS=false] {boolean} - include the "root" of the created base instance as part of the namespace for a packet cmd - use not recommended * @param {Object} [opts.(name of socket)] options for a particular socket created by opts.socket. see UCI guide {@link ?content=guide#sockets|creating sockets} * @property {array} socket collection of all sockets created and used by base instance as per opts.sockets or addSocket method * * @example * import Base from '@uci/base' * // options object example, credated is a socket of each transport using their defaults * // and a tcp consumer socket that connects to a tcp socket on another host 'somehost' at port 8888 * const opts = id:mybase, sockets:'us#s>n,tc#c>t,ts#s>t,mqtts#s>m,webs#s>w', tc:{host:'somehost', port:8888}} * let mybaseprocess = new Base(opts) */ class Base extends EventEmitter { constructor(opts={}) { if (typeof opts === 'string') opts=loadYaml.sync(opts) super() this.name = opts.name || opts.appName || 'a base class instance' this.id = opts.id || 'uci-base:' + new Date().getTime() log = logger({ name: 'base', id: this.id }) this.desc = opts.desc // additional details for humans this._socket = {} // holds all the various communication sockets // these two if passed will get applied to all consumer sockets, otherwise socket defaults will be used /** @type {string} - timeout for connecting to a consumer socket */ this.initTimeout = opts.initTimeout this.retryWait = opts.retryWait this.defaultReturnCmd = opts.defaultReturnCmd this._cmdProcessors = { _default: cmdProcessor } this.ready = new Ready({emitter: this}) // ready packet to be sent when process is "ready" this._readyPacket = {cmd:'ready', event:`${this.name}:process`, name:this.name, id:this.id, ready:false} // _c and _s are the default namespaces this._namespaces =Object.assign({},namespaces) this._c = Object.assign({},defaultCmds.c) this._s = Object.assign({},defaultCmds.s) // make available a method that will bind a whole object tree of functions // Note: functions called via a command namespace are called by base connext by default // if called directlly/locally they should be bound to base context if desired this.bindFuncs = bindFuncs if (opts.useRootNS) { // add root of instance to checking for command functions - not recommended! this._namespaces.s.splice(-1, 0, null) this._namespaces.c.splice(-1, 0, null) } // this.bindFuncs = bindFuncs // make available a method that will bind a whole object tree of functions this._socket = {} // where all sockets are stored // at creation defined sockets: if (opts.port) this.registerSocket(`${opts.name||'base'}${opts.port&&opts.path ? ':t':''}`,'s','t',{port:opts.port}) if (opts.path) this.registerSocket(`${opts.name||'base'}${opts.port&&opts.path ? ':n':''}`,'s','n',{path: opts.path}) if (opts.sockets) { let sockets = opts.sockets sockets = Array.isArray(sockets) ? sockets:[sockets] sockets.forEach(socket => this.registerSocket(socket)) } } // end constructor /* * -------------------------------- * CLASS METHODS * -------------------------------- */ // PUBLIC METHODS /** * initialize the instance with the set options. * This must be called to initialize all sockets and connections * @async * @public * @required * @param {array} sockets string of one or array array names to initialize, if none, then all current added sockets will be initialized * */ async init(sockets) { // TODO ready needs to allow multiple all subscribers that get rebuilt on add/remove const res = await this.socketsInit(sockets) // update ready packet and push/send that changed packet this.ready.all.subscribe(async ready => { this._readyPacket.ready= ready delete (this._readyPacket._header) if (!ready) { // make a list of the failures to send // await new Promise(res=>setTimeout(()=>res(),1000)) this._readyPacket.failures = this.ready.failed } else delete this._readyPacket.failures let packet = Object.assign({},this._readyPacket) this.emit('log',{level:'debug', packet:packet, msg:`${this.name} has an updated ready state: broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`}) // console.log('ready packet to broadcast',packet) // set the connection packet for each socket this.getSocketsFilter({type:'s'}) .forEach(socket=>this.getSocket(socket).conPackets[0]=packet) // announce changed ready state setTimeout(async () => { this.send(packet) // to any socket that this instance is connected to this.push(packet) // to any remote consumer connected to an instance socket },100) // delay 100ms, fixes issue so won't be sent during a disconnect which causes socket write error }) return res } async socketsInit(sockets) { let results = {} let errors = {} // single socket intialize mapper const initialize = async socket => { return new Promise(async function(resolve) { try { const value = await socket.init() this.emit('log',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value}) resolve(value) } catch (error) { this.emit('log',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket // let res = await this.removeSocket(socket.name) errors[socket.name]={error:error} resolve(error) } }.bind(this)) } let inits = [] if (!sockets) { sockets = Object.keys(this._socket).filter(name => { return !this._socket[name].active // only intialize (connect) inactive sockets }) } if (typeof sockets ==='string') sockets = [sockets] sockets.forEach(name => { if (this._socket[name]) { inits.push({name:name, init:this.getSocketInit(name)}) } else log.warn({msg:`no socket registered by name of ${name} to initialize`}) }) let [err] = await to(Promise.all(inits.map(initialize))) if (err) { this.emit('log',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err}) return {errors:[err]} } if (Object.keys(errors).length===0) errors=false return {results:results, errors:errors} } // support old name for now async addSocket(name,type,transport,options) { return this.registerSocket(name,type,transport,options) } /** * addSocket - Add a socket at runtime as opposed to via the sockets option at creation * This is not async and will NOT initialize the socket, that must be done with a call to init or socketInit * @param {type} name Name of socket (usually something short but unique) * @param {string} [type=c] consumer/client 'c' or socket/server 's' * @param {string} [transport=n] transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket * @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc) * * @returns {function} if called before base initialzation it can be ignored as all added sockets will be initialized. After through it be called to initialize that socket */ registerSocket(name, type = 'c', transport = 'n', options = {}) { if (isPlainObject(name)) ({name, type = 'c', transport = 'n', options = {}} = name) if (typeof name !=='string') return null transport = this._validateTransport(transport) log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:198, msg:`adding socket ${name}`}) options.id = options.id || name options.name = options.name || name // TODO add a 'd' type for duplex which creates an 's' first and waits on connect to make a 'c' if (type==='c') options = Object.assign({ initTimeout:this.initTimeout, retryWait:this.retryWait },options) // outbound if (type==='s') { let conPackets = [] // [this._readyPacket] conPackets = options.conPackets ? conPackets.concat(options.conPackets) : conPackets conPackets = options.conPacket ? conPackets.push(options.conPacket) : conPackets options = Object.assign({ defaultReturnCmd:this.defaultReturnCmd, conPackets: conPackets },options) // inbound } // TODO get rid of hard coded transports and use registered transports (t and n being default) switch (transport) { case 'n': options.path = options.path || true // falls through case 't': this._socket[name] = new Socket[TRANSLATE[type]](options) break case 'm': if (type === 'p') type = 'c' options.client = type==='c' ? true : false options.connect = options.connect || {} if (options.host) options.connect.host = options.host if (options.port) options.connect.port = options.port options.connect.connectTimeout = options.connect.connectTimeout || 5000 this._socket[name] = new MQTT(options) break case 'w': if (type === 's') this._socket[name] = new WebSocket(options) } if (this._socket[name]) { // in case of invalid transport this._socket[name].name = name this._socket[name].type = type this._socket[name].transport = transport this._socket[name]._packetProcess = this._packetProcess.bind(this, name) // bubble up events from inidivual sockets to base instance, // connection:consumer is a socket emitting when a consumer is connecting // connection:socket is a consumer emiting when connecting to a socket const EVENTS=['log','socket','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance EVENTS.forEach(event => { this._socket[name].on(event, obj => { if (Object.prototype.toString.call(obj) !== '[object Object]') { let data=obj obj = {} obj.data = data } obj.socketName = name this.emit(event,obj) }) }) if (type==='c') { // when consumer has sucessfully connected to a socket this._socket[name].obsName = `${name}:${options.path ? options.path : `${options.host}:${options.port}`}` this.ready.addObserver(this._socket[name].obsName,this._socket[name],{event:'connection:socket',condition:ev=>ev.state==='connected'}) // set up listner for any pushed packets and emit locally this._socket[name].on('pushed', packet => { packet._header.socketName=name this.emit('pushed', packet) }) } if (type==='s') { // when socket is listnening this.ready.addObserver(`${name}:socket`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' }) // initially set conPackets, ready packets is ALWAYS the first this._socket[name].conPackets.unshift(this._readyPacket) if (options.duplex) this.consumerConnected(this._socket[name],{consumer:options.duplex, add:true}) } return this._socket[name] // return handle to newly registered socket // return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket } } /** * removeSocket * * @param {string} name name of socket as created * @returns {String | Object } success string or error object */ async removeSocket(name) { // NOTE: uci consumers have .end renamed as .close to match socket method for convenience if (typeof name !=='string') return 'no socket name passed, nothing to remove' const socket = this.getSocket(name) if (!socket) return 'no socket by that name, nothing to remove' let closeError if (typeof socket.close !== 'function') return 'bad socket no close function, nothing to remove' let [err] = await to(socket.close()) if(err) if (err.code !== 'ERR_SERVER_NOT_RUNNING') { closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'} } this.emit('log', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts}) socket.removeAllListeners() this.ready.removeObserver(socket.type==='c' ? this._socket[name].obsName : `${name}:socket`) delete this._socket[name] return closeError ? closeError : 'success' } getSocket(name) { if (name) return this._socket[name] || null else return this._socket } // returns array of names of sockets that pass filter getSocketsFilter({type, trans, active}={}) { if (trans) trans = this._validateTransport(trans) let filtered = [] Object.keys(this._socket).forEach(name => { if ((type==null || this._socket[name].type === type) && (trans==null || this._socket[name].transport === trans) && (active==null || this._socket[name].active===active)) filtered.push(name) }) return filtered } getConsumers(filter={}) { filter.type='c' return this.getSocketsFilter(filter) } getSocketInit(name) { let socket = this._socket[name] if(!socket) { log.warn({msg:`can't fetch create/connect function, no socket registered by name of ${name}`}) return null } if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') { return socket.create } else { return socket.connect } } /** * send - Description * * @param {String} name name of socket for send (must be a consumer otherwise use push for server) * @param {Object} packet * * @returns {type} Description */ async send(name, packet) { let names = [] if (typeof name !== 'string') { packet = name names = this.getConsumers() } else { const con = this.getSocket(name) if (!con) return `no consumer ${name} for sending` names = [name] } if (!packet || !Object.keys(packet).length) return 'no packet to send - aborted' let sends = [] if (!names.length) return 'no consumers available for send, aborting' for (let name of names) { const consumer=this.getSocket(name) let hookedPacket = {} hookedPacket = consumer.beforeSend ? await consumer.beforeSend.call(this,Object.assign({},packet)) : packet log.debug({msg:'after hook, sending packet', name:consumer.name, packet:hookedPacket, method:'send'}) sends.push(consumer.send.bind(consumer,hookedPacket)) } if (sends.length === 1) return await sends[0]() return Promise.all( sends.map(send => { return send() }) ) } async push(packet,opts={}) { if (!packet || !Object.keys(packet).length) return Promise.resolve('no packet to push - aborted') let sockets = this.getSocketsFilter({type:'s'}) if (!sockets.length) return Promise.resolve('no sockets on which to push') opts.sockets = opts.sockets ? opts.sockets : (opts.socket ? [opts.socket] : []) if (opts.sockets.length) sockets = sockets.filter(name=>opts.sockets.includes(name)) sockets = sockets .map(name=>this.getSocket(name)) .filter(sock=> (opts.transport && opts.transport !=='all') ? sock.transport=== this._validateTransport(opts.transport) : true) // console.log(sockets.length, 'sockets for push', sockets.map(socket=>socket.name)) if (!sockets.length) return Promise.resolve('no sockets on which to push') let broadcast = [] // TODO use map and reflect for (let socket of sockets) { let hookedPacket = {} hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet broadcast.push(socket.push.bind(socket,hookedPacket,opts)) } return Promise.all( broadcast.map(push => { return push() }) ) } // TODO accept alt transport string i.e. t or TCP async sendTransport(packet, transport) { let sends = [] for (let name of Object.keys(this._socket)) { if (this._socket[name].type === 'c') { if (this._socket[name].transport === transport) { sends.push(this._socket[name].send.bind(this._socket[name])) } } } if (sends.length === 1) return sends[0](packet) return Promise.all( sends.map(send => { return send(packet) }) ) } // async sendMQTT(topic, packet) {return this._socket.mqtt.send(topic, packet)} async sendTCP(packet) { return this.sendTransport(packet, 't') } // TODO change this to PIPE async sendIPC(packet) { return this.sendTransport(packet, 'n') } // TODO add sendMQTT, sendWS socketsListen(event,fn) { this._eventListen('s',event,fn) } consumersListen(event,fn) { this._eventListen('c',event,fn) } getPacketByName(name, packets) { if (!packets.length) packets = [packets] let found = {} packets.some((packet, index, packets) => { if (packet._header.sender.name === name) { found = packets[index] return true } }) return found } // add set of functions to class prop/space and then register with this addNamespace(space, type, trans) { if (type !=='c' && type !=='s') { trans = type type = 's' } trans = this._validateTransport(trans) if (trans) return this._namespaces[type + trans].unshift(space) else return this._namespaces[type].unshift(space) } // object of functions, key is cmd name amendCommands(funcs, trans, type) { if (!trans && !type) type = 's' if (trans ==='c' || trans ==='s') { type = trans trans = '' } trans = this._validateTransport(trans) if (!this['_'+type+trans]) this['_'+type+trans] = {} Object.assign(this['_'+type+trans], funcs) // trans is type here log.debug({msg:'amended namespace', id:this.id, default_key:'_'+type+trans, functions:this['_'+type+trans]}) } amendConsumerCommands(funcs, trans) { this.amendCommands(funcs,trans,'c') } amendSocketCommands(funcs, trans) { this.amendCommands(funcs,trans) } // func should take and return a packet. if type beforeSendHook(func,opts) { this._packetHook('beforeSend', func,opts) } beforeProcessHook(func,opts) { this._packetHook('beforeProcess', func,opts) } afterProcessHook(func,opts) { this._packetHook('afterProcess', func,opts) } // A Big Hammer - use only if necessary - default with hooks should suffice // these three will pre-empt default processor to be called in ._packetProcess // add and override default processor for ALL consumers, i.e. packets returning form a send or arrived from a push consumersProcessor(func) { for (let name of Object.keys(this._socket)) { if (this._socket[name].type === 'c') { this.altProcessor(func, name) } } } // add and override default processor for ALL sockets, i.e. packets coming in from consumer send socketsProcessor(func) { for (let name of Object.keys(this._socket)) { if (this._socket[name].type === 's') { this.altProcessor(func, name) } } } // add and override a processor for a particular socket/consumer to list of processors // if no socket name given it will replace the default processor in _processors from processing.js altProcessor(func, socket_name) { socket_name = socket_name || '_default' this._cmdProcessors[socket_name] = func } // a call to this method will (make or add) return and or subscribe a ready observer for incoming consumer connections consumerConnected (socket,opts={}) { let { subscribe, consumer, name, add} = opts const conditionHandler = async ev => { if ((ev||{}).state ==='connected'){ let data = (ev.data ||{}) if (consumer) { // specific consumer check if (data.name === consumer || [ev.name, ev.id, data.name, data.id].some(name => (name||'').includes(consumer)) ) return true } else return true } return false } if (typeof socket ==='string') socket = this.getSocket(socket) name = name || consumer add = add && consumer const options = {event:'connection:consumer',condition:conditionHandler} let oname = `${name}:consumer>${socket.name}:socket` const obs = add ? this.ready.addObserver(oname,socket,options) : this.ready.makeObserver(socket,options) if (typeof subscribe ==='function') return obs.subscribe(subscribe) return obs } // end consumerConnected //=============PRIVATE METHODS ========================================= /* * * Assigns a Hook Function to a Socket, Type or Transport * */ // options allow applying hook function to specific socket or type or transport, default is all type 's' sockets _packetHook(hook,func,opts) { log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts}) let {name,type,trans,all} = opts if (opts==null) type = 's' // default is all type 's' sockets if (name) this._socket[name][hook] = func else { log.debug({msg:'sockets available to hook', method:'_packetHook', line:338, sockets: Object.keys(this._socket)}) for (let name of Object.keys(this._socket)) { if (this._socket[name].type === type) this._socket[name][hook] = func if (this._socket[name].transport === trans) this._socket[name][hook] = func if (all) this._socket[name][hook] = func if (this._socket[name][hook]) log.debug({msg:'hooked socket', method:'_packetHook', line:343, name:name, type:this._socket[name].type, trans:this._socket[name].transport}) } } } /* ********** main packet processor for all sockets * supports per socket before and after hook processors * supports additonal registered processors called via packet or socket name, with default processor, */ async _packetProcess(socket_name, packet) { if (!packet || !Object.keys(packet).length) packet = {error:'no packet to process'} if (!socket_name || !this.getSocket(socket_name)) packet.error = 'no socket name passed for packet processing' if (!this.getSocket(socket_name)) packet.error = `socket by name of ${socket_name}` if (packet.error) { this.emit('log', {level:'error', error:packet.error, packet:packet, msg:'an error occured before processing an incoming packet'}) return packet // don't process a packet with an error } // TODO allow adding to or altering the process map let processors = new Map([ ['before', this.getSocket(socket_name).beforeProcess ], ['command', this._cmdProcessors[packet.cmdProcessor || this._cmdProcessors[socket_name] ? socket_name : '_default'] ], ['after', this.getSocket(socket_name).afterProcess ], ]) let err for (let [name,func] of processors) { // the same as of recipeMap.entries() [err,packet] = await to(this._process(socket_name,packet,name,func)) if (err) packet.error = err } return packet } async _process(socket_name,packet,name,func) { if (packet.error) return packet // if an error occurs skip any further processing let err, res if (func) { [err,res] = await to(func.call(this,packet,socket_name)) if (err) { // forced an abort to processing packet.error = err } else { if (!isPlainObject(res)) packet.processResult ? packet.processResult[name]=res : packet.processResult = {[name]:res} else { let method = (packet.processMethod || {})[name] || packet.processMethod // TODO could support other specialized merge methods if (method === 'merge') { packet = merge(packet,res) } else { packet=res } } } } return packet } // all sockets are emitters. Adds a listener to all sockets of a type with given event. // now sockets can emit locally processed events _eventListen(type,event,fn) { for (let name of Object.keys(this._socket)) { if (this._socket[name].type === type) { if (fn==='stop') this._socket[name].removeAllListeners(event) else { log.debug({socket:name, type:type, event:event, msg:'adding listener to socket'}) this._socket[name].on(event, fn) } } } } _validateTransport(trans, type='s') { const valids = { w:'w', web:'w', n:'n', named:'n', unix:'n', pipe:'n', t:'t', tcp:'t', net:'t', network:'t', m:'m', mqtt:'m', } trans = valids[trans] || '' if (type !== 's' && trans ==='w') { log.warn({type: type, transport: trans, msg:'Invalid type/transport - Consumer/Client Web Socket not supported use TCP'}) trans = '' } return trans } _transport(name) { return this._socket[name].transport } //getter for socket transport _type(name) { return this._socket[name].type } //getter for socket type _getTransportNamespaces(socket) { return this._namespace[this._type(socket) + this._transport(socket)] } _getCmdFuncNamespace(cmd, namespaces) { let cmd_func = null namespaces.some(namespace => { namespace = namespace ? namespace + '.' + cmd : cmd cmd_func = this._getCmdFunc(namespace) if (cmd_func) return true }) return cmd_func } // takes command and returns corresponding function in a hash, recurisve walk _getCmdFunc(cmd, obj) { if (typeof cmd === 'string') { if (!obj) obj = this cmd = cmd.split(/[.:/]+/) } var prop = cmd.shift() if (cmd.length === 0) return obj[prop] if (!obj[prop]) return null log.debug({length:cmd.length,cmd:cmd, prop:prop, objprop:obj[prop], method:'_getCmdFunc', msg:'command to corresponding function in a hash'}) return this._getCmdFunc(cmd, obj[prop]) } // primary function to find a function to call based on packet cmd async _callCmdFunc(packet, socket) { let cmd_func = this._getCmdFuncNamespace( packet.cmd, this._namespaces[this._type(socket) + this._transport(socket)] ) if (cmd_func) return await cmd_func.call(this,packet) // todo try .call cmd_func = this._getCmdFuncNamespace( packet.cmd, this._namespaces[this._type(socket)] ) if (cmd_func) return await cmd_func.call(this,packet) return 'failed' } } // end Base Class export default Base export { Base, map, changed, isPlainObject, to, merge, loadYaml } // todo share rxjs