From 4b13cef73a6b45e2304c2f88a61a68ffbd87c8bc Mon Sep 17 00:00:00 2001 From: David Kebler Date: Fri, 19 Jan 2018 20:43:16 -0800 Subject: [PATCH] streamline class method binding and setting of packet processing function --- src/consumer.mjs | 35 +++++++++++------- src/socket.mjs | 22 +++++------ test/socket.test.mjs | 87 ++++++++++++++++++++++++++++++++------------ test/tcpsocket.mjs | 11 ++---- test/tcpsocket2.mjs | 20 ++++++++++ test/usocket.mjs | 10 +---- 6 files changed, 119 insertions(+), 66 deletions(-) create mode 100644 test/tcpsocket2.mjs diff --git a/src/consumer.mjs b/src/consumer.mjs index c256285..0ab46df 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -12,23 +12,26 @@ export default class Consumer extends Socket { this.host = opts.host || '127.0.0.1' this.port = opts.port || 8080 } else this.path = path - + this._pp = opts.packetProcessor || 'processPacket' this.keepAlive = opts.keepAlive ? opts.keepAlive : true this._ready = false this.timeout = opts.timeout || 1000 - this.wait = opts.wait || 30 + this.wait = opts.wait || 20 + // logging this.log_file=opts.log_file || './socket.log' this.log_opts = {streams:[]} this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' this.log_opts.streams.push({level: 'info',path: this.log_file }) if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout}) this.log = bunyan.createLogger(this.log_opts) + // bind to class for other class functions + this.connect = this.connect.bind(this) } ready() {return this._ready} - async connect (app) { - await this.listen(app) + async connect () { + await this.listen() this.log.info('listening') return new Promise( (resolve,reject) => { @@ -66,7 +69,7 @@ export default class Consumer extends Socket { else { this.log.info({packet:packet}, 'bad packet JSON syntax')} } - async listen (app={}) { + async listen () { let packet = new Stream() @@ -74,22 +77,26 @@ export default class Consumer extends Socket { packet.write(chunk) }) - packet.on('data', async (strJSON) => { let [err, packet] = btc(JSON.parse)(strJSON) if (!err) { if (packet.ready) { this._ready = true return } - // set default packet processing - simple print to console of packet - app.cpp = app.cpp || 'processPacket' - if (!app[app.cpp]) { - app.cpp = 'processPacket' - app.processPacket = async (packet) => { - this.log.info({packet:packet},'incoming packet from socket') + + // set packet processing + this.pp = this.pp || this._pp + + // if no processor provided use this console logger one + if (!this[this.pp]) { + this.pp = 'processPacket' + this.processPacket = async (packet) => { + this.log.info({packet:packet},'process with default logger') + console.log('packet from socket') + console.dir(packet) return packet } } - await app[app.cpp](packet) // process the packet + await this[this.pp].bind(this)(packet) // process the packet } else { this.log.info({strJSON: strJSON},'bad packet JSON syntax')} }) @@ -97,7 +104,7 @@ export default class Consumer extends Socket { } // end class -// wait for handshake from socket +// wait for handshake packet from socket function isReady(ready, wait=30, timeout=1000) { let log = this.log let time = 0 diff --git a/src/socket.mjs b/src/socket.mjs index 88c9eef..e444df4 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -13,7 +13,7 @@ export default class Socket extends Server { opts = path this.listen_opts = { host: opts.host || '127.0.0.1', port: opts.port || 8080} } else this.listen_opts = { path: path } - this.spp = opts.spp || 'processPacket' + this._pp = opts.packetProcessor || 'processPacket' // logging this.log_file=opts.log_file || './socket.log' this.log_opts = {streams:[]} @@ -25,7 +25,7 @@ export default class Socket extends Server { } // end constructor - async create (app={}) { + async create () { return new Promise( async (resolve,reject) => { @@ -47,7 +47,7 @@ export default class Socket extends Server { if (path) { // if TCP socket should already be dead this.log.info({socket: path}, 'already exists...deleting') await fileDelete(path) - return await this.listen.bind(this)(this.listen_opts, app) + return await this.listen.bind(this)(this.listen_opts) } } // otherwise fatally exit @@ -55,7 +55,7 @@ export default class Socket extends Server { reject(err) }) - let [err, res] = await btc(this.listen.bind(this))(this.listen_opts,app) + let [err, res] = await btc(this.listen.bind(this))(this.listen_opts) if (err) reject(err) resolve(res) @@ -63,7 +63,7 @@ export default class Socket extends Server { } // end create - async listen (opts,app) { + async listen (opts) { super.listen(opts, async (err, res) => { @@ -86,18 +86,17 @@ export default class Socket extends Server { this.log.info({packet:packet},'Server: packet received to socket') // set default packet processing - // console.log('==========',app.spp,'====',this.spp) - this.spp = app.spp || this.spp + + this.pp = this.pp || this._pp // console.log('==========',app.spp,'====',this.spp) - if (!app[this.spp]) { - // app.spp = 'processPacket' - app.processPacket = async (packet) => { + if (!this[this.pp]) { + this.processPacket = async (packet) => { packet.res='echoed' this.log.info({packet:packet},'packet being sent to consumer') return packet } } - socket.write(JSON.stringify(await app[this.spp].bind(app)(packet))+'\n' ) + socket.write(JSON.stringify(await this[this.pp].bind(this)(packet))+'\n' ) } else { this.log.info(`bad packet JSON syntax \n ${strJSON}`) @@ -119,7 +118,6 @@ export default class Socket extends Server { } - async destroy () { this.log.info('closing down socket') diff --git a/test/socket.test.mjs b/test/socket.test.mjs index d7389da..0ea5831 100644 --- a/test/socket.test.mjs +++ b/test/socket.test.mjs @@ -11,11 +11,14 @@ const USOCKET = __dirname + '/sample.sock' let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'}) let tcpconsumer = new Consumer({name:'test-tcpconsumer'}) +let tcpconsumer2 = new Consumer({port:9080, packetProcessor:'test', name:'test-tcpconsumer-2'}) + const delay = time => new Promise(res=>setTimeout(()=>res(),time)) let usocket = {} let tcpsocket = {} +let tcpsocket2 = {} describe('Connects and Processes a payload in a JSON packet', function(){ @@ -30,62 +33,98 @@ describe('Connects and Processes a payload in a JSON packet', function(){ console.log('[Socket]', String(buf)) }) + tcpsocket2 = spawn('node',['-r', '@std/esm', './test/tcpsocket2']) + tcpsocket2.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + await delay(500) // wait for sockets to get going }) after(async function(){ usocket.kill() tcpsocket.kill() + tcpsocket2.kill() }) - it('via unix socket', async function () { + it('via unix socket with defaults', async function () { + + uconsumer.times = 0 return new Promise(async function (resolve, reject) { - const app = { - processPacket: function (packet) { - try { - expect(packet.payload).to.equal('processed unix payload') - resolve() - } - catch(error) { - reject(error) - } + setTimeout(() =>{ reject('10 packets not received in time')},1900) + + uconsumer.processPacket = function (packet) { + this.times++ + if (this.times<10) return + + try { + // expect(packet.payload).to.equal('unix payload') + expect(packet.payload).to.equal('unix payload') + resolve() + } + catch(error) { + reject(error) } } - let [err] = await btc(uconsumer.connect.bind(uconsumer))(app) + let [err] = await btc(uconsumer.connect)() if (err) reject(err) let packet = {payload:'unix payload'} - uconsumer.send(packet) + for (var i = 0; i < 11; i++) { + uconsumer.send(packet) + } }) //end promise }) // end unix socket test - it('via tcp socket', async function () { + it('via tcp socket with defaults', async function () { return new Promise(async function (resolve, reject) { - const app = { - processPacket: function (packet) { - try { - expect(packet.payload).to.equal('processed tcp payload') - resolve() - } - catch(error) { - reject(error) - } + tcpconsumer.processPacket = function (packet) { + try { + expect(packet.payload).to.equal('tcp processed tcp payload') + resolve() + } + catch(error) { + reject(error) } } - let [err] = await btc(tcpconsumer.connect.bind(tcpconsumer))(app) + let [err] = await btc(tcpconsumer.connect)() if (err) reject(err) let packet = {payload:'tcp payload'} tcpconsumer.send(packet) }) //end promise - }) // end unix socket test + }) // end tcp socket test + + it('via alternate port tcp socket with attached custom socket processing and alt named socket processing', async function () { + + return new Promise(async function (resolve, reject) { + + tcpconsumer2.test = function (packet) { + try { + expect(packet.payload).to.equal('alt tcp processed tcp payload') + resolve() + } + catch(error) { + reject(error) + } + } + + let [err] = await btc(tcpconsumer2.connect)() + if (err) reject(err) + let packet = {payload:'tcp payload'} + tcpconsumer2.send(packet) + + }) //end promise + + }) // end tcp socket 2 test + }) diff --git a/test/tcpsocket.mjs b/test/tcpsocket.mjs index 9eb6f14..af19a26 100644 --- a/test/tcpsocket.mjs +++ b/test/tcpsocket.mjs @@ -2,18 +2,15 @@ import { Socket } from '../src' let socket = new Socket({name:'tcp socket'}) -const app = { - spp: 'sprocessPacket', - sprocessPacket: async function (packet) { - packet.payload = 'tcp processed '+packet.payload - return packet - } +socket.processPacket = async function (packet) { + packet.payload = 'tcp processed '+packet.payload + return packet } ; (async () => { - await socket.create(app) + await socket.create() })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/test/tcpsocket2.mjs b/test/tcpsocket2.mjs new file mode 100644 index 0000000..7bae5a1 --- /dev/null +++ b/test/tcpsocket2.mjs @@ -0,0 +1,20 @@ +import { Socket } from '../src' + +let socket = new Socket({port:9080, name:'tcp socket 8081'}) + +// socket.spp = 'saltprocessPacket' +socket.pp ='sprocessPacket' +socket.sprocessPacket = async function (packet) { + packet.payload = 'alt tcp processed '+packet.payload + return packet +} + + +; +(async () => { + + await socket.create() + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/usocket.mjs b/test/usocket.mjs index f9a2bba..015001f 100644 --- a/test/usocket.mjs +++ b/test/usocket.mjs @@ -4,18 +4,10 @@ const USOCKET = __dirname + '/sample.sock' let socket = new Socket(USOCKET,{name:'unix socket'}) -const app = { - spp: 'sprocessPacket', - sprocessPacket: async function (packet) { - packet.payload = 'processed '+packet.payload - return packet - } -} - ; (async () => { - await socket.create(app) + await socket.create() })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err)