removed context processing from modules (will be done in uci-base)

added connection retry for consumer when connection to socket fails
refactored tests and examples to suit
Got a start on readme file
This commit is contained in:
David Kebler 2018-01-30 16:59:57 -08:00
parent 2d38eab240
commit 3beb53d2a3
19 changed files with 133 additions and 223 deletions

1
.gitignore vendored
View file

@ -2,3 +2,4 @@
/coverage/
/syncd/
*.log
/temp/

View file

@ -9,20 +9,18 @@ let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'}
let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
// This is your client handler object waiting on a message to do something
let app = {
processIt: function processPacket (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
},
pp: 'processIt'
const process = function (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
}
Object.assign(client1,app)
// alternatively if you pass an object to the connect method it will merge whatever you pass
// or even better extend the consumer class and then make instances. See client2
client1.registerPacketProcessor(process)
client2.packet._process = process
;
(async () => {
await Promise.all([client1.connect(),client2.connect(app)])
await Promise.all([client1.connect(),client2.connect()])
await Promise.all([client1.send(packet1),client2.send(packet2)])
})().catch(err => {

View file

@ -6,11 +6,9 @@ const USOCKET = __dirname + '/sample.sock'
class Client extends Consumer {
constructor(path,opts) {
super(path,opts)
this.pp = 'processIt'
}
async processIt(packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
this.packet._process = async function (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
}
}
}

View file

@ -10,7 +10,7 @@ const USOCKET = __dirname + '/sample.sock'
super(path,opts)
}
async processPacket(packet) {
async _process(packet) {
console.log('packet being processed')
console.dir(packet)
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
@ -29,6 +29,7 @@ const USOCKET = __dirname + '/sample.sock'
}
let test = new Test(USOCKET,{path: USOCKET, name:'example-socket'})
test.packet = test
await test.create()
})().catch(err => {

View file

@ -6,6 +6,7 @@
"scripts": {
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
"test": "mocha -r @std/esm test/*.test.mjs",
"testlog": "mocha -r @std/esm test/*.test.mjs | ./node_modules/.bin/bunyan",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
"s": "node -r @std/esm examples/server | ./node_modules/.bin/bunyan",
"devs": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server",

View file

@ -1,4 +1,31 @@
## Extensons of Nodejs net 'Socket` and `Server` classes
# UComandIt Extenson of Nodejs `net` `Socket` and `Server` classes
Sockets now pass JSON objects (message packets)
Allows extension or passing of custom packet processors (e.g. MQTT) at each end of the socket.
## What is it
the Server and Socket Classes in the `'net'` package are rudimentary. This library extends these two classes into something more useable and robust.
I have changed the class name convention to make more "sense" (at least to me). `Server` extends to `Socket` and `Socket` extends to `Consumer`
With these you then `.create()` a `Socket` instance and `.connect()` a `Consumer` instance to a `Socket` instance.
Both sockets by default stream JSON objects between each other better know as packets. The properties of these json packets can then be used to do all manner of processing at either the socket or consumer end. The classes allow attaching a custom packet processor (MQTT if you so desire). See `@uci/base` for an example of extending these classes with a particular processor that supports a simple 'cmd' packet property protocol for calling functions at either the socket or the consumer.
## TL/DR;
See the one server and two client files in the `/examples` folder, Fire up the server file and then in another termial one of the two client files.
**Note**: these are ES6 (`.mjs`) modules you must either
1) use `-r @std/esm` when running as in `node -r @std/esm server` or
2) must use babel or the like.
This repo DOES NOT provide a /dist folder with preprocessed ES6 since as of node 9+ ES6 modules (`.mjs`) mixed with CommonJS (`.js`) are supported with the [@std/esm](https://github.com/standard-things/esm) flag.
## What's it good for
With the two classes you can create either an IPC(Unix) or TCP socket/consumer. This means you can have inter process communication between the processes on one machine (Unix) and at the same time send/receive packets to processes on other machines (TCP). See UCI's [`base`]() repo for a class which allows you to make intances that do that.
## Why Bother
You could just start with the net classes like I did but look but I've made a number of useful additions without creating a lot of dependencies. Iv'e made it super easy to fire off instances with sensible defaults, socket data stream chunking and processing of JSON to/from JS objects, consumer connect retry, registering custom packet processor.
On the flip side you could just use the MQTT repos but they require running a broker and there is a lot of overhead before you get down to doing `something` with the packet you just sent and received. That's really overkill and slow for IPC on the same machine. It's even overkill for direct TCP communication between a couple SBCs in local system. Still since you can change the packet processor with a register command it's easy to process MQTT packets with these classes.
## Getting Started

View file

@ -2,7 +2,7 @@ import { Socket } from 'net'
import btc from 'better-try-catch'
import bunyan from 'bunyan'
// import Stream from 'delimiter-stream'
import JsonStream from './json'
import JsonStream from './json-stream'
export default class Consumer extends Socket {
constructor (path={}, opts={}) {
@ -44,61 +44,75 @@ export default class Consumer extends Socket {
ready() {return this._ready}
async connect (context) {
async connect () {
if (context) this.packet.context = context
else this.packet.context = this
this.listen()
this.log.info('listening')
// if (context) this.packet.context = context
// else this.packet.context = this
return new Promise( (resolve,reject) => {
const connect = () => {
super.connect({ port:this.port, host:this.host, path: this.path })
}
const timeout = setTimeout(() =>{
reject(`unable to connect in ${this.timeout*10}ms`)
}
,this.timeout*10)
this.once('connect', async () => {
clearTimeout(timeout)
this.listen()
this.log.info({path:this.path},'connected waiting for socket ready handshake')
this.setKeepAlive(this.keepAlive)
let [err, res] = await btc(isReady).bind(this)(this.ready, this.wait, this.timeout)
if (err) reject(err)
this.log.info('handshake done, authenticating')
// TODO authenticate here by encrypting a payload with private key and sending that.
// await btc(authenticate)
resolve(res)
})
this.on('error', async (err) => {
if (err.code === 'EISCONN') {
return resolve('ready')
}
return reject(err)
this.log.warn(err.code)
setTimeout(() =>{
this.log.warn('retry connect')
connect()
}
,this.wait*10)
})
super.connect({ port:this.port, host:this.host, path: this.path }, async () => {
this.log.info({path:this.path},'connecting')
this.setKeepAlive(this.keepAlive)
let [err, res] = await btc(isReady).bind(this)(this.ready, this.wait, this.timeout)
if (err) reject(err)
this.log.info('handshake done, connected')
resolve(res)
})
connect()
}) //end promise
}
async send(packet) {
await this.write(this.stream.serialize(packet))
// handle error here?
}
async listen () {
this.log.info('listening for incoming packets from socket')
this.on('data', this.stream.onData)
this.stream.on('message', messageProcess.bind(this))
async function messageProcess (packet) {
// console.log('incoming packet from server', packet)
if (packet.ready) {
this._ready = true
return }
// console.log('consumer processor',this.packet._process)
await this.packet._process(packet)
}
}
registerPacketContext(obj) {
this.packet.context = obj
async send(packet) {
await this.write(this.stream.serialize(packet))
// TODO handle error here? and/or await response if required before allowing more sending
}
// TODO register alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks)
// TODO register authenciation function (set up default)
registerPacketProcessor (func) {
this.packet._process = func
}
@ -111,7 +125,7 @@ function isReady(ready, wait=30, timeout=1000) {
let time = 0
return new Promise((resolve, reject) => {
(function waitReady(){
if (time > timeout) return reject(`timeout trying to connect after ${timeout}ms`)
if (time > timeout) return reject(`timeout waiting for socket ready handshake - ${timeout}ms`)
if (ready()) return resolve('ready')
log.info(`waiting ${wait}ms for handshake`)
time += wait

View file

@ -4,18 +4,16 @@ import btc from 'better-try-catch'
import ON_DEATH from 'death' //this is intentionally ugly
import bunyan from 'bunyan'
// import Stream from 'delimiter-stream'
import JsonStream from './json'
import JsonStream from './json-stream'
export default class Socket extends Server {
constructor (path={},opts={}) {
super()
// set or tcp socket
if (typeof(path)!=='string') {
if (!path.host || !path.port) opts = path
this.listen_opts = { host: path.host || '0.0.0.0', port: path.port || 8080}
} else this.listen_opts = { path: path }
// default packet processing - simple echo server
this.packet = {
this.packet = { // default packet processing - simple echo server
_process: (packet) => {
packet.res='echoed'
return packet }
@ -33,10 +31,7 @@ export default class Socket extends Server {
} // end constructor
async create (context) {
if (context) this.packet.context = context
else this.packet.context = this
async create () {
return new Promise( async (resolve,reject) => {
@ -91,7 +86,6 @@ export default class Socket extends Server {
stream.on('message', messageProcess.bind(this))
async function messageProcess (packet) {
// console.log(this.packet._process.toString())
socket.write(stream.serialize(await this.packet._process(packet)))
}
@ -111,10 +105,6 @@ export default class Socket extends Server {
process.exit()
}
registerPacketContext(obj) {
this.packet.context = obj
}
registerPacketProcessor (func) {
this.packet._process = func
}

View file

@ -1,5 +1,4 @@
export default async function (packet) {
// console.log(this.context)
packet.payload = this.context.port +':'+packet.payload
packet.payload = this.port +':'+packet.payload
return packet
}

View file

@ -3,12 +3,12 @@ import process from './tcp-process'
let socket = new Socket({port:9080, name:'tcp socket 9080'})
let app = {port: socket.listen_opts.port}
socket.registerPacketProcessor(process)
socket.packet.port = socket.listen_opts.port
;
(async () => {
await socket.create(app)
await socket.create()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View file

@ -1,15 +1,15 @@
import { Socket } from '../../src'
import process from './tcp-process'
let socket = new Socket({name:'tcp socket'})
let socket = new Socket({name:'tcp socket', log:false})
let app = {port: socket.listen_opts.port}
socket.registerPacketProcessor(process)
socket.packet.port = socket.listen_opts.port
;
(async () => {
await socket.create(app)
await socket.create()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View file

@ -4,12 +4,10 @@ const USOCKET = __dirname + '/test.sock'
let socket = new Socket(USOCKET,{name:'unix socket'})
socket.registerPacketContext({test:'alt'})
socket.packet.test = 'local'
socket.packet.test = 'at socket => '
socket.registerPacketProcessor(async function (packet) {
packet.payload = 'overwrite default processor from instance '+packet.payload
packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload
return packet
})

View file

@ -2,7 +2,7 @@ import { Socket } from '../../src'
const USOCKET = __dirname + '/test.sock'
let socket = new Socket(USOCKET,{name:'unix socket'})
let socket = new Socket(USOCKET,{name:'unix socket', log:false})
;
(async () => {

View file

@ -1,25 +0,0 @@
import { Socket } from '../../src'
const USOCKET = __dirname + '/test.sock'
let socket = new Socket(USOCKET,{name:'unix socket'})
// over writes default packet processing
// socket.registerPacketContext({test:'alt'})
socket.packet.test = 'local'
socket.registerPacketProcessor(async function (packet) {
packet.payload = packet.payload +':'+this.test+':'+this.context.test
return packet
})
;
(async () => {
await socket.create({test:'alt'})
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View file

@ -25,7 +25,6 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
})
after(async function(){
@ -34,10 +33,17 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
})
it('with default host and port', async function () {
let tcpconsumer_default = new Consumer({name:'tcpconsumer'})
let tcpconsumer_default = new Consumer({name:'tcpconsumer', log:false})
return new Promise(async function (resolve, reject) {
let [err] = await btc(tcpconsumer_default.connect)()
if (err) {
console.log('unable to connect to socket to start test', tcpconsumer_default.port)
process.kill(process.pid, 'SIGTERM')
}
tcpconsumer_default.packet._process = function (packet) {
try {
expect(packet.payload).to.equal('8080:tcp payload')
@ -48,8 +54,6 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
}
}
let [err] = await btc(tcpconsumer_default.connect)()
if (err) reject(err)
let packet = {payload:'tcp payload'}
tcpconsumer_default.send(packet)
@ -63,6 +67,13 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
return new Promise(async function (resolve, reject) {
let [err] = await btc(tcpconsumer_9080.connect)()
if (err) {
console.log('unable to connect to socket to start test', tcpconsumer_9080.port)
process.kill(process.pid, 'SIGTERM')
}
tcpconsumer_9080.registerPacketProcessor(function (packet) {
try {
expect(packet.payload).to.equal('9080:tcp payload')
@ -73,8 +84,6 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
}
})
let [err] = await btc(tcpconsumer_9080.connect)()
if (err) reject(err)
let packet = {payload:'tcp payload'}
tcpconsumer_9080.send(packet)

View file

@ -45,7 +45,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
if (this.times!==11) return
try {
expect(packet.payload).to.equal('overwrite default processor from instance unix payload')
expect(packet.payload).to.equal('overwrite default processor from instance at socket => unix payload')
resolve()
}
catch(error) {
@ -79,7 +79,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
if (this.times!==11) return
try {
expect(packet.payload).to.equal('overwrite default processor from instance consumer 1 unix payload')
expect(packet.payload).to.equal('overwrite default processor from instance at socket => consumer 1 unix payload')
resolve()
}
catch(error) {

View file

@ -10,7 +10,7 @@ import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket-default'
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
let consumer = new Consumer(USOCKET, {name:'unix-consumer', log:false})
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
@ -24,8 +24,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
// await delay(500) // wait for sockets to get going
})
after(async function(){
@ -38,16 +37,20 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
return new Promise(async function (resolve, reject) {
let [err] = await btc(consumer.connect)()
if (err) {
console.log('unable to connect to socket to start test', consumer.path)
process.kill(process.pid, 'SIGTERM')
}
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.packet._process = function (packet) {
this.times++
if (this.times!==11) return
packet.payload = packet.payload + this.context.test
try {
expect(packet.payload).to.equal('unix payload plus context at connect')
expect(packet.payload).to.equal('unix payload')
resolve()
}
catch(error) {
@ -55,10 +58,8 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
}
}
let [err] = await btc(consumer.connect)({test:' plus context at connect'})
if (err) reject(err)
let packet = {payload:'unix payload'}
for (var i = 0; i < 11; i++) {
let packet = {payload:'unix payload'}
consumer.send(packet)
}
@ -71,7 +72,12 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
consumer.packet.times = 0
consumer.packet.test = ':local'
consumer.registerPacketContext({test:':added'})
let [err] = await btc(consumer2.connect)()
if (err) {
console.log('unable to connect to socket to start test', consumer.path)
process.kill(process.pid, 'SIGTERM')
}
return new Promise(async function (resolve, reject) {
@ -82,10 +88,10 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
// console.log(this.times,packet.payload)
if (this.times!==11) return
packet.payload = packet.payload + this.context.test + this.test
packet.payload = packet.payload + this.test
try {
expect(packet.payload).to.equal('consumer 1 unix payload:added:local')
expect(packet.payload).to.equal('consumer 1 unix payload:local')
resolve()
}
catch(error) {
@ -97,8 +103,6 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
return packet
}
let [err] = await btc(consumer2.connect)()
if (err) reject(err)
let packet1 = {payload:'consumer 1 unix payload'}
let packet2 = {payload:'consumer2 unix payload'}
for (var i = 0; i < 11; i++) {

View file

@ -1,105 +0,0 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket-packet-alt-context'
const COUNT = 10000
let consumer = new Consumer(USOCKET, {name:'unix-consumer'})
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {}
describe('Connects and Processes a payload via Unix Socket using JSON packet with local and alt context', function(){
before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
})
after(async function(){
socket.kill()
})
it('Tests JSON packet procssing, 10 packets using external context at socket', async function () {
consumer.packet.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.packet._process = function (packet) {
this.times++
if (this.times!==11) return
try {
expect(packet.payload).to.equal('unix payload:local:alt')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(consumer.connect)()
if (err) reject(err)
let packet = {payload:'unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet)
}
}) //end promise
}) // end unix socket test
it(`unix socket with two consumers alternating packets, ${COUNT} packets each using external context at socket`, async function () {
consumer.packet.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.packet._process = function (packet) {
this.times++
// console.log(this.times,packet.payload)
if (this.times!==COUNT) return
try {
expect(packet.payload).to.equal('consumer 1 unix payload:local:alt')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(consumer2.connect)()
if (err) reject(err)
let packet1 = {payload:'consumer 1 unix payload'}
let packet2 = {payload:'consumer2 unix payload'}
for (var i = 0; i < COUNT; i++) {
consumer.send(packet1)
consumer2.send(packet2)
}
}) //end promise
}) // end unix socket test
}) // end describe