base:
  add ready observer class instance
  adds a ready observer for each consumer (only) registered
  refactored default namespace processing a bit
  refactored _packetProcess  the default/main processor
     now can register before,command, and after processors
     uses a single _process function to remove near reptition
     does a better job of maintaining/merging the packet from step to step
processing:
  change processor to cmdProcessor
  refactor default consumer processing
This commit is contained in:
David Kebler 2020-01-06 23:39:29 -08:00
parent 741823cce4
commit a47144606a
3 changed files with 83 additions and 86 deletions

View file

@ -1,6 +1,6 @@
{
"name": "@uci/base",
"version": "0.1.32",
"version": "0.1.37",
"description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes",
"main": "src/base",
"scripts": {
@ -39,11 +39,12 @@
"dependencies": {
"@uci-utils/bind-funcs": "^0.2.4",
"@uci-utils/logger": "^0.0.16",
"@uci-utils/ready": "^0.1.3",
"@uci/mqtt": "^0.1.13",
"@uci/socket": "^0.2.26",
"@uci/websocket": "^0.3.10",
"await-to-js": "^2.1.1",
"p-reflect": "^2.1.0",
"p-settle": "^3.1.0"
"is-plain-object": "^3.0.0",
"merge-anything": "^2.4.4"
}
}

View file

@ -1,27 +1,28 @@
// Direct External Dependencies
// none
// TODO add automated duplex (for consumer will create corresponding socket and send connection info to remote socket)
// UCI dependencies
// UCI communication transport communication modules
// TODO change to dynamic import so loads only if that socket type is requestd
// --------------- UCI dependencies -------------------
// UCI communication transport communication modules
// TODO change to dynamic import so loads only if that socket type is requestd
import Socket from '@uci/socket' // tcp or named pipe
import MQTT from '@uci/mqtt' // requires broker
import WebSocket from '@uci/websocket' // server only - client is for web browser only
// UCI helpers
// UCI helpers
import { Ready, changed, map } from '@uci-utils/ready'
import { bindFuncs } from '@uci-utils/bind-funcs'
// UCI logger
// UCI logger
import logger from '@uci-utils/logger'
let log = {} // declare module wide log to be set during construction
// -------------------------------------------------
// Community dependencies
import to from 'await-to-js'
import isPlainObject from 'is-plain-object'
import merge from 'merge-anything'
// Nodejs dependencies
import EventEmitter from 'events'
// import pSettle from 'p-settle'
// import pReflect from 'p-reflect'
// Internal dependencies
import { processor, defaultCmds, namespaces } from './processing'
import { cmdProcessor, defaultCmds, namespaces } from './processing'
// Constants
const SOCKET_INFO_KEYS = ['name', 'type', 'transport']
@ -69,11 +70,11 @@ class Base extends EventEmitter {
this.initTimeout = opts.initTimeout
this.retryWait = opts.retryWait
this.defaultReturnCmd = opts.defaultReturnCmd
this._processors = { _default: processor }
this._cmdProcessors = { _default: cmdProcessor }
// _c and _s are the default namespaces
this._namespaces = namespaces
this._c = defaultCmds.c
this._s = defaultCmds.s
this._namespaces =Object.assign({},namespaces)
this._c = Object.assign({},defaultCmds.c)
this._s = Object.assign({},defaultCmds.s)
if (opts.useRootNS) {
// add root of instance to checking for command functions - not recommended!
this._namespaces.s.splice(-1, 0, null)
@ -90,7 +91,7 @@ class Base extends EventEmitter {
socketStr.split(/[>#]+/).map(function(prop, index) {
socket[SOCKET_INFO_KEYS[index]] = prop
})
this.addSocket(
this.registerSocket(
socket.name,
socket.type,
socket.transport,
@ -98,6 +99,9 @@ class Base extends EventEmitter {
)
})
}
this.ready = new Ready({emitter: this, verbose:process.env.UCI_READY_VERBOSE})
} // end constructor
/*
@ -117,16 +121,6 @@ class Base extends EventEmitter {
*
*/
get ready() {
// TODO checks that all sockets are active
let ready = true
for (let name in this._socket) {
// console.log(name, this._socket[name].active)
ready = ready && this._socket[name].active
}
return ready
}
async init(sockets) {
// Object.getPrototypeOf(Object.getPrototypeOf(this).init.call(this,sockets))
return this.socketsInit(sockets)
@ -142,13 +136,11 @@ class Base extends EventEmitter {
const initialize = async socket => {
return new Promise(async function(resolve) {
try {
// console.log('initialize socket',socket)
const value = await socket.init()
this.emit('status',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value})
results[socket.name] = value
resolve(value)
} catch (error) {
// console.log('catching error', error)
this.emit('status',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket
// let res = await this.removeSocket(socket.name)
errors[socket.name]={error:error}
@ -160,19 +152,15 @@ class Base extends EventEmitter {
let inits = []
if (!sockets) { sockets =
Object.keys(this._socket).filter(name => {
// console.log(this._socket[name].active)
return !this._socket[name].active // only intialize (connect) inactive sockets
})
// console.log('inactive sockets', sockets)
}
if (typeof sockets ==='string') sockets = [sockets]
// console.log('sockets to initialize',sockets)
sockets.forEach(name => {
if (this._socket[name]) {
inits.push({name:name, init:this.getSocketInit(name)})
} else log.warn({msg:`no socket registered by name of ${name} to initialize`})
})
// console.log('starting promise',results,errors)
let [err] = await to(Promise.all(inits.map(initialize)))
if (err) {
this.emit('status',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err})
@ -231,7 +219,7 @@ class Base extends EventEmitter {
this._socket[name].transport = transport
this._socket[name]._packetProcess = this._packetProcess.bind(this, name)
// bubble up events from sockets to instance
// bubble up events from inidivual sockets to base instance
const EVENTS=['log','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance
EVENTS.forEach(event => {
this._socket[name].on(event, obj => {
@ -246,15 +234,15 @@ class Base extends EventEmitter {
})
if (type==='c') {
this.ready.addObserver(name,this.getSocket(name),{event:'connection:socket',condition:ev=>{return ev.state==='connected'}})
this._socket[name].on('pushed', packet => {
packet._header.socketName=name
this.emit('pushed', packet)
})
}
// if (this._started)
return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket
// else return `socket ${name} added and ready to initialize `
}
}
@ -287,7 +275,6 @@ class Base extends EventEmitter {
if (trans) trans = this._validateTransport(trans)
let filtered = []
Object.keys(this._socket).forEach(name => {
// console.log(name, type,this._socket[name].type, trans, this._socket[name].transport)
if ((type==null || this._socket[name].type === type)
&& (trans==null || this._socket[name].transport === trans)
&& (active==null || this._socket[name].active===active)) filtered.push(name)
@ -359,15 +346,11 @@ class Base extends EventEmitter {
else {
let trans = null
if (typeof sockets === 'string') trans = sockets
// console.log('push transport', trans)
sockets = Object.values(this._socket).filter(socket=>socket.type === 's')
// console.log('all server sockets',sockets)
if (trans && trans !=='all') { sockets = sockets.filter(socket=>socket.transport === this._validateTransport(trans))
// console.log('transport filtered server sockets',sockets)
}
}
let broadcast = []
// console.log('===before push', sockets)
for (let socket of sockets) {
let hookedPacket = {}
hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet
@ -444,8 +427,7 @@ class Base extends EventEmitter {
else return this._namespaces[type].unshift(space)
}
// TODO confirm Object.assign will be ok as it is not a deep copy
// allow a single or arrary of single functions
// object of functions, key is cmd name
amendCommands(funcs, trans, type) {
if (!trans && !type) type = 's'
if (trans ==='c' || trans ==='s') {
@ -455,7 +437,7 @@ class Base extends EventEmitter {
trans = this._validateTransport(trans)
if (!this['_'+type+trans]) this['_'+type+trans] = {}
Object.assign(this['_'+type+trans], funcs) // trans is type here
log.debug({msg:'amended namespace', default_key:'_'+type+trans, functions:this['_'+type+trans]})
log.debug({msg:'amended namespace', id:this.id, default_key:'_'+type+trans, functions:this['_'+type+trans]})
}
amendConsumerCommands(funcs, trans) {
@ -503,20 +485,20 @@ class Base extends EventEmitter {
// if no socket name given it will replace the default processor in _processors from processing.js
altProcessor(func, socket_name) {
socket_name = socket_name || '_default'
this._processors[socket_name] = func
this._cmdProcessors[socket_name] = func
}
//======================================================
//=============PRIVATE METHODS =========================================
/*
*
* Private Methods
* Assigns a Hook Function to a Socket, Type or Transport
*
*/
// options allow applying hook function to specific socket or type or transport, default is all type 's' sockets
_packetHook(hook,func,opts) {
log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts})
let {name,type,trans,all} = opts
if (opts==null) type = 's' // default is all type 's' sockets
if (name) this._socket[name][hook] = func
else {
log.debug({msg:'sockets available to hook', method:'_packetHook', line:338, sockets: Object.keys(this._socket)})
@ -531,32 +513,47 @@ class Base extends EventEmitter {
/*
**********default packet processor for all sockets
* this can be hooked or replaced all together
********** main packet processor for all sockets
* supports per socket before and after hook processors
* supports additonal registered processors called via packet or socket name, with default processor,
*/
async _packetProcess(socket_name, packet) {
log.debug({ socket:socket_name, packet:packet, method:'_packetProcess', line:393, msg:'processing incoming packet'})
let header = packet._header ? packet._header : {} // retain header
let err, res
if (this._socket[socket_name].beforeProcess) {
[err,res] = await to(this._socket[socket_name].beforeProcess.call(this,packet))
if (err) { // hook has forced an abort to processing
packet.error = err
return packet
}
packet = res
if (packet.error) return packet // don't process a packet with an error
// TODO allow adding to or altering the process map
let processors = new Map([
['before', this.getSocket(socket_name).beforeProcess ],
['command', this._cmdProcessors[packet.cmdProcessor || this._cmdProcessors[socket_name] ? socket_name : '_default'] ],
['after', this.getSocket(socket_name).afterProcess ],
])
let err
for (let [name,func] of processors) { // the same as of recipeMap.entries()
[err,packet] = await to(this._process(socket_name,packet,name,func))
if (err) packet.error = err
}
// the processor can be set via the incoming packet
// otherwise if one is set on the socket or the default found in processing.js
// TODO Try each "available" packet processor in some order if fails try next one before trying the default
let processor = packet._processor || this._processors[socket_name] ? socket_name : '_default'
res = (await this._processors[processor].call(this,packet,socket_name))|| packet // processor didn't return a packet then return the packet sent
log.debug({ socket:socket_name, response:res, msg:'processed packet ready for hook'})
if (this._socket[socket_name].afterProcess) res = await this._socket[socket_name].afterProcess.call(this,res)
log.debug({ socket:socket_name, response:res, msg:'packet after hook complete ready for return'})
res._header = Object.assign(header,res._header) // re-apply header in case hooks or processor mangled or removed it
return res
return packet
}
async _process(socket_name,packet,name,func) {
if (packet.error) return packet // if an error occurs skip any further processing
let err, res
if (func) {
[err,res] = await to(func.call(this,packet,socket_name))
if (err) { // forced an abort to processing
packet.error = err
} else {
if (!isPlainObject(res)) packet.processResult ? packet.processResult[name]=res : packet.processResult = {[name]:res}
else {
let method = (packet.processMethod || {})[name] || packet.processMethod
if (method === 'merge') {
packet = merge(packet,res)
}
else Object.assign(packet,res)
}
}
}
this.emit('log', {level:'trace', msg:`processed packet stage:${name}`,socketName:socket_name,packet:packet})
return packet
}
// all sockets are emitters. Adds a listener to all sockets of a type with given event.
@ -649,3 +646,4 @@ class Base extends EventEmitter {
} // end Base Class
export default Base
export { Base, map, changed, isPlainObject, to, merge } // todo share rxjs

View file

@ -9,7 +9,7 @@ let log = logger({ package: 'base',file:'processing.js'})
// messaging errors on socket will not be fatal to the entire socket server
// common processor, will call based on type s or c the ones below
const processor = async function (packet,socket) {
const cmdProcessor = async function (packet,socket) {
let [err,res] = await to(_process[this.getSocket(socket).type].bind(this)(packet,socket))
if (err) {
let error = {cmd:'error', error:err, packet:packet, socket:socket, function:'processor', line: 15, msg:`'unhandled error in packet command function ${packet.cmd}`}
@ -21,7 +21,7 @@ const processor = async function (packet,socket) {
return res
}
export { processor, defaultCmds, namespaces }
export { cmdProcessor, defaultCmds, namespaces }
// default processors for socket/server and consumer/client
const _process = {
@ -35,14 +35,10 @@ const _process = {
c: async function (packet,socket) {
// the the end of life for a consumer packet that has been sent and returned or a packet that was pushed.
if (packet.error) packet.cmd='error'
if (packet.cmd) {
let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response
packet = {error:'no consumer processing function supplied in for command in returned packet',packet:packet}
this._c.error(packet)
} else {
packet = {error:'[consumer] no command in returned packet',packet:packet}
return await this._c.error(packet)
}
if (!packet.cmd) packet.cmd ='reply'
let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response
packet = {error:`no consumer return processing function supplied for ${packet.cmd}`, packet:packet}
this._c.error(packet)
}
}
@ -84,9 +80,11 @@ const defaultCmds ={
c:{
error: function (packet) { // default
log.error({error:packet.error, packet:packet, msg:'==========Consumer Sent Packet returned with ERROR ========='})
return packet
},
reply: function(packet) {
if (process.env.UCI_ENV==='dev') log.info({packet:packet, msg:'====Packet returned from socket - default reply logger==='})
if (process.env.UCI_ENV==='dev') log.debug({packet:packet, msg:'====Packet returned from socket - default reply logger==='})
return packet
}
}
}