diff --git a/.eslintrc.js b/.eslintrc.js new file mode 100644 index 0000000..2bed546 --- /dev/null +++ b/.eslintrc.js @@ -0,0 +1,33 @@ +module.exports = { + "ecmaFeatures": { + "modules": true, + "spread" : true, + "restParams" : true + }, + "env": { + "es6": true, + "node": true, + "mocha": true + }, + "parserOptions": { + "ecmaVersion": 2017, + "sourceType": "module" + }, + "extends": "eslint:recommended", + "rules": { + "indent": [ + "error", + 2 + ], + "no-console": 0, + "semi": ["error", "never"], + "linebreak-style": [ + "error", + "unix" + ], + "quotes": [ + "error", + "single" + ] + } +} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..caddd9f --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/node_modules/ +/coverage/ +/syncd/ +*.log +/temp/ diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..f16fc41 --- /dev/null +++ b/.npmignore @@ -0,0 +1,4 @@ +tests/ +test/ +*.test.js +testing/ diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..5b0b93e --- /dev/null +++ b/.travis.yml @@ -0,0 +1,12 @@ +language: node_js + +node_js: + - '7.10' + - 'node' + +sudo: false + +script: npm test + +after_success: + - bash <(curl -s https://codecov.io/bash) || echo "Codecov did not collect coverage reports" diff --git a/examples/.gitignore b/examples/.gitignore new file mode 100644 index 0000000..b049ffa --- /dev/null +++ b/examples/.gitignore @@ -0,0 +1,2 @@ +*.sock +/node_modules/ diff --git a/examples/server.mjs b/examples/server.mjs new file mode 100644 index 0000000..bd8770c --- /dev/null +++ b/examples/server.mjs @@ -0,0 +1,37 @@ +import { Socket } from '../src' + + ; +(async () => { + + class Test extends Socket { + constructor(opts) { + super(opts) + } + + // async _packetProcess(packet) { + // console.log('packet being processed at socket', packet) + // if (packet.cmd) return await this[packet.cmd](packet.data,packet.name) + // return {error: 'no command in packet', packet: packet } + // } + // + // async doit(data,name) { + // return new Promise(resolve => { + // let res = {} + // console.log('data sent to doit = ', data) + // res.status ='success' + // res.name = name + // res.cmd = 'reply' + // res.data = data + // resolve(res) + // }) + // } + + } + + // let test = new Test() + let test = new Test({port:8090}) + console.log(await test.create()) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/package.json b/package.json new file mode 100644 index 0000000..4f446c7 --- /dev/null +++ b/package.json @@ -0,0 +1,51 @@ +{ + "name": "@uci/websocket", + "version": "0.1.5", + "description": "JSON packet host websocket server", + "main": "src", + "scripts": { + "testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs", + "test": "mocha -r @std/esm --timeout 10000 test/*.test.mjs", + "testlog": "DEBUG=true mocha -r @std/esm --timeout 10000 test/*.test.mjs", + "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true", + "s": "DEBUG=true node -r @std/esm examples/server" + }, + "author": "David Kebler", + "license": "MIT", + "repository": { + "type": "git", + "url": "git+https://github.com/uCOMmandIt/websocket.git" + }, + "keywords": [ + "node.js", + "socket", + "websocket", + "JSON", + "packet", + "serialize", + "named pipe", + "unix socket", + "TCP" + ], + "bugs": { + "url": "https://github.com/uCOMmandIt/websocket/issues" + }, + "homepage": "https://github.com/uCOMmandIt/websocket#readme", + "@std/esm": "cjs", + "devDependencies": { + "@std/esm": "^0.22.0", + "chai": "^4.1.2", + "chai-as-promised": "^7.1.1", + "codecov": "^3.0.0", + "istanbul": "^0.4.5", + "mocha": "^5.0.1", + "nodemon": "^1.15.1" + }, + "dependencies": { + "@uci/logger": "0.0.1", + "better-try-catch": "^0.6.2", + "clone": "^2.1.1", + "death": "^1.1.0", + "ws": "^5.1.0" + } +} diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..5bed77f --- /dev/null +++ b/readme.md @@ -0,0 +1,13 @@ +# UComandIt Class Extenson of websocket module + +## What is it + + + +## TL/DR; + +## What's it good for + +## Why Bother + +## Getting Started diff --git a/src/index.mjs b/src/index.mjs new file mode 100644 index 0000000..c87bb44 --- /dev/null +++ b/src/index.mjs @@ -0,0 +1,4 @@ +import Socket from './socket' + +export { Socket as Socket } +export default Socket diff --git a/src/socket.mjs b/src/socket.mjs new file mode 100644 index 0000000..f2e3b17 --- /dev/null +++ b/src/socket.mjs @@ -0,0 +1,119 @@ + +import WebSocket from 'ws' +import btc from 'better-try-catch' +import _ON_DEATH from 'death' //this is intentionally ugly +import clone from 'clone' + +import logger from '@uci/logger' +let log = {} + +export default class Socket extends WebSocket.Server { + constructor (opts = {}) { + opts.host = opts.host || '0.0.0.0' + opts.port = opts.port || 8080 + super(opts) + this.id = opts.id || opts.name || 'Websocket:'+ new Date().getTime() + this.opts = opts // for use to recover from selected errors + //self bindings + this._listen = this._listen.bind(this) + this.create = this.create.bind(this) + log = logger.child( + { + repo:'uci-websocket', + npm:'@uci/websocket', + file:'src/socket.mjs', + class:'Socket', + id:this.id, + instance_created:new Date().getTime() + }) + + } // end constructor + + async create () { + + return new Promise((resolve, reject) => { + + _ON_DEATH( async () => { + log.info('\nhe\'s dead jim') + await this._destroy() + }) + process.once('SIGUSR2', async () => { + await this._destroy + process.kill(process.pid, 'SIGUSR2') + }) + + this.on('error', async (err) => { + log.fatal(err, 'socket server error') + console.error(err, 'socket server error') + reject(err) + }) + + this.on('listening', async () => { + this._listen() + log.info('websocket server created and listening at', this.address()) + resolve('websocket ready and listening') + }) + + }) + + } // end create + + registerPacketProcessor (func) { + this._packetProcess = func + } + + _listen () { + + this.on('connection', async (socket) => { + const send = this._send.bind(socket) + log.info('new consumer connecting') + socket.on('message', messageProcess.bind(this)) + + async function messageProcess (strPacket) { + log.info(' incoming packet on socket side') + let res = {} + let [err, packet] = btc(JSON.parse)(strPacket) + if (err) { res = {error: `Could not parse JSON: ${packet}`} } + else { + res = await this._packetProcess(clone(packet)) + if (Object.keys(res).length === 0) res = { error: 'socket packet command function likely did not return a promise', packet:packet} + } + res._header = clone(packet._header,false) //make sure return packet has header with id in case it was removed in processing + delete packet._header // remove before adding to response header as request + res._header.request = clone(packet,false) + res._header.responder = {name:this.name,instanceID:this.id} + res._header.socket = this.address() + if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' + log.info(await send(res)) + } + + }) // end connected consumer + log.info('socket created') + } // end listen + + async _destroy () { + log.info('closing down socket') + await this.close() + log.info('all connections closed....exiting') + process.exit() + } + + // default packet process, just a simple echo - replace + async _packetProcess (packet) { + return new Promise(resolve => { + resolve(packet) + }) + } + + // must have a consumer socket instance bound to call this!! + async _send (packet) { + return new Promise( (resolve,reject) => { + if (this.readyState !== 1 ) reject (`Connection not Ready, CODE:${this.readyState}`) + let [err,message] = btc(JSON.stringify)(packet) + if (err) reject(`Could not JSON stringify: ${packet}`) + this.send(message) + resolve('sent packet', packet) + }) + } + +} // end class diff --git a/test/.gitignore b/test/.gitignore new file mode 100644 index 0000000..b049ffa --- /dev/null +++ b/test/.gitignore @@ -0,0 +1,2 @@ +*.sock +/node_modules/ diff --git a/test/sockets/.gitignore b/test/sockets/.gitignore new file mode 100644 index 0000000..2ccbe46 --- /dev/null +++ b/test/sockets/.gitignore @@ -0,0 +1 @@ +/node_modules/ diff --git a/test/sockets/tcp-process.mjs b/test/sockets/tcp-process.mjs new file mode 100644 index 0000000..5eeb46a --- /dev/null +++ b/test/sockets/tcp-process.mjs @@ -0,0 +1,4 @@ +export default async function (packet) { + packet.payload = this.opts.port +':'+packet.payload + return packet +} diff --git a/test/sockets/tcpsocket-9080.mjs b/test/sockets/tcpsocket-9080.mjs new file mode 100644 index 0000000..af5b0a5 --- /dev/null +++ b/test/sockets/tcpsocket-9080.mjs @@ -0,0 +1,14 @@ +import { Socket } from '../../src' +import process from './tcp-process' + +let socket = new Socket({port:9080, name:'tcp socket 9080'}) + +socket.registerPacketProcessor(process) +; +(async () => { + + await socket.create() + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/sockets/tcpsocket-default.mjs b/test/sockets/tcpsocket-default.mjs new file mode 100644 index 0000000..86ef932 --- /dev/null +++ b/test/sockets/tcpsocket-default.mjs @@ -0,0 +1,15 @@ +import { Socket } from '../../src' +import process from './tcp-process' + +let socket = new Socket({name:'tcp socket'}) + +socket.registerPacketProcessor(process) + +; +(async () => { + + await socket.create() + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/sockets/usocket-default.mjs b/test/sockets/usocket-default.mjs new file mode 100644 index 0000000..3de8d32 --- /dev/null +++ b/test/sockets/usocket-default.mjs @@ -0,0 +1,11 @@ +import { Socket } from '../../src' + +let socket = new Socket({path:true,name:'default-unix-socket'}) + +; +(async () => { + await socket.create() + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/sockets/usocket.mjs b/test/sockets/usocket.mjs new file mode 100644 index 0000000..b4bac21 --- /dev/null +++ b/test/sockets/usocket.mjs @@ -0,0 +1,23 @@ +import { Socket } from '../../src' + +const USOCKET = 'usocket' + +let socket = new Socket({path:USOCKET,name:'default-unix-socket'}) + +socket.test = 'at socket => ' + +socket.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload + resolve(packet) + }) +}) + +; +(async () => { + + await socket.create() + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/tcp.test.mjs b/test/tcp.test.mjs new file mode 100644 index 0000000..15510a2 --- /dev/null +++ b/test/tcp.test.mjs @@ -0,0 +1,78 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +let tcpsocket_default = {} +let tcpsocket_9080 = {} + +describe('Connects and Processes a payload in a JSON packet via TCP Socket', function(){ + + before(async function(){ + tcpsocket_default = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-default']) + tcpsocket_default.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + tcpsocket_9080 = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-9080']) + tcpsocket_9080.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + + }) + + after(async function(){ + tcpsocket_default.kill() + tcpsocket_9080.kill() + }) + + it('with default host and port', async function () { + let tcpconsumer_default = new Consumer({name:'tcpconsumer'}) + + let [err] = await btc(tcpconsumer_default.connect)() + if (err) { + console.log('unable to connect to socket to start test', tcpconsumer_default.port) + process.kill(process.pid, 'SIGTERM') + } + + tcpconsumer_default.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.payload = packet.payload +':local' + resolve(packet)}) + }) + + let packet = {payload:'tcp payload'} + let res = await tcpconsumer_default.send(packet) + expect(res.payload).to.equal('8080:tcp payload:local') + + }) // end tcp socket test + + it('with alternate port, and registered consumer processor', async function () { + + let tcpconsumer_9080 = new Consumer({port:9080, name:'tcp-consumer-9080'}) + + + let [err] = await btc(tcpconsumer_9080.connect)() + if (err) { + console.log('unable to connect to socket to start test', tcpconsumer_9080.port) + process.kill(process.pid, 'SIGTERM') + } + + tcpconsumer_9080.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.payload = packet.payload +':local' + resolve(packet)}) + }) + + let packet = {payload:'tcp payload'} + let res = await tcpconsumer_9080.send(packet) + expect(res.payload).to.equal('9080:tcp payload:local') + + }) // end tcp socket 2 test + + +}) // end describe diff --git a/test/usocket-default.test.mjs b/test/usocket-default.test.mjs new file mode 100644 index 0000000..001f11b --- /dev/null +++ b/test/usocket-default.test.mjs @@ -0,0 +1,83 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const SOCKET_FILE = 'usocket-default' + +let consumer = new Consumer({path:true,name:'unix-consumer'}) +let consumer2 = new Consumer({path:true, name:'unix-consumer2'}) + +let socket = {} + +describe('Connects and Processes a payload via Unix Socket using JSON packet with defaults', function(){ + + before(async function(){ + socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) + socket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + }) + + after(async function(){ + socket.kill() + }) + + const TIMES = 3000 + + it(`Tests unix socket with default echo JSON packet procssing with ${TIMES} packets sent`, async function () { + + let [err] = await btc(consumer.connect)() + if (err) { + console.log('unable to connect to socket to start test', consumer.path) + process.kill(process.pid, 'SIGTERM') + } + + consumer.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.times += 1 + if (packet.times === TIMES) packet.payload = 'local1:'+packet.payload + resolve(packet)}) + }) + + + let packet = {payload:'payload', times:0} + for (let i = 1; i <= TIMES; i++) { + packet = await consumer.send(packet) + } + expect(packet.payload+':'+packet.times).to.equal('local1:payload:'+TIMES) + + }) // end unix socket test + + + it(`unix socket with two consumers alternating packets, ${TIMES} packets each and local processing`, async function () { + + + let [err] = await btc(consumer2.connect)() + if (err) { + console.log('unable to connect to socket to start test', consumer.path) + process.kill(process.pid, 'SIGTERM') + } + + consumer2.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.times += 1 + if (packet.times === TIMES) packet.payload = 'local2:'+packet.payload + resolve(packet)}) + }) + + let packet = {consumer:1, payload:'payload', times:-1} + for (let i = 0; i < TIMES; i++) { + packet = await consumer.send(packet) + if (packet.times === TIMES) packet.times = 1 + packet = await consumer2.send(packet) + } + expect(packet.payload+':'+packet.times).to.equal('local2:local1:payload:'+TIMES) + + }) // end unix socket test + +}) // end describe diff --git a/test/usocket.test.mjs b/test/usocket.test.mjs new file mode 100644 index 0000000..51d7341 --- /dev/null +++ b/test/usocket.test.mjs @@ -0,0 +1,51 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const SOCKET_FILE = 'usocket' + +let consumer = new Consumer({path:SOCKET_FILE,name:'unix-consumer'}) + +// const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let socket = {} + +describe('Connects and Processes a payload via Unix Socket using JSON packet with alt processor', function(){ + + before(async function(){ + socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) + socket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + }) + + after(async function(){ + socket.kill() + }) + + it('Tests alternate JSON packet procssing at socket and consumer', async function () { + + let [err] = await btc(consumer.connect)() + if (err) { + console.log('unable to connect to socket to start test', consumer.path) + process.kill(process.pid, 'SIGTERM') + } + + consumer.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.payload = 'local:'+packet.payload + resolve(packet)}) + }) + let packet = { payload:'payload'} + let res = await consumer.send(packet) + expect(res.payload).to.equal('local:overwrite default processor from instance at socket => payload') + + + }) // end unix socket test + +}) // end describe