gather port or path option into proper socket entry

last commit before major refactor for plugins
master
Kebler Network System Administrator 2021-04-05 17:35:57 -07:00
parent 599b3242ea
commit 439b5dbd7e
2 changed files with 578 additions and 276 deletions

View File

@ -8,13 +8,15 @@
// --------------- UCI dependencies ------------------- // --------------- UCI dependencies -------------------
// UCI communication transport communication modules // UCI communication transport communication modules
// TODO change to dynamic import so loads only if that socket type is requestd // TODO change to dynamic import so loads only if that connector type is requested
import Socket from '@uci/socket' // tcp or named pipe import Socket from '@uci/socket' // tcp or named pipe
// TDDO import these socket libraries dynamically from per peerDependencies
import MQTT from '@uci/mqtt' // requires broker import MQTT from '@uci/mqtt' // requires broker
import WebSocket from '@uci/websocket' // server only - client is for web browser only import WebSocket from '@uci/websocket' // server only - client is for web browser only
// UCI helpers // UCI helpers
import { Ready, changed, map } from '@uci-utils/ready' import { Ready, changed, map } from '@uci-utils/ready'
// import { isClass } from '@uci-utils/type'
import { bindFuncs } from '@uci-utils/bind-funcs' import { bindFuncs } from '@uci-utils/bind-funcs'
// UCI logger // UCI logger
import logger from '@uci-utils/logger' import logger from '@uci-utils/logger'
@ -33,14 +35,19 @@ import EventEmitter from 'events'
import { cmdProcessor, defaultCmds, namespaces } from './processing' import { cmdProcessor, defaultCmds, namespaces } from './processing'
// Constants // Constants
// TODO transport will become "connector"
// type will be input or output, duplex
// TODO make sure reach registered connector has unique name
// const SOCKET_INFO_KEYS = ['name', 'type', 'transport'] // const SOCKET_INFO_KEYS = ['name', 'type', 'transport']
// TODO remove this hard-code and get these from plugin
const TRANSLATE = { const TRANSLATE = {
n: 'Named Pipe', n: 'Named Pipe',
t: 'TCP', t: 'TCP',
s: 'Socket', s: 'Socket',
c: 'Consumer', c: 'Consumer',
m: 'MQTT', m: 'MQTT',
w: 'Web Socket' w: 'Web Socket',
} }
/** /**
@ -68,13 +75,14 @@ const TRANSLATE = {
* let mybaseprocess = new Base(opts) * let mybaseprocess = new Base(opts)
*/ */
class Base extends EventEmitter { class Base extends EventEmitter {
constructor(opts={}) { constructor(opts = {}) {
if (typeof opts === 'string') opts=loadYaml.sync(opts) if (typeof opts === 'string') opts = loadYaml.sync(opts)
super() super()
this.name = opts.name || opts.appName || 'a base class instance' this.name = opts.name || opts.appName || 'a base class instance'
this.id = opts.id || 'uci-base:' + new Date().getTime() this.id = opts.id || 'uci-base:' + new Date().getTime()
log = logger({ name: 'base', id: this.id }) log = logger({ name: 'base', id: this.id })
this.desc = opts.desc // additional details for humans this.desc = opts.desc // additional details for humans
this.opts = opts // make original options available
this._socket = {} // holds all the various communication sockets this._socket = {} // holds all the various communication sockets
// these two if passed will get applied to all consumer sockets, otherwise socket defaults will be used // these two if passed will get applied to all consumer sockets, otherwise socket defaults will be used
/** @type {string} - timeout for connecting to a consumer socket */ /** @type {string} - timeout for connecting to a consumer socket */
@ -82,13 +90,19 @@ class Base extends EventEmitter {
this.retryWait = opts.retryWait this.retryWait = opts.retryWait
this.defaultReturnCmd = opts.defaultReturnCmd this.defaultReturnCmd = opts.defaultReturnCmd
this._cmdProcessors = { _default: cmdProcessor } this._cmdProcessors = { _default: cmdProcessor }
this.ready = new Ready({emitter: this}) this.ready = new Ready({ emitter: this })
// ready packet to be sent when process is "ready" // ready packet to be sent when process is "ready"
this._readyPacket = {cmd:'ready', event:`${this.name}:process`, name:this.name, id:this.id, ready:false} this._readyPacket = {
cmd: 'ready',
event: `${this.name}:process`,
name: this.name,
id: this.id,
ready: false,
}
// _c and _s are the default namespaces // _c and _s are the default namespaces
this._namespaces =Object.assign({},namespaces) this._namespaces = Object.assign({}, namespaces)
this._c = Object.assign({},defaultCmds.c) this._c = Object.assign({}, defaultCmds.c)
this._s = Object.assign({},defaultCmds.s) this._s = Object.assign({}, defaultCmds.s)
// make available a method that will bind a whole object tree of functions // make available a method that will bind a whole object tree of functions
// Note: functions called via a command namespace are called by base connext by default // Note: functions called via a command namespace are called by base connext by default
// if called directlly/locally they should be bound to base context if desired // if called directlly/locally they should be bound to base context if desired
@ -99,16 +113,34 @@ class Base extends EventEmitter {
this._namespaces.c.splice(-1, 0, null) this._namespaces.c.splice(-1, 0, null)
} }
// this.bindFuncs = bindFuncs // make available a method that will bind a whole object tree of functions // this.bindFuncs = bindFuncs // make available a method that will bind a whole object tree of functions
this.initSockets = opts.sockets // sockets to be registered at init
this._socket = {} // where all sockets are stored this._socket = {} // where all sockets are stored
// at creation defined sockets: // at creation defined sockets:
if (opts.port) this.registerSocket(`${opts.name||'base'}${opts.port&&opts.path ? ':t':''}`,'s','t',{port:opts.port}) // generic sockets
if (opts.path) this.registerSocket(`${opts.name||'base'}${opts.port&&opts.path ? ':n':''}`,'s','n',{path: opts.path}) opts.sockets = Array.isArray(opts.sockets) ? opts.sockets : [opts.sockets]
if (opts.sockets) { if (opts.port)
let sockets = opts.sockets opts.sockets.push({
sockets = Array.isArray(sockets) ? sockets:[sockets] name: `${this.name}:tcp`,
sockets.forEach(socket => this.registerSocket(socket)) type: 's',
} transport: 'tcp',
options: { port: opts.port },
})
if (opts.path)
opts.sockets.push({
name: `${this.name}:named`,
type: 's',
transport: 'named',
options: { path: opts.path },
})
// specific sockets
// if (opts.sockets) {
// let sockets = opts.sockets
// sockets = Array.isArray(sockets) ? sockets : [sockets]
// sockets.forEach((socket) => {
// socket.name = socket.name || `${this.name}:${socket.transport}`
// this.registerSocket(socket)
// })
// }
} // end constructor } // end constructor
/* /*
@ -125,86 +157,116 @@ class Base extends EventEmitter {
* @async * @async
* @public * @public
* @required * @required
* @param {array} sockets string of one or array array names to initialize, if none, then all current added sockets will be initialized * @param {array} sockets string of one or array of names to initialize, if none, then all current added sockets will be initialized
* *
*/ */
async init(sockets) { async init(sockets) {
// TODO ready needs to allow multiple all subscribers that get rebuilt on add/remove
// register all sockets requested in constructor
for (const socket of this.opts.sockets) {
socket.name = socket.name || `${this.name}:${socket.transport}` // check for unique name
await this.registerSocket(socket)
}
const res = await this.socketsInit(sockets) const res = await this.socketsInit(sockets)
// update ready packet and push/send that changed packet // will update ready packet and push/send that changed packet on ready state change
this.ready.all.subscribe(async ready => { // on can add more observers to the ready
this._readyPacket.ready= ready this.ready.all.subscribe(async (ready) => {
delete (this._readyPacket._header) this._readyPacket.ready = ready
if (!ready) { // make a list of the failures to send delete this._readyPacket._header
if (!ready) {
// make a list of the failures to send
// await new Promise(res=>setTimeout(()=>res(),1000)) // await new Promise(res=>setTimeout(()=>res(),1000))
this._readyPacket.failures = this.ready.failed this._readyPacket.failures = this.ready.failed
} else delete this._readyPacket.failures } else delete this._readyPacket.failures
let packet = Object.assign({},this._readyPacket) let packet = Object.assign({}, this._readyPacket)
this.emit('log',{level:'debug', packet:packet, msg:`${this.name} has an updated ready state: broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`}) this.emit('log', {
level: 'debug',
packet: packet,
msg: `${this.name} has an updated ready state: broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`,
})
// console.log('ready packet to broadcast',packet) // console.log('ready packet to broadcast',packet)
// set the connection packet for each socket // (re)sets the connection packet for each socket
this.getSocketsFilter({type:'s'}) this.getSocketsFilter({ type: 's' }).forEach(
.forEach(socket=>this.getSocket(socket).conPackets[0]=packet) (socket) => (this.getSocket(socket).conPackets[0] = packet)
)
// announce changed ready state // announce changed ready state
setTimeout(async () => { setTimeout(async () => {
this.send(packet) // to any socket that this instance is connected to this.send(packet) // to any socket that this instance is connected to
this.push(packet) // to any remote consumer connected to an instance socket this.push(packet) // to any remote consumer connected to an instance socket
},100) // delay 100ms, fixes issue so won't be sent during a disconnect which causes socket write error }, 100) // delay 100ms, fixes issue so won't be sent during a disconnect which causes socket write error
}) })
return res return res
} }
async socketsInit(sockets) { async socketsInit(sockets) {
let results = {} let results = {}
let errors = {} let errors = {}
// single socket intialize mapper // single socket intialize mapper
const initialize = async socket => { const initialize = async (socket) => {
return new Promise(async function(resolve) { return new Promise(
async function (resolve) {
try { try {
const value = await socket.init() const value = await socket.init()
this.emit('log',{level:'info', socketName:socket.name, msg:'socket successfully initialized', message:value}) this.emit('log', {
level: 'info',
socketName: socket.name,
msg: 'socket successfully initialized',
message: value,
})
resolve(value) resolve(value)
} catch (error) { } catch (error) {
this.emit('log',{level:'fatal', socketName:socket.name, msg:'socket init error',error:error})// emit an error here, remove socket this.emit('log', {
level: 'fatal',
socketName: socket.name,
msg: 'socket init error',
error: error,
}) // emit an error here, remove socket
// let res = await this.removeSocket(socket.name) // let res = await this.removeSocket(socket.name)
errors[socket.name]={error:error} errors[socket.name] = { error: error }
resolve(error) resolve(error)
} }
}.bind(this)) }.bind(this)
)
} }
let inits = [] let inits = []
if (!sockets) { sockets = if (!sockets) {
Object.keys(this._socket).filter(name => { sockets = Object.keys(this._socket).filter((name) => {
return !this._socket[name].active // only intialize (connect) inactive sockets return !this._socket[name].active // only intialize (connect) inactive sockets
}) })
} }
if (typeof sockets ==='string') sockets = [sockets] if (typeof sockets === 'string') sockets = [sockets]
sockets.forEach(name => { sockets.forEach((name) => {
if (this._socket[name]) { if (this._socket[name]) {
inits.push({name:name, init:this.getSocketInit(name)}) inits.push({ name: name, init: this.getSocketInit(name) })
} else log.warn({msg:`no socket registered by name of ${name} to initialize`}) } else
log.warn({
msg: `no socket registered by name of ${name} to initialize`,
})
}) })
let [err] = await to(Promise.all(inits.map(initialize))) let [err] = await to(Promise.all(inits.map(initialize)))
if (err) { if (err) {
this.emit('log',{level:'fatal', msg:'initialize of socket errors was NOT caught --- bad bad',error:err}) this.emit('log', {
return {errors:[err]} level: 'fatal',
msg: 'initialize of socket errors was NOT caught --- bad bad',
error: err,
})
return { errors: [err] }
} }
if (Object.keys(errors).length===0) errors=false if (Object.keys(errors).length === 0) errors = false
return {results:results, errors:errors} return { results: results, errors: errors }
} }
// support old name for now // support old name for now
async addSocket(name,type,transport,options) { // async addSocket(name, type, transport, options) {
return this.registerSocket(name,type,transport,options) // return this.registerSocket(name, type, transport, options);
} // }
// TODO use plugins for transports instead
/** /**
* addSocket - Add a socket at runtime as opposed to via the sockets option at creation * registerSocket - register a socket for use
* This is not async and will NOT initialize the socket, that must be done with a call to init or socketInit * This is not async and will NOT initialize the socket, that must be done with a call to init or socketInit
* @param {type} name Name of socket (usually something short but unique) * @param {type} name Name of socket (usually something short but unique)
* @param {string} [type=c] consumer/client 'c' or socket/server 's' * @param {string} [type=c] consumer/client 'c' or socket/server 's'
@ -213,28 +275,64 @@ class Base extends EventEmitter {
* *
* @returns {function} if called before base initialzation it can be ignored as all added sockets will be initialized. After through it be called to initialize that socket * @returns {function} if called before base initialzation it can be ignored as all added sockets will be initialized. After through it be called to initialize that socket
*/ */
registerSocket(name, type = 'c', transport = 'n', options = {}) { async registerSocket(name, type = 'c', transport, options = {}) {
if (isPlainObject(name)) ({name, type = 'c', transport = 'n', options = {}} = name) if (isPlainObject(name))
if (typeof name !=='string') return null ({ name, type = 'c', transport, options = {} } = name)
if (typeof name !== 'string') return null
transport = transport || (options.port ? 'tcp' : 'named')
transport = this._validateTransport(transport) transport = this._validateTransport(transport)
log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:198, msg:`adding socket ${name}`}) if (!transport) {
log.error({
socketName: name,
type: type,
transport: transport,
options: options,
method: 'registerSocket',
msg: `invalid transport ${transport}`,
})
return null
}
log.info({
socketName: name,
type: type,
tranport: transport,
options: options,
method: 'registerSocket',
line: 198,
msg: `registering socket ${name}`,
})
options.id = options.id || name options.id = options.id || name
options.name = options.name || name options.name = options.name || name
// TODO add a 'd' type for duplex which creates an 's' first and waits on connect to make a 'c' // TODO add a 'd' type for duplex which creates an 's' first and waits on connect to make a 'c'
if (type==='c') options = Object.assign({ if (type === 'c')
initTimeout:this.initTimeout, options = Object.assign(
retryWait:this.retryWait {
},options) // outbound initTimeout: this.initTimeout,
if (type==='s') { retryWait: this.retryWait,
},
options
) // outbound
if (type === 's') {
let conPackets = [] // [this._readyPacket] let conPackets = [] // [this._readyPacket]
conPackets = options.conPackets ? conPackets.concat(options.conPackets) : conPackets conPackets = options.conPackets
conPackets = options.conPacket ? conPackets.push(options.conPacket) : conPackets ? conPackets.concat(options.conPackets)
options = Object.assign({ : conPackets
defaultReturnCmd:this.defaultReturnCmd, conPackets = options.conPacket
conPackets: conPackets ? conPackets.push(options.conPacket)
},options) // inbound : conPackets
options = Object.assign(
{
defaultReturnCmd: this.defaultReturnCmd,
conPackets: conPackets,
},
options
) // inbound
} }
// TODO get rid of hard coded transports and use registered transports (t and n being default) // TODO get rid of hard coded transports and use registered transports (t and n being default)
// TODO dynamically import only requested connector plugins
// Base will pass options to plugin
// plugin will pass a unique transport identifier to base
// all plugins will use a function to generate a class instance
switch (transport) { switch (transport) {
case 'n': case 'n':
options.path = options.path || true options.path = options.path || true
@ -244,7 +342,7 @@ class Base extends EventEmitter {
break break
case 'm': case 'm':
if (type === 'p') type = 'c' if (type === 'p') type = 'c'
options.client = type==='c' ? true : false options.client = type === 'c' ? true : false
options.connect = options.connect || {} options.connect = options.connect || {}
if (options.host) options.connect.host = options.host if (options.host) options.connect.host = options.host
if (options.port) options.connect.port = options.port if (options.port) options.connect.port = options.port
@ -252,10 +350,11 @@ class Base extends EventEmitter {
this._socket[name] = new MQTT(options) this._socket[name] = new MQTT(options)
break break
case 'w': case 'w':
if (type === 's') this._socket[name] = new WebSocket(options) if (type === 's') this._socket[name] = await WebSocket(options)
} }
if (this._socket[name]) { // in case of invalid transport if (this._socket[name]) {
// in case of invalid transport
this._socket[name].name = name this._socket[name].name = name
this._socket[name].type = type this._socket[name].type = type
@ -265,36 +364,58 @@ class Base extends EventEmitter {
// bubble up events from inidivual sockets to base instance, // bubble up events from inidivual sockets to base instance,
// connection:consumer is a socket emitting when a consumer is connecting // connection:consumer is a socket emitting when a consumer is connecting
// connection:socket is a consumer emiting when connecting to a socket // connection:socket is a consumer emiting when connecting to a socket
const EVENTS=['log','socket','connection','connection:consumer', 'connection:socket'] // that should emit up from each socket to instance const EVENTS = [
EVENTS.forEach(event => { 'log',
this._socket[name].on(event, obj => { 'socket',
'connection',
'connection:consumer',
'connection:socket',
] // that should emit up from each socket to instance
EVENTS.forEach((event) => {
this._socket[name].on(event, (obj) => {
if (Object.prototype.toString.call(obj) !== '[object Object]') { if (Object.prototype.toString.call(obj) !== '[object Object]') {
let data=obj let data = obj
obj = {} obj = {}
obj.data = data obj.data = data
} }
obj.socketName = name obj.socketName = name
this.emit(event,obj) this.emit(event, obj)
}) })
}) })
if (type==='c') { if (type === 'c') {
// when consumer has sucessfully connected to a socket // when consumer has sucessfully connected to a socket
this._socket[name].obsName = `${name}:${options.path ? options.path : `${options.host}:${options.port}`}<outbound>` this._socket[name].obsName = `${name}:${
this.ready.addObserver(this._socket[name].obsName,this._socket[name],{event:'connection:socket',condition:ev=>ev.state==='connected'}) options.path ? options.path : `${options.host}:${options.port}`
}<outbound>`
this.ready.addObserver(this._socket[name].obsName, this._socket[name], {
event: 'connection:socket',
condition: (ev) => ev.state === 'connected',
})
// set up listner for any pushed packets and emit locally // set up listner for any pushed packets and emit locally
this._socket[name].on('pushed', packet => { this._socket[name].on('pushed', (packet) => {
packet._header.socketName=name packet._header.socketName = name
this.emit('pushed', packet) this.emit('pushed', packet)
}) })
} }
if (type==='s') { if (type === 's') {
// when socket is listnening // when socket is listnening
this.ready.addObserver(`${name}:socket<listening>`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' }) this.ready.addObserver(
`${name}:socket<listening>`,
this._socket[name],
{
event: 'socket',
condition: (ev) => (ev || {}).state === 'listening',
}
)
// initially set conPackets, ready packets is ALWAYS the first // initially set conPackets, ready packets is ALWAYS the first
this._socket[name].conPackets.unshift(this._readyPacket) this._socket[name].conPackets.unshift(this._readyPacket)
if (options.duplex) this.consumerConnected(this._socket[name],{consumer:options.duplex, add:true}) if (options.duplex)
this.consumerConnected(this._socket[name], {
consumer: options.duplex,
add: true,
})
} }
return this._socket[name] // return handle to newly registered socket return this._socket[name] // return handle to newly registered socket
// return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket // return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket
@ -310,18 +431,33 @@ class Base extends EventEmitter {
async removeSocket(name) { async removeSocket(name) {
// NOTE: uci consumers have .end renamed as .close to match socket method for convenience // NOTE: uci consumers have .end renamed as .close to match socket method for convenience
if (typeof name !=='string') return 'no socket name passed, nothing to remove' if (typeof name !== 'string')
return 'no socket name passed, nothing to remove'
const socket = this.getSocket(name) const socket = this.getSocket(name)
if (!socket) return 'no socket by that name, nothing to remove' if (!socket) return 'no socket by that name, nothing to remove'
let closeError let closeError
if (typeof socket.close !== 'function') return 'bad socket no close function, nothing to remove' if (typeof socket.close !== 'function')
return 'bad socket no close function, nothing to remove'
let [err] = await to(socket.close()) let [err] = await to(socket.close())
if(err) if (err.code !== 'ERR_SERVER_NOT_RUNNING') { if (err)
closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'} if (err.code !== 'ERR_SERVER_NOT_RUNNING') {
closeError = {
socket: this._socket[name].name,
error: err,
msg: 'socket/consumer closed with errors, but removed',
} }
this.emit('log', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts}) }
this.emit('log', {
level: 'warn',
msg: `socket ${name} has been removed`,
socket: this._socket[name].opts,
})
socket.removeAllListeners() socket.removeAllListeners()
this.ready.removeObserver(socket.type==='c' ? this._socket[name].obsName : `${name}:socket<listening>`) this.ready.removeObserver(
socket.type === 'c'
? this._socket[name].obsName
: `${name}:socket<listening>`
)
delete this._socket[name] delete this._socket[name]
return closeError ? closeError : 'success' return closeError ? closeError : 'success'
} }
@ -331,29 +467,37 @@ class Base extends EventEmitter {
else return this._socket else return this._socket
} }
// returns array of names of sockets that pass filter // returns array of names of sockets that pass filter
getSocketsFilter({type, trans, active}={}) { getSocketsFilter({ type, trans, active } = {}) {
if (trans) trans = this._validateTransport(trans) if (trans) trans = this._validateTransport(trans)
let filtered = [] let filtered = []
Object.keys(this._socket).forEach(name => { Object.keys(this._socket).forEach((name) => {
if ((type==null || this._socket[name].type === type) if (
&& (trans==null || this._socket[name].transport === trans) (type == null || this._socket[name].type === type) &&
&& (active==null || this._socket[name].active===active)) filtered.push(name) (trans == null || this._socket[name].transport === trans) &&
(active == null || this._socket[name].active === active)
)
filtered.push(name)
}) })
return filtered return filtered
} }
getConsumers(filter={}) { getConsumers(filter = {}) {
filter.type='c' filter.type = 'c'
return this.getSocketsFilter(filter) return this.getSocketsFilter(filter)
} }
getSocketInit(name) { getSocketInit(name) {
let socket = this._socket[name] let socket = this._socket[name]
if(!socket) { if (!socket) {
log.warn({msg:`can't fetch create/connect function, no socket registered by name of ${name}`}) log.warn({
msg: `can't fetch create/connect function, no socket registered by name of ${name}`,
})
return null return null
} }
if (this._socket[name].type === 's' && this._socket[name].transport !== 'm') { if (
this._socket[name].type === 's' &&
this._socket[name].transport !== 'm'
) {
return socket.create return socket.create
} else { } else {
return socket.connect return socket.connect
@ -378,48 +522,67 @@ class Base extends EventEmitter {
if (!con) return `no consumer ${name} for sending` if (!con) return `no consumer ${name} for sending`
names = [name] names = [name]
} }
if (!packet || !Object.keys(packet).length) return 'no packet to send - aborted' if (!packet || !Object.keys(packet).length)
return 'no packet to send - aborted'
let sends = [] let sends = []
if (!names.length) return 'no consumers available for send, aborting' if (!names.length) return 'no consumers available for send, aborting'
for (let name of names) { for (let name of names) {
const consumer=this.getSocket(name) const consumer = this.getSocket(name)
let hookedPacket = {} let hookedPacket = {}
hookedPacket = consumer.beforeSend ? await consumer.beforeSend.call(this,Object.assign({},packet)) : packet hookedPacket = consumer.beforeSend
log.debug({msg:'after hook, sending packet', name:consumer.name, packet:hookedPacket, method:'send'}) ? await consumer.beforeSend.call(this, Object.assign({}, packet))
sends.push(consumer.send.bind(consumer,hookedPacket)) : packet
log.debug({
msg: 'after hook, sending packet',
name: consumer.name,
packet: hookedPacket,
method: 'send',
})
sends.push(consumer.send.bind(consumer, hookedPacket))
} }
if (sends.length === 1) return await sends[0]() if (sends.length === 1) return await sends[0]()
return Promise.all( return Promise.all(
sends.map(send => { sends.map((send) => {
return send() return send()
}) })
) )
} }
async push(packet,opts={}) { async push(packet, opts = {}) {
if (!packet || !Object.keys(packet).length) return Promise.resolve('no packet to push - aborted') if (!packet || !Object.keys(packet).length)
let sockets = this.getSocketsFilter({type:'s'}) return Promise.resolve('no packet to push - aborted')
let sockets = this.getSocketsFilter({ type: 's' })
if (!sockets.length) return Promise.resolve('no sockets on which to push') if (!sockets.length) return Promise.resolve('no sockets on which to push')
opts.sockets = opts.sockets ? opts.sockets : (opts.socket ? [opts.socket] : []) opts.sockets = opts.sockets
if (opts.sockets.length) sockets = sockets.filter(name=>opts.sockets.includes(name)) ? opts.sockets
: opts.socket
? [opts.socket]
: []
if (opts.sockets.length)
sockets = sockets.filter((name) => opts.sockets.includes(name))
sockets = sockets sockets = sockets
.map(name=>this.getSocket(name)) .map((name) => this.getSocket(name))
.filter(sock=> (opts.transport && opts.transport !=='all') ? sock.transport=== this._validateTransport(opts.transport) : true) .filter((sock) =>
opts.transport && opts.transport !== 'all'
? sock.transport === this._validateTransport(opts.transport)
: true
)
// console.log(sockets.length, 'sockets for push', sockets.map(socket=>socket.name)) // console.log(sockets.length, 'sockets for push', sockets.map(socket=>socket.name))
if (!sockets.length) return Promise.resolve('no sockets on which to push') if (!sockets.length) return Promise.resolve('no sockets on which to push')
let broadcast = [] let broadcast = []
// TODO use map and reflect // TODO use map and reflect
for (let socket of sockets) { for (let socket of sockets) {
let hookedPacket = {} let hookedPacket = {}
hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet hookedPacket = socket.beforeSend
broadcast.push(socket.push.bind(socket,hookedPacket,opts)) ? await socket.beforeSend.call(this, Object.assign({}, packet), true)
: packet
broadcast.push(socket.push.bind(socket, hookedPacket, opts))
} }
return Promise.all( return Promise.all(
broadcast.map(push => { broadcast.map((push) => {
return push() return push()
}) })
) )
} }
// TODO accept alt transport string i.e. t or TCP // TODO accept alt transport string i.e. t or TCP
async sendTransport(packet, transport) { async sendTransport(packet, transport) {
@ -433,7 +596,7 @@ class Base extends EventEmitter {
} }
if (sends.length === 1) return sends[0](packet) if (sends.length === 1) return sends[0](packet)
return Promise.all( return Promise.all(
sends.map(send => { sends.map((send) => {
return send(packet) return send(packet)
}) })
) )
@ -451,15 +614,13 @@ class Base extends EventEmitter {
// TODO add sendMQTT, sendWS // TODO add sendMQTT, sendWS
socketsListen(event, fn) {
socketsListen(event,fn) { this._eventListen('s', event, fn)
this._eventListen('s',event,fn)
} }
consumersListen(event,fn) { consumersListen(event, fn) {
this._eventListen('c',event,fn) this._eventListen('c', event, fn)
} }
getPacketByName(name, packets) { getPacketByName(name, packets) {
if (!packets.length) packets = [packets] if (!packets.length) packets = [packets]
let found = {} let found = {}
@ -474,9 +635,10 @@ class Base extends EventEmitter {
// add set of functions to class prop/space and then register with this // add set of functions to class prop/space and then register with this
addNamespace(space, type, trans) { addNamespace(space, type, trans) {
if (type !=='c' && type !=='s') { if (type !== 'c' && type !== 's') {
trans = type trans = type
type = 's' } type = 's'
}
trans = this._validateTransport(trans) trans = this._validateTransport(trans)
if (trans) return this._namespaces[type + trans].unshift(space) if (trans) return this._namespaces[type + trans].unshift(space)
else return this._namespaces[type].unshift(space) else return this._namespaces[type].unshift(space)
@ -485,35 +647,40 @@ class Base extends EventEmitter {
// object of functions, key is cmd name // object of functions, key is cmd name
amendCommands(funcs, trans, type) { amendCommands(funcs, trans, type) {
if (!trans && !type) type = 's' if (!trans && !type) type = 's'
if (trans ==='c' || trans ==='s') { if (trans === 'c' || trans === 's') {
type = trans type = trans
trans = '' trans = ''
} }
trans = this._validateTransport(trans) trans = this._validateTransport(trans)
if (!this['_'+type+trans]) this['_'+type+trans] = {} if (!this['_' + type + trans]) this['_' + type + trans] = {}
Object.assign(this['_'+type+trans], funcs) // trans is type here Object.assign(this['_' + type + trans], funcs) // trans is type here
log.debug({msg:'amended namespace', id:this.id, default_key:'_'+type+trans, functions:this['_'+type+trans]}) log.debug({
msg: 'amended namespace',
id: this.id,
default_key: '_' + type + trans,
functions: this['_' + type + trans],
})
} }
amendConsumerCommands(funcs, trans) { amendConsumerCommands(funcs, trans) {
this.amendCommands(funcs,trans,'c') this.amendCommands(funcs, trans, 'c')
} }
amendSocketCommands(funcs, trans) { amendSocketCommands(funcs, trans) {
this.amendCommands(funcs,trans) this.amendCommands(funcs, trans)
} }
// func should take and return a packet. if type // func should take and return a packet. if type
beforeSendHook(func,opts) { beforeSendHook(func, opts) {
this._packetHook('beforeSend', func,opts) this._packetHook('beforeSend', func, opts)
} }
beforeProcessHook(func,opts) { beforeProcessHook(func, opts) {
this._packetHook('beforeProcess', func,opts) this._packetHook('beforeProcess', func, opts)
} }
afterProcessHook(func,opts) { afterProcessHook(func, opts) {
this._packetHook('afterProcess', func,opts) this._packetHook('afterProcess', func, opts)
} }
// A Big Hammer - use only if necessary - default with hooks should suffice // A Big Hammer - use only if necessary - default with hooks should suffice
@ -544,26 +711,38 @@ class Base extends EventEmitter {
} }
// a call to this method will (make or add) return and or subscribe a ready observer for incoming consumer connections // a call to this method will (make or add) return and or subscribe a ready observer for incoming consumer connections
consumerConnected (socket,opts={}) { consumerConnected(socket, opts = {}) {
let { subscribe, consumer, name, add} = opts let { subscribe, consumer, name, add } = opts
const conditionHandler = async ev => { const conditionHandler = async (ev) => {
if ((ev||{}).state ==='connected'){ if ((ev || {}).state === 'connected') {
let data = (ev.data ||{}) let data = ev.data || {}
if (consumer) { // specific consumer check if (consumer) {
if (data.name === consumer || [ev.name, ev.id, data.name, data.id].some(name => (name||'').includes(consumer)) ) return true // specific consumer check
if (
data.name === consumer ||
[ev.name, ev.id, data.name, data.id].some((name) =>
(name || '').includes(consumer)
)
)
return true
} else return true } else return true
} }
return false return false
} }
if (typeof socket ==='string') socket = this.getSocket(socket) if (typeof socket === 'string') socket = this.getSocket(socket)
name = name || consumer name = name || consumer
add = add && consumer add = add && consumer
const options = {event:'connection:consumer',condition:conditionHandler} const options = {
event: 'connection:consumer',
condition: conditionHandler,
}
let oname = `${name}:consumer>${socket.name}:socket<inbound>` let oname = `${name}:consumer>${socket.name}:socket<inbound>`
const obs = add ? this.ready.addObserver(oname,socket,options) : this.ready.makeObserver(socket,options) const obs = add
if (typeof subscribe ==='function') return obs.subscribe(subscribe) ? this.ready.addObserver(oname, socket, options)
: this.ready.makeObserver(socket, options)
if (typeof subscribe === 'function') return obs.subscribe(subscribe)
return obs return obs
} // end consumerConnected } // end consumerConnected
@ -574,68 +753,108 @@ class Base extends EventEmitter {
* *
*/ */
// options allow applying hook function to specific socket or type or transport, default is all type 's' sockets // options allow applying hook function to specific socket or type or transport, default is all type 's' sockets
_packetHook(hook,func,opts) { _packetHook(hook, func, opts) {
log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts}) log.debug({
let {name,type,trans,all} = opts msg: 'hooking a socket(s)',
if (opts==null) type = 's' // default is all type 's' sockets method: '_packetHook',
line: 334,
hook: hook,
function: func,
options: opts,
})
let { name, type, trans, all } = opts
if (opts == null) type = 's' // default is all type 's' sockets
if (name) this._socket[name][hook] = func if (name) this._socket[name][hook] = func
else { else {
log.debug({msg:'sockets available to hook', method:'_packetHook', line:338, sockets: Object.keys(this._socket)}) log.debug({
msg: 'sockets available to hook',
method: '_packetHook',
line: 338,
sockets: Object.keys(this._socket),
})
for (let name of Object.keys(this._socket)) { for (let name of Object.keys(this._socket)) {
if (this._socket[name].type === type) this._socket[name][hook] = func if (this._socket[name].type === type) this._socket[name][hook] = func
if (this._socket[name].transport === trans) this._socket[name][hook] = func if (this._socket[name].transport === trans)
this._socket[name][hook] = func
if (all) this._socket[name][hook] = func if (all) this._socket[name][hook] = func
if (this._socket[name][hook]) log.debug({msg:'hooked socket', method:'_packetHook', line:343, name:name, type:this._socket[name].type, trans:this._socket[name].transport}) if (this._socket[name][hook])
log.debug({
msg: 'hooked socket',
method: '_packetHook',
line: 343,
name: name,
type: this._socket[name].type,
trans: this._socket[name].transport,
})
} }
} }
} }
/* /*
********** main packet processor for all sockets ********** main packet processor for all sockets
* supports per socket before and after hook processors * supports per socket before and after hook processors
* supports additonal registered processors called via packet or socket name, with default processor, * supports additonal registered processors called via packet or socket name, with default processor,
*/ */
async _packetProcess(socket_name, packet) { async _packetProcess(socket_name, packet) {
if (!packet || !Object.keys(packet).length) packet = {error:'no packet to process'} if (!packet || !Object.keys(packet).length)
if (!socket_name || !this.getSocket(socket_name)) packet.error = 'no socket name passed for packet processing' packet = { error: 'no packet to process' }
if (!this.getSocket(socket_name)) packet.error = `socket by name of ${socket_name}` if (!socket_name || !this.getSocket(socket_name))
packet.error = 'no socket name passed for packet processing'
if (!this.getSocket(socket_name))
packet.error = `socket by name of ${socket_name}`
if (packet.error) { if (packet.error) {
this.emit('log', {level:'error', error:packet.error, packet:packet, msg:'an error occured before processing an incoming packet'}) this.emit('log', {
level: 'error',
error: packet.error,
packet: packet,
msg: 'an error occured before processing an incoming packet',
})
return packet // don't process a packet with an error return packet // don't process a packet with an error
} }
// TODO allow adding to or altering the process map // TODO allow adding to or altering the process map
let processors = new Map([ let processors = new Map([
['before', this.getSocket(socket_name).beforeProcess ], ['before', this.getSocket(socket_name).beforeProcess],
['command', this._cmdProcessors[packet.cmdProcessor || this._cmdProcessors[socket_name] ? socket_name : '_default'] ], [
['after', this.getSocket(socket_name).afterProcess ], 'command',
this._cmdProcessors[
packet.cmdProcessor || this._cmdProcessors[socket_name]
? socket_name
: '_default'
],
],
['after', this.getSocket(socket_name).afterProcess],
]) ])
let err let err
for (let [name,func] of processors) { // the same as of recipeMap.entries() for (let [name, func] of processors) {
[err,packet] = await to(this._process(socket_name,packet,name,func)) // the same as of recipeMap.entries()
[err, packet] = await to(this._process(socket_name, packet, name, func))
if (err) packet.error = err if (err) packet.error = err
} }
return packet return packet
} }
async _process(socket_name,packet,name,func) { async _process(socket_name, packet, name, func) {
if (packet.error) return packet // if an error occurs skip any further processing if (packet.error) return packet // if an error occurs skip any further processing
let err, res let err, res
if (func) { if (func) {
[err,res] = await to(func.call(this,packet,socket_name)) [err, res] = await to(func.call(this, packet, socket_name))
if (err) { // forced an abort to processing if (err) {
// forced an abort to processing
packet.error = err packet.error = err
} else { } else {
if (!isPlainObject(res)) packet.processResult ? packet.processResult[name]=res : packet.processResult = {[name]:res} if (!isPlainObject(res))
packet.processResult
? (packet.processResult[name] = res)
: (packet.processResult = { [name]: res })
else { else {
let method = (packet.processMethod || {})[name] || packet.processMethod let method =
(packet.processMethod || {})[name] || packet.processMethod
// TODO could support other specialized merge methods // TODO could support other specialized merge methods
if (method === 'merge') { if (method === 'merge') {
packet = merge(packet,res) packet = merge(packet, res)
} } else {
else { packet = res
packet=res
} }
} }
} }
@ -645,37 +864,47 @@ class Base extends EventEmitter {
// all sockets are emitters. Adds a listener to all sockets of a type with given event. // all sockets are emitters. Adds a listener to all sockets of a type with given event.
// now sockets can emit locally processed events // now sockets can emit locally processed events
_eventListen(type,event,fn) { _eventListen(type, event, fn) {
for (let name of Object.keys(this._socket)) { for (let name of Object.keys(this._socket)) {
if (this._socket[name].type === type) { if (this._socket[name].type === type) {
if (fn==='stop') this._socket[name].removeAllListeners(event) if (fn === 'stop') this._socket[name].removeAllListeners(event)
else { else {
log.debug({socket:name, type:type, event:event, msg:'adding listener to socket'}) log.debug({
socket: name,
type: type,
event: event,
msg: 'adding listener to socket',
})
this._socket[name].on(event, fn) this._socket[name].on(event, fn)
} }
} }
} }
} }
_validateTransport(trans, type='s') { _validateTransport(trans, type = 's') {
const valids = { const valids = {
w:'w', w: 'w',
web:'w', web: 'w',
n:'n', n: 'n',
named:'n', named: 'n',
unix:'n', unix: 'n',
pipe:'n', pipe: 'n',
t:'t', t: 't',
tcp:'t', tcp: 't',
net:'t', net: 't',
network:'t', network: 't',
m:'m', m: 'm',
mqtt:'m', mqtt: 'm',
} }
trans = valids[trans] || '' trans = valids[trans] || null
if (type !== 's' && trans ==='w') { if (type !== 's' && trans === 'w') {
log.warn({type: type, transport: trans, msg:'Invalid type/transport - Consumer/Client Web Socket not supported use TCP'}) log.warn({
trans = '' type: type,
transport: trans,
msg:
'Invalid type/transport - Consumer/Client Web Socket not supported use TCP',
})
trans = null
} }
return trans return trans
} }
@ -693,7 +922,7 @@ class Base extends EventEmitter {
_getCmdFuncNamespace(cmd, namespaces) { _getCmdFuncNamespace(cmd, namespaces) {
let cmd_func = null let cmd_func = null
namespaces.some(namespace => { namespaces.some((namespace) => {
namespace = namespace ? namespace + '.' + cmd : cmd namespace = namespace ? namespace + '.' + cmd : cmd
cmd_func = this._getCmdFunc(namespace) cmd_func = this._getCmdFunc(namespace)
if (cmd_func) return true if (cmd_func) return true
@ -710,7 +939,14 @@ class Base extends EventEmitter {
var prop = cmd.shift() var prop = cmd.shift()
if (cmd.length === 0) return obj[prop] if (cmd.length === 0) return obj[prop]
if (!obj[prop]) return null if (!obj[prop]) return null
log.debug({length:cmd.length,cmd:cmd, prop:prop, objprop:obj[prop], method:'_getCmdFunc', msg:'command to corresponding function in a hash'}) log.debug({
length: cmd.length,
cmd: cmd,
prop: prop,
objprop: obj[prop],
method: '_getCmdFunc',
msg: 'command to corresponding function in a hash',
})
return this._getCmdFunc(cmd, obj[prop]) return this._getCmdFunc(cmd, obj[prop])
} }
@ -720,16 +956,15 @@ class Base extends EventEmitter {
packet.cmd, packet.cmd,
this._namespaces[this._type(socket) + this._transport(socket)] this._namespaces[this._type(socket) + this._transport(socket)]
) )
if (cmd_func) return await cmd_func.call(this,packet) // todo try .call if (cmd_func) return await cmd_func.call(this, packet) // todo try .call
cmd_func = this._getCmdFuncNamespace( cmd_func = this._getCmdFuncNamespace(
packet.cmd, packet.cmd,
this._namespaces[this._type(socket)] this._namespaces[this._type(socket)]
) )
if (cmd_func) return await cmd_func.call(this,packet) if (cmd_func) return await cmd_func.call(this, packet)
return 'failed' return 'failed'
} }
} // end Base Class } // end Base Class
export default Base export default Base
export { Base, map, changed, isPlainObject, to, merge, loadYaml } // todo share rxjs export { Base, Ready, map, changed, isPlainObject, to, merge, loadYaml } // todo share rxjs

View File

@ -1,6 +1,6 @@
import to from 'await-to-js' import to from 'await-to-js'
import logger from '@uci-utils/logger' import logger from '@uci-utils/logger'
let log = logger({ package: 'base',file:'processing.js'}) let log = logger({ package: 'base', file: 'processing.js' })
// this._processing refers to this module/hash // this._processing refers to this module/hash
@ -9,35 +9,57 @@ let log = logger({ package: 'base',file:'processing.js'})
// messaging errors on socket will not be fatal to the entire socket server // messaging errors on socket will not be fatal to the entire socket server
// common processor, will call based on type s or c the ones below // common processor, will call based on type s or c the ones below
const cmdProcessor = async function (packet,socket) { const cmdProcessor = async function (packet, socket) {
let [err,res] = await to(_process[this.getSocket(socket).type].call(this,packet,socket)) let [err, res] = await to(
_process[this.getSocket(socket).type].call(this, packet, socket)
)
if (err) { if (err) {
let error = {cmd:'error', error:err, packet:packet, socket:socket, function:'processor', line: 15, msg:`'unhandled error in packet command function ${packet.cmd}`} let error = {
cmd: 'error',
error: err,
packet: packet,
socket: socket,
function: 'processor',
line: 15,
msg: `'unhandled error in packet command function ${packet.cmd}`,
}
log.error(error) log.error(error)
res = Object.assign({},packet,error) res = Object.assign({}, packet, error)
if (process.env.UCI_PUSH_UNHANDLED==='true') this.push(res) if (process.env.UCI_PUSH_UNHANDLED === 'true') this.push(res)
if (process.env.UCI_SHOW_UNHANDLED==='true') console.log(error) if (process.env.UCI_SHOW_UNHANDLED === 'true') console.log(error)
} }
return res return res
} }
// default processors for socket/server and consumer/client // default processors for socket/server and consumer/client
const _process = { const _process = {
s: async function (packet,socket) { s: async function (packet, socket) {
if (!packet.cmd) return {error: 'no command (cmd:) in packet for socket', packet: packet } if (!packet.cmd)
return {
error: 'no command (cmd:) in packet for socket',
packet: packet,
}
// this call will search the namespace and envoke a function and return a repsonse packet // this call will search the namespace and envoke a function and return a repsonse packet
let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response let response = await this._callCmdFunc(packet, socket)
return {error: 'no socket processing function supplied for command', packet: packet } if (response !== 'failed') return response
return {
error: 'no socket processing function supplied for command',
packet: packet,
}
}, },
c: async function (packet,socket) { c: async function (packet, socket) {
// the the end of life for a consumer packet that has been sent and returned or a packet that was pushed. // the the end of life for a consumer packet that has been sent and returned or a packet that was pushed.
if (packet.error) packet.cmd='error' if (packet.error) packet.cmd = 'error'
if (!packet.cmd) packet.cmd ='reply' if (!packet.cmd) packet.cmd = 'reply'
let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response let response = await this._callCmdFunc(packet, socket)
packet = {error:`no consumer return processing function supplied for ${packet.cmd}`, packet:packet} if (response !== 'failed') return response
this._c.error(packet) packet = {
error: `no consumer return processing function supplied for ${packet.cmd}`,
packet: packet,
} }
this._c.error(packet)
},
} }
// default name spaces // default name spaces
@ -55,13 +77,13 @@ const namespaces = {
} }
/* /*
* *
* Default packet command processing functions * Default packet command processing functions
* *
*/ */
const defaultCmds ={ const defaultCmds = {
s:{ s: {
echo: function (packet) { echo: function (packet) {
packet.processed = true packet.processed = true
packet.msg = 'default socket echo' packet.msg = 'default socket echo'
@ -71,36 +93,81 @@ const defaultCmds ={
ack: async function (packet) { ack: async function (packet) {
packet.cmd = 'reply' packet.cmd = 'reply'
packet.ack = true packet.ack = true
packet.msg = 'this is the base default ack, superceed in your instance or extended class' packet.msg =
'this is the base default ack, superceed in your instance or extended class'
return packet return packet
}, },
ready: async function (packet) { ready: async function (packet) {
const event = packet.event || packet.name || packet.id const event = packet.event || packet.name || packet.id
delete(packet._header) delete packet._header
this.emit(event,packet.ready,packet) this.emit(event, packet.ready, packet)
this.emit('log', {level:'ready', msg:'change in ready state received via send', ready:packet.ready, packet:packet}) this.emit('ready', packet, packet)
setTimeout(()=>this.emit('log', {level:'state', msg:'new ready state', state:this.ready.state}),1000) this.emit('log', {
return {cmd:'reply', msg:'consumer sent event was emitted event at socket process', event:event} level: 'ready',
msg: 'change in ready state received via send',
ready: packet.ready,
packet: packet,
})
setTimeout(
() =>
this.emit('log', {
level: 'state',
msg: 'new ready state',
state: this.ready.state,
}),
1000
)
return {
cmd: 'reply',
msg: 'consumer sent event was emitted event at socket process',
event: event,
} }
}, },
c:{ },
error: function (packet) { // default c: {
log.error({error:packet.error, packet:packet, msg:'==========Consumer Sent Packet returned with ERROR ========='}) error: function (packet) {
// default
log.error({
error: packet.error,
packet: packet,
msg: '==========Consumer Sent Packet returned with ERROR =========',
})
return packet return packet
}, },
reply: function(packet) { reply: function (packet) {
if (process.env.UCI_ENV==='dev') log.debug({packet:packet, msg:'====Packet returned from socket - default reply logger==='}) if (process.env.UCI_ENV === 'dev')
log.debug({
packet: packet,
msg: '====Packet returned from socket - default reply logger===',
})
return packet return packet
}, },
ready: async function (packet) { ready: async function (packet) {
const event = packet.event || packet.name || packet.id const event = packet.event || packet.name || packet.id
delete(packet._header) delete packet._header
this.emit(event,packet.ready,packet) this.emit(event, packet.ready, packet)
this.emit('log', {level:'ready', msg:'change in ready state received via push', ready:packet.ready, packet:packet}) this.emit('ready', packet)
setTimeout(()=>this.emit('log', {level:'state', msg:'new ready state', state:this.ready.state}),1000) this.emit('log', {
return {cmd:'reply', msg:'ready packet event was emitted at consumer process from push'} level: 'ready',
} msg: 'change in ready state received via push',
ready: packet.ready,
packet: packet,
})
setTimeout(
() =>
this.emit('log', {
level: 'state',
msg: 'new ready state',
state: this.ready.state,
}),
1000
)
return {
cmd: 'reply',
msg: 'ready packet event was emitted at consumer process from push',
} }
},
},
} }
export { cmdProcessor, defaultCmds, namespaces } export { cmdProcessor, defaultCmds, namespaces }