fixed separation of packet properties for each socket type when customizing

add amendPacket.. and changed registerPacket to accept single type changes

The four-in-one example now using amend and register sucessfully
master
David Kebler 2018-01-29 21:51:13 -08:00
parent a83646c4f6
commit 3c1340baf0
5 changed files with 115 additions and 39 deletions

View File

@ -9,7 +9,6 @@ module.exports = {
"node": true,
"mocha": true
},
"parser": "babel-eslint",
"parserOptions": {
"ecmaVersion": 2017
,"sourceType": "module"

View File

@ -1,33 +1,53 @@
import Base from '../src/base'
const USOCKET = __dirname + '/sample.sock'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
;
(async () => {
// let app = new Base({com:'us,uc,ts,tc', id:'example', path: USOCKET, log:false})
let app = new Base({com:'us,uc,ts,tc', id:'example'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
// let app = new Base({com:'us,uc,ts,tc', id:'example', path: USOCKET, log:false})
let app = new Base({sockets:'us,uc,ts,tc', id:'example', log:false})
await app.init()
console.log('after init')
app.amendPacketProcessing('tc',{
log: packet => {
console.log('==============Packet returned to TCP consumer==========')
console.dir(packet)
console.log('===========================')
}
})
app.registerPacketProcessor('ts',
async function (packet) {
packet.test = 'this went through custom tcp socket processor'
if (!packet.cmd) return {error: 'no command in packet', packet: packet }
if (this.context) if (this.context[packet.cmd]) return await this.context[packet.cmd].bind(this.context)(packet)
if (this[packet.cmd]) return await this[packet.cmd](packet)
return {error: 'no socket processing function supplied for command', packet: packet }
})
let packet = {}
console.log('ready=============sending============')
let packet = {cmd:'echo', data:'some data to echo'}
await app.send(packet)
app.write = function(packet){
packet.cmd='log'
packet.response='return of write command'
return packet
}
app.write2 = function(packet){
packet.cmd='log'
packet.response='return of write2 command'
return packet
}
app.registerPacketContext(app)
// packet = {cmd:'echo', data:'some data to echo'}
// await app.send(packet)
Object.assign(app,unixfuncs)
app.registerPacketContext('ts',tcpfuncs)
packet = {cmd:'write', data:'data to write'}
await app.send(packet)
await delay(1000)
app.amendPacketContext(
{write: function(packet){
packet.cmd='log'
packet.response='return of AMMEDED write command'
return packet
}}
)
await delay(1000)
packet = {cmd:'write', data:'2ND data to write'}
await app.send(packet)
packet = {cmd:'write2', data:'data to write'}
await app.send(packet)
@ -37,4 +57,33 @@ const delay = time => new Promise(res=>setTimeout(()=>res(),time))
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
process.kill(process.pid, 'SIGTERM')
})
const unixfuncs = {
write: function(packet){
packet.cmd='log'
packet.response='return of write command'
return packet
},
write2: function(packet){
packet.cmd='log'
packet.response='return of write2 command'
return packet
}
}
const tcpfuncs = {
write: function(packet){
packet.cmd='log'
packet.response='return of write command'
packet.via = 'tcp write'
return packet
},
write2: function(packet){
packet.cmd='log'
packet.response='return of write2 command'
packet.via = 'tcp write2'
return packet
}
}

View File

@ -27,11 +27,9 @@
"@std/esm": "cjs",
"devDependencies": {
"@std/esm": "^0.18.0",
"babel-eslint": "^8.2.1",
"chai": "^4.1.2",
"chai-as-promised": "^7.1.1",
"codecov": "^3.0.0",
"eslint": "^3.19.0",
"istanbul": "^0.4.5",
"mocha": "^4.0.1"
},

View File

@ -1,9 +1,12 @@
import { Socket, Consumer } from '@uci/socket'
// import { Socket, Consumer } from '@uci/socket'
import { Socket, Consumer } from '../../uci-socket/src'
import packet from './packet.mjs'
import EventEmitter from 'events'
const USOCKET = __dirname + '/unix.sock'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
export default class Base extends EventEmitter {
constructor(opts={}) {
super()
@ -12,52 +15,79 @@ export default class Base extends EventEmitter {
this.desc = opts.desc // additional details for humans
// attach unix socket OR consumer(default) only if path is supplied
this.socket={}
this.com = opts.com.split(/[,:|\s]+/)
this.com.forEach( sock => {
opts.sockets.split(/[,:|\s]+/).forEach( sock => {
opts.name = this.id +':'+sock
opts.type = sock
if (!opts[sock]) opts[sock] = {}
switch (sock) {
case
'us':
this.socket[sock] = new Socket(opts.path,opts)
opts.us.path = opts.us.path || opts.path
this.socket[sock] = new Socket(opts.us.path,opts)
break
case 'uc':
this.socket[sock] = new Consumer(opts.path,opts)
opts.uc.path = opts.uc.path || opts.path
this.socket[sock] = new Consumer(opts.uc.path,opts)
break
case 'ts':
this.socket[sock]= new Socket(opts)
this.socket[sock]= new Socket(opts.ts, opts)
break
case 'tc':
this.socket[sock]= new Consumer(opts)
this.socket[sock]= new Consumer(opts.tc,opts)
}
})
} // end constructor
async init (context) {
context = context || this // context is Base if not supplied
context = context || this // context is Base for all if not supplied
for(let type of Object.keys(this.socket)){
if (type.indexOf('s')!==-1) {
this.socket[type].packet = packet.socket
this.socket[type].packet = Object.assign({},packet.socket) // default packet processing
await this.socket[type].create(context)
// setTimeout(context =>{this.socket[type].create(context)},500)
}
else {
this.socket[type].packet = packet.consumer
this.socket[type].packet = Object.assign({},packet.consumer) // default packet processing
await this.socket[type].connect(context)
}
}
} // init
registerPacketContext(context) {
registerPacketContext(type, context) {
if (typeof type === 'string') {
this.socket[type].registerPacketContext(Object.assign({},context))
} else {
context=type
for(let type of Object.keys(this.socket)){
this.socket[type].registerPacketContext(context)
}
}
}
registerPacketProcessor(func) {
amendPacketContext(type, context) {
if (typeof type === 'string') {
Object.assign(this.socket[type].packet.context,context)
} else {
context=type
for(let type of Object.keys(this.socket)){
Object.assign(this.socket[type].packet.context,context)
}
}
}
amendPacketProcessing(type,funcs) {
this.socket[type].packet = Object.assign({},this.socket[type].packet,funcs)
}
registerPacketProcessor(type,func) {
if (typeof type === 'string') {
this.socket[type].packet._process = func
} else {
func = type
for(let type of Object.keys(this.socket)){
this.socket[type].registerPacketProcessor(func)
}
}
}
async send (packet) {
this.emit(packet.cmd, packet)

View File

@ -12,7 +12,7 @@ export default {
echo: packet => {
packet.processed = true
packet.cmd = 'log'
packet.info = 'echoed'
packet.info = 'default socket echo'
return packet
}
},