0.1.17 add removeSocket method, add final catch of all packet processing errors.

master
David Kebler 2019-03-17 13:45:19 -07:00
parent 0a86774b17
commit 1359aa7d31
4 changed files with 40 additions and 22 deletions

View File

@ -50,7 +50,7 @@ function status(state,consumer) {
} }
} }
let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,mqtts#s>m,mqtt#c>m,webs#s>w', tc:{port:8100}, ts:{port:8100}, webs:{ port:8090 }, mqtts:{ topics:['switch/on','switch/off','switch/toggle']}, mqtt:{ topics:['switch/status']}}) let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', tc:{port:8100}, ts:{port:8100}, webs:{ port:8090 }, mqtts:{ topics:['switch/on','switch/off','switch/toggle']}, mqtt:{ topics:['switch/status']}})
fio.s = socketfuncs fio.s = socketfuncs
fio.c = { reply: () => {return Promise.resolve()} } fio.c = { reply: () => {return Promise.resolve()} }
@ -60,7 +60,7 @@ fio.cm = {switch:{status:status('on','mqtt')}}
; ;
(async () => { (async () => {
await fio.addSocket('mqtt','s','m',{ topics:['switch/on','switch/off','switch/toggle']})
let res = await fio.init() let res = await fio.init()
console.log('initialize errors',res) console.log('initialize errors',res)
console.log('waiting for packets') console.log('waiting for packets')

View File

@ -1,11 +1,11 @@
{ {
"name": "@uci/base", "name": "@uci/base",
"version": "0.1.16", "version": "0.1.17",
"description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes",
"main": "src/base", "main": "src/base",
"scripts": { "scripts": {
"fiod": "./node_modules/.bin/nodemon -r esm examples/four-in-one", "fiod": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/four-in-one",
"fio": "node -r esm examples/four-in-one", "fio": "nodemon -r esm examples/four-in-one",
"dy": "node -r esm examples/dynamic", "dy": "node -r esm examples/dynamic",
"web": "UCI_DEV=true nodemon -r esm examples/web", "web": "UCI_DEV=true nodemon -r esm examples/web",
"mqtt": "node -r esm examples/mqtt", "mqtt": "node -r esm examples/mqtt",
@ -38,9 +38,10 @@
"dependencies": { "dependencies": {
"@uci-utils/bind-funcs": "^0.2.3", "@uci-utils/bind-funcs": "^0.2.3",
"@uci-utils/logger": "0.0.13", "@uci-utils/logger": "0.0.13",
"@uci/mqtt": "^0.1.8", "@uci/mqtt": "^0.1.9",
"@uci/socket": "^0.2.7", "@uci/socket": "^0.2.8",
"@uci/websocket": "^0.3.4", "@uci/websocket": "^0.3.5",
"await-to-js": "^2.1.1",
"p-settle": "^2.1.0" "p-settle": "^2.1.0"
} }
} }

View File

@ -15,6 +15,7 @@ import logger from '@uci-utils/logger'
let log = {} // declare module wide log to be set during construction let log = {} // declare module wide log to be set during construction
// Community dependencies // Community dependencies
import to from 'await-to-js'
import EventEmitter from 'events' import EventEmitter from 'events'
import pSettle from 'p-settle' import pSettle from 'p-settle'
@ -173,7 +174,7 @@ class Base extends EventEmitter {
this.socket[name].type = type this.socket[name].type = type
this.socket[name].transport = transport this.socket[name].transport = transport
this.socket[name]._packetProcess = this._packetProcess.bind(this, name) this.socket[name]._packetProcess = this._packetProcess.bind(this, name)
if (this._started) log.info(await this._initSocket(name)) if (this._started) return await this._initSocket(name)
else return `socket ${name} added` else return `socket ${name} added`
} }
@ -184,7 +185,12 @@ class Base extends EventEmitter {
* @returns {String | Object } success string or error object * @returns {String | Object } success string or error object
*/ */
async removeSocket(name) { async removeSocket(name) {
//TODO // NOTE: uci consumers have .end renamed as .close to match socket method for convenience
let [err] = await to(this.socket[name].close())
let errmsg = {socket:this.socket[name].name, error:err, msg:'socket/consumer closed with errors but still removed'}
if (err) log.warn(errmsg)
delete this.socket[name]
return err ? errmsg : 'success'
} }
/** /**
@ -305,6 +311,9 @@ class Base extends EventEmitter {
// afterProcessHook(func,type,transport){} // TODO // afterProcessHook(func,type,transport){} // TODO
// here you can add namespaced functions for packet commands // here you can add namespaced functions for packet commands
//======================================================================
// TODO next several need to be redone now that namespace commands work
consumersProcessor(func) { consumersProcessor(func) {
for (let name of Object.keys(this.socket)) { for (let name of Object.keys(this.socket)) {
if (this.socket[name].type === 'c') { if (this.socket[name].type === 'c') {
@ -331,6 +340,8 @@ class Base extends EventEmitter {
else return this._namespaces[type].unshift(space) else return this._namespaces[type].unshift(space)
} }
//======================================================
/* /*
* *
* Private Methods * Private Methods
@ -392,7 +403,7 @@ class Base extends EventEmitter {
packet.cmd, packet.cmd,
this._namespaces[this._type(socket) + this._transport(socket)] this._namespaces[this._type(socket) + this._transport(socket)]
) )
if (cmd_func) return await cmd_func.bind(this)(packet) if (cmd_func) return await cmd_func.bind(this)(packet) // todo try .call
cmd_func = this._getCmdFuncNamespace( cmd_func = this._getCmdFuncNamespace(
packet.cmd, packet.cmd,
this._namespaces[this._type(socket)] this._namespaces[this._type(socket)]

View File

@ -1,14 +1,25 @@
import to from 'await-to-js'
import logger from '@uci-utils/logger'
let log = logger({ package: 'base',file:'processing.js'})
// this._processing refers to this module/hash // this._processing refers to this module/hash
// processing errors that are caught should be sent back to consumer in packets with :error property
// but they might also throw local errors/execptions so they should bubble up here and get caught and logged
// messaging errors on socket will not be fatal to the entire socket server
const processor = async function (packet,socket) { const processor = async function (packet,socket) {
return await process[this.getSocket(socket).type].bind(this)(packet,socket) let [err,res] = await to(_process[this.getSocket(socket).type].bind(this)(packet,socket))
if (err) {
let error = {error:err, socket:socket, packet:packet, msg:'some possibly unhandled badness happened during packet processing'}
log.warn(error)
if (process.env.UCI_SHOW_UNHANDLED==='true') console.log(error)
}
else return res
} }
export { processor, commands, namespaces } export { processor, commands, namespaces }
const process = { const _process = {
s: async function (packet,socket) { s: async function (packet,socket) {
// console.log('in default socket processor',packet.cmd) // console.log('in default socket processor',packet.cmd)
if (!packet.cmd) return {error: '[socket] no command in packet', packet: packet } if (!packet.cmd) return {error: '[socket] no command in packet', packet: packet }
@ -32,7 +43,7 @@ const process = {
} }
const namespaces = { const namespaces = {
s: ['s','_defaultCmds.s'], s: ['s','_defaultCmds.s'],
c: ['c','_defaultCmds.c'], c: ['c','_defaultCmds.c'],
cn: ['cn'], cn: ['cn'],
ct: ['ct'], ct: ['ct'],
@ -63,16 +74,11 @@ const commands ={
c:{ c:{
error: function (packet) { error: function (packet) {
// TODO log and make this show only on env debug // TODO log and make this show only on env debug
console.log('==============Packet ERROR [consumer]==========') log.warn({error:packet.error, packet:packet, msg:'==========Packet ERROR [consumer]========='})
console.log(packet.error )
console.dir(packet.packet)
console.log('===========================')
}, },
reply: function(packet) { reply: function(packet) {
// TODO log and make this show only on env debug // TODO log and make this show only on env debug
console.log('==============Packet returned from socket - default reply==========') log.debug({packet:packet, msg:'====Packet returned from socket - debug reply==='})
console.dir(packet)
console.log('===========================')
} }
} }
} }