refactor init to socketsInit support old init
registerSocket now only registers a socket it does not initialize, addSocket now calls this for backwards compatibility
added active getter to all sockets/consumers so now ready getter returns complete active state of all sockets
socketsInit only attempts to init non active sockets so can be called multiple times
add some more helpers
getSocketsFilter, getConsumers, getSocketInit
push now supports push to specific sockets/servers instead of just all which is the default
master
David Kebler 2019-11-21 09:30:12 -08:00
parent ec311b51df
commit 2d552bc6c1
2 changed files with 159 additions and 91 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@uci/base",
"version": "0.1.30",
"version": "0.1.31",
"description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes",
"main": "src/base",
"scripts": {
@ -33,15 +33,15 @@
"devDependencies": {
"chai": "^4.2.0",
"esm": "^3.2.25",
"mocha": "^6.2.0",
"nodemon": "^1.19.2"
"mocha": "^6.2.2",
"nodemon": "^2.0.0"
},
"dependencies": {
"@uci-utils/bind-funcs": "^0.2.4",
"@uci-utils/logger": "^0.0.15",
"@uci-utils/logger": "^0.0.16",
"@uci/mqtt": "^0.1.13",
"@uci/socket": "^0.2.21",
"@uci/websocket": "^0.3.9",
"@uci/socket": "^0.2.22",
"@uci/websocket": "^0.3.10",
"await-to-js": "^2.1.1",
"p-reflect": "^2.1.0",
"p-settle": "^3.1.0"

View File

@ -69,7 +69,6 @@ class Base extends EventEmitter {
this.initTimeout = opts.initTimeout
this.retryWait = opts.retryWait
this.defaultReturnCmd = opts.defaultReturnCmd
this._started = false // flag to know when instance has been initialized
this._processors = { _default: processor }
// _c and _s are the default namespaces
this._namespaces = namespaces
@ -114,46 +113,91 @@ class Base extends EventEmitter {
* @async
* @public
* @required
* @param {array} sockets string of one or array array names to initialize, if none, then all current added sockets will be initialized
*
*/
async init() {
get ready() {
// TODO checks that all sockets are active
let ready = true
for (let name in this._socket) {
// console.log(name, this._socket[name].active)
ready = ready && this._socket[name].active
}
return ready
}
async init(sockets) {
// Object.getPrototypeOf(Object.getPrototypeOf(this).init.call(this,sockets))
this.socketsInit(sockets)
// can do other init stuff here
}
async socketsInit(sockets) {
let results = {}
let errors = {}
const pReflect = async socket => {
try {
const value = await socket.init()
results[socket.name] = value
} catch (error) {
this.emit('status',{level:'fatal', msg:'socket init error',error:error})// emit an error here, remove socket
let res = await this.removeSocket(socket.name)
errors[socket.name]={error:error, remove:res}
}
// single socket intialize mapper
const initialize = async socket => {
return new Promise(async function(resolve) {
try {
// console.log('initialize socket',socket)
const value = await socket.init()
this.emit('status',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value})
results[socket.name] = value
resolve(value)
} catch (error) {
// console.log('catching error', error)
this.emit('status',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket
// let res = await this.removeSocket(socket.name)
errors[socket.name]={error:error}
resolve(error)
}
}.bind(this))
}
let sockets = []
for (let name of Object.keys(this._socket)) {
sockets.push(this._initSocket(name))
let inits = []
if (!sockets) { sockets =
Object.keys(this._socket).filter(name => {
// console.log(this._socket[name].active)
return !this._socket[name].active // only intialize (connect) inactive sockets
})
// console.log('inactive sockets', sockets)
}
await Promise.all(sockets.map(pReflect))
if(Object.keys(errors).length===0) errors=false
this._started = true
if (typeof sockets ==='string') sockets = [sockets]
// console.log('sockets to initialize',sockets)
sockets.forEach(name => {
inits.push({name:name, init:this.getSocketInit(name)})
})
// console.log('starting promise',results,errors)
let [err] = await to(Promise.all(inits.map(initialize)))
if (err) {
this.emit('status',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err})
return {errors:[err]}
}
if (Object.keys(errors).length===0) errors=false
return {results:results, errors:errors}
}
// support old name for now
async addSocket(name,type,transport,options) {
this.registerSocket(name,type,transport,options)
}
/**
* addSocket - Add a socket at runtime as opposed to via the sockets option at creation
*
* This is not async and will NOT initialize the socket, that must be done with a call to init or socketInit
* @param {type} name Name of socket (usually something short but unique)
* @param {string} [type=c] consumer/client 'c' or socket/server 's'
* @param {string} [transport=n] transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket
* @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc)
*
* @returns {string} Description
* @returns {function} if called before base initialzation it can be ignored as all added sockets will be initialized. After through it be called to initialize that socket
*/
async addSocket(name, type = 'c', transport = 'n', options = {}) {
registerSocket(name, type = 'c', transport = 'n', options = {}) {
transport = this._validateTransport(transport)
log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:147, msg:`adding socket ${name}`})
options.id = options.id || this.id + ':' + name
if (type==='c') options = Object.assign({initTimeout:this.initTimeout, retryWait:this.retryWait},options)
@ -176,46 +220,49 @@ class Base extends EventEmitter {
break
case 'w':
if (type === 's') this._socket[name] = new WebSocket(options)
else
log.warn({ name: name, type: type, transport: transport, method:'addSocket', line:167, msg:'Web socket not created Consumer/Client Web Socket not supported'})
}
this._socket[name].name = name
this._socket[name].type = type
this._socket[name].transport = transport
this._socket[name]._packetProcess = this._packetProcess.bind(this, name)
// bubble up events from sockets to instance
const EVENTS=['status','consumer-connection'] // that should emit up from socket to instance
EVENTS.forEach(event => {
this._socket[name].on(event, obj => {
if (Object.prototype.toString.call(obj) !== '[object Object]') {
let data=obj
obj = {}
obj.data = data
}
obj.socketName = name
this.emit(event,obj)
})
})
if (type==='c') {
this._socket[name].on('pushed', packet => {
packet._header.socketName=name
this.emit('pushed', packet)
})
}
// if instance already started then init this socket now
if (this._started) return await this._initSocket(name)
else return `socket ${name} added`
if (this._socket[name]) { // in case of invalid transport
this._socket[name].name = name
this._socket[name].type = type
this._socket[name].transport = transport
this._socket[name]._packetProcess = this._packetProcess.bind(this, name)
// bubble up events from sockets to instance
const EVENTS=['status','consumer-connection'] // that should emit up from socket to instance
EVENTS.forEach(event => {
this._socket[name].on(event, obj => {
if (Object.prototype.toString.call(obj) !== '[object Object]') {
let data=obj
obj = {}
obj.data = data
}
obj.socketName = name
this.emit(event,obj)
})
})
if (type==='c') {
this._socket[name].on('pushed', packet => {
packet._header.socketName=name
this.emit('pushed', packet)
})
}
// if (this._started)
return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket
// else return `socket ${name} added and ready to initialize `
}
}
/**
* removeSocket - TODO not available
*
* @param {string} name name of socket as created
* @param {string} name name of socket as created
* @returns {String | Object } success string or error object
*/
async removeSocket(name) {
// NOTE: uci consumers have .end renamed as .close to match socket method for convenience
let closeError
@ -224,6 +271,7 @@ class Base extends EventEmitter {
closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'}
}
this.emit('status', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts})
this._socket[name].removeAllListeners()
delete this._socket[name]
return closeError ? closeError : 'success'
}
@ -232,6 +280,32 @@ class Base extends EventEmitter {
if (name) return this._socket[name]
else return this._socket
}
// returns array of names of sockets that pass filter
getSocketsFilter({type,trans, active}) {
if (trans) trans = this._validateTransport(trans)
let filtered = []
Object.keys(this._socket).forEach(name => {
// console.log(name, type,this._socket[name].type, trans, this._socket[name].transport)
if ((type==null || this._socket[name].type === type)
&& (trans==null || this._socket[name].transport === trans)
&& (active==null || this._socket[name].active===active)) filtered.push(name)
})
return filtered
}
getConsumers(filter={}) {
filter.type='c'
return this.getSocketsFilter(filter)
}
getSocketInit(name) {
let socket = this._socket[name]
if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') {
return socket.create
} else {
return socket.connect
}
}
/**
* send - Description
@ -269,17 +343,31 @@ class Base extends EventEmitter {
}
}
async push(packet) {
// TODO set like send to accept a name
let broadcast = []
for (let name of Object.keys(this._socket)) {
if (this._socket[name].type === 's') {
let hookedPacket = {}
hookedPacket = this._socket[name].beforeSend ? await this._socket[name].beforeSend.call(this,Object.assign({},packet),true) : packet
log.debug({msg:'hooked packet to push', name:name, packet:hookedPacket, method:'push', line:243})
broadcast.push(this._socket[name].push.bind(this._socket[name],hookedPacket))
// sockets not passed all sockets pushed, otherwise array of names or sting of transport
async push(packet,sockets) {
if (Array.isArray(sockets)) {
let socks = []
sockets.forEach(name => {if (this._socket[name].type==='s') socks.push(this._socket[name])})
sockets = socks
}
else {
let trans = null
if (typeof sockets === 'string') trans = sockets
// console.log('push transport', trans)
sockets = Object.values(this._socket).filter(socket=>socket.type === 's')
// console.log('all server sockets',sockets)
if (trans && trans !=='all') { sockets = sockets.filter(socket=>socket.transport === this._validateTransport(trans))
// console.log('transport filtered server sockets',sockets)
}
}
let broadcast = []
// console.log('===before push', sockets)
for (let socket of sockets) {
let hookedPacket = {}
hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet
log.debug({msg:'hooked packet to push', name:socket.name, packet:hookedPacket, method:'push', line:243})
broadcast.push(socket.push.bind(socket,hookedPacket))
}
return Promise.all(
broadcast.map(push => {
return push()
@ -465,29 +553,6 @@ class Base extends EventEmitter {
return res
}
_initSocket(name) {
let socket = this._socket[name]
let init = {}
if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') {
init = socket.create
} else {
init = socket.connect
}
log.info({msg:`initializing socket ${name}, ${socket.type}, ${socket.transport}`})
if (this._started) {
return init().then(function(res) {
return `socket ${name} added and initialzed, ${res}`
})
.catch(function(err) {
this.emit('status', {level:'fatal', msg:'failed initialization', error:err, socket:socket, code:'SOCKET_INIT'})
return {msg:`socket ${name} failed initialization`, error:err}
}.bind(this)
)
}
else return {name:name, init:init}
}
// all sockets are emitters. Adds a listener to all sockets of a type with given event.
// now sockets can emit locally processed events
_eventListen(type,event,fn) {
@ -518,7 +583,10 @@ class Base extends EventEmitter {
mqtt:'m',
}
trans = valids[trans] || ''
if (type !== 'c' && trans ==='w') trans = ''
if (type !== 's' && trans ==='w') {
log.warn({type: type, transport: trans, msg:'Invalid type/transport - Consumer/Client Web Socket not supported use TCP'})
trans = ''
}
return trans
}