added packet property to hold processor and both local and added context in which to process packets for both socket and consumer
changed tests accordingly
This commit is contained in:
parent
ab56bddd72
commit
1b846c4753
19 changed files with 140 additions and 477 deletions
|
@ -13,12 +13,22 @@ export default class Consumer extends Socket {
|
|||
this.host = opts.host || '127.0.0.1'
|
||||
this.port = opts.port || 8080
|
||||
} else this.path = path
|
||||
this.pp = opts.packetProcessor || 'processPacket'
|
||||
this.packet = {
|
||||
_process: async (packet) => {
|
||||
console.log('default processor -- packet from socket')
|
||||
console.dir(packet)
|
||||
return packet }
|
||||
}
|
||||
this.keepAlive = opts.keepAlive ? opts.keepAlive : true
|
||||
this._ready = false
|
||||
this.timeout = opts.timeout || 500
|
||||
this.wait = opts.wait || 5
|
||||
this.stream = new JsonStream()
|
||||
this.packet = {
|
||||
_process: (packet) => {
|
||||
packet.res='echoed'
|
||||
return packet }
|
||||
}
|
||||
// logging
|
||||
this.log_file=opts.log_file || './socket.log'
|
||||
this.log_opts = {streams:[]}
|
||||
|
@ -33,23 +43,14 @@ export default class Consumer extends Socket {
|
|||
|
||||
ready() {return this._ready}
|
||||
|
||||
async connect (app={}) {
|
||||
async connect (context) {
|
||||
|
||||
if (Object.keys(app).length === 0) app = this
|
||||
else app.pp = app.pp || this.pp
|
||||
// set a default processor if none provided
|
||||
if (!app[app.pp]) {
|
||||
app.pp = 'processPacket' // reset in case app[app.pp] is missing
|
||||
app.processPacket = async (packet) => {
|
||||
console.log('default processor -- packet from socket')
|
||||
console.dir(packet)
|
||||
return packet }
|
||||
}
|
||||
if (context) this.packet.context = context
|
||||
|
||||
// console.log('consumer processing=>',app.pp,'=>', app[app.pp])
|
||||
// console.log('========================')
|
||||
|
||||
this.listen(app)
|
||||
this.listen()
|
||||
this.log.info('listening')
|
||||
|
||||
return new Promise( (resolve,reject) => {
|
||||
|
@ -76,10 +77,10 @@ export default class Consumer extends Socket {
|
|||
|
||||
async send(packet) {
|
||||
await this.write(this.stream.serialize(packet))
|
||||
// throw new Error('Cannot send connection not ready')
|
||||
// handle error here?
|
||||
}
|
||||
|
||||
async listen (app) {
|
||||
async listen () {
|
||||
|
||||
this.on('data', this.stream.onData)
|
||||
|
||||
|
@ -90,11 +91,19 @@ export default class Consumer extends Socket {
|
|||
if (packet.ready) {
|
||||
this._ready = true
|
||||
return }
|
||||
// console.log('consumer processor',app[app.pp])
|
||||
await app[app.pp].bind(app)(packet)
|
||||
// console.log('consumer processor',this.packet._process)
|
||||
await this.packet._process(packet)
|
||||
}
|
||||
}
|
||||
|
||||
registerPacketContext(obj) {
|
||||
this.packet.context = obj
|
||||
}
|
||||
|
||||
registerPacketProcessor (func) {
|
||||
this.packet._process = func
|
||||
}
|
||||
|
||||
} // end class
|
||||
|
||||
// wait for handshake packet from socket
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
// adpated from https://github.com/sebastianseilund/node-json-socket
|
||||
|
||||
import {StringDecoder} from 'string_decoder'
|
||||
import EventEmitter from 'events'
|
||||
|
||||
|
|
|
@ -14,7 +14,12 @@ export default class Socket extends Server {
|
|||
opts = path
|
||||
this.listen_opts = { host: opts.host || '127.0.0.1', port: opts.port || 8080}
|
||||
} else this.listen_opts = { path: path }
|
||||
this.pp = opts.packetProcessor || 'processPacket'
|
||||
// default packet processing
|
||||
this.packet = {
|
||||
_process: (packet) => {
|
||||
packet.res='echoed'
|
||||
return packet }
|
||||
}
|
||||
// logging
|
||||
this.log_file=opts.log_file || './socket.log'
|
||||
this.log_opts = {streams:[]}
|
||||
|
@ -23,22 +28,14 @@ export default class Socket extends Server {
|
|||
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
|
||||
this.log = bunyan.createLogger(this.log_opts)
|
||||
//binding
|
||||
this.listen = this.listen.bind(this)
|
||||
this.create = this.create.bind(this)
|
||||
|
||||
} // end constructor
|
||||
|
||||
async create (app={}) {
|
||||
async create (context) {
|
||||
|
||||
if (Object.keys(app).length === 0) app = this
|
||||
else app.pp = app.pp || this.pp
|
||||
if (!app[app.pp]) {
|
||||
this.pp = 'processPacket' // reset in case app.pp was set but app.pp function is missing
|
||||
app.processPacket = async (packet) => {
|
||||
packet.res='echoed'
|
||||
return packet }
|
||||
}
|
||||
|
||||
// console.log('socket processing3=>',app.pp,'=>', app[app.pp])
|
||||
// console.log('========================')
|
||||
if (context) this.packet.context = context
|
||||
|
||||
return new Promise( async (resolve,reject) => {
|
||||
|
||||
|
@ -60,7 +57,7 @@ export default class Socket extends Server {
|
|||
if (path) { // if TCP socket should already be dead
|
||||
this.log.info({socket: path}, 'already exists...deleting')
|
||||
await fileDelete(path)
|
||||
return await this.listen.bind(this)(this.listen_opts,app)
|
||||
return await this.listen(this.listen_opts)
|
||||
}
|
||||
}
|
||||
// otherwise fatally exit
|
||||
|
@ -68,14 +65,14 @@ export default class Socket extends Server {
|
|||
reject(err)
|
||||
})
|
||||
|
||||
let [err, res] = await btc(this.listen).bind(this)(this.listen_opts,app)
|
||||
let [err, res] = await btc(this.listen)(this.listen_opts)
|
||||
if (err) reject(err)
|
||||
resolve(res)
|
||||
|
||||
}) // end promise
|
||||
} // end create
|
||||
|
||||
async listen (opts,app) {
|
||||
async listen (opts) {
|
||||
|
||||
super.listen(opts, async (err, res) => {
|
||||
|
||||
|
@ -91,15 +88,11 @@ export default class Socket extends Server {
|
|||
|
||||
socket.on('data', stream.onData)
|
||||
|
||||
stream.on('message', async function (packet) {
|
||||
socket.write(stream.serialize(await app[app.pp].bind(app)(packet)))
|
||||
})
|
||||
|
||||
// stream.on('message', messageProcess.bind(this))
|
||||
// async function messageProcess (packet) {
|
||||
// console.log('pp', app.processPacket)
|
||||
// socket.write(stream.serialize(await app[app.pp].bind(app)(packet)))
|
||||
// }
|
||||
stream.on('message', messageProcess.bind(this))
|
||||
async function messageProcess (packet) {
|
||||
// console.log(this.packet._process.toString())
|
||||
socket.write(stream.serialize(await this.packet._process(packet)))
|
||||
}
|
||||
|
||||
}) // end connected consumer
|
||||
|
||||
|
@ -115,8 +108,14 @@ export default class Socket extends Server {
|
|||
await this.close()
|
||||
this.log.info('all connections closed....exiting')
|
||||
process.exit()
|
||||
}
|
||||
|
||||
} // end destroy
|
||||
registerPacketContext(obj) {
|
||||
this.packet.context = obj
|
||||
}
|
||||
|
||||
registerPacketProcessor (func) {
|
||||
this.packet._process = func
|
||||
}
|
||||
|
||||
} // end class
|
||||
|
|
|
@ -1,86 +0,0 @@
|
|||
import { spawn } from 'child_process'
|
||||
import chai from 'chai'
|
||||
import chaiAsPromised from 'chai-as-promised'
|
||||
import btc from 'better-try-catch'
|
||||
chai.use(chaiAsPromised)
|
||||
const expect = chai.expect
|
||||
|
||||
import { Consumer } from '../src'
|
||||
|
||||
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
|
||||
|
||||
let tcpsocket_default = {}
|
||||
let tcpsocket_9080 = {}
|
||||
|
||||
describe('Connects and Processes a payload in a JSON packet via TCP Socket', function(){
|
||||
|
||||
before(async function(){
|
||||
tcpsocket_default = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-default'])
|
||||
tcpsocket_default.stdout.on('data', function(buf) {
|
||||
console.log('[Socket]', String(buf))
|
||||
})
|
||||
|
||||
tcpsocket_9080 = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-9080'])
|
||||
tcpsocket_9080.stdout.on('data', function(buf) {
|
||||
console.log('[Socket]', String(buf))
|
||||
})
|
||||
|
||||
await delay(500) // wait for sockets to get going
|
||||
})
|
||||
|
||||
after(async function(){
|
||||
tcpsocket_default.kill()
|
||||
tcpsocket_9080.kill()
|
||||
})
|
||||
|
||||
it('with default host and port', async function () {
|
||||
let tcpconsumer_default = new Consumer({name:'tcpconsumer'})
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
tcpconsumer_default.processPacket = function (packet) {
|
||||
try {
|
||||
expect(packet.payload).to.equal('8080:tcp payload')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
reject(error)
|
||||
}
|
||||
}
|
||||
|
||||
let [err] = await btc(tcpconsumer_default.connect)()
|
||||
if (err) reject(err)
|
||||
let packet = {payload:'tcp payload'}
|
||||
tcpconsumer_default.send(packet)
|
||||
|
||||
}) //end promise
|
||||
|
||||
}) // end tcp socket test
|
||||
|
||||
it('with alternate port, and passed consumer processor', async function () {
|
||||
|
||||
let tcpconsumer_9080 = new Consumer({packetProcessor: 'test', port:9080, name:'tcp-consumer-9080'})
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
tcpconsumer_9080.test = function (packet) {
|
||||
try {
|
||||
expect(packet.payload).to.equal('9080:tcp payload')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
reject(error)
|
||||
}
|
||||
}
|
||||
|
||||
let [err] = await btc(tcpconsumer_9080.connect)()
|
||||
if (err) reject(err)
|
||||
let packet = {payload:'tcp payload'}
|
||||
tcpconsumer_9080.send(packet)
|
||||
|
||||
}) //end promise
|
||||
|
||||
}) // end tcp socket 2 test
|
||||
|
||||
|
||||
}) // end describe
|
|
@ -1,6 +0,0 @@
|
|||
export default {
|
||||
processPacket: async function (packet) {
|
||||
packet.payload = this.port +':'+packet.payload
|
||||
return packet
|
||||
}
|
||||
}
|
5
test/sockets/tcp-process.mjs
Normal file
5
test/sockets/tcp-process.mjs
Normal file
|
@ -0,0 +1,5 @@
|
|||
export default async function (packet) {
|
||||
// console.log(this.context)
|
||||
packet.payload = this.context.port +':'+packet.payload
|
||||
return packet
|
||||
}
|
|
@ -1,10 +1,10 @@
|
|||
import { Socket } from '../../src'
|
||||
import app from './tcp-app'
|
||||
import process from './tcp-process'
|
||||
|
||||
let socket = new Socket({port:9080, name:'tcp socket 9080'})
|
||||
|
||||
app.port = socket.listen_opts.port
|
||||
|
||||
let app = {port: socket.listen_opts.port}
|
||||
socket.registerPacketProcessor(process)
|
||||
;
|
||||
(async () => {
|
||||
|
||||
|
|
|
@ -1,14 +1,15 @@
|
|||
import { Socket } from '../../src'
|
||||
import app from './tcp-app'
|
||||
import process from './tcp-process'
|
||||
|
||||
let socket = new Socket({name:'tcp socket'})
|
||||
|
||||
app.port = socket.listen_opts.port
|
||||
Object.assign(socket,app)
|
||||
let app = {port: socket.listen_opts.port}
|
||||
socket.registerPacketProcessor(process)
|
||||
|
||||
;
|
||||
(async () => {
|
||||
|
||||
await socket.create()
|
||||
await socket.create(app)
|
||||
|
||||
})().catch(err => {
|
||||
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
|
||||
|
|
|
@ -1,19 +0,0 @@
|
|||
import { Socket } from '../../src'
|
||||
|
||||
const USOCKET = __dirname + '/test.sock'
|
||||
|
||||
let socket = new Socket(USOCKET,{packetProcessor:'sprocessPacket',name:'unix socket'})
|
||||
|
||||
socket.sprocessPacket = async function (packet) {
|
||||
packet.payload = 'using alt processor name '+packet.payload
|
||||
return packet
|
||||
}
|
||||
|
||||
;
|
||||
(async () => {
|
||||
|
||||
await socket.create()
|
||||
|
||||
})().catch(err => {
|
||||
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
|
||||
})
|
|
@ -1,21 +0,0 @@
|
|||
import { Socket } from '../../src'
|
||||
|
||||
const USOCKET = __dirname + '/test.sock'
|
||||
|
||||
let socket = new Socket(USOCKET,{name:'unix socket'})
|
||||
|
||||
let app ={}
|
||||
app.pp = 'test'
|
||||
app.test = async function (packet) {
|
||||
packet.payload = 'using passed processor with alt name '+packet.payload
|
||||
return packet
|
||||
}
|
||||
|
||||
;
|
||||
(async () => {
|
||||
|
||||
await socket.create(app)
|
||||
|
||||
})().catch(err => {
|
||||
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
|
||||
})
|
|
@ -1,20 +0,0 @@
|
|||
import { Socket } from '../../src'
|
||||
|
||||
const USOCKET = __dirname + '/test.sock'
|
||||
|
||||
let socket = new Socket(USOCKET,{name:'unix socket'})
|
||||
|
||||
let app ={}
|
||||
app.processPacket = async function (packet) {
|
||||
packet.payload = 'using passed processor with default name '+packet.payload
|
||||
return packet
|
||||
}
|
||||
|
||||
;
|
||||
(async () => {
|
||||
|
||||
await socket.create(app)
|
||||
|
||||
})().catch(err => {
|
||||
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
|
||||
})
|
|
@ -4,10 +4,14 @@ const USOCKET = __dirname + '/test.sock'
|
|||
|
||||
let socket = new Socket(USOCKET,{name:'unix socket'})
|
||||
|
||||
socket.processPacket = async function (packet) {
|
||||
packet.payload = 'using alt processor with default name '+packet.payload
|
||||
socket.registerPacketContext({test:'alt'})
|
||||
|
||||
socket.packet.test = 'local'
|
||||
|
||||
socket.registerPacketProcessor(async function (packet) {
|
||||
packet.payload = 'overwrite default processor from instance '+packet.payload
|
||||
return packet
|
||||
}
|
||||
})
|
||||
|
||||
;
|
||||
(async () => {
|
25
test/sockets/usocket-packet-alt-context.mjs
Normal file
25
test/sockets/usocket-packet-alt-context.mjs
Normal file
|
@ -0,0 +1,25 @@
|
|||
import { Socket } from '../../src'
|
||||
|
||||
const USOCKET = __dirname + '/test.sock'
|
||||
|
||||
let socket = new Socket(USOCKET,{name:'unix socket'})
|
||||
|
||||
// over writes default packet processing
|
||||
|
||||
// socket.registerPacketContext({test:'alt'})
|
||||
|
||||
socket.packet.test = 'local'
|
||||
|
||||
socket.registerPacketProcessor(async function (packet) {
|
||||
packet.payload = packet.payload +':'+this.test+':'+this.context.test
|
||||
return packet
|
||||
})
|
||||
|
||||
;
|
||||
(async () => {
|
||||
|
||||
await socket.create({test:'alt'})
|
||||
|
||||
})().catch(err => {
|
||||
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
|
||||
})
|
|
@ -38,7 +38,7 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
|
|||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
tcpconsumer_default.processPacket = function (packet) {
|
||||
tcpconsumer_default.packet._process = function (packet) {
|
||||
try {
|
||||
expect(packet.payload).to.equal('8080:tcp payload')
|
||||
resolve()
|
||||
|
@ -57,13 +57,13 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
|
|||
|
||||
}) // end tcp socket test
|
||||
|
||||
it('with alternate port, and passed consumer processor', async function () {
|
||||
it('with alternate port, and registered consumer processor', async function () {
|
||||
|
||||
let tcpconsumer_9080 = new Consumer({packetProcessor: 'test', port:9080, name:'tcp-consumer-9080'})
|
||||
let tcpconsumer_9080 = new Consumer({port:9080, name:'tcp-consumer-9080'})
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
tcpconsumer_9080.test = function (packet) {
|
||||
tcpconsumer_9080.registerPacketProcessor(function (packet) {
|
||||
try {
|
||||
expect(packet.payload).to.equal('9080:tcp payload')
|
||||
resolve()
|
||||
|
@ -71,7 +71,7 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
|
|||
catch(error) {
|
||||
reject(error)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
let [err] = await btc(tcpconsumer_9080.connect)()
|
||||
if (err) reject(err)
|
||||
|
|
|
@ -1,117 +0,0 @@
|
|||
import { spawn } from 'child_process'
|
||||
import chai from 'chai'
|
||||
import chaiAsPromised from 'chai-as-promised'
|
||||
import btc from 'better-try-catch'
|
||||
chai.use(chaiAsPromised)
|
||||
const expect = chai.expect
|
||||
|
||||
import { Consumer } from '../src'
|
||||
|
||||
const USOCKET = __dirname + '/sockets/test.sock'
|
||||
const SOCKET_FILE = 'usocket-app-alt-name'
|
||||
|
||||
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
|
||||
|
||||
let socket = {}
|
||||
|
||||
describe('Connects and Processes a payload via Unix Socket using JSON packet with passed processor using default name', function(){
|
||||
|
||||
before(async function(){
|
||||
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
|
||||
socket.stdout.on('data', function(buf) {
|
||||
console.log('[Socket]', String(buf))
|
||||
})
|
||||
|
||||
await delay(500) // wait for sockets to get going
|
||||
})
|
||||
|
||||
// beforeEach(async function(){
|
||||
// let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
|
||||
// })
|
||||
|
||||
after(async function(){
|
||||
socket.kill()
|
||||
})
|
||||
|
||||
it('Tests JSON packet procssing, 10 packets', async function () {
|
||||
|
||||
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||
let app = {}
|
||||
app.times = 0
|
||||
app.pp = 'xprocessPacket'
|
||||
app.xprocessPacket = function (packet) {
|
||||
this.times++
|
||||
// console.log('times=',this.times,'\n',packet.payload)
|
||||
if (this.times!==11) return
|
||||
consumer.end()
|
||||
consumer.destroy()
|
||||
|
||||
try {
|
||||
expect(packet.payload).to.equal('using passed processor with alt name unix payload')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
reject(error)
|
||||
}
|
||||
}
|
||||
|
||||
let [err] = await btc(consumer.connect)(app)
|
||||
if (err) reject(err)
|
||||
let packet = {payload:'unix payload'}
|
||||
for (var i = 0; i < 11; i++) {
|
||||
consumer.send(packet)
|
||||
}
|
||||
|
||||
}) //end promise
|
||||
|
||||
}) // end unix socket test
|
||||
|
||||
|
||||
it('unix socket with two consumers alternating packets, 10 packets each, alt consumer processor name', async function () {
|
||||
|
||||
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
|
||||
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||
|
||||
let app = {}
|
||||
app.pp = 'yprocessPacket'
|
||||
app.times = 0
|
||||
app.yprocessPacket = function (packet) {
|
||||
this.times++
|
||||
if (this.times!==11) return
|
||||
try {
|
||||
expect(packet.payload).to.equal('using passed processor with alt name consumer 1 unix payload')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
reject(error)
|
||||
}
|
||||
}
|
||||
|
||||
consumer2.processPacket = function (packet) {
|
||||
return packet
|
||||
}
|
||||
|
||||
let [err] = await btc(consumer.connect)(app)
|
||||
if (err) reject(err)
|
||||
let [err2] = await btc(consumer2.connect)()
|
||||
if (err2) reject(err2)
|
||||
let packet1 = {payload:'consumer 1 unix payload'}
|
||||
let packet2 = {payload:'consumer2 unix payload'}
|
||||
for (var i = 0; i < 11; i++) {
|
||||
consumer.send(packet1)
|
||||
consumer2.send(packet2)
|
||||
}
|
||||
|
||||
}) //end promise
|
||||
|
||||
}) // end unix socket test
|
||||
|
||||
}) // end describe
|
|
@ -1,115 +0,0 @@
|
|||
import { spawn } from 'child_process'
|
||||
import chai from 'chai'
|
||||
import chaiAsPromised from 'chai-as-promised'
|
||||
import btc from 'better-try-catch'
|
||||
chai.use(chaiAsPromised)
|
||||
const expect = chai.expect
|
||||
|
||||
import { Consumer } from '../src'
|
||||
|
||||
const USOCKET = __dirname + '/sockets/test.sock'
|
||||
const SOCKET_FILE = 'usocket-app-default-name'
|
||||
|
||||
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
|
||||
|
||||
let socket = {}
|
||||
|
||||
describe('Connects and Processes a payload via Unix Socket using JSON packet with passed processor using default name', function(){
|
||||
|
||||
before(async function(){
|
||||
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
|
||||
socket.stdout.on('data', function(buf) {
|
||||
console.log('[Socket]', String(buf))
|
||||
})
|
||||
|
||||
await delay(500) // wait for sockets to get going
|
||||
})
|
||||
|
||||
// beforeEach(async function(){
|
||||
// let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
|
||||
// })
|
||||
|
||||
after(async function(){
|
||||
socket.kill()
|
||||
})
|
||||
|
||||
it('Tests JSON packet procssing, 10 packets', async function () {
|
||||
|
||||
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||
let app = {}
|
||||
app.times = 0
|
||||
app.processPacket = function (packet) {
|
||||
this.times++
|
||||
// console.log('times=',this.times,'\n',packet.payload)
|
||||
if (this.times!==11) return
|
||||
consumer.end()
|
||||
consumer.destroy()
|
||||
|
||||
try {
|
||||
expect(packet.payload).to.equal('using passed processor with default name unix payload')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
reject(error)
|
||||
}
|
||||
}
|
||||
|
||||
let [err] = await btc(consumer.connect)(app)
|
||||
if (err) reject(err)
|
||||
let packet = {payload:'unix payload'}
|
||||
for (var i = 0; i < 11; i++) {
|
||||
consumer.send(packet)
|
||||
}
|
||||
|
||||
}) //end promise
|
||||
|
||||
}) // end unix socket test
|
||||
|
||||
|
||||
it('unix socket with two consumers alternating packets, 10 packets each, alt consumer processor name', async function () {
|
||||
|
||||
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
|
||||
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||
|
||||
let app = {}
|
||||
app.times = 0
|
||||
app.processPacket = function (packet) {
|
||||
this.times++
|
||||
if (this.times!==11) return
|
||||
try {
|
||||
expect(packet.payload).to.equal('using passed processor with default name consumer 1 unix payload')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
reject(error)
|
||||
}
|
||||
}
|
||||
|
||||
consumer2.processPacket = function (packet) {
|
||||
return packet
|
||||
}
|
||||
|
||||
let [err] = await btc(consumer.connect)(app)
|
||||
if (err) reject(err)
|
||||
let [err2] = await btc(consumer2.connect)()
|
||||
if (err2) reject(err2)
|
||||
let packet1 = {payload:'consumer 1 unix payload'}
|
||||
let packet2 = {payload:'consumer2 unix payload'}
|
||||
for (var i = 0; i < 11; i++) {
|
||||
consumer.send(packet1)
|
||||
consumer2.send(packet2)
|
||||
}
|
||||
|
||||
}) //end promise
|
||||
|
||||
}) // end unix socket test
|
||||
|
||||
}) // end describe
|
|
@ -8,7 +8,7 @@ const expect = chai.expect
|
|||
import { Consumer } from '../src'
|
||||
|
||||
const USOCKET = __dirname + '/sockets/test.sock'
|
||||
const SOCKET_FILE = 'usocket-default-name'
|
||||
const SOCKET_FILE = 'usocket-default-overwrite'
|
||||
|
||||
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
|
||||
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
|
||||
|
@ -34,18 +34,18 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
|
|||
|
||||
it('Tests JSON packet procssing, 10 packets', async function () {
|
||||
|
||||
consumer.times = 0
|
||||
consumer.packet.times = 0
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||
|
||||
consumer.processPacket = function (packet) {
|
||||
consumer.packet._process = function (packet) {
|
||||
this.times++
|
||||
if (this.times!==11) return
|
||||
|
||||
try {
|
||||
expect(packet.payload).to.equal('using alt processor with default name unix payload')
|
||||
expect(packet.payload).to.equal('overwrite default processor from instance unix payload')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
|
@ -67,19 +67,19 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
|
|||
|
||||
it('unix socket with two consumers alternating packets, 10 packets each', async function () {
|
||||
|
||||
consumer.times = 0
|
||||
consumer.packet.times = 0
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||
|
||||
consumer.processPacket = function (packet) {
|
||||
consumer.packet._process = function (packet) {
|
||||
this.times++
|
||||
// console.log(this.times,packet.payload)
|
||||
if (this.times!==11) return
|
||||
|
||||
try {
|
||||
expect(packet.payload).to.equal('using alt processor with default name consumer 1 unix payload')
|
||||
expect(packet.payload).to.equal('overwrite default processor from instance consumer 1 unix payload')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
|
@ -32,20 +32,22 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
|
|||
socket.kill()
|
||||
})
|
||||
|
||||
it('Tests unix socket with default echo JSON packet procssing, 10 packets', async function () {
|
||||
it('Tests unix socket with default echo JSON packet procssing, 10 packets with conect via connect', async function () {
|
||||
|
||||
consumer.times = 0
|
||||
consumer.packet.times = 0
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||
|
||||
consumer.processPacket = function (packet) {
|
||||
consumer.packet._process = function (packet) {
|
||||
this.times++
|
||||
if (this.times!==11) return
|
||||
|
||||
packet.payload = packet.payload + this.context.test
|
||||
|
||||
try {
|
||||
expect(packet.payload).to.equal('unix payload')
|
||||
expect(packet.payload).to.equal('unix payload plus context at connect')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
|
@ -53,7 +55,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
|
|||
}
|
||||
}
|
||||
|
||||
let [err] = await btc(consumer.connect)()
|
||||
let [err] = await btc(consumer.connect)({test:' plus context at connect'})
|
||||
if (err) reject(err)
|
||||
let packet = {payload:'unix payload'}
|
||||
for (var i = 0; i < 11; i++) {
|
||||
|
@ -65,29 +67,33 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
|
|||
}) // end unix socket test
|
||||
|
||||
|
||||
it('unix socket with two consumers alternating packets, 10 packets each', async function () {
|
||||
it('unix socket with two consumers alternating packets, 10 packets each with local and added context', async function () {
|
||||
|
||||
consumer.times = 0
|
||||
consumer.packet.times = 0
|
||||
consumer.packet.test = ':local'
|
||||
consumer.registerPacketContext({test:':added'})
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||
|
||||
consumer.processPacket = function (packet) {
|
||||
consumer.registerPacketProcessor(function (packet) {
|
||||
this.times++
|
||||
// console.log(this.times,packet.payload)
|
||||
if (this.times!==11) return
|
||||
|
||||
packet.payload = packet.payload + this.context.test + this.test
|
||||
|
||||
try {
|
||||
expect(packet.payload).to.equal('consumer 1 unix payload')
|
||||
expect(packet.payload).to.equal('consumer 1 unix payload:added:local')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
reject(error)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
consumer2.processPacket = function (packet) {
|
||||
consumer2.packet._process = function (packet) {
|
||||
return packet
|
||||
}
|
||||
|
||||
|
|
|
@ -8,7 +8,8 @@ const expect = chai.expect
|
|||
import { Consumer } from '../src'
|
||||
|
||||
const USOCKET = __dirname + '/sockets/test.sock'
|
||||
const SOCKET_FILE = 'usocket-alt-name'
|
||||
const SOCKET_FILE = 'usocket-packet-alt-context'
|
||||
const COUNT = 10000
|
||||
|
||||
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
|
||||
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
|
||||
|
@ -17,7 +18,7 @@ const delay = time => new Promise(res=>setTimeout(()=>res(),time))
|
|||
|
||||
let socket = {}
|
||||
|
||||
describe('Connects and Processes a payload via Unix Socket using JSON packet with alt named processor', function(){
|
||||
describe('Connects and Processes a payload via Unix Socket using JSON packet with local and alt context', function(){
|
||||
|
||||
before(async function(){
|
||||
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
|
||||
|
@ -32,20 +33,20 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
|
|||
socket.kill()
|
||||
})
|
||||
|
||||
it('Tests JSON packet procssing, 10 packets', async function () {
|
||||
it('Tests JSON packet procssing, 10 packets using external context at socket', async function () {
|
||||
|
||||
consumer.times = 0
|
||||
consumer.packet.times = 0
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||
|
||||
consumer.processPacket = function (packet) {
|
||||
consumer.packet._process = function (packet) {
|
||||
this.times++
|
||||
if (this.times!==11) return
|
||||
|
||||
try {
|
||||
expect(packet.payload).to.equal('using alt processor name unix payload')
|
||||
expect(packet.payload).to.equal('unix payload:local:alt')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
|
@ -65,21 +66,20 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
|
|||
}) // end unix socket test
|
||||
|
||||
|
||||
it('unix socket with two consumers alternating packets, 10 packets each', async function () {
|
||||
it(`unix socket with two consumers alternating packets, ${COUNT} packets each using external context at socket`, async function () {
|
||||
|
||||
consumer.times = 0
|
||||
consumer.packet.times = 0
|
||||
|
||||
return new Promise(async function (resolve, reject) {
|
||||
|
||||
setTimeout(() =>{ reject('10 packets not received in time')},1900)
|
||||
|
||||
consumer.processPacket = function (packet) {
|
||||
consumer.packet._process = function (packet) {
|
||||
this.times++
|
||||
// console.log(this.times,packet.payload)
|
||||
if (this.times!==11) return
|
||||
|
||||
if (this.times!==COUNT) return
|
||||
try {
|
||||
expect(packet.payload).to.equal('using alt processor name consumer 1 unix payload')
|
||||
expect(packet.payload).to.equal('consumer 1 unix payload:local:alt')
|
||||
resolve()
|
||||
}
|
||||
catch(error) {
|
||||
|
@ -87,15 +87,11 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
|
|||
}
|
||||
}
|
||||
|
||||
consumer2.processPacket = function (packet) {
|
||||
return packet
|
||||
}
|
||||
|
||||
let [err] = await btc(consumer2.connect)()
|
||||
if (err) reject(err)
|
||||
let packet1 = {payload:'consumer 1 unix payload'}
|
||||
let packet2 = {payload:'consumer2 unix payload'}
|
||||
for (var i = 0; i < 11; i++) {
|
||||
for (var i = 0; i < COUNT; i++) {
|
||||
consumer.send(packet1)
|
||||
consumer2.send(packet2)
|
||||
}
|
Loading…
Reference in a new issue