diff --git a/examples/client.mjs b/examples/client.mjs index cca864b..4dfe215 100644 --- a/examples/client.mjs +++ b/examples/client.mjs @@ -11,7 +11,7 @@ const process = function (packet) { console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`) } -// client1.registerPacketProcessor(process) +client1.registerPacketProcessor(process) // // client2._packetProcess = process diff --git a/examples/server.mjs b/examples/server.mjs index c0968e4..4338131 100644 --- a/examples/server.mjs +++ b/examples/server.mjs @@ -9,8 +9,7 @@ import { Socket } from '../src' } async _packetProcess(packet) { - console.log('packet being processed') - console.dir(packet) + console.log('packet being processed at socket') if (packet.cmd) return await this[packet.cmd](packet.data,packet.name) return {error: 'no command in packet', packet: packet } } @@ -18,7 +17,7 @@ import { Socket } from '../src' async doit(data,name) { return new Promise(resolve => { let res = {} - console.log('data:', data) + console.log('data sent to doit = ', data) res.status ='success' res.name = name res.cmd = 'reply' diff --git a/package.json b/package.json index 1a22134..c49fde2 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "scripts": { "testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r @std/esm test/*.test.mjs", - "testlog": "mocha -r @std/esm test/*.test.mjs | ./node_modules/.bin/bunyan", + "testlog": "DEBUG=true mocha -r @std/esm test/*.test.mjs", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true", "s": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r @std/esm examples/server", "devs": "SOCKETS_DIR=/opt/sockets DEBUG=true ./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server", diff --git a/src/consumer.mjs b/src/consumer.mjs index 4a2b4d9..5265d53 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -88,23 +88,29 @@ export default class Consumer extends Socket { } - async send(packet) { + async send(ipacket) { return new Promise( async (resolve) => { + // need this for when multiple sends for different consumers use same packet instance + let packet = Object.assign({},ipacket) setTimeout(() => {resolve({error:'no response from socket in 10sec'})},10000) - packet._id = Math.random().toString().slice(2) - // console.log('sending to socket', packet.id) + packet._header = + { id:Math.random().toString().slice(2), // need this for when multiple sends for different consumers use same packet instanceack + sender:{ name:this.name, instanceID:this.id }, + path: this.opts.path, + port: this.opts.port, + host: this.opts.host + } let [err, res] = await btc(this.stream.serialize)(packet) if (err) resolve({error:'unable to serialize packet for sending', packet:packet}) log.info(await this.__write(res)) - this.once(packet._id,async function(reply){ + // console.log('listerner set', packet._header.id, packet._header.sender.instanceID) + this.once(packet._header.id,async function(reply){ // console.log('reply emitted',reply) - this.removeAllListeners(reply.id) - delete reply._id let res = await this._packetProcess(reply) if (!res) { // if process was not promise returning like just logged to console res = reply log.warn('consumer function was not promise returning - resolving unprocessed') - } + } resolve(res) }) //end listener }) @@ -122,7 +128,7 @@ export default class Consumer extends Socket { async __write(packet) { // timeout already set if sockect can't be drained in 10 secs return new Promise(resolve => { - const cb = () => resolve('packet written to socket stream') + const cb = () => resolve('=====>packet written to consumer side socket stream ') if (!super.write(packet)) { this.once('drain',cb ) } else { @@ -139,13 +145,13 @@ export default class Consumer extends Socket { this.on('data', this.stream.onData) this.stream.on('message', messageProcess.bind(this)) async function messageProcess (packet) { + // console.log('incoming packet from socket',packet) if (packet._handshake) { this._ready = true return } // TODO send back ack with consumer ID and authorization and wait // when authorized drop through here to emit - // console.log('incoming packet',packet) - this.emit(packet._id, packet) + this.emit(packet._header.id, packet) } } @@ -157,7 +163,6 @@ export default class Consumer extends Socket { } // end class - // HELP FUNCTIONS // wait until a passed ready function returns true function isReady(ready, wait=30, timeout=1000) { diff --git a/src/socket.mjs b/src/socket.mjs index 06a1e0d..9d8f2e3 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -67,7 +67,6 @@ export default class Socket extends Server { }) // end creeate promise } // end create - registerPacketProcessor (func) { this._packetProcess = func } @@ -85,12 +84,20 @@ export default class Socket extends Server { // TODO need to start error listener for stream so errors can be processed stream.on('message', messageProcess.bind(this)) async function messageProcess (packet) { - // console.log('before processing',packet) - let processed = await this._packetProcess(packet) - if (!processed) processed = { error: 'socket packet command function likely did not return a promise', packet:packet} - processed._id = packet._id //make sure return packet has its ID + // console.log(' incoming packet on socket side',packet) + let processed = Object.assign({},await this._packetProcess(packet)) // must make copy in case processed returned passed packet + if (Object.keys(processed).length === 0) processed = { error: 'socket packet command function likely did not return a promise', packet:packet} + processed._header = packet._header //make sure return packet has header in case it was removed in processing + delete packet._header // remove before adding to response header as request + processed._header.request = packet + processed._header.responder = {name:this.name,instanceID:this.id} + processed._header.socket = this._connectionKey + if (!processed.cmd) processed.cmd = 'reply' // by default return command is 'reply' // console.log('after processing',processed) - write(await stream.serialize(processed)) + let [err, ser] = await btc(stream.serialize)(processed) + if (err) ser = await stream.serialize({ error: 'was not able to serialze the processed packet', err:err, _header:{id:processed._header.id}}) + // console.log('serialized ready for write',ser) + log.info(await write(ser)) } }) // end connecttion consumer log.info({opts: this.opts},'socket created') @@ -107,12 +114,10 @@ export default class Socket extends Server { } // default packet process, just a simple echo - _packetProcess (packet) { + async _packetProcess (packet) { return new Promise(resolve => { - packet.response='this packet was echoed by default' resolve(packet) }) - } // must have a consumer socket bound to use @@ -125,7 +130,6 @@ export default class Socket extends Server { } else { process.nextTick(cb) } - }) } diff --git a/test/usocket-default.test.mjs b/test/usocket-default.test.mjs index df71584..6e44b09 100644 --- a/test/usocket-default.test.mjs +++ b/test/usocket-default.test.mjs @@ -30,7 +30,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit socket.kill() }) - const TIMES = 500 + const TIMES = 5000 it(`Tests unix socket with default echo JSON packet procssing with ${TIMES} packets sent`, async function () { @@ -40,7 +40,6 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit process.kill(process.pid, 'SIGTERM') } - consumer.registerPacketProcessor(async function (packet) { return new Promise((resolve) => { packet.times += 1