From d0173c045f543ee377d7b97e86398689637c1529 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Tue, 13 Feb 2018 14:19:18 -0800 Subject: [PATCH] With changes in socket module now send is async and returns the response from the socket. send now transport send now process as many consumer sockets as have been created by pushing them onto array and the using promise.all and map to send out together. Return for one than one is an array of responses. getPacketByName was added to make it easy to grab just one of those in returned array. --- examples/four-in-one.mjs | 46 ++++++++----------- package.json | 6 ++- src/base.mjs | 29 ++++++++++-- ...ault-processing.mjs => processing.mjs.org} | 3 +- 4 files changed, 50 insertions(+), 34 deletions(-) rename src/{default-processing.mjs => processing.mjs.org} (92%) diff --git a/examples/four-in-one.mjs b/examples/four-in-one.mjs index 5a7bd91..743780e 100644 --- a/examples/four-in-one.mjs +++ b/examples/four-in-one.mjs @@ -1,23 +1,19 @@ import Base from '../src/base' -const USOCKET = __dirname + '/sample.sock' +// const USOCKET = __dirname + '/sample.sock' const socketfuncs = { write: { happy: function(packet){ return new Promise( async (resolve) => { let res = {} - res.req = packet - res.cmd='reply' - res.response='Im a happy puppy :)' + res.response=packet.data+ ' and Im a happy puppy :)' return resolve(res) }) }, sad: function(packet){ return new Promise( async (resolve) => { let res = {} - res.req = packet - res.cmd='reply' res.response='Im a sad dog :(' return resolve(res) }) @@ -32,16 +28,14 @@ const tcpfuncs = { happy: function(packet){ return new Promise( async (resolve) => { let res = {} - res.req = packet res.cmd='reply' - res.response='Im a TCP happy puppy :)' + res.response=packet.data+ ' and Im a TCP happy puppy :)' return resolve(res) }) }, sad: function(packet){ return new Promise( async (resolve) => { let res = {} - res.req = packet res.cmd='reply' res.response='Im a TCP sad dog :(' return resolve(res) @@ -51,24 +45,25 @@ const tcpfuncs = { } -const delay = time => new Promise(res=>setTimeout(()=>res(),time)) +// const delay = time => new Promise(res=>setTimeout(()=>res(),time)) ; (async () => { - let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'}) + let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,tc2#c>t', id:'four-in-one'}) // let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'}) await fio.init() fio.s = socketfuncs fio.st = tcpfuncs fio.ct = {reply: packet =>{ console.log('==============Packet Displayed for TCP consumer received packets only') - console.dir(packet) + console.dir(packet.response) + console.log('===========================') }} fio.cn = { reply: function (packet) { console.log('==============Replay for only Named Pipe Consumer=========') - console.dir(packet) + console.dir(packet.response) console.log('===========================') } } @@ -94,24 +89,23 @@ const delay = time => new Promise(res=>setTimeout(()=>res(),time)) console.log('=============sending============') // packet = {cmd:'echo', data:'some data to echo'} // console.log(packet) - // await fio.send(packet) - // packet = {cmd:'write:happy', data:'data to write'} - // console.log(packet) - // await fio.send(packet) - // packet = {cmd:'write:sad', data:'data to write'} - // console.log(packet) - // await fio.send(packet) - // packet = {cmd:'write:sad', data:'sent only via tcp'} - // console.log(packet) - // await fio.sendTCP(packet) - packet = {cmd:'good:bad', data:'data sent that has no corresponding function'} + // await fio.send(packet,'uc') + packet = {cmd:'write:happy', data:'My name is Zoe'} + console.log(packet) + //console.log(await fio.send(packet)) + console.log(fio.getPacketByName('uc',await fio.send(packet)).response) + packet = {cmd:'write:sad', data:'data to write'} + console.log(packet) + await fio.send(packet) + packet = {cmd:'write:sad', data:'sent only via tcp'} + console.log(packet) + console.log(fio.getPacketByName('tc2',await fio.sendTCP(packet))) + packet = {cmd:'good:bad'} console.log(packet) await fio.send(packet) - await delay(2000) process.kill(process.pid, 'SIGTERM') - })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) process.kill(process.pid, 'SIGTERM') diff --git a/package.json b/package.json index 189ccaf..139963b 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,8 @@ "description": "Mutli Level/Transport Message/Event Classes", "main": "src/base", "scripts": { - "deve": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/four-in-one", + "deve": "SOCKETS_DIR=/opt/sockets ./node_modules/.bin/nodemon -r @std/esm -e mjs examples/four-in-one", + "fio": "SOCKETS_DIR=/opt/sockets node -r @std/esm examples/four-in-one", "testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r @std/esm test/*.test.mjs", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true" @@ -32,7 +33,8 @@ "chai-as-promised": "^7.1.1", "codecov": "^3.0.0", "istanbul": "^0.4.5", - "mocha": "^4.0.1" + "mocha": "^4.0.1", + "nodemon": "^1.14.12" }, "dependencies": { "@uci/socket": "^0.1.0" diff --git a/src/base.mjs b/src/base.mjs index f0f82a0..280f7a2 100644 --- a/src/base.mjs +++ b/src/base.mjs @@ -45,35 +45,55 @@ export default class Base extends EventEmitter { } // init - // TODO make some kind of ability to wait for reply if needed before sending any more requests async send (packet,name) { if (name) { if (this.socket[name]) await this.socket[name].send(packet) } else { + let sends = [] for(let name of Object.keys(this.socket)){ if (this.socket[name].type ==='c') { - await this.socket[name].send(packet) + // console.log(name) + sends.push(this.socket[name].send.bind(this.socket[name])) } } + // console.log(sends.map(send => {return send(packet)})) + if (sends.length === 1) return sends[0](packet) + return Promise.all(sends.map(send => {return send(packet)})) } this.emit('packet', packet) // emits on instance for instance use } async sendTransport(packet,transport) { + let sends = [] for(let name of Object.keys(this.socket)){ if (this.socket[name].type ==='c') { - if (this.socket[name].transport === transport) return await this.socket[name].send(packet) + if (this.socket[name].transport === transport) { + sends.push(this.socket[name].send.bind(this.socket[name])) + } } } + if (sends.length === 1) return sends[0](packet) + return Promise.all(sends.map(send => {return send(packet)})) } async sendTCP(packet) {return this.sendTransport(packet,'t')} async sendIPC(packet) {return this.sendTransport(packet,'n')} - getSocket(name) {return this.socket[name]} + getPacketByName(name, packets) { + if (!packets.length) packets = [packets] + let found = {} + packets.some((packet,index,packets) => { + if (packet._header.sender.name === name) { + found = packets[index] + return true + } + }) + return found + } + amendConsumerProcessing(funcs,trans) { if (trans) { if (!this._defaultCmds.c[trans]) this._defaultCmds.c[trans] ={} @@ -184,6 +204,7 @@ export default class Base extends EventEmitter { */ async _packetProcess (socket_name,packet) { + // console.log(socket_name,packet) let processor = packet._processor || this._processors[socket_name] || '_default' return await this._processors[processor].bind(this)(packet,socket_name,this._processors[processor]) } diff --git a/src/default-processing.mjs b/src/processing.mjs.org similarity index 92% rename from src/default-processing.mjs rename to src/processing.mjs.org index f64158a..37523e9 100644 --- a/src/default-processing.mjs +++ b/src/processing.mjs.org @@ -1,6 +1,6 @@ -// this._default refers to this module/hash +// this._processing refers to this module/hash export default { s:{ // s is for socket/server @@ -30,7 +30,6 @@ export default { // TODO before hook here. // TODO Add namespce if (this._default[sname]) if (this._default[sname][packet.cmd]) return await this._default[sname][packet.cmd](packet) - // if (this.app[sname]) if (this.app[sname][packet.cmd]) return await this.app[sname][packet.cmd](packet) if (this[packet.cmd]) return await this[packet.cmd](packet) if (this._default.c[packet.cmd]) return await this._default.c[packet.cmd](packet) // TODO after hook here