244 lines
7.5 KiB
JavaScript
244 lines
7.5 KiB
JavaScript
import UCISocket from '@uci/socket'
|
|
import EventEmitter from 'events'
|
|
import { bindFuncs } from '@uci/utils/src/function'
|
|
|
|
import { processor, commands, namespaces } from './processing.mjs'
|
|
|
|
import logger from '@uci/logger'
|
|
let log = {}
|
|
const LOG_OPTS = {
|
|
repo:'uci-base',
|
|
npm:'@uci/base',
|
|
file:'src/base.mjs',
|
|
class:'Base',
|
|
id:this.id,
|
|
instance_created:new Date().getTime()
|
|
}
|
|
|
|
export default class Base extends EventEmitter {
|
|
constructor(opts={}) {
|
|
super()
|
|
log = logger.child(LOG_OPTS)
|
|
this.id = opts.id || opts.name || 'uci-base:'+ new Date().getTime()
|
|
this.desc = opts.desc // additional details for humans
|
|
this.socket={}
|
|
this._processors = { _default: processor }
|
|
this._defaultCmds = commands
|
|
this._namespaces = namespaces
|
|
if(opts.sockets) {
|
|
opts.sockets.split(/[,|\s]+/).forEach( socketStr => {
|
|
let socket = {}
|
|
socketStr.split(/[>#]+/).map(function(prop,index) {
|
|
socket[SOCKET_INFO_KEYS[index]] = prop
|
|
})
|
|
if (!opts[socket.name]) opts[socket.name] = {}
|
|
if (socket.transport ==='n') opts[socket.name].np = true
|
|
opts[socket.name].id = this.id +':'+ socket.name
|
|
// console.log(TRANSLATIONS[socket.type])
|
|
this.socket[socket.name] = new UCISocket[TRANSLATE[socket.type]](opts[socket.name])
|
|
// console.log(socket.name, this.socket[socket.name].send)
|
|
Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket
|
|
this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name)
|
|
})
|
|
} else log.warn({opts:opts},'no sockets requested for creations -- on standard emitter')
|
|
this.bindFuncs = bindFuncs
|
|
// console.log('base opts', opts)
|
|
|
|
} // end constructor
|
|
|
|
async init () {
|
|
let sockets = []
|
|
for(let name of Object.keys(this.socket)){
|
|
if (this.socket[name].type ==='s') {
|
|
sockets.push(this.socket[name].create())
|
|
// setTimeout(this.socket[type].create,200) // test for consumer retry for delayed socket creation
|
|
}
|
|
else {
|
|
sockets.push(this.socket[name].connect())
|
|
}
|
|
}
|
|
// TODO maybe throw if one fails
|
|
return Promise.all(sockets).then(() => {return 'ready'}).catch((err) => {return err})
|
|
|
|
} // init
|
|
|
|
async end(name) {} // TODO end all or named sockets
|
|
|
|
async addSocket(options) {} //TODO add ability to add another socket at runtime and initialize
|
|
|
|
async send (name,packet) {
|
|
if (typeof name !== 'string') {
|
|
packet = name
|
|
let sends = []
|
|
for(let name of Object.keys(this.socket)){
|
|
if (this.socket[name].type ==='c') {
|
|
// console.log(name, this.socket[name])
|
|
sends.push(this.socket[name].send.bind(this.socket[name]))
|
|
}
|
|
}
|
|
// console.log(sends.map(send => {return send(packet)}))
|
|
if (sends.length === 1) return sends[0](packet)
|
|
return Promise.all(sends.map(send => {return send(packet)}))
|
|
} else {
|
|
if (this.socket[name]) return await this.socket[name].send(packet)
|
|
else return {error: `not consumer socket of name ${name}`}
|
|
}
|
|
}
|
|
|
|
async sendTransport(packet,transport) {
|
|
let sends = []
|
|
for(let name of Object.keys(this.socket)){
|
|
if (this.socket[name].type ==='c') {
|
|
if (this.socket[name].transport === transport) {
|
|
sends.push(this.socket[name].send.bind(this.socket[name]))
|
|
}
|
|
}
|
|
}
|
|
if (sends.length === 1) return sends[0](packet)
|
|
return Promise.all(sends.map(send => {return send(packet)}))
|
|
}
|
|
|
|
async sendTCP(packet) {return this.sendTransport(packet,'t')}
|
|
async sendIPC(packet) {return this.sendTransport(packet,'n')}
|
|
|
|
getSocket(name) {return this.socket[name]}
|
|
|
|
getPacketByName(name, packets) {
|
|
if (!packets.length) packets = [packets]
|
|
let found = {}
|
|
packets.some((packet,index,packets) => {
|
|
if (packet._header.sender.name === name) {
|
|
found = packets[index]
|
|
return true
|
|
}
|
|
})
|
|
return found
|
|
}
|
|
|
|
|
|
// TODO confirm Object.assign will be ok as it is not a deep copy
|
|
amendConsumerProcessing(funcs,trans) {
|
|
if (trans) {
|
|
if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] ={}
|
|
Object.assign(this._defaultCmds.c[trans],funcs)
|
|
}
|
|
Object.assign(this._defaultCmds.c,funcs)
|
|
}
|
|
|
|
amendSocketProcessing(funcs,trans) {
|
|
if (trans) {
|
|
if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] ={}
|
|
Object.assign(this._defaultCmds.c[trans],funcs)
|
|
}
|
|
Object.assign(this._defaultCmds.c,funcs)
|
|
}
|
|
// use s: and c: keys TODO need to change this
|
|
addNamedProcessing(name,funcs,type) {
|
|
if (type){
|
|
if(!this._cmds[name][type]) this._cmds[name][type] = {}
|
|
Object.assign(this._cmds[name][type],funcs)
|
|
} else {
|
|
if(!this._cmds[name]) this._cmds[name] ={}
|
|
Object.assign(this._cmds[name],funcs)
|
|
}
|
|
}
|
|
|
|
// takes and returns a packet
|
|
beforeWriteHook (type,funcs){} // TODO before packet send
|
|
afterReceiveHook(type,funcs){} // TODO after receiv
|
|
afterProcessHook(type,funcs){} // TODO
|
|
|
|
// here you can add namespaced functions for packet commands
|
|
consumersProcessor(func) {
|
|
for(let name of Object.keys(this.socket)){
|
|
if (this.socket[name].type ==='c') {
|
|
this.socketNamedProcessor(func,name)
|
|
}
|
|
}
|
|
}
|
|
|
|
socketsProcessor(func) {
|
|
for(let name of Object.keys(this.socket)){
|
|
if (this.socket[name].type ==='s') {
|
|
this.socketNamedProcessor(func,name)
|
|
}
|
|
}
|
|
}
|
|
|
|
socketNameProcessor(func,socket_name) {
|
|
socket_name = socket_name || '_default'
|
|
this._processors[socket_name]._process = func
|
|
}
|
|
|
|
addNamespace(space,type,trans) {
|
|
if (trans) return this._namespaces[type+trans].unshift(space)
|
|
else return this._namespaces[type].unshift(space)
|
|
}
|
|
|
|
packetProcessor(func) {
|
|
this._packetProcess = func
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
*
|
|
* Private Methods
|
|
*
|
|
*/
|
|
|
|
_transport(name) {return this.socket[name].transport}
|
|
_type(name) {return this.socket[name].type}
|
|
|
|
_getTransportNamespaces(socket) {
|
|
return this._namespace[this._type(socket)+this._transport(socket)]
|
|
}
|
|
|
|
_getCmdFuncNamespace (cmd,namespaces) {
|
|
let cmd_func = null
|
|
namespaces.some( namespace => {
|
|
namespace = namespace ? namespace+'.'+cmd : cmd
|
|
cmd_func = this._getCmdFunc(namespace)
|
|
if (cmd_func) return true
|
|
})
|
|
return cmd_func
|
|
}
|
|
|
|
_getCmdFunc (cmd,obj) {
|
|
// console.log('obj',obj)
|
|
if (typeof cmd ==='string') {
|
|
if (!obj) obj = this
|
|
cmd=cmd.split(/[.:/]+/)
|
|
// console.log('===================',cmd)
|
|
}
|
|
var prop=cmd.shift()
|
|
if (cmd.length === 0) return obj[prop]
|
|
if(!obj[prop]) return null
|
|
// console.log(cmd.length,cmd,prop, obj[prop])
|
|
return this._getCmdFunc(cmd, obj[prop])
|
|
}
|
|
|
|
async _callCmdFunc(packet,socket) {
|
|
let cmd_func = this._getCmdFuncNamespace(packet.cmd,this._namespaces[this._type(socket)+this._transport(socket)])
|
|
if (cmd_func) return await cmd_func.bind(this)(packet)
|
|
cmd_func = this._getCmdFuncNamespace(packet.cmd,this._namespaces[this._type(socket)])
|
|
if (cmd_func) return await cmd_func.bind(this)(packet)
|
|
return 'failed'
|
|
}
|
|
|
|
/*
|
|
**********default packet processor for all sockets
|
|
*/
|
|
|
|
async _packetProcess (socket_name,packet) {
|
|
// console.log(socket_name,packet)
|
|
let processor = packet._processor || this._processors[socket_name] || '_default'
|
|
return await this._processors[processor].bind(this)(packet,socket_name,this._processors[processor])
|
|
}
|
|
|
|
|
|
} // end class
|
|
|
|
const SOCKET_INFO_KEYS = ['name','type','transport']
|
|
const TRANSLATE= {n:'Named Pipe',t:'TCP',s:'Socket',c:'Consumer'}
|