diff --git a/package.json b/package.json index d4352ec..536747b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/base", - "version": "0.1.30", + "version": "0.1.31", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { @@ -33,15 +33,15 @@ "devDependencies": { "chai": "^4.2.0", "esm": "^3.2.25", - "mocha": "^6.2.0", - "nodemon": "^1.19.2" + "mocha": "^6.2.2", + "nodemon": "^2.0.0" }, "dependencies": { "@uci-utils/bind-funcs": "^0.2.4", - "@uci-utils/logger": "^0.0.15", + "@uci-utils/logger": "^0.0.16", "@uci/mqtt": "^0.1.13", - "@uci/socket": "^0.2.21", - "@uci/websocket": "^0.3.9", + "@uci/socket": "^0.2.22", + "@uci/websocket": "^0.3.10", "await-to-js": "^2.1.1", "p-reflect": "^2.1.0", "p-settle": "^3.1.0" diff --git a/src/base.js b/src/base.js index 4236e61..8a5ff7b 100644 --- a/src/base.js +++ b/src/base.js @@ -69,7 +69,6 @@ class Base extends EventEmitter { this.initTimeout = opts.initTimeout this.retryWait = opts.retryWait this.defaultReturnCmd = opts.defaultReturnCmd - this._started = false // flag to know when instance has been initialized this._processors = { _default: processor } // _c and _s are the default namespaces this._namespaces = namespaces @@ -114,46 +113,91 @@ class Base extends EventEmitter { * @async * @public * @required + * @param {array} sockets string of one or array array names to initialize, if none, then all current added sockets will be initialized + * */ - async init() { + get ready() { + // TODO checks that all sockets are active + let ready = true + for (let name in this._socket) { + // console.log(name, this._socket[name].active) + ready = ready && this._socket[name].active + } + return ready + } + + async init(sockets) { + // Object.getPrototypeOf(Object.getPrototypeOf(this).init.call(this,sockets)) + this.socketsInit(sockets) + // can do other init stuff here + } + + async socketsInit(sockets) { let results = {} let errors = {} - const pReflect = async socket => { - try { - const value = await socket.init() - results[socket.name] = value - } catch (error) { - this.emit('status',{level:'fatal', msg:'socket init error',error:error})// emit an error here, remove socket - let res = await this.removeSocket(socket.name) - errors[socket.name]={error:error, remove:res} - } + // single socket intialize mapper + const initialize = async socket => { + return new Promise(async function(resolve) { + try { + // console.log('initialize socket',socket) + const value = await socket.init() + this.emit('status',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value}) + results[socket.name] = value + resolve(value) + } catch (error) { + // console.log('catching error', error) + this.emit('status',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket + // let res = await this.removeSocket(socket.name) + errors[socket.name]={error:error} + resolve(error) + } + }.bind(this)) } - let sockets = [] - for (let name of Object.keys(this._socket)) { - sockets.push(this._initSocket(name)) + let inits = [] + if (!sockets) { sockets = + Object.keys(this._socket).filter(name => { + // console.log(this._socket[name].active) + return !this._socket[name].active // only intialize (connect) inactive sockets + }) + // console.log('inactive sockets', sockets) } - await Promise.all(sockets.map(pReflect)) - - if(Object.keys(errors).length===0) errors=false - this._started = true + if (typeof sockets ==='string') sockets = [sockets] + // console.log('sockets to initialize',sockets) + sockets.forEach(name => { + inits.push({name:name, init:this.getSocketInit(name)}) + }) + // console.log('starting promise',results,errors) + let [err] = await to(Promise.all(inits.map(initialize))) + if (err) { + this.emit('status',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err}) + return {errors:[err]} + } + if (Object.keys(errors).length===0) errors=false return {results:results, errors:errors} } + // support old name for now + async addSocket(name,type,transport,options) { + this.registerSocket(name,type,transport,options) + } + + /** * addSocket - Add a socket at runtime as opposed to via the sockets option at creation - * + * This is not async and will NOT initialize the socket, that must be done with a call to init or socketInit * @param {type} name Name of socket (usually something short but unique) * @param {string} [type=c] consumer/client 'c' or socket/server 's' * @param {string} [transport=n] transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket * @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc) * - * @returns {string} Description + * @returns {function} if called before base initialzation it can be ignored as all added sockets will be initialized. After through it be called to initialize that socket */ - async addSocket(name, type = 'c', transport = 'n', options = {}) { + registerSocket(name, type = 'c', transport = 'n', options = {}) { + transport = this._validateTransport(transport) log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:147, msg:`adding socket ${name}`}) options.id = options.id || this.id + ':' + name if (type==='c') options = Object.assign({initTimeout:this.initTimeout, retryWait:this.retryWait},options) @@ -176,46 +220,49 @@ class Base extends EventEmitter { break case 'w': if (type === 's') this._socket[name] = new WebSocket(options) - else - log.warn({ name: name, type: type, transport: transport, method:'addSocket', line:167, msg:'Web socket not created Consumer/Client Web Socket not supported'}) - } - this._socket[name].name = name - this._socket[name].type = type - this._socket[name].transport = transport - this._socket[name]._packetProcess = this._packetProcess.bind(this, name) - - // bubble up events from sockets to instance - const EVENTS=['status','consumer-connection'] // that should emit up from socket to instance - EVENTS.forEach(event => { - this._socket[name].on(event, obj => { - if (Object.prototype.toString.call(obj) !== '[object Object]') { - let data=obj - obj = {} - obj.data = data - } - obj.socketName = name - this.emit(event,obj) - }) - }) - - if (type==='c') { - this._socket[name].on('pushed', packet => { - packet._header.socketName=name - this.emit('pushed', packet) - }) } - // if instance already started then init this socket now - if (this._started) return await this._initSocket(name) - else return `socket ${name} added` + if (this._socket[name]) { // in case of invalid transport + + this._socket[name].name = name + this._socket[name].type = type + this._socket[name].transport = transport + this._socket[name]._packetProcess = this._packetProcess.bind(this, name) + + // bubble up events from sockets to instance + const EVENTS=['status','consumer-connection'] // that should emit up from socket to instance + EVENTS.forEach(event => { + this._socket[name].on(event, obj => { + if (Object.prototype.toString.call(obj) !== '[object Object]') { + let data=obj + obj = {} + obj.data = data + } + obj.socketName = name + this.emit(event,obj) + }) + }) + + if (type==='c') { + this._socket[name].on('pushed', packet => { + packet._header.socketName=name + this.emit('pushed', packet) + }) + } + // if (this._started) + return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket + // else return `socket ${name} added and ready to initialize ` + + } } /** * removeSocket - TODO not available * - * @param {string} name name of socket as created + * @param {string} name name of socket as created * @returns {String | Object } success string or error object */ + async removeSocket(name) { // NOTE: uci consumers have .end renamed as .close to match socket method for convenience let closeError @@ -224,6 +271,7 @@ class Base extends EventEmitter { closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'} } this.emit('status', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts}) + this._socket[name].removeAllListeners() delete this._socket[name] return closeError ? closeError : 'success' } @@ -232,6 +280,32 @@ class Base extends EventEmitter { if (name) return this._socket[name] else return this._socket } + // returns array of names of sockets that pass filter + getSocketsFilter({type,trans, active}) { + if (trans) trans = this._validateTransport(trans) + let filtered = [] + Object.keys(this._socket).forEach(name => { + // console.log(name, type,this._socket[name].type, trans, this._socket[name].transport) + if ((type==null || this._socket[name].type === type) + && (trans==null || this._socket[name].transport === trans) + && (active==null || this._socket[name].active===active)) filtered.push(name) + }) + return filtered + } + + getConsumers(filter={}) { + filter.type='c' + return this.getSocketsFilter(filter) + } + + getSocketInit(name) { + let socket = this._socket[name] + if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') { + return socket.create + } else { + return socket.connect + } + } /** * send - Description @@ -269,17 +343,31 @@ class Base extends EventEmitter { } } - async push(packet) { - // TODO set like send to accept a name - let broadcast = [] - for (let name of Object.keys(this._socket)) { - if (this._socket[name].type === 's') { - let hookedPacket = {} - hookedPacket = this._socket[name].beforeSend ? await this._socket[name].beforeSend.call(this,Object.assign({},packet),true) : packet - log.debug({msg:'hooked packet to push', name:name, packet:hookedPacket, method:'push', line:243}) - broadcast.push(this._socket[name].push.bind(this._socket[name],hookedPacket)) + // sockets not passed all sockets pushed, otherwise array of names or sting of transport + async push(packet,sockets) { + if (Array.isArray(sockets)) { + let socks = [] + sockets.forEach(name => {if (this._socket[name].type==='s') socks.push(this._socket[name])}) + sockets = socks + } + else { + let trans = null + if (typeof sockets === 'string') trans = sockets + // console.log('push transport', trans) + sockets = Object.values(this._socket).filter(socket=>socket.type === 's') + // console.log('all server sockets',sockets) + if (trans && trans !=='all') { sockets = sockets.filter(socket=>socket.transport === this._validateTransport(trans)) + // console.log('transport filtered server sockets',sockets) } } + let broadcast = [] + // console.log('===before push', sockets) + for (let socket of sockets) { + let hookedPacket = {} + hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet + log.debug({msg:'hooked packet to push', name:socket.name, packet:hookedPacket, method:'push', line:243}) + broadcast.push(socket.push.bind(socket,hookedPacket)) + } return Promise.all( broadcast.map(push => { return push() @@ -465,29 +553,6 @@ class Base extends EventEmitter { return res } - _initSocket(name) { - let socket = this._socket[name] - let init = {} - if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') { - init = socket.create - } else { - init = socket.connect - } - log.info({msg:`initializing socket ${name}, ${socket.type}, ${socket.transport}`}) - - if (this._started) { - return init().then(function(res) { - return `socket ${name} added and initialzed, ${res}` - }) - .catch(function(err) { - this.emit('status', {level:'fatal', msg:'failed initialization', error:err, socket:socket, code:'SOCKET_INIT'}) - return {msg:`socket ${name} failed initialization`, error:err} - }.bind(this) - ) - } - else return {name:name, init:init} - } - // all sockets are emitters. Adds a listener to all sockets of a type with given event. // now sockets can emit locally processed events _eventListen(type,event,fn) { @@ -518,7 +583,10 @@ class Base extends EventEmitter { mqtt:'m', } trans = valids[trans] || '' - if (type !== 'c' && trans ==='w') trans = '' + if (type !== 's' && trans ==='w') { + log.warn({type: type, transport: trans, msg:'Invalid type/transport - Consumer/Client Web Socket not supported use TCP'}) + trans = '' + } return trans }