From 80286a2e4fec578e3c0b4192a49d9e0eb03fa327 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Thu, 16 Jan 2020 17:21:58 -0800 Subject: [PATCH] 0.1.39 processing.js add ready command to consumer for pushes emitting entire packet base.js added ready all subcriber that sends/push when ready adds ready packet to conPackets for all s socket types change observer names to include sufixes :socket, :consumer, :process, :device add method to easily create an observer of connecting consumer(s) --- package.json | 2 +- src/base.js | 70 +++++++++++++++++++++++++++++++++++++++-------- src/processing.js | 16 ++++++++--- 3 files changed, 71 insertions(+), 17 deletions(-) diff --git a/package.json b/package.json index 996d674..f6c7c2b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/base", - "version": "0.1.38", + "version": "0.1.39", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { diff --git a/src/base.js b/src/base.js index ab10d15..f2c5c8e 100644 --- a/src/base.js +++ b/src/base.js @@ -4,6 +4,7 @@ // UCI communication transport communication modules // TODO change to dynamic import so loads only if that socket type is requestd import Socket from '@uci/socket' // tcp or named pipe +// TDDO import these socket libraries dynamically from per peerDependencies import MQTT from '@uci/mqtt' // requires broker import WebSocket from '@uci/websocket' // server only - client is for web browser only // UCI helpers @@ -73,6 +74,8 @@ class Base extends EventEmitter { this.defaultReturnCmd = opts.defaultReturnCmd this._cmdProcessors = { _default: cmdProcessor } this.ready = new Ready({emitter: this}) + // ready packet to be sent when process is "ready" + this._readyPacket = {cmd:'ready', event:`${this.name}:process`, name:this.name, id:this.id, ready:false} // _c and _s are the default namespaces this._namespaces =Object.assign({},namespaces) this._c = Object.assign({},defaultCmds.c) @@ -117,9 +120,19 @@ class Base extends EventEmitter { */ async init(sockets) { - // Object.getPrototypeOf(Object.getPrototypeOf(this).init.call(this,sockets)) + + // subscribe to all combination and send + this.ready.subscribe(async ready => { + this._readyPacket.ready= ready + delete (this._readyPacket.failure) + if (!ready) { + const name = this.ready.failure + this._readyPacket.failure = {name:name, details:this.ready.getObserverDetails(name)} + } + await this.send(this._readyPacket) + await this.push(this._readyPacket) + }) return this.socketsInit(sockets) - // can do other init stuff here } async socketsInit(sockets) { @@ -133,7 +146,6 @@ class Base extends EventEmitter { try { const value = await socket.init() this.emit('status',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value}) - results[socket.name] = value resolve(value) } catch (error) { this.emit('status',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket @@ -151,6 +163,7 @@ class Base extends EventEmitter { }) } if (typeof sockets ==='string') sockets = [sockets] + sockets.forEach(name => { if (this._socket[name]) { inits.push({name:name, init:this.getSocketInit(name)}) @@ -182,23 +195,32 @@ class Base extends EventEmitter { * @returns {function} if called before base initialzation it can be ignored as all added sockets will be initialized. After through it be called to initialize that socket */ registerSocket(name, type = 'c', transport = 'n', options = {}) { - // console.log('=========================================REGISTER=========',name) if (isPlainObject(name)) ({name, type = 'c', transport = 'n', options = {}} = name) if (typeof name !=='string') return null transport = this._validateTransport(transport) - // console.log({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:198, msg:`adding socket ${name}`}) + log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:198, msg:`adding socket ${name}`}) options.id = options.id || name options.name = options.name || name // TODO add a 'd' type for duplex which creates an 's' first and waits on connect to make a 'c' - if (type==='c') options = Object.assign({initTimeout:this.initTimeout, retryWait:this.retryWait},options) // outbound - if (type==='s') options = Object.assign({defaultReturnCmd:this.defaultReturnCmd},options) // inbound + if (type==='c') options = Object.assign({ + initTimeout:this.initTimeout, + retryWait:this.retryWait + },options) // outbound + if (type==='s') { + let conPackets = [this._readyPacket] + conPackets = options.conPackets ? conPackets.concat(options.conPackets) : conPackets + conPackets = options.conPacket ? conPackets.push(options.conPacket) : conPackets + options = Object.assign({ + defaultReturnCmd:this.defaultReturnCmd, + conPackets: conPackets + },options) // inbound + } // TODO get rid of hard coded transports and use registered transports (t and n being default) switch (transport) { case 'n': options.path = options.path || true // falls through case 't': - // console.log('==========socket options==========\n',name,type,transport,options) this._socket[name] = new Socket[TRANSLATE[type]](options) break case 'm': @@ -222,7 +244,7 @@ class Base extends EventEmitter { this._socket[name]._packetProcess = this._packetProcess.bind(this, name) // bubble up events from inidivual sockets to base instance - const EVENTS=['log','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance + const EVENTS=['log','socket','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance EVENTS.forEach(event => { this._socket[name].on(event, obj => { if (Object.prototype.toString.call(obj) !== '[object Object]') { @@ -235,7 +257,7 @@ class Base extends EventEmitter { }) }) if (type==='c') { - this.ready.addObserver(name,this._socket[name],{event:'connection:socket',condition:ev=>{return ev.state==='connected'}}) + this.ready.addObserver(`${name}:consumer`,this._socket[name],{event:'connection:socket',condition:ev=>{return ev.state==='connected'}}) this._socket[name].on('pushed', packet => { packet._header.socketName=name @@ -244,8 +266,7 @@ class Base extends EventEmitter { } if (type==='s') { - // this._socket[name].on('socket',ev=>console.log(ev)) - this.ready.addObserver(name,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' }) + this.ready.addObserver(`${name}:socket`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' }) } return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket @@ -344,6 +365,7 @@ class Base extends EventEmitter { // sockets not passed all sockets pushed, otherwise array of names or sting of transport async push(packet,sockets) { + // TODO change sockets, check if sockets is plain object otherwise it's array of socket name, or single socket name if (Array.isArray(sockets)) { let socks = [] sockets.forEach(name => {if (this._socket[name].type==='s') socks.push(this._socket[name])}) @@ -494,6 +516,30 @@ class Base extends EventEmitter { this._cmdProcessors[socket_name] = func } + // a call to this method will (make or add) return and or subscribe a ready observer for incoming consumer connections + consumerConnected (socket,opts={}) { + let { subscribe, consumer, add} = opts + + const conditionHandler = async ev => { + if ((ev||{}).state ==='connected'){ + let data = (ev.data ||{}) + console.log('connected: data from consumer',data) + if (consumer) { + console.log('observing for a particular consumer',opts.consumer) + if (data.name === consumer || [ev.name, ev.id, data.name, data.id].some(name => (name||'').includes(consumer)) ) return true + } else return true + } + return false + } + if (typeof socket ==='string') socket = this.getSocket(socket) + + const create = add ? 'addObserver' : 'makeObserver' + + const obs = this.ready[create](socket,{event:'connection:consumer',condition:conditionHandler}) + if (subscribe) return obs.subscribe(typeof subscribe==='function' ? subscribe : console.log) + return obs + } // end consumerConnected + //=============PRIVATE METHODS ========================================= /* * diff --git a/src/processing.js b/src/processing.js index 1fc5d9e..d3deb4b 100644 --- a/src/processing.js +++ b/src/processing.js @@ -75,10 +75,11 @@ const defaultCmds ={ return packet }, ready: async function (packet) { - console.log('=========================READY RECEIVED AND EMITTED ======================================\n',packet) - packet.msg = 'ready state was emitted on receiving process' - this.emit(packet.event || packet.name || packet.id, !!(packet.ready || packet.value || packet.state)) - return packet + console.log('======================== READY RECEIVED AND EMITTED (sent)==================================\n',packet) + const event = [ packet.event || packet.name || packet.id] + delete(packet._header) + this.emit(event,packet) + return {cmd:'reply', msg:'event was emitted event at socket process from send', event:event} } }, c:{ @@ -89,6 +90,13 @@ const defaultCmds ={ reply: function(packet) { if (process.env.UCI_ENV==='dev') log.debug({packet:packet, msg:'====Packet returned from socket - default reply logger==='}) return packet + }, + ready: async function (packet) { + console.log('----------------------- READY RECEIVED AND EMITTED (pushed)---------------------------\n',packet) + const event = [ packet.event || packet.name || packet.id] + delete(packet._header) + this.emit(event,packet) + return {cmd:'reply', msg:'event was emitted event at consumer process from push', event:event} } } }