diff --git a/examples/client.mjs b/examples/client.mjs index a612566..cca864b 100644 --- a/examples/client.mjs +++ b/examples/client.mjs @@ -1,9 +1,7 @@ import Consumer from '../src/consumer' -const USOCKET = __dirname + '/sample.sock' - -const client1= new Consumer({path:USOCKET,name:'example-consumer1' }) -const client2 = new Consumer({path:USOCKET,name:'example-consumer2'}) +const client1= new Consumer({np:true,name:'example-consumer1' }) +const client2 = new Consumer({np:true,name:'example-consumer2'}) let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'} let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'} @@ -13,18 +11,23 @@ const process = function (packet) { console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`) } -client1.registerPacketProcessor(process) - -client2._packetProcess = process +// client1.registerPacketProcessor(process) +// +// client2._packetProcess = process ; (async () => { - await Promise.all([client1.connect(),client2.connect()]) - client1.send(packet1) - client2.send(packet2) - client1.end() - client2.end() + + + // await Promise.all([client1.connect(),client2.connect()]) + await client1.connect() + console.log('=========\n',await client1.send(packet1)) + + // client1.send(packet1) + // client2.send(packet2) + // client1.end() + // client2.end() })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/examples/server.mjs b/examples/server.mjs index 3e2907a..c0968e4 100644 --- a/examples/server.mjs +++ b/examples/server.mjs @@ -1,8 +1,6 @@ import { Socket } from '../src' -const USOCKET = __dirname + '/sample.sock' - -; + ; (async () => { class Test extends Socket { @@ -18,18 +16,21 @@ const USOCKET = __dirname + '/sample.sock' } async doit(data,name) { - let res = {} - console.log('data:', data) - res.status ='success' - res.name = name - res.cmd = 'reply' - res.data = 'this might be response data from another process' - return(res) + return new Promise(resolve => { + let res = {} + console.log('data:', data) + res.status ='success' + res.name = name + res.cmd = 'reply' + res.data = 'this would be response data from socket doit function' + resolve(res) + }) + } } - let test = new Test({path:USOCKET}) + let test = new Test({np:true}) await test.create() })().catch(err => { diff --git a/package.json b/package.json index d781a77..1a22134 100644 --- a/package.json +++ b/package.json @@ -1,17 +1,18 @@ { "name": "@uci/socket", "version": "0.1.0", - "description": "Bare bones intra Host Unix Socket for basic IPC on same machine", + "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { "testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r @std/esm test/*.test.mjs", "testlog": "mocha -r @std/esm test/*.test.mjs | ./node_modules/.bin/bunyan", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true", - "s": "node -r @std/esm examples/server | ./node_modules/.bin/bunyan", - "devs": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server", - "c": "node -r @std/esm examples/client | ./node_modules/.bin/bunyan -o short", - "c2": "node -r @std/esm examples/client2 | ./node_modules/.bin/bunyan -o short" + "s": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r @std/esm examples/server", + "devs": "SOCKETS_DIR=/opt/sockets DEBUG=true ./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server", + "c": "SOCKETS_DIR=/opt/sockets node -r @std/esm examples/client", + "devc": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r @std/esm examples/client", + "c2": "node -r @std/esm examples/client2" }, "author": "David Kebler", "license": "MIT", @@ -21,9 +22,14 @@ }, "keywords": [ "node.js", - "i2c", - "rpi", - "raspberrypi" + "socket", + "net", + "JSON", + "packet", + "serialize", + "named pipe", + "unix socket", + "TCP" ], "bugs": { "url": "https://github.com/uCOMmandIt/message/issues" @@ -36,11 +42,11 @@ "chai-as-promised": "^7.1.1", "codecov": "^3.0.0", "istanbul": "^0.4.5", - "mocha": "^4.0.1" + "mocha": "^4.0.1", + "nodemon": "^1.14.12" }, "dependencies": { "better-try-catch": "^0.6.2", - "bunyan": "^1.8.12", "death": "^1.1.0" } } diff --git a/src/consumer.mjs b/src/consumer.mjs index 9d6338c..4a2b4d9 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -1,6 +1,7 @@ import { Socket } from 'net' import btc from 'better-try-catch' import JsonStream from './json-stream' +import {promisify} from 'util' import logger from '../../uci-logger/src/logger' let log = {} @@ -34,6 +35,7 @@ export default class Consumer extends Socket { // bind to class for other class functions this.connect = this.connect.bind(this) this.__ready = this.__ready.bind(this) + // this._write = this._write.bind(this) } async connect () { @@ -85,15 +87,30 @@ export default class Consumer extends Socket { } + async send(packet) { - await this.write(await this.stream.serialize(packet)) - // TODO handle possible error - // TODO await response if required by setting id to packet - // then set a flag (and promise) that is resovled in the listener + return new Promise( async (resolve) => { + setTimeout(() => {resolve({error:'no response from socket in 10sec'})},10000) + packet._id = Math.random().toString().slice(2) + // console.log('sending to socket', packet.id) + let [err, res] = await btc(this.stream.serialize)(packet) + if (err) resolve({error:'unable to serialize packet for sending', packet:packet}) + log.info(await this.__write(res)) + this.once(packet._id,async function(reply){ + // console.log('reply emitted',reply) + this.removeAllListeners(reply.id) + delete reply._id + let res = await this._packetProcess(reply) + if (!res) { // if process was not promise returning like just logged to console + res = reply + log.warn('consumer function was not promise returning - resolving unprocessed') + } + resolve(res) + }) //end listener + }) } // TODO register alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) - // TODO register authenciation function (set up default) registerPacketProcessor (func) { @@ -102,6 +119,19 @@ export default class Consumer extends Socket { // PRIVATE METHODS + async __write(packet) { + // timeout already set if sockect can't be drained in 10 secs + return new Promise(resolve => { + const cb = () => resolve('packet written to socket stream') + if (!super.write(packet)) { + this.once('drain',cb ) + } else { + process.nextTick(cb) + } + + }) + } + __ready() {return this._ready} async _listen () { @@ -112,17 +142,19 @@ export default class Consumer extends Socket { if (packet._handshake) { this._ready = true return } - await this._packetProcess(packet) + // TODO send back ack with consumer ID and authorization and wait + // when authorized drop through here to emit + // console.log('incoming packet',packet) + this.emit(packet._id, packet) } } -// default packet process just a simple console logger + // default packet process just a simple console logger. ignores any cmd: prop _packetProcess (packet) { - console.log('default consumer processor -- packet from socket') + console.log('default consumer processor -- log packet from socket to console') console.dir(packet) } - } // end class diff --git a/src/json-stream.mjs b/src/json-stream.mjs index 6cf06e6..8357da8 100644 --- a/src/json-stream.mjs +++ b/src/json-stream.mjs @@ -13,10 +13,12 @@ export default class JsonStream extends EventEmitter{ this._buffer = '' this._delimeter = opts.delimiter || '#' this.onData = this.onData.bind(this) + this.serialize = this.serialize.bind(this) } onData (data) { + // console.log('a chunk arrived', data) data = decoder.write(data) try { this._handleData(data) @@ -32,8 +34,8 @@ export default class JsonStream extends EventEmitter{ let [err2,length] = btc(Buffer.byteLength)(messageData, 'utf8') if (err2) reject(err2) let data = length + this._delimeter + messageData - // console.log('serialized',data) - return resolve(data) + // console.log('serialized',data) + resolve(data) }) } @@ -41,7 +43,7 @@ export default class JsonStream extends EventEmitter{ this._buffer += data if (this._contentLength == null) { var i = this._buffer.indexOf(this._delimeter) - //Check if the buffer has a this._opts.delimeter or "#", if not, the end of the buffer string might be in the middle of a content length string + //Check if the buffer has a this._opts.delimeter or "#", if not, the end of the buffer string might be in the middle of a content length string if (i !== -1) { var rawContentLength = this._buffer.substring(0, i) this._contentLength = parseInt(rawContentLength) diff --git a/src/socket.mjs b/src/socket.mjs index 5500031..06a1e0d 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -36,7 +36,7 @@ export default class Socket extends Server { async create () { return new Promise( async (resolve,reject) => { - // couple ways to kill socket process when needed + // couple ways to kill socket process when needed _ON_DEATH( async () => { log.info('\nhe\'s dead jim') await this._destroy() @@ -46,7 +46,7 @@ export default class Socket extends Server { process.kill(process.pid, 'SIGUSR2') }) - this.on('error', async (err) => { + this.once('error', async (err) => { // recover from socket file that was not removed if (err.code === 'EADDRINUSE') { if (this.opts.np) { // if TCP socket should already be dead @@ -75,20 +75,22 @@ export default class Socket extends Server { async _listen (opts) { super.listen(opts, async (err, res) => { if (err) return err - // this gets called for each client connection and is unique to each + // this gets called for each client connection and is unique to each this.on('connection', async (socket) => { + let write = this._write.bind(socket) const stream = new JSONStream() log.info('new consumer connecting sending handshake') - socket.write(await stream.serialize({'_handshake':true})) + write(await stream.serialize({'_handshake':true})) socket.on('data', stream.onData) // TODO need to start error listener for stream so errors can be processed stream.on('message', messageProcess.bind(this)) async function messageProcess (packet) { // console.log('before processing',packet) let processed = await this._packetProcess(packet) - if (!processed) processed = { error: 'packet command function likely did not return a promise', packet:packet} + if (!processed) processed = { error: 'socket packet command function likely did not return a promise', packet:packet} + processed._id = packet._id //make sure return packet has its ID // console.log('after processing',processed) - socket.write(await stream.serialize(processed)) + write(await stream.serialize(processed)) } }) // end connecttion consumer log.info({opts: this.opts},'socket created') @@ -106,8 +108,25 @@ export default class Socket extends Server { // default packet process, just a simple echo _packetProcess (packet) { - packet.res='echoed' - return packet + return new Promise(resolve => { + packet.response='this packet was echoed by default' + resolve(packet) + }) + + } + + // must have a consumer socket bound to use + async _write(packet) { + // timeout already set if sockect can't be drained in 10 secs + return new Promise(resolve => { + const cb = () => resolve('packet written to socket stream') + if (!this.write(packet)) { + this.once('drain',cb ) + } else { + process.nextTick(cb) + } + + }) } } // end class diff --git a/test/sockets/usocket-default-overwrite.mjs b/test/sockets/usocket.mjs similarity index 69% rename from test/sockets/usocket-default-overwrite.mjs rename to test/sockets/usocket.mjs index 47bf915..167aed3 100644 --- a/test/sockets/usocket-default-overwrite.mjs +++ b/test/sockets/usocket.mjs @@ -7,8 +7,10 @@ let socket = new Socket({path:USOCKET,name:'default-unix-socket'}) socket.test = 'at socket => ' socket.registerPacketProcessor(async function (packet) { - packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload - return packet + return new Promise((resolve) => { + packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload + resolve(packet) + }) }) ; diff --git a/test/tcp.test.mjs b/test/tcp.test.mjs index f9f8ee6..c810510 100644 --- a/test/tcp.test.mjs +++ b/test/tcp.test.mjs @@ -33,29 +33,21 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun it('with default host and port', async function () { let tcpconsumer_default = new Consumer({name:'tcpconsumer', log:false}) - return new Promise(async function (resolve, reject) { + let [err] = await btc(tcpconsumer_default.connect)() + if (err) { + console.log('unable to connect to socket to start test', tcpconsumer_default.port) + process.kill(process.pid, 'SIGTERM') + } + tcpconsumer_default.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.payload = packet.payload +':local' + resolve(packet)}) + }) - let [err] = await btc(tcpconsumer_default.connect)() - if (err) { - console.log('unable to connect to socket to start test', tcpconsumer_default.port) - process.kill(process.pid, 'SIGTERM') - } - - tcpconsumer_default._packetProcess = function (packet) { - try { - expect(packet.payload).to.equal('8080:tcp payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let packet = {payload:'tcp payload'} - tcpconsumer_default.send(packet) - - }) //end promise + let packet = {payload:'tcp payload'} + let res = await tcpconsumer_default.send(packet) + expect(res.payload).to.equal('8080:tcp payload:local') }) // end tcp socket test @@ -63,29 +55,22 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun let tcpconsumer_9080 = new Consumer({port:9080, name:'tcp-consumer-9080'}) - return new Promise(async function (resolve, reject) { + let [err] = await btc(tcpconsumer_9080.connect)() + if (err) { + console.log('unable to connect to socket to start test', tcpconsumer_9080.port) + process.kill(process.pid, 'SIGTERM') + } - let [err] = await btc(tcpconsumer_9080.connect)() - if (err) { - console.log('unable to connect to socket to start test', tcpconsumer_9080.port) - process.kill(process.pid, 'SIGTERM') - } + tcpconsumer_9080.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.payload = packet.payload +':local' + resolve(packet)}) + }) - tcpconsumer_9080.registerPacketProcessor(function (packet) { - try { - expect(packet.payload).to.equal('9080:tcp payload') - resolve() - } - catch(error) { - reject(error) - } - }) - - let packet = {payload:'tcp payload'} - tcpconsumer_9080.send(packet) - - }) //end promise + let packet = {payload:'tcp payload'} + let res = await tcpconsumer_9080.send(packet) + expect(res.payload).to.equal('9080:tcp payload:local') }) // end tcp socket 2 test diff --git a/test/usocket-default-overwrite.test.mjs b/test/usocket-default-overwrite.test.mjs deleted file mode 100644 index 62d6073..0000000 --- a/test/usocket-default-overwrite.test.mjs +++ /dev/null @@ -1,107 +0,0 @@ -import { spawn } from 'child_process' -import chai from 'chai' -import chaiAsPromised from 'chai-as-promised' -import btc from 'better-try-catch' -chai.use(chaiAsPromised) -const expect = chai.expect - -import { Consumer } from '../src' - -const USOCKET = __dirname + '/sockets/test.sock' -const SOCKET_FILE = 'usocket-default-overwrite' - -let consumer = new Consumer({path:USOCKET,name:'unix-consumer'}) -let consumer2 = new Consumer({path:USOCKET, name:'unix-consumer2'}) - -// const delay = time => new Promise(res=>setTimeout(()=>res(),time)) - -let socket = {} - -describe('Connects and Processes a payload via Unix Socket using JSON packet with alt default processor', function(){ - - before(async function(){ - socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) - socket.stdout.on('data', function(buf) { - console.log('[Socket]', String(buf)) - }) - }) - - after(async function(){ - socket.kill() - }) - - it('Tests JSON packet procssing, 10 packets', async function () { - - consumer.times = 0 - - return new Promise(async function (resolve, reject) { - - setTimeout(() =>{ reject('10 packets not received in time')},1900) - - consumer._packetProcess = function (packet) { - this.times++ - if (this.times!==11) return - - try { - expect(packet.payload).to.equal('overwrite default processor from instance at socket => unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - let [err] = await btc(consumer.connect)() - if (err) reject(err) - let packet = {payload:'unix payload'} - for (var i = 0; i < 11; i++) { - consumer.send(packet) - } - - }) //end promise - - }) // end unix socket test - - - it('unix socket with two consumers alternating packets, 10 packets each', async function () { - - consumer.times = 0 - - return new Promise(async function (resolve, reject) { - - setTimeout(() =>{ reject('10 packets not received in time')},1900) - - consumer._packetProcess = function (packet) { - this.times++ - // console.log(this.times,packet.payload) - if (this.times!==11) return - - try { - expect(packet.payload).to.equal('overwrite default processor from instance at socket => consumer 1 unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - consumer2._packetProcess = function (packet) { - return packet - } - - let [err] = await btc(consumer2.connect)() - if (err) reject(err) - let packet1 = {payload:'consumer 1 unix payload'} - let packet2 = {payload:'consumer2 unix payload'} - for (var i = 0; i < 11; i++) { - consumer.send(packet1) - consumer2.send(packet2) - } - - }) //end promise - - }) // end unix socket test - - - -}) // end describe diff --git a/test/usocket-default.test.mjs b/test/usocket-default.test.mjs index cda83b1..df71584 100644 --- a/test/usocket-default.test.mjs +++ b/test/usocket-default.test.mjs @@ -30,47 +30,36 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit socket.kill() }) - it('Tests unix socket with default echo JSON packet procssing, 10 packets with conect via connect', async function () { + const TIMES = 500 - consumer.times = 0 + it(`Tests unix socket with default echo JSON packet procssing with ${TIMES} packets sent`, async function () { - return new Promise(async function (resolve, reject) { + let [err] = await btc(consumer.connect)() + if (err) { + console.log('unable to connect to socket to start test', consumer.path) + process.kill(process.pid, 'SIGTERM') + } - let [err] = await btc(consumer.connect)() - if (err) { - console.log('unable to connect to socket to start test', consumer.path) - process.kill(process.pid, 'SIGTERM') - } - setTimeout(() =>{ reject('10 packets not received in time')},1900) + consumer.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.times += 1 + if (packet.times === TIMES) packet.payload = 'local1:'+packet.payload + resolve(packet)}) + }) - consumer._packetProcess = function (packet) { - this.times++ - if (this.times!==11) return - try { - expect(packet.payload).to.equal('unix payload') - resolve() - } - catch(error) { - reject(error) - } - } - - for (var i = 0; i < 11; i++) { - let packet = {payload:'unix payload'} - consumer.send(packet) - } - - }) //end promise + let packet = {payload:'payload', times:0} + for (let i = 1; i <= TIMES; i++) { + packet = await consumer.send(packet) + } + expect(packet.payload+':'+packet.times).to.equal('local1:payload:'+TIMES) }) // end unix socket test - it('unix socket with two consumers alternating packets, 10 packets each with local and added context', async function () { + it(`unix socket with two consumers alternating packets, ${TIMES} packets each and local processing`, async function () { - consumer.times = 0 - consumer.test = ':local' let [err] = await btc(consumer2.connect)() if (err) { @@ -78,41 +67,21 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit process.kill(process.pid, 'SIGTERM') } - return new Promise(async function (resolve, reject) { + consumer2.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.times += 1 + if (packet.times === TIMES) packet.payload = 'local2:'+packet.payload + resolve(packet)}) + }) - setTimeout(() =>{ reject('10 packets not received in time')},1900) - - consumer.registerPacketProcessor(function (packet) { - this.times++ - // console.log(this.times,packet.payload) - if (this.times!==11) return - - packet.payload = packet.payload + this.test - - try { - expect(packet.payload).to.equal('consumer 1 unix payload:local') - resolve() - } - catch(error) { - reject(error) - } - }) - - consumer2._packetProcess = function (packet) { - return packet - } - - let packet1 = {payload:'consumer 1 unix payload'} - let packet2 = {payload:'consumer2 unix payload'} - for (var i = 0; i < 11; i++) { - consumer.send(packet1) - consumer2.send(packet2) - } - - }) //end promise + let packet = {consumer:1, payload:'payload', times:-1} + for (let i = 0; i < TIMES; i++) { + packet = await consumer.send(packet) + if (packet.times === TIMES) packet.times = 1 + packet = await consumer2.send(packet) + } + expect(packet.payload+':'+packet.times).to.equal('local2:local1:payload:'+TIMES) }) // end unix socket test - - }) // end describe diff --git a/test/usocket.test.mjs b/test/usocket.test.mjs new file mode 100644 index 0000000..61d0ef9 --- /dev/null +++ b/test/usocket.test.mjs @@ -0,0 +1,52 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import btc from 'better-try-catch' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const USOCKET = __dirname + '/sockets/test.sock' +const SOCKET_FILE = 'usocket' + +let consumer = new Consumer({path:USOCKET,name:'unix-consumer'}) + +// const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let socket = {} + +describe('Connects and Processes a payload via Unix Socket using JSON packet with alt processor', function(){ + + before(async function(){ + socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE]) + socket.stdout.on('data', function(buf) { + console.log('[Socket]', String(buf)) + }) + }) + + after(async function(){ + socket.kill() + }) + + it('Tests alternate JSON packet procssing at socket and consumer', async function () { + + let [err] = await btc(consumer.connect)() + if (err) { + console.log('unable to connect to socket to start test', consumer.path) + process.kill(process.pid, 'SIGTERM') + } + + consumer.registerPacketProcessor(async function (packet) { + return new Promise((resolve) => { + packet.payload = 'local:'+packet.payload + resolve(packet)}) + }) + let packet = { payload:'payload'} + let res = await consumer.send(packet) + expect(res.payload).to.equal('local:overwrite default processor from instance at socket => payload') + + + }) // end unix socket test + +}) // end describe