diff --git a/.babelrc b/.babelrc new file mode 100644 index 0000000..3b48cbc --- /dev/null +++ b/.babelrc @@ -0,0 +1,10 @@ +{ + "presets": [ + [ + "@babel/preset-env", + { + "shippedProposals": true + } + ] + ] +} \ No newline at end of file diff --git a/.eslintrc.yml b/.eslintrc.yml new file mode 100644 index 0000000..c2ca30a --- /dev/null +++ b/.eslintrc.yml @@ -0,0 +1,16 @@ +env: + node: true + es2021: true + mocha: true +extends: + - standard +parser: "@babel/eslint-parser" +parserOptions: + ecmaVersion: 12 + sourceType: module +rules: + indent: ["error", 2] + no-console: 0 + semi: ["error", "never"] + # linebreak-style: ["error", "unix"] + quotes: ["error", "single"] diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..faad3eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/node_modules/ +/coverage/ +*.lock diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..f16fc41 --- /dev/null +++ b/.npmignore @@ -0,0 +1,4 @@ +tests/ +test/ +*.test.js +testing/ diff --git a/package.json b/package.json new file mode 100644 index 0000000..08caf04 --- /dev/null +++ b/package.json @@ -0,0 +1,44 @@ +{ + "name": "@uci-utils/json-stream", + "version": "0.1.0", + "description": "seriaized json stream class for communication in a pipe", + "main": "src/json-stream.js", + "type": "module", + "scripts": { + "test": "./node_modules/.bin/mocha --timeout 30000", + "test:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --exec './node_modules/.bin/mocha --timeout 3000' || exit 0", + "test:dev:trace": "UCI_LOG_LEVEL='trace' npm run test:dev", + "test:dev:warn": "UCI_LOG_LEVEL='warn' npm run test:dev", + "test:log": "UCI_ENV=pro UCI_LOG_PATH=./test/test.log 0 npm run test || exit 0" + }, + "author": "David Kebler", + "license": "MIT", + "repository": { + "type": "git", + "url": "" + }, + "keywords": [ + "node.js", + "stream", + "JSON", + "pipe", + "socket" + ], + "bugs": { + "url": "" + }, + "homepage": "", + "dependencies": { + "await-to-js": "^3.0.0" + }, + "devDependencies": { + "@babel/core": "^7.14.0", + "@babel/eslint-parser": "^7.13.14", + "@babel/preset-env": "^7.14.1", + "chai": "^4.3.4", + "eslint": "^7.26.0", + "eslint-config-standard": "^16.0.2", + "mocha": "^8.4.0", + "nodemon": "^2.0.7" + } +} \ No newline at end of file diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..f518120 --- /dev/null +++ b/readme.md @@ -0,0 +1,6 @@ +# uCOMmandIt JSON Stream Utility + +A calls designed to serialize a js object (JSON) for steaming across a pipe + + + diff --git a/src/json-stream.js b/src/json-stream.js new file mode 100644 index 0000000..5455969 --- /dev/null +++ b/src/json-stream.js @@ -0,0 +1,134 @@ +// adpated from https://github.com/sebastianseilund/node-json-socket + +// native +import { StringDecoder } from 'string_decoder' +import EventEmitter from 'events' + +// import to from 'await-to-js' + +const decoder = new StringDecoder() + +/** + * JsonStream - Description + * @extends EventEmitter + */ +class JsonStream extends EventEmitter { + // private fields + #contentLength = null + #buffer ='' + #delimiter='#' + #state='online' + #queue=[] + #stream + + constructor (opts = {}) { + super() + this.#delimiter = opts.delimiter || this.#delimiter + this.onData = this.onData.bind(this) + this.serialize = this.serialize.bind(this) + } + + get state () { return this.#state } + + get queue () { return this.#queue } + + offline () { this.#state = 'offline' } + + pause () { this.#state = 'paused' } // queue messages in handler + + resume () { + // emit FIFO + for (let index = 0; index < this.#queue.length; index++) { + this.emit('message', this.#queue[index]) + } + this.#queue = [] + this.#state = 'online' + } + + online () { this.#state = 'online' } + + onData (data) { + data = decoder.write(data) + try { + this.#handleData(data) + } catch (err) { + // emit an error + this.emit('error', err) + } + } + + async serialize (message) { + return new Promise((resolve, reject) => { + let messageData; let length + try { messageData = JSON.stringify(message) } catch (err) { reject(err) } + try { length = Buffer.byteLength(messageData, 'utf8') } catch (err) { reject(err) } + const data = length + this.#delimiter + messageData + resolve(data) + }) + } + + #handleData (data) { + this.#buffer += data + if (this.#contentLength == null) { + const i = this.#buffer.indexOf(this.#delimiter) + // Check if the buffer has a this.#opts.delimeter or "#", if not, the end of the buffer string might be in the middle of a content length string + if (i !== -1) { + const rawContentLength = this.#buffer.substring(0, i) + this.#contentLength = parseInt(rawContentLength) + if (isNaN(this.#contentLength)) { + this.#contentLength = null + this.#buffer = '' + const err = new Error( + 'Invalid content length supplied (' + + rawContentLength + + ') in: ' + + this.#buffer + ) + err.code = 'E#INVALID#CONTENT#LENGTH' + throw err + } + this.#buffer = this.#buffer.substring(i + 1) + } + } + if (this.#contentLength != null) { + const length = Buffer.byteLength(this.#buffer, 'utf8') + if (length === this.#contentLength) { + this.#handleMessage(this.#buffer) + } else if (length > this.#contentLength) { + const message = this.#buffer.substring(0, this.#contentLength) + const rest = this.#buffer.substring(this.#contentLength) + this.#handleMessage(message) + this.onData(rest) + } + } + } + + // private methods + + #handleMessage (data) { + this.#contentLength = null + this.#buffer = '' + let message + try { + message = JSON.parse(data) + } catch (e) { + const err = new Error( + 'Could not parse JSON: ' + e.message + '\nRequest data: ' + data + ) + err.code = 'E#INVALID#JSON' + throw err + } + message = message || {} + // console.log('stream message', message, this.#state) + if (this.#state === 'paused') { + this.#queue.push(message) + } + if (this.#state === 'online') { + this.#queue = [] + this.emit('message', message) + } + } +} + +export { JsonStream } +export default JsonStream diff --git a/test/test.js b/test/test.js new file mode 100644 index 0000000..5a57506 --- /dev/null +++ b/test/test.js @@ -0,0 +1,36 @@ +import { expect } from 'chai' +import JsonStream from '../src/json-stream.js' + +const obj = { cmd: 'test', _header: { name: 'a name', id: '12345' }, arg: 5 } +const ser = '63#{"cmd":"test","_header":{"name":"a name","id":"12345"},"arg":5}' +const jss = new JsonStream() + +describe('JSON stream check', function () { + + it('Should stringify and serialize a message/object', async function () { + expect(await jss.serialize(obj)).to.equal(ser) + }) + + it('Should serialize and decode a message', async function () { + jss.on('message', mes => { + // console.log(mes) + expect(mes).to.deep.equal(obj) + }) + const ser = await jss.serialize(obj) + jss.onData(ser) + }) + + it('Should queue messages if paused and resumed', async function () { + jss.on('message', mes => { + // console.log('queued message', mes) + expect(mes).to.deep.equal(obj) + }) + const ser = await jss.serialize(obj) + jss.pause() + jss.onData(ser) + // console.log(jss.state, jss.queue[0]) + expect(jss.queue[0]).to.deep.equal(obj) + jss.resume() + }) + +})