improved and debugged packet processing with corresponding better more exhaustive testing

This commit is contained in:
David Kebler 2018-01-23 16:02:56 -08:00
parent 0f82f4aedd
commit ab56bddd72
20 changed files with 857 additions and 218 deletions

View file

@ -13,7 +13,7 @@ export default class Consumer extends Socket {
this.host = opts.host || '127.0.0.1'
this.port = opts.port || 8080
} else this.path = path
this._pp = opts.packetProcessor || 'processPacket'
this.pp = opts.packetProcessor || 'processPacket'
this.keepAlive = opts.keepAlive ? opts.keepAlive : true
this._ready = false
this.timeout = opts.timeout || 500
@ -35,19 +35,20 @@ export default class Consumer extends Socket {
async connect (app={}) {
// first set the packet process
this.pp = this.pp || this._pp
if (Object.keys(app).length === 0) app = this
else app.pp = app.pp || this._pp
else app.pp = app.pp || this.pp
// set a default processor if none provided
if (!app[this.pp]) {
this.pp = 'processPacket' // reset in case alt function is missing
if (!app[app.pp]) {
app.pp = 'processPacket' // reset in case app[app.pp] is missing
app.processPacket = async (packet) => {
console.log('packet from socket')
console.log('default processor -- packet from socket')
console.dir(packet)
return packet }
}
// console.log('consumer processing=>',app.pp,'=>', app[app.pp])
// console.log('========================')
this.listen(app)
this.log.info('listening')

View file

@ -14,7 +14,7 @@ export default class Socket extends Server {
opts = path
this.listen_opts = { host: opts.host || '127.0.0.1', port: opts.port || 8080}
} else this.listen_opts = { path: path }
this._pp = opts.packetProcessor || 'processPacket'
this.pp = opts.packetProcessor || 'processPacket'
// logging
this.log_file=opts.log_file || './socket.log'
this.log_opts = {streams:[]}
@ -28,19 +28,17 @@ export default class Socket extends Server {
async create (app={}) {
// first set the packet process
this.pp = this.pp || this._pp
if (Object.keys(app).length === 0) app = this
else app.pp = app.pp || this._pp
// set a default processor if none provided
else app.pp = app.pp || this.pp
if (!app[app.pp]) {
app.pp = 'processPacket' // reset in case alt function is missing
this.pp = 'processPacket' // reset in case app.pp was set but app.pp function is missing
app.processPacket = async (packet) => {
packet.res='echoed'
return packet }
}
// console.log('socket processing app ',app[app.pp])
// console.log('socket processing3=>',app.pp,'=>', app[app.pp])
// console.log('========================')
return new Promise( async (resolve,reject) => {
@ -94,11 +92,15 @@ export default class Socket extends Server {
socket.on('data', stream.onData)
stream.on('message', async function (packet) {
// console.log('incoming packet from consumer',packet)
// console.log('socket processing app ',app[app.pp])
socket.write(stream.serialize(await app[app.pp].bind(app)(packet)))
})
// stream.on('message', messageProcess.bind(this))
// async function messageProcess (packet) {
// console.log('pp', app.processPacket)
// socket.write(stream.serialize(await app[app.pp].bind(app)(packet)))
// }
}) // end connected consumer
this.log.info({socket: this.listen_opts},'socket created')
@ -107,7 +109,6 @@ export default class Socket extends Server {
}
async destroy () {
this.log.info('closing down socket')

86
test/off/tcp.test.mjs Normal file
View file

@ -0,0 +1,86 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let tcpsocket_default = {}
let tcpsocket_9080 = {}
describe('Connects and Processes a payload in a JSON packet via TCP Socket', function(){
before(async function(){
tcpsocket_default = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-default'])
tcpsocket_default.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
tcpsocket_9080 = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-9080'])
tcpsocket_9080.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
})
after(async function(){
tcpsocket_default.kill()
tcpsocket_9080.kill()
})
it('with default host and port', async function () {
let tcpconsumer_default = new Consumer({name:'tcpconsumer'})
return new Promise(async function (resolve, reject) {
tcpconsumer_default.processPacket = function (packet) {
try {
expect(packet.payload).to.equal('8080:tcp payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(tcpconsumer_default.connect)()
if (err) reject(err)
let packet = {payload:'tcp payload'}
tcpconsumer_default.send(packet)
}) //end promise
}) // end tcp socket test
it('with alternate port, and passed consumer processor', async function () {
let tcpconsumer_9080 = new Consumer({packetProcessor: 'test', port:9080, name:'tcp-consumer-9080'})
return new Promise(async function (resolve, reject) {
tcpconsumer_9080.test = function (packet) {
try {
expect(packet.payload).to.equal('9080:tcp payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(tcpconsumer_9080.connect)()
if (err) reject(err)
let packet = {payload:'tcp payload'}
tcpconsumer_9080.send(packet)
}) //end promise
}) // end tcp socket 2 test
}) // end describe

View file

@ -1,172 +0,0 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const USOCKET = __dirname + '/sample.sock'
let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'})
let uconsumer2 = new Consumer(USOCKET, {name:'test-uconsumer-2'})
let tcpconsumer = new Consumer({name:'test-tcpconsumer'})
let tcpconsumer2 = new Consumer({port:9080, packetProcessor:'test', name:'test-tcpconsumer-2'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let usocket = {}
let tcpsocket = {}
let tcpsocket2 = {}
describe('Connects and Processes a payload in a JSON packet', function(){
before(async function(){
usocket = spawn('node',['-r', '@std/esm', './test/usocket'])
usocket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
tcpsocket = spawn('node',['-r', '@std/esm', './test/tcpsocket'])
tcpsocket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
tcpsocket2 = spawn('node',['-r', '@std/esm', './test/tcpsocket2'])
tcpsocket2.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
})
after(async function(){
usocket.kill()
tcpsocket.kill()
tcpsocket2.kill()
})
it('via unix socket with defaults testing stream JSON packet parser, 10 packets', async function () {
uconsumer.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
uconsumer.processPacket = function (packet) {
this.times++
if (this.times<10) return
try {
expect(packet.payload).to.equal('unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(uconsumer.connect)()
if (err) reject(err)
let packet = {payload:'unix payload'}
for (var i = 0; i < 11; i++) {
uconsumer.send(packet)
}
}) //end promise
}) // end unix socket test
it('unix socket with two consumers alternating packets, 10 packets each', async function () {
uconsumer.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
uconsumer.processPacket = function (packet) {
this.times++
// console.log(this.times,packet.payload)
if (this.times<10) return
try {
expect(packet.payload).to.equal('consumer 1 unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
uconsumer2.processPacket = function (packet) {
// console.log('processor2', packet.payload)
return packet
}
let [err] = await btc(uconsumer2.connect)()
if (err) reject(err)
let packet1 = {payload:'consumer 1 unix payload'}
let packet2 = {payload:'consumer2 unix payload'}
for (var i = 0; i < 11; i++) {
uconsumer.send(packet1)
uconsumer2.send(packet2)
}
}) //end promise
}) // end unix socket test
it('via tcp socket with defaults', async function () {
return new Promise(async function (resolve, reject) {
tcpconsumer.processPacket = function (packet) {
try {
expect(packet.payload).to.equal('tcp processed tcp payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(tcpconsumer.connect)()
if (err) reject(err)
let packet = {payload:'tcp payload'}
tcpconsumer.send(packet)
}) //end promise
}) // end tcp socket test
it('via alternate port tcp socket with attached custom socket processing and alt named socket processing', async function () {
return new Promise(async function (resolve, reject) {
tcpconsumer2.test = function (packet) {
try {
expect(packet.payload).to.equal('alt tcp processed tcp payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(tcpconsumer2.connect)()
if (err) reject(err)
let packet = {payload:'tcp payload'}
tcpconsumer2.send(packet)
}) //end promise
}) // end tcp socket 2 test
}) // end describe

1
test/sockets/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/node_modules/

6
test/sockets/tcp-app.mjs Normal file
View file

@ -0,0 +1,6 @@
export default {
processPacket: async function (packet) {
packet.payload = this.port +':'+packet.payload
return packet
}
}

View file

@ -0,0 +1,15 @@
import { Socket } from '../../src'
import app from './tcp-app'
let socket = new Socket({port:9080, name:'tcp socket 9080'})
app.port = socket.listen_opts.port
;
(async () => {
await socket.create(app)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -1,12 +1,10 @@
import { Socket } from '../src'
import { Socket } from '../../src'
import app from './tcp-app'
let socket = new Socket({name:'tcp socket'})
socket.processPacket = async function (packet) {
packet.payload = 'tcp processed '+packet.payload
return packet
}
app.port = socket.listen_opts.port
Object.assign(socket,app)
;
(async () => {

View file

@ -0,0 +1,19 @@
import { Socket } from '../../src'
const USOCKET = __dirname + '/test.sock'
let socket = new Socket(USOCKET,{packetProcessor:'sprocessPacket',name:'unix socket'})
socket.sprocessPacket = async function (packet) {
packet.payload = 'using alt processor name '+packet.payload
return packet
}
;
(async () => {
await socket.create()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -0,0 +1,21 @@
import { Socket } from '../../src'
const USOCKET = __dirname + '/test.sock'
let socket = new Socket(USOCKET,{name:'unix socket'})
let app ={}
app.pp = 'test'
app.test = async function (packet) {
packet.payload = 'using passed processor with alt name '+packet.payload
return packet
}
;
(async () => {
await socket.create(app)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -0,0 +1,20 @@
import { Socket } from '../../src'
const USOCKET = __dirname + '/test.sock'
let socket = new Socket(USOCKET,{name:'unix socket'})
let app ={}
app.processPacket = async function (packet) {
packet.payload = 'using passed processor with default name '+packet.payload
return packet
}
;
(async () => {
await socket.create(app)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -0,0 +1,19 @@
import { Socket } from '../../src'
const USOCKET = __dirname + '/test.sock'
let socket = new Socket(USOCKET,{name:'unix socket'})
socket.processPacket = async function (packet) {
packet.payload = 'using alt processor with default name '+packet.payload
return packet
}
;
(async () => {
await socket.create()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -1,12 +1,11 @@
import { Socket } from '../src'
import { Socket } from '../../src'
const USOCKET = __dirname + '/sample.sock'
const USOCKET = __dirname + '/test.sock'
let socket = new Socket(USOCKET,{name:'unix socket'})
;
(async () => {
await socket.create()
})().catch(err => {

86
test/tcp.test.mjs Normal file
View file

@ -0,0 +1,86 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let tcpsocket_default = {}
let tcpsocket_9080 = {}
describe('Connects and Processes a payload in a JSON packet via TCP Socket', function(){
before(async function(){
tcpsocket_default = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-default'])
tcpsocket_default.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
tcpsocket_9080 = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-9080'])
tcpsocket_9080.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
})
after(async function(){
tcpsocket_default.kill()
tcpsocket_9080.kill()
})
it('with default host and port', async function () {
let tcpconsumer_default = new Consumer({name:'tcpconsumer'})
return new Promise(async function (resolve, reject) {
tcpconsumer_default.processPacket = function (packet) {
try {
expect(packet.payload).to.equal('8080:tcp payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(tcpconsumer_default.connect)()
if (err) reject(err)
let packet = {payload:'tcp payload'}
tcpconsumer_default.send(packet)
}) //end promise
}) // end tcp socket test
it('with alternate port, and passed consumer processor', async function () {
let tcpconsumer_9080 = new Consumer({packetProcessor: 'test', port:9080, name:'tcp-consumer-9080'})
return new Promise(async function (resolve, reject) {
tcpconsumer_9080.test = function (packet) {
try {
expect(packet.payload).to.equal('9080:tcp payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(tcpconsumer_9080.connect)()
if (err) reject(err)
let packet = {payload:'tcp payload'}
tcpconsumer_9080.send(packet)
}) //end promise
}) // end tcp socket 2 test
}) // end describe

View file

@ -1,20 +0,0 @@
import { Socket } from '../src'
let socket = new Socket({port:9080, name:'tcp socket 8081'})
// socket.spp = 'saltprocessPacket'
socket.pp ='sprocessPacket'
socket.sprocessPacket = async function (packet) {
packet.payload = 'alt tcp processed '+packet.payload
return packet
}
;
(async () => {
await socket.create()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -0,0 +1,109 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket-alt-name'
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {}
describe('Connects and Processes a payload via Unix Socket using JSON packet with alt named processor', function(){
before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
})
after(async function(){
socket.kill()
})
it('Tests JSON packet procssing, 10 packets', async function () {
consumer.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.processPacket = function (packet) {
this.times++
if (this.times!==11) return
try {
expect(packet.payload).to.equal('using alt processor name unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(consumer.connect)()
if (err) reject(err)
let packet = {payload:'unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet)
}
}) //end promise
}) // end unix socket test
it('unix socket with two consumers alternating packets, 10 packets each', async function () {
consumer.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.processPacket = function (packet) {
this.times++
// console.log(this.times,packet.payload)
if (this.times!==11) return
try {
expect(packet.payload).to.equal('using alt processor name consumer 1 unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
consumer2.processPacket = function (packet) {
return packet
}
let [err] = await btc(consumer2.connect)()
if (err) reject(err)
let packet1 = {payload:'consumer 1 unix payload'}
let packet2 = {payload:'consumer2 unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet1)
consumer2.send(packet2)
}
}) //end promise
}) // end unix socket test
}) // end describe

View file

@ -0,0 +1,117 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket-app-alt-name'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {}
describe('Connects and Processes a payload via Unix Socket using JSON packet with passed processor using default name', function(){
before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
})
// beforeEach(async function(){
// let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
// })
after(async function(){
socket.kill()
})
it('Tests JSON packet procssing, 10 packets', async function () {
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
let app = {}
app.times = 0
app.pp = 'xprocessPacket'
app.xprocessPacket = function (packet) {
this.times++
// console.log('times=',this.times,'\n',packet.payload)
if (this.times!==11) return
consumer.end()
consumer.destroy()
try {
expect(packet.payload).to.equal('using passed processor with alt name unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(consumer.connect)(app)
if (err) reject(err)
let packet = {payload:'unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet)
}
}) //end promise
}) // end unix socket test
it('unix socket with two consumers alternating packets, 10 packets each, alt consumer processor name', async function () {
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
let app = {}
app.pp = 'yprocessPacket'
app.times = 0
app.yprocessPacket = function (packet) {
this.times++
if (this.times!==11) return
try {
expect(packet.payload).to.equal('using passed processor with alt name consumer 1 unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
consumer2.processPacket = function (packet) {
return packet
}
let [err] = await btc(consumer.connect)(app)
if (err) reject(err)
let [err2] = await btc(consumer2.connect)()
if (err2) reject(err2)
let packet1 = {payload:'consumer 1 unix payload'}
let packet2 = {payload:'consumer2 unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet1)
consumer2.send(packet2)
}
}) //end promise
}) // end unix socket test
}) // end describe

View file

@ -0,0 +1,115 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket-app-default-name'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {}
describe('Connects and Processes a payload via Unix Socket using JSON packet with passed processor using default name', function(){
before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
})
// beforeEach(async function(){
// let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
// })
after(async function(){
socket.kill()
})
it('Tests JSON packet procssing, 10 packets', async function () {
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
let app = {}
app.times = 0
app.processPacket = function (packet) {
this.times++
// console.log('times=',this.times,'\n',packet.payload)
if (this.times!==11) return
consumer.end()
consumer.destroy()
try {
expect(packet.payload).to.equal('using passed processor with default name unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(consumer.connect)(app)
if (err) reject(err)
let packet = {payload:'unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet)
}
}) //end promise
}) // end unix socket test
it('unix socket with two consumers alternating packets, 10 packets each, alt consumer processor name', async function () {
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
let app = {}
app.times = 0
app.processPacket = function (packet) {
this.times++
if (this.times!==11) return
try {
expect(packet.payload).to.equal('using passed processor with default name consumer 1 unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
consumer2.processPacket = function (packet) {
return packet
}
let [err] = await btc(consumer.connect)(app)
if (err) reject(err)
let [err2] = await btc(consumer2.connect)()
if (err2) reject(err2)
let packet1 = {payload:'consumer 1 unix payload'}
let packet2 = {payload:'consumer2 unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet1)
consumer2.send(packet2)
}
}) //end promise
}) // end unix socket test
}) // end describe

View file

@ -0,0 +1,109 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket-default-name'
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {}
describe('Connects and Processes a payload via Unix Socket using JSON packet with alt default processor', function(){
before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
})
after(async function(){
socket.kill()
})
it('Tests JSON packet procssing, 10 packets', async function () {
consumer.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.processPacket = function (packet) {
this.times++
if (this.times!==11) return
try {
expect(packet.payload).to.equal('using alt processor with default name unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(consumer.connect)()
if (err) reject(err)
let packet = {payload:'unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet)
}
}) //end promise
}) // end unix socket test
it('unix socket with two consumers alternating packets, 10 packets each', async function () {
consumer.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.processPacket = function (packet) {
this.times++
// console.log(this.times,packet.payload)
if (this.times!==11) return
try {
expect(packet.payload).to.equal('using alt processor with default name consumer 1 unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
consumer2.processPacket = function (packet) {
return packet
}
let [err] = await btc(consumer2.connect)()
if (err) reject(err)
let packet1 = {payload:'consumer 1 unix payload'}
let packet2 = {payload:'consumer2 unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet1)
consumer2.send(packet2)
}
}) //end promise
}) // end unix socket test
}) // end describe

View file

@ -0,0 +1,109 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket-default'
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {}
describe('Connects and Processes a payload via Unix Socket using JSON packet with defaults', function(){
before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
})
after(async function(){
socket.kill()
})
it('Tests unix socket with default echo JSON packet procssing, 10 packets', async function () {
consumer.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.processPacket = function (packet) {
this.times++
if (this.times!==11) return
try {
expect(packet.payload).to.equal('unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(consumer.connect)()
if (err) reject(err)
let packet = {payload:'unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet)
}
}) //end promise
}) // end unix socket test
it('unix socket with two consumers alternating packets, 10 packets each', async function () {
consumer.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.processPacket = function (packet) {
this.times++
// console.log(this.times,packet.payload)
if (this.times!==11) return
try {
expect(packet.payload).to.equal('consumer 1 unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
consumer2.processPacket = function (packet) {
return packet
}
let [err] = await btc(consumer2.connect)()
if (err) reject(err)
let packet1 = {payload:'consumer 1 unix payload'}
let packet2 = {payload:'consumer2 unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet1)
consumer2.send(packet2)
}
}) //end promise
}) // end unix socket test
}) // end describe