0.1.49 refactor push method now allows specification of sockets and consumers

has ready push for change and connect but they are buggy
This commit is contained in:
David Kebler 2020-02-10 21:33:54 -08:00
parent 709b3e0fbc
commit 55d93bf7d8
3 changed files with 80 additions and 58 deletions

View file

@ -1,6 +1,6 @@
{
"name": "@uci/base",
"version": "0.1.41",
"version": "0.1.49",
"description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes",
"main": "src/base",
"scripts": {
@ -8,8 +8,12 @@
"fio": "nodemon -r esm examples/four-in-one",
"ack": "node -r esm examples/ack || true",
"ack:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/ack || true",
"client": "node -r esm examples/client || true",
"client:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/client || true",
"consumer": "UCI_LOG_LEVEL=error UCI_ENV=dev node -r esm --preserve-symlinks examples/consumer || true",
"consumer:nm": "UCI_LOG_LEVEL=error UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/consumer || true",
"consumer2": "UCI_LOG_LEVEL=error UCI_ENV=dev node -r esm --preserve-symlinks examples/consumer2 || true",
"consumer2:nm": "UCI_LOG_LEVEL=error UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/consumer2 || true",
"socket": "UCI_LOG_LEVEL=error UCI_ENV=dev node -r esm --preserve-symlinks examples/socket || true",
"socket:nm": "UCI_LOG_LEVEL=error UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/socket || true",
"test": "UCI_ENV=dev nodemon -r esm --preserve-symlinks test/test",
"mtestw": "mocha -r esm test/*.test.mjs --watch --recurse ",
"mtest": "mocha -r esm test/*.test.mjs"
@ -33,16 +37,16 @@
"devDependencies": {
"chai": "^4.2.0",
"esm": "^3.2.25",
"mocha": "^6.2.2",
"nodemon": "^2.0.1"
"mocha": "^7.0.1",
"nodemon": "^2.0.2"
},
"dependencies": {
"@uci-utils/bind-funcs": "^0.2.4",
"@uci-utils/logger": "^0.0.16",
"@uci-utils/ready": "^0.1.3",
"@uci-utils/ready": "^0.1.9",
"@uci/mqtt": "^0.1.13",
"@uci/socket": "^0.2.26",
"@uci/websocket": "^0.3.10",
"@uci/socket": "^0.2.31",
"@uci/websocket": "^0.3.13",
"await-to-js": "^2.1.1",
"is-plain-object": "^3.0.0",
"merge-anything": "^2.4.4"

View file

@ -99,6 +99,7 @@ class Base extends EventEmitter {
sockets = Array.isArray(sockets) ? sockets:[sockets]
sockets.forEach(socket => this.registerSocket(socket))
}
console.log('base.js @uci/base package tag 0.1.47')
} // end constructor
@ -120,20 +121,27 @@ class Base extends EventEmitter {
*/
async init(sockets) {
this.ready.subscribe(async ready => {
// TODO ready needs to allow multiple all subscribers that get rebuilt on add/remove
const res = await this.socketsInit(sockets)
// console.log('all observer', this.ready.all)
// update ready packet and push/send that changed packet
this.ready.all.subscribe(async ready => {
this._readyPacket.ready= ready
delete (this._readyPacket.failure)
if (!ready) {
const name = this.ready.failure
this._readyPacket.failure = {name:name, details:this.ready.getObserverDetails(name)}
}
// console.log('base process ready - pushing/sending\n',this._readyPacket)
await this.send(this._readyPacket)
await this.push(this._readyPacket)
if (!ready) { // make a list of the failures to send
// await new Promise(res=>setTimeout(()=>res(),1000))
this._readyPacket.failures = this.ready.failed
} else delete this._readyPacket.failures
this.emit('log',{level:'testing', msg:`${this.name} has an updated state broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`})
// setTimeout(async () => {
// console.log('ready send', await this.send(this._readyPacket)) // to any socket that this instance is connected to
// console.log('ready push',await this.push(this._readyPacket)) // to any remote consumer connected to an instance socket
// },100)
})
return this.socketsInit(sockets)
return res
}
async socketsInit(sockets) {
let results = {}
@ -144,10 +152,10 @@ class Base extends EventEmitter {
return new Promise(async function(resolve) {
try {
const value = await socket.init()
this.emit('status',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value})
this.emit('log',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value})
resolve(value)
} catch (error) {
this.emit('status',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket
this.emit('log',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket
// let res = await this.removeSocket(socket.name)
errors[socket.name]={error:error}
resolve(error)
@ -170,7 +178,7 @@ class Base extends EventEmitter {
})
let [err] = await to(Promise.all(inits.map(initialize)))
if (err) {
this.emit('status',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err})
this.emit('log',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err})
return {errors:[err]}
}
if (Object.keys(errors).length===0) errors=false
@ -206,7 +214,7 @@ class Base extends EventEmitter {
retryWait:this.retryWait
},options) // outbound
if (type==='s') {
let conPackets = [this._readyPacket]
let conPackets = [] // [this._readyPacket]
conPackets = options.conPackets ? conPackets.concat(options.conPackets) : conPackets
conPackets = options.conPacket ? conPackets.push(options.conPacket) : conPackets
options = Object.assign({
@ -242,7 +250,9 @@ class Base extends EventEmitter {
this._socket[name].transport = transport
this._socket[name]._packetProcess = this._packetProcess.bind(this, name)
// bubble up events from inidivual sockets to base instance
// bubble up events from inidivual sockets to base instance,
// connection:consumer is a socket emitting when a consumer is connecting
// connection:socket is a consumer emiting when connecting to a socket
const EVENTS=['log','socket','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance
EVENTS.forEach(event => {
this._socket[name].on(event, obj => {
@ -255,8 +265,11 @@ class Base extends EventEmitter {
this.emit(event,obj)
})
})
if (type==='c') {
// when consumer has sucessfully connected to a socket
this.ready.addObserver(`${name}:consumer`,this._socket[name],{event:'connection:socket',condition:ev=>ev.state==='connected'})
// set up listner for any pushed packets and emit locally
this._socket[name].on('pushed', packet => {
packet._header.socketName=name
this.emit('pushed', packet)
@ -264,9 +277,18 @@ class Base extends EventEmitter {
}
if (type==='s') {
// when socket is listnening
this.ready.addObserver(`${name}:socket`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' })
}
// TODO refactor as regular listners so will know which consumer and can push only to it
this.consumerConnected(this._socket[name],{
subscribe: newConsumer => { if (newConsumer) {
this.emit('log',{level:'testing', msg:`${this.name} has new consumer connecting pushing: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`})
this.push(this._readyPacket,{socket:name})
}
}
})
}
return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket
}
}
@ -280,12 +302,13 @@ class Base extends EventEmitter {
async removeSocket(name) {
// NOTE: uci consumers have .end renamed as .close to match socket method for convenience
if (!this.getSocket(name)) return 'no socket by that name'
let closeError
let [err] = await to(this._socket[name].close())
if(err) if (err.code !== 'ERR_SERVER_NOT_RUNNING') {
closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'}
}
this.emit('status', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts})
this.emit('log', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts})
this._socket[name].removeAllListeners()
delete this._socket[name]
return closeError ? closeError : 'success'
@ -361,37 +384,31 @@ class Base extends EventEmitter {
}
}
// sockets not passed all sockets pushed, otherwise array of names or sting of transport
async push(packet,sockets) {
// TODO change sockets, check if sockets is plain object otherwise it's array of socket name, or single socket name
if (Array.isArray(sockets)) {
let socks = []
sockets.forEach(name => {if (this._socket[name].type==='s') socks.push(this._socket[name])})
sockets = socks
}
else {
let trans = null
if (typeof sockets === 'string') trans = sockets
sockets = Object.values(this._socket).filter(socket=>socket.type === 's')
if (trans && trans !=='all') { sockets = sockets.filter(socket=>socket.transport === this._validateTransport(trans))
}
}
async push(packet,opts={}) {
let sockets = this.getSocketsFilter({type:'s'})
if (!sockets.length) return Promise.resolve('no sockets on which to push')
opts.sockets = opts.sockets ? opts.sockets : (opts.socket ? [opts.socket] : [])
if (opts.sockets.length) sockets = sockets.filter(name=>opts.sockets.includes(name))
sockets = sockets
.map(name=>this.getSocket(name))
.filter(sock=> (opts.transport && opts.transport !=='all') ? sock.transport=== this._validateTransport(opts.transport) : true)
// console.log(sockets.length, 'sockets for push', sockets.map(socket=>socket.name))
if (!sockets.length) return Promise.resolve('no sockets on which to push')
let broadcast = []
// TODO use map and reflect
for (let socket of sockets) {
let hookedPacket = {}
hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet
log.debug({msg:'hooked packet to push', name:socket.name, packet:hookedPacket, method:'push', line:243})
broadcast.push(socket.push.bind(socket,hookedPacket))
broadcast.push(socket.push.bind(socket,hookedPacket,opts))
}
return Promise.all(
broadcast.map(push => {
return push()
})
)
}
// TODO make push version of all this sends
// TODO accept alt transport string i.e. t or TCP
async sendTransport(packet, transport) {
let sends = []
@ -521,7 +538,7 @@ class Base extends EventEmitter {
const conditionHandler = async ev => {
if ((ev||{}).state ==='connected'){
let data = (ev.data ||{})
if (consumer) {
if (consumer) { // specific consumer check
if (data.name === consumer || [ev.name, ev.id, data.name, data.id].some(name => (name||'').includes(consumer)) ) return true
} else return true
}
@ -595,10 +612,13 @@ class Base extends EventEmitter {
if (!isPlainObject(res)) packet.processResult ? packet.processResult[name]=res : packet.processResult = {[name]:res}
else {
let method = (packet.processMethod || {})[name] || packet.processMethod
// TODO could support other specialized methods
if (method === 'merge') {
packet = merge(packet,res)
}
else Object.assign(packet,res)
else {
packet=res
}
}
}
}

View file

@ -75,13 +75,12 @@ const defaultCmds ={
return packet
},
ready: async function (packet) {
// console.log('======================== READY RECEIVED AND EMITTED (sent)==================================')
// console.log(packet.event, 'ready', packet.ready)
// console.dir(packet)
const event = [ packet.event || packet.name || packet.id]
const event = packet.event || packet.name || packet.id
delete(packet._header)
this.emit(event,packet)
return {cmd:'reply', msg:'event was emitted event at socket process from send', event:event}
this.emit(event,packet.ready,packet)
// console.log('=========== READY RECEIVED AND EMITTED (sent)===========')
// console.log(event, 'ready', packet.ready)
return {cmd:'reply', msg:'consumer sent event was emitted event at socket process', event:event}
}
},
c:{
@ -94,13 +93,12 @@ const defaultCmds ={
return packet
},
ready: async function (packet) {
// console.log('----------------------- READY RECEIVED AND EMITTED (pushed)---------------------------')
// console.log(packet.event, 'ready', packet.ready)
// console.dir(packet)
const event = [ packet.event || packet.name || packet.id]
const event = packet.event || packet.name || packet.id
delete(packet._header)
this.emit(event,packet)
return {cmd:'reply', msg:'event was emitted event at consumer process from push', event:event}
this.emit(event,packet.ready,packet)
// console.log('---------- READY RECEIVED AND EMITTED (pushed)------------')
// console.log(event, 'ready', packet.ready)
return {cmd:'reply', msg:'ready packet event was emitted at consumer process from push'}
}
}
}