From 55d93bf7d8b405778599a2a43bd68c52d3f783e4 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Mon, 10 Feb 2020 21:33:54 -0800 Subject: [PATCH] 0.1.49 refactor push method now allows specification of sockets and consumers has ready push for change and connect but they are buggy --- package.json | 20 ++++++---- src/base.js | 96 ++++++++++++++++++++++++++++------------------- src/processing.js | 22 +++++------ 3 files changed, 80 insertions(+), 58 deletions(-) diff --git a/package.json b/package.json index fc45483..beecc91 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/base", - "version": "0.1.41", + "version": "0.1.49", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { @@ -8,8 +8,12 @@ "fio": "nodemon -r esm examples/four-in-one", "ack": "node -r esm examples/ack || true", "ack:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/ack || true", - "client": "node -r esm examples/client || true", - "client:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/client || true", + "consumer": "UCI_LOG_LEVEL=error UCI_ENV=dev node -r esm --preserve-symlinks examples/consumer || true", + "consumer:nm": "UCI_LOG_LEVEL=error UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/consumer || true", + "consumer2": "UCI_LOG_LEVEL=error UCI_ENV=dev node -r esm --preserve-symlinks examples/consumer2 || true", + "consumer2:nm": "UCI_LOG_LEVEL=error UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/consumer2 || true", + "socket": "UCI_LOG_LEVEL=error UCI_ENV=dev node -r esm --preserve-symlinks examples/socket || true", + "socket:nm": "UCI_LOG_LEVEL=error UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/socket || true", "test": "UCI_ENV=dev nodemon -r esm --preserve-symlinks test/test", "mtestw": "mocha -r esm test/*.test.mjs --watch --recurse ", "mtest": "mocha -r esm test/*.test.mjs" @@ -33,16 +37,16 @@ "devDependencies": { "chai": "^4.2.0", "esm": "^3.2.25", - "mocha": "^6.2.2", - "nodemon": "^2.0.1" + "mocha": "^7.0.1", + "nodemon": "^2.0.2" }, "dependencies": { "@uci-utils/bind-funcs": "^0.2.4", "@uci-utils/logger": "^0.0.16", - "@uci-utils/ready": "^0.1.3", + "@uci-utils/ready": "^0.1.9", "@uci/mqtt": "^0.1.13", - "@uci/socket": "^0.2.26", - "@uci/websocket": "^0.3.10", + "@uci/socket": "^0.2.31", + "@uci/websocket": "^0.3.13", "await-to-js": "^2.1.1", "is-plain-object": "^3.0.0", "merge-anything": "^2.4.4" diff --git a/src/base.js b/src/base.js index 5ef105d..359d5d1 100644 --- a/src/base.js +++ b/src/base.js @@ -99,6 +99,7 @@ class Base extends EventEmitter { sockets = Array.isArray(sockets) ? sockets:[sockets] sockets.forEach(socket => this.registerSocket(socket)) } + console.log('base.js @uci/base package tag 0.1.47') } // end constructor @@ -120,20 +121,27 @@ class Base extends EventEmitter { */ async init(sockets) { - this.ready.subscribe(async ready => { + + // TODO ready needs to allow multiple all subscribers that get rebuilt on add/remove + + const res = await this.socketsInit(sockets) + // console.log('all observer', this.ready.all) + // update ready packet and push/send that changed packet + this.ready.all.subscribe(async ready => { this._readyPacket.ready= ready delete (this._readyPacket.failure) - if (!ready) { - const name = this.ready.failure - this._readyPacket.failure = {name:name, details:this.ready.getObserverDetails(name)} - } - // console.log('base process ready - pushing/sending\n',this._readyPacket) - await this.send(this._readyPacket) - await this.push(this._readyPacket) + if (!ready) { // make a list of the failures to send + // await new Promise(res=>setTimeout(()=>res(),1000)) + this._readyPacket.failures = this.ready.failed + } else delete this._readyPacket.failures + this.emit('log',{level:'testing', msg:`${this.name} has an updated state broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`}) + // setTimeout(async () => { + // console.log('ready send', await this.send(this._readyPacket)) // to any socket that this instance is connected to + // console.log('ready push',await this.push(this._readyPacket)) // to any remote consumer connected to an instance socket + // },100) }) - return this.socketsInit(sockets) + return res } - async socketsInit(sockets) { let results = {} @@ -144,10 +152,10 @@ class Base extends EventEmitter { return new Promise(async function(resolve) { try { const value = await socket.init() - this.emit('status',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value}) + this.emit('log',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value}) resolve(value) } catch (error) { - this.emit('status',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket + this.emit('log',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket // let res = await this.removeSocket(socket.name) errors[socket.name]={error:error} resolve(error) @@ -170,7 +178,7 @@ class Base extends EventEmitter { }) let [err] = await to(Promise.all(inits.map(initialize))) if (err) { - this.emit('status',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err}) + this.emit('log',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err}) return {errors:[err]} } if (Object.keys(errors).length===0) errors=false @@ -206,7 +214,7 @@ class Base extends EventEmitter { retryWait:this.retryWait },options) // outbound if (type==='s') { - let conPackets = [this._readyPacket] + let conPackets = [] // [this._readyPacket] conPackets = options.conPackets ? conPackets.concat(options.conPackets) : conPackets conPackets = options.conPacket ? conPackets.push(options.conPacket) : conPackets options = Object.assign({ @@ -242,7 +250,9 @@ class Base extends EventEmitter { this._socket[name].transport = transport this._socket[name]._packetProcess = this._packetProcess.bind(this, name) - // bubble up events from inidivual sockets to base instance + // bubble up events from inidivual sockets to base instance, + // connection:consumer is a socket emitting when a consumer is connecting + // connection:socket is a consumer emiting when connecting to a socket const EVENTS=['log','socket','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance EVENTS.forEach(event => { this._socket[name].on(event, obj => { @@ -255,8 +265,11 @@ class Base extends EventEmitter { this.emit(event,obj) }) }) + if (type==='c') { + // when consumer has sucessfully connected to a socket this.ready.addObserver(`${name}:consumer`,this._socket[name],{event:'connection:socket',condition:ev=>ev.state==='connected'}) + // set up listner for any pushed packets and emit locally this._socket[name].on('pushed', packet => { packet._header.socketName=name this.emit('pushed', packet) @@ -264,9 +277,18 @@ class Base extends EventEmitter { } if (type==='s') { + // when socket is listnening this.ready.addObserver(`${name}:socket`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' }) - } + // TODO refactor as regular listners so will know which consumer and can push only to it + this.consumerConnected(this._socket[name],{ + subscribe: newConsumer => { if (newConsumer) { + this.emit('log',{level:'testing', msg:`${this.name} has new consumer connecting pushing: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`}) + this.push(this._readyPacket,{socket:name}) + } + } + }) + } return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket } } @@ -280,12 +302,13 @@ class Base extends EventEmitter { async removeSocket(name) { // NOTE: uci consumers have .end renamed as .close to match socket method for convenience + if (!this.getSocket(name)) return 'no socket by that name' let closeError let [err] = await to(this._socket[name].close()) if(err) if (err.code !== 'ERR_SERVER_NOT_RUNNING') { closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'} } - this.emit('status', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts}) + this.emit('log', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts}) this._socket[name].removeAllListeners() delete this._socket[name] return closeError ? closeError : 'success' @@ -361,37 +384,31 @@ class Base extends EventEmitter { } } - // sockets not passed all sockets pushed, otherwise array of names or sting of transport - async push(packet,sockets) { - // TODO change sockets, check if sockets is plain object otherwise it's array of socket name, or single socket name - if (Array.isArray(sockets)) { - let socks = [] - sockets.forEach(name => {if (this._socket[name].type==='s') socks.push(this._socket[name])}) - sockets = socks - } - else { - let trans = null - if (typeof sockets === 'string') trans = sockets - sockets = Object.values(this._socket).filter(socket=>socket.type === 's') - if (trans && trans !=='all') { sockets = sockets.filter(socket=>socket.transport === this._validateTransport(trans)) - } - } + async push(packet,opts={}) { + let sockets = this.getSocketsFilter({type:'s'}) + if (!sockets.length) return Promise.resolve('no sockets on which to push') + opts.sockets = opts.sockets ? opts.sockets : (opts.socket ? [opts.socket] : []) + if (opts.sockets.length) sockets = sockets.filter(name=>opts.sockets.includes(name)) + sockets = sockets + .map(name=>this.getSocket(name)) + .filter(sock=> (opts.transport && opts.transport !=='all') ? sock.transport=== this._validateTransport(opts.transport) : true) + // console.log(sockets.length, 'sockets for push', sockets.map(socket=>socket.name)) + if (!sockets.length) return Promise.resolve('no sockets on which to push') let broadcast = [] + // TODO use map and reflect for (let socket of sockets) { let hookedPacket = {} hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet log.debug({msg:'hooked packet to push', name:socket.name, packet:hookedPacket, method:'push', line:243}) - broadcast.push(socket.push.bind(socket,hookedPacket)) + broadcast.push(socket.push.bind(socket,hookedPacket,opts)) } return Promise.all( broadcast.map(push => { return push() }) ) + } - - // TODO make push version of all this sends - // TODO accept alt transport string i.e. t or TCP async sendTransport(packet, transport) { let sends = [] @@ -521,7 +538,7 @@ class Base extends EventEmitter { const conditionHandler = async ev => { if ((ev||{}).state ==='connected'){ let data = (ev.data ||{}) - if (consumer) { + if (consumer) { // specific consumer check if (data.name === consumer || [ev.name, ev.id, data.name, data.id].some(name => (name||'').includes(consumer)) ) return true } else return true } @@ -595,10 +612,13 @@ class Base extends EventEmitter { if (!isPlainObject(res)) packet.processResult ? packet.processResult[name]=res : packet.processResult = {[name]:res} else { let method = (packet.processMethod || {})[name] || packet.processMethod + // TODO could support other specialized methods if (method === 'merge') { packet = merge(packet,res) } - else Object.assign(packet,res) + else { + packet=res + } } } } diff --git a/src/processing.js b/src/processing.js index e6f510a..3bdbeb6 100644 --- a/src/processing.js +++ b/src/processing.js @@ -75,13 +75,12 @@ const defaultCmds ={ return packet }, ready: async function (packet) { - // console.log('======================== READY RECEIVED AND EMITTED (sent)==================================') - // console.log(packet.event, 'ready', packet.ready) - // console.dir(packet) - const event = [ packet.event || packet.name || packet.id] + const event = packet.event || packet.name || packet.id delete(packet._header) - this.emit(event,packet) - return {cmd:'reply', msg:'event was emitted event at socket process from send', event:event} + this.emit(event,packet.ready,packet) + // console.log('=========== READY RECEIVED AND EMITTED (sent)===========') + // console.log(event, 'ready', packet.ready) + return {cmd:'reply', msg:'consumer sent event was emitted event at socket process', event:event} } }, c:{ @@ -94,13 +93,12 @@ const defaultCmds ={ return packet }, ready: async function (packet) { - // console.log('----------------------- READY RECEIVED AND EMITTED (pushed)---------------------------') - // console.log(packet.event, 'ready', packet.ready) - // console.dir(packet) - const event = [ packet.event || packet.name || packet.id] + const event = packet.event || packet.name || packet.id delete(packet._header) - this.emit(event,packet) - return {cmd:'reply', msg:'event was emitted event at consumer process from push', event:event} + this.emit(event,packet.ready,packet) + // console.log('---------- READY RECEIVED AND EMITTED (pushed)------------') + // console.log(event, 'ready', packet.ready) + return {cmd:'reply', msg:'ready packet event was emitted at consumer process from push'} } } }