From ea1342272e84414da74d83069dfc66c3eb9a99b9 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Fri, 26 Apr 2019 11:05:10 -0700 Subject: [PATCH] 0.1.20 changed default commands handling, removed _default namespace changed s and c name spaces to use ._c, and _s as default name spaces amend commands work with above ._c and ._s props changed .socket to ._socket to avoid namespace issues with instances using .socket. Instances should use the getSocket method! copied host and port options to connect. for mqtt socket to make setting those more standard cleaned up logging Add hooking for before send, and before and after process Improved/cleaned preempting default processing --- examples/mqtt.js | 7 +- package.json | 14 +-- src/base.js | 306 +++++++++++++++++++++++++--------------------- src/processing.js | 37 +++--- 4 files changed, 198 insertions(+), 166 deletions(-) diff --git a/examples/mqtt.js b/examples/mqtt.js index e4def71..0d64923 100644 --- a/examples/mqtt.js +++ b/examples/mqtt.js @@ -2,6 +2,8 @@ import Base from '../src/base' let dy = new Base({id:'dynamic', useRootNS: true }) +const HOST = 'nas.kebler.net' + let sensor = true //dummy simulated push @@ -54,8 +56,9 @@ dy.switch = { (async () => { await dy.init() - await dy.addSocket('mqs','s','m') - dy.socket.mqs.subscribe(['switch/on','switch/off','switch/toggle','sensor/test']) + await dy.addSocket('mqs','s','m',{host:HOST}) + dy.socket.mqs.subscribe(['lights/#']) + console.log('ready') })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/package.json b/package.json index 950eb62..1354db3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/base", - "version": "0.1.19", + "version": "0.1.20", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { @@ -8,7 +8,7 @@ "fio": "nodemon -r esm examples/four-in-one", "dy": "node -r esm examples/dynamic", "web": "UCI_DEV=true nodemon -r esm examples/web", - "mqtt": "node -r esm examples/mqtt", + "mqtt": "nodemon -r esm examples/mqtt", "testw": "mocha -r esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r esm test/*.test.mjs", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true" @@ -37,11 +37,11 @@ }, "dependencies": { "@uci-utils/bind-funcs": "^0.2.3", - "@uci-utils/logger": "0.0.13", - "@uci/mqtt": "^0.1.9", - "@uci/socket": "^0.2.10", - "@uci/websocket": "^0.3.5", + "@uci-utils/logger": "^0.0.14", + "@uci/mqtt": "^0.1.11", + "@uci/socket": "^0.2.12", + "@uci/websocket": "^0.3.6", "await-to-js": "^2.1.1", - "p-settle": "^3.0.0" + "p-settle": "^3.1.0" } } diff --git a/src/base.js b/src/base.js index 6b020d7..6792c02 100644 --- a/src/base.js +++ b/src/base.js @@ -20,7 +20,7 @@ import EventEmitter from 'events' import pSettle from 'p-settle' // Internal dependencies -import { processor, commands, namespaces } from './processing' +import { processor, defaultCmds, namespaces } from './processing' // Useful Constants const SOCKET_INFO_KEYS = ['name', 'type', 'transport'] @@ -63,20 +63,23 @@ class Base extends EventEmitter { this.id = opts.id || opts.name || 'uci-base:' + new Date().getTime() log = logger({ name: 'base', id: this.id }) this.desc = opts.desc // additional details for humans - this.socket = {} // holds all the various communication sockets + this._socket = {} // holds all the various communication sockets this._started = false // flag to know when instance has been initialized this._processors = { _default: processor } - this._defaultCmds = commands + // _c and _s are the default namespaces this._namespaces = namespaces + this._c = defaultCmds.c + this._s = defaultCmds.s if (opts.useRootNS) { // add root of instance to checking for command functions - not recommended! this._namespaces.s.splice(-1, 0, null) this._namespaces.c.splice(-1, 0, null) } + // method that will bind a whole object tree of functions this.bindFuncs = bindFuncs // predefined sockets: // comma delimited list of this form '#>' - this.socket = {} + this._socket = {} if (opts.sockets) { opts.sockets.split(/[,|\s]+/).forEach(socketStr => { let socket = {} @@ -111,15 +114,12 @@ class Base extends EventEmitter { async init() { let sockets = [] let initSockets = [] - for (let name of Object.keys(this.socket)) { + for (let name of Object.keys(this._socket)) { initSockets.push(this._initSocket(name)) sockets.push(name) } return pSettle(initSockets).then(res => { - log.info( - { sockets: res }, - 'response from intializing sockets via instance options' - ) + log.debug({ sockets: res, method:'init', line:122, msg:'response from intializing sockets via instance options'}) let err = [] res.forEach((p, index) => { if (p.isRejected) { @@ -144,36 +144,32 @@ class Base extends EventEmitter { * @returns {string} Description */ async addSocket(name, type = 'c', transport = 'n', options = {}) { - log.info( - { socketName: name, type: type, tranport: transport, options: options }, - `adding socket ${name}` - ) + log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:147, msg:`adding socket ${name}`}) options.id = this.id + ':' + name switch (transport) { case 'n': options.path = options.path || true // falls through case 't': - this.socket[name] = new Socket[TRANSLATE[type]](options) + this._socket[name] = new Socket[TRANSLATE[type]](options) break case 'm': if (type === 'p') type = 'c' options.connect = options.connect || {} + if (options.host) options.connect.host = options.host + if (options.port) options.connect.port = options.port options.connect.connectTimeout = options.connect.connectTimeout || 5000 - this.socket[name] = new MQTT(options) + this._socket[name] = new MQTT(options) break case 'w': - if (type === 's') this.socket[name] = new WebSocket(options) + if (type === 's') this._socket[name] = new WebSocket(options) else - log.warn( - { name: name, type: type, transport: transport }, - 'Web socket not created Consumer/Client Web Socket not supported' - ) + log.warn({ name: name, type: type, transport: transport, method:'addSocket', line:167, msg:'Web socket not created Consumer/Client Web Socket not supported'}) } - this.socket[name].name = name - this.socket[name].type = type - this.socket[name].transport = transport - this.socket[name]._packetProcess = this._packetProcess.bind(this, name) + this._socket[name].name = name + this._socket[name].type = type + this._socket[name].transport = transport + this._socket[name]._packetProcess = this._packetProcess.bind(this, name) if (this._started) return await this._initSocket(name) else return `socket ${name} added` } @@ -186,13 +182,19 @@ class Base extends EventEmitter { */ async removeSocket(name) { // NOTE: uci consumers have .end renamed as .close to match socket method for convenience - let [err] = await to(this.socket[name].close()) - let errmsg = {socket:this.socket[name].name, error:err, msg:'socket/consumer closed with errors but still removed'} + let [err] = await to(this._socket[name].close()) + let errmsg = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors but still removed'} if (err) log.warn(errmsg) - delete this.socket[name] + delete this._socket[name] return err ? errmsg : 'success' } + + getSocket(name) { + if (name) return this._socket[name] + else return this._socket + } + /** * send - Description * @@ -205,43 +207,53 @@ class Base extends EventEmitter { if (typeof name !== 'string') { packet = name let sends = [] - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === 'c') { - sends.push(this.socket[name].send.bind(this.socket[name])) + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === 'c') { + let hookedPacket = {} + hookedPacket = this._socket[name].beforeSend ? await this._socket[name].beforeSend.call(this,Object.assign({},packet)) : packet + log.debug({msg:'after possible hook packet to send', name:name, packet:hookedPacket, method:'send', line:217}) + sends.push(this._socket[name].send.bind(this._socket[name],hookedPacket)) } } - if (sends.length === 1) return await sends[0](packet) + if (sends.length === 1) return await sends[0]() return Promise.all( sends.map(send => { - return send(packet) + return send() }) ) } else { - if (this.socket[name]) return await this.socket[name].send(packet) + if (this._socket[name]) { + if (this._socket[name].beforeSend) packet = await this._socket[name].beforeSend.call(this,packet) + log.debug({msg:'single socket hooked packet to send', name:name, packet:packet, method:'send', line:230}) + return await this._socket[name].send(packet) + } else return { error: `no consumer socket of name ${name}` } } } async push(packet) { let broadcast = [] - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === 's') { - broadcast.push(this.socket[name].push.bind(this.socket[name])) + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === 's') { + let hookedPacket = {} + hookedPacket = this._socket[name].beforeSend ? await this._socket[name].beforeSend.call(this,Object.assign({},packet),true) : packet + log.debug({msg:'hooked packet to push', name:name, packet:hookedPacket, method:'push', line:243}) + broadcast.push(this._socket[name].push.bind(this._socket[name],hookedPacket)) } } return Promise.all( broadcast.map(push => { - return push(packet) + return push() }) ) } async sendTransport(packet, transport) { let sends = [] - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === 'c') { - if (this.socket[name].transport === transport) { - sends.push(this.socket[name].send.bind(this.socket[name])) + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === 'c') { + if (this._socket[name].transport === transport) { + sends.push(this._socket[name].send.bind(this._socket[name])) } } } @@ -253,7 +265,7 @@ class Base extends EventEmitter { ) } - // async sendMQTT(topic, packet) {return this.socket.mqtt.send(topic, packet)} + // async sendMQTT(topic, packet) {return this._socket.mqtt.send(topic, packet)} async sendTCP(packet) { return this.sendTransport(packet, 't') } @@ -261,9 +273,14 @@ class Base extends EventEmitter { return this.sendTransport(packet, 'n') } - getSocket(name) { - return this.socket[name] + + socketsListen(event,fn) { + this._eventListen('s',event,fn) } + consumersListen(event,fn) { + this._eventListen('c',event,fn) + } + getPacketByName(name, packets) { if (!packets.length) packets = [packets] @@ -277,77 +294,84 @@ class Base extends EventEmitter { return found } - socketsListen(event,fn) { - this._eventListen('s',event,fn) - } - consumersListen(event,fn) { - this._eventListen('c',event,fn) - } - - - // TODO confirm Object.assign will be ok as it is not a deep copy - amendConsumerProcessing(funcs, trans) { - if (trans) { - if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] = {} - Object.assign(this._defaultCmds.c[trans], funcs) - } - Object.assign(this._defaultCmds.c, funcs) - } - - amendSocketProcessing(funcs, trans) { - if (trans) { - if (!this._defaultCmds.s[trans]) this._defaultCmds.s[trans] = {} - Object.assign(this._defaultCmds.s[trans], funcs) - } - Object.assign(this._defaultCmds.s, funcs) - } - - // use s: and c: keys TODO need to change this - addNamedProcessing(name, funcs, type) { - if (type) { - if (!this._cmds[name][type]) this._cmds[name][type] = {} - Object.assign(this._cmds[name][type], funcs) - } else { - if (!this._cmds[name]) this._cmds[name] = {} - Object.assign(this._cmds[name], funcs) - } - } - - // func should take and return a packet - // beforeSendHook (func,type,transport){} // TODO - // afterReceiveHook(func,type,transport){} // TODO - // afterProcessHook(func,type,transport){} // TODO - - // here you can add namespaced functions for packet commands - - //====================================================================== - // TODO next several need to be redone now that namespace commands work - consumersProcessor(func) { - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === 'c') { - this.socketNameProcessor(func, name) - } - } - } - - socketsProcessor(func) { - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === 's') { - this.socketNamedProcessor(func, name) - } - } - } - - socketNameProcessor(func, socket_name) { - socket_name = socket_name || '_default' - this._processors[socket_name]._process = func - } - + // add set of functions to class prop/space and then register with this addNamespace(space, type, trans) { if (trans) return this._namespaces[type + trans].unshift(space) else return this._namespaces[type].unshift(space) } + // TODO confirm Object.assign will be ok as it is not a deep copy + // one off add a command function or two to basic namespaces which is called before default + amendConsumerCommands(funcs, trans) { + if (trans) { + if (!this._c[trans]) this._c[trans] = {} + Object.assign(this._c[trans], funcs) + } + Object.assign(this._c, funcs) + } + amendSocketCommands(funcs, trans) { + if (trans) { + if (!this._s[trans]) this._s[trans] = {} + Object.assign(this._s[trans], funcs) + } + Object.assign(this._s, funcs) + } + + // func should take and return a packet. if type + beforeSendHook(func,opts) { + this._packetHook('beforeSend', func,opts) + } + + beforeProcessHook(func,opts) { + this._packetHook('beforeProcess', func,opts) + } + + afterProcessHook(func,opts) { + this._packetHook('afterProcess', func,opts) + } + + _packetHook(hook,func,opts) { + log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts}) + let {name,type,trans,all} = opts + if (name) this._socket[name][hook] = func + else { + log.debug({msg:'sockets available to hook', method:'_packetHook', line:338, sockets: Object.keys(this._socket)}) + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === type) this._socket[name][hook] = func + if (this._socket[name].transport === trans) this._socket[name][hook] = func + if (all) this._socket[name][hook] = func + if (this._socket[name][hook]) log.debug({msg:'hooked socket', method:'_packetHook', line:343, name:name, type:this._socket[name].type, trans:this._socket[name].transport}) + } + } + } + + // A Big Hammer - use only if necessary - default with hooks should suffice + // these three will pre-empt default processor to be called in ._packetProcess + + // add and override default processor for ALL consumers, i.e. packets returning form a send or arrived from a push + consumersProcessor(func) { + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === 'c') { + this.altProcessor(func, name) + } + } + } + + // add and override default processor for ALL sockets, i.e. packets coming in from consumer send + socketsProcessor(func) { + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === 's') { + this.altProcessor(func, name) + } + } + } + // add and override a processor for a particular socket/consumer to list of processors + // if no socket name given it will replace the default processor in _processors from processing.js + altProcessor(func, socket_name) { + socket_name = socket_name || '_default' + this._processors[socket_name] = func + } + //====================================================== /* @@ -356,38 +380,58 @@ class Base extends EventEmitter { * */ + /* + **********default packet processor for all sockets + * this can be hooked or replaced all together + */ + + async _packetProcess(socket_name, packet) { + log.debug({ socket:socket_name, packet:packet, method:'_packetProcess', line:393, msg:'processing incoming packet'}) + // the processor can be set via the incoming packet + // otherwise if one is set on the socket or the default found in processing.js + // TODO?? Try all each available packet processors in some order if fails try next one before trying the default + if (this._socket[socket_name].beforeProcess) packet = await this._socket[socket_name].beforeProcess.call(this,packet) + if (packet.error) return packet // hook invalidated the packet abort further processing + let processor = packet._processor || this._processors[socket_name] ? socket_name : '_default' + let res = await this._processors[processor].call(this,packet,socket_name) + if (this._socket[socket_name].afterProcess) res = await this._socket[socket_name].afterProcess.call(this,res) + log.debug({ socket:socket_name, response:res, msg:'processed packet ready for return'}) + return res + } + async _initSocket(name) { - let socket = this.socket[name] + let socket = this._socket[name] let init = {} - if (this.socket[name].type === 's' && this.socket[name].transport !== 'm') { + if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') { init = socket.create } else { init = socket.connect } - log.info(`initializing socket ${name}, ${socket.type}, ${socket.transport}`) + log.debug(`initializing socket ${name}, ${socket.type}, ${socket.transport}`) if (this._started) return `socket ${name} added and initialzed, ${await init()}` else return init() } + // all sockets are emitters. Adds a listener to all sockets of a type with given event. + // now sockets can emit locally processed events _eventListen(type,event,fn) { - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === type) { - if (fn==='stop') this.socket[name].removeAllListeners(event) + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === type) { + if (fn==='stop') this._socket[name].removeAllListeners(event) else { - console.log('adding listener',name,type,event,fn ) - console.log(this.socket[name].name) - this.socket[name].on(event, fn) + log.debug({socket:name, type:type, event:event, msg:'adding listener to socket'}) + this._socket[name].on(event, fn) } } } } _transport(name) { - return this.socket[name].transport + return this._socket[name].transport } //getter for socket transport _type(name) { - return this.socket[name].type + return this._socket[name].type } //getter for socket type _getTransportNamespaces(socket) { @@ -406,19 +450,18 @@ class Base extends EventEmitter { // takes command and returns corresponding function in a hash _getCmdFunc(cmd, obj) { - // console.log('obj',obj) if (typeof cmd === 'string') { if (!obj) obj = this cmd = cmd.split(/[.:/]+/) - // console.log('===================',cmd) } var prop = cmd.shift() if (cmd.length === 0) return obj[prop] if (!obj[prop]) return null - // console.log(cmd.length,cmd,prop, obj[prop]) + log.debug({length:cmd.length,cmd:cmd, prop:prop, objprop:obj[prop], method:'_getCmdFunc', msg:'command to corresponding function in a hash'}) return this._getCmdFunc(cmd, obj[prop]) } + // primary function to find a function to call based on packet cmd async _callCmdFunc(packet, socket) { let cmd_func = this._getCmdFuncNamespace( packet.cmd, @@ -433,22 +476,7 @@ class Base extends EventEmitter { return 'failed' } - /* - **********default packet processor for all sockets - */ - async _packetProcess(socket_name, packet) { - - // TODO Try all added packet processors then defualt before sending back - // console.log(socket_name,packet) - let processor = - packet._processor || this._processors[socket_name] || '_default' - return await this._processors[processor].bind(this)( - packet, - socket_name, - this._processors[processor] - ) - } } // end Base Class export default Base diff --git a/src/processing.js b/src/processing.js index 5ff548a..188611d 100644 --- a/src/processing.js +++ b/src/processing.js @@ -7,44 +7,47 @@ let log = logger({ package: 'base',file:'processing.js'}) // processing errors that are caught should be sent back to consumer in packets with :error property // but they might also throw local errors/execptions so they should bubble up here and get caught and logged // messaging errors on socket will not be fatal to the entire socket server + +// common processor, will call based on type s or c the ones below const processor = async function (packet,socket) { let [err,res] = await to(_process[this.getSocket(socket).type].bind(this)(packet,socket)) if (err) { - let error = {error:err, socket:socket, packet:packet, msg:'some possibly unhandled badness happened during packet processing'} - log.warn(error) + let error = {error:err, socket:socket, packet:packet, function:'processor', line: 15, msg:'some possibly unhandled badness happened during packet processing'} + log.error(error) if (process.env.UCI_SHOW_UNHANDLED==='true') console.log(error) } else return res } -export { processor, commands, namespaces } +export { processor, defaultCmds, namespaces } +// default processors for socket/server and consumer/client const _process = { s: async function (packet,socket) { - // console.log('in default socket processor',packet.cmd) - if (!packet.cmd) return {error: '[socket] no command in packet', packet: packet } + if (!packet.cmd) return {error: 'no command (cmd:) in packet for socket', packet: packet } + // this call will search the namespace and envoke a function and return a repsonse packet let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response return {error: 'no socket processing function supplied for command', packet: packet } }, c: async function (packet,socket) { - // console.log('in default consumer processor',packet.cmd) - if (packet.error) return await this._defaultCmds.c.error(packet) + // the the end of life for a consumer packet that has been sent and returned or a packet that was pushed. + if (packet.error) return await this._c.error(packet) if (packet.cmd) { let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response - packet = {error:'no consumer processing function supplied for command',packet:packet} - this._defaultCmds.c.error(packet) + packet = {error:'no consumer reply processing function supplied for command',packet:packet} + this._c.error(packet) } else { - packet = {error:'[consumer] no command in packet',packet:packet} - return await this._defaultCmds.c.error(packet) + packet = {error:'[consumer] no command in reply packet',packet:packet} + return await this._c.error(packet) } } } const namespaces = { - s: ['s','_defaultCmds.s'], - c: ['c','_defaultCmds.c'], + s: ['_s'], + c: ['_c'], cn: ['cn'], ct: ['ct'], sn: ['sn'], @@ -60,7 +63,7 @@ const namespaces = { * */ -const commands ={ +const defaultCmds ={ s:{ echo: async packet => { packet.processed = true @@ -75,12 +78,10 @@ const commands ={ }, c:{ error: function (packet) { - // TODO log and make this show only on env debug - log.warn({error:packet.error, packet:packet, msg:'==========Packet ERROR [consumer]========='}) + if (process.env.UCI_ENV==='dev') log.error({error:packet.error, packet:packet, msg:'==========Consumer Sent Packet returned with ERROR ========='}) }, reply: function(packet) { - // TODO log and make this show only on env debug - log.debug({packet:packet, msg:'====Packet returned from socket - debug reply==='}) + if (process.env.UCI_ENV==='dev') log.debug({packet:packet, msg:'====Packet returned from socket - debug default reply logger==='}) } } }