refactored processing and packet command function lookup based on priortized namespaces. Packets can now have subcommands by use using . or : which will look in sub keys for the command sub function
parent
e5059368a9
commit
ddd81ea671
|
@ -3,15 +3,25 @@ import Base from '../src/base'
|
||||||
const USOCKET = __dirname + '/sample.sock'
|
const USOCKET = __dirname + '/sample.sock'
|
||||||
|
|
||||||
const socketfuncs = {
|
const socketfuncs = {
|
||||||
write: function(packet){
|
write: {
|
||||||
packet.cmd='reply'
|
happy: function(packet){
|
||||||
packet.response='return of write command'
|
return new Promise( async (resolve) => {
|
||||||
return packet
|
let res = {}
|
||||||
},
|
res.req = packet
|
||||||
write2: function(packet){
|
res.cmd='reply'
|
||||||
packet.cmd='reply'
|
res.response='Im a happy puppy :)'
|
||||||
packet.response='return of write2 command'
|
return resolve(res)
|
||||||
return packet
|
})
|
||||||
|
},
|
||||||
|
sad: function(packet){
|
||||||
|
return new Promise( async (resolve) => {
|
||||||
|
let res = {}
|
||||||
|
res.req = packet
|
||||||
|
res.cmd='reply'
|
||||||
|
res.response='Im a sad dog :('
|
||||||
|
return resolve(res)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,24 +31,19 @@ const socketfuncs = {
|
||||||
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
|
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
|
||||||
|
|
||||||
// let app = new Base({com:'us,uc,ts,tc', id:'example', path: USOCKET, log:false})
|
// let app = new Base({com:'us,uc,ts,tc', id:'example', path: USOCKET, log:false})
|
||||||
let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'})
|
// let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'})
|
||||||
|
let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'})
|
||||||
// app.amendPacketProcessing(basefuncs)
|
// app.amendPacketProcessing(basefuncs)
|
||||||
|
|
||||||
await fio.init()
|
await fio.init()
|
||||||
|
|
||||||
fio.amendNamedProcessing('tc',{
|
fio.s = socketfuncs
|
||||||
reply: packet => {
|
|
||||||
console.log('==============Packet returned to nameed consumer tc==========')
|
|
||||||
console.dir(packet)
|
|
||||||
console.log('===========================')
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
fio.reply = packet =>{
|
//
|
||||||
console.log('==============Packet Displayed by Generic reply function')
|
// fio.reply = packet =>{
|
||||||
console.dir(packet)
|
// console.log('==============Packet Displayed by Generic reply function')
|
||||||
}
|
// console.dir(packet)
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
let packet = {}
|
let packet = {}
|
||||||
|
@ -47,11 +52,25 @@ const socketfuncs = {
|
||||||
console.log(packet)
|
console.log(packet)
|
||||||
await fio.send(packet)
|
await fio.send(packet)
|
||||||
|
|
||||||
fio.amendSocketProcessing(socketfuncs)
|
// fio.amendSocketProcessing(socketfuncs)
|
||||||
|
//
|
||||||
packet = {cmd:'write', data:'data to write'}
|
packet = {cmd:'write:happy', data:'data to write'}
|
||||||
console.log(packet)
|
console.log(packet)
|
||||||
await fio.send(packet)
|
await fio.send(packet)
|
||||||
|
packet = {cmd:'write:sad', data:'data to write'}
|
||||||
|
console.log(packet)
|
||||||
|
await fio.send(packet)
|
||||||
|
|
||||||
|
delay(3000)
|
||||||
|
// fio.amendConsumerProcessing({
|
||||||
|
// reply: packet => {
|
||||||
|
// console.log('==============Amended Default Packet Replay for Consumer=========')
|
||||||
|
// console.dir(packet)
|
||||||
|
// console.log('===========================')
|
||||||
|
// }
|
||||||
|
// })
|
||||||
|
|
||||||
|
|
||||||
// await app.send(packet)
|
// await app.send(packet)
|
||||||
// await delay(500)
|
// await delay(500)
|
||||||
// app.amendPacketContext(
|
// app.amendPacketContext(
|
||||||
|
|
117
src/base.mjs
117
src/base.mjs
|
@ -1,15 +1,17 @@
|
||||||
// import { Socket, Consumer } from '@uci/socket'
|
// import { Socket, Consumer } from '@uci/socket'
|
||||||
import UCISocket from '../../uci-socket/src'
|
import UCISocket from '../../uci-socket/src'
|
||||||
import defaultProcessing from './default-processing.mjs'
|
|
||||||
import EventEmitter from 'events'
|
import EventEmitter from 'events'
|
||||||
|
|
||||||
|
import { processor, commands } from './processing.mjs'
|
||||||
|
|
||||||
export default class Base extends EventEmitter {
|
export default class Base extends EventEmitter {
|
||||||
constructor(opts={}) {
|
constructor(opts={}) {
|
||||||
super()
|
super()
|
||||||
this.id = opts.id || opts.name || 'uci-base:'+ new Date().getTime()
|
this.id = opts.id || opts.name || 'uci-base:'+ new Date().getTime()
|
||||||
this.desc = opts.desc // additional details for humans
|
this.desc = opts.desc // additional details for humans
|
||||||
this.socket={}
|
this.socket={}
|
||||||
this._default = defaultProcessing
|
this._processors = { _default: processor }
|
||||||
|
this._defaultCmds = commands
|
||||||
opts.sockets.split(/[,|\s]+/).forEach( socketStr => {
|
opts.sockets.split(/[,|\s]+/).forEach( socketStr => {
|
||||||
let socket = {}
|
let socket = {}
|
||||||
socketStr.split(/[>#]+/).map(function(prop,index) {
|
socketStr.split(/[>#]+/).map(function(prop,index) {
|
||||||
|
@ -19,7 +21,7 @@ export default class Base extends EventEmitter {
|
||||||
if (socket.transport ==='n') opts[socket.name].np = true
|
if (socket.transport ==='n') opts[socket.name].np = true
|
||||||
opts[socket.name].id = this.id +':'+ socket.name
|
opts[socket.name].id = this.id +':'+ socket.name
|
||||||
// console.log(TRANSLATIONS[socket.type])
|
// console.log(TRANSLATIONS[socket.type])
|
||||||
this.socket[socket.name] = new UCISocket[TRANSLATIONS[socket.type]](opts[socket.name])
|
this.socket[socket.name] = new UCISocket[TRANSLATE[socket.type]](opts[socket.name])
|
||||||
// console.log(socket.name, this.socket[socket.name].send)
|
// console.log(socket.name, this.socket[socket.name].send)
|
||||||
Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket
|
Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket
|
||||||
this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name)
|
this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name)
|
||||||
|
@ -57,58 +59,75 @@ export default class Base extends EventEmitter {
|
||||||
this.emit('packet', packet) // emits on instance for instance use
|
this.emit('packet', packet) // emits on instance for instance use
|
||||||
}
|
}
|
||||||
|
|
||||||
// registerPacketProcessor(name, func) {
|
|
||||||
// if (typeof name === 'string') {
|
|
||||||
// if (this.socket[name]) await this.socket[name]._packetProcess = func
|
|
||||||
// }
|
|
||||||
// else {
|
|
||||||
// func = name
|
|
||||||
// for(let name of Object.keys(this.socket)){
|
|
||||||
// this.socket[name].send(packet)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// registerConsumerProcessor(func) {
|
|
||||||
// this._packetProces = func
|
|
||||||
// }
|
|
||||||
// registerSocketProcessor(func) {
|
|
||||||
// this._packetProces = func
|
|
||||||
// }
|
|
||||||
|
|
||||||
getSocket(name) {return this.socket[name]}
|
getSocket(name) {return this.socket[name]}
|
||||||
|
|
||||||
|
getCmdFunc (cmd,obj) {
|
||||||
|
if (typeof cmd ==='string') {
|
||||||
|
if (typeof obj ==='string' || obj === null) cmd = obj+'.'+cmd
|
||||||
|
cmd=cmd.split(/[.:]+/)
|
||||||
|
obj = this
|
||||||
|
// console.log('===================',cmd, this.id)
|
||||||
|
}
|
||||||
|
var prop=cmd.shift()
|
||||||
|
if (cmd.length === 0) return obj[prop]
|
||||||
|
if(!obj[prop]) return null
|
||||||
|
// console.log(cmd.length,cmd,prop, obj[prop])
|
||||||
|
return this.getCmdFunc(cmd, obj[prop])
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
sendSocket(name) {}
|
sendSocket(name) {}
|
||||||
sendTransport(trans) {}
|
sendTransport(trans) {}
|
||||||
|
|
||||||
amendConsumerProcessing(funcs,trans) {
|
amendConsumerProcessing(funcs,trans) {
|
||||||
Object.assign(this._default.c[trans],funcs)
|
if (trans) {
|
||||||
|
if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] ={}
|
||||||
|
Object.assign(this._defaultCmds.c[trans],funcs)
|
||||||
|
}
|
||||||
|
Object.assign(this._defaultCmds.c,funcs)
|
||||||
}
|
}
|
||||||
|
|
||||||
amendSocketProcessing(funcs,trans) {
|
amendSocketProcessing(funcs,trans) {
|
||||||
if (trans) {
|
if (trans) {
|
||||||
if (!this._default.s[trans]) this._default.s[trans] ={}
|
if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] ={}
|
||||||
Object.assign(this._default.s[trans],funcs)
|
Object.assign(this._defaultCmds.c[trans],funcs)
|
||||||
}
|
}
|
||||||
Object.assign(this._default.s,funcs)
|
Object.assign(this._defaultCmds.c,funcs)
|
||||||
}
|
}
|
||||||
|
// use s: and c: keys TODO need to change this
|
||||||
amendNamedProcessing(name,funcs) {
|
addNamedProcessing(name,funcs,type) {
|
||||||
if(!this._default[name]) this._default[name] ={}
|
if (type){
|
||||||
Object.assign(this._default[name],funcs)
|
if(!this._cmds[name][type]) this._cmds[name][type] = {}
|
||||||
|
Object.assign(this._cmds[name][type],funcs)
|
||||||
|
} else {
|
||||||
|
if(!this._cmds[name]) this._cmds[name] ={}
|
||||||
|
Object.assign(this._cmds[name],funcs)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
beforeHook (type,funcs){}
|
beforeHook (type,funcs){}
|
||||||
afterHook(type,funcs){}
|
afterHook(type,funcs){}
|
||||||
|
|
||||||
addProcessing(name,type) {}
|
// here you can add namespaced functions for packet commands
|
||||||
|
consumersProcessor(func) {
|
||||||
consumerProcessor(func) {
|
for(let name of Object.keys(this.socket)){
|
||||||
this._default.c._process = func
|
if (this.socket[name].type ==='c') {
|
||||||
|
this.socketNamedProcessor(func,name)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
socketProcessor(func) {
|
socketsProcessor(func) {
|
||||||
this._default.s._process = func
|
for(let name of Object.keys(this.socket)){
|
||||||
|
if (this.socket[name].type ==='s') {
|
||||||
|
this.socketNamedProcessor(func,name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
socketNameProcessor(func,socket_name) {
|
||||||
|
socket_name = socket_name || '_default'
|
||||||
|
this._processors[socket_name]._process = func
|
||||||
}
|
}
|
||||||
|
|
||||||
packetProcessor(func) {
|
packetProcessor(func) {
|
||||||
|
@ -124,25 +143,17 @@ export default class Base extends EventEmitter {
|
||||||
_transport(name) {return this.socket[name].transport}
|
_transport(name) {return this.socket[name].transport}
|
||||||
_type(name) {return this.socket[name].type}
|
_type(name) {return this.socket[name].type}
|
||||||
|
|
||||||
// default packet processor for all sockets
|
/*
|
||||||
async _packetProcess (sname,packet) {
|
**********default packet processor for all sockets
|
||||||
return this._default[this.socket[sname].type]._process.bind(this)(packet,sname)
|
*/
|
||||||
|
|
||||||
|
async _packetProcess (socket_name,packet) {
|
||||||
|
let processor = packet._processor || this._processors[socket_name] || '_default'
|
||||||
|
return await this._processors[processor].bind(this)(packet,socket_name,this._processors[processor])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
} // end class
|
} // end class
|
||||||
|
|
||||||
const SOCKET_INFO_KEYS = ['name','type','transport']
|
const SOCKET_INFO_KEYS = ['name','type','transport']
|
||||||
const TRANSLATIONS = {t:'TCP',s:'Socket',c:'Consumer',n:'Named Pipe',}
|
const TRANSLATE= {n:'Named Pipe',t:'TCP',s:'Socket',c:'Consumer'}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// this.on('packet', packet => {
|
|
||||||
// console.log('incoming packet ==========>',packet)
|
|
||||||
// app[TRANSLATIONS[packet._socket.type]].bind(app)(packet)
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// // console.log('IN EMIT FUNCTION',socket_name,packet)
|
|
||||||
// // let s = this.socket[socket_name]
|
|
||||||
// // packet._socket = {name:s.name,type:s.type,transport:s.transport}
|
|
||||||
// // this.emit('packet', packet)
|
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
|
||||||
|
|
||||||
|
// this._processing refers to this module/hash
|
||||||
|
|
||||||
|
const processor = async function (packet,socket) {
|
||||||
|
return await process[this.getSocket(socket).type].bind(this)(packet)
|
||||||
|
}
|
||||||
|
|
||||||
|
export { processor, commands }
|
||||||
|
|
||||||
|
const process = {
|
||||||
|
s: async function (packet) {
|
||||||
|
// add namepaces by unshifting them onto look_in
|
||||||
|
// console.log('in default socket processor',packet.cmd)
|
||||||
|
if (!packet.cmd) return {error: 'no command in packet', packet: packet }
|
||||||
|
let cmd_func = null
|
||||||
|
let namespaces = ['s',null,'_defaultCmds.s']
|
||||||
|
namespaces.some( namespace => {
|
||||||
|
cmd_func = this.getCmdFunc(namespace+'.'+packet.cmd)
|
||||||
|
if (cmd_func) return true
|
||||||
|
})
|
||||||
|
if (cmd_func) return cmd_func.bind(this)(packet)
|
||||||
|
return {error: 'no socket processing function supplied for command', packet: packet }
|
||||||
|
},
|
||||||
|
|
||||||
|
c: async function (packet) {
|
||||||
|
// console.log('in default consumer processor',packet.cmd)
|
||||||
|
if (packet.error) this._defaultCmds.c.error(packet)
|
||||||
|
if (packet.cmd) {
|
||||||
|
// move namespaces to class constructor and add unshift method for adding
|
||||||
|
let namespaces = ['c.'+packet.cmd,packet.cmd,'_defaultCmds.c.'+packet.cmd]
|
||||||
|
let cmd_func = false
|
||||||
|
namespaces.forEach( location => {
|
||||||
|
cmd_func = this.getCmdFunc(location)
|
||||||
|
if(cmd_func)return
|
||||||
|
})
|
||||||
|
if (cmd_func) return cmd_func.bind(this)(packet)
|
||||||
|
packet = {error:'no consumer processing function supplied for command',packet:packet}
|
||||||
|
this._defaultCmds.c.error(packet)
|
||||||
|
} else {
|
||||||
|
packet = {error:'no command in packet',packet:packet}
|
||||||
|
this._defaultCmds.c.error(packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Default packed command processing
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
const commands ={
|
||||||
|
s:{
|
||||||
|
echo: async packet => {
|
||||||
|
return new Promise( async (resolve) => {
|
||||||
|
packet.processed = true
|
||||||
|
packet.cmd = 'reply'
|
||||||
|
packet.info = 'default socket echo'
|
||||||
|
return resolve(packet)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
},
|
||||||
|
c:{
|
||||||
|
error: function (packet) {
|
||||||
|
console.log('==============Packet ERROR==========')
|
||||||
|
console.log(packet.error )
|
||||||
|
console.dir(packet.packet)
|
||||||
|
console.log('===========================')
|
||||||
|
},
|
||||||
|
reply: function(packet) {
|
||||||
|
console.log('==============Packet returned from socket==========')
|
||||||
|
console.dir(packet)
|
||||||
|
console.log('===========================')
|
||||||
|
this.amendConsumerProcessing({
|
||||||
|
reply: function (packet) {
|
||||||
|
console.log('==============Amended Default Packet Replay for Consumer=========')
|
||||||
|
console.dir(packet)
|
||||||
|
console.log('===========================')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue