971 lines
32 KiB
JavaScript
971 lines
32 KiB
JavaScript
/**
 * UCI Base Module
 * @module UCI-Base
 * @description This is the primary module to use in the UCI ecosystem
 */

// TODO add automated duplex (for a consumer, create the corresponding socket and send its connection info to the remote socket)
// --------------- UCI dependencies -------------------
|
|
// UCI communication transport communication modules
|
|
// TODO change to dynamic import so loads only if that connector type is requested
|
|
import Socket from '@uci/socket' // tcp or named pipe
|
|
|
|
import MQTT from '@uci/mqtt' // requires broker
|
|
import WebSocket from '@uci/websocket' // server only - client is for web browser only
|
|
|
|
// UCI helpers
|
|
import { Ready, changed, map } from '@uci-utils/ready'
|
|
// import { isClass } from '@uci-utils/type'
|
|
import { bindFuncs } from '@uci-utils/bind-funcs'
|
|
// UCI logger
|
|
import logger from '@uci-utils/logger'
|
|
let log = {} // module-wide logger handle; reassigned to a logger() instance by the Base constructor
|
|
// -------------------------------------------------
|
|
|
|
// Community dependencies
|
|
import to from 'await-to-js'
|
|
import isPlainObject from 'is-plain-object'
|
|
import loadYaml from 'load-yaml-file'
|
|
import { merge } from 'merge-anything'
|
|
// Nodejs dependencies
|
|
import EventEmitter from 'events'
|
|
|
|
// Internal dependencies
|
|
import { cmdProcessor, defaultCmds, namespaces } from './processing'
|
|
|
|
// Constants
|
|
// TODO transport will become "connector"
|
|
// type will be input or output, duplex
|
|
// TODO make sure each registered connector has a unique name
|
|
// const SOCKET_INFO_KEYS = ['name', 'type', 'transport']
|
|
// TODO remove this hard-code and get these from plugin
|
|
|
|
// Maps the single-letter codes used for socket type and transport to readable names.
// registerSocket uses the 's' and 'c' entries to select the class exported by
// '@uci/socket' (Socket[TRANSLATE[type]]).
const TRANSLATE = {
  n: 'Named Pipe',
  t: 'TCP',
  s: 'Socket',
  c: 'Consumer',
  m: 'MQTT',
  w: 'Web Socket',
}
|
|
|
|
/**
 * @class Base
 * @description
 * An inter-process inter-machine multi socket communication class. </br>
 * It is extended to derive many other UCI classes thus the name "Base" </br>
 * The class itself is extended from the {@link https://nodejs.org/api/events.html#events_class_eventemitter nodejs EventEmitter } to support in process communication as well
 *
 * @extends EventEmitter
 * @param {Object|String} [opts={}] object hash of options, or the path of a YAML file to load them from
 * @param {String} [opts.name] name of this instance (falls back to opts.appName, then a generic label)
 * @param {String} [opts.id=uci-base:<timestamp>] an id for this process/instance, used for logging
 * @param {String} [opts.desc] additional description for humans, can be logged.
 * @param {Number} [opts.port] when set, a tcp socket (type 's') on that port is registered at init
 * @param {String|Boolean} [opts.path] when set, a named pipe socket (type 's') is registered at init with this value as its path option
 * @param {Object|Object[]} [opts.sockets] socket descriptor(s) of the form { name, type, transport, options } registered at init
 * @param {Boolean} [opts.useRootNS=false] include the "root" of the created base instance as part of the namespace for a packet cmd - use not recommended
 * @param [opts.initTimeout] handed to every consumer socket that does not set its own
 * @param [opts.retryWait] handed to every consumer socket that does not set its own
 * @param [opts.defaultReturnCmd] handed to every 's' socket that does not set its own
 *
 * @example
 * import Base from '@uci/base'
 * const opts = {
 *   id: 'mybase',
 *   port: 8080, // registers a tcp socket named '<name>:tcp'
 *   sockets: [{ name: 'tc', type: 'c', transport: 'tcp', options: { host: 'somehost', port: 8888 } }],
 * }
 * const mybaseprocess = new Base(opts)
 * await mybaseprocess.init()
 */
class Base extends EventEmitter {
  constructor(opts = {}) {
    // a string is taken to be the path of a YAML options file
    if (typeof opts === 'string') opts = loadYaml.sync(opts)
    opts = opts || {}
    super()
    this.name = opts.name || opts.appName || 'a base class instance'
    this.id = opts.id || 'uci-base:' + new Date().getTime()
    log = logger({ name: 'base', id: this.id })
    this.desc = opts.desc // additional details for humans
    this.opts = opts // make original options available
    this._socket = {} // holds all registered sockets keyed by name
    // these if passed will get applied to all consumer sockets, otherwise socket defaults will be used
    this.initTimeout = opts.initTimeout
    this.retryWait = opts.retryWait
    this.defaultReturnCmd = opts.defaultReturnCmd
    this._cmdProcessors = { _default: cmdProcessor }
    this.ready = new Ready({ emitter: this })
    // ready packet to be sent when process is "ready"
    this._readyPacket = {
      cmd: 'ready',
      event: `${this.name}:process`,
      name: this.name,
      id: this.id,
      ready: false,
    }
    // Copy every namespace list: they are mutated per instance (useRootNS, addNamespace)
    // and must not leak into the imported defaults or into other instances.
    this._namespaces = {}
    Object.keys(namespaces).forEach((key) => {
      this._namespaces[key] = Array.isArray(namespaces[key])
        ? namespaces[key].slice()
        : namespaces[key]
    })
    // _c and _s are the default command namespaces
    this._c = Object.assign({}, defaultCmds.c)
    this._s = Object.assign({}, defaultCmds.s)
    // make available a method that will bind a whole object tree of functions
    // Note: functions called via a command namespace are called with the base context by default,
    // if called directly/locally they should be bound to the base context if desired
    this.bindFuncs = bindFuncs
    if (opts.useRootNS) {
      // add root of instance to checking for command functions - not recommended!
      this._namespaces.s.splice(-1, 0, null)
      this._namespaces.c.splice(-1, 0, null)
    }
    this.initSockets = opts.sockets // sockets as requested by the caller
    // normalize to an array; nothing requested must become [] (not [undefined])
    opts.sockets = opts.sockets == null ? [] : [].concat(opts.sockets)
    if (opts.port)
      opts.sockets.push({
        name: `${this.name}:tcp`,
        type: 's',
        transport: 'tcp',
        options: { port: opts.port },
      })
    if (opts.path)
      opts.sockets.push({
        name: `${this.name}:named`,
        type: 's',
        transport: 'named',
        options: { path: opts.path },
      })
  } // end constructor

  /*
   * --------------------------------
   * CLASS METHODS
   * --------------------------------
   */

  // PUBLIC METHODS

  /**
   * Registers every socket requested through the constructor options, initializes
   * sockets, and subscribes to the ready state so that each change is broadcast.
   * This must be called to initialize all sockets and connections.
   * @async
   * @public
   * @param {String|String[]} [sockets] name(s) of sockets to initialize, if none then all currently inactive sockets are initialized
   * @returns {Promise<Object>} the result of socketsInit
   */
  async init(sockets) {
    // register all sockets requested in constructor
    for (const socket of this.opts.sockets || []) {
      if (!isPlainObject(socket)) {
        log.warn({
          msg: 'ignoring socket request that is not a descriptor object',
          socket: socket,
          method: 'init',
        })
        continue
      }
      socket.name = socket.name || `${this.name}:${socket.transport}`
      await this.registerSocket(socket)
    }

    const res = await this.socketsInit(sockets)
    // will update ready packet and push/send that changed packet on ready state change
    this.ready.all.subscribe(async (ready) => {
      this._readyPacket.ready = ready
      delete this._readyPacket._header
      if (!ready) {
        // include the list of failures in the packet
        this._readyPacket.failures = this.ready.failed
      } else delete this._readyPacket.failures
      const packet = Object.assign({}, this._readyPacket)
      this.emit('log', {
        level: 'debug',
        packet: packet,
        msg: `${this.name} has an updated ready state: broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`,
      })
      // (re)sets the connection packet for each socket
      this.getSocketsFilter({ type: 's' }).forEach((name) => {
        this.getSocket(name).conPackets[0] = packet
      })
      // announce changed ready state
      // delay 100ms, fixes issue so won't be sent during a disconnect which causes socket write error
      setTimeout(() => {
        // these are fire-and-forget, so report a failure instead of leaving an unhandled rejection
        const report = (action) => (error) =>
          this.emit('log', {
            level: 'error',
            msg: `ready packet ${action} failed`,
            error: error,
          })
        this.send(packet).catch(report('send')) // to any socket that this instance is connected to
        this.push(packet).catch(report('push')) // to any remote consumer connected to an instance socket
      }, 100)
    })
    return res
  }

  /**
   * Initializes (creates or connects) registered sockets in parallel.
   * A failing socket does not abort the others; its error is collected instead.
   * @param {String|String[]} [sockets] name(s) to initialize, defaults to every socket that is not active
   * @returns {Promise<Object>} { results, errors } where results maps socket name to the value
   * returned by its create/connect function and errors is false or maps socket name to { error }
   */
  async socketsInit(sockets) {
    const results = {}
    let errors = {}

    // single socket initializer, never rejects on a socket failure
    const initialize = async (name) => {
      const socket = this._socket[name]
      try {
        const initFn = this.getSocketInit(name)
        if (typeof initFn !== 'function')
          throw new Error(`socket ${name} has no create/connect function`)
        // call on the socket itself so an unbound create/connect still sees its instance
        const value = await initFn.call(socket)
        this.emit('log', {
          level: 'info',
          socketName: name,
          msg: 'socket successfully initialized',
          message: value,
        })
        results[name] = value
        return value
      } catch (error) {
        this.emit('log', {
          level: 'fatal',
          socketName: name,
          msg: 'socket init error',
          error: error,
        })
        errors[name] = { error: error }
        return error
      }
    }

    let names = sockets
    if (!names) {
      // only initialize (connect) inactive sockets
      names = Object.keys(this._socket).filter(
        (name) => !this._socket[name].active
      )
    }
    names = [].concat(names)

    const known = names.filter((name) => {
      if (this._socket[name]) return true
      log.warn({
        msg: `no socket registered by name of ${name} to initialize`,
      })
      return false
    })

    const [err] = await to(Promise.all(known.map(initialize)))
    if (err) {
      this.emit('log', {
        level: 'fatal',
        msg: 'initialize of socket errors was NOT caught --- bad bad',
        error: err,
      })
      return { errors: [err] }
    }
    if (Object.keys(errors).length === 0) errors = false
    return { results: results, errors: errors }
  }

  // TODO use plugins for transports instead
  /**
   * registerSocket - register a socket for use
   * This is async (the web socket transport is awaited) but it does NOT initialize the
   * socket, that must be done with a call to init or socketsInit.
   * The passed options object is copied, never modified.
   * @param {String|Object} name Name of socket (usually something short but unique), or a descriptor { name, type, transport, options }
   * @param {string} [type=c] consumer/client 'c' or socket/server 's'
   * @param {string} [transport] transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt, (w) websocket; defaults to tcp when options.port is set, otherwise named pipe
   * @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc)
   *
   * @returns {Promise<Object|null>} the newly registered socket, or null when nothing was registered
   */
  async registerSocket(name, type = 'c', transport, options = {}) {
    if (isPlainObject(name))
      ({ name, type = 'c', transport, options = {} } = name)
    if (typeof name !== 'string') return null
    options = Object.assign({}, options) // never modify the caller's object
    const requested = transport || (options.port ? 'tcp' : 'named')
    transport = this._validateTransport(requested, type)
    if (!transport) {
      log.error({
        socketName: name,
        type: type,
        transport: requested,
        options: options,
        method: 'registerSocket',
        msg: `invalid transport ${requested}`,
      })
      return null
    }
    log.info({
      socketName: name,
      type: type,
      transport: transport,
      options: options,
      method: 'registerSocket',
      line: 198,
      msg: `registering socket ${name}`,
    })
    if (this._socket[name])
      log.warn({
        socketName: name,
        method: 'registerSocket',
        msg: `a socket named ${name} is already registered and will be replaced`,
      })
    options.id = options.id || name
    options.name = options.name || name
    // TODO add a 'd' type for duplex which creates an 's' first and waits on connect to make a 'c'
    if (type === 'c')
      options = Object.assign(
        {
          initTimeout: this.initTimeout,
          retryWait: this.retryWait,
        },
        options
      ) // outbound
    if (type === 's') {
      // merge options.conPackets (array or single packet) and options.conPacket into one list
      const conPackets = options.conPackets
        ? [].concat(options.conPackets)
        : []
      if (options.conPacket) conPackets.push(options.conPacket)
      options = Object.assign(
        { defaultReturnCmd: this.defaultReturnCmd },
        options,
        { conPackets }
      ) // inbound
    }
    // TODO get rid of hard coded transports and use registered transports (t and n being default)
    // TODO dynamically import only requested connector plugins
    let socket = null
    switch (transport) {
      case 'n':
        options.path = options.path || true
      // falls through
      case 't': {
        const SocketClass = Socket[TRANSLATE[type]]
        if (typeof SocketClass !== 'function') {
          log.error({
            socketName: name,
            type: type,
            transport: transport,
            method: 'registerSocket',
            msg: `invalid socket type ${type}`,
          })
          return null
        }
        socket = new SocketClass(options)
        break
      }
      case 'm':
        if (type === 'p') type = 'c'
        options.client = type === 'c'
        options.connect = Object.assign({}, options.connect)
        if (options.host) options.connect.host = options.host
        if (options.port) options.connect.port = options.port
        options.connect.connectTimeout = options.connect.connectTimeout || 5000
        socket = new MQTT(options)
        break
      case 'w':
        if (type === 's') socket = await WebSocket(options)
        break
    }
    if (!socket) return null // nothing was created for this type/transport combination

    this._socket[name] = socket
    socket.name = name
    socket.type = type
    socket.transport = transport
    socket._packetProcess = this._packetProcess.bind(this, name)

    // bubble up events from individual sockets to base instance,
    // connection:consumer is a socket emitting when a consumer is connecting
    // connection:socket is a consumer emitting when connecting to a socket
    const EVENTS = [
      'log',
      'socket',
      'connection',
      'connection:consumer',
      'connection:socket',
    ]
    EVENTS.forEach((event) => {
      socket.on(event, (obj) => {
        // anything that is not a plain object is wrapped so the socket name can be attached
        if (Object.prototype.toString.call(obj) !== '[object Object]')
          obj = { data: obj }
        obj.socketName = name
        this.emit(event, obj)
      })
    })

    if (type === 'c') {
      // ready observer: true once the consumer has successfully connected to a socket
      const target = options.path
        ? options.path
        : `${options.host}:${options.port}`
      socket.obsName = `${name}:${target}<outbound>`
      this.ready.addObserver(socket.obsName, socket, {
        event: 'connection:socket',
        condition: (ev) => ev.state === 'connected',
      })
      // set up listener for any pushed packets and emit locally
      socket.on('pushed', (packet) => {
        packet._header.socketName = name
        this.emit('pushed', packet)
      })
    }

    if (type === 's') {
      // ready observer: true once the socket is listening
      this.ready.addObserver(`${name}:socket<listening>`, socket, {
        event: 'socket',
        condition: (ev) => (ev || {}).state === 'listening',
      })
      // initially set conPackets, ready packet is ALWAYS the first
      if (!Array.isArray(socket.conPackets)) socket.conPackets = []
      socket.conPackets.unshift(this._readyPacket)
      if (options.duplex)
        this.consumerConnected(socket, {
          consumer: options.duplex,
          add: true,
        })
    }
    return socket // return handle to newly registered socket
  }

  /**
   * removeSocket - closes a socket, detaches its listeners and ready observer and unregisters it
   *
   * @param {string} name name of socket as created
   * @returns {Promise<String|Object>} a message string ('success' when removed cleanly) or an object describing the close error
   */
  async removeSocket(name) {
    // NOTE: uci consumers have .end renamed as .close to match socket method for convenience
    if (typeof name !== 'string')
      return 'no socket name passed, nothing to remove'
    const socket = this.getSocket(name)
    if (!socket) return 'no socket by that name, nothing to remove'
    if (typeof socket.close !== 'function')
      return 'bad socket no close function, nothing to remove'
    const [err] = await to(socket.close())
    let closeError
    // a server that was never running is not reported as an error
    if (err && err.code !== 'ERR_SERVER_NOT_RUNNING') {
      closeError = {
        socket: socket.name,
        error: err,
        msg: 'socket/consumer closed with errors, but removed',
      }
    }
    this.emit('log', {
      level: 'warn',
      msg: `socket ${name} has been removed`,
      socket: socket.opts,
    })
    socket.removeAllListeners()
    this.ready.removeObserver(
      socket.type === 'c' ? socket.obsName : `${name}:socket<listening>`
    )
    // only unregister the socket that was closed, not one registered under the same name meanwhile
    if (this._socket[name] === socket) delete this._socket[name]
    return closeError ? closeError : 'success'
  }

  /**
   * @param {String} [name] socket name
   * @returns {Object|null} the named socket (null if not registered) or, without a name, the whole socket hash
   */
  getSocket(name) {
    if (name) return this._socket[name] || null
    return this._socket
  }

  /**
   * @param {Object} [filter] any of type ('c' or 's'), trans (transport name or alias) and active
   * @returns {String[]} names of the registered sockets that pass every given criterion
   */
  getSocketsFilter({ type, trans, active } = {}) {
    if (trans) {
      trans = this._validateTransport(trans)
      if (!trans) return [] // an unknown transport can not match anything
    }
    return Object.keys(this._socket).filter((name) => {
      const socket = this._socket[name]
      return (
        (type == null || socket.type === type) &&
        (trans == null || socket.transport === trans) &&
        (active == null || socket.active === active)
      )
    })
  }

  /**
   * @param {Object} [filter] same as getSocketsFilter, the type is forced to 'c'
   * @returns {String[]} names of the matching consumers
   */
  getConsumers(filter = {}) {
    return this.getSocketsFilter(Object.assign({}, filter, { type: 'c' }))
  }

  /**
   * @param {String} name socket name
   * @returns {Function|null} the socket's create function for a non-mqtt 's' socket, otherwise its connect function; null if not registered
   */
  getSocketInit(name) {
    const socket = this._socket[name]
    if (!socket) {
      log.warn({
        msg: `can't fetch create/connect function, no socket registered by name of ${name}`,
      })
      return null
    }
    if (socket.type === 's' && socket.transport !== 'm') return socket.create
    return socket.connect
  }

  /**
   * send - send a packet through one consumer, or through all consumers when no name is given
   * A consumer's beforeSend hook (if any) receives a copy of the packet first.
   *
   * @param {String} [name] name of socket for send (must be a consumer otherwise use push for server)
   * @param {Object} packet
   *
   * @returns {Promise} the single send result, an array of results when several consumers were used, or a message string when nothing was sent
   */
  async send(name, packet) {
    let names
    if (typeof name !== 'string') {
      packet = name
      names = this.getConsumers()
    } else {
      if (!this.getSocket(name)) return `no consumer ${name} for sending`
      names = [name]
    }
    if (!packet || !Object.keys(packet).length)
      return 'no packet to send - aborted'
    if (!names.length) return 'no consumers available for send, aborting'
    const sends = []
    for (const target of names) {
      const consumer = this.getSocket(target)
      const hookedPacket = consumer.beforeSend
        ? await consumer.beforeSend.call(this, Object.assign({}, packet))
        : packet
      log.debug({
        msg: 'after hook, sending packet',
        name: consumer.name,
        packet: hookedPacket,
        method: 'send',
      })
      sends.push(() => consumer.send(hookedPacket))
    }
    if (sends.length === 1) return await sends[0]()
    return Promise.all(sends.map((send) => send()))
  }

  /**
   * push - push a packet to the consumers connected to this instance's 's' sockets
   * @param {Object} packet
   * @param {Object} [opts] sockets (name or array of names) or socket (single name) restricts the
   * sockets used, transport restricts by transport ('all' for no restriction); the (copied) options
   * are also handed to each socket's push
   * @returns {Promise} array of push results, or a message string when nothing was pushed
   */
  async push(packet, opts = {}) {
    if (!packet || !Object.keys(packet).length)
      return 'no packet to push - aborted'
    let sockets = this.getSocketsFilter({ type: 's' })
    if (!sockets.length) return 'no sockets on which to push'
    opts = opts || {}
    const only = opts.sockets
      ? [].concat(opts.sockets)
      : opts.socket
      ? [opts.socket]
      : []
    opts = Object.assign({}, opts, { sockets: only }) // leave the caller's object untouched
    if (only.length) sockets = sockets.filter((name) => only.includes(name))
    const byTransport = Boolean(opts.transport && opts.transport !== 'all')
    const wanted = byTransport ? this._validateTransport(opts.transport) : null
    sockets = sockets
      .map((name) => this.getSocket(name))
      .filter((sock) => !byTransport || sock.transport === wanted)
    if (!sockets.length) return 'no sockets on which to push'
    const broadcast = []
    // TODO use map and reflect
    for (const socket of sockets) {
      const hookedPacket = socket.beforeSend
        ? await socket.beforeSend.call(this, Object.assign({}, packet), true)
        : packet
      broadcast.push(() => socket.push(hookedPacket, opts))
    }
    return Promise.all(broadcast.map((push) => push()))
  }

  /**
   * Sends a packet through every consumer of one transport (beforeSend hooks are not applied).
   * @param {Object} packet
   * @param {String} transport transport code or alias, e.g. 't' or 'tcp'
   * @returns {Promise} the single send result or an array of results
   */
  async sendTransport(packet, transport) {
    const wanted = this._validateTransport(transport)
    const consumers = Object.keys(this._socket)
      .map((name) => this._socket[name])
      .filter((socket) => socket.type === 'c' && socket.transport === wanted)
    if (consumers.length === 1) return consumers[0].send(packet)
    return Promise.all(consumers.map((consumer) => consumer.send(packet)))
  }

  // async sendMQTT(topic, packet) {return this._socket.mqtt.send(topic, packet)}
  /** send through all tcp consumers */
  async sendTCP(packet) {
    return this.sendTransport(packet, 't')
  }

  // TODO change this to PIPE
  /** send through all named pipe consumers */
  async sendIPC(packet) {
    return this.sendTransport(packet, 'n')
  }

  // TODO add sendMQTT, sendWS

  /** add listener fn for event to every 's' socket, pass 'stop' as fn to remove all listeners of the event */
  socketsListen(event, fn) {
    this._eventListen('s', event, fn)
  }

  /** add listener fn for event to every consumer, pass 'stop' as fn to remove all listeners of the event */
  consumersListen(event, fn) {
    this._eventListen('c', event, fn)
  }

  /**
   * @param {String} name sender name to look for (packet._header.sender.name)
   * @param {Object|Object[]} packets a packet or array of packets
   * @returns {Object} the first packet sent by that name, or {} when there is none
   */
  getPacketByName(name, packets) {
    const list = Array.isArray(packets)
      ? packets
      : packets == null
      ? []
      : [packets]
    const senderOf = (packet) =>
      (((packet || {})._header || {}).sender || {}).name
    return list.find((packet) => senderOf(packet) === name) || {}
  }

  /**
   * Puts a namespace (property path on this instance holding command functions) at the
   * front of a namespace list so that it is searched first.
   * @param {String} space namespace to add
   * @param {String} [type=s] 'c' or 's'; any other value is taken to be the transport
   * @param {String} [trans] transport, when given the type+transport list is used
   * @returns {Number} new length of the amended list
   */
  addNamespace(space, type, trans) {
    if (type !== 'c' && type !== 's') {
      trans = type
      type = 's'
    }
    trans = this._validateTransport(trans)
    const key = trans ? type + trans : type
    if (!Array.isArray(this._namespaces[key])) this._namespaces[key] = []
    return this._namespaces[key].unshift(space)
  }

  /**
   * Merges command functions into the command hash `_<type><transport>` of this instance,
   * e.g. `_s` (all sockets) or `_st` (tcp sockets).
   * @param {Object} funcs object of functions, key is cmd name
   * @param {String} [trans] transport, or 'c'/'s' as shorthand for the type
   * @param {String} [type=s] 'c' or 's'
   */
  amendCommands(funcs, trans, type) {
    if (trans === 'c' || trans === 's') {
      type = trans
      trans = ''
    }
    type = type || 's'
    // no (valid) transport means the generic hash of the type, never a '_snull' style key
    trans = this._validateTransport(trans) || ''
    const key = '_' + type + trans
    if (!this[key]) this[key] = {}
    Object.assign(this[key], funcs)
    log.debug({
      msg: 'amended namespace',
      id: this.id,
      default_key: key,
      functions: this[key],
    })
  }

  amendConsumerCommands(funcs, trans) {
    this.amendCommands(funcs, trans, 'c')
  }

  amendSocketCommands(funcs, trans) {
    this.amendCommands(funcs, trans)
  }

  // hook functions should take and return a packet, see _packetHook for opts
  beforeSendHook(func, opts) {
    this._packetHook('beforeSend', func, opts)
  }

  beforeProcessHook(func, opts) {
    this._packetHook('beforeProcess', func, opts)
  }

  afterProcessHook(func, opts) {
    this._packetHook('afterProcess', func, opts)
  }

  // A Big Hammer - use only if necessary - default with hooks should suffice
  // these three will pre-empt default processor to be called in ._packetProcess

  // add and override default processor for ALL consumers, i.e. packets returning from a send or arrived from a push
  consumersProcessor(func) {
    this.getSocketsFilter({ type: 'c' }).forEach((name) =>
      this.altProcessor(func, name)
    )
  }

  // add and override default processor for ALL sockets, i.e. packets coming in from consumer send
  socketsProcessor(func) {
    this.getSocketsFilter({ type: 's' }).forEach((name) =>
      this.altProcessor(func, name)
    )
  }

  // add and override a processor for a particular socket/consumer to list of processors
  // if no socket name given it will replace the default processor
  altProcessor(func, socket_name) {
    this._cmdProcessors[socket_name || '_default'] = func
  }

  /**
   * Makes (or, with opts.add and opts.consumer, adds to this.ready) an observer of
   * consumers connecting to a socket.
   * @param {Object|String} socket an 's' socket or its registered name
   * @param {Object} [opts]
   * @param {String} [opts.consumer] only a consumer whose name/id contains this value satisfies the observer
   * @param {String} [opts.name] name used for the observer, defaults to opts.consumer
   * @param {Boolean} [opts.add] add the observer to this.ready (needs opts.consumer)
   * @param {Function} [opts.subscribe] when given it is subscribed and the subscription is returned
   * @returns {Object} the observer, or the result of subscribing to it
   * @throws {TypeError} when the socket is not registered
   */
  consumerConnected(socket, opts = {}) {
    let { subscribe, consumer, name, add } = opts

    const conditionHandler = async (ev) => {
      if ((ev || {}).state !== 'connected') return false
      if (!consumer) return true // any consumer will do
      const data = ev.data || {}
      if (data.name === consumer) return true
      return [ev.name, ev.id, data.name, data.id].some((ident) =>
        String(ident || '').includes(consumer)
      )
    }
    if (typeof socket === 'string') socket = this.getSocket(socket)
    if (!socket)
      throw new TypeError(
        'consumerConnected requires a registered socket or its name'
      )

    name = name || consumer
    add = add && consumer
    const options = {
      event: 'connection:consumer',
      condition: conditionHandler,
    }
    const oname = `${name}:consumer>${socket.name}:socket<inbound>`
    const obs = add
      ? this.ready.addObserver(oname, socket, options)
      : this.ready.makeObserver(socket, options)
    if (typeof subscribe === 'function') return obs.subscribe(subscribe)
    return obs
  } // end consumerConnected

  //=============PRIVATE METHODS =========================================

  /*
   * Assigns a hook function to a socket, type or transport.
   * opts may hold name (one socket), type, trans or all; without opts the hook is
   * applied to all type 's' sockets.
   */
  _packetHook(hook, func, opts) {
    log.debug({
      msg: 'hooking a socket(s)',
      method: '_packetHook',
      line: 334,
      hook: hook,
      function: func,
      options: opts,
    })
    const { name, all } = opts || {}
    let { type, trans } = opts || {}
    if (opts == null) type = 's' // default is all type 's' sockets
    if (trans) trans = this._validateTransport(trans)
    if (name) {
      const socket = this._socket[name]
      if (!socket) {
        log.warn({
          msg: `no socket registered by name of ${name} to hook`,
          method: '_packetHook',
        })
        return
      }
      socket[hook] = func
      return
    }
    log.debug({
      msg: 'sockets available to hook',
      method: '_packetHook',
      line: 338,
      sockets: Object.keys(this._socket),
    })
    for (const socketName of Object.keys(this._socket)) {
      const socket = this._socket[socketName]
      if (all || socket.type === type || socket.transport === trans)
        socket[hook] = func
      if (socket[hook])
        log.debug({
          msg: 'hooked socket',
          method: '_packetHook',
          line: 343,
          name: socketName,
          type: socket.type,
          trans: socket.transport,
        })
    }
  }

  /*
   ********** main packet processor for all sockets
   * supports per socket before and after hook processors
   * supports additional registered processors selected via packet.cmdProcessor or the
   * socket name, falling back to the default processor
   * a packet that carries an error is returned without being processed
   */
  async _packetProcess(socket_name, packet) {
    if (!packet || !Object.keys(packet).length)
      packet = { error: 'no packet to process' }
    else if (!socket_name)
      packet.error = 'no socket name passed for packet processing'
    else if (!this.getSocket(socket_name))
      packet.error = `no socket registered by name of ${socket_name}`
    if (packet.error) {
      this.emit('log', {
        level: 'error',
        error: packet.error,
        packet: packet,
        msg: 'an error occured before processing an incoming packet',
      })
      return packet // don't process a packet with an error
    }
    const socket = this.getSocket(socket_name)
    // only own, callable entries count so a packet can not select e.g. 'constructor'
    const registered = (key) =>
      typeof key === 'string' &&
      Object.prototype.hasOwnProperty.call(this._cmdProcessors, key) &&
      typeof this._cmdProcessors[key] === 'function'
    const key = [packet.cmdProcessor, socket_name, '_default'].find(registered)
    // TODO allow adding to or altering the process map
    const processors = new Map([
      ['before', socket.beforeProcess],
      ['command', this._cmdProcessors[key]],
      ['after', socket.afterProcess],
    ])

    for (const [name, func] of processors) {
      const [err, processed] = await to(
        this._process(socket_name, packet, name, func)
      )
      // keep the current packet when a step fails, a set error stops the remaining steps
      if (err) packet.error = err
      else packet = processed
    }
    return packet
  }

  /*
   * Runs one processing step. A plain object result replaces the packet (or is merged into
   * it when packet.processMethod asks for 'merge'), any other result is stored in
   * packet.processResult[name]. A failure of the step is reported in packet.error.
   */
  async _process(socket_name, packet, name, func) {
    if (packet.error) return packet // if an error occurs skip any further processing
    if (!func) return packet
    // the promise executor turns a synchronous throw of func into a rejection that to() can catch
    const [err, res] = await to(
      new Promise((resolve) => resolve(func.call(this, packet, socket_name)))
    )
    if (err) {
      // forced an abort to processing
      packet.error = err
      return packet
    }
    if (!isPlainObject(res)) {
      if (packet.processResult) packet.processResult[name] = res
      else packet.processResult = { [name]: res }
      return packet
    }
    const method = (packet.processMethod || {})[name] || packet.processMethod
    // TODO could support other specialized merge methods
    return method === 'merge' ? merge(packet, res) : res
  }

  // all sockets are emitters. Adds a listener to all sockets of a type with given event.
  // now sockets can emit locally processed events
  _eventListen(type, event, fn) {
    for (const name of Object.keys(this._socket)) {
      const socket = this._socket[name]
      if (socket.type !== type) continue
      if (fn === 'stop') {
        socket.removeAllListeners(event)
        continue
      }
      log.debug({
        socket: name,
        type: type,
        event: event,
        msg: 'adding listener to socket',
      })
      socket.on(event, fn)
    }
  }

  /*
   * Normalizes a transport name or alias (case insensitive) to its one letter code.
   * Returns null for an unknown transport and for a web socket requested for anything
   * but type 's'.
   */
  _validateTransport(trans, type = 's') {
    const valids = {
      w: 'w',
      web: 'w',
      n: 'n',
      named: 'n',
      unix: 'n',
      pipe: 'n',
      t: 't',
      tcp: 't',
      net: 't',
      network: 't',
      m: 'm',
      mqtt: 'm',
    }
    const key = typeof trans === 'string' ? trans.trim().toLowerCase() : trans
    // own keys only, so names like 'constructor' are not mistaken for a transport
    trans = Object.prototype.hasOwnProperty.call(valids, key)
      ? valids[key]
      : null
    if (type !== 's' && trans === 'w') {
      log.warn({
        type: type,
        transport: trans,
        msg:
          'Invalid type/transport - Consumer/Client Web Socket not supported use TCP',
      })
      trans = null
    }
    return trans
  }

  _transport(name) {
    return this._socket[name].transport
  } //getter for socket transport

  _type(name) {
    return this._socket[name].type
  } //getter for socket type

  // namespace list for the type+transport of the named socket
  _getTransportNamespaces(socket) {
    return this._namespaces[this._type(socket) + this._transport(socket)]
  }

  // first function found for cmd in the given list of namespaces, null if there is none
  _getCmdFuncNamespace(cmd, namespaces) {
    for (const namespace of namespaces || []) {
      const found = this._getCmdFunc(namespace ? namespace + '.' + cmd : cmd)
      if (typeof found === 'function') return found
    }
    return null
  }

  // takes command (path string split on . : / or array of path parts) and returns the
  // corresponding property by walking obj (default this), null if the path breaks off
  _getCmdFunc(cmd, obj) {
    if (typeof cmd === 'string') cmd = cmd.split(/[.:/]+/)
    if (!Array.isArray(cmd) || !cmd.length) return null
    const path = cmd.slice() // do not consume the caller's array
    let target = obj || this
    while (path.length > 1) {
      const prop = path.shift()
      if (!target[prop]) return null
      log.debug({
        length: path.length,
        cmd: path,
        prop: prop,
        objprop: target[prop],
        method: '_getCmdFunc',
        msg: 'command to corresponding function in a hash',
      })
      target = target[prop]
    }
    return target[path[0]]
  }

  // primary function to find a function to call based on packet cmd
  // the type+transport namespaces are searched before the generic namespaces of the type
  async _callCmdFunc(packet, socket) {
    const type = this._type(socket)
    const lookups = [
      this._namespaces[type + this._transport(socket)],
      this._namespaces[type],
    ]
    for (const spaces of lookups) {
      const cmd_func = this._getCmdFuncNamespace(packet.cmd, spaces)
      if (cmd_func) return await cmd_func.call(this, packet)
    }
    return 'failed'
  }
} // end Base Class
|
|
|
|
export default Base
|
|
export { Base, Ready, map, changed, isPlainObject, to, merge, loadYaml } // todo share rxjs
|