added handshaking from socket before finishing consumer connection, fixed async flow control issues, added in mocha test, improved passing of external app for packet processing

tls
David Kebler 2018-01-10 15:03:32 -08:00
parent e5167c3995
commit ae72d3af02
6 changed files with 4204 additions and 85 deletions

View File

@ -1,6 +1,6 @@
import Consumer from '../src/consumer'
const USOCKET = '/opt/sockets/samplecs.sock'
const USOCKET = __dirname + '/sample.sock'
const socket1 = new Consumer(USOCKET)
const socket2 = new Consumer(USOCKET)
@ -8,6 +8,15 @@ const socket2 = new Consumer(USOCKET)
let packet1 = {name: 'socket1', cmd:'doit', data:'data sent by socket1'}
let packet2 = {name: 'socket2', cmd:'doit', data:'data sent by socket2'}
// This is your socket handler waiting on a message to do something
let app = {
processIt: function processPacket (packet) {
console.log('incoming packet from socket to process')
console.dir(packet)
},
ucpp: 'processIt'
}
;
(async () => {
@ -21,10 +30,3 @@ let packet2 = {name: 'socket2', cmd:'doit', data:'data sent by socket2'}
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})
// This is your socket handler waiting on a message to do something
function app (packet) {
console.log('incoming packet from socket to process')
console.dir(packet)
}

View File

@ -1,12 +1,15 @@
import { Socket } from '../src'
const USOCKET = '/opt/sockets/samplecs.sock'
const USOCKET = __dirname + '/sample.sock'
console.log(USOCKET)
;
(async () => {
class Test {
constructor() {
constructor(opts={}) {
this.socket = new Socket(USOCKET)
this.uspp = opts.uspp || 'processPacket'
}
async processPacket(packet) {

4166
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -4,14 +4,12 @@
"description": "Bare bones intra Host Unix Socket for basic IPC on same machine",
"main": "src",
"scripts": {
"test": "node -r @std/esm test",
"testw": "./node_modules/.bin/nodemon -r @std/esm test",
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
"test": "mocha -r @std/esm test/*.test.mjs",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
"s": "node -r @std/esm examples/server",
"devs": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server",
"c": "node -r @std/esm examples/client",
"cs": "node -r @std/esm client-server",
"ms": "node -r @std/esm mqtt-server",
"mc": "node -r @std/esm mqtt-client"
"c": "node -r @std/esm examples/client"
},
"author": "David Kebler",
"license": "MIT",
@ -32,7 +30,13 @@
"@std/esm": "cjs",
"devDependencies": {
"@std/esm": "^0.18.0",
"nodemon": "^1.14.3"
"nodemon": "^1.14.3",
"chai": "^4.1.2",
"chai-as-promised": "^7.1.1",
"codecov": "^3.0.0",
"eslint": "^3.19.0",
"istanbul": "^0.4.5",
"mocha": "^4.0.1"
},
"dependencies": {
"better-try-catch": "^0.6.2",

View File

@ -6,38 +6,79 @@ export default class Consumer extends Socket {
super()
this.path = path
this.keepAlive = opts.keepAlive ? opts.keepAlive : true
this._ready = false
}
async connect () {
ready() {return this._ready}
console.log('attempting to connect to socket: ', this.path)
async connect (app) {
await this.listen(app)
console.log('consumer: listening')
await super.connect({ path: this.path })
console.log(`connected to ${this.path}`)
console.log(`consumer: connected to ${this.path}`)
this.setKeepAlive(this.keepAlive)
this.on('error', (error) => {
'client socket error \n ', error.code
})
return await isReady(this.ready.bind(this))
}
async send(packet) {
let [err, strbuf] = btc(JSON.stringify)(packet)
if (!err) {
this.write(strbuf)
// await promisify(this.write)(strbuf)
console.log('attempting to send')
// console.log(await this.write(strbuf))
let res = await new Promise((resolve, reject) => { //returning promise
this.write(strbuf, (err) => {
if (err) reject(err)
else resolve('complete')
})
})
console.log('send is', res)
}
else { console.log(`bad packet JSON syntax \n ${packet}`)}
else { console.log(`bad packet JSON syntax \n ${packet} \n${err}`)}
}
async listen (app) {
this.on('data', async (buf) => {
let [err, packet] = btc(JSON.parse)(buf.toString())
if (!err) {
app(packet)
if (packet.ready) {
this._ready = true
return }
// set default packet processing - simple print to console of packet
app.ucpp = app.ucpp || 'processPacket'
if (!app[app.ucpp]) {
app.ucpp = 'processPacket'
app.processPacket = async (packet) => {
console.log('incoming packet from socket')
console.dir(packet)
return packet }
}
app[app.ucpp](packet) // process the packet
}
else { console.log(`bad packet JSON syntax \n ${buf.toString()}`)}
})
}
} // end class
// wait for handshake from socket
function isReady(ready) {
let time = 0
return new Promise(function (resolve, reject) {
(function waitReady(){
if (time > 3000) return reject('timeout')
if (ready()) return resolve('ready')
console.log('waiting for 30ms')
time += 30
setTimeout(waitReady, 30)
})()
})
}

View File

@ -13,7 +13,10 @@ export default class Socket extends Server {
constructor (path, opts={}) {
super()
this.path = path
this.logger = Logger.createSimpleLogger(opts.logger ? opts.logger: logger)
if (opts.log) {
this.log = opts.log
this.logger = Logger.createSimpleLogger(opts.logger ? opts.logger: logger)
}
} // end constructor
@ -33,23 +36,33 @@ export default class Socket extends Server {
})
//
this.on('listening', async () => {
this.once('listening', async () => {
console.log(`socket created at ${this.path}`)
// this gets called for each client connection and is unique to each
this.on('connection', (socket) => {
console.log('new consumer connected')
this.once('connection', (socket) => {
console.log('server: new consumer connected')
socket.on('data', async (buf) => {
let [err, packet] = btc(JSON.parse)(buf.toString())
if (!err) {
this.logger.info(`data packet received to socket \n ${packet}`)
socket.write(JSON.stringify(await app.processPacket.bind(app)(packet)))
if (this.log) this.logger.info(`data packet received to socket \n ${packet}`)
// set default packet processing
app.uspp = app.uspp || 'processPacket'
if (!app[app.uspp]) {
app.uspp = 'processPacket'
app.processPacket = async (packet) => {
packet.res='echoed'
return packet }
}
socket.write(JSON.stringify(await app[app.uspp].bind(app)(packet)))
}
else { console.log(`bad packet JSON syntax \n ${buf.toString()}`)}
}) // end incoming data listerner
socket.write('{"ready":true}')
}) // end connected consumer
}) // end socket listening listener