From 28768c8007bd34e1d52f1582d4d1fa91884c438e Mon Sep 17 00:00:00 2001 From: David Kebler Date: Mon, 12 Feb 2018 14:41:06 -0800 Subject: [PATCH] refactor send/write so send is async and will wait for reponse from socket. write now makes sure to drain before allowing another packet write refactored tests with new await send returning processed packet. --- examples/client.mjs | 27 +++-- examples/server.mjs | 23 ++-- package.json | 26 +++-- src/consumer.mjs | 50 ++++++-- src/json-stream.mjs | 8 +- src/socket.mjs | 35 ++++-- ...cket-default-overwrite.mjs => usocket.mjs} | 6 +- test/tcp.test.mjs | 67 +++++------ test/usocket-default-overwrite.test.mjs | 107 ------------------ test/usocket-default.test.mjs | 95 ++++++---------- test/usocket.test.mjs | 52 +++++++++ 11 files changed, 230 insertions(+), 266 deletions(-) rename test/sockets/{usocket-default-overwrite.mjs => usocket.mjs} (69%) delete mode 100644 test/usocket-default-overwrite.test.mjs create mode 100644 test/usocket.test.mjs diff --git a/examples/client.mjs b/examples/client.mjs index a612566..cca864b 100644 --- a/examples/client.mjs +++ b/examples/client.mjs @@ -1,9 +1,7 @@ import Consumer from '../src/consumer' -const USOCKET = __dirname + '/sample.sock' - -const client1= new Consumer({path:USOCKET,name:'example-consumer1' }) -const client2 = new Consumer({path:USOCKET,name:'example-consumer2'}) +const client1= new Consumer({np:true,name:'example-consumer1' }) +const client2 = new Consumer({np:true,name:'example-consumer2'}) let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'} let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'} @@ -13,18 +11,23 @@ const process = function (packet) { console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`) } -client1.registerPacketProcessor(process) - -client2._packetProcess = process +// client1.registerPacketProcessor(process) +// +// client2._packetProcess = process ; (async () => { - await Promise.all([client1.connect(),client2.connect()]) - client1.send(packet1) - client2.send(packet2) - client1.end() - client2.end() + + + // await Promise.all([client1.connect(),client2.connect()]) + await client1.connect() + console.log('=========\n',await client1.send(packet1)) + + // client1.send(packet1) + // client2.send(packet2) + // client1.end() + // client2.end() })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/examples/server.mjs b/examples/server.mjs index 3e2907a..c0968e4 100644 --- a/examples/server.mjs +++ b/examples/server.mjs @@ -1,8 +1,6 @@ import { Socket } from '../src' -const USOCKET = __dirname + '/sample.sock' - -; + ; (async () => { class Test extends Socket { @@ -18,18 +16,21 @@ const USOCKET = __dirname + '/sample.sock' } async doit(data,name) { - let res = {} - console.log('data:', data) - res.status ='success' - res.name = name - res.cmd = 'reply' - res.data = 'this might be response data from another process' - return(res) + return new Promise(resolve => { + let res = {} + console.log('data:', data) + res.status ='success' + res.name = name + res.cmd = 'reply' + res.data = 'this would be response data from socket doit function' + resolve(res) + }) + } } - let test = new Test({path:USOCKET}) + let test = new Test({np:true}) await test.create() })().catch(err => { diff --git a/package.json b/package.json index d781a77..1a22134 100644 --- a/package.json +++ b/package.json @@ -1,17 +1,18 @@ { "name": "@uci/socket", "version": "0.1.0", - "description": "Bare bones intra Host Unix Socket for basic IPC on same machine", + "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { "testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r @std/esm test/*.test.mjs", "testlog": "mocha -r @std/esm test/*.test.mjs | ./node_modules/.bin/bunyan", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true", - "s": "node -r @std/esm examples/server | ./node_modules/.bin/bunyan", - "devs": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server", - "c": "node -r @std/esm examples/client | ./node_modules/.bin/bunyan -o short", - "c2": "node -r @std/esm examples/client2 | ./node_modules/.bin/bunyan -o short" + "s": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r @std/esm examples/server", + "devs": "SOCKETS_DIR=/opt/sockets DEBUG=true ./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server", + "c": "SOCKETS_DIR=/opt/sockets node -r @std/esm examples/client", + "devc": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r @std/esm examples/client", + "c2": "node -r @std/esm examples/client2" }, "author": "David Kebler", "license": "MIT", @@ -21,9 +22,14 @@ }, "keywords": [ "node.js", - "i2c", - "rpi", - "raspberrypi" + "socket", + "net", + "JSON", + "packet", + "serialize", + "named pipe", + "unix socket", + "TCP" ], "bugs": { "url": "https://github.com/uCOMmandIt/message/issues" @@ -36,11 +42,11 @@ "chai-as-promised": "^7.1.1", "codecov": "^3.0.0", "istanbul": "^0.4.5", - "mocha": "^4.0.1" + "mocha": "^4.0.1", + "nodemon": "^1.14.12" }, "dependencies": { "better-try-catch": "^0.6.2", - "bunyan": "^1.8.12", "death": "^1.1.0" } } diff --git a/src/consumer.mjs b/src/consumer.mjs index 9d6338c..4a2b4d9 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -1,6 +1,7 @@ import { Socket } from 'net' import btc from 'better-try-catch' import JsonStream from './json-stream' +import {promisify} from 'util' import logger from '../../uci-logger/src/logger' let log = {} @@ -34,6 +35,7 @@ export default class Consumer extends Socket { // bind to class for other class functions this.connect = this.connect.bind(this) this.__ready = this.__ready.bind(this) + // this._write = this._write.bind(this) } async connect () { @@ -85,15 +87,30 @@ export default class Consumer extends Socket { } + async send(packet) { - await this.write(await this.stream.serialize(packet)) - // TODO handle possible error - // TODO await response if required by setting id to packet - // then set a flag (and promise) that is resovled in the listener + return new Promise( async (resolve) => { + setTimeout(() => {resolve({error:'no response from socket in 10sec'})},10000) + packet._id = Math.random().toString().slice(2) + // console.log('sending to socket', packet.id) + let [err, res] = await btc(this.stream.serialize)(packet) + if (err) resolve({error:'unable to serialize packet for sending', packet:packet}) + log.info(await this.__write(res)) + this.once(packet._id,async function(reply){ + // console.log('reply emitted',reply) + this.removeAllListeners(reply.id) + delete reply._id + let res = await this._packetProcess(reply) + if (!res) { // if process was not promise returning like just logged to console + res = reply + log.warn('consumer function was not promise returning - resolving unprocessed') + } + resolve(res) + }) //end listener + }) } // TODO register alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) - // TODO register authenciation function (set up default) registerPacketProcessor (func) { @@ -102,6 +119,19 @@ export default class Consumer extends Socket { // PRIVATE METHODS + async __write(packet) { + // timeout already set if sockect can't be drained in 10 secs + return new Promise(resolve => { + const cb = () => resolve('packet written to socket stream') + if (!super.write(packet)) { + this.once('drain',cb ) + } else { + process.nextTick(cb) + } + + }) + } + __ready() {return this._ready} async _listen () { @@ -112,17 +142,19 @@ export default class Consumer extends Socket { if (packet._handshake) { this._ready = true return } - await this._packetProcess(packet) + // TODO send back ack with consumer ID and authorization and wait + // when authorized drop through here to emit + // console.log('incoming packet',packet) + this.emit(packet._id, packet) } } -// default packet process just a simple console logger + // default packet process just a simple console logger. ignores any cmd: prop _packetProcess (packet) { - console.log('default consumer processor -- packet from socket') + console.log('default consumer processor -- log packet from socket to console') console.dir(packet) } - } // end class diff --git a/src/json-stream.mjs b/src/json-stream.mjs index 6cf06e6..8357da8 100644 --- a/src/json-stream.mjs +++ b/src/json-stream.mjs @@ -13,10 +13,12 @@ export default class JsonStream extends EventEmitter{ this._buffer = '' this._delimeter = opts.delimiter || '#' this.onData = this.onData.bind(this) + this.serialize = this.serialize.bind(this) } onData (data) { + // console.log('a chunk arrived', data) data = decoder.write(data) try { this._handleData(data) @@ -32,8 +34,8 @@ export default class JsonStream extends EventEmitter{ let [err2,length] = btc(Buffer.byteLength)(messageData, 'utf8') if (err2) reject(err2) let data = length + this._delimeter + messageData - // console.log('serialized',data) - return resolve(data) + // console.log('serialized',data) + resolve(data) }) } @@ -41,7 +43,7 @@ export default class JsonStream extends EventEmitter{ this._buffer += data if (this._contentLength == null) { var i = this._buffer.indexOf(this._delimeter) - //Check if the buffer has a this._opts.delimeter or "#", if not, the end of the buffer string might be in the middle of a content length string + //Check if the buffer has a this._opts.delimeter or "#", if not, the end of the buffer string might be in the middle of a content length string if (i !== -1) { var rawContentLength = this._buffer.substring(0, i) this._contentLength = parseInt(rawContentLength) diff --git a/src/socket.mjs b/src/socket.mjs index 5500031..06a1e0d 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -36,7 +36,7 @@ export default class Socket extends Server { async create () { return new Promise( async (resolve,reject) => { - // couple ways to kill socket process when needed + // couple ways to kill socket process when needed _ON_DEATH( async () => { log.info('\nhe\'s dead jim') await this._destroy() @@ -46,7 +46,7 @@ export default class Socket extends Server { process.kill(process.pid, 'SIGUSR2') }) - this.on('error', async (err) => { + this.once('error', async (err) => { // recover from socket file that was not removed if (err.code === 'EADDRINUSE') { if (this.opts.np) { // if TCP socket should already be dead @@ -75,20 +75,22 @@ export default class Socket extends Server { async _listen (opts) { super.listen(opts, async (err, res) => { if (err) return err - // this gets called for each client connection and is unique to each + // this gets called for each client connection and is unique to each this.on('connection', async (socket) => { + let write = this._write.bind(socket) const stream = new JSONStream() log.info('new consumer connecting sending handshake') - socket.write(await stream.serialize({'_handshake':true})) + write(await stream.serialize({'_handshake':true})) socket.on('data', stream.onData) // TODO need to start error listener for stream so errors can be processed stream.on('message', messageProcess.bind(this)) async function messageProcess (packet) { // console.log('before processing',packet) let processed = await this._packetProcess(packet) - if (!processed) processed = { error: 'packet command function likely did not return a promise', packet:packet} + if (!processed) processed = { error: 'socket packet command function likely did not return a promise', packet:packet} + processed._id = packet._id //make sure return packet has its ID // console.log('after processing',processed) - socket.write(await stream.serialize(processed)) + write(await stream.serialize(processed)) } }) // end connecttion consumer log.info({opts: this.opts},'socket created') @@ -106,8 +108,25 @@ export default class Socket extends Server { // default packet process, just a simple echo _packetProcess (packet) { - packet.res='echoed' - return packet + return new Promise(resolve => { + packet.response='this packet was echoed by default' + resolve(packet) + }) + + } + + // must have a consumer socket bound to use + async _write(packet) { + // timeout already set if sockect can't be drained in 10 secs + return new Promise(resolve => { + const cb = () => resolve('packet written to socket stream') + if (!this.write(packet)) { + this.once('drain',cb ) + } else { + process.nextTick(cb) + } + + }) } } // end class diff --git a/test/sockets/usocket-default-overwrite.mjs b/test/sockets/usocket.mjs similarity index 69% rename from test/sockets/usocket-default-overwrite.mjs rename to test/sockets/usocket.mjs index 47bf915..167aed3 100644 --- a/test/sockets/usocket-default-overwrite.mjs +++ b/test/sockets/usocket.mjs @@ -7,8 +7,10 @@ let socket = new Socket({path:USOCKET,name:'default-unix-socket'}) socket.test = 'at socket => ' socket.registerPacketProcessor(async function (packet) { - packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload - return packet + return new Promise((resolve) => { + packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload + resolve(packet) + }) }) ; diff --git a/test/tcp.test.mjs b/test/tcp.test.mjs index f9f8ee6..c810510 100644 --- a/test/tcp.test.mjs +++ b/test/tcp.test.mjs @@ -33,29 +33,21 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun it('with default host and port', async function () { let tcpconsumer_default = new Consumer({name:'tcpconsumer', log:false}) - return new Promise(async function (resolve, reject) { + let [err] = await btc(tcpconsumer_default.connect)() + if (err) { + console.log('unable to connect to socket to start test', tcpconsumer_default.port) + process.kill(process.pid, 'SIGTERM') + } + tcpconsumer_default.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.payload = packet.payload +':local' + resolve(packet)}) + }) - let [err] = await btc(tcpconsumer_default.connect)() - if (err) { - console.log('unable to connect to socket to start test', tcpconsumer_default.port) - process.kill(process.pid, 'SIGTERM') - } - - tcpconsumer_default._packetProcess = function (packet) { - try { - expect(packet.payload).to.equal('8080:tcp payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let packet = {payload:'tcp payload'} - tcpconsumer_default.send(packet) - - }) //end promise + let packet = {payload:'tcp payload'} + let res = await tcpconsumer_default.send(packet) + expect(res.payload).to.equal('8080:tcp payload:local') }) // end tcp socket test @@ -63,29 +55,22 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun let tcpconsumer_9080 = new Consumer({port:9080, name:'tcp-consumer-9080'}) - return new Promise(async function (resolve, reject) { + let [err] = await btc(tcpconsumer_9080.connect)() + if (err) { + console.log('unable to connect to socket to start test', tcpconsumer_9080.port) + process.kill(process.pid, 'SIGTERM') + } - let [err] = await btc(tcpconsumer_9080.connect)() - if (err) { - console.log('unable to connect to socket to start test', tcpconsumer_9080.port) - process.kill(process.pid, 'SIGTERM') - } + tcpconsumer_9080.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.payload = packet.payload +':local' + resolve(packet)}) + }) - tcpconsumer_9080.registerPacketProcessor(function (packet) { - try { - expect(packet.payload).to.equal('9080:tcp payload') - resolve() - } - catch(error) { - reject(error) - } - }) - - let packet = {payload:'tcp payload'} - tcpconsumer_9080.send(packet) - - }) //end promise + let packet = {payload:'tcp payload'} + let res = await tcpconsumer_9080.send(packet) + expect(res.payload).to.equal('9080:tcp payload:local') }) // end tcp socket 2 test diff --git a/test/usocket-default-overwrite.test.mjs b/test/usocket-default-overwrite.test.mjs deleted file mode 100644 index 62d6073..0000000 --- a/test/usocket-default-overwrite.test.mjs +++ /dev/null @@ -1,107 +0,0 @@ -import { spawn } from 'child_process' -import chai from 'chai' -import chaiAsPromised from 'chai-as-promised' -import btc from 'better-try-catch' -chai.use(chaiAsPromised) -const expect = chai.expect - -import { Consumer } from '../src' - -const USOCKET = __dirname + '/sockets/test.sock' -const SOCKET_FILE = 'usocket-default-overwrite' - -let consumer = new Consumer({path:USOCKET,name:'unix-consumer'}) -let consumer2 = new Consumer({path:USOCKET, name:'unix-consumer2'}) - -// const delay = time => new Promise(res=>setTimeout(()=>res(),time)) - -let socket = {} - -describe('Connects and Processes a payload via Unix Socket using JSON packet with alt default processor', function(){ - - before(async function(){ - socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) - socket.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - }) - - after(async function(){ - socket.kill() - }) - - it('Tests JSON packet procssing, 10 packets', async function () { - - consumer.times = 0 - - return new Promise(async function (resolve, reject) { - - setTimeout(() =>{ reject('10 packets not received in time')},1900) - - consumer._packetProcess = function (packet) { - this.times++ - if (this.times!==11) return - - try { - expect(packet.payload).to.equal('overwrite default processor from instance at socket => unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let [err] = await btc(consumer.connect)() - if (err) reject(err) - let packet = {payload:'unix payload'} - for (var i = 0; i < 11; i++) { - consumer.send(packet) - } - - }) //end promise - - }) // end unix socket test - - - it('unix socket with two consumers alternating packets, 10 packets each', async function () { - - consumer.times = 0 - - return new Promise(async function (resolve, reject) { - - setTimeout(() =>{ reject('10 packets not received in time')},1900) - - consumer._packetProcess = function (packet) { - this.times++ - // console.log(this.times,packet.payload) - if (this.times!==11) return - - try { - expect(packet.payload).to.equal('overwrite default processor from instance at socket => consumer 1 unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - consumer2._packetProcess = function (packet) { - return packet - } - - let [err] = await btc(consumer2.connect)() - if (err) reject(err) - let packet1 = {payload:'consumer 1 unix payload'} - let packet2 = {payload:'consumer2 unix payload'} - for (var i = 0; i < 11; i++) { - consumer.send(packet1) - consumer2.send(packet2) - } - - }) //end promise - - }) // end unix socket test - - - -}) // end describe diff --git a/test/usocket-default.test.mjs b/test/usocket-default.test.mjs index cda83b1..df71584 100644 --- a/test/usocket-default.test.mjs +++ b/test/usocket-default.test.mjs @@ -30,47 +30,36 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit socket.kill() }) - it('Tests unix socket with default echo JSON packet procssing, 10 packets with conect via connect', async function () { + const TIMES = 500 - consumer.times = 0 + it(`Tests unix socket with default echo JSON packet procssing with ${TIMES} packets sent`, async function () { - return new Promise(async function (resolve, reject) { + let [err] = await btc(consumer.connect)() + if (err) { + console.log('unable to connect to socket to start test', consumer.path) + process.kill(process.pid, 'SIGTERM') + } - let [err] = await btc(consumer.connect)() - if (err) { - console.log('unable to connect to socket to start test', consumer.path) - process.kill(process.pid, 'SIGTERM') - } - setTimeout(() =>{ reject('10 packets not received in time')},1900) + consumer.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.times += 1 + if (packet.times === TIMES) packet.payload = 'local1:'+packet.payload + resolve(packet)}) + }) - consumer._packetProcess = function (packet) { - this.times++ - if (this.times!==11) return - try { - expect(packet.payload).to.equal('unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - for (var i = 0; i < 11; i++) { - let packet = {payload:'unix payload'} - consumer.send(packet) - } - - }) //end promise + let packet = {payload:'payload', times:0} + for (let i = 1; i <= TIMES; i++) { + packet = await consumer.send(packet) + } + expect(packet.payload+':'+packet.times).to.equal('local1:payload:'+TIMES) }) // end unix socket test - it('unix socket with two consumers alternating packets, 10 packets each with local and added context', async function () { + it(`unix socket with two consumers alternating packets, ${TIMES} packets each and local processing`, async function () { - consumer.times = 0 - consumer.test = ':local' let [err] = await btc(consumer2.connect)() if (err) { @@ -78,41 +67,21 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit process.kill(process.pid, 'SIGTERM') } - return new Promise(async function (resolve, reject) { + consumer2.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.times += 1 + if (packet.times === TIMES) packet.payload = 'local2:'+packet.payload + resolve(packet)}) + }) - setTimeout(() =>{ reject('10 packets not received in time')},1900) - - consumer.registerPacketProcessor(function (packet) { - this.times++ - // console.log(this.times,packet.payload) - if (this.times!==11) return - - packet.payload = packet.payload + this.test - - try { - expect(packet.payload).to.equal('consumer 1 unix payload:local') - resolve() - } - catch(error) { - reject(error) - } - }) - - consumer2._packetProcess = function (packet) { - return packet - } - - let packet1 = {payload:'consumer 1 unix payload'} - let packet2 = {payload:'consumer2 unix payload'} - for (var i = 0; i < 11; i++) { - consumer.send(packet1) - consumer2.send(packet2) - } - - }) //end promise + let packet = {consumer:1, payload:'payload', times:-1} + for (let i = 0; i < TIMES; i++) { + packet = await consumer.send(packet) + if (packet.times === TIMES) packet.times = 1 + packet = await consumer2.send(packet) + } + expect(packet.payload+':'+packet.times).to.equal('local2:local1:payload:'+TIMES) }) // end unix socket test - - }) // end describe diff --git a/test/usocket.test.mjs b/test/usocket.test.mjs new file mode 100644 index 0000000..61d0ef9 --- /dev/null +++ b/test/usocket.test.mjs @@ -0,0 +1,52 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const USOCKET = __dirname + '/sockets/test.sock' +const SOCKET_FILE = 'usocket' + +let consumer = new Consumer({path:USOCKET,name:'unix-consumer'}) + +// const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let socket = {} + +describe('Connects and Processes a payload via Unix Socket using JSON packet with alt processor', function(){ + + before(async function(){ + socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) + socket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + }) + + after(async function(){ + socket.kill() + }) + + it('Tests alternate JSON packet procssing at socket and consumer', async function () { + + let [err] = await btc(consumer.connect)() + if (err) { + console.log('unable to connect to socket to start test', consumer.path) + process.kill(process.pid, 'SIGTERM') + } + + consumer.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.payload = 'local:'+packet.payload + resolve(packet)}) + }) + let packet = { payload:'payload'} + let res = await consumer.send(packet) + expect(res.payload).to.equal('local:overwrite default processor from instance at socket => payload') + + + }) // end unix socket test + +}) // end describe