From 1b3d5fceb7efcd11fd65f219cfbe17ba374c9faf Mon Sep 17 00:00:00 2001 From: David Kebler Date: Thu, 29 Aug 2019 13:41:32 -0700 Subject: [PATCH] refactored socket initialization to better catch and emit errors and remove offending sockets will now emit 'error', 'warn', 'fatal' when needed so instance can track and track easier and take action (like notification) sockets will bubble these up now. --- nodemon.json | 2 +- package.json | 10 ++--- src/base.js | 102 +++++++++++++++++++++++++++++++++++---------------- test/test.js | 43 ++++++++++++++++++++++ 4 files changed, 119 insertions(+), 38 deletions(-) create mode 100644 test/test.js diff --git a/nodemon.json b/nodemon.json index 7044a6f..ab901e4 100644 --- a/nodemon.json +++ b/nodemon.json @@ -1,4 +1,4 @@ { "ignoreRoot": [".git","examples/ws-fio-client"], - "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples/"] + "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples/","test/"] } diff --git a/package.json b/package.json index c8c9bd1..3ebfed8 100644 --- a/package.json +++ b/package.json @@ -6,12 +6,9 @@ "scripts": { "fiod": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/four-in-one", "fio": "nodemon -r esm examples/four-in-one", - "dy": "node -r esm examples/dynamic", - "web": "UCI_DEV=true nodemon -r esm examples/web", - "mqtt": "nodemon -r esm examples/mqtt", - "ha-mqtt": "nodemon -r esm examples/ha-mqtt", - "testw": "mocha -r esm test/*.test.mjs --watch --recurse ", - "test": "mocha -r esm test/*.test.mjs" + "test": "UCI_ENV=dev nodemon -r esm --preserve-symlinks test/test", + "mtestw": "mocha -r esm test/*.test.mjs --watch --recurse ", + "mtest": "mocha -r esm test/*.test.mjs" }, "author": "David Kebler", "license": "MIT", @@ -42,6 +39,7 @@ "@uci/socket": "^0.2.19", "@uci/websocket": "^0.3.8", "await-to-js": "^2.1.1", + "p-reflect": "^2.1.0", "p-settle": "^3.1.0" } } diff --git a/src/base.js b/src/base.js index 97010d4..680a6a0 100644 --- a/src/base.js +++ b/src/base.js @@ -17,7 +17,8 @@ let log = {} // declare module wide log to be set during construction // Community dependencies import to from 'await-to-js' import EventEmitter from 'events' -import pSettle from 'p-settle' +// import pSettle from 'p-settle' +// import pReflect from 'p-reflect' // Internal dependencies import { processor, defaultCmds, namespaces } from './processing' @@ -112,26 +113,42 @@ class Base extends EventEmitter { */ async init() { - let sockets = [] - let initSockets = [] - for (let name of Object.keys(this._socket)) { - initSockets.push(this._initSocket(name)) - sockets.push(name) + + let results = {} + let errors = {} + + const pReflect = async socket => { + try { + const value = await socket.init() + results[socket.name] = value + } catch (error) { + this.emit('error',{msg:'socket init error',error:error})// emit an error here, remove socket + let res = await this.removeSocket(socket.name) + errors[socket.name]={error:error, remove:res} + } } - return pSettle(initSockets).then(res => { - log.debug({ sockets: res, method:'init', line:122, msg:'response from intializing sockets via instance options'}) - let err = [] - res.forEach((p, index) => { - if (p.isRejected) { - err.push({ name: sockets[index], err: p.reason }) - } - }) - this._started = true - return err - // TODO if a websocket server was working then push status - // TODO if no mqtt broker then attempt to start one - }) - } // init + + // console.log('in init', this._socket) + // for (let name in this._socket) { + // let socket = this._initSocket(name) + // console.log(socket) + // let [err,res] = await to(socket.init()) + // if (err) errors[name] = err + // results[name] = res + // } + // this._started = true + // return {results:results, errors:errors} + + let sockets = [] + for (let name of Object.keys(this._socket)) { + sockets.push(this._initSocket(name)) + } + await Promise.all(sockets.map(pReflect)) + + if(Object.keys(errors).length===0) errors=false + this._started = true + return {results:results, errors:errors} + } /** * addSocket - Add a socket at runtime as opposed to via the sockets option at creation @@ -170,6 +187,20 @@ class Base extends EventEmitter { this._socket[name].type = type this._socket[name].transport = transport this._socket[name]._packetProcess = this._packetProcess.bind(this, name) + + let bubble = (msg) => { + console.log(msg,name,this._socket[name].name) + this._socket[name].on(msg, ev => { + ev.socketName=name + this.emit(msg, ev) + }) + } + + const msgs = ['error','warn','fatal'] + msgs.map(bubble) // bubble up any emitted errors + + + // do this as .then promise then addSocket doesn't need to be async before init if (this._started) return await this._initSocket(name) else return `socket ${name} added` } @@ -182,14 +213,16 @@ class Base extends EventEmitter { */ async removeSocket(name) { // NOTE: uci consumers have .end renamed as .close to match socket method for convenience + let closeError let [err] = await to(this._socket[name].close()) - let errmsg = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors but still removed'} - if (err) log.warn(errmsg) + if (err.code !== 'ERR_SERVER_NOT_RUNNING') { + closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'} + } + this.emit('warn', {msg:`socket ${name} has been removed`, socket:this._socket[name].opts}) delete this._socket[name] - return err ? errmsg : 'success' + return closeError ? closeError : 'success' } - getSocket(name) { if (name) return this._socket[name] else return this._socket @@ -383,8 +416,6 @@ class Base extends EventEmitter { * */ - - _packetHook(hook,func,opts) { log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts}) let {name,type,trans,all} = opts @@ -421,7 +452,7 @@ class Base extends EventEmitter { return res } - async _initSocket(name) { + _initSocket(name) { let socket = this._socket[name] let init = {} if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') { @@ -429,10 +460,19 @@ class Base extends EventEmitter { } else { init = socket.connect } - log.debug(`initializing socket ${name}, ${socket.type}, ${socket.transport}`) - if (this._started) - return `socket ${name} added and initialzed, ${await init()}` - else return init() + log.info({msg:`initializing socket ${name}, ${socket.type}, ${socket.transport}`}) + + if (this._started) { + return init().then(function(res) { + return `socket ${name} added and initialzed, ${res}` + }) + .catch(function(err) { + this.emit('error', {msg:'failed initialization', error:err, socket:socket, code:'SOCKET_INIT'}) + return {msg:`socket ${name} failed initialization`, error:err} + }.bind(this) + ) + } + else return {name:name, init:init} } // all sockets are emitters. Adds a listener to all sockets of a type with given event. diff --git a/test/test.js b/test/test.js new file mode 100644 index 0000000..9d0a193 --- /dev/null +++ b/test/test.js @@ -0,0 +1,43 @@ +import Base from '../src/base' + + +let wstest = new Base({}) +; +(async () => { + + wstest.on('error', err => { + console.log('ATTENTION! =========base instance error emitted ========') + console.log(err) + }) + + wstest.on('warn', warn => { + console.log('ATTENTION! =========base instance warning emitted ========') + console.log(warn) + }) + + // await wstest.addSocket('web0','s','w',{port:9000}) + // await wstest.addSocket('m','s','m',{host:'nas.kebler.net'}) + await wstest.addSocket('t1','s','t',{port:8001}) + await wstest.addSocket('t2','s','t',{port:8001}) + await wstest.addSocket('t3','s','t',{port:8003}) + // await wstest.addSocket('mxxx','s','m',{host:'nas.kebler.net'}) + // await wstest.addSocket('m2','s','m',{host:'nas.kebler.net'}) + // await wstest.addSocket('web1','s','w',{port:9001}) + + let res = await wstest.init() + // await wstest.addSocket('web2','s','w',{port:9002}) + if (res.errors) { + console.log('initialize errors reported') + console.log(res.errors) + } + console.log('sockets initialize responses\n',res.results) + wstest.push({cmd:'test', data:'test'}) + + // await wstest.addSocket('web2','s','w',{port:9002}) + + wstest.removeAllListeners('error') + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + process.kill(process.pid, 'SIGTERM') +})