From a47144606a21ce1d3e9cb97fae3d53255aaaeaa3 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Mon, 6 Jan 2020 23:39:29 -0800 Subject: [PATCH] 0.1.37 base: add ready observer class instance adds a ready observer for each consumer (only) registered refactored default namespace processing a bit refactored _packetProcess the default/main processor now can register before,command, and after processors uses a single _process function to remove near reptition does a better job of maintaining/merging the packet from step to step processing: change processor to cmdProcessor refactor default consumer processing --- package.json | 7 ++- src/base.js | 142 +++++++++++++++++++++++----------------------- src/processing.js | 20 +++---- 3 files changed, 83 insertions(+), 86 deletions(-) diff --git a/package.json b/package.json index 50db733..e28d5f9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/base", - "version": "0.1.32", + "version": "0.1.37", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { @@ -39,11 +39,12 @@ "dependencies": { "@uci-utils/bind-funcs": "^0.2.4", "@uci-utils/logger": "^0.0.16", + "@uci-utils/ready": "^0.1.3", "@uci/mqtt": "^0.1.13", "@uci/socket": "^0.2.26", "@uci/websocket": "^0.3.10", "await-to-js": "^2.1.1", - "p-reflect": "^2.1.0", - "p-settle": "^3.1.0" + "is-plain-object": "^3.0.0", + "merge-anything": "^2.4.4" } } diff --git a/src/base.js b/src/base.js index 78f1287..15a476c 100644 --- a/src/base.js +++ b/src/base.js @@ -1,27 +1,28 @@ -// Direct External Dependencies -// none +// TODO add automated duplex (for consumer will create corresponding socket and send connection info to remote socket) -// UCI dependencies - -// UCI communication transport communication modules -// TODO change to dynamic import so loads only if that socket type is requestd +// --------------- UCI dependencies ------------------- +// UCI communication transport communication modules +// TODO change to dynamic import so loads only if that socket type is requestd import Socket from '@uci/socket' // tcp or named pipe import MQTT from '@uci/mqtt' // requires broker import WebSocket from '@uci/websocket' // server only - client is for web browser only -// UCI helpers +// UCI helpers +import { Ready, changed, map } from '@uci-utils/ready' import { bindFuncs } from '@uci-utils/bind-funcs' -// UCI logger +// UCI logger import logger from '@uci-utils/logger' let log = {} // declare module wide log to be set during construction +// ------------------------------------------------- // Community dependencies import to from 'await-to-js' +import isPlainObject from 'is-plain-object' +import merge from 'merge-anything' +// Nodejs dependencies import EventEmitter from 'events' -// import pSettle from 'p-settle' -// import pReflect from 'p-reflect' // Internal dependencies -import { processor, defaultCmds, namespaces } from './processing' +import { cmdProcessor, defaultCmds, namespaces } from './processing' // Constants const SOCKET_INFO_KEYS = ['name', 'type', 'transport'] @@ -69,11 +70,11 @@ class Base extends EventEmitter { this.initTimeout = opts.initTimeout this.retryWait = opts.retryWait this.defaultReturnCmd = opts.defaultReturnCmd - this._processors = { _default: processor } + this._cmdProcessors = { _default: cmdProcessor } // _c and _s are the default namespaces - this._namespaces = namespaces - this._c = defaultCmds.c - this._s = defaultCmds.s + this._namespaces =Object.assign({},namespaces) + this._c = Object.assign({},defaultCmds.c) + this._s = Object.assign({},defaultCmds.s) if (opts.useRootNS) { // add root of instance to checking for command functions - not recommended! this._namespaces.s.splice(-1, 0, null) @@ -90,7 +91,7 @@ class Base extends EventEmitter { socketStr.split(/[>#]+/).map(function(prop, index) { socket[SOCKET_INFO_KEYS[index]] = prop }) - this.addSocket( + this.registerSocket( socket.name, socket.type, socket.transport, @@ -98,6 +99,9 @@ class Base extends EventEmitter { ) }) } + + this.ready = new Ready({emitter: this, verbose:process.env.UCI_READY_VERBOSE}) + } // end constructor /* @@ -117,16 +121,6 @@ class Base extends EventEmitter { * */ - get ready() { - // TODO checks that all sockets are active - let ready = true - for (let name in this._socket) { - // console.log(name, this._socket[name].active) - ready = ready && this._socket[name].active - } - return ready - } - async init(sockets) { // Object.getPrototypeOf(Object.getPrototypeOf(this).init.call(this,sockets)) return this.socketsInit(sockets) @@ -142,13 +136,11 @@ class Base extends EventEmitter { const initialize = async socket => { return new Promise(async function(resolve) { try { - // console.log('initialize socket',socket) const value = await socket.init() this.emit('status',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value}) results[socket.name] = value resolve(value) } catch (error) { - // console.log('catching error', error) this.emit('status',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket // let res = await this.removeSocket(socket.name) errors[socket.name]={error:error} @@ -160,19 +152,15 @@ class Base extends EventEmitter { let inits = [] if (!sockets) { sockets = Object.keys(this._socket).filter(name => { - // console.log(this._socket[name].active) return !this._socket[name].active // only intialize (connect) inactive sockets }) - // console.log('inactive sockets', sockets) } if (typeof sockets ==='string') sockets = [sockets] - // console.log('sockets to initialize',sockets) sockets.forEach(name => { if (this._socket[name]) { inits.push({name:name, init:this.getSocketInit(name)}) } else log.warn({msg:`no socket registered by name of ${name} to initialize`}) }) - // console.log('starting promise',results,errors) let [err] = await to(Promise.all(inits.map(initialize))) if (err) { this.emit('status',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err}) @@ -231,7 +219,7 @@ class Base extends EventEmitter { this._socket[name].transport = transport this._socket[name]._packetProcess = this._packetProcess.bind(this, name) - // bubble up events from sockets to instance + // bubble up events from inidivual sockets to base instance const EVENTS=['log','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance EVENTS.forEach(event => { this._socket[name].on(event, obj => { @@ -246,15 +234,15 @@ class Base extends EventEmitter { }) if (type==='c') { + + this.ready.addObserver(name,this.getSocket(name),{event:'connection:socket',condition:ev=>{return ev.state==='connected'}}) + this._socket[name].on('pushed', packet => { packet._header.socketName=name this.emit('pushed', packet) }) } - // if (this._started) return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket - // else return `socket ${name} added and ready to initialize ` - } } @@ -287,7 +275,6 @@ class Base extends EventEmitter { if (trans) trans = this._validateTransport(trans) let filtered = [] Object.keys(this._socket).forEach(name => { - // console.log(name, type,this._socket[name].type, trans, this._socket[name].transport) if ((type==null || this._socket[name].type === type) && (trans==null || this._socket[name].transport === trans) && (active==null || this._socket[name].active===active)) filtered.push(name) @@ -359,15 +346,11 @@ class Base extends EventEmitter { else { let trans = null if (typeof sockets === 'string') trans = sockets - // console.log('push transport', trans) sockets = Object.values(this._socket).filter(socket=>socket.type === 's') - // console.log('all server sockets',sockets) if (trans && trans !=='all') { sockets = sockets.filter(socket=>socket.transport === this._validateTransport(trans)) - // console.log('transport filtered server sockets',sockets) } } let broadcast = [] - // console.log('===before push', sockets) for (let socket of sockets) { let hookedPacket = {} hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet @@ -444,8 +427,7 @@ class Base extends EventEmitter { else return this._namespaces[type].unshift(space) } - // TODO confirm Object.assign will be ok as it is not a deep copy - // allow a single or arrary of single functions + // object of functions, key is cmd name amendCommands(funcs, trans, type) { if (!trans && !type) type = 's' if (trans ==='c' || trans ==='s') { @@ -455,7 +437,7 @@ class Base extends EventEmitter { trans = this._validateTransport(trans) if (!this['_'+type+trans]) this['_'+type+trans] = {} Object.assign(this['_'+type+trans], funcs) // trans is type here - log.debug({msg:'amended namespace', default_key:'_'+type+trans, functions:this['_'+type+trans]}) + log.debug({msg:'amended namespace', id:this.id, default_key:'_'+type+trans, functions:this['_'+type+trans]}) } amendConsumerCommands(funcs, trans) { @@ -503,20 +485,20 @@ class Base extends EventEmitter { // if no socket name given it will replace the default processor in _processors from processing.js altProcessor(func, socket_name) { socket_name = socket_name || '_default' - this._processors[socket_name] = func + this._cmdProcessors[socket_name] = func } - //====================================================== - + //=============PRIVATE METHODS ========================================= /* * - * Private Methods + * Assigns a Hook Function to a Socket, Type or Transport * */ - + // options allow applying hook function to specific socket or type or transport, default is all type 's' sockets _packetHook(hook,func,opts) { log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts}) let {name,type,trans,all} = opts + if (opts==null) type = 's' // default is all type 's' sockets if (name) this._socket[name][hook] = func else { log.debug({msg:'sockets available to hook', method:'_packetHook', line:338, sockets: Object.keys(this._socket)}) @@ -531,32 +513,47 @@ class Base extends EventEmitter { /* - **********default packet processor for all sockets - * this can be hooked or replaced all together + ********** main packet processor for all sockets + * supports per socket before and after hook processors + * supports additonal registered processors called via packet or socket name, with default processor, */ - async _packetProcess(socket_name, packet) { - log.debug({ socket:socket_name, packet:packet, method:'_packetProcess', line:393, msg:'processing incoming packet'}) - let header = packet._header ? packet._header : {} // retain header - let err, res - if (this._socket[socket_name].beforeProcess) { - [err,res] = await to(this._socket[socket_name].beforeProcess.call(this,packet)) - if (err) { // hook has forced an abort to processing - packet.error = err - return packet - } - packet = res + if (packet.error) return packet // don't process a packet with an error + // TODO allow adding to or altering the process map + let processors = new Map([ + ['before', this.getSocket(socket_name).beforeProcess ], + ['command', this._cmdProcessors[packet.cmdProcessor || this._cmdProcessors[socket_name] ? socket_name : '_default'] ], + ['after', this.getSocket(socket_name).afterProcess ], + ]) + + let err + for (let [name,func] of processors) { // the same as of recipeMap.entries() + [err,packet] = await to(this._process(socket_name,packet,name,func)) + if (err) packet.error = err } - // the processor can be set via the incoming packet - // otherwise if one is set on the socket or the default found in processing.js - // TODO Try each "available" packet processor in some order if fails try next one before trying the default - let processor = packet._processor || this._processors[socket_name] ? socket_name : '_default' - res = (await this._processors[processor].call(this,packet,socket_name))|| packet // processor didn't return a packet then return the packet sent - log.debug({ socket:socket_name, response:res, msg:'processed packet ready for hook'}) - if (this._socket[socket_name].afterProcess) res = await this._socket[socket_name].afterProcess.call(this,res) - log.debug({ socket:socket_name, response:res, msg:'packet after hook complete ready for return'}) - res._header = Object.assign(header,res._header) // re-apply header in case hooks or processor mangled or removed it - return res + return packet + } + + async _process(socket_name,packet,name,func) { + if (packet.error) return packet // if an error occurs skip any further processing + let err, res + if (func) { + [err,res] = await to(func.call(this,packet,socket_name)) + if (err) { // forced an abort to processing + packet.error = err + } else { + if (!isPlainObject(res)) packet.processResult ? packet.processResult[name]=res : packet.processResult = {[name]:res} + else { + let method = (packet.processMethod || {})[name] || packet.processMethod + if (method === 'merge') { + packet = merge(packet,res) + } + else Object.assign(packet,res) + } + } + } + this.emit('log', {level:'trace', msg:`processed packet stage:${name}`,socketName:socket_name,packet:packet}) + return packet } // all sockets are emitters. Adds a listener to all sockets of a type with given event. @@ -649,3 +646,4 @@ class Base extends EventEmitter { } // end Base Class export default Base +export { Base, map, changed, isPlainObject, to, merge } // todo share rxjs diff --git a/src/processing.js b/src/processing.js index 63e976f..e08b2c0 100644 --- a/src/processing.js +++ b/src/processing.js @@ -9,7 +9,7 @@ let log = logger({ package: 'base',file:'processing.js'}) // messaging errors on socket will not be fatal to the entire socket server // common processor, will call based on type s or c the ones below -const processor = async function (packet,socket) { +const cmdProcessor = async function (packet,socket) { let [err,res] = await to(_process[this.getSocket(socket).type].bind(this)(packet,socket)) if (err) { let error = {cmd:'error', error:err, packet:packet, socket:socket, function:'processor', line: 15, msg:`'unhandled error in packet command function ${packet.cmd}`} @@ -21,7 +21,7 @@ const processor = async function (packet,socket) { return res } -export { processor, defaultCmds, namespaces } +export { cmdProcessor, defaultCmds, namespaces } // default processors for socket/server and consumer/client const _process = { @@ -35,14 +35,10 @@ const _process = { c: async function (packet,socket) { // the the end of life for a consumer packet that has been sent and returned or a packet that was pushed. if (packet.error) packet.cmd='error' - if (packet.cmd) { - let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response - packet = {error:'no consumer processing function supplied in for command in returned packet',packet:packet} - this._c.error(packet) - } else { - packet = {error:'[consumer] no command in returned packet',packet:packet} - return await this._c.error(packet) - } + if (!packet.cmd) packet.cmd ='reply' + let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response + packet = {error:`no consumer return processing function supplied for ${packet.cmd}`, packet:packet} + this._c.error(packet) } } @@ -84,9 +80,11 @@ const defaultCmds ={ c:{ error: function (packet) { // default log.error({error:packet.error, packet:packet, msg:'==========Consumer Sent Packet returned with ERROR ========='}) + return packet }, reply: function(packet) { - if (process.env.UCI_ENV==='dev') log.info({packet:packet, msg:'====Packet returned from socket - default reply logger==='}) + if (process.env.UCI_ENV==='dev') log.debug({packet:packet, msg:'====Packet returned from socket - default reply logger==='}) + return packet } } }