From 88c61f089515872183b5c0e46f1e140bea6d8e44 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sun, 8 Sep 2019 13:58:41 -0700 Subject: [PATCH] 0.3.9 (merged from http-server) refactored connected and added auto reconnect ability like the unix/tcp socket including authentication and per consumer ping added in consumer ping add emits for consumer connection --- examples/server.js | 92 +++++-- package.json | 7 +- src/socket.js | 325 +++++++++++++++++++------ test/.gitignore | 2 - test/sockets/.gitignore | 1 - test/sockets/tcp-process.js | 4 - test/sockets/tcpsocket-9080.js | 14 -- test/sockets/tcpsocket-default.js | 15 -- test/sockets/usocket-default.js | 11 - test/sockets/usocket.js | 23 -- test/tcp.test.js | 78 ------ test/thesearejustcopiesneedtobeadapted | 0 test/usocket-default.test.js | 83 ------- test/usocket.test.js | 51 ---- 14 files changed, 325 insertions(+), 381 deletions(-) delete mode 100644 test/.gitignore delete mode 100644 test/sockets/.gitignore delete mode 100644 test/sockets/tcp-process.js delete mode 100644 test/sockets/tcpsocket-9080.js delete mode 100644 test/sockets/tcpsocket-default.js delete mode 100644 test/sockets/usocket-default.js delete mode 100644 test/sockets/usocket.js delete mode 100644 test/tcp.test.js delete mode 100644 test/thesearejustcopiesneedtobeadapted delete mode 100644 test/usocket-default.test.js delete mode 100644 test/usocket.test.js diff --git a/examples/server.js b/examples/server.js index 8d71872..384b075 100644 --- a/examples/server.js +++ b/examples/server.js @@ -1,27 +1,83 @@ import Socket from '../src' -async function packetProcess(packet) { - // Here one registers a packet processor which gets an plain object hash from the client - // should return a promise - // for example if client sends a property cmd on can take action on that - return new Promise(resolve => { - let res = {} - switch (packet.cmd) { - case 'echo': - res.msg = 'Echoing Back any payload propery' - res.payload = packet.payload - this.push(res) - break - default: - res.msg = `.cmd, command ${packet.cmd} was unknown at server` - res.sent_packet = packet +const ANONYMOUS = true +const PORT = 8090 + +function packetProcess (packet) { + + const cmd = { + ack: function(packet){ + return new Promise( async (resolve) => { + packet.ack = true + packet.sender= packet.sender || (packet._header ? packet._header.sender.name : 'unknown') + packet.msg=`this is acknlowdgement that ${packet.sender} ack was received` + this.push(packet) // push to all active socket servers + return resolve(packet) // this goes back to sender + }) + }, + switch:{ + status: function(packet){ + return new Promise( async (resolve) => { + packet.cmd='switch/status' + packet.state=this.switches[packet.id-1] + packet.sender= packet.sender || (packet._header ? packet._header.sender.name : 'unknown') + this.push(packet) // push to all active socket servers + let res = { response:'status pushed on to all clients'} + return resolve(res) // this goes back to sender + }) + }, + on: function(packet){ + return new Promise( async (resolve) => { + packet.cmd='switch/status' + packet.state='on' + this.switches[packet.id-1] = 'on' + packet.sender= packet.sender || (packet._header ? packet._header.sender.name : 'unknown') + this.push(packet) // push to all active socket servers + let res = { response:'status change - pushed on to all clients', id:packet.id} + return resolve(res) // this goes back to sender + }) + }, + off: function(packet){ + return new Promise( async (resolve) => { + packet.cmd='switch/status' + packet.state='off' + this.switches[packet.id-1] = 'off' + packet.sender= packet.sender || (packet._header ? packet._header.sender.name : 'unknown') + this.push(packet) // push to all active socket servers + let res = { response:'status change - pushed off to all clients'} + return resolve(res) // this goes back to sender + }) + }, + toggle: function(packet){ + return new Promise( async (resolve) => { + this.switches[packet.id-1] = this.switches[packet.id-1]==='on' ? 'off' : 'on' + packet.state = this.switches[packet.id-1] + packet.cmd ='switch/status' + packet.sender= packet.sender || (packet._header ? packet._header.sender.name : 'unknown') + this.push(packet) // push to all active socket servers + let res = { response:`status change - pushed toggled state of ${packet.state} to all clients`} + return resolve(res) // this goes back to sender + }) + } } - resolve(res) - }) + } + // console.log('this in packet processor',packet.cmd) + let cmdProps = packet.cmd.split('/') + let func = cmd[cmdProps[0]] + // console.log(func) + if (cmdProps[1]) func = func[cmdProps[1]] + // console.log(func) + if (typeof func === 'function') return func.call(this,packet) + else { + packet.error='no function for command' + return packet + } } + // let test = new Test() -let test = new Socket({ port: 8090, clientTracking: true }) +let test = new Socket({ port: PORT, allowAnonymous:ANONYMOUS }) +test.switches = ['off','off','off','off'] // database to hold switch states test.registerPacketProcessor(packetProcess) ;(async () => { console.log(await test.create()) diff --git a/package.json b/package.json index 63df590..c4cf8e1 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,12 @@ { "name": "@uci/websocket", - "version": "0.3.8", + "version": "0.3.9", "description": "JSON packet host websocket server", "main": "src", "scripts": { - "example": "node -r esm examples/server", - "dev": "UCI_ENV=dev UCI_LOG_LEVEL=debug ./node_modules/.bin/nodemon -r esm examples/server" + "server": "node -r esm examples/server", + "server:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/server", + "server:debug": "UCI_LOG_LEVEL=debug npm run server:dev" }, "author": "David Kebler", "license": "MIT", diff --git a/src/socket.js b/src/socket.js index c9dcabf..6d781e9 100644 --- a/src/socket.js +++ b/src/socket.js @@ -1,6 +1,6 @@ -import WebSocket from 'ws' +import { Server as WSS } from 'ws' +import { Server } from 'http' import btc from 'better-try-catch' -import to from 'await-to-js' import { promisify } from 'util' import _ON_DEATH from 'death' //this is intentionally ugly import clone from 'clone' @@ -9,22 +9,36 @@ import logger from '@uci-utils/logger' let log = {} /** - * Socket - a websocket that supports uci packets - * @class WebSocket - * @extends ws.server - * + * Socket - Description + * @extends Server */ -class Socket extends WebSocket.Server { +class Socket extends Server { + + /** + * constructor - Description + * + * @param {object} [opts={}] Description + * + * @returns {type} Description + */ constructor(opts = {}) { + super(opts) opts.host = opts.host || '0.0.0.0' opts.port = opts.port || 8090 - opts.clientTracking = opts.clientTracking || true - super(opts) this.id = opts.id || opts.name || 'Websocket:' + new Date().getTime() this.opts = opts // for use to recover from selected errors - //self bindings - this._listen = this._listen.bind(this) + this.wss = {} // web socket instance goes here + this.allowAnonymous = (!opts.tokens || !!process.env.UCI_ANON || opts.allowAnonymous) ? true : false + this.tokens = opts.tokens || [] + this.pingInterval = opts.pingInterval === false ? opts.pingInterval : (opts.pingInterval * 1000 || 5000) + this.nextconsumerID = 0 // incrementer for default initial consumer ID + this.consumers = new Map() + this.errors = [] + this.errorCount = 0 this.create = this.create.bind(this) + this._destroy = this._destroy.bind(this) + this.authenticateconsumer = this.authenticateconsumer.bind(this) + this._authenticate = this._authenticate.bind(this) this.close = promisify(this.close).bind(this) log = logger({ file: 'src/socket.js', @@ -34,76 +48,231 @@ class Socket extends WebSocket.Server { }) } // end constructor + /** + * create - Description + * + * @returns {type} Description + */ async create() { + // return Promise.resolve('what') return new Promise((resolve, reject) => { _ON_DEATH(async () => { - log.error({method:'create', line:39, msg:'\nhe\'s dead jim'}) + log.error({method:'create', line:51, msg:'\nhe\'s dead jim'}) await this._destroy() }) + + // kills nodemon properly process.once('SIGUSR2', async () => { - await this._destroy + await this._destroy() process.kill(process.pid, 'SIGUSR2') }) - this.on('error', async err => { - log.error({method:'creaete', line:48, err:err, msg:'socket server error'}) + this.once('error', async err => { + log.error({method:'creaete', line:48, err:err, msg:'http server error'}) reject(err) }) - this.on('listening', async () => { - this._listen() + this.once('listening', () => { log.info({method:'create', line:54, msg:'websocket server created and listening at', address:this.address()}) - resolve( - `websocket ready and listening at ${this.address().address}:${ - this.address().port - }` - ) + // emit ready + this.on('error', err => { + this.errorCount +=1 // log errors here + this.errors.push(err) + if(this.errorCount>2) this.emit('warn', {msg:'something bad maybe going on, 3 errors', errors:this.errors}) + if(this.errorCount>5) this.emit('fatal', {msg:'something fatal is going on, 6 errors', errors:this.errors}) + }) + this.wss = new WSS({server:this}) + this.wss.on('error', err => {this.emit('error', err)}) // bubble up errors + this.wss.on('connection', this._connectionHandler.bind(this)) + resolve(`websocket ready and listening at ${this.address().address}:${this.address().port}`) }) + super.listen(this.opts) }) } // end create registerPacketProcessor(func) { - this._packetProcess = func + this._packetProcess = func.bind(this) } - _listen() { - this.on('connection', async (socket, req) => { - let send = this._send.bind(socket) - log.debug({method:'_listen', line:71, req: req, msg: 'new consumer connecting'}) - socket.address = req.remoteAddress + addTokens(tokens) { + if (typeof tokens ==='string'){ + tokens = tokens.split(',') + } + this.tokens = this.tokens.concat(tokens) + if (this.tokens.length>0) this.allowAnonymous = false + } - socket.on('error', (err) => { - log.error({msg:'client connection error during listen',error:err}) + removeTokens(tokens) { + if (typeof tokens ==='string'){ + if (tokens === 'all') { + this.tokens = [] + this.allowAnonymous = true + return + } + tokens = tokens.split(',') + } + this.tokens = this.tokens.filter(token => !tokens.includes(token)) + if (this.tokens.length===0) { + log.warn({msg:'all tokens have been removed, switching to allow anonymous connections'}) + this.allowAnonymous = true + } + } + + registerTokenValidator (func) { + this.allowAnonymous = false + this._validateToken = func + } + + registerAuthenticator (func) { + this.allowAnonymous = false + this._authenticate = func + } + + + + /** + * push + * + * @param {Object} packet - this is the parameter packet + * @param {} id - this is the parameter id + * + */ + async push(packet={},id) { + packet._header = {id: id || 'pushed'} + if (this.consumers.size > 0) { + log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) + this.consumers.forEach(async consumer => { + this._send(consumer,packet) }) + } else { + log.debug({method:'push', line:165, id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'}) + } + } - socket.on('close', (msg) => { - log.warn({msg:'client connection closed during listen',error:msg}) - }) + removeconsumer (id) { + let consumer = this.consumers.get(id) + this.emit('consumer-connection',{state:'disconnected', id:consumer.id, name:consumer.name, socketSide:true}) + clearInterval(consumer._ping) + consumer.removeAllListeners() + log.warn({msg:`consumer ${id}:${consumer.name} removed from server tracking`, id:id, name:consumer.name, curconsumerCount:this.consumers.size}) + this.consumers.delete(id) + } - socket.on('message', messageProcess.bind(this, socket)) + async authenticateconsumer(consumer) { + // let server = this + return new Promise(async function(resolve, reject) { + // when consumer gets the handshake they must follow with authentication + consumer.on('message', authenticate.bind(this,consumer)) + let [err] = await btc(this._send)(consumer,{_handshake: true, id:consumer.id}) + if (err) { + log.error({msg:'error in handshake send', error:err}) + reject(err) + } - async function messageProcess(client, strPacket) { - log.debug({method:'_listen', line:76, packet: strPacket, msg:' incoming packet on web socket side'}) - let res = {} - let [err, packet] = btc(JSON.parse)(strPacket) - log.debug({method:'_listen', line:79, error:err, packet:packet, msg:'parsed websocket packet'}) - if (err) { - res = { error: `Could not parse JSON: ${packet}` } - } else { - if (packet.clientID) { - client.ID = packet.clientID - res.cmd = 'ackID' - } else { - res = (await this._packetProcess(clone(packet))) || {} - if (Object.keys(res).length === 0) - res = { - error: - 'socket packet command function likely did not return a promise', - packet: packet - } + async function authenticate (consumer,message) { + let [err,packet] = btc(JSON.parse)(message) + if (err) reject('unable to parse authentication packet from consumer') + log.debug({msg:`authentication packet from consumer ${consumer.id}`, packet:packet}) + if (!packet._authenticate) reject('first consumer packet was not authentication') + else { + let [err, res] = await btc(this._authenticate)(packet) + consumer.authenticated = this.allowAnonymous ? 'anonymous' : (err ? false : res) + consumer.name = packet.consumerName + packet.authenticated = consumer.authenticated + if (err && !this.allowAnonymous) packet.reason = err + log.debug({msg:'sending authorization result to consumer', packet:packet}) + await this._send(consumer,packet) // send either way + if (packet.reason) { + log.info({msg:'consumer authentication failed', consumer:consumer.name, consumer_id:consumer.id, reason:err}) + reject(packet.reason) + } + else { + log.info({msg:'consumer authenticated successfuly', consumer:consumer.name, consumer_id:consumer.id}) + if (this.allowAnonymous) log.warn({msg:'consumer connected anonymously', consumer:consumer.name, consumer_id:consumer.id}) + resolve(consumer.authenticated) } } + } + }.bind(this)) + } + + // private methods + + + // default validator + _validateToken (token) { + if (token) return this.tokens.includes(token) + return false + } + + // default authenticator - reject value should be reason which is returned to consumer + async _authenticate (packet) { + if (!this._validateToken(packet.token)) return Promise.reject('invalid token') + return true + } + + async _connectionHandler(consumer) { + log.debug({method:'_connectionHandler', line:76, msg: 'new web consumer connecting'}) + + consumer.id = ++this.nextconsumerID // server assigned ID + this.consumers.set(consumer.id,consumer) + consumer.authenticated = false + consumer.connected = true + + // add listeners + const consumerCloseHandler = (id) => { + log.warn({msg:'consumer connection closed during listen,',id:id}) + this.removeconsumer(id) + } + + consumer.on('close', consumerCloseHandler.bind(this,consumer.id)) + + consumer.on('error', (err) => { + log.error({msg:'consumer connection error during listen',error:err}) + // TODO do more handling than just logging + }) + + let [err] = await btc(this.authenticateconsumer)(consumer) + if (!this.allowAnonymous) { + if (err) { + consumer.close()// abort new connection consumer, cleanup, remove listeners + return + } + } + + if (this.opts.conPacket) { + this.opts.conPacket._header = { id: 'pushed' } + log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'}) + this._send(consumer,this.opts.conPacket) // send a packet command on to consumer on connection + } + + this.emit('consumer-connection',{state:'connected', consumer:consumer}) + this.emit('status',{level:'info', msg:'a consumer connected', name:consumer.name, id:consumer.id}) + + consumer._ping = setInterval( () => { + consumer.ping(JSON.stringify({pingInterval:this.pingInterval})) + this._send(consumer,{_header:{id:'ping'},pingInterval:this.pingInterval}) + },this.pingInterval) + + consumer.on('message', messageProcess.bind(this, consumer)) + + async function messageProcess(consumer, strPacket) { + log.debug({method:'_listen', line:76, packet: strPacket, msg:' incoming packet from web consumer'}) + let res = {} + let [err, packet] = btc(JSON.parse)(strPacket) + if (err) { + res = { error: 'Could not JSON parse packet', packet:strPacket } + } + else { + log.warn({method:'_listen', line:266, packet:packet, msg:'parsed packet'}) + res = (await this._packetProcess(clone(packet))) || {} + if (Object.keys(res).length === 0) + res = { + error: + 'consumer packet command function likely did not return a promise', + packet: packet + } if (packet) { res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing delete packet._header // remove before adding to response header as request @@ -112,17 +281,21 @@ class Socket extends WebSocket.Server { res._header.responder = { name: this.name, instanceID: this.id } res._header.socket = this.address() if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' - let sres = await send(res) - log.debug({method:'_listen', line:105, packet: res, sendResponse:sres, msg:'packet reply send back to client'}) + let [err] = await btc(this._send)(consumer,res) + if (err) log.error({msg:err, error:err}) } - }) // end connected consumer - log.debug('socket created') - } // end listen + } // end message process + + } // end connection Handler async _destroy() { - log.debug({method:'_listen', line:105, msg:'closing down socket'}) + log.debug({method:'_destroy', line:302, msg:'closing down http server and attached sockets', port:this.port}) + this.consumers.forEach(consumer => { + consumer.terminate() + consumer.emit('close') + }) await this.close() - log.debug({method:'_listen', line:105, msg:'all connections closed....exiting'}) + log.debug({method:'_listen', line:105, msg:'all connections closed, all consumer sockets deleted....exiting'}) process.exit() } @@ -133,27 +306,23 @@ class Socket extends WebSocket.Server { }) } - async push(packet, id) { - packet._header = { id: id || 'pushed' } - this.clients.forEach(async client => { - if (client.readyState === WebSocket.OPEN) { - if (!id || id === client.ID) await this._send.bind(client)(packet) - } + + + async _send(consumer, packet) { + log.debug({msg:`sending to client:${consumer.id}:${consumer.name}`, packet:packet}) + return new Promise(async (resolve, reject) => { + // if (!consumer._connected) reject('can not send no connection') + let [err, message] = btc(JSON.stringify)(packet) + if (err) reject('unable to serialze the packet') + consumer.send(message,(err) => { + if (!err) resolve('packet written to socket stream successfully') + else reject(`error sending: ${err}`) + }) }) } - // must have a consumer socket instance bound to call this function - async _send(packet) { - if (this.readyState !== 1) log.error({method:'_send', line:147, msg:`Connection not Ready, CODE:${this.readyState}`}) - else { - let [err, message] = btc(JSON.stringify)(packet) - if (err) log.error({method:'_send', line:147, msg:`WS Could not JSON stringify: ${packet}`}) - else { - try { this.send(message) } - catch(err) { log.error({method:'_send', line:153, msg:'error sending from ws server', error:err}) } - } - } - } + + } // end class export default Socket diff --git a/test/.gitignore b/test/.gitignore deleted file mode 100644 index b049ffa..0000000 --- a/test/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -*.sock -/node_modules/ diff --git a/test/sockets/.gitignore b/test/sockets/.gitignore deleted file mode 100644 index 2ccbe46..0000000 --- a/test/sockets/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/node_modules/ diff --git a/test/sockets/tcp-process.js b/test/sockets/tcp-process.js deleted file mode 100644 index 5eeb46a..0000000 --- a/test/sockets/tcp-process.js +++ /dev/null @@ -1,4 +0,0 @@ -export default async function (packet) { - packet.payload = this.opts.port +':'+packet.payload - return packet -} diff --git a/test/sockets/tcpsocket-9080.js b/test/sockets/tcpsocket-9080.js deleted file mode 100644 index af5b0a5..0000000 --- a/test/sockets/tcpsocket-9080.js +++ /dev/null @@ -1,14 +0,0 @@ -import { Socket } from '../../src' -import process from './tcp-process' - -let socket = new Socket({port:9080, name:'tcp socket 9080'}) - -socket.registerPacketProcessor(process) -; -(async () => { - - await socket.create() - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/test/sockets/tcpsocket-default.js b/test/sockets/tcpsocket-default.js deleted file mode 100644 index 86ef932..0000000 --- a/test/sockets/tcpsocket-default.js +++ /dev/null @@ -1,15 +0,0 @@ -import { Socket } from '../../src' -import process from './tcp-process' - -let socket = new Socket({name:'tcp socket'}) - -socket.registerPacketProcessor(process) - -; -(async () => { - - await socket.create() - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/test/sockets/usocket-default.js b/test/sockets/usocket-default.js deleted file mode 100644 index 3de8d32..0000000 --- a/test/sockets/usocket-default.js +++ /dev/null @@ -1,11 +0,0 @@ -import { Socket } from '../../src' - -let socket = new Socket({path:true,name:'default-unix-socket'}) - -; -(async () => { - await socket.create() - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/test/sockets/usocket.js b/test/sockets/usocket.js deleted file mode 100644 index b4bac21..0000000 --- a/test/sockets/usocket.js +++ /dev/null @@ -1,23 +0,0 @@ -import { Socket } from '../../src' - -const USOCKET = 'usocket' - -let socket = new Socket({path:USOCKET,name:'default-unix-socket'}) - -socket.test = 'at socket => ' - -socket.registerPacketProcessor(async function (packet) { - return new Promise((resolve) => { - packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload - resolve(packet) - }) -}) - -; -(async () => { - - await socket.create() - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/test/tcp.test.js b/test/tcp.test.js deleted file mode 100644 index 15510a2..0000000 --- a/test/tcp.test.js +++ /dev/null @@ -1,78 +0,0 @@ -import { spawn } from 'child_process' -import chai from 'chai' -import chaiAsPromised from 'chai-as-promised' -import btc from 'better-try-catch' -chai.use(chaiAsPromised) -const expect = chai.expect - -import { Consumer } from '../src' - -let tcpsocket_default = {} -let tcpsocket_9080 = {} - -describe('Connects and Processes a payload in a JSON packet via TCP Socket', function(){ - - before(async function(){ - tcpsocket_default = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-default']) - tcpsocket_default.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - - tcpsocket_9080 = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-9080']) - tcpsocket_9080.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - - }) - - after(async function(){ - tcpsocket_default.kill() - tcpsocket_9080.kill() - }) - - it('with default host and port', async function () { - let tcpconsumer_default = new Consumer({name:'tcpconsumer'}) - - let [err] = await btc(tcpconsumer_default.connect)() - if (err) { - console.log('unable to connect to socket to start test', tcpconsumer_default.port) - process.kill(process.pid, 'SIGTERM') - } - - tcpconsumer_default.registerPacketProcessor(async function (packet) { - return new Promise((resolve) => { - packet.payload = packet.payload +':local' - resolve(packet)}) - }) - - let packet = {payload:'tcp payload'} - let res = await tcpconsumer_default.send(packet) - expect(res.payload).to.equal('8080:tcp payload:local') - - }) // end tcp socket test - - it('with alternate port, and registered consumer processor', async function () { - - let tcpconsumer_9080 = new Consumer({port:9080, name:'tcp-consumer-9080'}) - - - let [err] = await btc(tcpconsumer_9080.connect)() - if (err) { - console.log('unable to connect to socket to start test', tcpconsumer_9080.port) - process.kill(process.pid, 'SIGTERM') - } - - tcpconsumer_9080.registerPacketProcessor(async function (packet) { - return new Promise((resolve) => { - packet.payload = packet.payload +':local' - resolve(packet)}) - }) - - let packet = {payload:'tcp payload'} - let res = await tcpconsumer_9080.send(packet) - expect(res.payload).to.equal('9080:tcp payload:local') - - }) // end tcp socket 2 test - - -}) // end describe diff --git a/test/thesearejustcopiesneedtobeadapted b/test/thesearejustcopiesneedtobeadapted deleted file mode 100644 index e69de29..0000000 diff --git a/test/usocket-default.test.js b/test/usocket-default.test.js deleted file mode 100644 index 001f11b..0000000 --- a/test/usocket-default.test.js +++ /dev/null @@ -1,83 +0,0 @@ -import { spawn } from 'child_process' -import chai from 'chai' -import chaiAsPromised from 'chai-as-promised' -import btc from 'better-try-catch' -chai.use(chaiAsPromised) -const expect = chai.expect - -import { Consumer } from '../src' - -const SOCKET_FILE = 'usocket-default' - -let consumer = new Consumer({path:true,name:'unix-consumer'}) -let consumer2 = new Consumer({path:true, name:'unix-consumer2'}) - -let socket = {} - -describe('Connects and Processes a payload via Unix Socket using JSON packet with defaults', function(){ - - before(async function(){ - socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) - socket.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - }) - - after(async function(){ - socket.kill() - }) - - const TIMES = 3000 - - it(`Tests unix socket with default echo JSON packet procssing with ${TIMES} packets sent`, async function () { - - let [err] = await btc(consumer.connect)() - if (err) { - console.log('unable to connect to socket to start test', consumer.path) - process.kill(process.pid, 'SIGTERM') - } - - consumer.registerPacketProcessor(async function (packet) { - return new Promise((resolve) => { - packet.times += 1 - if (packet.times === TIMES) packet.payload = 'local1:'+packet.payload - resolve(packet)}) - }) - - - let packet = {payload:'payload', times:0} - for (let i = 1; i <= TIMES; i++) { - packet = await consumer.send(packet) - } - expect(packet.payload+':'+packet.times).to.equal('local1:payload:'+TIMES) - - }) // end unix socket test - - - it(`unix socket with two consumers alternating packets, ${TIMES} packets each and local processing`, async function () { - - - let [err] = await btc(consumer2.connect)() - if (err) { - console.log('unable to connect to socket to start test', consumer.path) - process.kill(process.pid, 'SIGTERM') - } - - consumer2.registerPacketProcessor(async function (packet) { - return new Promise((resolve) => { - packet.times += 1 - if (packet.times === TIMES) packet.payload = 'local2:'+packet.payload - resolve(packet)}) - }) - - let packet = {consumer:1, payload:'payload', times:-1} - for (let i = 0; i < TIMES; i++) { - packet = await consumer.send(packet) - if (packet.times === TIMES) packet.times = 1 - packet = await consumer2.send(packet) - } - expect(packet.payload+':'+packet.times).to.equal('local2:local1:payload:'+TIMES) - - }) // end unix socket test - -}) // end describe diff --git a/test/usocket.test.js b/test/usocket.test.js deleted file mode 100644 index 51d7341..0000000 --- a/test/usocket.test.js +++ /dev/null @@ -1,51 +0,0 @@ -import { spawn } from 'child_process' -import chai from 'chai' -import chaiAsPromised from 'chai-as-promised' -import btc from 'better-try-catch' -chai.use(chaiAsPromised) -const expect = chai.expect - -import { Consumer } from '../src' - -const SOCKET_FILE = 'usocket' - -let consumer = new Consumer({path:SOCKET_FILE,name:'unix-consumer'}) - -// const delay = time => new Promise(res=>setTimeout(()=>res(),time)) - -let socket = {} - -describe('Connects and Processes a payload via Unix Socket using JSON packet with alt processor', function(){ - - before(async function(){ - socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) - socket.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - }) - - after(async function(){ - socket.kill() - }) - - it('Tests alternate JSON packet procssing at socket and consumer', async function () { - - let [err] = await btc(consumer.connect)() - if (err) { - console.log('unable to connect to socket to start test', consumer.path) - process.kill(process.pid, 'SIGTERM') - } - - consumer.registerPacketProcessor(async function (packet) { - return new Promise((resolve) => { - packet.payload = 'local:'+packet.payload - resolve(packet)}) - }) - let packet = { payload:'payload'} - let res = await consumer.send(packet) - expect(res.payload).to.equal('local:overwrite default processor from instance at socket => payload') - - - }) // end unix socket test - -}) // end describe