From 1359aa7d31f0bae1ac2e4f537470c1b5fb9c5adc Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sun, 17 Mar 2019 13:45:19 -0700 Subject: [PATCH] 0.1.17 add removeSocket method, add final catch of all packet processing errors. --- examples/four-in-one.js | 4 ++-- package.json | 13 +++++++------ src/base.js | 17 ++++++++++++++--- src/processing.js | 28 +++++++++++++++++----------- 4 files changed, 40 insertions(+), 22 deletions(-) diff --git a/examples/four-in-one.js b/examples/four-in-one.js index 6ab8242..f75fabb 100644 --- a/examples/four-in-one.js +++ b/examples/four-in-one.js @@ -50,7 +50,7 @@ function status(state,consumer) { } } -let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,mqtts#s>m,mqtt#c>m,webs#s>w', tc:{port:8100}, ts:{port:8100}, webs:{ port:8090 }, mqtts:{ topics:['switch/on','switch/off','switch/toggle']}, mqtt:{ topics:['switch/status']}}) +let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', tc:{port:8100}, ts:{port:8100}, webs:{ port:8090 }, mqtts:{ topics:['switch/on','switch/off','switch/toggle']}, mqtt:{ topics:['switch/status']}}) fio.s = socketfuncs fio.c = { reply: () => {return Promise.resolve()} } @@ -60,7 +60,7 @@ fio.cm = {switch:{status:status('on','mqtt')}} ; (async () => { - + await fio.addSocket('mqtt','s','m',{ topics:['switch/on','switch/off','switch/toggle']}) let res = await fio.init() console.log('initialize errors',res) console.log('waiting for packets') diff --git a/package.json b/package.json index d8b27d5..a23d839 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,11 @@ { "name": "@uci/base", - "version": "0.1.16", + "version": "0.1.17", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { - "fiod": "./node_modules/.bin/nodemon -r esm examples/four-in-one", - "fio": "node -r esm examples/four-in-one", + "fiod": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/four-in-one", + "fio": "nodemon -r esm examples/four-in-one", "dy": "node -r esm examples/dynamic", "web": "UCI_DEV=true nodemon -r esm examples/web", "mqtt": "node -r esm examples/mqtt", @@ -38,9 +38,10 @@ "dependencies": { "@uci-utils/bind-funcs": "^0.2.3", "@uci-utils/logger": "0.0.13", - "@uci/mqtt": "^0.1.8", - "@uci/socket": "^0.2.7", - "@uci/websocket": "^0.3.4", + "@uci/mqtt": "^0.1.9", + "@uci/socket": "^0.2.8", + "@uci/websocket": "^0.3.5", + "await-to-js": "^2.1.1", "p-settle": "^2.1.0" } } diff --git a/src/base.js b/src/base.js index ab71af6..42cc232 100644 --- a/src/base.js +++ b/src/base.js @@ -15,6 +15,7 @@ import logger from '@uci-utils/logger' let log = {} // declare module wide log to be set during construction // Community dependencies +import to from 'await-to-js' import EventEmitter from 'events' import pSettle from 'p-settle' @@ -173,7 +174,7 @@ class Base extends EventEmitter { this.socket[name].type = type this.socket[name].transport = transport this.socket[name]._packetProcess = this._packetProcess.bind(this, name) - if (this._started) log.info(await this._initSocket(name)) + if (this._started) return await this._initSocket(name) else return `socket ${name} added` } @@ -184,7 +185,12 @@ class Base extends EventEmitter { * @returns {String | Object } success string or error object */ async removeSocket(name) { - //TODO + // NOTE: uci consumers have .end renamed as .close to match socket method for convenience + let [err] = await to(this.socket[name].close()) + let errmsg = {socket:this.socket[name].name, error:err, msg:'socket/consumer closed with errors but still removed'} + if (err) log.warn(errmsg) + delete this.socket[name] + return err ? errmsg : 'success' } /** @@ -305,6 +311,9 @@ class Base extends EventEmitter { // afterProcessHook(func,type,transport){} // TODO // here you can add namespaced functions for packet commands + + //====================================================================== + // TODO next several need to be redone now that namespace commands work consumersProcessor(func) { for (let name of Object.keys(this.socket)) { if (this.socket[name].type === 'c') { @@ -331,6 +340,8 @@ class Base extends EventEmitter { else return this._namespaces[type].unshift(space) } + //====================================================== + /* * * Private Methods @@ -392,7 +403,7 @@ class Base extends EventEmitter { packet.cmd, this._namespaces[this._type(socket) + this._transport(socket)] ) - if (cmd_func) return await cmd_func.bind(this)(packet) + if (cmd_func) return await cmd_func.bind(this)(packet) // todo try .call cmd_func = this._getCmdFuncNamespace( packet.cmd, this._namespaces[this._type(socket)] diff --git a/src/processing.js b/src/processing.js index 7ad73ac..809a5b2 100644 --- a/src/processing.js +++ b/src/processing.js @@ -1,14 +1,25 @@ - +import to from 'await-to-js' +import logger from '@uci-utils/logger' +let log = logger({ package: 'base',file:'processing.js'}) // this._processing refers to this module/hash +// processing errors that are caught should be sent back to consumer in packets with :error property +// but they might also throw local errors/execptions so they should bubble up here and get caught and logged +// messaging errors on socket will not be fatal to the entire socket server const processor = async function (packet,socket) { - return await process[this.getSocket(socket).type].bind(this)(packet,socket) + let [err,res] = await to(_process[this.getSocket(socket).type].bind(this)(packet,socket)) + if (err) { + let error = {error:err, socket:socket, packet:packet, msg:'some possibly unhandled badness happened during packet processing'} + log.warn(error) + if (process.env.UCI_SHOW_UNHANDLED==='true') console.log(error) + } + else return res } export { processor, commands, namespaces } -const process = { +const _process = { s: async function (packet,socket) { // console.log('in default socket processor',packet.cmd) if (!packet.cmd) return {error: '[socket] no command in packet', packet: packet } @@ -32,7 +43,7 @@ const process = { } const namespaces = { - s: ['s','_defaultCmds.s'], + s: ['s','_defaultCmds.s'], c: ['c','_defaultCmds.c'], cn: ['cn'], ct: ['ct'], @@ -63,16 +74,11 @@ const commands ={ c:{ error: function (packet) { // TODO log and make this show only on env debug - console.log('==============Packet ERROR [consumer]==========') - console.log(packet.error ) - console.dir(packet.packet) - console.log('===========================') + log.warn({error:packet.error, packet:packet, msg:'==========Packet ERROR [consumer]========='}) }, reply: function(packet) { // TODO log and make this show only on env debug - console.log('==============Packet returned from socket - default reply==========') - console.dir(packet) - console.log('===========================') + log.debug({packet:packet, msg:'====Packet returned from socket - debug reply==='}) } } }