With changes in socket module now send is async and returns the response from the socket.

send now transport send now process as many consumer sockets as have been created by pushing them onto array and the using promise.all and map to send out together.
Return for one than one is an array of responses.
getPacketByName was added to make it easy to grab just one of those in returned array.
This commit is contained in:
David Kebler 2018-02-13 14:19:18 -08:00
parent b938647efe
commit d0173c045f
4 changed files with 50 additions and 34 deletions

View file

@ -1,23 +1,19 @@
import Base from '../src/base'
const USOCKET = __dirname + '/sample.sock'
// const USOCKET = __dirname + '/sample.sock'
const socketfuncs = {
write: {
happy: function(packet){
return new Promise( async (resolve) => {
let res = {}
res.req = packet
res.cmd='reply'
res.response='Im a happy puppy :)'
res.response=packet.data+ ' and Im a happy puppy :)'
return resolve(res)
})
},
sad: function(packet){
return new Promise( async (resolve) => {
let res = {}
res.req = packet
res.cmd='reply'
res.response='Im a sad dog :('
return resolve(res)
})
@ -32,16 +28,14 @@ const tcpfuncs = {
happy: function(packet){
return new Promise( async (resolve) => {
let res = {}
res.req = packet
res.cmd='reply'
res.response='Im a TCP happy puppy :)'
res.response=packet.data+ ' and Im a TCP happy puppy :)'
return resolve(res)
})
},
sad: function(packet){
return new Promise( async (resolve) => {
let res = {}
res.req = packet
res.cmd='reply'
res.response='Im a TCP sad dog :('
return resolve(res)
@ -51,24 +45,25 @@ const tcpfuncs = {
}
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
// const delay = time => new Promise(res=>setTimeout(()=>res(),time))
;
(async () => {
let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'})
let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,tc2#c>t', id:'four-in-one'})
// let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'})
await fio.init()
fio.s = socketfuncs
fio.st = tcpfuncs
fio.ct = {reply: packet =>{
console.log('==============Packet Displayed for TCP consumer received packets only')
console.dir(packet)
console.dir(packet.response)
console.log('===========================')
}}
fio.cn = {
reply: function (packet) {
console.log('==============Replay for only Named Pipe Consumer=========')
console.dir(packet)
console.dir(packet.response)
console.log('===========================')
}
}
@ -94,24 +89,23 @@ const delay = time => new Promise(res=>setTimeout(()=>res(),time))
console.log('=============sending============')
// packet = {cmd:'echo', data:'some data to echo'}
// console.log(packet)
// await fio.send(packet)
// packet = {cmd:'write:happy', data:'data to write'}
// console.log(packet)
// await fio.send(packet)
// packet = {cmd:'write:sad', data:'data to write'}
// console.log(packet)
// await fio.send(packet)
// packet = {cmd:'write:sad', data:'sent only via tcp'}
// console.log(packet)
// await fio.sendTCP(packet)
packet = {cmd:'good:bad', data:'data sent that has no corresponding function'}
// await fio.send(packet,'uc')
packet = {cmd:'write:happy', data:'My name is Zoe'}
console.log(packet)
//console.log(await fio.send(packet))
console.log(fio.getPacketByName('uc',await fio.send(packet)).response)
packet = {cmd:'write:sad', data:'data to write'}
console.log(packet)
await fio.send(packet)
packet = {cmd:'write:sad', data:'sent only via tcp'}
console.log(packet)
console.log(fio.getPacketByName('tc2',await fio.sendTCP(packet)))
packet = {cmd:'good:bad'}
console.log(packet)
await fio.send(packet)
await delay(2000)
process.kill(process.pid, 'SIGTERM')
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
process.kill(process.pid, 'SIGTERM')

View file

@ -4,7 +4,8 @@
"description": "Mutli Level/Transport Message/Event Classes",
"main": "src/base",
"scripts": {
"deve": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/four-in-one",
"deve": "SOCKETS_DIR=/opt/sockets ./node_modules/.bin/nodemon -r @std/esm -e mjs examples/four-in-one",
"fio": "SOCKETS_DIR=/opt/sockets node -r @std/esm examples/four-in-one",
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
"test": "mocha -r @std/esm test/*.test.mjs",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true"
@ -32,7 +33,8 @@
"chai-as-promised": "^7.1.1",
"codecov": "^3.0.0",
"istanbul": "^0.4.5",
"mocha": "^4.0.1"
"mocha": "^4.0.1",
"nodemon": "^1.14.12"
},
"dependencies": {
"@uci/socket": "^0.1.0"

View file

@ -45,35 +45,55 @@ export default class Base extends EventEmitter {
} // init
// TODO make some kind of ability to wait for reply if needed before sending any more requests
async send (packet,name) {
if (name) {
if (this.socket[name]) await this.socket[name].send(packet)
}
else {
let sends = []
for(let name of Object.keys(this.socket)){
if (this.socket[name].type ==='c') {
await this.socket[name].send(packet)
// console.log(name)
sends.push(this.socket[name].send.bind(this.socket[name]))
}
}
// console.log(sends.map(send => {return send(packet)}))
if (sends.length === 1) return sends[0](packet)
return Promise.all(sends.map(send => {return send(packet)}))
}
this.emit('packet', packet) // emits on instance for instance use
}
async sendTransport(packet,transport) {
let sends = []
for(let name of Object.keys(this.socket)){
if (this.socket[name].type ==='c') {
if (this.socket[name].transport === transport) return await this.socket[name].send(packet)
if (this.socket[name].transport === transport) {
sends.push(this.socket[name].send.bind(this.socket[name]))
}
}
}
if (sends.length === 1) return sends[0](packet)
return Promise.all(sends.map(send => {return send(packet)}))
}
async sendTCP(packet) {return this.sendTransport(packet,'t')}
async sendIPC(packet) {return this.sendTransport(packet,'n')}
getSocket(name) {return this.socket[name]}
getPacketByName(name, packets) {
if (!packets.length) packets = [packets]
let found = {}
packets.some((packet,index,packets) => {
if (packet._header.sender.name === name) {
found = packets[index]
return true
}
})
return found
}
amendConsumerProcessing(funcs,trans) {
if (trans) {
if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] ={}
@ -184,6 +204,7 @@ export default class Base extends EventEmitter {
*/
async _packetProcess (socket_name,packet) {
// console.log(socket_name,packet)
let processor = packet._processor || this._processors[socket_name] || '_default'
return await this._processors[processor].bind(this)(packet,socket_name,this._processors[processor])
}

View file

@ -1,6 +1,6 @@
// this._default refers to this module/hash
// this._processing refers to this module/hash
export default {
s:{ // s is for socket/server
@ -30,7 +30,6 @@ export default {
// TODO before hook here.
// TODO Add namespce
if (this._default[sname]) if (this._default[sname][packet.cmd]) return await this._default[sname][packet.cmd](packet)
// if (this.app[sname]) if (this.app[sname][packet.cmd]) return await this.app[sname][packet.cmd](packet)
if (this[packet.cmd]) return await this[packet.cmd](packet)
if (this._default.c[packet.cmd]) return await this._default.c[packet.cmd](packet)
// TODO after hook here