changed default commands handling, removed _default namespace
changed s and c name spaces to use ._c, and _s as default name spaces
amend commands work with above ._c and ._s props
changed .socket to ._socket to avoid namespace issues with instances using .socket.  Instances should use the getSocket method!
copied host and port options to connect. for mqtt socket to make setting those more standard
cleaned up logging
Add hooking for before send, and before and after process
Improved/cleaned preempting default processing
This commit is contained in:
David Kebler 2019-04-26 11:05:10 -07:00
parent 6da51d453f
commit ea1342272e
4 changed files with 198 additions and 166 deletions

View file

@ -2,6 +2,8 @@ import Base from '../src/base'
let dy = new Base({id:'dynamic', useRootNS: true })
const HOST = 'nas.kebler.net'
let sensor = true
//dummy simulated push
@ -54,8 +56,9 @@ dy.switch = {
(async () => {
await dy.init()
await dy.addSocket('mqs','s','m')
dy.socket.mqs.subscribe(['switch/on','switch/off','switch/toggle','sensor/test'])
await dy.addSocket('mqs','s','m',{host:HOST})
dy.socket.mqs.subscribe(['lights/#'])
console.log('ready')
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View file

@ -1,6 +1,6 @@
{
"name": "@uci/base",
"version": "0.1.19",
"version": "0.1.20",
"description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes",
"main": "src/base",
"scripts": {
@ -8,7 +8,7 @@
"fio": "nodemon -r esm examples/four-in-one",
"dy": "node -r esm examples/dynamic",
"web": "UCI_DEV=true nodemon -r esm examples/web",
"mqtt": "node -r esm examples/mqtt",
"mqtt": "nodemon -r esm examples/mqtt",
"testw": "mocha -r esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
"test": "mocha -r esm test/*.test.mjs",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true"
@ -37,11 +37,11 @@
},
"dependencies": {
"@uci-utils/bind-funcs": "^0.2.3",
"@uci-utils/logger": "0.0.13",
"@uci/mqtt": "^0.1.9",
"@uci/socket": "^0.2.10",
"@uci/websocket": "^0.3.5",
"@uci-utils/logger": "^0.0.14",
"@uci/mqtt": "^0.1.11",
"@uci/socket": "^0.2.12",
"@uci/websocket": "^0.3.6",
"await-to-js": "^2.1.1",
"p-settle": "^3.0.0"
"p-settle": "^3.1.0"
}
}

View file

@ -20,7 +20,7 @@ import EventEmitter from 'events'
import pSettle from 'p-settle'
// Internal dependencies
import { processor, commands, namespaces } from './processing'
import { processor, defaultCmds, namespaces } from './processing'
// Useful Constants
const SOCKET_INFO_KEYS = ['name', 'type', 'transport']
@ -63,20 +63,23 @@ class Base extends EventEmitter {
this.id = opts.id || opts.name || 'uci-base:' + new Date().getTime()
log = logger({ name: 'base', id: this.id })
this.desc = opts.desc // additional details for humans
this.socket = {} // holds all the various communication sockets
this._socket = {} // holds all the various communication sockets
this._started = false // flag to know when instance has been initialized
this._processors = { _default: processor }
this._defaultCmds = commands
// _c and _s are the default namespaces
this._namespaces = namespaces
this._c = defaultCmds.c
this._s = defaultCmds.s
if (opts.useRootNS) {
// add root of instance to checking for command functions - not recommended!
this._namespaces.s.splice(-1, 0, null)
this._namespaces.c.splice(-1, 0, null)
}
// method that will bind a whole object tree of functions
this.bindFuncs = bindFuncs
// predefined sockets:
// comma delimited list of this form '<name>#<c/p/s>><n=np/t=tcp/m=mqtt/w=web>'
this.socket = {}
this._socket = {}
if (opts.sockets) {
opts.sockets.split(/[,|\s]+/).forEach(socketStr => {
let socket = {}
@ -111,15 +114,12 @@ class Base extends EventEmitter {
async init() {
let sockets = []
let initSockets = []
for (let name of Object.keys(this.socket)) {
for (let name of Object.keys(this._socket)) {
initSockets.push(this._initSocket(name))
sockets.push(name)
}
return pSettle(initSockets).then(res => {
log.info(
{ sockets: res },
'response from intializing sockets via instance options'
)
log.debug({ sockets: res, method:'init', line:122, msg:'response from intializing sockets via instance options'})
let err = []
res.forEach((p, index) => {
if (p.isRejected) {
@ -144,36 +144,32 @@ class Base extends EventEmitter {
* @returns {string} Description
*/
async addSocket(name, type = 'c', transport = 'n', options = {}) {
log.info(
{ socketName: name, type: type, tranport: transport, options: options },
`adding socket ${name}`
)
log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:147, msg:`adding socket ${name}`})
options.id = this.id + ':' + name
switch (transport) {
case 'n':
options.path = options.path || true
// falls through
case 't':
this.socket[name] = new Socket[TRANSLATE[type]](options)
this._socket[name] = new Socket[TRANSLATE[type]](options)
break
case 'm':
if (type === 'p') type = 'c'
options.connect = options.connect || {}
if (options.host) options.connect.host = options.host
if (options.port) options.connect.port = options.port
options.connect.connectTimeout = options.connect.connectTimeout || 5000
this.socket[name] = new MQTT(options)
this._socket[name] = new MQTT(options)
break
case 'w':
if (type === 's') this.socket[name] = new WebSocket(options)
if (type === 's') this._socket[name] = new WebSocket(options)
else
log.warn(
{ name: name, type: type, transport: transport },
'Web socket not created Consumer/Client Web Socket not supported'
)
log.warn({ name: name, type: type, transport: transport, method:'addSocket', line:167, msg:'Web socket not created Consumer/Client Web Socket not supported'})
}
this.socket[name].name = name
this.socket[name].type = type
this.socket[name].transport = transport
this.socket[name]._packetProcess = this._packetProcess.bind(this, name)
this._socket[name].name = name
this._socket[name].type = type
this._socket[name].transport = transport
this._socket[name]._packetProcess = this._packetProcess.bind(this, name)
if (this._started) return await this._initSocket(name)
else return `socket ${name} added`
}
@ -186,13 +182,19 @@ class Base extends EventEmitter {
*/
async removeSocket(name) {
// NOTE: uci consumers have .end renamed as .close to match socket method for convenience
let [err] = await to(this.socket[name].close())
let errmsg = {socket:this.socket[name].name, error:err, msg:'socket/consumer closed with errors but still removed'}
let [err] = await to(this._socket[name].close())
let errmsg = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors but still removed'}
if (err) log.warn(errmsg)
delete this.socket[name]
delete this._socket[name]
return err ? errmsg : 'success'
}
getSocket(name) {
if (name) return this._socket[name]
else return this._socket
}
/**
* send - Description
*
@ -205,43 +207,53 @@ class Base extends EventEmitter {
if (typeof name !== 'string') {
packet = name
let sends = []
for (let name of Object.keys(this.socket)) {
if (this.socket[name].type === 'c') {
sends.push(this.socket[name].send.bind(this.socket[name]))
for (let name of Object.keys(this._socket)) {
if (this._socket[name].type === 'c') {
let hookedPacket = {}
hookedPacket = this._socket[name].beforeSend ? await this._socket[name].beforeSend.call(this,Object.assign({},packet)) : packet
log.debug({msg:'after possible hook packet to send', name:name, packet:hookedPacket, method:'send', line:217})
sends.push(this._socket[name].send.bind(this._socket[name],hookedPacket))
}
}
if (sends.length === 1) return await sends[0](packet)
if (sends.length === 1) return await sends[0]()
return Promise.all(
sends.map(send => {
return send(packet)
return send()
})
)
} else {
if (this.socket[name]) return await this.socket[name].send(packet)
if (this._socket[name]) {
if (this._socket[name].beforeSend) packet = await this._socket[name].beforeSend.call(this,packet)
log.debug({msg:'single socket hooked packet to send', name:name, packet:packet, method:'send', line:230})
return await this._socket[name].send(packet)
}
else return { error: `no consumer socket of name ${name}` }
}
}
async push(packet) {
let broadcast = []
for (let name of Object.keys(this.socket)) {
if (this.socket[name].type === 's') {
broadcast.push(this.socket[name].push.bind(this.socket[name]))
for (let name of Object.keys(this._socket)) {
if (this._socket[name].type === 's') {
let hookedPacket = {}
hookedPacket = this._socket[name].beforeSend ? await this._socket[name].beforeSend.call(this,Object.assign({},packet),true) : packet
log.debug({msg:'hooked packet to push', name:name, packet:hookedPacket, method:'push', line:243})
broadcast.push(this._socket[name].push.bind(this._socket[name],hookedPacket))
}
}
return Promise.all(
broadcast.map(push => {
return push(packet)
return push()
})
)
}
async sendTransport(packet, transport) {
let sends = []
for (let name of Object.keys(this.socket)) {
if (this.socket[name].type === 'c') {
if (this.socket[name].transport === transport) {
sends.push(this.socket[name].send.bind(this.socket[name]))
for (let name of Object.keys(this._socket)) {
if (this._socket[name].type === 'c') {
if (this._socket[name].transport === transport) {
sends.push(this._socket[name].send.bind(this._socket[name]))
}
}
}
@ -253,7 +265,7 @@ class Base extends EventEmitter {
)
}
// async sendMQTT(topic, packet) {return this.socket.mqtt.send(topic, packet)}
// async sendMQTT(topic, packet) {return this._socket.mqtt.send(topic, packet)}
async sendTCP(packet) {
return this.sendTransport(packet, 't')
}
@ -261,9 +273,14 @@ class Base extends EventEmitter {
return this.sendTransport(packet, 'n')
}
getSocket(name) {
return this.socket[name]
socketsListen(event,fn) {
this._eventListen('s',event,fn)
}
consumersListen(event,fn) {
this._eventListen('c',event,fn)
}
getPacketByName(name, packets) {
if (!packets.length) packets = [packets]
@ -277,77 +294,84 @@ class Base extends EventEmitter {
return found
}
socketsListen(event,fn) {
this._eventListen('s',event,fn)
}
consumersListen(event,fn) {
this._eventListen('c',event,fn)
}
// TODO confirm Object.assign will be ok as it is not a deep copy
amendConsumerProcessing(funcs, trans) {
if (trans) {
if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] = {}
Object.assign(this._defaultCmds.c[trans], funcs)
}
Object.assign(this._defaultCmds.c, funcs)
}
amendSocketProcessing(funcs, trans) {
if (trans) {
if (!this._defaultCmds.s[trans]) this._defaultCmds.s[trans] = {}
Object.assign(this._defaultCmds.s[trans], funcs)
}
Object.assign(this._defaultCmds.s, funcs)
}
// use s: and c: keys TODO need to change this
addNamedProcessing(name, funcs, type) {
if (type) {
if (!this._cmds[name][type]) this._cmds[name][type] = {}
Object.assign(this._cmds[name][type], funcs)
} else {
if (!this._cmds[name]) this._cmds[name] = {}
Object.assign(this._cmds[name], funcs)
}
}
// func should take and return a packet
// beforeSendHook (func,type,transport){} // TODO
// afterReceiveHook(func,type,transport){} // TODO
// afterProcessHook(func,type,transport){} // TODO
// here you can add namespaced functions for packet commands
//======================================================================
// TODO next several need to be redone now that namespace commands work
consumersProcessor(func) {
for (let name of Object.keys(this.socket)) {
if (this.socket[name].type === 'c') {
this.socketNameProcessor(func, name)
}
}
}
socketsProcessor(func) {
for (let name of Object.keys(this.socket)) {
if (this.socket[name].type === 's') {
this.socketNamedProcessor(func, name)
}
}
}
socketNameProcessor(func, socket_name) {
socket_name = socket_name || '_default'
this._processors[socket_name]._process = func
}
// add set of functions to class prop/space and then register with this
addNamespace(space, type, trans) {
if (trans) return this._namespaces[type + trans].unshift(space)
else return this._namespaces[type].unshift(space)
}
// TODO confirm Object.assign will be ok as it is not a deep copy
// one off add a command function or two to basic namespaces which is called before default
amendConsumerCommands(funcs, trans) {
if (trans) {
if (!this._c[trans]) this._c[trans] = {}
Object.assign(this._c[trans], funcs)
}
Object.assign(this._c, funcs)
}
amendSocketCommands(funcs, trans) {
if (trans) {
if (!this._s[trans]) this._s[trans] = {}
Object.assign(this._s[trans], funcs)
}
Object.assign(this._s, funcs)
}
// func should take and return a packet. if type
beforeSendHook(func,opts) {
this._packetHook('beforeSend', func,opts)
}
beforeProcessHook(func,opts) {
this._packetHook('beforeProcess', func,opts)
}
afterProcessHook(func,opts) {
this._packetHook('afterProcess', func,opts)
}
_packetHook(hook,func,opts) {
log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts})
let {name,type,trans,all} = opts
if (name) this._socket[name][hook] = func
else {
log.debug({msg:'sockets available to hook', method:'_packetHook', line:338, sockets: Object.keys(this._socket)})
for (let name of Object.keys(this._socket)) {
if (this._socket[name].type === type) this._socket[name][hook] = func
if (this._socket[name].transport === trans) this._socket[name][hook] = func
if (all) this._socket[name][hook] = func
if (this._socket[name][hook]) log.debug({msg:'hooked socket', method:'_packetHook', line:343, name:name, type:this._socket[name].type, trans:this._socket[name].transport})
}
}
}
// A Big Hammer - use only if necessary - default with hooks should suffice
// these three will pre-empt default processor to be called in ._packetProcess
// add and override default processor for ALL consumers, i.e. packets returning form a send or arrived from a push
consumersProcessor(func) {
for (let name of Object.keys(this._socket)) {
if (this._socket[name].type === 'c') {
this.altProcessor(func, name)
}
}
}
// add and override default processor for ALL sockets, i.e. packets coming in from consumer send
socketsProcessor(func) {
for (let name of Object.keys(this._socket)) {
if (this._socket[name].type === 's') {
this.altProcessor(func, name)
}
}
}
// add and override a processor for a particular socket/consumer to list of processors
// if no socket name given it will replace the default processor in _processors from processing.js
altProcessor(func, socket_name) {
socket_name = socket_name || '_default'
this._processors[socket_name] = func
}
//======================================================
/*
@ -356,38 +380,58 @@ class Base extends EventEmitter {
*
*/
/*
**********default packet processor for all sockets
* this can be hooked or replaced all together
*/
async _packetProcess(socket_name, packet) {
log.debug({ socket:socket_name, packet:packet, method:'_packetProcess', line:393, msg:'processing incoming packet'})
// the processor can be set via the incoming packet
// otherwise if one is set on the socket or the default found in processing.js
// TODO?? Try all each available packet processors in some order if fails try next one before trying the default
if (this._socket[socket_name].beforeProcess) packet = await this._socket[socket_name].beforeProcess.call(this,packet)
if (packet.error) return packet // hook invalidated the packet abort further processing
let processor = packet._processor || this._processors[socket_name] ? socket_name : '_default'
let res = await this._processors[processor].call(this,packet,socket_name)
if (this._socket[socket_name].afterProcess) res = await this._socket[socket_name].afterProcess.call(this,res)
log.debug({ socket:socket_name, response:res, msg:'processed packet ready for return'})
return res
}
async _initSocket(name) {
let socket = this.socket[name]
let socket = this._socket[name]
let init = {}
if (this.socket[name].type === 's' && this.socket[name].transport !== 'm') {
if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') {
init = socket.create
} else {
init = socket.connect
}
log.info(`initializing socket ${name}, ${socket.type}, ${socket.transport}`)
log.debug(`initializing socket ${name}, ${socket.type}, ${socket.transport}`)
if (this._started)
return `socket ${name} added and initialzed, ${await init()}`
else return init()
}
// all sockets are emitters. Adds a listener to all sockets of a type with given event.
// now sockets can emit locally processed events
_eventListen(type,event,fn) {
for (let name of Object.keys(this.socket)) {
if (this.socket[name].type === type) {
if (fn==='stop') this.socket[name].removeAllListeners(event)
for (let name of Object.keys(this._socket)) {
if (this._socket[name].type === type) {
if (fn==='stop') this._socket[name].removeAllListeners(event)
else {
console.log('adding listener',name,type,event,fn )
console.log(this.socket[name].name)
this.socket[name].on(event, fn)
log.debug({socket:name, type:type, event:event, msg:'adding listener to socket'})
this._socket[name].on(event, fn)
}
}
}
}
_transport(name) {
return this.socket[name].transport
return this._socket[name].transport
} //getter for socket transport
_type(name) {
return this.socket[name].type
return this._socket[name].type
} //getter for socket type
_getTransportNamespaces(socket) {
@ -406,19 +450,18 @@ class Base extends EventEmitter {
// takes command and returns corresponding function in a hash
_getCmdFunc(cmd, obj) {
// console.log('obj',obj)
if (typeof cmd === 'string') {
if (!obj) obj = this
cmd = cmd.split(/[.:/]+/)
// console.log('===================',cmd)
}
var prop = cmd.shift()
if (cmd.length === 0) return obj[prop]
if (!obj[prop]) return null
// console.log(cmd.length,cmd,prop, obj[prop])
log.debug({length:cmd.length,cmd:cmd, prop:prop, objprop:obj[prop], method:'_getCmdFunc', msg:'command to corresponding function in a hash'})
return this._getCmdFunc(cmd, obj[prop])
}
// primary function to find a function to call based on packet cmd
async _callCmdFunc(packet, socket) {
let cmd_func = this._getCmdFuncNamespace(
packet.cmd,
@ -433,22 +476,7 @@ class Base extends EventEmitter {
return 'failed'
}
/*
**********default packet processor for all sockets
*/
async _packetProcess(socket_name, packet) {
// TODO Try all added packet processors then defualt before sending back
// console.log(socket_name,packet)
let processor =
packet._processor || this._processors[socket_name] || '_default'
return await this._processors[processor].bind(this)(
packet,
socket_name,
this._processors[processor]
)
}
} // end Base Class
export default Base

View file

@ -7,44 +7,47 @@ let log = logger({ package: 'base',file:'processing.js'})
// processing errors that are caught should be sent back to consumer in packets with :error property
// but they might also throw local errors/execptions so they should bubble up here and get caught and logged
// messaging errors on socket will not be fatal to the entire socket server
// common processor, will call based on type s or c the ones below
const processor = async function (packet,socket) {
let [err,res] = await to(_process[this.getSocket(socket).type].bind(this)(packet,socket))
if (err) {
let error = {error:err, socket:socket, packet:packet, msg:'some possibly unhandled badness happened during packet processing'}
log.warn(error)
let error = {error:err, socket:socket, packet:packet, function:'processor', line: 15, msg:'some possibly unhandled badness happened during packet processing'}
log.error(error)
if (process.env.UCI_SHOW_UNHANDLED==='true') console.log(error)
}
else return res
}
export { processor, commands, namespaces }
export { processor, defaultCmds, namespaces }
// default processors for socket/server and consumer/client
const _process = {
s: async function (packet,socket) {
// console.log('in default socket processor',packet.cmd)
if (!packet.cmd) return {error: '[socket] no command in packet', packet: packet }
if (!packet.cmd) return {error: 'no command (cmd:) in packet for socket', packet: packet }
// this call will search the namespace and envoke a function and return a repsonse packet
let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response
return {error: 'no socket processing function supplied for command', packet: packet }
},
c: async function (packet,socket) {
// console.log('in default consumer processor',packet.cmd)
if (packet.error) return await this._defaultCmds.c.error(packet)
// the the end of life for a consumer packet that has been sent and returned or a packet that was pushed.
if (packet.error) return await this._c.error(packet)
if (packet.cmd) {
let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response
packet = {error:'no consumer processing function supplied for command',packet:packet}
this._defaultCmds.c.error(packet)
packet = {error:'no consumer reply processing function supplied for command',packet:packet}
this._c.error(packet)
} else {
packet = {error:'[consumer] no command in packet',packet:packet}
return await this._defaultCmds.c.error(packet)
packet = {error:'[consumer] no command in reply packet',packet:packet}
return await this._c.error(packet)
}
}
}
const namespaces = {
s: ['s','_defaultCmds.s'],
c: ['c','_defaultCmds.c'],
s: ['_s'],
c: ['_c'],
cn: ['cn'],
ct: ['ct'],
sn: ['sn'],
@ -60,7 +63,7 @@ const namespaces = {
*
*/
const commands ={
const defaultCmds ={
s:{
echo: async packet => {
packet.processed = true
@ -75,12 +78,10 @@ const commands ={
},
c:{
error: function (packet) {
// TODO log and make this show only on env debug
log.warn({error:packet.error, packet:packet, msg:'==========Packet ERROR [consumer]========='})
if (process.env.UCI_ENV==='dev') log.error({error:packet.error, packet:packet, msg:'==========Consumer Sent Packet returned with ERROR ========='})
},
reply: function(packet) {
// TODO log and make this show only on env debug
log.debug({packet:packet, msg:'====Packet returned from socket - debug reply==='})
if (process.env.UCI_ENV==='dev') log.debug({packet:packet, msg:'====Packet returned from socket - debug default reply logger==='})
}
}
}