processing.js
  add ready command to consumer for pushes
  emitting entire packet
base.js
  added ready all subcriber that sends/push when ready
  adds ready packet to conPackets for all s socket types
  change observer names to include sufixes :socket, :consumer, :process,  :device
  add method to easily create an observer of connecting consumer(s)
This commit is contained in:
David Kebler 2020-01-16 17:21:58 -08:00
parent a6d928bf56
commit 80286a2e4f
3 changed files with 71 additions and 17 deletions

View file

@ -1,6 +1,6 @@
{
"name": "@uci/base",
"version": "0.1.38",
"version": "0.1.39",
"description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes",
"main": "src/base",
"scripts": {

View file

@ -4,6 +4,7 @@
// UCI communication transport communication modules
// TODO change to dynamic import so loads only if that socket type is requestd
import Socket from '@uci/socket' // tcp or named pipe
// TDDO import these socket libraries dynamically from per peerDependencies
import MQTT from '@uci/mqtt' // requires broker
import WebSocket from '@uci/websocket' // server only - client is for web browser only
// UCI helpers
@ -73,6 +74,8 @@ class Base extends EventEmitter {
this.defaultReturnCmd = opts.defaultReturnCmd
this._cmdProcessors = { _default: cmdProcessor }
this.ready = new Ready({emitter: this})
// ready packet to be sent when process is "ready"
this._readyPacket = {cmd:'ready', event:`${this.name}:process`, name:this.name, id:this.id, ready:false}
// _c and _s are the default namespaces
this._namespaces =Object.assign({},namespaces)
this._c = Object.assign({},defaultCmds.c)
@ -117,9 +120,19 @@ class Base extends EventEmitter {
*/
async init(sockets) {
// Object.getPrototypeOf(Object.getPrototypeOf(this).init.call(this,sockets))
// subscribe to all combination and send
this.ready.subscribe(async ready => {
this._readyPacket.ready= ready
delete (this._readyPacket.failure)
if (!ready) {
const name = this.ready.failure
this._readyPacket.failure = {name:name, details:this.ready.getObserverDetails(name)}
}
await this.send(this._readyPacket)
await this.push(this._readyPacket)
})
return this.socketsInit(sockets)
// can do other init stuff here
}
async socketsInit(sockets) {
@ -133,7 +146,6 @@ class Base extends EventEmitter {
try {
const value = await socket.init()
this.emit('status',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value})
results[socket.name] = value
resolve(value)
} catch (error) {
this.emit('status',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket
@ -151,6 +163,7 @@ class Base extends EventEmitter {
})
}
if (typeof sockets ==='string') sockets = [sockets]
sockets.forEach(name => {
if (this._socket[name]) {
inits.push({name:name, init:this.getSocketInit(name)})
@ -182,23 +195,32 @@ class Base extends EventEmitter {
* @returns {function} if called before base initialzation it can be ignored as all added sockets will be initialized. After through it be called to initialize that socket
*/
registerSocket(name, type = 'c', transport = 'n', options = {}) {
// console.log('=========================================REGISTER=========',name)
if (isPlainObject(name)) ({name, type = 'c', transport = 'n', options = {}} = name)
if (typeof name !=='string') return null
transport = this._validateTransport(transport)
// console.log({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:198, msg:`adding socket ${name}`})
log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:198, msg:`adding socket ${name}`})
options.id = options.id || name
options.name = options.name || name
// TODO add a 'd' type for duplex which creates an 's' first and waits on connect to make a 'c'
if (type==='c') options = Object.assign({initTimeout:this.initTimeout, retryWait:this.retryWait},options) // outbound
if (type==='s') options = Object.assign({defaultReturnCmd:this.defaultReturnCmd},options) // inbound
if (type==='c') options = Object.assign({
initTimeout:this.initTimeout,
retryWait:this.retryWait
},options) // outbound
if (type==='s') {
let conPackets = [this._readyPacket]
conPackets = options.conPackets ? conPackets.concat(options.conPackets) : conPackets
conPackets = options.conPacket ? conPackets.push(options.conPacket) : conPackets
options = Object.assign({
defaultReturnCmd:this.defaultReturnCmd,
conPackets: conPackets
},options) // inbound
}
// TODO get rid of hard coded transports and use registered transports (t and n being default)
switch (transport) {
case 'n':
options.path = options.path || true
// falls through
case 't':
// console.log('==========socket options==========\n',name,type,transport,options)
this._socket[name] = new Socket[TRANSLATE[type]](options)
break
case 'm':
@ -222,7 +244,7 @@ class Base extends EventEmitter {
this._socket[name]._packetProcess = this._packetProcess.bind(this, name)
// bubble up events from inidivual sockets to base instance
const EVENTS=['log','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance
const EVENTS=['log','socket','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance
EVENTS.forEach(event => {
this._socket[name].on(event, obj => {
if (Object.prototype.toString.call(obj) !== '[object Object]') {
@ -235,7 +257,7 @@ class Base extends EventEmitter {
})
})
if (type==='c') {
this.ready.addObserver(name,this._socket[name],{event:'connection:socket',condition:ev=>{return ev.state==='connected'}})
this.ready.addObserver(`${name}:consumer`,this._socket[name],{event:'connection:socket',condition:ev=>{return ev.state==='connected'}})
this._socket[name].on('pushed', packet => {
packet._header.socketName=name
@ -244,8 +266,7 @@ class Base extends EventEmitter {
}
if (type==='s') {
// this._socket[name].on('socket',ev=>console.log(ev))
this.ready.addObserver(name,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' })
this.ready.addObserver(`${name}:socket`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' })
}
return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket
@ -344,6 +365,7 @@ class Base extends EventEmitter {
// sockets not passed all sockets pushed, otherwise array of names or sting of transport
async push(packet,sockets) {
// TODO change sockets, check if sockets is plain object otherwise it's array of socket name, or single socket name
if (Array.isArray(sockets)) {
let socks = []
sockets.forEach(name => {if (this._socket[name].type==='s') socks.push(this._socket[name])})
@ -494,6 +516,30 @@ class Base extends EventEmitter {
this._cmdProcessors[socket_name] = func
}
// a call to this method will (make or add) return and or subscribe a ready observer for incoming consumer connections
consumerConnected (socket,opts={}) {
let { subscribe, consumer, add} = opts
const conditionHandler = async ev => {
if ((ev||{}).state ==='connected'){
let data = (ev.data ||{})
console.log('connected: data from consumer',data)
if (consumer) {
console.log('observing for a particular consumer',opts.consumer)
if (data.name === consumer || [ev.name, ev.id, data.name, data.id].some(name => (name||'').includes(consumer)) ) return true
} else return true
}
return false
}
if (typeof socket ==='string') socket = this.getSocket(socket)
const create = add ? 'addObserver' : 'makeObserver'
const obs = this.ready[create](socket,{event:'connection:consumer',condition:conditionHandler})
if (subscribe) return obs.subscribe(typeof subscribe==='function' ? subscribe : console.log)
return obs
} // end consumerConnected
//=============PRIVATE METHODS =========================================
/*
*

View file

@ -75,10 +75,11 @@ const defaultCmds ={
return packet
},
ready: async function (packet) {
console.log('=========================READY RECEIVED AND EMITTED ======================================\n',packet)
packet.msg = 'ready state was emitted on receiving process'
this.emit(packet.event || packet.name || packet.id, !!(packet.ready || packet.value || packet.state))
return packet
console.log('======================== READY RECEIVED AND EMITTED (sent)==================================\n',packet)
const event = [ packet.event || packet.name || packet.id]
delete(packet._header)
this.emit(event,packet)
return {cmd:'reply', msg:'event was emitted event at socket process from send', event:event}
}
},
c:{
@ -89,6 +90,13 @@ const defaultCmds ={
reply: function(packet) {
if (process.env.UCI_ENV==='dev') log.debug({packet:packet, msg:'====Packet returned from socket - default reply logger==='})
return packet
},
ready: async function (packet) {
console.log('----------------------- READY RECEIVED AND EMITTED (pushed)---------------------------\n',packet)
const event = [ packet.event || packet.name || packet.id]
delete(packet._header)
this.emit(event,packet)
return {cmd:'reply', msg:'event was emitted event at consumer process from push', event:event}
}
}
}