From c7d25521e8c3f344a972ee20d634ea27e3b9dbd9 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sat, 20 Jan 2018 13:52:07 -0800 Subject: [PATCH] added dual consumer testing of socket catch if consumeris already connected and continue --- src/consumer.mjs | 32 +++++++++++++++++++++++--------- test/socket.test.mjs | 44 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 10 deletions(-) diff --git a/src/consumer.mjs b/src/consumer.mjs index 0ab46df..227894a 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -31,14 +31,17 @@ export default class Consumer extends Socket { ready() {return this._ready} async connect () { - await this.listen() + this.listen() this.log.info('listening') return new Promise( (resolve,reject) => { - this.on('error', (err) => { - console.log(err.code) - reject(err) + this.on('error', async (err) => { + if (err.code === 'EISCONN') { + console.log('===============',err) + return resolve('ready') + } + return reject(err) }) super.connect({ port:this.port, host:this.host, path: this.path }, async () => { @@ -49,7 +52,18 @@ export default class Consumer extends Socket { this.log.info('handshake done, connected') resolve(res) }) - }) + // catch (err){ + // console.log('===============',err) + // resolve('ready') + // } + // if (err) { + // console.log('===============',err) + // if (err.code === 'EISCONN') resolve('ready') + // else reject(err) + // } + + + }) //end promise } @@ -73,18 +87,18 @@ export default class Consumer extends Socket { let packet = new Stream() - this.on('data', async (chunk) => { + this.on('data', (chunk) => { packet.write(chunk) }) - packet.on('data', async (strJSON) => { + packet.on('data', (strJSON) => { let [err, packet] = btc(JSON.parse)(strJSON) if (!err) { if (packet.ready) { this._ready = true return } - // set packet processing + // set packet processing this.pp = this.pp || this._pp // if no processor provided use this console logger one @@ -96,7 +110,7 @@ export default class Consumer extends Socket { console.dir(packet) return packet } } - await this[this.pp].bind(this)(packet) // process the packet + this[this.pp].bind(this)(packet) // process the packet } else { this.log.info({strJSON: strJSON},'bad packet JSON syntax')} }) diff --git a/test/socket.test.mjs b/test/socket.test.mjs index 0ea5831..670cc39 100644 --- a/test/socket.test.mjs +++ b/test/socket.test.mjs @@ -10,6 +10,7 @@ import { Consumer } from '../src' const USOCKET = __dirname + '/sample.sock' let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'}) +let uconsumer2 = new Consumer(USOCKET, {name:'test-uconsumer-2'}) let tcpconsumer = new Consumer({name:'test-tcpconsumer'}) let tcpconsumer2 = new Consumer({port:9080, packetProcessor:'test', name:'test-tcpconsumer-2'}) @@ -47,7 +48,7 @@ describe('Connects and Processes a payload in a JSON packet', function(){ tcpsocket2.kill() }) - it('via unix socket with defaults', async function () { + it('via unix socket with defaults testing stream JSON packet parser, 10 packets', async function () { uconsumer.times = 0 @@ -80,6 +81,47 @@ describe('Connects and Processes a payload in a JSON packet', function(){ }) // end unix socket test + + it('unix socket with two consumers alternating packets, 10 packets each', async function () { + + uconsumer.times = 0 + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + + uconsumer.processPacket = function (packet) { + this.times++ + if (this.times<10) return + + try { + // expect(packet.payload).to.equal('unix payload') + expect(packet.payload).to.equal('unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + uconsumer2.processPacket = function (packet) { + return packet + } + + + let [err] = await btc(uconsumer2.connect)() + console.log('connect error', err) + let packet = {payload:'unix payload'} + for (var i = 0; i < 11; i++) { + uconsumer.send(packet) + uconsumer2.send(packet) + } + + }) //end promise + + }) // end unix socket test + + it('via tcp socket with defaults', async function () { return new Promise(async function (resolve, reject) {