/** * UCI Base Module * @module UCI-Base * @description This is the primary module to use in the UCI ecosystem */ // TODO add automated duplex (for consumer will create corresponding socket and send connection info to remote socket) // --------------- UCI dependencies ------------------- // UCI communication transport communication modules // TODO change to dynamic import so loads only if that connector type is requested import Socket from '@uci/socket' // tcp or named pipe import MQTT from '@uci/mqtt' // requires broker import WebSocket from '@uci/websocket' // server only - client is for web browser only // UCI helpers import { Ready, changed, map } from '@uci-utils/ready' // import { isClass } from '@uci-utils/type' import { bindFuncs } from '@uci-utils/bind-funcs' // UCI logger import logger from '@uci-utils/logger' let log = {} // declare module wide log to be set during construction // ------------------------------------------------- // Community dependencies import to from 'await-to-js' import isPlainObject from 'is-plain-object' import loadYaml from 'load-yaml-file' import { merge } from 'merge-anything' // Nodejs dependencies import EventEmitter from 'events' // Internal dependencies import { cmdProcessor, defaultCmds, namespaces } from './processing' // Constants // TODO transport will become "connector" // type will be input or output, duplex // TODO make sure reach registered connector has unique name // const SOCKET_INFO_KEYS = ['name', 'type', 'transport'] // TODO remove this hard-code and get these from plugin const TRANSLATE = { n: 'Named Pipe', t: 'TCP', s: 'Socket', c: 'Consumer', m: 'MQTT', w: 'Web Socket', } /** * @class Base * @description * An inter-process inter-machine multi socket communication class.
* It is extended to derive many other UCI classes thus the name "Base"
* The class itself is extended from the {@link https://nodejs.org/api/events.html#events_class_eventemitter nodejs EventEmitter } to support in process communication as well * * @extends EventEmitter * @param options {object} - object hash of options * @param [options.id=ucit-base+] {string} and id for this process/instance used for logging * @param {String} [options.desc] additional description for humans, can be logged. * @param {String | Boolean} [opts.path=either full path to where socket should be created or 'true' (which uses default path] * @param {string} [opts.sockets] comma delimited strong of sockets to be created each socket of the form [name]#[c or s]>[n,t,m or w] * @param [opts.useRootNS=false] {boolean} - include the "root" of the created base instance as part of the namespace for a packet cmd - use not recommended * @param {Object} [opts.(name of socket)] options for a particular socket created by opts.socket. see UCI guide {@link ?content=guide#sockets|creating sockets} * @property {array} socket collection of all sockets created and used by base instance as per opts.sockets or addSocket method * * @example * import Base from '@uci/base' * // options object example, credated is a socket of each transport using their defaults * // and a tcp consumer socket that connects to a tcp socket on another host 'somehost' at port 8888 * const opts = id:mybase, sockets:'us#s>n,tc#c>t,ts#s>t,mqtts#s>m,webs#s>w', tc:{host:'somehost', port:8888}} * let mybaseprocess = new Base(opts) */ class Base extends EventEmitter { constructor(opts = {}) { if (typeof opts === 'string') opts = loadYaml.sync(opts) super() this.name = opts.name || opts.appName || 'a base class instance' this.id = opts.id || 'uci-base:' + new Date().getTime() log = logger({ name: 'base', id: this.id }) this.desc = opts.desc // additional details for humans this.opts = opts // make original options available this._socket = {} // holds all the various communication sockets // these two if passed will get applied to all consumer sockets, otherwise socket defaults will be used /** @type {string} - timeout for connecting to a consumer socket */ this.initTimeout = opts.initTimeout this.retryWait = opts.retryWait this.defaultReturnCmd = opts.defaultReturnCmd this._cmdProcessors = { _default: cmdProcessor } this.ready = new Ready({ emitter: this }) // ready packet to be sent when process is "ready" this._readyPacket = { cmd: 'ready', event: `${this.name}:process`, name: this.name, id: this.id, ready: false, } // _c and _s are the default namespaces this._namespaces = Object.assign({}, namespaces) this._c = Object.assign({}, defaultCmds.c) this._s = Object.assign({}, defaultCmds.s) // make available a method that will bind a whole object tree of functions // Note: functions called via a command namespace are called by base connext by default // if called directlly/locally they should be bound to base context if desired this.bindFuncs = bindFuncs if (opts.useRootNS) { // add root of instance to checking for command functions - not recommended! this._namespaces.s.splice(-1, 0, null) this._namespaces.c.splice(-1, 0, null) } // this.bindFuncs = bindFuncs // make available a method that will bind a whole object tree of functions this.initSockets = opts.sockets // sockets to be registered at init this._socket = {} // where all sockets are stored // at creation defined sockets: // generic sockets opts.sockets = Array.isArray(opts.sockets) ? opts.sockets : [opts.sockets] if (opts.port) opts.sockets.push({ name: `${this.name}:tcp`, type: 's', transport: 'tcp', options: { port: opts.port }, }) if (opts.path) opts.sockets.push({ name: `${this.name}:named`, type: 's', transport: 'named', options: { path: opts.path }, }) // specific sockets // if (opts.sockets) { // let sockets = opts.sockets // sockets = Array.isArray(sockets) ? sockets : [sockets] // sockets.forEach((socket) => { // socket.name = socket.name || `${this.name}:${socket.transport}` // this.registerSocket(socket) // }) // } } // end constructor /* * -------------------------------- * CLASS METHODS * -------------------------------- */ // PUBLIC METHODS /** * initialize the instance with the set options. * This must be called to initialize all sockets and connections * @async * @public * @required * @param {array} sockets string of one or array of names to initialize, if none, then all current added sockets will be initialized * */ async init(sockets) { // register all sockets requested in constructor for (const socket of this.opts.sockets) { socket.name = socket.name || `${this.name}:${socket.transport}` // check for unique name await this.registerSocket(socket) } const res = await this.socketsInit(sockets) // will update ready packet and push/send that changed packet on ready state change // on can add more observers to the ready this.ready.all.subscribe(async (ready) => { this._readyPacket.ready = ready delete this._readyPacket._header if (!ready) { // make a list of the failures to send // await new Promise(res=>setTimeout(()=>res(),1000)) this._readyPacket.failures = this.ready.failed } else delete this._readyPacket.failures let packet = Object.assign({}, this._readyPacket) this.emit('log', { level: 'debug', packet: packet, msg: `${this.name} has an updated ready state: broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`, }) // console.log('ready packet to broadcast',packet) // (re)sets the connection packet for each socket this.getSocketsFilter({ type: 's' }).forEach( (socket) => (this.getSocket(socket).conPackets[0] = packet) ) // announce changed ready state setTimeout(async () => { this.send(packet) // to any socket that this instance is connected to this.push(packet) // to any remote consumer connected to an instance socket }, 100) // delay 100ms, fixes issue so won't be sent during a disconnect which causes socket write error }) return res } async socketsInit(sockets) { let results = {} let errors = {} // single socket intialize mapper const initialize = async (socket) => { return new Promise( async function (resolve) { try { const value = await socket.init() this.emit('log', { level: 'info', socketName: socket.name, msg: 'socket successfully initialized', message: value, }) resolve(value) } catch (error) { this.emit('log', { level: 'fatal', socketName: socket.name, msg: 'socket init error', error: error, }) // emit an error here, remove socket // let res = await this.removeSocket(socket.name) errors[socket.name] = { error: error } resolve(error) } }.bind(this) ) } let inits = [] if (!sockets) { sockets = Object.keys(this._socket).filter((name) => { return !this._socket[name].active // only intialize (connect) inactive sockets }) } if (typeof sockets === 'string') sockets = [sockets] sockets.forEach((name) => { if (this._socket[name]) { inits.push({ name: name, init: this.getSocketInit(name) }) } else log.warn({ msg: `no socket registered by name of ${name} to initialize`, }) }) let [err] = await to(Promise.all(inits.map(initialize))) if (err) { this.emit('log', { level: 'fatal', msg: 'initialize of socket errors was NOT caught --- bad bad', error: err, }) return { errors: [err] } } if (Object.keys(errors).length === 0) errors = false return { results: results, errors: errors } } // support old name for now // async addSocket(name, type, transport, options) { // return this.registerSocket(name, type, transport, options); // } // TODO use plugins for transports instead /** * registerSocket - register a socket for use * This is not async and will NOT initialize the socket, that must be done with a call to init or socketInit * @param {type} name Name of socket (usually something short but unique) * @param {string} [type=c] consumer/client 'c' or socket/server 's' * @param {string} [transport=n] transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket * @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc) * * @returns {function} if called before base initialzation it can be ignored as all added sockets will be initialized. After through it be called to initialize that socket */ async registerSocket(name, type = 'c', transport, options = {}) { if (isPlainObject(name)) ({ name, type = 'c', transport, options = {} } = name) if (typeof name !== 'string') return null transport = transport || (options.port ? 'tcp' : 'named') transport = this._validateTransport(transport) if (!transport) { log.error({ socketName: name, type: type, transport: transport, options: options, method: 'registerSocket', msg: `invalid transport ${transport}`, }) return null } log.info({ socketName: name, type: type, tranport: transport, options: options, method: 'registerSocket', line: 198, msg: `registering socket ${name}`, }) options.id = options.id || name options.name = options.name || name // TODO add a 'd' type for duplex which creates an 's' first and waits on connect to make a 'c' if (type === 'c') options = Object.assign( { initTimeout: this.initTimeout, retryWait: this.retryWait, }, options ) // outbound if (type === 's') { let conPackets = [] // [this._readyPacket] conPackets = options.conPackets ? conPackets.concat(options.conPackets) : conPackets conPackets = options.conPacket ? conPackets.push(options.conPacket) : conPackets options = Object.assign( { defaultReturnCmd: this.defaultReturnCmd, conPackets: conPackets, }, options ) // inbound } // TODO get rid of hard coded transports and use registered transports (t and n being default) // TODO dynamically import only requested connector plugins // Base will pass options to plugin // plugin will pass a unique transport identifier to base // all plugins will use a function to generate a class instance switch (transport) { case 'n': options.path = options.path || true // falls through case 't': this._socket[name] = new Socket[TRANSLATE[type]](options) break case 'm': if (type === 'p') type = 'c' options.client = type === 'c' ? true : false options.connect = options.connect || {} if (options.host) options.connect.host = options.host if (options.port) options.connect.port = options.port options.connect.connectTimeout = options.connect.connectTimeout || 5000 this._socket[name] = new MQTT(options) break case 'w': if (type === 's') this._socket[name] = await WebSocket(options) } if (this._socket[name]) { // in case of invalid transport this._socket[name].name = name this._socket[name].type = type this._socket[name].transport = transport this._socket[name]._packetProcess = this._packetProcess.bind(this, name) // bubble up events from inidivual sockets to base instance, // connection:consumer is a socket emitting when a consumer is connecting // connection:socket is a consumer emiting when connecting to a socket const EVENTS = [ 'log', 'socket', 'connection', 'connection:consumer', 'connection:socket', ] // that should emit up from each socket to instance EVENTS.forEach((event) => { this._socket[name].on(event, (obj) => { if (Object.prototype.toString.call(obj) !== '[object Object]') { let data = obj obj = {} obj.data = data } obj.socketName = name this.emit(event, obj) }) }) if (type === 'c') { // when consumer has sucessfully connected to a socket this._socket[name].obsName = `${name}:${ options.path ? options.path : `${options.host}:${options.port}` }` this.ready.addObserver(this._socket[name].obsName, this._socket[name], { event: 'connection:socket', condition: (ev) => ev.state === 'connected', }) // set up listner for any pushed packets and emit locally this._socket[name].on('pushed', (packet) => { packet._header.socketName = name this.emit('pushed', packet) }) } if (type === 's') { // when socket is listnening this.ready.addObserver( `${name}:socket`, this._socket[name], { event: 'socket', condition: (ev) => (ev || {}).state === 'listening', } ) // initially set conPackets, ready packets is ALWAYS the first this._socket[name].conPackets.unshift(this._readyPacket) if (options.duplex) this.consumerConnected(this._socket[name], { consumer: options.duplex, add: true, }) } return this._socket[name] // return handle to newly registered socket // return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket } } /** * removeSocket * * @param {string} name name of socket as created * @returns {String | Object } success string or error object */ async removeSocket(name) { // NOTE: uci consumers have .end renamed as .close to match socket method for convenience if (typeof name !== 'string') return 'no socket name passed, nothing to remove' const socket = this.getSocket(name) if (!socket) return 'no socket by that name, nothing to remove' let closeError if (typeof socket.close !== 'function') return 'bad socket no close function, nothing to remove' let [err] = await to(socket.close()) if (err) if (err.code !== 'ERR_SERVER_NOT_RUNNING') { closeError = { socket: this._socket[name].name, error: err, msg: 'socket/consumer closed with errors, but removed', } } this.emit('log', { level: 'warn', msg: `socket ${name} has been removed`, socket: this._socket[name].opts, }) socket.removeAllListeners() this.ready.removeObserver( socket.type === 'c' ? this._socket[name].obsName : `${name}:socket` ) delete this._socket[name] return closeError ? closeError : 'success' } getSocket(name) { if (name) return this._socket[name] || null else return this._socket } // returns array of names of sockets that pass filter getSocketsFilter({ type, trans, active } = {}) { if (trans) trans = this._validateTransport(trans) let filtered = [] Object.keys(this._socket).forEach((name) => { if ( (type == null || this._socket[name].type === type) && (trans == null || this._socket[name].transport === trans) && (active == null || this._socket[name].active === active) ) filtered.push(name) }) return filtered } getConsumers(filter = {}) { filter.type = 'c' return this.getSocketsFilter(filter) } getSocketInit(name) { let socket = this._socket[name] if (!socket) { log.warn({ msg: `can't fetch create/connect function, no socket registered by name of ${name}`, }) return null } if ( this._socket[name].type === 's' && this._socket[name].transport !== 'm' ) { return socket.create } else { return socket.connect } } /** * send - Description * * @param {String} name name of socket for send (must be a consumer otherwise use push for server) * @param {Object} packet * * @returns {type} Description */ async send(name, packet) { let names = [] if (typeof name !== 'string') { packet = name names = this.getConsumers() } else { const con = this.getSocket(name) if (!con) return `no consumer ${name} for sending` names = [name] } if (!packet || !Object.keys(packet).length) return 'no packet to send - aborted' let sends = [] if (!names.length) return 'no consumers available for send, aborting' for (let name of names) { const consumer = this.getSocket(name) let hookedPacket = {} hookedPacket = consumer.beforeSend ? await consumer.beforeSend.call(this, Object.assign({}, packet)) : packet log.debug({ msg: 'after hook, sending packet', name: consumer.name, packet: hookedPacket, method: 'send', }) sends.push(consumer.send.bind(consumer, hookedPacket)) } if (sends.length === 1) return await sends[0]() return Promise.all( sends.map((send) => { return send() }) ) } async push(packet, opts = {}) { if (!packet || !Object.keys(packet).length) return Promise.resolve('no packet to push - aborted') let sockets = this.getSocketsFilter({ type: 's' }) if (!sockets.length) return Promise.resolve('no sockets on which to push') opts.sockets = opts.sockets ? opts.sockets : opts.socket ? [opts.socket] : [] if (opts.sockets.length) sockets = sockets.filter((name) => opts.sockets.includes(name)) sockets = sockets .map((name) => this.getSocket(name)) .filter((sock) => opts.transport && opts.transport !== 'all' ? sock.transport === this._validateTransport(opts.transport) : true ) // console.log(sockets.length, 'sockets for push', sockets.map(socket=>socket.name)) if (!sockets.length) return Promise.resolve('no sockets on which to push') let broadcast = [] // TODO use map and reflect for (let socket of sockets) { let hookedPacket = {} hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this, Object.assign({}, packet), true) : packet broadcast.push(socket.push.bind(socket, hookedPacket, opts)) } return Promise.all( broadcast.map((push) => { return push() }) ) } // TODO accept alt transport string i.e. t or TCP async sendTransport(packet, transport) { let sends = [] for (let name of Object.keys(this._socket)) { if (this._socket[name].type === 'c') { if (this._socket[name].transport === transport) { sends.push(this._socket[name].send.bind(this._socket[name])) } } } if (sends.length === 1) return sends[0](packet) return Promise.all( sends.map((send) => { return send(packet) }) ) } // async sendMQTT(topic, packet) {return this._socket.mqtt.send(topic, packet)} async sendTCP(packet) { return this.sendTransport(packet, 't') } // TODO change this to PIPE async sendIPC(packet) { return this.sendTransport(packet, 'n') } // TODO add sendMQTT, sendWS socketsListen(event, fn) { this._eventListen('s', event, fn) } consumersListen(event, fn) { this._eventListen('c', event, fn) } getPacketByName(name, packets) { if (!packets.length) packets = [packets] let found = {} packets.some((packet, index, packets) => { if (packet._header.sender.name === name) { found = packets[index] return true } }) return found } // add set of functions to class prop/space and then register with this addNamespace(space, type, trans) { if (type !== 'c' && type !== 's') { trans = type type = 's' } trans = this._validateTransport(trans) if (trans) return this._namespaces[type + trans].unshift(space) else return this._namespaces[type].unshift(space) } // object of functions, key is cmd name amendCommands(funcs, trans, type) { if (!trans && !type) type = 's' if (trans === 'c' || trans === 's') { type = trans trans = '' } trans = this._validateTransport(trans) if (!this['_' + type + trans]) this['_' + type + trans] = {} Object.assign(this['_' + type + trans], funcs) // trans is type here log.debug({ msg: 'amended namespace', id: this.id, default_key: '_' + type + trans, functions: this['_' + type + trans], }) } amendConsumerCommands(funcs, trans) { this.amendCommands(funcs, trans, 'c') } amendSocketCommands(funcs, trans) { this.amendCommands(funcs, trans) } // func should take and return a packet. if type beforeSendHook(func, opts) { this._packetHook('beforeSend', func, opts) } beforeProcessHook(func, opts) { this._packetHook('beforeProcess', func, opts) } afterProcessHook(func, opts) { this._packetHook('afterProcess', func, opts) } // A Big Hammer - use only if necessary - default with hooks should suffice // these three will pre-empt default processor to be called in ._packetProcess // add and override default processor for ALL consumers, i.e. packets returning form a send or arrived from a push consumersProcessor(func) { for (let name of Object.keys(this._socket)) { if (this._socket[name].type === 'c') { this.altProcessor(func, name) } } } // add and override default processor for ALL sockets, i.e. packets coming in from consumer send socketsProcessor(func) { for (let name of Object.keys(this._socket)) { if (this._socket[name].type === 's') { this.altProcessor(func, name) } } } // add and override a processor for a particular socket/consumer to list of processors // if no socket name given it will replace the default processor in _processors from processing.js altProcessor(func, socket_name) { socket_name = socket_name || '_default' this._cmdProcessors[socket_name] = func } // a call to this method will (make or add) return and or subscribe a ready observer for incoming consumer connections consumerConnected(socket, opts = {}) { let { subscribe, consumer, name, add } = opts const conditionHandler = async (ev) => { if ((ev || {}).state === 'connected') { let data = ev.data || {} if (consumer) { // specific consumer check if ( data.name === consumer || [ev.name, ev.id, data.name, data.id].some((name) => (name || '').includes(consumer) ) ) return true } else return true } return false } if (typeof socket === 'string') socket = this.getSocket(socket) name = name || consumer add = add && consumer const options = { event: 'connection:consumer', condition: conditionHandler, } let oname = `${name}:consumer>${socket.name}:socket` const obs = add ? this.ready.addObserver(oname, socket, options) : this.ready.makeObserver(socket, options) if (typeof subscribe === 'function') return obs.subscribe(subscribe) return obs } // end consumerConnected //=============PRIVATE METHODS ========================================= /* * * Assigns a Hook Function to a Socket, Type or Transport * */ // options allow applying hook function to specific socket or type or transport, default is all type 's' sockets _packetHook(hook, func, opts) { log.debug({ msg: 'hooking a socket(s)', method: '_packetHook', line: 334, hook: hook, function: func, options: opts, }) let { name, type, trans, all } = opts if (opts == null) type = 's' // default is all type 's' sockets if (name) this._socket[name][hook] = func else { log.debug({ msg: 'sockets available to hook', method: '_packetHook', line: 338, sockets: Object.keys(this._socket), }) for (let name of Object.keys(this._socket)) { if (this._socket[name].type === type) this._socket[name][hook] = func if (this._socket[name].transport === trans) this._socket[name][hook] = func if (all) this._socket[name][hook] = func if (this._socket[name][hook]) log.debug({ msg: 'hooked socket', method: '_packetHook', line: 343, name: name, type: this._socket[name].type, trans: this._socket[name].transport, }) } } } /* ********** main packet processor for all sockets * supports per socket before and after hook processors * supports additonal registered processors called via packet or socket name, with default processor, */ async _packetProcess(socket_name, packet) { if (!packet || !Object.keys(packet).length) packet = { error: 'no packet to process' } if (!socket_name || !this.getSocket(socket_name)) packet.error = 'no socket name passed for packet processing' if (!this.getSocket(socket_name)) packet.error = `socket by name of ${socket_name}` if (packet.error) { this.emit('log', { level: 'error', error: packet.error, packet: packet, msg: 'an error occured before processing an incoming packet', }) return packet // don't process a packet with an error } // TODO allow adding to or altering the process map let processors = new Map([ ['before', this.getSocket(socket_name).beforeProcess], [ 'command', this._cmdProcessors[ packet.cmdProcessor || this._cmdProcessors[socket_name] ? socket_name : '_default' ], ], ['after', this.getSocket(socket_name).afterProcess], ]) let err for (let [name, func] of processors) { // the same as of recipeMap.entries() [err, packet] = await to(this._process(socket_name, packet, name, func)) if (err) packet.error = err } return packet } async _process(socket_name, packet, name, func) { if (packet.error) return packet // if an error occurs skip any further processing let err, res if (func) { [err, res] = await to(func.call(this, packet, socket_name)) if (err) { // forced an abort to processing packet.error = err } else { if (!isPlainObject(res)) packet.processResult ? (packet.processResult[name] = res) : (packet.processResult = { [name]: res }) else { let method = (packet.processMethod || {})[name] || packet.processMethod // TODO could support other specialized merge methods if (method === 'merge') { packet = merge(packet, res) } else { packet = res } } } } return packet } // all sockets are emitters. Adds a listener to all sockets of a type with given event. // now sockets can emit locally processed events _eventListen(type, event, fn) { for (let name of Object.keys(this._socket)) { if (this._socket[name].type === type) { if (fn === 'stop') this._socket[name].removeAllListeners(event) else { log.debug({ socket: name, type: type, event: event, msg: 'adding listener to socket', }) this._socket[name].on(event, fn) } } } } _validateTransport(trans, type = 's') { const valids = { w: 'w', web: 'w', n: 'n', named: 'n', unix: 'n', pipe: 'n', t: 't', tcp: 't', net: 't', network: 't', m: 'm', mqtt: 'm', } trans = valids[trans] || null if (type !== 's' && trans === 'w') { log.warn({ type: type, transport: trans, msg: 'Invalid type/transport - Consumer/Client Web Socket not supported use TCP', }) trans = null } return trans } _transport(name) { return this._socket[name].transport } //getter for socket transport _type(name) { return this._socket[name].type } //getter for socket type _getTransportNamespaces(socket) { return this._namespace[this._type(socket) + this._transport(socket)] } _getCmdFuncNamespace(cmd, namespaces) { let cmd_func = null namespaces.some((namespace) => { namespace = namespace ? namespace + '.' + cmd : cmd cmd_func = this._getCmdFunc(namespace) if (cmd_func) return true }) return cmd_func } // takes command and returns corresponding function in a hash, recurisve walk _getCmdFunc(cmd, obj) { if (typeof cmd === 'string') { if (!obj) obj = this cmd = cmd.split(/[.:/]+/) } var prop = cmd.shift() if (cmd.length === 0) return obj[prop] if (!obj[prop]) return null log.debug({ length: cmd.length, cmd: cmd, prop: prop, objprop: obj[prop], method: '_getCmdFunc', msg: 'command to corresponding function in a hash', }) return this._getCmdFunc(cmd, obj[prop]) } // primary function to find a function to call based on packet cmd async _callCmdFunc(packet, socket) { let cmd_func = this._getCmdFuncNamespace( packet.cmd, this._namespaces[this._type(socket) + this._transport(socket)] ) if (cmd_func) return await cmd_func.call(this, packet) // todo try .call cmd_func = this._getCmdFuncNamespace( packet.cmd, this._namespaces[this._type(socket)] ) if (cmd_func) return await cmd_func.call(this, packet) return 'failed' } } // end Base Class export default Base export { Base, Ready, map, changed, isPlainObject, to, merge, loadYaml } // todo share rxjs