diff --git a/package.json b/package.json index 08caf04..34083a5 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,12 @@ { - "name": "@uci-utils/json-stream", - "version": "0.1.0", - "description": "seriaized json stream class for communication in a pipe", - "main": "src/json-stream.js", + "name": "@uci-utils/obj-buffer", + "version": "0.1.1", + "description": "encode and decode a plain object for communication across a stream", + "main": "src/obj-stream.js", "type": "module", "scripts": { "test": "./node_modules/.bin/mocha --timeout 30000", - "test:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --exec './node_modules/.bin/mocha --timeout 3000' || exit 0", + "test:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --exec './node_modules/.bin/mocha --timeout 3000' || exit 1", "test:dev:trace": "UCI_LOG_LEVEL='trace' npm run test:dev", "test:dev:warn": "UCI_LOG_LEVEL='warn' npm run test:dev", "test:log": "UCI_ENV=pro UCI_LOG_PATH=./test/test.log 0 npm run test || exit 0" @@ -32,13 +32,13 @@ "await-to-js": "^3.0.0" }, "devDependencies": { - "@babel/core": "^7.14.0", - "@babel/eslint-parser": "^7.13.14", - "@babel/preset-env": "^7.14.1", + "@babel/core": "^7.14.3", + "@babel/eslint-parser": "^7.14.4", + "@babel/preset-env": "^7.14.4", "chai": "^4.3.4", - "eslint": "^7.26.0", - "eslint-config-standard": "^16.0.2", + "eslint": "^7.27.0", + "eslint-config-standard": "^16.0.3", "mocha": "^8.4.0", "nodemon": "^2.0.7" } -} \ No newline at end of file +} diff --git a/readme.md b/readme.md index f518120..8c2ca19 100644 --- a/readme.md +++ b/readme.md @@ -1,6 +1,6 @@ -# uCOMmandIt JSON Stream Utility +# uCOMmandIt Object Buffer -A calls designed to serialize a js object (JSON) for steaming across a pipe +Encode/Decode a js object via JSON for steaming. This allows one to send/receive packets that are a js object between processes or in any stream. diff --git a/src/json-stream.js b/src/obj-buffer.js similarity index 52% rename from src/json-stream.js rename to src/obj-buffer.js index 5455969..7c8c7f1 100644 --- a/src/json-stream.js +++ b/src/obj-buffer.js @@ -12,20 +12,21 @@ const decoder = new StringDecoder() * JsonStream - Description * @extends EventEmitter */ -class JsonStream extends EventEmitter { +class ObjBuffer extends EventEmitter { // private fields #contentLength = null - #buffer ='' - #delimiter='#' + #strData ='' + #delimiter='_%#_' #state='online' #queue=[] - #stream + #event = 'decoded' constructor (opts = {}) { super() this.#delimiter = opts.delimiter || this.#delimiter - this.onData = this.onData.bind(this) - this.serialize = this.serialize.bind(this) + this.#event = opts.emit || this.#event + this.decode = this.decode.bind(this) + this.resume = this.resume.bind(this) } get state () { return this.#state } @@ -39,7 +40,7 @@ class JsonStream extends EventEmitter { resume () { // emit FIFO for (let index = 0; index < this.#queue.length; index++) { - this.emit('message', this.#queue[index]) + this.emit(this.#event, this.#queue[index]) } this.#queue = [] this.#state = 'online' @@ -47,88 +48,91 @@ class JsonStream extends EventEmitter { online () { this.#state = 'online' } - onData (data) { - data = decoder.write(data) + decode (buf) { + const str = decoder.write(buf) try { - this.#handleData(data) + this.#handleData(str) } catch (err) { // emit an error this.emit('error', err) } } - async serialize (message) { + async encode (obj) { return new Promise((resolve, reject) => { - let messageData; let length - try { messageData = JSON.stringify(message) } catch (err) { reject(err) } - try { length = Buffer.byteLength(messageData, 'utf8') } catch (err) { reject(err) } - const data = length + this.#delimiter + messageData - resolve(data) + let json; let length + try { json = JSON.stringify(obj) } catch (err) { reject(err) } + if (json.indexOf(this.#delimiter) != -1) reject(new Error(`object to serialize contains the buffer delimiter ${this.#delimiter}, set alternative delimiter`)) + try { length = Buffer.byteLength(json, 'utf8') } catch (err) { reject(err) } + const buf = length + this.#delimiter + json + resolve(buf) }) } - #handleData (data) { - this.#buffer += data + #handleData (str) { + this.#strData += str if (this.#contentLength == null) { - const i = this.#buffer.indexOf(this.#delimiter) - // Check if the buffer has a this.#opts.delimeter or "#", if not, the end of the buffer string might be in the middle of a content length string - if (i !== -1) { - const rawContentLength = this.#buffer.substring(0, i) + const i = this.#strData.indexOf(this.#delimiter) + if (i !== -1) { // delimiter found + // Check if decoded string has delimiter, if not the end of the buffer string might be in the middle of a content length string + const rawContentLength = this.#strData.substring(0, i) this.#contentLength = parseInt(rawContentLength) if (isNaN(this.#contentLength)) { this.#contentLength = null - this.#buffer = '' + this.#strData = '' const err = new Error( 'Invalid content length supplied (' + rawContentLength + ') in: ' + - this.#buffer + this.#strData ) err.code = 'E#INVALID#CONTENT#LENGTH' throw err } - this.#buffer = this.#buffer.substring(i + 1) + // remove delimiter + this.#strData = this.#strData.substring(i + this.#delimiter.length) } } if (this.#contentLength != null) { - const length = Buffer.byteLength(this.#buffer, 'utf8') + const length = Buffer.byteLength(this.#strData, 'utf8') if (length === this.#contentLength) { - this.#handleMessage(this.#buffer) + // json to parse is complete + this.#makeObj(this.#strData) } else if (length > this.#contentLength) { - const message = this.#buffer.substring(0, this.#contentLength) - const rest = this.#buffer.substring(this.#contentLength) - this.#handleMessage(message) - this.onData(rest) + const json = this.#strData.substring(0, this.#contentLength) + const more = this.#strData.substring(this.#contentLength) + this.#makeObj(json) + this.decode(more) } } } // private methods - #handleMessage (data) { + #makeObj (json) { this.#contentLength = null - this.#buffer = '' - let message + this.#strData = '' + let obj try { - message = JSON.parse(data) + obj = JSON.parse(json) } catch (e) { const err = new Error( - 'Could not parse JSON: ' + e.message + '\nRequest data: ' + data + 'Could not parse JSON: ' + e.message + '\nRequest data: ' + json ) err.code = 'E#INVALID#JSON' throw err } - message = message || {} + obj = obj || {} // console.log('stream message', message, this.#state) if (this.#state === 'paused') { - this.#queue.push(message) + this.#queue.push(obj) } if (this.#state === 'online') { this.#queue = [] - this.emit('message', message) + this.emit(this.#event, obj) } } } -export { JsonStream } -export default JsonStream +export { ObjBuffer } +export default ObjBuffer diff --git a/test/test.js b/test/test.js index 5a57506..2821619 100644 --- a/test/test.js +++ b/test/test.js @@ -1,36 +1,56 @@ import { expect } from 'chai' -import JsonStream from '../src/json-stream.js' +import ObjBuffer from '../src/obj-buffer.js' -const obj = { cmd: 'test', _header: { name: 'a name', id: '12345' }, arg: 5 } -const ser = '63#{"cmd":"test","_header":{"name":"a name","id":"12345"},"arg":5}' -const jss = new JsonStream() +const obj = { cmd: 'test', _header: { name: 'a # name', id: 'i12345' }, arg: 5 } +const ser = '66_%#_{"cmd":"test","_header":{"name":"a # name","id":"i12345"},"arg":5}' +const otb = new ObjBuffer() +const event = 'decoded' -describe('JSON stream check', function () { - - it('Should stringify and serialize a message/object', async function () { - expect(await jss.serialize(obj)).to.equal(ser) +describe('Object to Buffer encode/decode', function () { + it('Should stringify and encode a message/object', async function () { + expect(await otb.encode(obj)).to.equal(ser) }) - it('Should serialize and decode a message', async function () { - jss.on('message', mes => { - // console.log(mes) - expect(mes).to.deep.equal(obj) + it('Should encode and decode a message', function () { + return new Promise(async (resolve, reject) => { + otb.once(event, mes => { + // console.log('message received', mes) + expect(mes).to.deep.equal(obj) + resolve() + }) + const ser = await otb.encode(obj) + // console.log('encoded', ser) + otb.decode(ser) }) - const ser = await jss.serialize(obj) - jss.onData(ser) }) - it('Should queue messages if paused and resumed', async function () { - jss.on('message', mes => { - // console.log('queued message', mes) - expect(mes).to.deep.equal(obj) + it('encode throws an error if object contains the delmiter', async function () { + obj._header.name = '_%#_' + return otb.encode(obj) + .then(res => { + expect.fail('no error was thrown for') + }) + .catch(error => { + // console.log('error', error) + expect(error).to.be.an('error') + } + ) + }) + + it('Should queue message if paused and emit if resumed', function () { + return new Promise(async (resolve, reject) => { + otb.once(event, mes => { + // console.log('queued message', mes) + expect(mes).to.deep.equal(obj) + resolve() + }) + obj._header.name = 'name' + const ser = await otb.encode(obj) + otb.pause() + otb.decode(ser) + // console.log(otb.state, otb.queue[0]) + expect(otb.queue[0]).to.deep.equal(obj) + otb.resume() }) - const ser = await jss.serialize(obj) - jss.pause() - jss.onData(ser) - // console.log(jss.state, jss.queue[0]) - expect(jss.queue[0]).to.deep.equal(obj) - jss.resume() }) - })