improved namespace handling and added transport namespacing

added / delimiter to command/subcommand
master
David Kebler 2018-02-06 18:30:00 -08:00
parent ddd81ea671
commit b938647efe
3 changed files with 148 additions and 119 deletions

View File

@ -25,67 +25,89 @@ const socketfuncs = {
}
}
;
(async () => {
const tcpfuncs = {
write: {
happy: function(packet){
return new Promise( async (resolve) => {
let res = {}
res.req = packet
res.cmd='reply'
res.response='Im a TCP happy puppy :)'
return resolve(res)
})
},
sad: function(packet){
return new Promise( async (resolve) => {
let res = {}
res.req = packet
res.cmd='reply'
res.response='Im a TCP sad dog :('
return resolve(res)
})
}
}
}
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
// let app = new Base({com:'us,uc,ts,tc', id:'example', path: USOCKET, log:false})
// let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'})
let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'})
// app.amendPacketProcessing(basefuncs)
;
(async () => {
let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'})
// let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'})
await fio.init()
fio.s = socketfuncs
fio.st = tcpfuncs
fio.ct = {reply: packet =>{
console.log('==============Packet Displayed for TCP consumer received packets only')
console.dir(packet)
}}
fio.cn = {
reply: function (packet) {
console.log('==============Replay for only Named Pipe Consumer=========')
console.dir(packet)
console.log('===========================')
}
}
//
// fio.reply = packet =>{
// console.log('==============Packet Displayed by Generic reply function')
// console.dir(packet)
// }
fio.good = {
bad: function(packet){
return new Promise( async (resolve) => {
let res = {}
res.req = packet
res.cmd='good/ugly'
res.response='The Good The Bad and The Ugly'
return resolve(res)
})
},
ugly: function (packet) {
console.log('==============reply from Good Bad command =========')
console.log(packet.response)
console.log('===========================')
}
}
let packet = {}
console.log('=============sending============')
packet = {cmd:'echo', data:'some data to echo'}
// packet = {cmd:'echo', data:'some data to echo'}
// console.log(packet)
// await fio.send(packet)
// packet = {cmd:'write:happy', data:'data to write'}
// console.log(packet)
// await fio.send(packet)
// packet = {cmd:'write:sad', data:'data to write'}
// console.log(packet)
// await fio.send(packet)
// packet = {cmd:'write:sad', data:'sent only via tcp'}
// console.log(packet)
// await fio.sendTCP(packet)
packet = {cmd:'good:bad', data:'data sent that has no corresponding function'}
console.log(packet)
await fio.send(packet)
// fio.amendSocketProcessing(socketfuncs)
//
packet = {cmd:'write:happy', data:'data to write'}
console.log(packet)
await fio.send(packet)
packet = {cmd:'write:sad', data:'data to write'}
console.log(packet)
await fio.send(packet)
delay(3000)
// fio.amendConsumerProcessing({
// reply: packet => {
// console.log('==============Amended Default Packet Replay for Consumer=========')
// console.dir(packet)
// console.log('===========================')
// }
// })
// await app.send(packet)
// await delay(500)
// app.amendPacketContext(
// {write: function(packet){
// packet.cmd='log'
// packet.response='return of AMMEDED write command'
// return packet
// }}
// )
// await delay(500)
// packet = {cmd:'write', data:'2ND data to write'}
// await app.sendIPC(packet)
// packet = {cmd:'write2', data:'data to write'}
// await app.send(packet)
// //
await delay(2000)
process.kill(process.pid, 'SIGTERM')
@ -94,20 +116,3 @@ const socketfuncs = {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
process.kill(process.pid, 'SIGTERM')
})
const tcpfuncs = {
write: function(packet){
packet.cmd='log'
packet.response='return of write command'
packet.via = 'tcp write'
return packet
},
write2: function(packet){
packet.cmd='log'
packet.response='return of write2 command'
packet.via = 'tcp write2'
return packet
}
}

View File

@ -2,7 +2,7 @@
import UCISocket from '../../uci-socket/src'
import EventEmitter from 'events'
import { processor, commands } from './processing.mjs'
import { processor, commands, namespaces } from './processing.mjs'
export default class Base extends EventEmitter {
constructor(opts={}) {
@ -12,6 +12,7 @@ export default class Base extends EventEmitter {
this.socket={}
this._processors = { _default: processor }
this._defaultCmds = commands
this._namespaces = namespaces
opts.sockets.split(/[,|\s]+/).forEach( socketStr => {
let socket = {}
socketStr.split(/[>#]+/).map(function(prop,index) {
@ -44,12 +45,12 @@ export default class Base extends EventEmitter {
} // init
async send (name, packet) {
if (typeof name === 'string') {
// TODO make some kind of ability to wait for reply if needed before sending any more requests
async send (packet,name) {
if (name) {
if (this.socket[name]) await this.socket[name].send(packet)
}
else {
packet = name
for(let name of Object.keys(this.socket)){
if (this.socket[name].type ==='c') {
await this.socket[name].send(packet)
@ -59,26 +60,20 @@ export default class Base extends EventEmitter {
this.emit('packet', packet) // emits on instance for instance use
}
async sendTransport(packet,transport) {
for(let name of Object.keys(this.socket)){
if (this.socket[name].type ==='c') {
if (this.socket[name].transport === transport) return await this.socket[name].send(packet)
}
}
}
async sendTCP(packet) {return this.sendTransport(packet,'t')}
async sendIPC(packet) {return this.sendTransport(packet,'n')}
getSocket(name) {return this.socket[name]}
getCmdFunc (cmd,obj) {
if (typeof cmd ==='string') {
if (typeof obj ==='string' || obj === null) cmd = obj+'.'+cmd
cmd=cmd.split(/[.:]+/)
obj = this
// console.log('===================',cmd, this.id)
}
var prop=cmd.shift()
if (cmd.length === 0) return obj[prop]
if(!obj[prop]) return null
// console.log(cmd.length,cmd,prop, obj[prop])
return this.getCmdFunc(cmd, obj[prop])
}
sendSocket(name) {}
sendTransport(trans) {}
amendConsumerProcessing(funcs,trans) {
if (trans) {
if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] ={}
@ -130,6 +125,11 @@ export default class Base extends EventEmitter {
this._processors[socket_name]._process = func
}
addNamespace(space,type,trans) {
if (trans) return this._namespaces[type+trans].unshift(space)
else return this._namespaces[type].unshift(space)
}
packetProcessor(func) {
this._packetProcess = func
}
@ -143,6 +143,42 @@ export default class Base extends EventEmitter {
_transport(name) {return this.socket[name].transport}
_type(name) {return this.socket[name].type}
_getTransportNamespaces(socket) {
return this._namespace[this._type(socket)+this._transport(socket)]
}
_getCmdFuncNamespace (cmd,namespaces) {
let cmd_func = null
namespaces.some( namespace => {
namespace = namespace ? namespace+'.'+cmd : cmd
cmd_func = this._getCmdFunc(namespace)
if (cmd_func) return true
})
return cmd_func
}
_getCmdFunc (cmd,obj) {
// console.log('obj',obj)
if (typeof cmd ==='string') {
if (!obj) obj = this
cmd=cmd.split(/[.:/]+/)
// console.log('===================',cmd)
}
var prop=cmd.shift()
if (cmd.length === 0) return obj[prop]
if(!obj[prop]) return null
// console.log(cmd.length,cmd,prop, obj[prop])
return this._getCmdFunc(cmd, obj[prop])
}
async _callCmdFunc(packet,socket) {
let cmd_func = this._getCmdFuncNamespace(packet.cmd,this._namespaces[this._type(socket)+this._transport(socket)])
if (cmd_func) return await cmd_func.bind(this)(packet)
cmd_func = this._getCmdFuncNamespace(packet.cmd,this._namespaces[this._type(socket)])
if (cmd_func) return await cmd_func.bind(this)(packet)
return 'failed'
}
/*
**********default packet processor for all sockets
*/

View File

@ -3,50 +3,46 @@
// this._processing refers to this module/hash
const processor = async function (packet,socket) {
return await process[this.getSocket(socket).type].bind(this)(packet)
return await process[this.getSocket(socket).type].bind(this)(packet,socket)
}
export { processor, commands }
export { processor, commands, namespaces }
const process = {
s: async function (packet) {
// add namepaces by unshifting them onto look_in
s: async function (packet,socket) {
// console.log('in default socket processor',packet.cmd)
if (!packet.cmd) return {error: 'no command in packet', packet: packet }
let cmd_func = null
let namespaces = ['s',null,'_defaultCmds.s']
namespaces.some( namespace => {
cmd_func = this.getCmdFunc(namespace+'.'+packet.cmd)
if (cmd_func) return true
})
if (cmd_func) return cmd_func.bind(this)(packet)
if (!packet.cmd) return {error: '[socket] no command in packet', packet: packet }
let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response
return {error: 'no socket processing function supplied for command', packet: packet }
},
c: async function (packet) {
c: async function (packet,socket) {
// console.log('in default consumer processor',packet.cmd)
if (packet.error) this._defaultCmds.c.error(packet)
if (packet.error) return await this._defaultCmds.c.error(packet)
if (packet.cmd) {
// move namespaces to class constructor and add unshift method for adding
let namespaces = ['c.'+packet.cmd,packet.cmd,'_defaultCmds.c.'+packet.cmd]
let cmd_func = false
namespaces.forEach( location => {
cmd_func = this.getCmdFunc(location)
if(cmd_func)return
})
if (cmd_func) return cmd_func.bind(this)(packet)
let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response
packet = {error:'no consumer processing function supplied for command',packet:packet}
this._defaultCmds.c.error(packet)
} else {
packet = {error:'no command in packet',packet:packet}
this._defaultCmds.c.error(packet)
packet = {error:'[consumer] no command in packet',packet:packet}
return await this._defaultCmds.c.error(packet)
}
}
}
const namespaces = {
s: ['s',null,'_defaultCmds.s'],
c: ['c',null,'_defaultCmds.c'],
cn: ['cn'],
ct: ['ct'],
sn: ['sn'],
st: ['st'],
}
/*
*
* Default packed command processing
* Default packet command processing functions
*
*/
@ -69,17 +65,9 @@ const commands ={
console.log('===========================')
},
reply: function(packet) {
console.log('==============Packet returned from socket==========')
console.dir(packet)
console.log('===========================')
this.amendConsumerProcessing({
reply: function (packet) {
console.log('==============Amended Default Packet Replay for Consumer=========')
console.log('==============Packet returned from socket - default reply==========')
console.dir(packet)
console.log('===========================')
}
})
}
}
}