diff --git a/src/consumer.mjs b/src/consumer.mjs index 248f84a..12ba4e5 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -13,12 +13,22 @@ export default class Consumer extends Socket { this.host = opts.host || '127.0.0.1' this.port = opts.port || 8080 } else this.path = path - this.pp = opts.packetProcessor || 'processPacket' + this.packet = { + _process: async (packet) => { + console.log('default processor -- packet from socket') + console.dir(packet) + return packet } + } this.keepAlive = opts.keepAlive ? opts.keepAlive : true this._ready = false this.timeout = opts.timeout || 500 this.wait = opts.wait || 5 this.stream = new JsonStream() + this.packet = { + _process: (packet) => { + packet.res='echoed' + return packet } + } // logging this.log_file=opts.log_file || './socket.log' this.log_opts = {streams:[]} @@ -33,23 +43,14 @@ export default class Consumer extends Socket { ready() {return this._ready} - async connect (app={}) { + async connect (context) { - if (Object.keys(app).length === 0) app = this - else app.pp = app.pp || this.pp - // set a default processor if none provided - if (!app[app.pp]) { - app.pp = 'processPacket' // reset in case app[app.pp] is missing - app.processPacket = async (packet) => { - console.log('default processor -- packet from socket') - console.dir(packet) - return packet } - } + if (context) this.packet.context = context // console.log('consumer processing=>',app.pp,'=>', app[app.pp]) // console.log('========================') - this.listen(app) + this.listen() this.log.info('listening') return new Promise( (resolve,reject) => { @@ -76,10 +77,10 @@ export default class Consumer extends Socket { async send(packet) { await this.write(this.stream.serialize(packet)) - // throw new Error('Cannot send connection not ready') + // handle error here? } - async listen (app) { + async listen () { this.on('data', this.stream.onData) @@ -90,11 +91,19 @@ export default class Consumer extends Socket { if (packet.ready) { this._ready = true return } - // console.log('consumer processor',app[app.pp]) - await app[app.pp].bind(app)(packet) + // console.log('consumer processor',this.packet._process) + await this.packet._process(packet) } } + registerPacketContext(obj) { + this.packet.context = obj + } + + registerPacketProcessor (func) { + this.packet._process = func + } + } // end class // wait for handshake packet from socket diff --git a/src/json.mjs b/src/json.mjs index 46516c9..152fa97 100644 --- a/src/json.mjs +++ b/src/json.mjs @@ -1,3 +1,5 @@ +// adpated from https://github.com/sebastianseilund/node-json-socket + import {StringDecoder} from 'string_decoder' import EventEmitter from 'events' diff --git a/src/socket.mjs b/src/socket.mjs index fc9f158..62b45fe 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -14,7 +14,12 @@ export default class Socket extends Server { opts = path this.listen_opts = { host: opts.host || '127.0.0.1', port: opts.port || 8080} } else this.listen_opts = { path: path } - this.pp = opts.packetProcessor || 'processPacket' + // default packet processing + this.packet = { + _process: (packet) => { + packet.res='echoed' + return packet } + } // logging this.log_file=opts.log_file || './socket.log' this.log_opts = {streams:[]} @@ -23,22 +28,14 @@ export default class Socket extends Server { if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout}) this.log = bunyan.createLogger(this.log_opts) //binding + this.listen = this.listen.bind(this) + this.create = this.create.bind(this) } // end constructor - async create (app={}) { + async create (context) { - if (Object.keys(app).length === 0) app = this - else app.pp = app.pp || this.pp - if (!app[app.pp]) { - this.pp = 'processPacket' // reset in case app.pp was set but app.pp function is missing - app.processPacket = async (packet) => { - packet.res='echoed' - return packet } - } - - // console.log('socket processing3=>',app.pp,'=>', app[app.pp]) - // console.log('========================') + if (context) this.packet.context = context return new Promise( async (resolve,reject) => { @@ -60,7 +57,7 @@ export default class Socket extends Server { if (path) { // if TCP socket should already be dead this.log.info({socket: path}, 'already exists...deleting') await fileDelete(path) - return await this.listen.bind(this)(this.listen_opts,app) + return await this.listen(this.listen_opts) } } // otherwise fatally exit @@ -68,14 +65,14 @@ export default class Socket extends Server { reject(err) }) - let [err, res] = await btc(this.listen).bind(this)(this.listen_opts,app) + let [err, res] = await btc(this.listen)(this.listen_opts) if (err) reject(err) resolve(res) }) // end promise } // end create - async listen (opts,app) { + async listen (opts) { super.listen(opts, async (err, res) => { @@ -91,15 +88,11 @@ export default class Socket extends Server { socket.on('data', stream.onData) - stream.on('message', async function (packet) { - socket.write(stream.serialize(await app[app.pp].bind(app)(packet))) - }) - - // stream.on('message', messageProcess.bind(this)) - // async function messageProcess (packet) { - // console.log('pp', app.processPacket) - // socket.write(stream.serialize(await app[app.pp].bind(app)(packet))) - // } + stream.on('message', messageProcess.bind(this)) + async function messageProcess (packet) { + // console.log(this.packet._process.toString()) + socket.write(stream.serialize(await this.packet._process(packet))) + } }) // end connected consumer @@ -115,8 +108,14 @@ export default class Socket extends Server { await this.close() this.log.info('all connections closed....exiting') process.exit() + } - } // end destroy + registerPacketContext(obj) { + this.packet.context = obj + } + registerPacketProcessor (func) { + this.packet._process = func + } } // end class diff --git a/test/off/tcp.test.mjs b/test/off/tcp.test.mjs deleted file mode 100644 index 6ef8762..0000000 --- a/test/off/tcp.test.mjs +++ /dev/null @@ -1,86 +0,0 @@ -import { spawn } from 'child_process' -import chai from 'chai' -import chaiAsPromised from 'chai-as-promised' -import btc from 'better-try-catch' -chai.use(chaiAsPromised) -const expect = chai.expect - -import { Consumer } from '../src' - -const delay = time => new Promise(res=>setTimeout(()=>res(),time)) - -let tcpsocket_default = {} -let tcpsocket_9080 = {} - -describe('Connects and Processes a payload in a JSON packet via TCP Socket', function(){ - - before(async function(){ - tcpsocket_default = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-default']) - tcpsocket_default.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - - tcpsocket_9080 = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-9080']) - tcpsocket_9080.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - - await delay(500) // wait for sockets to get going - }) - - after(async function(){ - tcpsocket_default.kill() - tcpsocket_9080.kill() - }) - - it('with default host and port', async function () { - let tcpconsumer_default = new Consumer({name:'tcpconsumer'}) - - return new Promise(async function (resolve, reject) { - - tcpconsumer_default.processPacket = function (packet) { - try { - expect(packet.payload).to.equal('8080:tcp payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let [err] = await btc(tcpconsumer_default.connect)() - if (err) reject(err) - let packet = {payload:'tcp payload'} - tcpconsumer_default.send(packet) - - }) //end promise - - }) // end tcp socket test - - it('with alternate port, and passed consumer processor', async function () { - - let tcpconsumer_9080 = new Consumer({packetProcessor: 'test', port:9080, name:'tcp-consumer-9080'}) - - return new Promise(async function (resolve, reject) { - - tcpconsumer_9080.test = function (packet) { - try { - expect(packet.payload).to.equal('9080:tcp payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let [err] = await btc(tcpconsumer_9080.connect)() - if (err) reject(err) - let packet = {payload:'tcp payload'} - tcpconsumer_9080.send(packet) - - }) //end promise - - }) // end tcp socket 2 test - - -}) // end describe diff --git a/test/sockets/tcp-app.mjs b/test/sockets/tcp-app.mjs deleted file mode 100644 index bc008e4..0000000 --- a/test/sockets/tcp-app.mjs +++ /dev/null @@ -1,6 +0,0 @@ -export default { - processPacket: async function (packet) { - packet.payload = this.port +':'+packet.payload - return packet - } -} diff --git a/test/sockets/tcp-process.mjs b/test/sockets/tcp-process.mjs new file mode 100644 index 0000000..8aef323 --- /dev/null +++ b/test/sockets/tcp-process.mjs @@ -0,0 +1,5 @@ +export default async function (packet) { + // console.log(this.context) + packet.payload = this.context.port +':'+packet.payload + return packet +} diff --git a/test/sockets/tcpsocket-9080.mjs b/test/sockets/tcpsocket-9080.mjs index abf74f4..86d41e3 100644 --- a/test/sockets/tcpsocket-9080.mjs +++ b/test/sockets/tcpsocket-9080.mjs @@ -1,10 +1,10 @@ import { Socket } from '../../src' -import app from './tcp-app' +import process from './tcp-process' let socket = new Socket({port:9080, name:'tcp socket 9080'}) -app.port = socket.listen_opts.port - +let app = {port: socket.listen_opts.port} +socket.registerPacketProcessor(process) ; (async () => { diff --git a/test/sockets/tcpsocket-default.mjs b/test/sockets/tcpsocket-default.mjs index 9ad7e9d..6503b01 100644 --- a/test/sockets/tcpsocket-default.mjs +++ b/test/sockets/tcpsocket-default.mjs @@ -1,14 +1,15 @@ import { Socket } from '../../src' -import app from './tcp-app' +import process from './tcp-process' let socket = new Socket({name:'tcp socket'}) -app.port = socket.listen_opts.port -Object.assign(socket,app) +let app = {port: socket.listen_opts.port} +socket.registerPacketProcessor(process) + ; (async () => { - await socket.create() + await socket.create(app) })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/test/sockets/usocket-alt-name.mjs b/test/sockets/usocket-alt-name.mjs deleted file mode 100644 index fd612e5..0000000 --- a/test/sockets/usocket-alt-name.mjs +++ /dev/null @@ -1,19 +0,0 @@ -import { Socket } from '../../src' - -const USOCKET = __dirname + '/test.sock' - -let socket = new Socket(USOCKET,{packetProcessor:'sprocessPacket',name:'unix socket'}) - -socket.sprocessPacket = async function (packet) { - packet.payload = 'using alt processor name '+packet.payload - return packet -} - -; -(async () => { - - await socket.create() - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/test/sockets/usocket-app-alt-name.mjs b/test/sockets/usocket-app-alt-name.mjs deleted file mode 100644 index ebee0f7..0000000 --- a/test/sockets/usocket-app-alt-name.mjs +++ /dev/null @@ -1,21 +0,0 @@ -import { Socket } from '../../src' - -const USOCKET = __dirname + '/test.sock' - -let socket = new Socket(USOCKET,{name:'unix socket'}) - -let app ={} -app.pp = 'test' -app.test = async function (packet) { - packet.payload = 'using passed processor with alt name '+packet.payload - return packet -} - -; -(async () => { - - await socket.create(app) - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/test/sockets/usocket-app-default-name.mjs b/test/sockets/usocket-app-default-name.mjs deleted file mode 100644 index 42c0684..0000000 --- a/test/sockets/usocket-app-default-name.mjs +++ /dev/null @@ -1,20 +0,0 @@ -import { Socket } from '../../src' - -const USOCKET = __dirname + '/test.sock' - -let socket = new Socket(USOCKET,{name:'unix socket'}) - -let app ={} -app.processPacket = async function (packet) { - packet.payload = 'using passed processor with default name '+packet.payload - return packet -} - -; -(async () => { - - await socket.create(app) - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/test/sockets/usocket-default-name.mjs b/test/sockets/usocket-default-overwrite.mjs similarity index 56% rename from test/sockets/usocket-default-name.mjs rename to test/sockets/usocket-default-overwrite.mjs index 97d8be6..edb0da4 100644 --- a/test/sockets/usocket-default-name.mjs +++ b/test/sockets/usocket-default-overwrite.mjs @@ -4,10 +4,14 @@ const USOCKET = __dirname + '/test.sock' let socket = new Socket(USOCKET,{name:'unix socket'}) -socket.processPacket = async function (packet) { - packet.payload = 'using alt processor with default name '+packet.payload +socket.registerPacketContext({test:'alt'}) + +socket.packet.test = 'local' + +socket.registerPacketProcessor(async function (packet) { + packet.payload = 'overwrite default processor from instance '+packet.payload return packet -} +}) ; (async () => { diff --git a/test/sockets/usocket-packet-alt-context.mjs b/test/sockets/usocket-packet-alt-context.mjs new file mode 100644 index 0000000..b34e84d --- /dev/null +++ b/test/sockets/usocket-packet-alt-context.mjs @@ -0,0 +1,25 @@ +import { Socket } from '../../src' + +const USOCKET = __dirname + '/test.sock' + +let socket = new Socket(USOCKET,{name:'unix socket'}) + +// over writes default packet processing + +// socket.registerPacketContext({test:'alt'}) + +socket.packet.test = 'local' + +socket.registerPacketProcessor(async function (packet) { + packet.payload = packet.payload +':'+this.test+':'+this.context.test + return packet +}) + +; +(async () => { + + await socket.create({test:'alt'}) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/tcp.test.mjs b/test/tcp.test.mjs index 6ef8762..b65bd0d 100644 --- a/test/tcp.test.mjs +++ b/test/tcp.test.mjs @@ -38,7 +38,7 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun return new Promise(async function (resolve, reject) { - tcpconsumer_default.processPacket = function (packet) { + tcpconsumer_default.packet._process = function (packet) { try { expect(packet.payload).to.equal('8080:tcp payload') resolve() @@ -57,13 +57,13 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun }) // end tcp socket test - it('with alternate port, and passed consumer processor', async function () { + it('with alternate port, and registered consumer processor', async function () { - let tcpconsumer_9080 = new Consumer({packetProcessor: 'test', port:9080, name:'tcp-consumer-9080'}) + let tcpconsumer_9080 = new Consumer({port:9080, name:'tcp-consumer-9080'}) return new Promise(async function (resolve, reject) { - tcpconsumer_9080.test = function (packet) { + tcpconsumer_9080.registerPacketProcessor(function (packet) { try { expect(packet.payload).to.equal('9080:tcp payload') resolve() @@ -71,7 +71,7 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun catch(error) { reject(error) } - } + }) let [err] = await btc(tcpconsumer_9080.connect)() if (err) reject(err) diff --git a/test/usocket-app-alt-name.test.mjs b/test/usocket-app-alt-name.test.mjs deleted file mode 100644 index 6ab8257..0000000 --- a/test/usocket-app-alt-name.test.mjs +++ /dev/null @@ -1,117 +0,0 @@ -import { spawn } from 'child_process' -import chai from 'chai' -import chaiAsPromised from 'chai-as-promised' -import btc from 'better-try-catch' -chai.use(chaiAsPromised) -const expect = chai.expect - -import { Consumer } from '../src' - -const USOCKET = __dirname + '/sockets/test.sock' -const SOCKET_FILE = 'usocket-app-alt-name' - -const delay = time => new Promise(res=>setTimeout(()=>res(),time)) - -let socket = {} - -describe('Connects and Processes a payload via Unix Socket using JSON packet with passed processor using default name', function(){ - - before(async function(){ - socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) - socket.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - - await delay(500) // wait for sockets to get going - }) - - // beforeEach(async function(){ - // let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) - // }) - - after(async function(){ - socket.kill() - }) - - it('Tests JSON packet procssing, 10 packets', async function () { - - let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) - - return new Promise(async function (resolve, reject) { - - setTimeout(() =>{ reject('10 packets not received in time')},1900) - let app = {} - app.times = 0 - app.pp = 'xprocessPacket' - app.xprocessPacket = function (packet) { - this.times++ - // console.log('times=',this.times,'\n',packet.payload) - if (this.times!==11) return - consumer.end() - consumer.destroy() - - try { - expect(packet.payload).to.equal('using passed processor with alt name unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let [err] = await btc(consumer.connect)(app) - if (err) reject(err) - let packet = {payload:'unix payload'} - for (var i = 0; i < 11; i++) { - consumer.send(packet) - } - - }) //end promise - - }) // end unix socket test - - - it('unix socket with two consumers alternating packets, 10 packets each, alt consumer processor name', async function () { - - let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) - let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) - - return new Promise(async function (resolve, reject) { - - setTimeout(() =>{ reject('10 packets not received in time')},1900) - - let app = {} - app.pp = 'yprocessPacket' - app.times = 0 - app.yprocessPacket = function (packet) { - this.times++ - if (this.times!==11) return - try { - expect(packet.payload).to.equal('using passed processor with alt name consumer 1 unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - consumer2.processPacket = function (packet) { - return packet - } - - let [err] = await btc(consumer.connect)(app) - if (err) reject(err) - let [err2] = await btc(consumer2.connect)() - if (err2) reject(err2) - let packet1 = {payload:'consumer 1 unix payload'} - let packet2 = {payload:'consumer2 unix payload'} - for (var i = 0; i < 11; i++) { - consumer.send(packet1) - consumer2.send(packet2) - } - - }) //end promise - - }) // end unix socket test - -}) // end describe diff --git a/test/usocket-app-default-name.test.mjs b/test/usocket-app-default-name.test.mjs deleted file mode 100644 index 106c351..0000000 --- a/test/usocket-app-default-name.test.mjs +++ /dev/null @@ -1,115 +0,0 @@ -import { spawn } from 'child_process' -import chai from 'chai' -import chaiAsPromised from 'chai-as-promised' -import btc from 'better-try-catch' -chai.use(chaiAsPromised) -const expect = chai.expect - -import { Consumer } from '../src' - -const USOCKET = __dirname + '/sockets/test.sock' -const SOCKET_FILE = 'usocket-app-default-name' - -const delay = time => new Promise(res=>setTimeout(()=>res(),time)) - -let socket = {} - -describe('Connects and Processes a payload via Unix Socket using JSON packet with passed processor using default name', function(){ - - before(async function(){ - socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) - socket.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - - await delay(500) // wait for sockets to get going - }) - - // beforeEach(async function(){ - // let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) - // }) - - after(async function(){ - socket.kill() - }) - - it('Tests JSON packet procssing, 10 packets', async function () { - - let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) - - return new Promise(async function (resolve, reject) { - - setTimeout(() =>{ reject('10 packets not received in time')},1900) - let app = {} - app.times = 0 - app.processPacket = function (packet) { - this.times++ - // console.log('times=',this.times,'\n',packet.payload) - if (this.times!==11) return - consumer.end() - consumer.destroy() - - try { - expect(packet.payload).to.equal('using passed processor with default name unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let [err] = await btc(consumer.connect)(app) - if (err) reject(err) - let packet = {payload:'unix payload'} - for (var i = 0; i < 11; i++) { - consumer.send(packet) - } - - }) //end promise - - }) // end unix socket test - - - it('unix socket with two consumers alternating packets, 10 packets each, alt consumer processor name', async function () { - - let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) - let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) - - return new Promise(async function (resolve, reject) { - - setTimeout(() =>{ reject('10 packets not received in time')},1900) - - let app = {} - app.times = 0 - app.processPacket = function (packet) { - this.times++ - if (this.times!==11) return - try { - expect(packet.payload).to.equal('using passed processor with default name consumer 1 unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - consumer2.processPacket = function (packet) { - return packet - } - - let [err] = await btc(consumer.connect)(app) - if (err) reject(err) - let [err2] = await btc(consumer2.connect)() - if (err2) reject(err2) - let packet1 = {payload:'consumer 1 unix payload'} - let packet2 = {payload:'consumer2 unix payload'} - for (var i = 0; i < 11; i++) { - consumer.send(packet1) - consumer2.send(packet2) - } - - }) //end promise - - }) // end unix socket test - -}) // end describe diff --git a/test/usocket-default-name.test.mjs b/test/usocket-default-overwrite.test.mjs similarity index 85% rename from test/usocket-default-name.test.mjs rename to test/usocket-default-overwrite.test.mjs index 3cdf939..654a7d2 100644 --- a/test/usocket-default-name.test.mjs +++ b/test/usocket-default-overwrite.test.mjs @@ -8,7 +8,7 @@ const expect = chai.expect import { Consumer } from '../src' const USOCKET = __dirname + '/sockets/test.sock' -const SOCKET_FILE = 'usocket-default-name' +const SOCKET_FILE = 'usocket-default-overwrite' let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) @@ -34,18 +34,18 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit it('Tests JSON packet procssing, 10 packets', async function () { - consumer.times = 0 + consumer.packet.times = 0 return new Promise(async function (resolve, reject) { setTimeout(() =>{ reject('10 packets not received in time')},1900) - consumer.processPacket = function (packet) { + consumer.packet._process = function (packet) { this.times++ if (this.times!==11) return try { - expect(packet.payload).to.equal('using alt processor with default name unix payload') + expect(packet.payload).to.equal('overwrite default processor from instance unix payload') resolve() } catch(error) { @@ -67,19 +67,19 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit it('unix socket with two consumers alternating packets, 10 packets each', async function () { - consumer.times = 0 + consumer.packet.times = 0 return new Promise(async function (resolve, reject) { setTimeout(() =>{ reject('10 packets not received in time')},1900) - consumer.processPacket = function (packet) { + consumer.packet._process = function (packet) { this.times++ // console.log(this.times,packet.payload) if (this.times!==11) return try { - expect(packet.payload).to.equal('using alt processor with default name consumer 1 unix payload') + expect(packet.payload).to.equal('overwrite default processor from instance consumer 1 unix payload') resolve() } catch(error) { diff --git a/test/usocket-default.test.mjs b/test/usocket-default.test.mjs index 9fb523e..cca37d5 100644 --- a/test/usocket-default.test.mjs +++ b/test/usocket-default.test.mjs @@ -32,20 +32,22 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit socket.kill() }) - it('Tests unix socket with default echo JSON packet procssing, 10 packets', async function () { + it('Tests unix socket with default echo JSON packet procssing, 10 packets with conect via connect', async function () { - consumer.times = 0 + consumer.packet.times = 0 return new Promise(async function (resolve, reject) { setTimeout(() =>{ reject('10 packets not received in time')},1900) - consumer.processPacket = function (packet) { + consumer.packet._process = function (packet) { this.times++ if (this.times!==11) return + packet.payload = packet.payload + this.context.test + try { - expect(packet.payload).to.equal('unix payload') + expect(packet.payload).to.equal('unix payload plus context at connect') resolve() } catch(error) { @@ -53,7 +55,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit } } - let [err] = await btc(consumer.connect)() + let [err] = await btc(consumer.connect)({test:' plus context at connect'}) if (err) reject(err) let packet = {payload:'unix payload'} for (var i = 0; i < 11; i++) { @@ -65,29 +67,33 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit }) // end unix socket test - it('unix socket with two consumers alternating packets, 10 packets each', async function () { + it('unix socket with two consumers alternating packets, 10 packets each with local and added context', async function () { - consumer.times = 0 + consumer.packet.times = 0 + consumer.packet.test = ':local' + consumer.registerPacketContext({test:':added'}) return new Promise(async function (resolve, reject) { setTimeout(() =>{ reject('10 packets not received in time')},1900) - consumer.processPacket = function (packet) { + consumer.registerPacketProcessor(function (packet) { this.times++ // console.log(this.times,packet.payload) if (this.times!==11) return + packet.payload = packet.payload + this.context.test + this.test + try { - expect(packet.payload).to.equal('consumer 1 unix payload') + expect(packet.payload).to.equal('consumer 1 unix payload:added:local') resolve() } catch(error) { reject(error) } - } + }) - consumer2.processPacket = function (packet) { + consumer2.packet._process = function (packet) { return packet } diff --git a/test/usocket-alt-name.test.mjs b/test/usocket-packet-alt-context.test.mjs similarity index 73% rename from test/usocket-alt-name.test.mjs rename to test/usocket-packet-alt-context.test.mjs index c277881..659f5f3 100644 --- a/test/usocket-alt-name.test.mjs +++ b/test/usocket-packet-alt-context.test.mjs @@ -8,7 +8,8 @@ const expect = chai.expect import { Consumer } from '../src' const USOCKET = __dirname + '/sockets/test.sock' -const SOCKET_FILE = 'usocket-alt-name' +const SOCKET_FILE = 'usocket-packet-alt-context' +const COUNT = 10000 let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) @@ -17,7 +18,7 @@ const delay = time => new Promise(res=>setTimeout(()=>res(),time)) let socket = {} -describe('Connects and Processes a payload via Unix Socket using JSON packet with alt named processor', function(){ +describe('Connects and Processes a payload via Unix Socket using JSON packet with local and alt context', function(){ before(async function(){ socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) @@ -32,20 +33,20 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit socket.kill() }) - it('Tests JSON packet procssing, 10 packets', async function () { + it('Tests JSON packet procssing, 10 packets using external context at socket', async function () { - consumer.times = 0 + consumer.packet.times = 0 return new Promise(async function (resolve, reject) { setTimeout(() =>{ reject('10 packets not received in time')},1900) - consumer.processPacket = function (packet) { + consumer.packet._process = function (packet) { this.times++ if (this.times!==11) return try { - expect(packet.payload).to.equal('using alt processor name unix payload') + expect(packet.payload).to.equal('unix payload:local:alt') resolve() } catch(error) { @@ -65,21 +66,20 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit }) // end unix socket test - it('unix socket with two consumers alternating packets, 10 packets each', async function () { + it(`unix socket with two consumers alternating packets, ${COUNT} packets each using external context at socket`, async function () { - consumer.times = 0 + consumer.packet.times = 0 return new Promise(async function (resolve, reject) { setTimeout(() =>{ reject('10 packets not received in time')},1900) - consumer.processPacket = function (packet) { + consumer.packet._process = function (packet) { this.times++ // console.log(this.times,packet.payload) - if (this.times!==11) return - + if (this.times!==COUNT) return try { - expect(packet.payload).to.equal('using alt processor name consumer 1 unix payload') + expect(packet.payload).to.equal('consumer 1 unix payload:local:alt') resolve() } catch(error) { @@ -87,15 +87,11 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit } } - consumer2.processPacket = function (packet) { - return packet - } - let [err] = await btc(consumer2.connect)() if (err) reject(err) let packet1 = {payload:'consumer 1 unix payload'} let packet2 = {payload:'consumer2 unix payload'} - for (var i = 0; i < 11; i++) { + for (var i = 0; i < COUNT; i++) { consumer.send(packet1) consumer2.send(packet2) }