diff --git a/src/base.js b/src/base.js index 9934d4c..c208c7d 100644 --- a/src/base.js +++ b/src/base.js @@ -8,13 +8,15 @@ // --------------- UCI dependencies ------------------- // UCI communication transport communication modules -// TODO change to dynamic import so loads only if that socket type is requestd +// TODO change to dynamic import so loads only if that connector type is requested import Socket from '@uci/socket' // tcp or named pipe -// TDDO import these socket libraries dynamically from per peerDependencies + import MQTT from '@uci/mqtt' // requires broker import WebSocket from '@uci/websocket' // server only - client is for web browser only + // UCI helpers import { Ready, changed, map } from '@uci-utils/ready' +// import { isClass } from '@uci-utils/type' import { bindFuncs } from '@uci-utils/bind-funcs' // UCI logger import logger from '@uci-utils/logger' @@ -24,7 +26,7 @@ let log = {} // declare module wide log to be set during construction // Community dependencies import to from 'await-to-js' import isPlainObject from 'is-plain-object' -import loadYaml from 'load-yaml-file' +import loadYaml from 'load-yaml-file' import { merge } from 'merge-anything' // Nodejs dependencies import EventEmitter from 'events' @@ -33,14 +35,19 @@ import EventEmitter from 'events' import { cmdProcessor, defaultCmds, namespaces } from './processing' // Constants +// TODO transport will become "connector" +// type will be input or output, duplex +// TODO make sure reach registered connector has unique name // const SOCKET_INFO_KEYS = ['name', 'type', 'transport'] +// TODO remove this hard-code and get these from plugin + const TRANSLATE = { n: 'Named Pipe', t: 'TCP', s: 'Socket', c: 'Consumer', m: 'MQTT', - w: 'Web Socket' + w: 'Web Socket', } /** @@ -68,13 +75,14 @@ const TRANSLATE = { * let mybaseprocess = new Base(opts) */ class Base extends EventEmitter { - constructor(opts={}) { - if (typeof opts === 'string') opts=loadYaml.sync(opts) + constructor(opts = {}) { + if (typeof opts === 'string') opts = loadYaml.sync(opts) super() this.name = opts.name || opts.appName || 'a base class instance' this.id = opts.id || 'uci-base:' + new Date().getTime() log = logger({ name: 'base', id: this.id }) this.desc = opts.desc // additional details for humans + this.opts = opts // make original options available this._socket = {} // holds all the various communication sockets // these two if passed will get applied to all consumer sockets, otherwise socket defaults will be used /** @type {string} - timeout for connecting to a consumer socket */ @@ -82,13 +90,19 @@ class Base extends EventEmitter { this.retryWait = opts.retryWait this.defaultReturnCmd = opts.defaultReturnCmd this._cmdProcessors = { _default: cmdProcessor } - this.ready = new Ready({emitter: this}) + this.ready = new Ready({ emitter: this }) // ready packet to be sent when process is "ready" - this._readyPacket = {cmd:'ready', event:`${this.name}:process`, name:this.name, id:this.id, ready:false} + this._readyPacket = { + cmd: 'ready', + event: `${this.name}:process`, + name: this.name, + id: this.id, + ready: false, + } // _c and _s are the default namespaces - this._namespaces =Object.assign({},namespaces) - this._c = Object.assign({},defaultCmds.c) - this._s = Object.assign({},defaultCmds.s) + this._namespaces = Object.assign({}, namespaces) + this._c = Object.assign({}, defaultCmds.c) + this._s = Object.assign({}, defaultCmds.s) // make available a method that will bind a whole object tree of functions // Note: functions called via a command namespace are called by base connext by default // if called directlly/locally they should be bound to base context if desired @@ -99,23 +113,41 @@ class Base extends EventEmitter { this._namespaces.c.splice(-1, 0, null) } // this.bindFuncs = bindFuncs // make available a method that will bind a whole object tree of functions + this.initSockets = opts.sockets // sockets to be registered at init this._socket = {} // where all sockets are stored // at creation defined sockets: - if (opts.port) this.registerSocket(`${opts.name||'base'}${opts.port&&opts.path ? ':t':''}`,'s','t',{port:opts.port}) - if (opts.path) this.registerSocket(`${opts.name||'base'}${opts.port&&opts.path ? ':n':''}`,'s','n',{path: opts.path}) - if (opts.sockets) { - let sockets = opts.sockets - sockets = Array.isArray(sockets) ? sockets:[sockets] - sockets.forEach(socket => this.registerSocket(socket)) - } - + // generic sockets + opts.sockets = Array.isArray(opts.sockets) ? opts.sockets : [opts.sockets] + if (opts.port) + opts.sockets.push({ + name: `${this.name}:tcp`, + type: 's', + transport: 'tcp', + options: { port: opts.port }, + }) + if (opts.path) + opts.sockets.push({ + name: `${this.name}:named`, + type: 's', + transport: 'named', + options: { path: opts.path }, + }) + // specific sockets + // if (opts.sockets) { + // let sockets = opts.sockets + // sockets = Array.isArray(sockets) ? sockets : [sockets] + // sockets.forEach((socket) => { + // socket.name = socket.name || `${this.name}:${socket.transport}` + // this.registerSocket(socket) + // }) + // } } // end constructor /* - * -------------------------------- - * CLASS METHODS - * -------------------------------- - */ + * -------------------------------- + * CLASS METHODS + * -------------------------------- + */ // PUBLIC METHODS @@ -125,86 +157,116 @@ class Base extends EventEmitter { * @async * @public * @required - * @param {array} sockets string of one or array array names to initialize, if none, then all current added sockets will be initialized - * + * @param {array} sockets string of one or array of names to initialize, if none, then all current added sockets will be initialized + * */ async init(sockets) { - // TODO ready needs to allow multiple all subscribers that get rebuilt on add/remove + + // register all sockets requested in constructor + for (const socket of this.opts.sockets) { + socket.name = socket.name || `${this.name}:${socket.transport}` // check for unique name + await this.registerSocket(socket) + } + const res = await this.socketsInit(sockets) - // update ready packet and push/send that changed packet - this.ready.all.subscribe(async ready => { - this._readyPacket.ready= ready - delete (this._readyPacket._header) - if (!ready) { // make a list of the failures to send + // will update ready packet and push/send that changed packet on ready state change + // on can add more observers to the ready + this.ready.all.subscribe(async (ready) => { + this._readyPacket.ready = ready + delete this._readyPacket._header + if (!ready) { + // make a list of the failures to send // await new Promise(res=>setTimeout(()=>res(),1000)) this._readyPacket.failures = this.ready.failed } else delete this._readyPacket.failures - let packet = Object.assign({},this._readyPacket) - this.emit('log',{level:'debug', packet:packet, msg:`${this.name} has an updated ready state: broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`}) + let packet = Object.assign({}, this._readyPacket) + this.emit('log', { + level: 'debug', + packet: packet, + msg: `${this.name} has an updated ready state: broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`, + }) // console.log('ready packet to broadcast',packet) - // set the connection packet for each socket - this.getSocketsFilter({type:'s'}) - .forEach(socket=>this.getSocket(socket).conPackets[0]=packet) + // (re)sets the connection packet for each socket + this.getSocketsFilter({ type: 's' }).forEach( + (socket) => (this.getSocket(socket).conPackets[0] = packet) + ) // announce changed ready state setTimeout(async () => { this.send(packet) // to any socket that this instance is connected to - this.push(packet) // to any remote consumer connected to an instance socket - },100) // delay 100ms, fixes issue so won't be sent during a disconnect which causes socket write error - + this.push(packet) // to any remote consumer connected to an instance socket + }, 100) // delay 100ms, fixes issue so won't be sent during a disconnect which causes socket write error }) return res } async socketsInit(sockets) { - let results = {} let errors = {} // single socket intialize mapper - const initialize = async socket => { - return new Promise(async function(resolve) { - try { - const value = await socket.init() - this.emit('log',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value}) - resolve(value) - } catch (error) { - this.emit('log',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket - // let res = await this.removeSocket(socket.name) - errors[socket.name]={error:error} - resolve(error) - } - }.bind(this)) + const initialize = async (socket) => { + return new Promise( + async function (resolve) { + try { + const value = await socket.init() + this.emit('log', { + level: 'info', + socketName: socket.name, + msg: 'socket successfully initialized', + message: value, + }) + resolve(value) + } catch (error) { + this.emit('log', { + level: 'fatal', + socketName: socket.name, + msg: 'socket init error', + error: error, + }) // emit an error here, remove socket + // let res = await this.removeSocket(socket.name) + errors[socket.name] = { error: error } + resolve(error) + } + }.bind(this) + ) } let inits = [] - if (!sockets) { sockets = - Object.keys(this._socket).filter(name => { - return !this._socket[name].active // only intialize (connect) inactive sockets + if (!sockets) { + sockets = Object.keys(this._socket).filter((name) => { + return !this._socket[name].active // only intialize (connect) inactive sockets }) } - if (typeof sockets ==='string') sockets = [sockets] + if (typeof sockets === 'string') sockets = [sockets] - sockets.forEach(name => { + sockets.forEach((name) => { if (this._socket[name]) { - inits.push({name:name, init:this.getSocketInit(name)}) - } else log.warn({msg:`no socket registered by name of ${name} to initialize`}) + inits.push({ name: name, init: this.getSocketInit(name) }) + } else + log.warn({ + msg: `no socket registered by name of ${name} to initialize`, + }) }) let [err] = await to(Promise.all(inits.map(initialize))) if (err) { - this.emit('log',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err}) - return {errors:[err]} + this.emit('log', { + level: 'fatal', + msg: 'initialize of socket errors was NOT caught --- bad bad', + error: err, + }) + return { errors: [err] } } - if (Object.keys(errors).length===0) errors=false - return {results:results, errors:errors} + if (Object.keys(errors).length === 0) errors = false + return { results: results, errors: errors } } // support old name for now - async addSocket(name,type,transport,options) { - return this.registerSocket(name,type,transport,options) - } - + // async addSocket(name, type, transport, options) { + // return this.registerSocket(name, type, transport, options); + // } + // TODO use plugins for transports instead /** - * addSocket - Add a socket at runtime as opposed to via the sockets option at creation + * registerSocket - register a socket for use * This is not async and will NOT initialize the socket, that must be done with a call to init or socketInit * @param {type} name Name of socket (usually something short but unique) * @param {string} [type=c] consumer/client 'c' or socket/server 's' @@ -213,28 +275,64 @@ class Base extends EventEmitter { * * @returns {function} if called before base initialzation it can be ignored as all added sockets will be initialized. After through it be called to initialize that socket */ - registerSocket(name, type = 'c', transport = 'n', options = {}) { - if (isPlainObject(name)) ({name, type = 'c', transport = 'n', options = {}} = name) - if (typeof name !=='string') return null + async registerSocket(name, type = 'c', transport, options = {}) { + if (isPlainObject(name)) + ({ name, type = 'c', transport, options = {} } = name) + if (typeof name !== 'string') return null + transport = transport || (options.port ? 'tcp' : 'named') transport = this._validateTransport(transport) - log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:198, msg:`adding socket ${name}`}) + if (!transport) { + log.error({ + socketName: name, + type: type, + transport: transport, + options: options, + method: 'registerSocket', + msg: `invalid transport ${transport}`, + }) + return null + } + log.info({ + socketName: name, + type: type, + tranport: transport, + options: options, + method: 'registerSocket', + line: 198, + msg: `registering socket ${name}`, + }) options.id = options.id || name options.name = options.name || name // TODO add a 'd' type for duplex which creates an 's' first and waits on connect to make a 'c' - if (type==='c') options = Object.assign({ - initTimeout:this.initTimeout, - retryWait:this.retryWait - },options) // outbound - if (type==='s') { + if (type === 'c') + options = Object.assign( + { + initTimeout: this.initTimeout, + retryWait: this.retryWait, + }, + options + ) // outbound + if (type === 's') { let conPackets = [] // [this._readyPacket] - conPackets = options.conPackets ? conPackets.concat(options.conPackets) : conPackets - conPackets = options.conPacket ? conPackets.push(options.conPacket) : conPackets - options = Object.assign({ - defaultReturnCmd:this.defaultReturnCmd, - conPackets: conPackets - },options) // inbound + conPackets = options.conPackets + ? conPackets.concat(options.conPackets) + : conPackets + conPackets = options.conPacket + ? conPackets.push(options.conPacket) + : conPackets + options = Object.assign( + { + defaultReturnCmd: this.defaultReturnCmd, + conPackets: conPackets, + }, + options + ) // inbound } // TODO get rid of hard coded transports and use registered transports (t and n being default) + // TODO dynamically import only requested connector plugins + // Base will pass options to plugin + // plugin will pass a unique transport identifier to base + // all plugins will use a function to generate a class instance switch (transport) { case 'n': options.path = options.path || true @@ -244,7 +342,7 @@ class Base extends EventEmitter { break case 'm': if (type === 'p') type = 'c' - options.client = type==='c' ? true : false + options.client = type === 'c' ? true : false options.connect = options.connect || {} if (options.host) options.connect.host = options.host if (options.port) options.connect.port = options.port @@ -252,10 +350,11 @@ class Base extends EventEmitter { this._socket[name] = new MQTT(options) break case 'w': - if (type === 's') this._socket[name] = new WebSocket(options) + if (type === 's') this._socket[name] = await WebSocket(options) } - if (this._socket[name]) { // in case of invalid transport + if (this._socket[name]) { + // in case of invalid transport this._socket[name].name = name this._socket[name].type = type @@ -265,38 +364,60 @@ class Base extends EventEmitter { // bubble up events from inidivual sockets to base instance, // connection:consumer is a socket emitting when a consumer is connecting // connection:socket is a consumer emiting when connecting to a socket - const EVENTS=['log','socket','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance - EVENTS.forEach(event => { - this._socket[name].on(event, obj => { + const EVENTS = [ + 'log', + 'socket', + 'connection', + 'connection:consumer', + 'connection:socket', + ] // that should emit up from each socket to instance + EVENTS.forEach((event) => { + this._socket[name].on(event, (obj) => { if (Object.prototype.toString.call(obj) !== '[object Object]') { - let data=obj + let data = obj obj = {} obj.data = data } obj.socketName = name - this.emit(event,obj) + this.emit(event, obj) }) }) - if (type==='c') { + if (type === 'c') { // when consumer has sucessfully connected to a socket - this._socket[name].obsName = `${name}:${options.path ? options.path : `${options.host}:${options.port}`}` - this.ready.addObserver(this._socket[name].obsName,this._socket[name],{event:'connection:socket',condition:ev=>ev.state==='connected'}) + this._socket[name].obsName = `${name}:${ + options.path ? options.path : `${options.host}:${options.port}` + }` + this.ready.addObserver(this._socket[name].obsName, this._socket[name], { + event: 'connection:socket', + condition: (ev) => ev.state === 'connected', + }) // set up listner for any pushed packets and emit locally - this._socket[name].on('pushed', packet => { - packet._header.socketName=name + this._socket[name].on('pushed', (packet) => { + packet._header.socketName = name this.emit('pushed', packet) }) } - if (type==='s') { + if (type === 's') { // when socket is listnening - this.ready.addObserver(`${name}:socket`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' }) + this.ready.addObserver( + `${name}:socket`, + this._socket[name], + { + event: 'socket', + condition: (ev) => (ev || {}).state === 'listening', + } + ) // initially set conPackets, ready packets is ALWAYS the first this._socket[name].conPackets.unshift(this._readyPacket) - if (options.duplex) this.consumerConnected(this._socket[name],{consumer:options.duplex, add:true}) + if (options.duplex) + this.consumerConnected(this._socket[name], { + consumer: options.duplex, + add: true, + }) } - return this._socket[name] // return handle to newly registered socket + return this._socket[name] // return handle to newly registered socket // return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket } } @@ -310,18 +431,33 @@ class Base extends EventEmitter { async removeSocket(name) { // NOTE: uci consumers have .end renamed as .close to match socket method for convenience - if (typeof name !=='string') return 'no socket name passed, nothing to remove' + if (typeof name !== 'string') + return 'no socket name passed, nothing to remove' const socket = this.getSocket(name) if (!socket) return 'no socket by that name, nothing to remove' let closeError - if (typeof socket.close !== 'function') return 'bad socket no close function, nothing to remove' + if (typeof socket.close !== 'function') + return 'bad socket no close function, nothing to remove' let [err] = await to(socket.close()) - if(err) if (err.code !== 'ERR_SERVER_NOT_RUNNING') { - closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'} - } - this.emit('log', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts}) + if (err) + if (err.code !== 'ERR_SERVER_NOT_RUNNING') { + closeError = { + socket: this._socket[name].name, + error: err, + msg: 'socket/consumer closed with errors, but removed', + } + } + this.emit('log', { + level: 'warn', + msg: `socket ${name} has been removed`, + socket: this._socket[name].opts, + }) socket.removeAllListeners() - this.ready.removeObserver(socket.type==='c' ? this._socket[name].obsName : `${name}:socket`) + this.ready.removeObserver( + socket.type === 'c' + ? this._socket[name].obsName + : `${name}:socket` + ) delete this._socket[name] return closeError ? closeError : 'success' } @@ -331,29 +467,37 @@ class Base extends EventEmitter { else return this._socket } // returns array of names of sockets that pass filter - getSocketsFilter({type, trans, active}={}) { + getSocketsFilter({ type, trans, active } = {}) { if (trans) trans = this._validateTransport(trans) let filtered = [] - Object.keys(this._socket).forEach(name => { - if ((type==null || this._socket[name].type === type) - && (trans==null || this._socket[name].transport === trans) - && (active==null || this._socket[name].active===active)) filtered.push(name) + Object.keys(this._socket).forEach((name) => { + if ( + (type == null || this._socket[name].type === type) && + (trans == null || this._socket[name].transport === trans) && + (active == null || this._socket[name].active === active) + ) + filtered.push(name) }) return filtered } - getConsumers(filter={}) { - filter.type='c' + getConsumers(filter = {}) { + filter.type = 'c' return this.getSocketsFilter(filter) } getSocketInit(name) { let socket = this._socket[name] - if(!socket) { - log.warn({msg:`can't fetch create/connect function, no socket registered by name of ${name}`}) + if (!socket) { + log.warn({ + msg: `can't fetch create/connect function, no socket registered by name of ${name}`, + }) return null } - if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') { + if ( + this._socket[name].type === 's' && + this._socket[name].transport !== 'm' + ) { return socket.create } else { return socket.connect @@ -378,48 +522,67 @@ class Base extends EventEmitter { if (!con) return `no consumer ${name} for sending` names = [name] } - if (!packet || !Object.keys(packet).length) return 'no packet to send - aborted' + if (!packet || !Object.keys(packet).length) + return 'no packet to send - aborted' let sends = [] if (!names.length) return 'no consumers available for send, aborting' for (let name of names) { - const consumer=this.getSocket(name) + const consumer = this.getSocket(name) let hookedPacket = {} - hookedPacket = consumer.beforeSend ? await consumer.beforeSend.call(this,Object.assign({},packet)) : packet - log.debug({msg:'after hook, sending packet', name:consumer.name, packet:hookedPacket, method:'send'}) - sends.push(consumer.send.bind(consumer,hookedPacket)) + hookedPacket = consumer.beforeSend + ? await consumer.beforeSend.call(this, Object.assign({}, packet)) + : packet + log.debug({ + msg: 'after hook, sending packet', + name: consumer.name, + packet: hookedPacket, + method: 'send', + }) + sends.push(consumer.send.bind(consumer, hookedPacket)) } if (sends.length === 1) return await sends[0]() return Promise.all( - sends.map(send => { + sends.map((send) => { return send() }) ) } - async push(packet,opts={}) { - if (!packet || !Object.keys(packet).length) return Promise.resolve('no packet to push - aborted') - let sockets = this.getSocketsFilter({type:'s'}) - if (!sockets.length) return Promise.resolve('no sockets on which to push') - opts.sockets = opts.sockets ? opts.sockets : (opts.socket ? [opts.socket] : []) - if (opts.sockets.length) sockets = sockets.filter(name=>opts.sockets.includes(name)) + async push(packet, opts = {}) { + if (!packet || !Object.keys(packet).length) + return Promise.resolve('no packet to push - aborted') + let sockets = this.getSocketsFilter({ type: 's' }) + if (!sockets.length) return Promise.resolve('no sockets on which to push') + opts.sockets = opts.sockets + ? opts.sockets + : opts.socket + ? [opts.socket] + : [] + if (opts.sockets.length) + sockets = sockets.filter((name) => opts.sockets.includes(name)) sockets = sockets - .map(name=>this.getSocket(name)) - .filter(sock=> (opts.transport && opts.transport !=='all') ? sock.transport=== this._validateTransport(opts.transport) : true) + .map((name) => this.getSocket(name)) + .filter((sock) => + opts.transport && opts.transport !== 'all' + ? sock.transport === this._validateTransport(opts.transport) + : true + ) // console.log(sockets.length, 'sockets for push', sockets.map(socket=>socket.name)) if (!sockets.length) return Promise.resolve('no sockets on which to push') let broadcast = [] // TODO use map and reflect for (let socket of sockets) { let hookedPacket = {} - hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet - broadcast.push(socket.push.bind(socket,hookedPacket,opts)) + hookedPacket = socket.beforeSend + ? await socket.beforeSend.call(this, Object.assign({}, packet), true) + : packet + broadcast.push(socket.push.bind(socket, hookedPacket, opts)) } return Promise.all( - broadcast.map(push => { + broadcast.map((push) => { return push() }) ) - } // TODO accept alt transport string i.e. t or TCP async sendTransport(packet, transport) { @@ -433,7 +596,7 @@ class Base extends EventEmitter { } if (sends.length === 1) return sends[0](packet) return Promise.all( - sends.map(send => { + sends.map((send) => { return send(packet) }) ) @@ -451,15 +614,13 @@ class Base extends EventEmitter { // TODO add sendMQTT, sendWS - - socketsListen(event,fn) { - this._eventListen('s',event,fn) + socketsListen(event, fn) { + this._eventListen('s', event, fn) } - consumersListen(event,fn) { - this._eventListen('c',event,fn) + consumersListen(event, fn) { + this._eventListen('c', event, fn) } - getPacketByName(name, packets) { if (!packets.length) packets = [packets] let found = {} @@ -474,9 +635,10 @@ class Base extends EventEmitter { // add set of functions to class prop/space and then register with this addNamespace(space, type, trans) { - if (type !=='c' && type !=='s') { + if (type !== 'c' && type !== 's') { trans = type - type = 's' } + type = 's' + } trans = this._validateTransport(trans) if (trans) return this._namespaces[type + trans].unshift(space) else return this._namespaces[type].unshift(space) @@ -485,35 +647,40 @@ class Base extends EventEmitter { // object of functions, key is cmd name amendCommands(funcs, trans, type) { if (!trans && !type) type = 's' - if (trans ==='c' || trans ==='s') { + if (trans === 'c' || trans === 's') { type = trans trans = '' } trans = this._validateTransport(trans) - if (!this['_'+type+trans]) this['_'+type+trans] = {} - Object.assign(this['_'+type+trans], funcs) // trans is type here - log.debug({msg:'amended namespace', id:this.id, default_key:'_'+type+trans, functions:this['_'+type+trans]}) + if (!this['_' + type + trans]) this['_' + type + trans] = {} + Object.assign(this['_' + type + trans], funcs) // trans is type here + log.debug({ + msg: 'amended namespace', + id: this.id, + default_key: '_' + type + trans, + functions: this['_' + type + trans], + }) } amendConsumerCommands(funcs, trans) { - this.amendCommands(funcs,trans,'c') + this.amendCommands(funcs, trans, 'c') } amendSocketCommands(funcs, trans) { - this.amendCommands(funcs,trans) + this.amendCommands(funcs, trans) } // func should take and return a packet. if type - beforeSendHook(func,opts) { - this._packetHook('beforeSend', func,opts) + beforeSendHook(func, opts) { + this._packetHook('beforeSend', func, opts) } - beforeProcessHook(func,opts) { - this._packetHook('beforeProcess', func,opts) + beforeProcessHook(func, opts) { + this._packetHook('beforeProcess', func, opts) } - afterProcessHook(func,opts) { - this._packetHook('afterProcess', func,opts) + afterProcessHook(func, opts) { + this._packetHook('afterProcess', func, opts) } // A Big Hammer - use only if necessary - default with hooks should suffice @@ -544,98 +711,150 @@ class Base extends EventEmitter { } // a call to this method will (make or add) return and or subscribe a ready observer for incoming consumer connections - consumerConnected (socket,opts={}) { - let { subscribe, consumer, name, add} = opts + consumerConnected(socket, opts = {}) { + let { subscribe, consumer, name, add } = opts - const conditionHandler = async ev => { - if ((ev||{}).state ==='connected'){ - let data = (ev.data ||{}) - if (consumer) { // specific consumer check - if (data.name === consumer || [ev.name, ev.id, data.name, data.id].some(name => (name||'').includes(consumer)) ) return true + const conditionHandler = async (ev) => { + if ((ev || {}).state === 'connected') { + let data = ev.data || {} + if (consumer) { + // specific consumer check + if ( + data.name === consumer || + [ev.name, ev.id, data.name, data.id].some((name) => + (name || '').includes(consumer) + ) + ) + return true } else return true } return false } - if (typeof socket ==='string') socket = this.getSocket(socket) + if (typeof socket === 'string') socket = this.getSocket(socket) name = name || consumer add = add && consumer - const options = {event:'connection:consumer',condition:conditionHandler} + const options = { + event: 'connection:consumer', + condition: conditionHandler, + } let oname = `${name}:consumer>${socket.name}:socket` - const obs = add ? this.ready.addObserver(oname,socket,options) : this.ready.makeObserver(socket,options) - if (typeof subscribe ==='function') return obs.subscribe(subscribe) + const obs = add + ? this.ready.addObserver(oname, socket, options) + : this.ready.makeObserver(socket, options) + if (typeof subscribe === 'function') return obs.subscribe(subscribe) return obs } // end consumerConnected //=============PRIVATE METHODS ========================================= /* - * - * Assigns a Hook Function to a Socket, Type or Transport - * - */ + * + * Assigns a Hook Function to a Socket, Type or Transport + * + */ // options allow applying hook function to specific socket or type or transport, default is all type 's' sockets - _packetHook(hook,func,opts) { - log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts}) - let {name,type,trans,all} = opts - if (opts==null) type = 's' // default is all type 's' sockets + _packetHook(hook, func, opts) { + log.debug({ + msg: 'hooking a socket(s)', + method: '_packetHook', + line: 334, + hook: hook, + function: func, + options: opts, + }) + let { name, type, trans, all } = opts + if (opts == null) type = 's' // default is all type 's' sockets if (name) this._socket[name][hook] = func else { - log.debug({msg:'sockets available to hook', method:'_packetHook', line:338, sockets: Object.keys(this._socket)}) + log.debug({ + msg: 'sockets available to hook', + method: '_packetHook', + line: 338, + sockets: Object.keys(this._socket), + }) for (let name of Object.keys(this._socket)) { if (this._socket[name].type === type) this._socket[name][hook] = func - if (this._socket[name].transport === trans) this._socket[name][hook] = func + if (this._socket[name].transport === trans) + this._socket[name][hook] = func if (all) this._socket[name][hook] = func - if (this._socket[name][hook]) log.debug({msg:'hooked socket', method:'_packetHook', line:343, name:name, type:this._socket[name].type, trans:this._socket[name].transport}) + if (this._socket[name][hook]) + log.debug({ + msg: 'hooked socket', + method: '_packetHook', + line: 343, + name: name, + type: this._socket[name].type, + trans: this._socket[name].transport, + }) } } } - /* - ********** main packet processor for all sockets - * supports per socket before and after hook processors - * supports additonal registered processors called via packet or socket name, with default processor, - */ + ********** main packet processor for all sockets + * supports per socket before and after hook processors + * supports additonal registered processors called via packet or socket name, with default processor, + */ async _packetProcess(socket_name, packet) { - if (!packet || !Object.keys(packet).length) packet = {error:'no packet to process'} - if (!socket_name || !this.getSocket(socket_name)) packet.error = 'no socket name passed for packet processing' - if (!this.getSocket(socket_name)) packet.error = `socket by name of ${socket_name}` + if (!packet || !Object.keys(packet).length) + packet = { error: 'no packet to process' } + if (!socket_name || !this.getSocket(socket_name)) + packet.error = 'no socket name passed for packet processing' + if (!this.getSocket(socket_name)) + packet.error = `socket by name of ${socket_name}` if (packet.error) { - this.emit('log', {level:'error', error:packet.error, packet:packet, msg:'an error occured before processing an incoming packet'}) - return packet // don't process a packet with an error + this.emit('log', { + level: 'error', + error: packet.error, + packet: packet, + msg: 'an error occured before processing an incoming packet', + }) + return packet // don't process a packet with an error } // TODO allow adding to or altering the process map let processors = new Map([ - ['before', this.getSocket(socket_name).beforeProcess ], - ['command', this._cmdProcessors[packet.cmdProcessor || this._cmdProcessors[socket_name] ? socket_name : '_default'] ], - ['after', this.getSocket(socket_name).afterProcess ], + ['before', this.getSocket(socket_name).beforeProcess], + [ + 'command', + this._cmdProcessors[ + packet.cmdProcessor || this._cmdProcessors[socket_name] + ? socket_name + : '_default' + ], + ], + ['after', this.getSocket(socket_name).afterProcess], ]) let err - for (let [name,func] of processors) { // the same as of recipeMap.entries() - [err,packet] = await to(this._process(socket_name,packet,name,func)) + for (let [name, func] of processors) { + // the same as of recipeMap.entries() + [err, packet] = await to(this._process(socket_name, packet, name, func)) if (err) packet.error = err } return packet } - async _process(socket_name,packet,name,func) { - if (packet.error) return packet // if an error occurs skip any further processing + async _process(socket_name, packet, name, func) { + if (packet.error) return packet // if an error occurs skip any further processing let err, res if (func) { - [err,res] = await to(func.call(this,packet,socket_name)) - if (err) { // forced an abort to processing + [err, res] = await to(func.call(this, packet, socket_name)) + if (err) { + // forced an abort to processing packet.error = err } else { - if (!isPlainObject(res)) packet.processResult ? packet.processResult[name]=res : packet.processResult = {[name]:res} + if (!isPlainObject(res)) + packet.processResult + ? (packet.processResult[name] = res) + : (packet.processResult = { [name]: res }) else { - let method = (packet.processMethod || {})[name] || packet.processMethod + let method = + (packet.processMethod || {})[name] || packet.processMethod // TODO could support other specialized merge methods if (method === 'merge') { - packet = merge(packet,res) - } - else { - packet=res + packet = merge(packet, res) + } else { + packet = res } } } @@ -645,37 +864,47 @@ class Base extends EventEmitter { // all sockets are emitters. Adds a listener to all sockets of a type with given event. // now sockets can emit locally processed events - _eventListen(type,event,fn) { + _eventListen(type, event, fn) { for (let name of Object.keys(this._socket)) { if (this._socket[name].type === type) { - if (fn==='stop') this._socket[name].removeAllListeners(event) + if (fn === 'stop') this._socket[name].removeAllListeners(event) else { - log.debug({socket:name, type:type, event:event, msg:'adding listener to socket'}) + log.debug({ + socket: name, + type: type, + event: event, + msg: 'adding listener to socket', + }) this._socket[name].on(event, fn) } } } } - _validateTransport(trans, type='s') { + _validateTransport(trans, type = 's') { const valids = { - w:'w', - web:'w', - n:'n', - named:'n', - unix:'n', - pipe:'n', - t:'t', - tcp:'t', - net:'t', - network:'t', - m:'m', - mqtt:'m', + w: 'w', + web: 'w', + n: 'n', + named: 'n', + unix: 'n', + pipe: 'n', + t: 't', + tcp: 't', + net: 't', + network: 't', + m: 'm', + mqtt: 'm', } - trans = valids[trans] || '' - if (type !== 's' && trans ==='w') { - log.warn({type: type, transport: trans, msg:'Invalid type/transport - Consumer/Client Web Socket not supported use TCP'}) - trans = '' + trans = valids[trans] || null + if (type !== 's' && trans === 'w') { + log.warn({ + type: type, + transport: trans, + msg: + 'Invalid type/transport - Consumer/Client Web Socket not supported use TCP', + }) + trans = null } return trans } @@ -693,7 +922,7 @@ class Base extends EventEmitter { _getCmdFuncNamespace(cmd, namespaces) { let cmd_func = null - namespaces.some(namespace => { + namespaces.some((namespace) => { namespace = namespace ? namespace + '.' + cmd : cmd cmd_func = this._getCmdFunc(namespace) if (cmd_func) return true @@ -710,7 +939,14 @@ class Base extends EventEmitter { var prop = cmd.shift() if (cmd.length === 0) return obj[prop] if (!obj[prop]) return null - log.debug({length:cmd.length,cmd:cmd, prop:prop, objprop:obj[prop], method:'_getCmdFunc', msg:'command to corresponding function in a hash'}) + log.debug({ + length: cmd.length, + cmd: cmd, + prop: prop, + objprop: obj[prop], + method: '_getCmdFunc', + msg: 'command to corresponding function in a hash', + }) return this._getCmdFunc(cmd, obj[prop]) } @@ -720,16 +956,15 @@ class Base extends EventEmitter { packet.cmd, this._namespaces[this._type(socket) + this._transport(socket)] ) - if (cmd_func) return await cmd_func.call(this,packet) // todo try .call + if (cmd_func) return await cmd_func.call(this, packet) // todo try .call cmd_func = this._getCmdFuncNamespace( packet.cmd, this._namespaces[this._type(socket)] ) - if (cmd_func) return await cmd_func.call(this,packet) + if (cmd_func) return await cmd_func.call(this, packet) return 'failed' } - } // end Base Class export default Base -export { Base, map, changed, isPlainObject, to, merge, loadYaml } // todo share rxjs +export { Base, Ready, map, changed, isPlainObject, to, merge, loadYaml } // todo share rxjs diff --git a/src/processing.js b/src/processing.js index 5c223af..b24d266 100644 --- a/src/processing.js +++ b/src/processing.js @@ -1,6 +1,6 @@ import to from 'await-to-js' import logger from '@uci-utils/logger' -let log = logger({ package: 'base',file:'processing.js'}) +let log = logger({ package: 'base', file: 'processing.js' }) // this._processing refers to this module/hash @@ -9,41 +9,63 @@ let log = logger({ package: 'base',file:'processing.js'}) // messaging errors on socket will not be fatal to the entire socket server // common processor, will call based on type s or c the ones below -const cmdProcessor = async function (packet,socket) { - let [err,res] = await to(_process[this.getSocket(socket).type].call(this,packet,socket)) +const cmdProcessor = async function (packet, socket) { + let [err, res] = await to( + _process[this.getSocket(socket).type].call(this, packet, socket) + ) if (err) { - let error = {cmd:'error', error:err, packet:packet, socket:socket, function:'processor', line: 15, msg:`'unhandled error in packet command function ${packet.cmd}`} + let error = { + cmd: 'error', + error: err, + packet: packet, + socket: socket, + function: 'processor', + line: 15, + msg: `'unhandled error in packet command function ${packet.cmd}`, + } log.error(error) - res = Object.assign({},packet,error) - if (process.env.UCI_PUSH_UNHANDLED==='true') this.push(res) - if (process.env.UCI_SHOW_UNHANDLED==='true') console.log(error) + res = Object.assign({}, packet, error) + if (process.env.UCI_PUSH_UNHANDLED === 'true') this.push(res) + if (process.env.UCI_SHOW_UNHANDLED === 'true') console.log(error) } return res } // default processors for socket/server and consumer/client const _process = { - s: async function (packet,socket) { - if (!packet.cmd) return {error: 'no command (cmd:) in packet for socket', packet: packet } + s: async function (packet, socket) { + if (!packet.cmd) + return { + error: 'no command (cmd:) in packet for socket', + packet: packet, + } // this call will search the namespace and envoke a function and return a repsonse packet - let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response - return {error: 'no socket processing function supplied for command', packet: packet } + let response = await this._callCmdFunc(packet, socket) + if (response !== 'failed') return response + return { + error: 'no socket processing function supplied for command', + packet: packet, + } }, - c: async function (packet,socket) { + c: async function (packet, socket) { // the the end of life for a consumer packet that has been sent and returned or a packet that was pushed. - if (packet.error) packet.cmd='error' - if (!packet.cmd) packet.cmd ='reply' - let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response - packet = {error:`no consumer return processing function supplied for ${packet.cmd}`, packet:packet} + if (packet.error) packet.cmd = 'error' + if (!packet.cmd) packet.cmd = 'reply' + let response = await this._callCmdFunc(packet, socket) + if (response !== 'failed') return response + packet = { + error: `no consumer return processing function supplied for ${packet.cmd}`, + packet: packet, + } this._c.error(packet) - } + }, } // default name spaces const namespaces = { - s: ['_s'], // default command functions below - c: ['_c'], // default command functions below + s: ['_s'], // default command functions below + c: ['_c'], // default command functions below cn: ['_cn'], ct: ['_ct'], cm: ['_cm'], @@ -55,13 +77,13 @@ const namespaces = { } /* -* -* Default packet command processing functions -* -*/ + * + * Default packet command processing functions + * + */ -const defaultCmds ={ - s:{ +const defaultCmds = { + s: { echo: function (packet) { packet.processed = true packet.msg = 'default socket echo' @@ -71,36 +93,81 @@ const defaultCmds ={ ack: async function (packet) { packet.cmd = 'reply' packet.ack = true - packet.msg = 'this is the base default ack, superceed in your instance or extended class' + packet.msg = + 'this is the base default ack, superceed in your instance or extended class' return packet }, ready: async function (packet) { - const event = packet.event || packet.name || packet.id - delete(packet._header) - this.emit(event,packet.ready,packet) - this.emit('log', {level:'ready', msg:'change in ready state received via send', ready:packet.ready, packet:packet}) - setTimeout(()=>this.emit('log', {level:'state', msg:'new ready state', state:this.ready.state}),1000) - return {cmd:'reply', msg:'consumer sent event was emitted event at socket process', event:event} - } + const event = packet.event || packet.name || packet.id + delete packet._header + this.emit(event, packet.ready, packet) + this.emit('ready', packet, packet) + this.emit('log', { + level: 'ready', + msg: 'change in ready state received via send', + ready: packet.ready, + packet: packet, + }) + setTimeout( + () => + this.emit('log', { + level: 'state', + msg: 'new ready state', + state: this.ready.state, + }), + 1000 + ) + return { + cmd: 'reply', + msg: 'consumer sent event was emitted event at socket process', + event: event, + } + }, }, - c:{ - error: function (packet) { // default - log.error({error:packet.error, packet:packet, msg:'==========Consumer Sent Packet returned with ERROR ========='}) + c: { + error: function (packet) { + // default + log.error({ + error: packet.error, + packet: packet, + msg: '==========Consumer Sent Packet returned with ERROR =========', + }) return packet }, - reply: function(packet) { - if (process.env.UCI_ENV==='dev') log.debug({packet:packet, msg:'====Packet returned from socket - default reply logger==='}) + reply: function (packet) { + if (process.env.UCI_ENV === 'dev') + log.debug({ + packet: packet, + msg: '====Packet returned from socket - default reply logger===', + }) return packet }, ready: async function (packet) { - const event = packet.event || packet.name || packet.id - delete(packet._header) - this.emit(event,packet.ready,packet) - this.emit('log', {level:'ready', msg:'change in ready state received via push', ready:packet.ready, packet:packet}) - setTimeout(()=>this.emit('log', {level:'state', msg:'new ready state', state:this.ready.state}),1000) - return {cmd:'reply', msg:'ready packet event was emitted at consumer process from push'} - } - } + const event = packet.event || packet.name || packet.id + delete packet._header + this.emit(event, packet.ready, packet) + this.emit('ready', packet) + this.emit('log', { + level: 'ready', + msg: 'change in ready state received via push', + ready: packet.ready, + packet: packet, + }) + setTimeout( + () => + this.emit('log', { + level: 'state', + msg: 'new ready state', + state: this.ready.state, + }), + 1000 + ) + return { + cmd: 'reply', + msg: 'ready packet event was emitted at consumer process from push', + } + }, + }, } export { cmdProcessor, defaultCmds, namespaces }