diff --git a/nodemon.json b/nodemon.json index 7044a6f..ab901e4 100644 --- a/nodemon.json +++ b/nodemon.json @@ -1,4 +1,4 @@ { "ignoreRoot": [".git","examples/ws-fio-client"], - "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples/"] + "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples/","test/"] } diff --git a/package.json b/package.json index c8c9bd1..3ebfed8 100644 --- a/package.json +++ b/package.json @@ -6,12 +6,9 @@ "scripts": { "fiod": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/four-in-one", "fio": "nodemon -r esm examples/four-in-one", - "dy": "node -r esm examples/dynamic", - "web": "UCI_DEV=true nodemon -r esm examples/web", - "mqtt": "nodemon -r esm examples/mqtt", - "ha-mqtt": "nodemon -r esm examples/ha-mqtt", - "testw": "mocha -r esm test/*.test.mjs --watch --recurse ", - "test": "mocha -r esm test/*.test.mjs" + "test": "UCI_ENV=dev nodemon -r esm --preserve-symlinks test/test", + "mtestw": "mocha -r esm test/*.test.mjs --watch --recurse ", + "mtest": "mocha -r esm test/*.test.mjs" }, "author": "David Kebler", "license": "MIT", @@ -42,6 +39,7 @@ "@uci/socket": "^0.2.19", "@uci/websocket": "^0.3.8", "await-to-js": "^2.1.1", + "p-reflect": "^2.1.0", "p-settle": "^3.1.0" } } diff --git a/src/base.js b/src/base.js index 97010d4..680a6a0 100644 --- a/src/base.js +++ b/src/base.js @@ -17,7 +17,8 @@ let log = {} // declare module wide log to be set during construction // Community dependencies import to from 'await-to-js' import EventEmitter from 'events' -import pSettle from 'p-settle' +// import pSettle from 'p-settle' +// import pReflect from 'p-reflect' // Internal dependencies import { processor, defaultCmds, namespaces } from './processing' @@ -112,26 +113,42 @@ class Base extends EventEmitter { */ async init() { - let sockets = [] - let initSockets = [] - for (let name of Object.keys(this._socket)) { - initSockets.push(this._initSocket(name)) - sockets.push(name) + + let results = {} + let errors = {} + + const pReflect = async socket => { + try { + const value = await socket.init() + results[socket.name] = value + } catch (error) { + this.emit('error',{msg:'socket init error',error:error})// emit an error here, remove socket + let res = await this.removeSocket(socket.name) + errors[socket.name]={error:error, remove:res} + } } - return pSettle(initSockets).then(res => { - log.debug({ sockets: res, method:'init', line:122, msg:'response from intializing sockets via instance options'}) - let err = [] - res.forEach((p, index) => { - if (p.isRejected) { - err.push({ name: sockets[index], err: p.reason }) - } - }) - this._started = true - return err - // TODO if a websocket server was working then push status - // TODO if no mqtt broker then attempt to start one - }) - } // init + + // console.log('in init', this._socket) + // for (let name in this._socket) { + // let socket = this._initSocket(name) + // console.log(socket) + // let [err,res] = await to(socket.init()) + // if (err) errors[name] = err + // results[name] = res + // } + // this._started = true + // return {results:results, errors:errors} + + let sockets = [] + for (let name of Object.keys(this._socket)) { + sockets.push(this._initSocket(name)) + } + await Promise.all(sockets.map(pReflect)) + + if(Object.keys(errors).length===0) errors=false + this._started = true + return {results:results, errors:errors} + } /** * addSocket - Add a socket at runtime as opposed to via the sockets option at creation @@ -170,6 +187,20 @@ class Base extends EventEmitter { this._socket[name].type = type this._socket[name].transport = transport this._socket[name]._packetProcess = this._packetProcess.bind(this, name) + + let bubble = (msg) => { + console.log(msg,name,this._socket[name].name) + this._socket[name].on(msg, ev => { + ev.socketName=name + this.emit(msg, ev) + }) + } + + const msgs = ['error','warn','fatal'] + msgs.map(bubble) // bubble up any emitted errors + + + // do this as .then promise then addSocket doesn't need to be async before init if (this._started) return await this._initSocket(name) else return `socket ${name} added` } @@ -182,14 +213,16 @@ class Base extends EventEmitter { */ async removeSocket(name) { // NOTE: uci consumers have .end renamed as .close to match socket method for convenience + let closeError let [err] = await to(this._socket[name].close()) - let errmsg = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors but still removed'} - if (err) log.warn(errmsg) + if (err.code !== 'ERR_SERVER_NOT_RUNNING') { + closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'} + } + this.emit('warn', {msg:`socket ${name} has been removed`, socket:this._socket[name].opts}) delete this._socket[name] - return err ? errmsg : 'success' + return closeError ? closeError : 'success' } - getSocket(name) { if (name) return this._socket[name] else return this._socket @@ -383,8 +416,6 @@ class Base extends EventEmitter { * */ - - _packetHook(hook,func,opts) { log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts}) let {name,type,trans,all} = opts @@ -421,7 +452,7 @@ class Base extends EventEmitter { return res } - async _initSocket(name) { + _initSocket(name) { let socket = this._socket[name] let init = {} if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') { @@ -429,10 +460,19 @@ class Base extends EventEmitter { } else { init = socket.connect } - log.debug(`initializing socket ${name}, ${socket.type}, ${socket.transport}`) - if (this._started) - return `socket ${name} added and initialzed, ${await init()}` - else return init() + log.info({msg:`initializing socket ${name}, ${socket.type}, ${socket.transport}`}) + + if (this._started) { + return init().then(function(res) { + return `socket ${name} added and initialzed, ${res}` + }) + .catch(function(err) { + this.emit('error', {msg:'failed initialization', error:err, socket:socket, code:'SOCKET_INIT'}) + return {msg:`socket ${name} failed initialization`, error:err} + }.bind(this) + ) + } + else return {name:name, init:init} } // all sockets are emitters. Adds a listener to all sockets of a type with given event. diff --git a/test/test.js b/test/test.js new file mode 100644 index 0000000..9d0a193 --- /dev/null +++ b/test/test.js @@ -0,0 +1,43 @@ +import Base from '../src/base' + + +let wstest = new Base({}) +; +(async () => { + + wstest.on('error', err => { + console.log('ATTENTION! =========base instance error emitted ========') + console.log(err) + }) + + wstest.on('warn', warn => { + console.log('ATTENTION! =========base instance warning emitted ========') + console.log(warn) + }) + + // await wstest.addSocket('web0','s','w',{port:9000}) + // await wstest.addSocket('m','s','m',{host:'nas.kebler.net'}) + await wstest.addSocket('t1','s','t',{port:8001}) + await wstest.addSocket('t2','s','t',{port:8001}) + await wstest.addSocket('t3','s','t',{port:8003}) + // await wstest.addSocket('mxxx','s','m',{host:'nas.kebler.net'}) + // await wstest.addSocket('m2','s','m',{host:'nas.kebler.net'}) + // await wstest.addSocket('web1','s','w',{port:9001}) + + let res = await wstest.init() + // await wstest.addSocket('web2','s','w',{port:9002}) + if (res.errors) { + console.log('initialize errors reported') + console.log(res.errors) + } + console.log('sockets initialize responses\n',res.results) + wstest.push({cmd:'test', data:'test'}) + + // await wstest.addSocket('web2','s','w',{port:9002}) + + wstest.removeAllListeners('error') + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + process.kill(process.pid, 'SIGTERM') +})