From f0c6e96d02423168cfc954dce3a71bb55ff5687a Mon Sep 17 00:00:00 2001 From: David Kebler Date: Tue, 16 Jan 2018 12:43:36 -0800 Subject: [PATCH] added option for tcp socket so now module is just uci-socket. No authentification nor https at this point --- src/consumer.mjs | 20 +++++++--- src/socket.mjs | 16 ++++++-- test/socket.test.mjs | 90 ++++++++++++++++++++++++++++++++------------ 3 files changed, 93 insertions(+), 33 deletions(-) diff --git a/src/consumer.mjs b/src/consumer.mjs index bba1864..09a3544 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -5,12 +5,22 @@ import bunyan from 'bunyan' export default class Consumer extends Socket { constructor (path, opts={}) { super() - this.path = path + // set or tcp socket + if (typeof(path)!=='string') { + opts = path + this.host = '127.0.0.1' + this.port = opts.port || 8080 + } else { + if (opts.tcp) { + this.host = path + this.port = opts.port || 8080 + } else this.path = path + } this.keepAlive = opts.keepAlive ? opts.keepAlive : true this._ready = false this.timeout = opts.timeout || 1000 this.wait = opts.wait || 30 - this.log_file=opts.log_file || './log.log' + this.log_file=opts.log_file || './socket.log' this.log_opts = {streams:[]} this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' this.log_opts.streams.push({level: 'info',path: this.log_file }) @@ -21,17 +31,17 @@ export default class Consumer extends Socket { ready() {return this._ready} async connect (app) { - await this.listen(app) this.log.info('listening') return new Promise( (resolve,reject) => { this.on('error', (err) => { + console.log(err.code) reject(err) }) - super.connect({ path: this.path }, async () => { + super.connect({ port:this.port, host:this.host, path: this.path }, async () => { this.log.info({path:this.path},'connecting') this.setKeepAlive(this.keepAlive) let [err, res] = await btc(isReady).bind(this)(this.ready.bind(this), this.wait, this.timeout) @@ -73,7 +83,7 @@ export default class Consumer extends Socket { this.log.info({packet:packet},'incoming packet from socket') return packet } } - app[app.ucpp](packet) // process the packet + await app[app.ucpp](packet) // process the packet } else { this.log.info({buf: buf.toString()},'bad packet JSON syntax')} }) diff --git a/src/socket.mjs b/src/socket.mjs index 3427d92..4b12dee 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -7,8 +7,16 @@ import bunyan from 'bunyan' export default class Socket extends Server { constructor (path, opts={}) { super() - this.path = path - this.log_file=opts.log_file || './log.log' + // set or tcp socket + if (typeof(path)!=='string') { + opts = path + this.listen_opts = { host: '127.0.0.1', port: opts.port || 8080} + } else { + if (opts.tcp) { + this.listen_opts = { host: path, port: opts.port || 8080} + } else this.listen_opts = { path: path } + } + this.log_file=opts.log_file || './socket.log' this.log_opts = {streams:[]} this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' this.log_opts.streams.push({level: 'info',path: this.log_file }) @@ -45,7 +53,7 @@ export default class Socket extends Server { reject(err) }) - this.listen(this.path, async (err, res) => { + this.listen(this.listen_opts, async (err, res) => { if (err) reject(err) // this gets called for each client connection and is unique to each @@ -55,7 +63,7 @@ export default class Socket extends Server { socket.on('data', async (buf) => { let [err, packet] = btc(JSON.parse)(buf.toString()) if (!err) { - if (this.log) this.logger.info(`data packet received to socket \n ${packet}`) + if (this.log) this.log.info(`data packet received to socket \n ${packet}`) this.log.info({packet:packet},'Server: packet received to socket') // set default packet processing diff --git a/test/socket.test.mjs b/test/socket.test.mjs index bfb6246..668cc22 100644 --- a/test/socket.test.mjs +++ b/test/socket.test.mjs @@ -1,6 +1,7 @@ import { spawn } from 'child_process' import chai from 'chai' import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' chai.use(chaiAsPromised) const expect = chai.expect @@ -8,42 +9,83 @@ import { Consumer } from '../src' const USOCKET = __dirname + '/sample.sock' -let consumer = new Consumer(USOCKET) - -const app = { - ucpp: 'cprocessPacket', - cprocessPacket: async function (packet) { - if (packet.processed) consumer.emit(packet.cmd,packet.payload) - } -} +let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'}) +let tcpconsumer = new Consumer('localhost',{port: 8081, tcp:true, name:'test-tcpconsumer'}) const delay = time => new Promise(res=>setTimeout(()=>res(),time)) -; -(async () => { +let usocket = {} +let tcpsocket = {} - let socket ={} +describe('Connects and Processes a payload in a JSON packet', function(){ before(async function(){ - socket = spawn('node',['-r', '@std/esm', './test/socket']) - await delay(500) // wait for socket to get going + usocket = spawn('node',['-r', '@std/esm', './test/usocket']) + usocket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + tcpsocket = spawn('node',['-r', '@std/esm', './test/tcpsocket']) + tcpsocket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + await delay(500) // wait for sockets to get going }) after(async function(){ - socket.kill() + usocket.kill() + tcpsocket.kill() }) - it('Connects and Processes some payload', async function () { + it('via unix socket', async function () { - console.log('connection is ',await consumer.connect(app)) - consumer.on('test1', function(payload){ - expect(payload).to.equal('payload1') - }) - let packet = {id: 'test consumer', cmd:'test1', payload:'payload1'} - consumer.send(packet) - }) + return new Promise(async function (resolve, reject) { + const app = { + processPacket: function (packet) { + try { + expect(packet.payload).to.equal('processed unix payload') + resolve() + } + catch(error) { + reject(error) + } + } + } + + let [err] = await btc(uconsumer.connect.bind(uconsumer))(app) + if (err) reject(err) + let packet = {payload:'unix payload'} + uconsumer.send(packet) + + }) //end promise + + }) // end unix socket test + + it('via tcp socket', async function () { + + return new Promise(async function (resolve, reject) { + + const app = { + processPacket: function (packet) { + try { + expect(packet.payload).to.equal('processed tcp payload') + resolve() + } + catch(error) { + reject(error) + } + } + } + + let [err] = await btc(tcpconsumer.connect.bind(tcpconsumer))(app) + if (err) reject(err) + let packet = {payload:'tcp payload'} + tcpconsumer.send(packet) + + }) //end promise + + }) // end unix socket test -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) })