// TODO add automated duplex (for consumer will create corresponding socket and send connection info to remote socket)

// --------------- UCI dependencies -------------------
// UCI communication transport modules
// TODO change to dynamic import so a transport loads only if that socket type is requested
import Socket from '@uci/socket' // tcp or named pipe
import MQTT from '@uci/mqtt' // requires broker
import WebSocket from '@uci/websocket' // server only - client is for web browser only
// UCI helpers
import { Ready, changed, map } from '@uci-utils/ready'
import { bindFuncs } from '@uci-utils/bind-funcs'
// UCI logger
import logger from '@uci-utils/logger'
let log = {} // declare module wide log to be set during construction
// -------------------------------------------------

// Community dependencies
import to from 'await-to-js'
import isPlainObject from 'is-plain-object'
import merge from 'merge-anything'

// Nodejs dependencies
import EventEmitter from 'events'

// Internal dependencies
import { cmdProcessor, defaultCmds, namespaces } from './processing'

// Constants
const SOCKET_INFO_KEYS = ['name', 'type', 'transport']
const TRANSLATE = {
  n: 'Named Pipe',
  t: 'TCP',
  s: 'Socket',
  c: 'Consumer',
  m: 'MQTT',
  w: 'Web Socket'
}
// accepted transport names/aliases mapped to the single letter code used internally
const VALID_TRANSPORTS = {
  w: 'w',
  web: 'w',
  n: 'n',
  named: 'n',
  unix: 'n',
  pipe: 'n',
  t: 't',
  tcp: 't',
  net: 't',
  network: 't',
  m: 'm',
  mqtt: 'm'
}
// events that are re-emitted from each individual socket on the base instance
const BUBBLED_EVENTS = ['log', 'connection', 'connection:consumer', 'connection:socket']

/**
 * @class Base
 * @description
 * An inter-process inter-machine multi socket communication class.
 * It is extended to derive many other UCI classes thus the name "Base".
 * The class itself is extended from the {@link https://nodejs.org/api/events.html#events_class_eventemitter nodejs EventEmitter }
 * to support in process communication as well.
 *
 * @extends EventEmitter
 * @param {Object} opts hash of options
 * @param {String} [opts.name] human readable name of the instance (opts.appName is used as a fallback)
 * @param {String} [opts.id='uci-base:'+timestamp] an id for this process/instance used for logging
 * @param {String} [opts.desc] additional description for humans, can be logged.
 * @param {Number} [opts.port] if set a tcp server socket ('s','t') is registered on this port
 * @param {String | Boolean} [opts.path] if set a named pipe server socket ('s','n') is registered with this path
 * (either the full path to where the socket should be created or `true`)
 * @param {String | Object | Array} [opts.sockets] one entry or an array of entries; each entry is handed as-is to
 * registerSocket, so it is either a socket name or an object {name, type, transport, options}
 * @param {Boolean} [opts.useRootNS=false] include the "root" of the created base instance as part of the namespace
 * for which packet cmd's try to call a corresponding function
 * @param {*} [opts.initTimeout] default applied to the options of every consumer socket
 * @param {*} [opts.retryWait] default applied to the options of every consumer socket
 * @param {*} [opts.defaultReturnCmd] default applied to the options of every server socket
 * @property {Object} _socket collection (keyed by name) of all sockets registered via the options or registerSocket
 *
 * @example
 * import Base from '@uci/base'
 * // registers a named pipe server socket and a tcp consumer that connects to host 'somehost' at port 8888
 * const opts = {
 *   id: 'mybase',
 *   sockets: [
 *     { name: 'us', type: 's', transport: 'n' },
 *     { name: 'tc', type: 'c', transport: 't', options: { host: 'somehost', port: 8888 } }
 *   ]
 * }
 * let mybaseprocess = new Base(opts)
 * await mybaseprocess.init()
 */
class Base extends EventEmitter {
  constructor(opts = {}) {
    super()
    this.name = opts.name || opts.appName || 'a base class instance'
    this.id = opts.id || 'uci-base:' + new Date().getTime()
    log = logger({ name: 'base', id: this.id })
    this.desc = opts.desc // additional details for humans
    this._socket = {} // holds all the various communication sockets
    // these if passed will get applied to all consumer/server sockets, otherwise socket defaults will be used
    this.initTimeout = opts.initTimeout
    this.retryWait = opts.retryWait
    this.defaultReturnCmd = opts.defaultReturnCmd
    this._cmdProcessors = { _default: cmdProcessor }
    this.ready = new Ready({ emitter: this })
    // Each instance needs its OWN namespace lists. A shallow copy alone would share the
    // arrays with the imported module object, so the splice below and the unshift in
    // addNamespace would leak into every other instance.
    this._namespaces = Object.assign({}, namespaces)
    for (const key of Object.keys(this._namespaces)) {
      if (Array.isArray(this._namespaces[key])) this._namespaces[key] = this._namespaces[key].slice()
    }
    // _c and _s are the default command namespaces
    this._c = Object.assign({}, defaultCmds.c)
    this._s = Object.assign({}, defaultCmds.s)
    // make available a method that will bind a whole object tree of functions
    // Note: functions called via a command namespace are called with the base context by default,
    // if called directly/locally they should be bound to the base context if desired
    this.bindFuncs = bindFuncs
    if (opts.useRootNS) {
      // add root of instance to checking for command functions - not recommended!
      this._namespaces.s.splice(-1, 0, null)
      this._namespaces.c.splice(-1, 0, null)
    }
    // sockets defined at creation; when both port and path are given the names get a suffix to stay unique
    const both = opts.port && opts.path
    if (opts.port) this.registerSocket(`${opts.name || 'base'}${both ? ':t' : ''}`, 's', 't', { port: opts.port })
    if (opts.path) this.registerSocket(`${opts.name || 'base'}${both ? ':n' : ''}`, 's', 'n', { path: opts.path })
    if (opts.sockets) {
      const sockets = Array.isArray(opts.sockets) ? opts.sockets : [opts.sockets]
      sockets.forEach(socket => this.registerSocket(socket))
    }
  } // end constructor

  /*
   * --------------------------------
   * CLASS METHODS
   * --------------------------------
   */

  // PUBLIC METHODS

  /**
   * Initialize the instance. This must be called to initialize all sockets and connections.
   * @async
   * @public
   * @param {String | Array} [sockets] a name or array of names to initialize, if none then all
   * currently registered inactive sockets will be initialized
   * @returns {Promise<Object>} see socketsInit
   */
  async init(sockets) {
    return this.socketsInit(sockets)
    // can do other init stuff here
  }

  /**
   * Runs the init function (create or connect) of the requested sockets in parallel.
   * A failing socket does not reject; its error is collected and a 'status' event is emitted.
   * @async
   * @param {String | Array} [sockets] a name or array of names, default is every socket that is not active
   * @returns {Promise<Object>} {results, errors} both keyed by socket name, errors is `false` when there were none.
   * If the combined wait itself fails only {errors:[err]} is returned.
   */
  async socketsInit(sockets) {
    const results = {}
    let errors = {}

    // single socket initialize mapper, always resolves so one failure does not abort the others
    const initialize = async socket => {
      try {
        const value = await socket.init()
        this.emit('status', { level: 'info', socketName: socket.name, msg: 'socket successfully initialized', message: value })
        results[socket.name] = value
        return value
      } catch (error) {
        this.emit('status', { level: 'fatal', socketName: socket.name, msg: 'socket init error', error: error })
        errors[socket.name] = { error: error }
        return error
      }
    }

    if (!sockets) {
      // only initialize (connect) inactive sockets
      sockets = Object.keys(this._socket).filter(name => !this._socket[name].active)
    }
    if (typeof sockets === 'string') sockets = [sockets]

    const inits = []
    sockets.forEach(name => {
      if (this._socket[name]) inits.push({ name: name, init: this.getSocketInit(name) })
      else log.warn({ msg: `no socket registered by name of ${name} to initialize` })
    })

    const [err] = await to(Promise.all(inits.map(initialize)))
    if (err) {
      this.emit('status', { level: 'fatal', msg: 'initialize of socket errors was NOT caught --- bad bad', error: err })
      return { errors: [err] }
    }
    if (Object.keys(errors).length === 0) errors = false
    return { results: results, errors: errors }
  }

  // support old name for now
  async addSocket(name, type, transport, options) {
    return this.registerSocket(name, type, transport, options)
  }

  /**
   * registerSocket - Register a socket at runtime as opposed to via the sockets option at creation.
   * This is not async and will NOT initialize the socket, that must be done with a call to init or socketsInit
   * (or by calling the returned function).
   * @param {String | Object} name Name of socket (usually something short but unique) or an object
   * {name, type, transport, options} carrying all arguments
   * @param {string} [type=c] consumer/client 'c' or socket/server 's'
   * @param {string} [transport=n] transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt, (w) websocket
   * @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc).
   * The passed object is copied, never modified.
   *
   * @returns {function | null | undefined} the init function (create or connect) of the new socket,
   * null when name is not a string, undefined when no socket could be created for the type/transport
   */
  registerSocket(name, type = 'c', transport = 'n', options = {}) {
    if (isPlainObject(name)) ({ name, type = 'c', transport = 'n', options = {} } = name)
    if (typeof name !== 'string') return null
    // type is passed so an unsupported consumer websocket is reported instead of silently ignored
    transport = this._validateTransport(transport, type)
    // work on a copy so the object handed in by the caller is left untouched
    options = Object.assign({}, options)
    options.id = options.id || name
    options.name = options.name || name
    // TODO add a 'd' type for duplex which creates an 's' first and waits on connect to make a 'c'
    if (type === 'c') options = Object.assign({ initTimeout: this.initTimeout, retryWait: this.retryWait }, options) // outbound
    if (type === 's') options = Object.assign({ defaultReturnCmd: this.defaultReturnCmd }, options) // inbound
    // TODO get rid of hard coded transports and use registered transports (t and n being default)
    let socket
    switch (transport) {
      case 'n':
        options.path = options.path || true
      // falls through
      case 't':
        socket = new Socket[TRANSLATE[type]](options)
        break
      case 'm':
        if (type === 'p') type = 'c'
        options.client = type === 'c'
        options.connect = options.connect || {}
        if (options.host) options.connect.host = options.host
        if (options.port) options.connect.port = options.port
        options.connect.connectTimeout = options.connect.connectTimeout || 5000
        socket = new MQTT(options)
        break
      case 'w':
        if (type === 's') socket = new WebSocket(options)
    }
    if (!socket) {
      // invalid type/transport combination: nothing was created, so an already registered
      // socket of the same name must not be touched
      log.warn({ socketName: name, type: type, transport: transport, msg: 'no socket created, invalid type/transport' })
      return
    }

    this._socket[name] = socket
    socket.name = name
    socket.type = type
    socket.transport = transport
    socket._packetProcess = this._packetProcess.bind(this, name)

    // bubble up events from individual sockets to base instance
    BUBBLED_EVENTS.forEach(event => {
      socket.on(event, obj => {
        // anything that is not a plain object is wrapped so the socket name can be attached
        if (Object.prototype.toString.call(obj) !== '[object Object]') obj = { data: obj }
        obj.socketName = name
        this.emit(event, obj)
      })
    })

    if (type === 'c') {
      this.ready.addObserver(name, socket, {
        event: 'connection:socket',
        condition: ev => {
          return ev.state === 'connected'
        }
      })
      socket.on('pushed', packet => {
        packet._header.socketName = name
        this.emit('pushed', packet)
      })
    }
    if (type === 's') {
      this.ready.addObserver(name, socket, {
        event: 'socket',
        condition: ev => (ev || {}).state === 'listening'
      })
    }
    return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket
  }

  /**
   * removeSocket - closes a socket, drops its listeners and removes it from the instance
   *
   * @param {string} name name of socket as registered
   * @returns {Promise<String | Object>} 'success' or an error object {socket, error, msg}
   */
  async removeSocket(name) {
    // NOTE: uci consumers have .end renamed as .close to match socket method for convenience
    const socket = this._socket[name]
    if (!socket) {
      return {
        socket: name,
        error: new Error(`no socket registered by name of ${name}`),
        msg: 'socket/consumer not removed, no such socket'
      }
    }
    let closeError
    // Promise.resolve so a close() that returns a plain value is tolerated as well
    const [err] = await to(Promise.resolve(socket.close()))
    // a server that was never started is not worth reporting
    if (err && err.code !== 'ERR_SERVER_NOT_RUNNING') {
      closeError = { socket: socket.name, error: err, msg: 'socket/consumer closed with errors, but removed' }
    }
    this.emit('status', { level: 'warn', msg: `socket ${name} has been removed`, socket: socket.opts })
    socket.removeAllListeners()
    delete this._socket[name]
    return closeError ? closeError : 'success'
  }

  /**
   * @param {String} [name] name of a registered socket
   * @returns {Object} the socket of that name (undefined if none) or, without a name, the whole collection
   */
  getSocket(name) {
    if (name) return this._socket[name]
    return this._socket
  }

  /**
   * Returns the names of the sockets that pass the filter, an omitted criterion matches everything.
   * @param {Object} [filter={}]
   * @param {String} [filter.type] 'c' or 's'
   * @param {String} [filter.trans] transport name or alias
   * @param {Boolean} [filter.active] value of the socket's active property
   * @returns {Array<String>}
   */
  getSocketsFilter({ type, trans, active } = {}) {
    if (trans) trans = this._validateTransport(trans)
    return Object.keys(this._socket).filter(name => {
      const socket = this._socket[name]
      return (
        (type == null || socket.type === type) &&
        (trans == null || socket.transport === trans) &&
        (active == null || socket.active === active)
      )
    })
  }

  /**
   * Same as getSocketsFilter but restricted to consumers. The passed filter is not modified.
   * @param {Object} [filter={}]
   * @returns {Array<String>}
   */
  getConsumers(filter = {}) {
    return this.getSocketsFilter(Object.assign({}, filter, { type: 'c' }))
  }

  /**
   * @param {String} name name of a registered socket
   * @returns {function | null} the socket's create (server, except mqtt) or connect function bound to
   * the socket so it can be called standalone, null if there is no such socket
   */
  getSocketInit(name) {
    const socket = this._socket[name]
    if (!socket) {
      log.warn({ msg: `can't fetch create/connect function, no socket registered by name of ${name}` })
      return null
    }
    const method = socket.type === 's' && socket.transport !== 'm' ? 'create' : 'connect'
    const init = socket[method]
    // binding keeps `this` pointing at the socket when the function is called detached (as socketsInit does)
    return typeof init === 'function' ? init.bind(socket) : init
  }

  /**
   * send - sends a packet through one consumer or, when no name is given, through all consumers.
   * A socket's beforeSend hook (if any) is applied to the packet first.
   *
   * @param {String} [name] name of socket for send (must be a consumer otherwise use push for server),
   * may be omitted in which case the first argument is the packet
   * @param {Object} packet
   *
   * @returns {Promise<*>} the result of the socket's send, an array of results when several consumers
   * were sent to, or {error} when the named socket does not exist
   */
  async send(name, packet) {
    if (typeof name !== 'string') {
      packet = name
      const sends = []
      for (const socketName of Object.keys(this._socket)) {
        const socket = this._socket[socketName]
        if (socket.type !== 'c') continue
        // each hook gets its own copy so one socket's hook can not affect the packet of another
        const hookedPacket = socket.beforeSend
          ? await socket.beforeSend.call(this, Object.assign({}, packet))
          : packet
        log.debug({ msg: 'after hook, sending packet', name: socketName, packet: hookedPacket, method: 'send', line: 235 })
        sends.push(socket.send.bind(socket, hookedPacket))
      }
      if (sends.length === 1) return await sends[0]()
      return Promise.all(sends.map(send => send()))
    }
    const socket = this._socket[name]
    if (!socket) return { error: `no consumer socket of name ${name}` }
    if (socket.beforeSend) packet = await socket.beforeSend.call(this, packet)
    log.debug({ msg: 'single socket hooked packet to send', name: name, packet: packet, method: 'send', line: 230 })
    return await socket.send(packet)
  }

  /**
   * push - pushes a packet from server sockets.
   * @param {Object} packet
   * @param {Array<String> | String} [sockets] array of socket names, or a transport name ('all' for every
   * transport); when omitted all server sockets push. Unknown names and non server sockets are skipped.
   * @returns {Promise<Array>} results of the individual pushes
   */
  async push(packet, sockets) {
    let targets
    if (Array.isArray(sockets)) {
      targets = sockets.map(name => this._socket[name]).filter(socket => socket && socket.type === 's')
    } else {
      targets = Object.values(this._socket).filter(socket => socket.type === 's')
      const trans = typeof sockets === 'string' ? sockets : null
      if (trans && trans !== 'all') {
        const transport = this._validateTransport(trans)
        targets = targets.filter(socket => socket.transport === transport)
      }
    }
    const broadcast = []
    for (const socket of targets) {
      const hookedPacket = socket.beforeSend
        ? await socket.beforeSend.call(this, Object.assign({}, packet), true)
        : packet
      log.debug({ msg: 'hooked packet to push', name: socket.name, packet: hookedPacket, method: 'push', line: 243 })
      broadcast.push(socket.push.bind(socket, hookedPacket))
    }
    return Promise.all(broadcast.map(push => push()))
  }

  // TODO make push version of all this sends

  /**
   * Sends a packet through every consumer of the given transport (beforeSend hooks are not applied here).
   * @param {Object} packet
   * @param {String} transport transport code or alias (i.e. 't' or 'tcp')
   * @returns {Promise<*>} single result when exactly one consumer matched, otherwise array of results
   */
  async sendTransport(packet, transport) {
    transport = this._validateTransport(transport)
    const sends = Object.values(this._socket)
      .filter(socket => socket.type === 'c' && socket.transport === transport)
      .map(socket => socket.send.bind(socket))
    if (sends.length === 1) return sends[0](packet)
    return Promise.all(sends.map(send => send(packet)))
  }

  // async sendMQTT(topic, packet) {return this._socket.mqtt.send(topic, packet)}
  async sendTCP(packet) {
    return this.sendTransport(packet, 't')
  }

  // TODO change this to PIPE
  async sendIPC(packet) {
    return this.sendTransport(packet, 'n')
  }
  // TODO add sendMQTT, sendWS

  // add a listener for event to all server sockets, pass 'stop' as fn to remove the listeners
  socketsListen(event, fn) {
    this._eventListen('s', event, fn)
  }

  // add a listener for event to all consumers, pass 'stop' as fn to remove the listeners
  consumersListen(event, fn) {
    this._eventListen('c', event, fn)
  }

  /**
   * Finds the first packet whose _header.sender.name equals name.
   * @param {String} name sender name to look for
   * @param {Object | Array<Object>} packets a packet or array of packets
   * @returns {Object} the matching packet or an empty object when there is none
   */
  getPacketByName(name, packets) {
    // Array.isArray (not .length) so an empty array is not mistaken for a single packet
    const list = Array.isArray(packets) ? packets : [packets]
    const found = list.find(packet => {
      const sender = ((packet || {})._header || {}).sender || {}
      return sender.name === name
    })
    return found || {}
  }

  /**
   * Puts a namespace at the front of the lookup list of a socket type (and optionally transport).
   * The functions themselves must be added to the instance at that property/space.
   * @param {String} space namespace (property path on the instance)
   * @param {String} [type='s'] 'c' or 's', anything else is taken as the transport
   * @param {String} [trans] transport name or alias
   * @returns {Number} new length of the namespace list
   */
  addNamespace(space, type, trans) {
    if (type !== 'c' && type !== 's') {
      trans = type
      type = 's'
    }
    const key = type + this._validateTransport(trans)
    if (!this._namespaces[key]) this._namespaces[key] = []
    return this._namespaces[key].unshift(space)
  }

  /**
   * Merges functions into the default command hash of a type/transport (i.e. _s, _c, _st ...).
   * @param {Object} funcs object of functions, key is cmd name
   * @param {String} [trans] transport name or alias; 'c' or 's' here is taken as the type
   * @param {String} [type='s'] 'c' or 's'
   */
  amendCommands(funcs, trans, type) {
    if (trans === 'c' || trans === 's') {
      type = trans
      trans = ''
    }
    // default the type even when a transport was given, otherwise the key would become '_undefined...'
    if (!type) type = 's'
    const key = '_' + type + this._validateTransport(trans)
    if (!this[key]) this[key] = {}
    Object.assign(this[key], funcs)
    log.debug({ msg: 'amended namespace', id: this.id, default_key: key, functions: this[key] })
  }

  amendConsumerCommands(funcs, trans) {
    this.amendCommands(funcs, trans, 'c')
  }

  amendSocketCommands(funcs, trans) {
    this.amendCommands(funcs, trans)
  }

  // hook functions should take and return a packet, see _packetHook for the options
  beforeSendHook(func, opts) {
    this._packetHook('beforeSend', func, opts)
  }

  beforeProcessHook(func, opts) {
    this._packetHook('beforeProcess', func, opts)
  }

  afterProcessHook(func, opts) {
    this._packetHook('afterProcess', func, opts)
  }

  // A Big Hammer - use only if necessary - default with hooks should suffice
  // these three will pre-empt default processor to be called in ._packetProcess

  // add and override default processor for ALL consumers, i.e. packets returning from a send or arrived from a push
  consumersProcessor(func) {
    for (const name of Object.keys(this._socket)) {
      if (this._socket[name].type === 'c') this.altProcessor(func, name)
    }
  }

  // add and override default processor for ALL sockets, i.e. packets coming in from consumer send
  socketsProcessor(func) {
    for (const name of Object.keys(this._socket)) {
      if (this._socket[name].type === 's') this.altProcessor(func, name)
    }
  }

  // add and override a processor for a particular socket/consumer to list of processors
  // if no socket name given it will replace the default processor
  altProcessor(func, socket_name) {
    socket_name = socket_name || '_default'
    this._cmdProcessors[socket_name] = func
  }

  //=============PRIVATE METHODS =========================================

  /**
   * Assigns a hook function to a socket, type or transport.
   * @param {String} hook property name of the hook ('beforeSend', 'beforeProcess', 'afterProcess')
   * @param {function} func the hook function
   * @param {Object} [opts] {name} a single socket, {type} all of a type, {trans} all of a transport,
   * {all:true} every socket. When opts is omitted the hook is applied to all type 's' sockets.
   */
  _packetHook(hook, func, opts) {
    log.debug({ msg: 'hooking a socket(s)', method: '_packetHook', line: 334, hook: hook, function: func, options: opts })
    // opts is optional, so it can not be destructured directly
    const { name, trans, all } = opts || {}
    const type = opts == null ? 's' : opts.type // default is all type 's' sockets
    const transport = trans ? this._validateTransport(trans) : ''
    if (name) {
      if (this._socket[name]) this._socket[name][hook] = func
      else log.warn({ msg: `can't hook, no socket registered by name of ${name}`, method: '_packetHook', hook: hook })
      return
    }
    log.debug({ msg: 'sockets available to hook', method: '_packetHook', line: 338, sockets: Object.keys(this._socket) })
    for (const socketName of Object.keys(this._socket)) {
      const socket = this._socket[socketName]
      if (all || socket.type === type || (transport && socket.transport === transport)) socket[hook] = func
      if (socket[hook]) log.debug({ msg: 'hooked socket', method: '_packetHook', line: 343, name: socketName, type: socket.type, trans: socket.transport })
    }
  }

  /*
   ********** main packet processor for all sockets
   * supports per socket before and after hook processors
   * supports additional registered processors called via packet or socket name, with default processor
   */
  async _packetProcess(socket_name, packet) {
    if (packet.error) return packet // don't process a packet with an error
    const socket = this.getSocket(socket_name) || {}
    // processor precedence: one named in the packet, one registered for the socket, the default
    let key = '_default'
    if (packet.cmdProcessor && this._cmdProcessors[packet.cmdProcessor]) key = packet.cmdProcessor
    else if (this._cmdProcessors[socket_name]) key = socket_name
    // TODO allow adding to or altering the process map
    const processors = new Map([
      ['before', socket.beforeProcess],
      ['command', this._cmdProcessors[key]],
      ['after', socket.afterProcess]
    ])
    for (const [name, func] of processors) {
      const [err, processed] = await to(this._process(socket_name, packet, name, func))
      if (err) {
        // keep the last good packet (the resolved value is undefined on rejection) and stop processing
        packet.error = err
        break
      }
      packet = processed
    }
    return packet
  }

  /**
   * Runs one processing stage on a packet.
   * A plain object result is assigned to (or, with processMethod 'merge', merged into) the packet,
   * any other result is stored at packet.processResult[stage]. A rejection sets packet.error.
   * @param {String} socket_name
   * @param {Object} packet
   * @param {String} name name of the stage
   * @param {function} [func] stage function called with (packet, socket_name) in the base context
   * @returns {Promise<Object>} the processed packet
   */
  async _process(socket_name, packet, name, func) {
    if (packet.error) return packet // if an error occurs skip any further processing
    if (func) {
      const [err, res] = await to(func.call(this, packet, socket_name))
      if (err) {
        // forced an abort to processing
        packet.error = err
      } else if (!isPlainObject(res)) {
        if (packet.processResult) packet.processResult[name] = res
        else packet.processResult = { [name]: res }
      } else {
        // processMethod is either per stage ({stage:method}) or one method for all stages
        const method = (packet.processMethod || {})[name] || packet.processMethod
        if (method === 'merge') packet = merge(packet, res)
        else Object.assign(packet, res)
      }
    }
    this.emit('log', { level: 'trace', msg: `processed packet stage:${name}`, socketName: socket_name, packet: packet })
    return packet
  }

  // all sockets are emitters. Adds a listener to all sockets of a type with given event.
  // now sockets can emit locally processed events. Passing 'stop' as fn removes all listeners of the event.
  _eventListen(type, event, fn) {
    for (const name of Object.keys(this._socket)) {
      const socket = this._socket[name]
      if (socket.type !== type) continue
      if (fn === 'stop') socket.removeAllListeners(event)
      else {
        log.debug({ socket: name, type: type, event: event, msg: 'adding listener to socket' })
        socket.on(event, fn)
      }
    }
  }

  /**
   * Normalizes a transport name or alias (case insensitive) to its single letter code.
   * @param {String} trans transport name or alias
   * @param {String} [type='s'] socket type, a websocket is only valid for type 's'
   * @returns {String} 'n', 't', 'm', 'w' or '' when the transport is not valid
   */
  _validateTransport(trans, type = 's') {
    const key = typeof trans === 'string' ? trans.trim().toLowerCase() : trans
    // own property check so inherited members (i.e. 'constructor') are not taken as valid
    trans = Object.prototype.hasOwnProperty.call(VALID_TRANSPORTS, key) ? VALID_TRANSPORTS[key] : ''
    if (type !== 's' && trans === 'w') {
      log.warn({ type: type, transport: trans, msg: 'Invalid type/transport - Consumer/Client Web Socket not supported use TCP' })
      trans = ''
    }
    return trans
  }

  //getter for socket transport
  _transport(name) {
    return this._socket[name].transport
  }

  //getter for socket type
  _type(name) {
    return this._socket[name].type
  }

  // namespace list for the type+transport of the named socket
  _getTransportNamespaces(socket) {
    return this._namespaces[this._type(socket) + this._transport(socket)]
  }

  /**
   * Walks the namespaces in order and returns the first function found for the command.
   * @param {String} cmd command, may itself be a path ('a.b')
   * @param {Array} namespaces list of namespaces, a falsy entry stands for the root of the instance
   * @returns {function | null}
   */
  _getCmdFuncNamespace(cmd, namespaces) {
    let cmd_func = null
    ;(namespaces || []).some(namespace => {
      const candidate = this._getCmdFunc(namespace ? namespace + '.' + cmd : cmd)
      // anything that is not callable (i.e. a plain property of the instance) is not a command
      if (typeof candidate !== 'function') return false
      cmd_func = candidate
      return true
    })
    return cmd_func
  }

  /**
   * Takes a command and returns the corresponding entry in a hash, recursive walk.
   * @param {String | Array<String>} cmd path delimited by '.', ':' or '/' (an array is consumed during the walk)
   * @param {Object} [obj=this] hash to walk, only defaulted when cmd is a string
   * @returns {*} the entry at the path, null when an intermediate step is missing
   */
  _getCmdFunc(cmd, obj) {
    if (typeof cmd === 'string') {
      if (!obj) obj = this
      cmd = cmd.split(/[.:/]+/)
    }
    if (!Array.isArray(cmd) || obj == null) return null
    const prop = cmd.shift()
    if (cmd.length === 0) return obj[prop]
    if (!obj[prop]) return null
    log.debug({ length: cmd.length, cmd: cmd, prop: prop, objprop: obj[prop], method: '_getCmdFunc', msg: 'command to corresponding function in a hash' })
    return this._getCmdFunc(cmd, obj[prop])
  }

  /**
   * Primary function to find and call a function based on packet cmd.
   * The type+transport namespaces of the socket are searched first, then the namespaces of the type.
   * @param {Object} packet packet with a cmd property
   * @param {String} socket name of the socket the packet belongs to
   * @returns {Promise<*>} result of the command function or 'failed' when none was found
   */
  async _callCmdFunc(packet, socket) {
    if (typeof packet.cmd !== 'string') return 'failed'
    const type = this._type(socket)
    for (const key of [type + this._transport(socket), type]) {
      const cmd_func = this._getCmdFuncNamespace(packet.cmd, this._namespaces[key])
      if (cmd_func) return await cmd_func.call(this, packet)
    }
    return 'failed'
  }
} // end Base Class

export default Base
export { Base, map, changed, isPlainObject, to, merge } // todo share rxjs