added dual consumer testing of socket
catch if consumeris already connected and continuetls
parent
4b13cef73a
commit
c7d25521e8
|
@ -31,14 +31,17 @@ export default class Consumer extends Socket {
|
||||||
ready() {return this._ready}
|
ready() {return this._ready}
|
||||||
|
|
||||||
async connect () {
|
async connect () {
|
||||||
await this.listen()
|
this.listen()
|
||||||
this.log.info('listening')
|
this.log.info('listening')
|
||||||
|
|
||||||
return new Promise( (resolve,reject) => {
|
return new Promise( (resolve,reject) => {
|
||||||
|
|
||||||
this.on('error', (err) => {
|
this.on('error', async (err) => {
|
||||||
console.log(err.code)
|
if (err.code === 'EISCONN') {
|
||||||
reject(err)
|
console.log('===============',err)
|
||||||
|
return resolve('ready')
|
||||||
|
}
|
||||||
|
return reject(err)
|
||||||
})
|
})
|
||||||
|
|
||||||
super.connect({ port:this.port, host:this.host, path: this.path }, async () => {
|
super.connect({ port:this.port, host:this.host, path: this.path }, async () => {
|
||||||
|
@ -49,7 +52,18 @@ export default class Consumer extends Socket {
|
||||||
this.log.info('handshake done, connected')
|
this.log.info('handshake done, connected')
|
||||||
resolve(res)
|
resolve(res)
|
||||||
})
|
})
|
||||||
})
|
// catch (err){
|
||||||
|
// console.log('===============',err)
|
||||||
|
// resolve('ready')
|
||||||
|
// }
|
||||||
|
// if (err) {
|
||||||
|
// console.log('===============',err)
|
||||||
|
// if (err.code === 'EISCONN') resolve('ready')
|
||||||
|
// else reject(err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
}) //end promise
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,11 +87,11 @@ export default class Consumer extends Socket {
|
||||||
|
|
||||||
let packet = new Stream()
|
let packet = new Stream()
|
||||||
|
|
||||||
this.on('data', async (chunk) => {
|
this.on('data', (chunk) => {
|
||||||
packet.write(chunk)
|
packet.write(chunk)
|
||||||
})
|
})
|
||||||
|
|
||||||
packet.on('data', async (strJSON) => {
|
packet.on('data', (strJSON) => {
|
||||||
let [err, packet] = btc(JSON.parse)(strJSON)
|
let [err, packet] = btc(JSON.parse)(strJSON)
|
||||||
if (!err) {
|
if (!err) {
|
||||||
if (packet.ready) {
|
if (packet.ready) {
|
||||||
|
@ -96,7 +110,7 @@ export default class Consumer extends Socket {
|
||||||
console.dir(packet)
|
console.dir(packet)
|
||||||
return packet }
|
return packet }
|
||||||
}
|
}
|
||||||
await this[this.pp].bind(this)(packet) // process the packet
|
this[this.pp].bind(this)(packet) // process the packet
|
||||||
}
|
}
|
||||||
else { this.log.info({strJSON: strJSON},'bad packet JSON syntax')}
|
else { this.log.info({strJSON: strJSON},'bad packet JSON syntax')}
|
||||||
})
|
})
|
||||||
|
|
|
@ -10,6 +10,7 @@ import { Consumer } from '../src'
|
||||||
const USOCKET = __dirname + '/sample.sock'
|
const USOCKET = __dirname + '/sample.sock'
|
||||||
|
|
||||||
let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'})
|
let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'})
|
||||||
|
let uconsumer2 = new Consumer(USOCKET, {name:'test-uconsumer-2'})
|
||||||
let tcpconsumer = new Consumer({name:'test-tcpconsumer'})
|
let tcpconsumer = new Consumer({name:'test-tcpconsumer'})
|
||||||
let tcpconsumer2 = new Consumer({port:9080, packetProcessor:'test', name:'test-tcpconsumer-2'})
|
let tcpconsumer2 = new Consumer({port:9080, packetProcessor:'test', name:'test-tcpconsumer-2'})
|
||||||
|
|
||||||
|
@ -47,7 +48,7 @@ describe('Connects and Processes a payload in a JSON packet', function(){
|
||||||
tcpsocket2.kill()
|
tcpsocket2.kill()
|
||||||
})
|
})
|
||||||
|
|
||||||
it('via unix socket with defaults', async function () {
|
it('via unix socket with defaults testing stream JSON packet parser, 10 packets', async function () {
|
||||||
|
|
||||||
uconsumer.times = 0
|
uconsumer.times = 0
|
||||||
|
|
||||||
|
@ -80,6 +81,47 @@ describe('Connects and Processes a payload in a JSON packet', function(){
|
||||||
|
|
||||||
}) // end unix socket test
|
}) // end unix socket test
|
||||||
|
|
||||||
|
|
||||||
|
it('unix socket with two consumers alternating packets, 10 packets each', async function () {
|
||||||
|
|
||||||
|
uconsumer.times = 0
|
||||||
|
|
||||||
|
return new Promise(async function (resolve, reject) {
|
||||||
|
|
||||||
|
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||||
|
|
||||||
|
uconsumer.processPacket = function (packet) {
|
||||||
|
this.times++
|
||||||
|
if (this.times<10) return
|
||||||
|
|
||||||
|
try {
|
||||||
|
// expect(packet.payload).to.equal('unix payload')
|
||||||
|
expect(packet.payload).to.equal('unix payload')
|
||||||
|
resolve()
|
||||||
|
}
|
||||||
|
catch(error) {
|
||||||
|
reject(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
uconsumer2.processPacket = function (packet) {
|
||||||
|
return packet
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
let [err] = await btc(uconsumer2.connect)()
|
||||||
|
console.log('connect error', err)
|
||||||
|
let packet = {payload:'unix payload'}
|
||||||
|
for (var i = 0; i < 11; i++) {
|
||||||
|
uconsumer.send(packet)
|
||||||
|
uconsumer2.send(packet)
|
||||||
|
}
|
||||||
|
|
||||||
|
}) //end promise
|
||||||
|
|
||||||
|
}) // end unix socket test
|
||||||
|
|
||||||
|
|
||||||
it('via tcp socket with defaults', async function () {
|
it('via tcp socket with defaults', async function () {
|
||||||
|
|
||||||
return new Promise(async function (resolve, reject) {
|
return new Promise(async function (resolve, reject) {
|
||||||
|
|
Loading…
Reference in New Issue