switched to _header property for packet id

added sender and responder info to header
_header can now be used for anything specifically socket related
Fixed bug where socket must create a copy of processed packet to avoid issue when coder returns the original packet just altered from the packet processor
This commit is contained in:
David Kebler 2018-02-13 13:51:58 -08:00
parent 28768c8007
commit 450a85d8b9
6 changed files with 35 additions and 28 deletions

View file

@ -11,7 +11,7 @@ const process = function (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
}
// client1.registerPacketProcessor(process)
client1.registerPacketProcessor(process)
//
// client2._packetProcess = process

View file

@ -9,8 +9,7 @@ import { Socket } from '../src'
}
async _packetProcess(packet) {
console.log('packet being processed')
console.dir(packet)
console.log('packet being processed at socket')
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
return {error: 'no command in packet', packet: packet }
}
@ -18,7 +17,7 @@ import { Socket } from '../src'
async doit(data,name) {
return new Promise(resolve => {
let res = {}
console.log('data:', data)
console.log('data sent to doit = ', data)
res.status ='success'
res.name = name
res.cmd = 'reply'

View file

@ -6,7 +6,7 @@
"scripts": {
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
"test": "mocha -r @std/esm test/*.test.mjs",
"testlog": "mocha -r @std/esm test/*.test.mjs | ./node_modules/.bin/bunyan",
"testlog": "DEBUG=true mocha -r @std/esm test/*.test.mjs",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
"s": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r @std/esm examples/server",
"devs": "SOCKETS_DIR=/opt/sockets DEBUG=true ./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server",

View file

@ -88,23 +88,29 @@ export default class Consumer extends Socket {
}
async send(packet) {
async send(ipacket) {
return new Promise( async (resolve) => {
// need this for when multiple sends for different consumers use same packet instance
let packet = Object.assign({},ipacket)
setTimeout(() => {resolve({error:'no response from socket in 10sec'})},10000)
packet._id = Math.random().toString().slice(2)
// console.log('sending to socket', packet.id)
packet._header =
{ id:Math.random().toString().slice(2), // need this for when multiple sends for different consumers use same packet instanceack
sender:{ name:this.name, instanceID:this.id },
path: this.opts.path,
port: this.opts.port,
host: this.opts.host
}
let [err, res] = await btc(this.stream.serialize)(packet)
if (err) resolve({error:'unable to serialize packet for sending', packet:packet})
log.info(await this.__write(res))
this.once(packet._id,async function(reply){
// console.log('listerner set', packet._header.id, packet._header.sender.instanceID)
this.once(packet._header.id,async function(reply){
// console.log('reply emitted',reply)
this.removeAllListeners(reply.id)
delete reply._id
let res = await this._packetProcess(reply)
if (!res) { // if process was not promise returning like just logged to console
res = reply
log.warn('consumer function was not promise returning - resolving unprocessed')
}
}
resolve(res)
}) //end listener
})
@ -122,7 +128,7 @@ export default class Consumer extends Socket {
async __write(packet) {
// timeout already set if sockect can't be drained in 10 secs
return new Promise(resolve => {
const cb = () => resolve('packet written to socket stream')
const cb = () => resolve('=====>packet written to consumer side socket stream ')
if (!super.write(packet)) {
this.once('drain',cb )
} else {
@ -139,13 +145,13 @@ export default class Consumer extends Socket {
this.on('data', this.stream.onData)
this.stream.on('message', messageProcess.bind(this))
async function messageProcess (packet) {
// console.log('incoming packet from socket',packet)
if (packet._handshake) {
this._ready = true
return }
// TODO send back ack with consumer ID and authorization and wait
// when authorized drop through here to emit
// console.log('incoming packet',packet)
this.emit(packet._id, packet)
this.emit(packet._header.id, packet)
}
}
@ -157,7 +163,6 @@ export default class Consumer extends Socket {
} // end class
// HELP FUNCTIONS
// wait until a passed ready function returns true
function isReady(ready, wait=30, timeout=1000) {

View file

@ -67,7 +67,6 @@ export default class Socket extends Server {
}) // end creeate promise
} // end create
registerPacketProcessor (func) {
this._packetProcess = func
}
@ -85,12 +84,20 @@ export default class Socket extends Server {
// TODO need to start error listener for stream so errors can be processed
stream.on('message', messageProcess.bind(this))
async function messageProcess (packet) {
// console.log('before processing',packet)
let processed = await this._packetProcess(packet)
if (!processed) processed = { error: 'socket packet command function likely did not return a promise', packet:packet}
processed._id = packet._id //make sure return packet has its ID
// console.log(' incoming packet on socket side',packet)
let processed = Object.assign({},await this._packetProcess(packet)) // must make copy in case processed returned passed packet
if (Object.keys(processed).length === 0) processed = { error: 'socket packet command function likely did not return a promise', packet:packet}
processed._header = packet._header //make sure return packet has header in case it was removed in processing
delete packet._header // remove before adding to response header as request
processed._header.request = packet
processed._header.responder = {name:this.name,instanceID:this.id}
processed._header.socket = this._connectionKey
if (!processed.cmd) processed.cmd = 'reply' // by default return command is 'reply'
// console.log('after processing',processed)
write(await stream.serialize(processed))
let [err, ser] = await btc(stream.serialize)(processed)
if (err) ser = await stream.serialize({ error: 'was not able to serialze the processed packet', err:err, _header:{id:processed._header.id}})
// console.log('serialized ready for write',ser)
log.info(await write(ser))
}
}) // end connecttion consumer
log.info({opts: this.opts},'socket created')
@ -107,12 +114,10 @@ export default class Socket extends Server {
}
// default packet process, just a simple echo
_packetProcess (packet) {
async _packetProcess (packet) {
return new Promise(resolve => {
packet.response='this packet was echoed by default'
resolve(packet)
})
}
// must have a consumer socket bound to use
@ -125,7 +130,6 @@ export default class Socket extends Server {
} else {
process.nextTick(cb)
}
})
}

View file

@ -30,7 +30,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
socket.kill()
})
const TIMES = 500
const TIMES = 5000
it(`Tests unix socket with default echo JSON packet procssing with ${TIMES} packets sent`, async function () {
@ -40,7 +40,6 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
process.kill(process.pid, 'SIGTERM')
}
consumer.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.times += 1