moved default processing to constructor

push creates and connects to array and use promise.all
cleanup amend and register functions
alter send function and add sendTCP and sendIPC
This commit is contained in:
David Kebler 2018-01-30 21:12:38 -08:00
parent 3c1340baf0
commit 9679f61aae
2 changed files with 71 additions and 38 deletions

View file

@ -1,16 +1,32 @@
import Base from '../src/base'
const USOCKET = __dirname + '/sample.sock'
const basefuncs = {
write: function(packet){
packet.cmd='log'
packet.response='return of write command'
return packet
},
write2: function(packet){
packet.cmd='log'
packet.response='return of write2 command'
return packet
}
}
;
(async () => {
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
// let app = new Base({com:'us,uc,ts,tc', id:'example', path: USOCKET, log:false})
let app = new Base({sockets:'us,uc,ts,tc', id:'example', log:false})
let app = new Base({sockets:'uc,us,tc,ts', id:'example', log:false})
app.amendPacketProcessing(basefuncs)
await app.init()
console.log('after init')
app.amendPacketProcessing('tc',{
log: packet => {
console.log('==============Packet returned to TCP consumer==========')
@ -29,15 +45,14 @@ const USOCKET = __dirname + '/sample.sock'
})
let packet = {}
console.log('ready=============sending============')
// packet = {cmd:'echo', data:'some data to echo'}
// await app.send(packet)
console.log('=============sending============')
packet = {cmd:'echo', data:'some data to echo'}
await app.send('tc',packet)
Object.assign(app,unixfuncs)
app.registerPacketContext('ts',tcpfuncs)
packet = {cmd:'write', data:'data to write'}
await app.send(packet)
await delay(1000)
await delay(500)
app.amendPacketContext(
{write: function(packet){
packet.cmd='log'
@ -45,12 +60,12 @@ const USOCKET = __dirname + '/sample.sock'
return packet
}}
)
await delay(1000)
await delay(500)
packet = {cmd:'write', data:'2ND data to write'}
await app.send(packet)
await app.sendIPC(packet)
packet = {cmd:'write2', data:'data to write'}
await app.send(packet)
await app.sendTCP(packet)
//
await delay(2000)
process.kill(process.pid, 'SIGTERM')
@ -60,18 +75,7 @@ const USOCKET = __dirname + '/sample.sock'
process.kill(process.pid, 'SIGTERM')
})
const unixfuncs = {
write: function(packet){
packet.cmd='log'
packet.response='return of write command'
return packet
},
write2: function(packet){
packet.cmd='log'
packet.response='return of write2 command'
return packet
}
}
const tcpfuncs = {
write: function(packet){

View file

@ -1,5 +1,5 @@
// import { Socket, Consumer } from '@uci/socket'
import { Socket, Consumer } from '../../uci-socket/src'
import { Socket, Consumer } from '@uci/socket'
// import { Socket, Consumer } from '../../uci-socket/src'
import packet from './packet.mjs'
import EventEmitter from 'events'
@ -23,42 +23,47 @@ export default class Base extends EventEmitter {
'us':
opts.us.path = opts.us.path || opts.path
this.socket[sock] = new Socket(opts.us.path,opts)
this.socket[sock].packet = Object.assign({},packet.socket)
break
case 'uc':
opts.uc.path = opts.uc.path || opts.path
this.socket[sock] = new Consumer(opts.uc.path,opts)
this.socket[sock].packet = Object.assign({},packet.consumer)
break
case 'ts':
this.socket[sock]= new Socket(opts.ts, opts)
this.socket[sock].packet = Object.assign({},packet.socket)
break
case 'tc':
this.socket[sock]= new Consumer(opts.tc,opts)
this.socket[sock].packet = Object.assign({},packet.consumer)
}
})
} // end constructor
async init (context) {
context = context || this // context is Base for all if not supplied
let sockets = []
context = context || this // additional context is Base instance (or callsite) if not supplied
this.registerPacketContext(context)
for(let type of Object.keys(this.socket)){
if (type.indexOf('s')!==-1) {
this.socket[type].packet = Object.assign({},packet.socket) // default packet processing
await this.socket[type].create(context)
// setTimeout(context =>{this.socket[type].create(context)},500)
sockets.push(this.socket[type].create())
// setTimeout(this.socket[type].create,200) // test for consumer retry for delayed socket creation
}
else {
this.socket[type].packet = Object.assign({},packet.consumer) // default packet processing
await this.socket[type].connect(context)
sockets.push(this.socket[type].connect())
}
}
await Promise.all(sockets)
} // init
registerPacketContext(type, context) {
if (typeof type === 'string') {
this.socket[type].registerPacketContext(Object.assign({},context))
this.socket[type].packet.context = Object.assign({},context)
} else {
context=type
for(let type of Object.keys(this.socket)){
this.socket[type].registerPacketContext(context)
this.socket[type].packet.context = Object.assign({},context)
}
}
}
@ -75,7 +80,14 @@ export default class Base extends EventEmitter {
}
amendPacketProcessing(type,funcs) {
this.socket[type].packet = Object.assign({},this.socket[type].packet,funcs)
if (typeof type === 'string') {
this.socket[type].packet = Object.assign({},this.socket[type].packet,funcs)
} else {
funcs=type
for(let type of Object.keys(this.socket)){
this.socket[type].packet = Object.assign(this.socket[type].packet,funcs)
}
}
}
registerPacketProcessor(type,func) {
@ -89,10 +101,27 @@ export default class Base extends EventEmitter {
}
}
async send (packet) {
this.emit(packet.cmd, packet)
if (this.socket.uc) await this.socket.uc.send(packet)
if (this.socket.tc) await this.socket.tc.send(packet)
async sendTCP(packet) {
await this.socket.tc.send(packet)
}
async sendIPC(packet) {
await this.socket.uc.send(packet)
}
async send (transport, packet) {
if (typeof transport === 'string') {
if (this.socket[transport]) await this.socket.uc.send(packet)
}
else {
packet = transport
for(let type of Object.keys(this.socket)){
if (type.indexOf('c')!==-1) {
await this.socket[type].send(packet)
}
}
}
this.emit('packet', packet) // intra process, need to set a 'packet' listener to process
}
} // end class