566 lines
19 KiB
JavaScript
566 lines
19 KiB
JavaScript
// Direct External Dependencies
|
|
// none
|
|
|
|
// UCI dependencies
|
|
|
|
// UCI communication transport communication modules
|
|
// TODO change to dynamic imports so each transport module loads only if that socket type is requested
|
|
import Socket from '@uci/socket' // tcp or named pipe
|
|
import MQTT from '@uci/mqtt' // requires broker
|
|
import WebSocket from '@uci/websocket' // server only - client is for web browser only
|
|
// UCI helpers
|
|
import { bindFuncs } from '@uci-utils/bind-funcs'
|
|
// UCI logger
|
|
import logger from '@uci-utils/logger'
|
|
// Module-wide logger handle. It starts as an empty object and is reassigned by every
// Base constructor call, so all instances created from this module share whichever
// logger was created last.
let log = {}
|
|
|
|
// Community dependencies
|
|
import to from 'await-to-js'
|
|
import EventEmitter from 'events'
|
|
// import pSettle from 'p-settle'
|
|
// import pReflect from 'p-reflect'
|
|
|
|
// Internal dependencies
|
|
import { processor, defaultCmds, namespaces } from './processing'
|
|
|
|
// Useful Constants
|
|
// Property names assigned, in order, to the pieces of a '<name>#<type>><transport>'
// socket descriptor when the `sockets` option string is parsed.
const SOCKET_INFO_KEYS = ['name', 'type', 'transport']

// Single-letter type/transport codes mapped to readable names. The entries for the
// socket types ('s' and 'c') double as the property names looked up on the
// '@uci/socket' export when a named pipe or tcp socket is constructed.
const TRANSLATE = {
  n: 'Named Pipe',
  t: 'TCP',
  s: 'Socket',
  c: 'Consumer',
  m: 'MQTT',
  w: 'Web Socket'
}
|
|
|
|
/**
 * @class Base
 * @description
 * An inter-process inter-machine multi socket communication class. </br>
 * It is extended to derive many other UCI classes thus the name "Base" </br>
 * The class itself is extended from the {@link https://nodejs.org/api/events.html#events_class_eventemitter nodejs EventEmitter } to support in process communication as well
 *
 * @extends EventEmitter
 * @param {Object} [opts={}] hash of options
 * @param {String} [opts.id='uci-base:'+timestamp] an id for this process/instance used for logging; falls back to opts.name, then to 'uci-base:' + timestamp
 * @param {String} [opts.name] used as the id when opts.id is not given
 * @param {String} [opts.desc] additional description for humans, can be logged.
 * @param {String} [opts.sockets] comma, pipe or whitespace delimited string of sockets to be created, each of the form [name]#[c or s]>[n,t,m or w]
 * @param {Boolean} [opts.useRootNS=false] include the "root" of the created base instance as part of the namespace for which packet cmd's try to call corresponding function
 * @param {Object} [opts.(name of socket)] options for a particular socket created by opts.sockets (e.g. path, host, port). see UCI guide {@link ?content=guide#sockets|creating sockets}
 * @property {Object} _socket collection, keyed by socket name, of all sockets created and used by the base instance as per opts.sockets or the addSocket method
 *
 * @example
 * import Base from '@uci/base'
 * // options object example, created is a socket of each transport using their defaults
 * // and a tcp consumer socket that connects to a tcp socket on another host 'somehost' at port 8888
 * const opts = { id:'mybase', sockets:'us#s>n,tc#c>t,ts#s>t,mqtts#s>m,webs#s>w', tc:{host:'somehost', port:8888} }
 * let mybaseprocess = new Base(opts)
 */
class Base extends EventEmitter {
  constructor(opts = {}) {
    super()
    this.id = opts.id || opts.name || 'uci-base:' + new Date().getTime()
    log = logger({ name: 'base', id: this.id })
    this.desc = opts.desc // additional details for humans
    this._socket = {} // holds all the various communication sockets, keyed by name
    this._started = false // flag to know when instance has been initialized
    this._processors = { _default: processor }

    // Each instance gets its own copy of the namespace lists and of the default
    // command hashes. They are mutated below and by addNamespace/amendCommands, and
    // sharing the imported objects would leak those changes into every other instance.
    this._namespaces = {}
    for (const key of Object.keys(namespaces)) {
      const list = namespaces[key]
      this._namespaces[key] = Array.isArray(list) ? list.slice() : list
    }
    // _c and _s are the default namespaces
    this._c = Object.assign({}, defaultCmds.c)
    this._s = Object.assign({}, defaultCmds.s)

    if (opts.useRootNS) {
      // add root of instance to checking for command functions - not recommended!
      // a null namespace means "look the command up directly on the instance"
      this._namespaces.s.splice(-1, 0, null)
      this._namespaces.c.splice(-1, 0, null)
    }

    // method that will bind a whole object tree of functions
    this.bindFuncs = bindFuncs

    // predefined sockets:
    // comma delimited list of this form '<name>#<c/p/s>><n=np/t=tcp/m=mqtt/w=web>'
    if (opts.sockets) {
      opts.sockets
        .split(/[,|\s]+/)
        .filter(Boolean) // ignore empty entries from leading/trailing delimiters
        .forEach(socketStr => {
          const socket = {}
          socketStr
            .split(/[>#]+/)
            .slice(0, SOCKET_INFO_KEYS.length)
            .forEach((prop, index) => {
              socket[SOCKET_INFO_KEYS[index]] = prop
            })
          // Empty pieces become undefined so addSocket's parameter defaults apply.
          // addSocket is async; a rejection must not go unhandled in the constructor.
          this.addSocket(
            socket.name,
            socket.type || undefined,
            socket.transport || undefined,
            opts[socket.name]
          ).catch(err => {
            log.warn({ socketName: socket.name, error: err, method: 'constructor', msg: `socket ${socket.name} could not be added` })
          })
        })
    }
  } // end constructor

  /*
   * --------------------------------
   * CLASS METHODS
   * --------------------------------
   */

  // PUBLIC METHODS

  /**
   * initialize the instance with the set options. This must be called to initialize all sockets and connections.
   * Every registered socket is initialized in parallel; a socket whose initialization
   * fails is reported and removed instead of failing the whole call.
   * @async
   * @public
   * @required
   * @returns {Promise<Object>} {results, errors}: results maps socket name to its init result,
   * errors maps socket name to {error, remove} or is `false` when every socket initialized
   */
  async init() {
    const results = {}
    const errors = {}

    const initOne = async socket => {
      try {
        results[socket.name] = await socket.init()
      } catch (error) {
        this._emitError({ msg: 'socket init error', error: error })
        const res = await this.removeSocket(socket.name)
        errors[socket.name] = { error: error, remove: res }
      }
    }

    // _started is still false here so _initSocket returns {name, init} descriptors
    const pending = Object.keys(this._socket).map(name => this._initSocket(name))
    await Promise.all(pending.map(initOne))

    this._started = true
    return { results: results, errors: Object.keys(errors).length === 0 ? false : errors }
  }

  /**
   * addSocket - Add a socket at runtime as opposed to via the sockets option at creation
   *
   * @param {string} name Name of socket (usually something short but unique)
   * @param {string} [type=c] consumer/client 'c' or socket/server 's'
   * @param {string} [transport=n] transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket
   * @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc). The passed object is copied, not modified.
   *
   * @returns {Promise<string|Object>} a confirmation string; the result of initializing the
   * socket when the instance was already started; or an {error} object when no socket
   * could be created for the requested type/transport
   */
  async addSocket(name, type = 'c', transport = 'n', options = {}) {
    log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:147, msg:`adding socket ${name}`})
    // work on a copy so the caller's options object is never modified
    options = Object.assign({}, options)
    options.id = this.id + ':' + name
    let socket
    switch (transport) {
    case 'n':
      options.path = options.path || true
      // falls through
    case 't':
      socket = new Socket[TRANSLATE[type]](options)
      break
    case 'm':
      if (type === 'p') type = 'c'
      options.connect = Object.assign({}, options.connect)
      if (options.host) options.connect.host = options.host
      if (options.port) options.connect.port = options.port
      options.connect.connectTimeout = options.connect.connectTimeout || 5000
      socket = new MQTT(options)
      break
    case 'w':
      if (type === 's') socket = new WebSocket(options)
      else
        log.warn({ name: name, type: type, transport: transport, method:'addSocket', line:167, msg:'Web socket not created Consumer/Client Web Socket not supported'})
      break
    default:
      log.warn({ name: name, type: type, transport: transport, method:'addSocket', msg:`socket not created, unknown transport ${transport}`})
    }
    // nothing was constructed (unsupported combination): report instead of dereferencing undefined
    if (!socket) return { error: `socket ${name} was not created for type ${type} and transport ${transport}` }

    socket.name = name
    socket.type = type
    socket.transport = transport
    socket._packetProcess = this._packetProcess.bind(this, name)
    this._socket[name] = socket

    // bubble up any errors/warnings emitted by the socket, tagged with the socket name
    for (const msg of ['error', 'warn', 'fatal']) {
      socket.on(msg, ev => {
        if (ev && typeof ev === 'object') ev.socketName = name
        this.emit(msg, ev)
      })
    }

    // a socket added after init() must be initialized right away
    if (this._started) return await this._initSocket(name)
    return `socket ${name} added`
  }

  /**
   * removeSocket - close a socket/consumer and remove it from the instance
   *
   * @param {string} name name of socket as created
   * @returns {Promise<String|Object>} 'success' or an error object. The socket is removed even when closing it reported an error.
   */
  async removeSocket(name) {
    // NOTE: uci consumers have .end renamed as .close to match socket method for convenience
    const socket = this._socket[name]
    if (!socket) return { socket: name, msg: `no socket of name ${name} to remove` }
    let closeError
    try {
      await socket.close()
    } catch (err) {
      // a server that was never running is not considered a failure
      if (!err || err.code !== 'ERR_SERVER_NOT_RUNNING') {
        closeError = {socket:socket.name, error:err, msg:'socket/consumer closed with errors, but removed'}
      }
    }
    this.emit('warn', {msg:`socket ${name} has been removed`, socket:socket.opts})
    delete this._socket[name]
    return closeError ? closeError : 'success'
  }

  /**
   * getSocket - access the created sockets
   *
   * @param {string} [name] name of a socket
   * @returns {Object} the named socket (undefined if there is none) or, without a name, the whole collection keyed by name
   */
  getSocket(name) {
    if (name) return this._socket[name]
    return this._socket
  }

  /**
   * send - send a packet via one named socket or, when no name is given, via every consumer
   *
   * @param {String} [name] name of socket for send (must be a consumer otherwise use push for server).
   * If the first argument is not a string it is taken as the packet and sent via all consumers.
   * @param {Object} packet
   *
   * @returns {Promise<*>} the reply of the socket; an array of replies when sent via several
   * (or zero) consumers; or {error} when the named socket does not exist
   */
  async send(name, packet) {
    if (typeof name !== 'string') {
      packet = name
      const sends = []
      for (const socketName of Object.keys(this._socket)) {
        const socket = this._socket[socketName]
        if (socket.type !== 'c') continue
        // each hook gets its own shallow copy so hooks can not affect other sockets' packets
        const hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet)) : packet
        log.debug({msg:'after possible hook packet to send', name:socketName, packet:hookedPacket, method:'send', line:217})
        sends.push(socket.send.bind(socket,hookedPacket))
      }
      if (sends.length === 1) return await sends[0]()
      return Promise.all(sends.map(send => send()))
    }
    const socket = this._socket[name]
    if (!socket) return { error: `no consumer socket of name ${name}` }
    if (socket.beforeSend) packet = await socket.beforeSend.call(this,packet)
    log.debug({msg:'single socket hooked packet to send', name:name, packet:packet, method:'send', line:230})
    return await socket.send(packet)
  }

  /**
   * push - push a packet out via every socket of type 's'
   *
   * @param {Object} packet
   * @returns {Promise<Array>} results of the push call of each socket
   */
  async push(packet) {
    // TODO set like send to accept a name
    const broadcast = []
    for (const name of Object.keys(this._socket)) {
      const socket = this._socket[name]
      if (socket.type !== 's') continue
      // the second argument (true) passed to the hook marks the packet as a push
      const hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet
      log.debug({msg:'hooked packet to push', name:name, packet:hookedPacket, method:'push', line:243})
      broadcast.push(socket.push.bind(socket,hookedPacket))
    }
    return Promise.all(broadcast.map(push => push()))
  }

  // TODO make push version of all this sends

  // TODO accept alt transport string i.e. t or TCP
  /**
   * sendTransport - send a packet via every consumer of the given transport
   * NOTE(review): unlike send(), the beforeSend hooks are not applied here - confirm whether that is intended.
   *
   * @param {Object} packet
   * @param {string} transport single letter transport code as stored on the sockets (n, t, m, w)
   * @returns {Promise<*>} the reply when exactly one consumer matched, otherwise an array of replies
   */
  async sendTransport(packet, transport) {
    const sends = []
    for (const name of Object.keys(this._socket)) {
      const socket = this._socket[name]
      if (socket.type === 'c' && socket.transport === transport) {
        sends.push(socket.send.bind(socket))
      }
    }
    if (sends.length === 1) return sends[0](packet)
    return Promise.all(sends.map(send => send(packet)))
  }

  // async sendMQTT(topic, packet) {return this._socket.mqtt.send(topic, packet)}
  /** send a packet via all tcp consumers, see sendTransport */
  async sendTCP(packet) {
    return this.sendTransport(packet, 't')
  }

  // TODO change this to PIPE
  /** send a packet via all named pipe consumers, see sendTransport */
  async sendIPC(packet) {
    return this.sendTransport(packet, 'n')
  }

  // TODO add sendMQTT, sendWS

  /** add listener fn for event to all sockets of type 's'; pass 'stop' as fn to remove all listeners of that event */
  socketsListen(event,fn) {
    this._eventListen('s',event,fn)
  }

  /** add listener fn for event to all consumers (type 'c'); pass 'stop' as fn to remove all listeners of that event */
  consumersListen(event,fn) {
    this._eventListen('c',event,fn)
  }

  /**
   * getPacketByName - find the packet whose _header.sender.name matches
   *
   * @param {string} name sender name to look for
   * @param {Object|Object[]} packets a single packet or an array of packets
   * @returns {Object} the first matching packet or an empty object when none matches
   */
  getPacketByName(name, packets) {
    if (!packets) return {}
    const list = Array.isArray(packets) ? packets : [packets]
    const found = list.find(packet => {
      const sender = packet && packet._header && packet._header.sender
      return Boolean(sender) && sender.name === name
    })
    return found || {}
  }

  /**
   * addNamespace - register a namespace (property name on the instance holding command functions)
   * in front of the existing ones so it is searched first
   *
   * @param {string} space name of the namespace
   * @param {string} [type=s] 'c' or 's'. Any other value is taken as the transport and type becomes 's'
   * @param {string} [trans] transport, see _validateTransport for accepted names
   * @returns {number} new length of the namespace list
   */
  addNamespace(space, type, trans) {
    if (type !== 'c' && type !== 's') {
      trans = type
      type = 's'
    }
    trans = this._validateTransport(trans)
    const key = trans ? type + trans : type
    if (!Array.isArray(this._namespaces[key])) this._namespaces[key] = []
    return this._namespaces[key].unshift(space)
  }

  // TODO confirm Object.assign will be ok as it is not a deep copy
  /**
   * amendCommands - one off add a command function or two to the basic namespaces
   *
   * @param {Object} funcs hash of command functions merged into the namespace
   * @param {string} [trans] transport; 'c' or 's' here is taken as the type instead
   * @param {string} [type=s] 'c' or 's'
   */
  amendCommands(funcs, trans, type) {
    if (trans ==='c' || trans ==='s') {
      type = trans
      trans = ''
    }
    // without this default a call with only a transport produced the key '_undefined<trans>'
    if (!type) type = 's'
    trans = this._validateTransport(trans)
    const key = '_'+type+trans
    if (!this[key]) this[key] = {}
    Object.assign(this[key], funcs)
    log.debug({msg:'amended namespace', default_key:key, functions:this[key]})
  }

  /** amendCommands for consumers */
  amendConsumerCommands(funcs, trans) {
    this.amendCommands(funcs,trans,'c')
  }

  /** amendCommands for sockets (servers) */
  amendSocketCommands(funcs, trans) {
    this.amendCommands(funcs,trans,'s')
  }

  // Hooks: func should take and return a packet.
  // opts selects the sockets to hook: {name} or any of {type, trans, all}
  beforeSendHook(func,opts) {
    this._packetHook('beforeSend', func,opts)
  }

  beforeProcessHook(func,opts) {
    this._packetHook('beforeProcess', func,opts)
  }

  afterProcessHook(func,opts) {
    this._packetHook('afterProcess', func,opts)
  }

  // A Big Hammer - use only if necessary - default with hooks should suffice
  // these three will pre-empt default processor to be called in ._packetProcess

  // add and override default processor for ALL consumers, i.e. packets returning from a send or arrived from a push
  consumersProcessor(func) {
    for (const name of Object.keys(this._socket)) {
      if (this._socket[name].type === 'c') this.altProcessor(func, name)
    }
  }

  // add and override default processor for ALL sockets, i.e. packets coming in from consumer send
  socketsProcessor(func) {
    for (const name of Object.keys(this._socket)) {
      if (this._socket[name].type === 's') this.altProcessor(func, name)
    }
  }

  // add and override a processor for a particular socket/consumer to list of processors
  // if no socket name given it will replace the default processor in _processors from processing.js
  altProcessor(func, socket_name) {
    socket_name = socket_name || '_default'
    this._processors[socket_name] = func
  }

  //======================================================

  /*
   *
   * Private Methods
   *
   */

  // Emit an 'error' event only when someone listens. An EventEmitter throws on an
  // unhandled 'error' event, which would abort the cleanup done by the callers.
  _emitError(payload) {
    if (this.listenerCount('error') > 0) this.emit('error', payload)
    else log.warn(payload)
  }

  // set func as the given hook on one named socket or on all sockets selected by type, trans or all
  _packetHook(hook,func,opts) {
    log.debug({msg:'hooking a socket(s)', method:'_packetHook', line:334, hook:hook, function:func, options:opts})
    const {name,type,trans,all} = opts || {}
    if (name) {
      if (this._socket[name]) this._socket[name][hook] = func
      else log.warn({msg:`no socket of name ${name} to hook`, method:'_packetHook', hook:hook})
      return
    }
    log.debug({msg:'sockets available to hook', method:'_packetHook', line:338, sockets: Object.keys(this._socket)})
    for (const socketName of Object.keys(this._socket)) {
      const socket = this._socket[socketName]
      if (all || socket.type === type || socket.transport === trans) socket[hook] = func
      if (socket[hook]) log.debug({msg:'hooked socket', method:'_packetHook', line:343, name:socketName, type:socket.type, trans:socket.transport})
    }
  }

  /*
   **********default packet processor for all sockets
   * this can be hooked or replaced all together
   */
  async _packetProcess(socket_name, packet) {
    log.debug({ socket:socket_name, packet:packet, method:'_packetProcess', line:393, msg:'processing incoming packet'})
    const socket = this._socket[socket_name]
    if (socket.beforeProcess) packet = await socket.beforeProcess.call(this,packet)
    if (!packet || packet.error) return packet // hook invalidated the packet abort further processing

    // the processor can be set via the incoming packet
    // otherwise if one is set on the socket or the default found in processing.js
    // TODO?? Try all each available packet processors in some order if fails try next one before trying the default
    // Only own, callable entries count so a packet can not select an inherited property.
    const registered = key =>
      typeof key === 'string' &&
      Object.prototype.hasOwnProperty.call(this._processors, key) &&
      typeof this._processors[key] === 'function'
    let processorKey = '_default'
    if (registered(packet._processor)) processorKey = packet._processor
    else if (registered(socket_name)) processorKey = socket_name

    let res = await this._processors[processorKey].call(this,packet,socket_name)
    log.debug({ socket:socket_name, response:res, msg:'processed packet ready for hook'})
    if (socket.afterProcess) res = await socket.afterProcess.call(this,res)
    log.debug({ socket:socket_name, response:res, msg:'packet after hook complete ready for return'})
    return res
  }

  // Before the instance is started: returns {name, init} for init() to run.
  // After it is started: runs the initialization and returns a promise of a result string or error object.
  _initSocket(name) {
    const socket = this._socket[name]
    // servers are created, consumers connect; mqtt always connects (to its broker)
    const isServer = socket.type === 's' && socket.transport !== 'm'
    // call through the socket so the method keeps its `this`
    const init = isServer ? () => socket.create() : () => socket.connect()
    log.info({msg:`initializing socket ${name}, ${socket.type}, ${socket.transport}`})

    if (!this._started) return {name:name, init:init}

    return init()
      .then(res => `socket ${name} added and initialzed, ${res}`)
      .catch(err => {
        this._emitError({msg:'failed initialization', error:err, socket:socket, code:'SOCKET_INIT'})
        return {msg:`socket ${name} failed initialization`, error:err}
      })
  }

  // all sockets are emitters. Adds a listener to all sockets of a type with given event.
  // now sockets can emit locally processed events
  _eventListen(type,event,fn) {
    for (const name of Object.keys(this._socket)) {
      const socket = this._socket[name]
      if (socket.type !== type) continue
      if (fn==='stop') socket.removeAllListeners(event)
      else {
        log.debug({socket:name, type:type, event:event, msg:'adding listener to socket'})
        socket.on(event, fn)
      }
    }
  }

  // Normalize a transport name to its single letter code, '' when not valid.
  // NOTE(review): 'w' is rejected unless type is 'c', and no caller in this file passes
  // a type, so 'w' always becomes ''. addSocket only supports web sockets for type 's',
  // so this condition looks inverted - confirm against ./processing before changing.
  _validateTransport(trans, type='s') {
    const valids = {
      w:'w',
      web:'w',
      n:'n',
      named:'n',
      unix:'n',
      pipe:'n',
      t:'t',
      tcp:'t',
      net:'t',
      network:'t',
      m:'m',
      mqtt:'m',
    }
    // own keys only, otherwise names like 'constructor' resolve to inherited members
    trans = Object.prototype.hasOwnProperty.call(valids, trans) ? valids[trans] : ''
    if (type !== 'c' && trans ==='w') trans = ''
    return trans
  }

  _transport(name) {
    return this._socket[name].transport
  } //getter for socket transport

  _type(name) {
    return this._socket[name].type
  } //getter for socket type

  // namespace list for the type+transport combination of the named socket
  _getTransportNamespaces(socket) {
    return this._namespaces[this._type(socket) + this._transport(socket)]
  }

  // try cmd in each namespace in order, returns the first function found or null
  _getCmdFuncNamespace(cmd, namespaces) {
    for (const namespace of namespaces || []) {
      // a falsy namespace (null from useRootNS) means the root of the instance
      const path = namespace ? namespace + '.' + cmd : cmd
      const cmd_func = this._getCmdFunc(path)
      // callers invoke the result, so anything that is not a function is skipped
      if (typeof cmd_func === 'function') return cmd_func
    }
    return null
  }

  // takes command and returns corresponding function in a hash.
  // cmd is a string delimited by . : or / or an array of property names (left unmodified)
  _getCmdFunc(cmd, obj) {
    const path = typeof cmd === 'string' ? cmd.split(/[.:/]+/) : cmd
    let node = obj || this
    for (let i = 0; i < path.length; i++) {
      const prop = path[i]
      const next = node[prop]
      if (i === path.length - 1) return next
      if (!next) return null
      log.debug({length:path.length - i - 1, cmd:path.slice(i + 1), prop:prop, objprop:next, method:'_getCmdFunc', msg:'command to corresponding function in a hash'})
      node = next
    }
    return null
  }

  // primary function to find a function to call based on packet cmd
  // the transport specific namespaces are searched first, then those of the socket type
  async _callCmdFunc(packet, socket) {
    const type = this._type(socket)
    const lists = [
      this._namespaces[type + this._transport(socket)],
      this._namespaces[type]
    ]
    for (const list of lists) {
      const cmd_func = this._getCmdFuncNamespace(packet.cmd, list)
      if (cmd_func) return await cmd_func.call(this, packet)
    }
    return 'failed'
  }
} // end Base Class
|
|
|
|
export default Base
|