From b938647efee9e9487df25ab4f9ee2e50acac2808 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Tue, 6 Feb 2018 18:30:00 -0800 Subject: [PATCH] improved namespace handling and added transport namespacing added / delimiter to command/subcommand --- examples/four-in-one.mjs | 137 ++++++++++++++++++++------------------- src/base.mjs | 74 +++++++++++++++------ src/processing.mjs | 56 +++++++--------- 3 files changed, 148 insertions(+), 119 deletions(-) diff --git a/examples/four-in-one.mjs b/examples/four-in-one.mjs index f795ccf..5a7bd91 100644 --- a/examples/four-in-one.mjs +++ b/examples/four-in-one.mjs @@ -25,67 +25,89 @@ const socketfuncs = { } } + + +const tcpfuncs = { + write: { + happy: function(packet){ + return new Promise( async (resolve) => { + let res = {} + res.req = packet + res.cmd='reply' + res.response='Im a TCP happy puppy :)' + return resolve(res) + }) + }, + sad: function(packet){ + return new Promise( async (resolve) => { + let res = {} + res.req = packet + res.cmd='reply' + res.response='Im a TCP sad dog :(' + return resolve(res) + }) + } + } +} + + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + ; (async () => { - const delay = time => new Promise(res=>setTimeout(()=>res(),time)) - - // let app = new Base({com:'us,uc,ts,tc', id:'example', path: USOCKET, log:false}) - // let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'}) - let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'}) - // app.amendPacketProcessing(basefuncs) - + let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'}) + // let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'}) await fio.init() - fio.s = socketfuncs + fio.st = tcpfuncs + fio.ct = {reply: packet =>{ + console.log('==============Packet Displayed for TCP consumer received packets only') + console.dir(packet) + }} + fio.cn = { + reply: function (packet) { + console.log('==============Replay for only Named Pipe Consumer=========') + console.dir(packet) + console.log('===========================') + } + } - // - // fio.reply = packet =>{ - // console.log('==============Packet Displayed by Generic reply function') - // console.dir(packet) - // } - + fio.good = { + bad: function(packet){ + return new Promise( async (resolve) => { + let res = {} + res.req = packet + res.cmd='good/ugly' + res.response='The Good The Bad and The Ugly' + return resolve(res) + }) + }, + ugly: function (packet) { + console.log('==============reply from Good Bad command =========') + console.log(packet.response) + console.log('===========================') + } + } let packet = {} console.log('=============sending============') - packet = {cmd:'echo', data:'some data to echo'} + // packet = {cmd:'echo', data:'some data to echo'} + // console.log(packet) + // await fio.send(packet) + // packet = {cmd:'write:happy', data:'data to write'} + // console.log(packet) + // await fio.send(packet) + // packet = {cmd:'write:sad', data:'data to write'} + // console.log(packet) + // await fio.send(packet) + // packet = {cmd:'write:sad', data:'sent only via tcp'} + // console.log(packet) + // await fio.sendTCP(packet) + packet = {cmd:'good:bad', data:'data sent that has no corresponding function'} console.log(packet) await fio.send(packet) - // fio.amendSocketProcessing(socketfuncs) - // - packet = {cmd:'write:happy', data:'data to write'} - console.log(packet) - await fio.send(packet) - packet = {cmd:'write:sad', data:'data to write'} - console.log(packet) - await fio.send(packet) - - delay(3000) - // fio.amendConsumerProcessing({ - // reply: packet => { - // console.log('==============Amended Default Packet Replay for Consumer=========') - // console.dir(packet) - // console.log('===========================') - // } - // }) - - - // await app.send(packet) - // await delay(500) - // app.amendPacketContext( - // {write: function(packet){ - // packet.cmd='log' - // packet.response='return of AMMEDED write command' - // return packet - // }} - // ) - // await delay(500) - // packet = {cmd:'write', data:'2ND data to write'} - // await app.sendIPC(packet) - // packet = {cmd:'write2', data:'data to write'} - // await app.send(packet) - // // await delay(2000) process.kill(process.pid, 'SIGTERM') @@ -94,20 +116,3 @@ const socketfuncs = { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) process.kill(process.pid, 'SIGTERM') }) - - - -const tcpfuncs = { - write: function(packet){ - packet.cmd='log' - packet.response='return of write command' - packet.via = 'tcp write' - return packet - }, - write2: function(packet){ - packet.cmd='log' - packet.response='return of write2 command' - packet.via = 'tcp write2' - return packet - } -} diff --git a/src/base.mjs b/src/base.mjs index 0875e25..f0f82a0 100644 --- a/src/base.mjs +++ b/src/base.mjs @@ -2,7 +2,7 @@ import UCISocket from '../../uci-socket/src' import EventEmitter from 'events' -import { processor, commands } from './processing.mjs' +import { processor, commands, namespaces } from './processing.mjs' export default class Base extends EventEmitter { constructor(opts={}) { @@ -12,6 +12,7 @@ export default class Base extends EventEmitter { this.socket={} this._processors = { _default: processor } this._defaultCmds = commands + this._namespaces = namespaces opts.sockets.split(/[,|\s]+/).forEach( socketStr => { let socket = {} socketStr.split(/[>#]+/).map(function(prop,index) { @@ -44,12 +45,12 @@ export default class Base extends EventEmitter { } // init - async send (name, packet) { - if (typeof name === 'string') { + // TODO make some kind of ability to wait for reply if needed before sending any more requests + async send (packet,name) { + if (name) { if (this.socket[name]) await this.socket[name].send(packet) } else { - packet = name for(let name of Object.keys(this.socket)){ if (this.socket[name].type ==='c') { await this.socket[name].send(packet) @@ -59,25 +60,19 @@ export default class Base extends EventEmitter { this.emit('packet', packet) // emits on instance for instance use } - getSocket(name) {return this.socket[name]} - - getCmdFunc (cmd,obj) { - if (typeof cmd ==='string') { - if (typeof obj ==='string' || obj === null) cmd = obj+'.'+cmd - cmd=cmd.split(/[.:]+/) - obj = this - // console.log('===================',cmd, this.id) + async sendTransport(packet,transport) { + for(let name of Object.keys(this.socket)){ + if (this.socket[name].type ==='c') { + if (this.socket[name].transport === transport) return await this.socket[name].send(packet) + } } - var prop=cmd.shift() - if (cmd.length === 0) return obj[prop] - if(!obj[prop]) return null - // console.log(cmd.length,cmd,prop, obj[prop]) - return this.getCmdFunc(cmd, obj[prop]) } + async sendTCP(packet) {return this.sendTransport(packet,'t')} + async sendIPC(packet) {return this.sendTransport(packet,'n')} - sendSocket(name) {} - sendTransport(trans) {} + + getSocket(name) {return this.socket[name]} amendConsumerProcessing(funcs,trans) { if (trans) { @@ -130,6 +125,11 @@ export default class Base extends EventEmitter { this._processors[socket_name]._process = func } + addNamespace(space,type,trans) { + if (trans) return this._namespaces[type+trans].unshift(space) + else return this._namespaces[type].unshift(space) + } + packetProcessor(func) { this._packetProcess = func } @@ -143,6 +143,42 @@ export default class Base extends EventEmitter { _transport(name) {return this.socket[name].transport} _type(name) {return this.socket[name].type} + _getTransportNamespaces(socket) { + return this._namespace[this._type(socket)+this._transport(socket)] + } + + _getCmdFuncNamespace (cmd,namespaces) { + let cmd_func = null + namespaces.some( namespace => { + namespace = namespace ? namespace+'.'+cmd : cmd + cmd_func = this._getCmdFunc(namespace) + if (cmd_func) return true + }) + return cmd_func + } + + _getCmdFunc (cmd,obj) { + // console.log('obj',obj) + if (typeof cmd ==='string') { + if (!obj) obj = this + cmd=cmd.split(/[.:/]+/) + // console.log('===================',cmd) + } + var prop=cmd.shift() + if (cmd.length === 0) return obj[prop] + if(!obj[prop]) return null + // console.log(cmd.length,cmd,prop, obj[prop]) + return this._getCmdFunc(cmd, obj[prop]) + } + + async _callCmdFunc(packet,socket) { + let cmd_func = this._getCmdFuncNamespace(packet.cmd,this._namespaces[this._type(socket)+this._transport(socket)]) + if (cmd_func) return await cmd_func.bind(this)(packet) + cmd_func = this._getCmdFuncNamespace(packet.cmd,this._namespaces[this._type(socket)]) + if (cmd_func) return await cmd_func.bind(this)(packet) + return 'failed' + } + /* **********default packet processor for all sockets */ diff --git a/src/processing.mjs b/src/processing.mjs index 75b7d1b..706a021 100644 --- a/src/processing.mjs +++ b/src/processing.mjs @@ -3,50 +3,46 @@ // this._processing refers to this module/hash const processor = async function (packet,socket) { - return await process[this.getSocket(socket).type].bind(this)(packet) + return await process[this.getSocket(socket).type].bind(this)(packet,socket) } -export { processor, commands } +export { processor, commands, namespaces } const process = { - s: async function (packet) { - // add namepaces by unshifting them onto look_in + s: async function (packet,socket) { // console.log('in default socket processor',packet.cmd) - if (!packet.cmd) return {error: 'no command in packet', packet: packet } - let cmd_func = null - let namespaces = ['s',null,'_defaultCmds.s'] - namespaces.some( namespace => { - cmd_func = this.getCmdFunc(namespace+'.'+packet.cmd) - if (cmd_func) return true - }) - if (cmd_func) return cmd_func.bind(this)(packet) + if (!packet.cmd) return {error: '[socket] no command in packet', packet: packet } + let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response return {error: 'no socket processing function supplied for command', packet: packet } }, - c: async function (packet) { + c: async function (packet,socket) { + // console.log('in default consumer processor',packet.cmd) - if (packet.error) this._defaultCmds.c.error(packet) + if (packet.error) return await this._defaultCmds.c.error(packet) if (packet.cmd) { - // move namespaces to class constructor and add unshift method for adding - let namespaces = ['c.'+packet.cmd,packet.cmd,'_defaultCmds.c.'+packet.cmd] - let cmd_func = false - namespaces.forEach( location => { - cmd_func = this.getCmdFunc(location) - if(cmd_func)return - }) - if (cmd_func) return cmd_func.bind(this)(packet) + let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response packet = {error:'no consumer processing function supplied for command',packet:packet} this._defaultCmds.c.error(packet) } else { - packet = {error:'no command in packet',packet:packet} - this._defaultCmds.c.error(packet) + packet = {error:'[consumer] no command in packet',packet:packet} + return await this._defaultCmds.c.error(packet) } } } +const namespaces = { + s: ['s',null,'_defaultCmds.s'], + c: ['c',null,'_defaultCmds.c'], + cn: ['cn'], + ct: ['ct'], + sn: ['sn'], + st: ['st'], +} + /* * -* Default packed command processing +* Default packet command processing functions * */ @@ -69,17 +65,9 @@ const commands ={ console.log('===========================') }, reply: function(packet) { - console.log('==============Packet returned from socket==========') + console.log('==============Packet returned from socket - default reply==========') console.dir(packet) console.log('===========================') - this.amendConsumerProcessing({ - reply: function (packet) { - console.log('==============Amended Default Packet Replay for Consumer=========') - console.dir(packet) - console.log('===========================') - } - }) - } } }