working websocket (server) with json packet processing. Goes with uci-websocket-client repo

This commit is contained in:
David Kebler 2018-04-05 15:05:59 -07:00
parent e254e058a7
commit 4651fd4c2d
20 changed files with 562 additions and 0 deletions

33
.eslintrc.js Normal file
View file

@ -0,0 +1,33 @@
module.exports = {
"ecmaFeatures": {
"modules": true,
"spread" : true,
"restParams" : true
},
"env": {
"es6": true,
"node": true,
"mocha": true
},
"parserOptions": {
"ecmaVersion": 2017,
"sourceType": "module"
},
"extends": "eslint:recommended",
"rules": {
"indent": [
"error",
2
],
"no-console": 0,
"semi": ["error", "never"],
"linebreak-style": [
"error",
"unix"
],
"quotes": [
"error",
"single"
]
}
}

5
.gitignore vendored Normal file
View file

@ -0,0 +1,5 @@
/node_modules/
/coverage/
/syncd/
*.log
/temp/

4
.npmignore Normal file
View file

@ -0,0 +1,4 @@
tests/
test/
*.test.js
testing/

12
.travis.yml Normal file
View file

@ -0,0 +1,12 @@
language: node_js
node_js:
- '7.10'
- 'node'
sudo: false
script: npm test
after_success:
- bash <(curl -s https://codecov.io/bash) || echo "Codecov did not collect coverage reports"

2
examples/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*.sock
/node_modules/

37
examples/server.mjs Normal file
View file

@ -0,0 +1,37 @@
import { Socket } from '../src'
;
(async () => {
class Test extends Socket {
constructor(opts) {
super(opts)
}
// async _packetProcess(packet) {
// console.log('packet being processed at socket', packet)
// if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
// return {error: 'no command in packet', packet: packet }
// }
//
// async doit(data,name) {
// return new Promise(resolve => {
// let res = {}
// console.log('data sent to doit = ', data)
// res.status ='success'
// res.name = name
// res.cmd = 'reply'
// res.data = data
// resolve(res)
// })
// }
}
// let test = new Test()
let test = new Test({port:8090})
console.log(await test.create())
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

51
package.json Normal file
View file

@ -0,0 +1,51 @@
{
"name": "@uci/websocket",
"version": "0.1.5",
"description": "JSON packet host websocket server",
"main": "src",
"scripts": {
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
"test": "mocha -r @std/esm --timeout 10000 test/*.test.mjs",
"testlog": "DEBUG=true mocha -r @std/esm --timeout 10000 test/*.test.mjs",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
"s": "DEBUG=true node -r @std/esm examples/server"
},
"author": "David Kebler",
"license": "MIT",
"repository": {
"type": "git",
"url": "git+https://github.com/uCOMmandIt/websocket.git"
},
"keywords": [
"node.js",
"socket",
"websocket",
"JSON",
"packet",
"serialize",
"named pipe",
"unix socket",
"TCP"
],
"bugs": {
"url": "https://github.com/uCOMmandIt/websocket/issues"
},
"homepage": "https://github.com/uCOMmandIt/websocket#readme",
"@std/esm": "cjs",
"devDependencies": {
"@std/esm": "^0.22.0",
"chai": "^4.1.2",
"chai-as-promised": "^7.1.1",
"codecov": "^3.0.0",
"istanbul": "^0.4.5",
"mocha": "^5.0.1",
"nodemon": "^1.15.1"
},
"dependencies": {
"@uci/logger": "0.0.1",
"better-try-catch": "^0.6.2",
"clone": "^2.1.1",
"death": "^1.1.0",
"ws": "^5.1.0"
}
}

13
readme.md Normal file
View file

@ -0,0 +1,13 @@
# UComandIt Class Extenson of websocket module
## What is it
## TL/DR;
## What's it good for
## Why Bother
## Getting Started

4
src/index.mjs Normal file
View file

@ -0,0 +1,4 @@
import Socket from './socket'
export { Socket as Socket }
export default Socket

119
src/socket.mjs Normal file
View file

@ -0,0 +1,119 @@
import WebSocket from 'ws'
import btc from 'better-try-catch'
import _ON_DEATH from 'death' //this is intentionally ugly
import clone from 'clone'
import logger from '@uci/logger'
let log = {}
export default class Socket extends WebSocket.Server {
constructor (opts = {}) {
opts.host = opts.host || '0.0.0.0'
opts.port = opts.port || 8080
super(opts)
this.id = opts.id || opts.name || 'Websocket:'+ new Date().getTime()
this.opts = opts // for use to recover from selected errors
//self bindings
this._listen = this._listen.bind(this)
this.create = this.create.bind(this)
log = logger.child(
{
repo:'uci-websocket',
npm:'@uci/websocket',
file:'src/socket.mjs',
class:'Socket',
id:this.id,
instance_created:new Date().getTime()
})
} // end constructor
async create () {
return new Promise((resolve, reject) => {
_ON_DEATH( async () => {
log.info('\nhe\'s dead jim')
await this._destroy()
})
process.once('SIGUSR2', async () => {
await this._destroy
process.kill(process.pid, 'SIGUSR2')
})
this.on('error', async (err) => {
log.fatal(err, 'socket server error')
console.error(err, 'socket server error')
reject(err)
})
this.on('listening', async () => {
this._listen()
log.info('websocket server created and listening at', this.address())
resolve('websocket ready and listening')
})
})
} // end create
registerPacketProcessor (func) {
this._packetProcess = func
}
_listen () {
this.on('connection', async (socket) => {
const send = this._send.bind(socket)
log.info('new consumer connecting')
socket.on('message', messageProcess.bind(this))
async function messageProcess (strPacket) {
log.info(' incoming packet on socket side')
let res = {}
let [err, packet] = btc(JSON.parse)(strPacket)
if (err) { res = {error: `Could not parse JSON: ${packet}`} }
else {
res = await this._packetProcess(clone(packet))
if (Object.keys(res).length === 0) res = { error: 'socket packet command function likely did not return a promise', packet:packet}
}
res._header = clone(packet._header,false) //make sure return packet has header with id in case it was removed in processing
delete packet._header // remove before adding to response header as request
res._header.request = clone(packet,false)
res._header.responder = {name:this.name,instanceID:this.id}
res._header.socket = this.address()
if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply'
log.info(await send(res))
}
}) // end connected consumer
log.info('socket created')
} // end listen
async _destroy () {
log.info('closing down socket')
await this.close()
log.info('all connections closed....exiting')
process.exit()
}
// default packet process, just a simple echo - replace
async _packetProcess (packet) {
return new Promise(resolve => {
resolve(packet)
})
}
// must have a consumer socket instance bound to call this!!
async _send (packet) {
return new Promise( (resolve,reject) => {
if (this.readyState !== 1 ) reject (`Connection not Ready, CODE:${this.readyState}`)
let [err,message] = btc(JSON.stringify)(packet)
if (err) reject(`Could not JSON stringify: ${packet}`)
this.send(message)
resolve('sent packet', packet)
})
}
} // end class

2
test/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*.sock
/node_modules/

1
test/sockets/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/node_modules/

View file

@ -0,0 +1,4 @@
export default async function (packet) {
packet.payload = this.opts.port +':'+packet.payload
return packet
}

View file

@ -0,0 +1,14 @@
import { Socket } from '../../src'
import process from './tcp-process'
let socket = new Socket({port:9080, name:'tcp socket 9080'})
socket.registerPacketProcessor(process)
;
(async () => {
await socket.create()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -0,0 +1,15 @@
import { Socket } from '../../src'
import process from './tcp-process'
let socket = new Socket({name:'tcp socket'})
socket.registerPacketProcessor(process)
;
(async () => {
await socket.create()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -0,0 +1,11 @@
import { Socket } from '../../src'
let socket = new Socket({path:true,name:'default-unix-socket'})
;
(async () => {
await socket.create()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

23
test/sockets/usocket.mjs Normal file
View file

@ -0,0 +1,23 @@
import { Socket } from '../../src'
const USOCKET = 'usocket'
let socket = new Socket({path:USOCKET,name:'default-unix-socket'})
socket.test = 'at socket => '
socket.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload
resolve(packet)
})
})
;
(async () => {
await socket.create()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

78
test/tcp.test.mjs Normal file
View file

@ -0,0 +1,78 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
let tcpsocket_default = {}
let tcpsocket_9080 = {}
describe('Connects and Processes a payload in a JSON packet via TCP Socket', function(){
before(async function(){
tcpsocket_default = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-default'])
tcpsocket_default.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
tcpsocket_9080 = spawn('node',['-r', '@std/esm', './test/sockets/tcpsocket-9080'])
tcpsocket_9080.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
})
after(async function(){
tcpsocket_default.kill()
tcpsocket_9080.kill()
})
it('with default host and port', async function () {
let tcpconsumer_default = new Consumer({name:'tcpconsumer'})
let [err] = await btc(tcpconsumer_default.connect)()
if (err) {
console.log('unable to connect to socket to start test', tcpconsumer_default.port)
process.kill(process.pid, 'SIGTERM')
}
tcpconsumer_default.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.payload = packet.payload +':local'
resolve(packet)})
})
let packet = {payload:'tcp payload'}
let res = await tcpconsumer_default.send(packet)
expect(res.payload).to.equal('8080:tcp payload:local')
}) // end tcp socket test
it('with alternate port, and registered consumer processor', async function () {
let tcpconsumer_9080 = new Consumer({port:9080, name:'tcp-consumer-9080'})
let [err] = await btc(tcpconsumer_9080.connect)()
if (err) {
console.log('unable to connect to socket to start test', tcpconsumer_9080.port)
process.kill(process.pid, 'SIGTERM')
}
tcpconsumer_9080.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.payload = packet.payload +':local'
resolve(packet)})
})
let packet = {payload:'tcp payload'}
let res = await tcpconsumer_9080.send(packet)
expect(res.payload).to.equal('9080:tcp payload:local')
}) // end tcp socket 2 test
}) // end describe

View file

@ -0,0 +1,83 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const SOCKET_FILE = 'usocket-default'
let consumer = new Consumer({path:true,name:'unix-consumer'})
let consumer2 = new Consumer({path:true, name:'unix-consumer2'})
let socket = {}
describe('Connects and Processes a payload via Unix Socket using JSON packet with defaults', function(){
before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
})
after(async function(){
socket.kill()
})
const TIMES = 3000
it(`Tests unix socket with default echo JSON packet procssing with ${TIMES} packets sent`, async function () {
let [err] = await btc(consumer.connect)()
if (err) {
console.log('unable to connect to socket to start test', consumer.path)
process.kill(process.pid, 'SIGTERM')
}
consumer.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.times += 1
if (packet.times === TIMES) packet.payload = 'local1:'+packet.payload
resolve(packet)})
})
let packet = {payload:'payload', times:0}
for (let i = 1; i <= TIMES; i++) {
packet = await consumer.send(packet)
}
expect(packet.payload+':'+packet.times).to.equal('local1:payload:'+TIMES)
}) // end unix socket test
it(`unix socket with two consumers alternating packets, ${TIMES} packets each and local processing`, async function () {
let [err] = await btc(consumer2.connect)()
if (err) {
console.log('unable to connect to socket to start test', consumer.path)
process.kill(process.pid, 'SIGTERM')
}
consumer2.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.times += 1
if (packet.times === TIMES) packet.payload = 'local2:'+packet.payload
resolve(packet)})
})
let packet = {consumer:1, payload:'payload', times:-1}
for (let i = 0; i < TIMES; i++) {
packet = await consumer.send(packet)
if (packet.times === TIMES) packet.times = 1
packet = await consumer2.send(packet)
}
expect(packet.payload+':'+packet.times).to.equal('local2:local1:payload:'+TIMES)
}) // end unix socket test
}) // end describe

51
test/usocket.test.mjs Normal file
View file

@ -0,0 +1,51 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const SOCKET_FILE = 'usocket'
let consumer = new Consumer({path:SOCKET_FILE,name:'unix-consumer'})
// const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {}
describe('Connects and Processes a payload via Unix Socket using JSON packet with alt processor', function(){
before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
})
after(async function(){
socket.kill()
})
it('Tests alternate JSON packet procssing at socket and consumer', async function () {
let [err] = await btc(consumer.connect)()
if (err) {
console.log('unable to connect to socket to start test', consumer.path)
process.kill(process.pid, 'SIGTERM')
}
consumer.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.payload = 'local:'+packet.payload
resolve(packet)})
})
let packet = { payload:'payload'}
let res = await consumer.send(packet)
expect(res.payload).to.equal('local:overwrite default processor from instance at socket => payload')
}) // end unix socket test
}) // end describe