diff --git a/src/consumer.mjs b/src/consumer.mjs index cdb4064..248f84a 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -13,7 +13,7 @@ export default class Consumer extends Socket { this.host = opts.host || '127.0.0.1' this.port = opts.port || 8080 } else this.path = path - this._pp = opts.packetProcessor || 'processPacket' + this.pp = opts.packetProcessor || 'processPacket' this.keepAlive = opts.keepAlive ? opts.keepAlive : true this._ready = false this.timeout = opts.timeout || 500 @@ -35,19 +35,20 @@ export default class Consumer extends Socket { async connect (app={}) { - // first set the packet process - this.pp = this.pp || this._pp if (Object.keys(app).length === 0) app = this - else app.pp = app.pp || this._pp + else app.pp = app.pp || this.pp // set a default processor if none provided - if (!app[this.pp]) { - this.pp = 'processPacket' // reset in case alt function is missing + if (!app[app.pp]) { + app.pp = 'processPacket' // reset in case app[app.pp] is missing app.processPacket = async (packet) => { - console.log('packet from socket') + console.log('default processor -- packet from socket') console.dir(packet) return packet } } + // console.log('consumer processing=>',app.pp,'=>', app[app.pp]) + // console.log('========================') + this.listen(app) this.log.info('listening') diff --git a/src/socket.mjs b/src/socket.mjs index ba9027b..fc9f158 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -14,7 +14,7 @@ export default class Socket extends Server { opts = path this.listen_opts = { host: opts.host || '127.0.0.1', port: opts.port || 8080} } else this.listen_opts = { path: path } - this._pp = opts.packetProcessor || 'processPacket' + this.pp = opts.packetProcessor || 'processPacket' // logging this.log_file=opts.log_file || './socket.log' this.log_opts = {streams:[]} @@ -28,19 +28,17 @@ export default class Socket extends Server { async create (app={}) { -// first set the packet process - this.pp = this.pp || this._pp if (Object.keys(app).length === 0) app = this - else app.pp = app.pp || this._pp -// set a default processor if none provided + else app.pp = app.pp || this.pp if (!app[app.pp]) { - app.pp = 'processPacket' // reset in case alt function is missing + this.pp = 'processPacket' // reset in case app.pp was set but app.pp function is missing app.processPacket = async (packet) => { packet.res='echoed' return packet } } - // console.log('socket processing app ',app[app.pp]) + // console.log('socket processing3=>',app.pp,'=>', app[app.pp]) + // console.log('========================') return new Promise( async (resolve,reject) => { @@ -94,11 +92,15 @@ export default class Socket extends Server { socket.on('data', stream.onData) stream.on('message', async function (packet) { - // console.log('incoming packet from consumer',packet) - // console.log('socket processing app ',app[app.pp]) socket.write(stream.serialize(await app[app.pp].bind(app)(packet))) }) + // stream.on('message', messageProcess.bind(this)) + // async function messageProcess (packet) { + // console.log('pp', app.processPacket) + // socket.write(stream.serialize(await app[app.pp].bind(app)(packet))) + // } + }) // end connected consumer this.log.info({socket: this.listen_opts},'socket created') @@ -107,7 +109,6 @@ export default class Socket extends Server { } - async destroy () { this.log.info('closing down socket') diff --git a/test/off/tcp.test.mjs b/test/off/tcp.test.mjs new file mode 100644 index 0000000..6ef8762 --- /dev/null +++ b/test/off/tcp.test.mjs @@ -0,0 +1,86 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let tcpsocket_default = {} +let tcpsocket_9080 = {} + +describe('Connects and Processes a payload in a JSON packet via TCP Socket', function(){ + + before(async function(){ + tcpsocket_default = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-default']) + tcpsocket_default.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + tcpsocket_9080 = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-9080']) + tcpsocket_9080.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + await delay(500) // wait for sockets to get going + }) + + after(async function(){ + tcpsocket_default.kill() + tcpsocket_9080.kill() + }) + + it('with default host and port', async function () { + let tcpconsumer_default = new Consumer({name:'tcpconsumer'}) + + return new Promise(async function (resolve, reject) { + + tcpconsumer_default.processPacket = function (packet) { + try { + expect(packet.payload).to.equal('8080:tcp payload') + resolve() + } + catch(error) { + reject(error) + } + } + + let [err] = await btc(tcpconsumer_default.connect)() + if (err) reject(err) + let packet = {payload:'tcp payload'} + tcpconsumer_default.send(packet) + + }) //end promise + + }) // end tcp socket test + + it('with alternate port, and passed consumer processor', async function () { + + let tcpconsumer_9080 = new Consumer({packetProcessor: 'test', port:9080, name:'tcp-consumer-9080'}) + + return new Promise(async function (resolve, reject) { + + tcpconsumer_9080.test = function (packet) { + try { + expect(packet.payload).to.equal('9080:tcp payload') + resolve() + } + catch(error) { + reject(error) + } + } + + let [err] = await btc(tcpconsumer_9080.connect)() + if (err) reject(err) + let packet = {payload:'tcp payload'} + tcpconsumer_9080.send(packet) + + }) //end promise + + }) // end tcp socket 2 test + + +}) // end describe diff --git a/test/socket.test.mjs b/test/socket.test.mjs deleted file mode 100644 index 93550a8..0000000 --- a/test/socket.test.mjs +++ /dev/null @@ -1,172 +0,0 @@ -import { spawn } from 'child_process' -import chai from 'chai' -import chaiAsPromised from 'chai-as-promised' -import btc from 'better-try-catch' -chai.use(chaiAsPromised) -const expect = chai.expect - -import { Consumer } from '../src' - -const USOCKET = __dirname + '/sample.sock' - -let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'}) -let uconsumer2 = new Consumer(USOCKET, {name:'test-uconsumer-2'}) -let tcpconsumer = new Consumer({name:'test-tcpconsumer'}) -let tcpconsumer2 = new Consumer({port:9080, packetProcessor:'test', name:'test-tcpconsumer-2'}) - - -const delay = time => new Promise(res=>setTimeout(()=>res(),time)) - -let usocket = {} -let tcpsocket = {} -let tcpsocket2 = {} - -describe('Connects and Processes a payload in a JSON packet', function(){ - - before(async function(){ - usocket = spawn('node',['-r', '@std/esm', './test/usocket']) - usocket.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - - tcpsocket = spawn('node',['-r', '@std/esm', './test/tcpsocket']) - tcpsocket.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - - tcpsocket2 = spawn('node',['-r', '@std/esm', './test/tcpsocket2']) - tcpsocket2.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - - await delay(500) // wait for sockets to get going - }) - - after(async function(){ - usocket.kill() - tcpsocket.kill() - tcpsocket2.kill() - }) - - it('via unix socket with defaults testing stream JSON packet parser, 10 packets', async function () { - - uconsumer.times = 0 - - return new Promise(async function (resolve, reject) { - - setTimeout(() =>{ reject('10 packets not received in time')},1900) - - uconsumer.processPacket = function (packet) { - this.times++ - if (this.times<10) return - - try { - expect(packet.payload).to.equal('unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let [err] = await btc(uconsumer.connect)() - if (err) reject(err) - let packet = {payload:'unix payload'} - for (var i = 0; i < 11; i++) { - uconsumer.send(packet) - } - - }) //end promise - - }) // end unix socket test - - - it('unix socket with two consumers alternating packets, 10 packets each', async function () { - - uconsumer.times = 0 - - return new Promise(async function (resolve, reject) { - - setTimeout(() =>{ reject('10 packets not received in time')},1900) - - uconsumer.processPacket = function (packet) { - this.times++ - // console.log(this.times,packet.payload) - if (this.times<10) return - - try { - expect(packet.payload).to.equal('consumer 1 unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - uconsumer2.processPacket = function (packet) { - // console.log('processor2', packet.payload) - return packet - } - - let [err] = await btc(uconsumer2.connect)() - if (err) reject(err) - let packet1 = {payload:'consumer 1 unix payload'} - let packet2 = {payload:'consumer2 unix payload'} - for (var i = 0; i < 11; i++) { - uconsumer.send(packet1) - uconsumer2.send(packet2) - } - - }) //end promise - - }) // end unix socket test - - - it('via tcp socket with defaults', async function () { - - return new Promise(async function (resolve, reject) { - - tcpconsumer.processPacket = function (packet) { - try { - expect(packet.payload).to.equal('tcp processed tcp payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let [err] = await btc(tcpconsumer.connect)() - if (err) reject(err) - let packet = {payload:'tcp payload'} - tcpconsumer.send(packet) - - }) //end promise - - }) // end tcp socket test - - it('via alternate port tcp socket with attached custom socket processing and alt named socket processing', async function () { - - return new Promise(async function (resolve, reject) { - - tcpconsumer2.test = function (packet) { - try { - expect(packet.payload).to.equal('alt tcp processed tcp payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let [err] = await btc(tcpconsumer2.connect)() - if (err) reject(err) - let packet = {payload:'tcp payload'} - tcpconsumer2.send(packet) - - }) //end promise - - }) // end tcp socket 2 test - - -}) // end describe diff --git a/test/sockets/.gitignore b/test/sockets/.gitignore new file mode 100644 index 0000000..2ccbe46 --- /dev/null +++ b/test/sockets/.gitignore @@ -0,0 +1 @@ +/node_modules/ diff --git a/test/sockets/tcp-app.mjs b/test/sockets/tcp-app.mjs new file mode 100644 index 0000000..bc008e4 --- /dev/null +++ b/test/sockets/tcp-app.mjs @@ -0,0 +1,6 @@ +export default { + processPacket: async function (packet) { + packet.payload = this.port +':'+packet.payload + return packet + } +} diff --git a/test/sockets/tcpsocket-9080.mjs b/test/sockets/tcpsocket-9080.mjs new file mode 100644 index 0000000..abf74f4 --- /dev/null +++ b/test/sockets/tcpsocket-9080.mjs @@ -0,0 +1,15 @@ +import { Socket } from '../../src' +import app from './tcp-app' + +let socket = new Socket({port:9080, name:'tcp socket 9080'}) + +app.port = socket.listen_opts.port + +; +(async () => { + + await socket.create(app) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/tcpsocket.mjs b/test/sockets/tcpsocket-default.mjs similarity index 52% rename from test/tcpsocket.mjs rename to test/sockets/tcpsocket-default.mjs index af19a26..9ad7e9d 100644 --- a/test/tcpsocket.mjs +++ b/test/sockets/tcpsocket-default.mjs @@ -1,12 +1,10 @@ -import { Socket } from '../src' +import { Socket } from '../../src' +import app from './tcp-app' let socket = new Socket({name:'tcp socket'}) -socket.processPacket = async function (packet) { - packet.payload = 'tcp processed '+packet.payload - return packet -} - +app.port = socket.listen_opts.port +Object.assign(socket,app) ; (async () => { diff --git a/test/sockets/usocket-alt-name.mjs b/test/sockets/usocket-alt-name.mjs new file mode 100644 index 0000000..fd612e5 --- /dev/null +++ b/test/sockets/usocket-alt-name.mjs @@ -0,0 +1,19 @@ +import { Socket } from '../../src' + +const USOCKET = __dirname + '/test.sock' + +let socket = new Socket(USOCKET,{packetProcessor:'sprocessPacket',name:'unix socket'}) + +socket.sprocessPacket = async function (packet) { + packet.payload = 'using alt processor name '+packet.payload + return packet +} + +; +(async () => { + + await socket.create() + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/sockets/usocket-app-alt-name.mjs b/test/sockets/usocket-app-alt-name.mjs new file mode 100644 index 0000000..ebee0f7 --- /dev/null +++ b/test/sockets/usocket-app-alt-name.mjs @@ -0,0 +1,21 @@ +import { Socket } from '../../src' + +const USOCKET = __dirname + '/test.sock' + +let socket = new Socket(USOCKET,{name:'unix socket'}) + +let app ={} +app.pp = 'test' +app.test = async function (packet) { + packet.payload = 'using passed processor with alt name '+packet.payload + return packet +} + +; +(async () => { + + await socket.create(app) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/sockets/usocket-app-default-name.mjs b/test/sockets/usocket-app-default-name.mjs new file mode 100644 index 0000000..42c0684 --- /dev/null +++ b/test/sockets/usocket-app-default-name.mjs @@ -0,0 +1,20 @@ +import { Socket } from '../../src' + +const USOCKET = __dirname + '/test.sock' + +let socket = new Socket(USOCKET,{name:'unix socket'}) + +let app ={} +app.processPacket = async function (packet) { + packet.payload = 'using passed processor with default name '+packet.payload + return packet +} + +; +(async () => { + + await socket.create(app) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/sockets/usocket-default-name.mjs b/test/sockets/usocket-default-name.mjs new file mode 100644 index 0000000..97d8be6 --- /dev/null +++ b/test/sockets/usocket-default-name.mjs @@ -0,0 +1,19 @@ +import { Socket } from '../../src' + +const USOCKET = __dirname + '/test.sock' + +let socket = new Socket(USOCKET,{name:'unix socket'}) + +socket.processPacket = async function (packet) { + packet.payload = 'using alt processor with default name '+packet.payload + return packet +} + +; +(async () => { + + await socket.create() + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/usocket.mjs b/test/sockets/usocket-default.mjs similarity index 70% rename from test/usocket.mjs rename to test/sockets/usocket-default.mjs index 015001f..6e07265 100644 --- a/test/usocket.mjs +++ b/test/sockets/usocket-default.mjs @@ -1,12 +1,11 @@ -import { Socket } from '../src' +import { Socket } from '../../src' -const USOCKET = __dirname + '/sample.sock' +const USOCKET = __dirname + '/test.sock' let socket = new Socket(USOCKET,{name:'unix socket'}) ; (async () => { - await socket.create() })().catch(err => { diff --git a/test/tcp.test.mjs b/test/tcp.test.mjs new file mode 100644 index 0000000..6ef8762 --- /dev/null +++ b/test/tcp.test.mjs @@ -0,0 +1,86 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let tcpsocket_default = {} +let tcpsocket_9080 = {} + +describe('Connects and Processes a payload in a JSON packet via TCP Socket', function(){ + + before(async function(){ + tcpsocket_default = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-default']) + tcpsocket_default.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + tcpsocket_9080 = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-9080']) + tcpsocket_9080.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + await delay(500) // wait for sockets to get going + }) + + after(async function(){ + tcpsocket_default.kill() + tcpsocket_9080.kill() + }) + + it('with default host and port', async function () { + let tcpconsumer_default = new Consumer({name:'tcpconsumer'}) + + return new Promise(async function (resolve, reject) { + + tcpconsumer_default.processPacket = function (packet) { + try { + expect(packet.payload).to.equal('8080:tcp payload') + resolve() + } + catch(error) { + reject(error) + } + } + + let [err] = await btc(tcpconsumer_default.connect)() + if (err) reject(err) + let packet = {payload:'tcp payload'} + tcpconsumer_default.send(packet) + + }) //end promise + + }) // end tcp socket test + + it('with alternate port, and passed consumer processor', async function () { + + let tcpconsumer_9080 = new Consumer({packetProcessor: 'test', port:9080, name:'tcp-consumer-9080'}) + + return new Promise(async function (resolve, reject) { + + tcpconsumer_9080.test = function (packet) { + try { + expect(packet.payload).to.equal('9080:tcp payload') + resolve() + } + catch(error) { + reject(error) + } + } + + let [err] = await btc(tcpconsumer_9080.connect)() + if (err) reject(err) + let packet = {payload:'tcp payload'} + tcpconsumer_9080.send(packet) + + }) //end promise + + }) // end tcp socket 2 test + + +}) // end describe diff --git a/test/tcpsocket2.mjs b/test/tcpsocket2.mjs deleted file mode 100644 index 7bae5a1..0000000 --- a/test/tcpsocket2.mjs +++ /dev/null @@ -1,20 +0,0 @@ -import { Socket } from '../src' - -let socket = new Socket({port:9080, name:'tcp socket 8081'}) - -// socket.spp = 'saltprocessPacket' -socket.pp ='sprocessPacket' -socket.sprocessPacket = async function (packet) { - packet.payload = 'alt tcp processed '+packet.payload - return packet -} - - -; -(async () => { - - await socket.create() - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/test/usocket-alt-name.test.mjs b/test/usocket-alt-name.test.mjs new file mode 100644 index 0000000..c277881 --- /dev/null +++ b/test/usocket-alt-name.test.mjs @@ -0,0 +1,109 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const USOCKET = __dirname + '/sockets/test.sock' +const SOCKET_FILE = 'usocket-alt-name' + +let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) +let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let socket = {} + +describe('Connects and Processes a payload via Unix Socket using JSON packet with alt named processor', function(){ + + before(async function(){ + socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) + socket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + await delay(500) // wait for sockets to get going + }) + + after(async function(){ + socket.kill() + }) + + it('Tests JSON packet procssing, 10 packets', async function () { + + consumer.times = 0 + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + + consumer.processPacket = function (packet) { + this.times++ + if (this.times!==11) return + + try { + expect(packet.payload).to.equal('using alt processor name unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + let [err] = await btc(consumer.connect)() + if (err) reject(err) + let packet = {payload:'unix payload'} + for (var i = 0; i < 11; i++) { + consumer.send(packet) + } + + }) //end promise + + }) // end unix socket test + + + it('unix socket with two consumers alternating packets, 10 packets each', async function () { + + consumer.times = 0 + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + + consumer.processPacket = function (packet) { + this.times++ + // console.log(this.times,packet.payload) + if (this.times!==11) return + + try { + expect(packet.payload).to.equal('using alt processor name consumer 1 unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + consumer2.processPacket = function (packet) { + return packet + } + + let [err] = await btc(consumer2.connect)() + if (err) reject(err) + let packet1 = {payload:'consumer 1 unix payload'} + let packet2 = {payload:'consumer2 unix payload'} + for (var i = 0; i < 11; i++) { + consumer.send(packet1) + consumer2.send(packet2) + } + + }) //end promise + + }) // end unix socket test + + + +}) // end describe diff --git a/test/usocket-app-alt-name.test.mjs b/test/usocket-app-alt-name.test.mjs new file mode 100644 index 0000000..6ab8257 --- /dev/null +++ b/test/usocket-app-alt-name.test.mjs @@ -0,0 +1,117 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const USOCKET = __dirname + '/sockets/test.sock' +const SOCKET_FILE = 'usocket-app-alt-name' + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let socket = {} + +describe('Connects and Processes a payload via Unix Socket using JSON packet with passed processor using default name', function(){ + + before(async function(){ + socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) + socket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + await delay(500) // wait for sockets to get going + }) + + // beforeEach(async function(){ + // let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) + // }) + + after(async function(){ + socket.kill() + }) + + it('Tests JSON packet procssing, 10 packets', async function () { + + let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + let app = {} + app.times = 0 + app.pp = 'xprocessPacket' + app.xprocessPacket = function (packet) { + this.times++ + // console.log('times=',this.times,'\n',packet.payload) + if (this.times!==11) return + consumer.end() + consumer.destroy() + + try { + expect(packet.payload).to.equal('using passed processor with alt name unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + let [err] = await btc(consumer.connect)(app) + if (err) reject(err) + let packet = {payload:'unix payload'} + for (var i = 0; i < 11; i++) { + consumer.send(packet) + } + + }) //end promise + + }) // end unix socket test + + + it('unix socket with two consumers alternating packets, 10 packets each, alt consumer processor name', async function () { + + let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) + let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + + let app = {} + app.pp = 'yprocessPacket' + app.times = 0 + app.yprocessPacket = function (packet) { + this.times++ + if (this.times!==11) return + try { + expect(packet.payload).to.equal('using passed processor with alt name consumer 1 unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + consumer2.processPacket = function (packet) { + return packet + } + + let [err] = await btc(consumer.connect)(app) + if (err) reject(err) + let [err2] = await btc(consumer2.connect)() + if (err2) reject(err2) + let packet1 = {payload:'consumer 1 unix payload'} + let packet2 = {payload:'consumer2 unix payload'} + for (var i = 0; i < 11; i++) { + consumer.send(packet1) + consumer2.send(packet2) + } + + }) //end promise + + }) // end unix socket test + +}) // end describe diff --git a/test/usocket-app-default-name.test.mjs b/test/usocket-app-default-name.test.mjs new file mode 100644 index 0000000..106c351 --- /dev/null +++ b/test/usocket-app-default-name.test.mjs @@ -0,0 +1,115 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const USOCKET = __dirname + '/sockets/test.sock' +const SOCKET_FILE = 'usocket-app-default-name' + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let socket = {} + +describe('Connects and Processes a payload via Unix Socket using JSON packet with passed processor using default name', function(){ + + before(async function(){ + socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) + socket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + await delay(500) // wait for sockets to get going + }) + + // beforeEach(async function(){ + // let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) + // }) + + after(async function(){ + socket.kill() + }) + + it('Tests JSON packet procssing, 10 packets', async function () { + + let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + let app = {} + app.times = 0 + app.processPacket = function (packet) { + this.times++ + // console.log('times=',this.times,'\n',packet.payload) + if (this.times!==11) return + consumer.end() + consumer.destroy() + + try { + expect(packet.payload).to.equal('using passed processor with default name unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + let [err] = await btc(consumer.connect)(app) + if (err) reject(err) + let packet = {payload:'unix payload'} + for (var i = 0; i < 11; i++) { + consumer.send(packet) + } + + }) //end promise + + }) // end unix socket test + + + it('unix socket with two consumers alternating packets, 10 packets each, alt consumer processor name', async function () { + + let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) + let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + + let app = {} + app.times = 0 + app.processPacket = function (packet) { + this.times++ + if (this.times!==11) return + try { + expect(packet.payload).to.equal('using passed processor with default name consumer 1 unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + consumer2.processPacket = function (packet) { + return packet + } + + let [err] = await btc(consumer.connect)(app) + if (err) reject(err) + let [err2] = await btc(consumer2.connect)() + if (err2) reject(err2) + let packet1 = {payload:'consumer 1 unix payload'} + let packet2 = {payload:'consumer2 unix payload'} + for (var i = 0; i < 11; i++) { + consumer.send(packet1) + consumer2.send(packet2) + } + + }) //end promise + + }) // end unix socket test + +}) // end describe diff --git a/test/usocket-default-name.test.mjs b/test/usocket-default-name.test.mjs new file mode 100644 index 0000000..3cdf939 --- /dev/null +++ b/test/usocket-default-name.test.mjs @@ -0,0 +1,109 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const USOCKET = __dirname + '/sockets/test.sock' +const SOCKET_FILE = 'usocket-default-name' + +let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) +let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let socket = {} + +describe('Connects and Processes a payload via Unix Socket using JSON packet with alt default processor', function(){ + + before(async function(){ + socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) + socket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + await delay(500) // wait for sockets to get going + }) + + after(async function(){ + socket.kill() + }) + + it('Tests JSON packet procssing, 10 packets', async function () { + + consumer.times = 0 + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + + consumer.processPacket = function (packet) { + this.times++ + if (this.times!==11) return + + try { + expect(packet.payload).to.equal('using alt processor with default name unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + let [err] = await btc(consumer.connect)() + if (err) reject(err) + let packet = {payload:'unix payload'} + for (var i = 0; i < 11; i++) { + consumer.send(packet) + } + + }) //end promise + + }) // end unix socket test + + + it('unix socket with two consumers alternating packets, 10 packets each', async function () { + + consumer.times = 0 + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + + consumer.processPacket = function (packet) { + this.times++ + // console.log(this.times,packet.payload) + if (this.times!==11) return + + try { + expect(packet.payload).to.equal('using alt processor with default name consumer 1 unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + consumer2.processPacket = function (packet) { + return packet + } + + let [err] = await btc(consumer2.connect)() + if (err) reject(err) + let packet1 = {payload:'consumer 1 unix payload'} + let packet2 = {payload:'consumer2 unix payload'} + for (var i = 0; i < 11; i++) { + consumer.send(packet1) + consumer2.send(packet2) + } + + }) //end promise + + }) // end unix socket test + + + +}) // end describe diff --git a/test/usocket-default.test.mjs b/test/usocket-default.test.mjs new file mode 100644 index 0000000..9fb523e --- /dev/null +++ b/test/usocket-default.test.mjs @@ -0,0 +1,109 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const USOCKET = __dirname + '/sockets/test.sock' +const SOCKET_FILE = 'usocket-default' + +let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) +let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let socket = {} + +describe('Connects and Processes a payload via Unix Socket using JSON packet with defaults', function(){ + + before(async function(){ + socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) + socket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + await delay(500) // wait for sockets to get going + }) + + after(async function(){ + socket.kill() + }) + + it('Tests unix socket with default echo JSON packet procssing, 10 packets', async function () { + + consumer.times = 0 + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + + consumer.processPacket = function (packet) { + this.times++ + if (this.times!==11) return + + try { + expect(packet.payload).to.equal('unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + let [err] = await btc(consumer.connect)() + if (err) reject(err) + let packet = {payload:'unix payload'} + for (var i = 0; i < 11; i++) { + consumer.send(packet) + } + + }) //end promise + + }) // end unix socket test + + + it('unix socket with two consumers alternating packets, 10 packets each', async function () { + + consumer.times = 0 + + return new Promise(async function (resolve, reject) { + + setTimeout(() =>{ reject('10 packets not received in time')},1900) + + consumer.processPacket = function (packet) { + this.times++ + // console.log(this.times,packet.payload) + if (this.times!==11) return + + try { + expect(packet.payload).to.equal('consumer 1 unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + + consumer2.processPacket = function (packet) { + return packet + } + + let [err] = await btc(consumer2.connect)() + if (err) reject(err) + let packet1 = {payload:'consumer 1 unix payload'} + let packet2 = {payload:'consumer2 unix payload'} + for (var i = 0; i < 11; i++) { + consumer.send(packet1) + consumer2.send(packet2) + } + + }) //end promise + + }) // end unix socket test + + + +}) // end describe