diff --git a/examples/mqtt.js b/examples/mqtt.js index e4def71..0d64923 100644 --- a/examples/mqtt.js +++ b/examples/mqtt.js @@ -2,6 +2,8 @@ import Base from '../src/base' let dy = new Base({id:'dynamic', useRootNS: true }) +const HOST = 'nas.kebler.net' + let sensor = true //dummy simulated push @@ -54,8 +56,9 @@ dy.switch = { (async () => { await dy.init() - await dy.addSocket('mqs','s','m') - dy.socket.mqs.subscribe(['switch/on','switch/off','switch/toggle','sensor/test']) + await dy.addSocket('mqs','s','m',{host:HOST}) + dy.socket.mqs.subscribe(['lights/#']) + console.log('ready') })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/package.json b/package.json index 950eb62..1354db3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/base", - "version": "0.1.19", + "version": "0.1.20", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { @@ -8,7 +8,7 @@ "fio": "nodemon -r esm examples/four-in-one", "dy": "node -r esm examples/dynamic", "web": "UCI_DEV=true nodemon -r esm examples/web", - "mqtt": "node -r esm examples/mqtt", + "mqtt": "nodemon -r esm examples/mqtt", "testw": "mocha -r esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r esm test/*.test.mjs", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true" @@ -37,11 +37,11 @@ }, "dependencies": { "@uci-utils/bind-funcs": "^0.2.3", - "@uci-utils/logger": "0.0.13", - "@uci/mqtt": "^0.1.9", - "@uci/socket": "^0.2.10", - "@uci/websocket": "^0.3.5", + "@uci-utils/logger": "^0.0.14", + "@uci/mqtt": "^0.1.11", + "@uci/socket": "^0.2.12", + "@uci/websocket": "^0.3.6", "await-to-js": "^2.1.1", - "p-settle": "^3.0.0" + "p-settle": "^3.1.0" } } diff --git a/src/base.js b/src/base.js index 6b020d7..6792c02 100644 --- a/src/base.js +++ b/src/base.js @@ -20,7 +20,7 @@ import EventEmitter from 'events' import pSettle from 'p-settle' // Internal dependencies -import { processor, commands, namespaces } from './processing' +import { processor, defaultCmds, namespaces } from './processing' // Useful Constants const SOCKET_INFO_KEYS = ['name', 'type', 'transport'] @@ -63,20 +63,23 @@ class Base extends EventEmitter { this.id = opts.id || opts.name || 'uci-base:' + new Date().getTime() log = logger({ name: 'base', id: this.id }) this.desc = opts.desc // additional details for humans - this.socket = {} // holds all the various communication sockets + this._socket = {} // holds all the various communication sockets this._started = false // flag to know when instance has been initialized this._processors = { _default: processor } - this._defaultCmds = commands + // _c and _s are the default namespaces this._namespaces = namespaces + this._c = defaultCmds.c + this._s = defaultCmds.s if (opts.useRootNS) { // add root of instance to checking for command functions - not recommended! this._namespaces.s.splice(-1, 0, null) this._namespaces.c.splice(-1, 0, null) } + // method that will bind a whole object tree of functions this.bindFuncs = bindFuncs // predefined sockets: // comma delimited list of this form '#>' - this.socket = {} + this._socket = {} if (opts.sockets) { opts.sockets.split(/[,|\s]+/).forEach(socketStr => { let socket = {} @@ -111,15 +114,12 @@ class Base extends EventEmitter { async init() { let sockets = [] let initSockets = [] - for (let name of Object.keys(this.socket)) { + for (let name of Object.keys(this._socket)) { initSockets.push(this._initSocket(name)) sockets.push(name) } return pSettle(initSockets).then(res => { - log.info( - { sockets: res }, - 'response from intializing sockets via instance options' - ) + log.debug({ sockets: res, method:'init', line:122, msg:'response from intializing sockets via instance options'}) let err = [] res.forEach((p, index) => { if (p.isRejected) { @@ -144,36 +144,32 @@ class Base extends EventEmitter { * @returns {string} Description */ async addSocket(name, type = 'c', transport = 'n', options = {}) { - log.info( - { socketName: name, type: type, tranport: transport, options: options }, - `adding socket ${name}` - ) + log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:147, msg:`adding socket ${name}`}) options.id = this.id + ':' + name switch (transport) { case 'n': options.path = options.path || true // falls through case 't': - this.socket[name] = new Socket[TRANSLATE[type]](options) + this._socket[name] = new Socket[TRANSLATE[type]](options) break case 'm': if (type === 'p') type = 'c' options.connect = options.connect || {} + if (options.host) options.connect.host = options.host + if (options.port) options.connect.port = options.port options.connect.connectTimeout = options.connect.connectTimeout || 5000 - this.socket[name] = new MQTT(options) + this._socket[name] = new MQTT(options) break case 'w': - if (type === 's') this.socket[name] = new WebSocket(options) + if (type === 's') this._socket[name] = new WebSocket(options) else - log.warn( - { name: name, type: type, transport: transport }, - 'Web socket not created Consumer/Client Web Socket not supported' - ) + log.warn({ name: name, type: type, transport: transport, method:'addSocket', line:167, msg:'Web socket not created Consumer/Client Web Socket not supported'}) } - this.socket[name].name = name - this.socket[name].type = type - this.socket[name].transport = transport - this.socket[name]._packetProcess = this._packetProcess.bind(this, name) + this._socket[name].name = name + this._socket[name].type = type + this._socket[name].transport = transport + this._socket[name]._packetProcess = this._packetProcess.bind(this, name) if (this._started) return await this._initSocket(name) else return `socket ${name} added` } @@ -186,13 +182,19 @@ class Base extends EventEmitter { */ async removeSocket(name) { // NOTE: uci consumers have .end renamed as .close to match socket method for convenience - let [err] = await to(this.socket[name].close()) - let errmsg = {socket:this.socket[name].name, error:err, msg:'socket/consumer closed with errors but still removed'} + let [err] = await to(this._socket[name].close()) + let errmsg = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors but still removed'} if (err) log.warn(errmsg) - delete this.socket[name] + delete this._socket[name] return err ? errmsg : 'success' } + + getSocket(name) { + if (name) return this._socket[name] + else return this._socket + } + /** * send - Description * @@ -205,43 +207,53 @@ class Base extends EventEmitter { if (typeof name !== 'string') { packet = name let sends = [] - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === 'c') { - sends.push(this.socket[name].send.bind(this.socket[name])) + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === 'c') { + let hookedPacket = {} + hookedPacket = this._socket[name].beforeSend ? await this._socket[name].beforeSend.call(this,Object.assign({},packet)) : packet + log.debug({msg:'after possible hook packet to send', name:name, packet:hookedPacket, method:'send', line:217}) + sends.push(this._socket[name].send.bind(this._socket[name],hookedPacket)) } } - if (sends.length === 1) return await sends[0](packet) + if (sends.length === 1) return await sends[0]() return Promise.all( sends.map(send => { - return send(packet) + return send() }) ) } else { - if (this.socket[name]) return await this.socket[name].send(packet) + if (this._socket[name]) { + if (this._socket[name].beforeSend) packet = await this._socket[name].beforeSend.call(this,packet) + log.debug({msg:'single socket hooked packet to send', name:name, packet:packet, method:'send', line:230}) + return await this._socket[name].send(packet) + } else return { error: `no consumer socket of name ${name}` } } } async push(packet) { let broadcast = [] - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === 's') { - broadcast.push(this.socket[name].push.bind(this.socket[name])) + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === 's') { + let hookedPacket = {} + hookedPacket = this._socket[name].beforeSend ? await this._socket[name].beforeSend.call(this,Object.assign({},packet),true) : packet + log.debug({msg:'hooked packet to push', name:name, packet:hookedPacket, method:'push', line:243}) + broadcast.push(this._socket[name].push.bind(this._socket[name],hookedPacket)) } } return Promise.all( broadcast.map(push => { - return push(packet) + return push() }) ) } async sendTransport(packet, transport) { let sends = [] - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === 'c') { - if (this.socket[name].transport === transport) { - sends.push(this.socket[name].send.bind(this.socket[name])) + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === 'c') { + if (this._socket[name].transport === transport) { + sends.push(this._socket[name].send.bind(this._socket[name])) } } } @@ -253,7 +265,7 @@ class Base extends EventEmitter { ) } - // async sendMQTT(topic, packet) {return this.socket.mqtt.send(topic, packet)} + // async sendMQTT(topic, packet) {return this._socket.mqtt.send(topic, packet)} async sendTCP(packet) { return this.sendTransport(packet, 't') } @@ -261,9 +273,14 @@ class Base extends EventEmitter { return this.sendTransport(packet, 'n') } - getSocket(name) { - return this.socket[name] + + socketsListen(event,fn) { + this._eventListen('s',event,fn) } + consumersListen(event,fn) { + this._eventListen('c',event,fn) + } + getPacketByName(name, packets) { if (!packets.length) packets = [packets] @@ -277,77 +294,84 @@ class Base extends EventEmitter { return found } - socketsListen(event,fn) { - this._eventListen('s',event,fn) - } - consumersListen(event,fn) { - this._eventListen('c',event,fn) - } - - - // TODO confirm Object.assign will be ok as it is not a deep copy - amendConsumerProcessing(funcs, trans) { - if (trans) { - if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] = {} - Object.assign(this._defaultCmds.c[trans], funcs) - } - Object.assign(this._defaultCmds.c, funcs) - } - - amendSocketProcessing(funcs, trans) { - if (trans) { - if (!this._defaultCmds.s[trans]) this._defaultCmds.s[trans] = {} - Object.assign(this._defaultCmds.s[trans], funcs) - } - Object.assign(this._defaultCmds.s, funcs) - } - - // use s: and c: keys TODO need to change this - addNamedProcessing(name, funcs, type) { - if (type) { - if (!this._cmds[name][type]) this._cmds[name][type] = {} - Object.assign(this._cmds[name][type], funcs) - } else { - if (!this._cmds[name]) this._cmds[name] = {} - Object.assign(this._cmds[name], funcs) - } - } - - // func should take and return a packet - // beforeSendHook (func,type,transport){} // TODO - // afterReceiveHook(func,type,transport){} // TODO - // afterProcessHook(func,type,transport){} // TODO - - // here you can add namespaced functions for packet commands - - //====================================================================== - // TODO next several need to be redone now that namespace commands work - consumersProcessor(func) { - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === 'c') { - this.socketNameProcessor(func, name) - } - } - } - - socketsProcessor(func) { - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === 's') { - this.socketNamedProcessor(func, name) - } - } - } - - socketNameProcessor(func, socket_name) { - socket_name = socket_name || '_default' - this._processors[socket_name]._process = func - } - + // add set of functions to class prop/space and then register with this addNamespace(space, type, trans) { if (trans) return this._namespaces[type + trans].unshift(space) else return this._namespaces[type].unshift(space) } + // TODO confirm Object.assign will be ok as it is not a deep copy + // one off add a command function or two to basic namespaces which is called before default + amendConsumerCommands(funcs, trans) { + if (trans) { + if (!this._c[trans]) this._c[trans] = {} + Object.assign(this._c[trans], funcs) + } + Object.assign(this._c, funcs) + } + amendSocketCommands(funcs, trans) { + if (trans) { + if (!this._s[trans]) this._s[trans] = {} + Object.assign(this._s[trans], funcs) + } + Object.assign(this._s, funcs) + } + + // func should take and return a packet. if type + beforeSendHook(func,opts) { + this._packetHook('beforeSend', func,opts) + } + + beforeProcessHook(func,opts) { + this._packetHook('beforeProcess', func,opts) + } + + afterProcessHook(func,opts) { + this._packetHook('afterProcess', func,opts) + } + + _packetHook(hook,func,opts) { + log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts}) + let {name,type,trans,all} = opts + if (name) this._socket[name][hook] = func + else { + log.debug({msg:'sockets available to hook', method:'_packetHook', line:338, sockets: Object.keys(this._socket)}) + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === type) this._socket[name][hook] = func + if (this._socket[name].transport === trans) this._socket[name][hook] = func + if (all) this._socket[name][hook] = func + if (this._socket[name][hook]) log.debug({msg:'hooked socket', method:'_packetHook', line:343, name:name, type:this._socket[name].type, trans:this._socket[name].transport}) + } + } + } + + // A Big Hammer - use only if necessary - default with hooks should suffice + // these three will pre-empt default processor to be called in ._packetProcess + + // add and override default processor for ALL consumers, i.e. packets returning form a send or arrived from a push + consumersProcessor(func) { + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === 'c') { + this.altProcessor(func, name) + } + } + } + + // add and override default processor for ALL sockets, i.e. packets coming in from consumer send + socketsProcessor(func) { + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === 's') { + this.altProcessor(func, name) + } + } + } + // add and override a processor for a particular socket/consumer to list of processors + // if no socket name given it will replace the default processor in _processors from processing.js + altProcessor(func, socket_name) { + socket_name = socket_name || '_default' + this._processors[socket_name] = func + } + //====================================================== /* @@ -356,38 +380,58 @@ class Base extends EventEmitter { * */ + /* + **********default packet processor for all sockets + * this can be hooked or replaced all together + */ + + async _packetProcess(socket_name, packet) { + log.debug({ socket:socket_name, packet:packet, method:'_packetProcess', line:393, msg:'processing incoming packet'}) + // the processor can be set via the incoming packet + // otherwise if one is set on the socket or the default found in processing.js + // TODO?? Try all each available packet processors in some order if fails try next one before trying the default + if (this._socket[socket_name].beforeProcess) packet = await this._socket[socket_name].beforeProcess.call(this,packet) + if (packet.error) return packet // hook invalidated the packet abort further processing + let processor = packet._processor || this._processors[socket_name] ? socket_name : '_default' + let res = await this._processors[processor].call(this,packet,socket_name) + if (this._socket[socket_name].afterProcess) res = await this._socket[socket_name].afterProcess.call(this,res) + log.debug({ socket:socket_name, response:res, msg:'processed packet ready for return'}) + return res + } + async _initSocket(name) { - let socket = this.socket[name] + let socket = this._socket[name] let init = {} - if (this.socket[name].type === 's' && this.socket[name].transport !== 'm') { + if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') { init = socket.create } else { init = socket.connect } - log.info(`initializing socket ${name}, ${socket.type}, ${socket.transport}`) + log.debug(`initializing socket ${name}, ${socket.type}, ${socket.transport}`) if (this._started) return `socket ${name} added and initialzed, ${await init()}` else return init() } + // all sockets are emitters. Adds a listener to all sockets of a type with given event. + // now sockets can emit locally processed events _eventListen(type,event,fn) { - for (let name of Object.keys(this.socket)) { - if (this.socket[name].type === type) { - if (fn==='stop') this.socket[name].removeAllListeners(event) + for (let name of Object.keys(this._socket)) { + if (this._socket[name].type === type) { + if (fn==='stop') this._socket[name].removeAllListeners(event) else { - console.log('adding listener',name,type,event,fn ) - console.log(this.socket[name].name) - this.socket[name].on(event, fn) + log.debug({socket:name, type:type, event:event, msg:'adding listener to socket'}) + this._socket[name].on(event, fn) } } } } _transport(name) { - return this.socket[name].transport + return this._socket[name].transport } //getter for socket transport _type(name) { - return this.socket[name].type + return this._socket[name].type } //getter for socket type _getTransportNamespaces(socket) { @@ -406,19 +450,18 @@ class Base extends EventEmitter { // takes command and returns corresponding function in a hash _getCmdFunc(cmd, obj) { - // console.log('obj',obj) if (typeof cmd === 'string') { if (!obj) obj = this cmd = cmd.split(/[.:/]+/) - // console.log('===================',cmd) } var prop = cmd.shift() if (cmd.length === 0) return obj[prop] if (!obj[prop]) return null - // console.log(cmd.length,cmd,prop, obj[prop]) + log.debug({length:cmd.length,cmd:cmd, prop:prop, objprop:obj[prop], method:'_getCmdFunc', msg:'command to corresponding function in a hash'}) return this._getCmdFunc(cmd, obj[prop]) } + // primary function to find a function to call based on packet cmd async _callCmdFunc(packet, socket) { let cmd_func = this._getCmdFuncNamespace( packet.cmd, @@ -433,22 +476,7 @@ class Base extends EventEmitter { return 'failed' } - /* - **********default packet processor for all sockets - */ - async _packetProcess(socket_name, packet) { - - // TODO Try all added packet processors then defualt before sending back - // console.log(socket_name,packet) - let processor = - packet._processor || this._processors[socket_name] || '_default' - return await this._processors[processor].bind(this)( - packet, - socket_name, - this._processors[processor] - ) - } } // end Base Class export default Base diff --git a/src/processing.js b/src/processing.js index 5ff548a..188611d 100644 --- a/src/processing.js +++ b/src/processing.js @@ -7,44 +7,47 @@ let log = logger({ package: 'base',file:'processing.js'}) // processing errors that are caught should be sent back to consumer in packets with :error property // but they might also throw local errors/execptions so they should bubble up here and get caught and logged // messaging errors on socket will not be fatal to the entire socket server + +// common processor, will call based on type s or c the ones below const processor = async function (packet,socket) { let [err,res] = await to(_process[this.getSocket(socket).type].bind(this)(packet,socket)) if (err) { - let error = {error:err, socket:socket, packet:packet, msg:'some possibly unhandled badness happened during packet processing'} - log.warn(error) + let error = {error:err, socket:socket, packet:packet, function:'processor', line: 15, msg:'some possibly unhandled badness happened during packet processing'} + log.error(error) if (process.env.UCI_SHOW_UNHANDLED==='true') console.log(error) } else return res } -export { processor, commands, namespaces } +export { processor, defaultCmds, namespaces } +// default processors for socket/server and consumer/client const _process = { s: async function (packet,socket) { - // console.log('in default socket processor',packet.cmd) - if (!packet.cmd) return {error: '[socket] no command in packet', packet: packet } + if (!packet.cmd) return {error: 'no command (cmd:) in packet for socket', packet: packet } + // this call will search the namespace and envoke a function and return a repsonse packet let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response return {error: 'no socket processing function supplied for command', packet: packet } }, c: async function (packet,socket) { - // console.log('in default consumer processor',packet.cmd) - if (packet.error) return await this._defaultCmds.c.error(packet) + // the the end of life for a consumer packet that has been sent and returned or a packet that was pushed. + if (packet.error) return await this._c.error(packet) if (packet.cmd) { let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response - packet = {error:'no consumer processing function supplied for command',packet:packet} - this._defaultCmds.c.error(packet) + packet = {error:'no consumer reply processing function supplied for command',packet:packet} + this._c.error(packet) } else { - packet = {error:'[consumer] no command in packet',packet:packet} - return await this._defaultCmds.c.error(packet) + packet = {error:'[consumer] no command in reply packet',packet:packet} + return await this._c.error(packet) } } } const namespaces = { - s: ['s','_defaultCmds.s'], - c: ['c','_defaultCmds.c'], + s: ['_s'], + c: ['_c'], cn: ['cn'], ct: ['ct'], sn: ['sn'], @@ -60,7 +63,7 @@ const namespaces = { * */ -const commands ={ +const defaultCmds ={ s:{ echo: async packet => { packet.processed = true @@ -75,12 +78,10 @@ const commands ={ }, c:{ error: function (packet) { - // TODO log and make this show only on env debug - log.warn({error:packet.error, packet:packet, msg:'==========Packet ERROR [consumer]========='}) + if (process.env.UCI_ENV==='dev') log.error({error:packet.error, packet:packet, msg:'==========Consumer Sent Packet returned with ERROR ========='}) }, reply: function(packet) { - // TODO log and make this show only on env debug - log.debug({packet:packet, msg:'====Packet returned from socket - debug reply==='}) + if (process.env.UCI_ENV==='dev') log.debug({packet:packet, msg:'====Packet returned from socket - debug default reply logger==='}) } } }