rename package to obj-buffer, change default delimiter and some prop/variable names to make more sense.  Allow custom event (default: decoded)
master
Kebler Network System Administrator 2021-05-28 11:02:28 -07:00
parent 4fe5c7c0e9
commit 2c668d9d33
4 changed files with 103 additions and 79 deletions

View File

@ -1,12 +1,12 @@
{ {
"name": "@uci-utils/json-stream", "name": "@uci-utils/obj-buffer",
"version": "0.1.0", "version": "0.1.1",
"description": "seriaized json stream class for communication in a pipe", "description": "encode and decode a plain object for communication across a stream",
"main": "src/json-stream.js", "main": "src/obj-stream.js",
"type": "module", "type": "module",
"scripts": { "scripts": {
"test": "./node_modules/.bin/mocha --timeout 30000", "test": "./node_modules/.bin/mocha --timeout 30000",
"test:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --exec './node_modules/.bin/mocha --timeout 3000' || exit 0", "test:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --exec './node_modules/.bin/mocha --timeout 3000' || exit 1",
"test:dev:trace": "UCI_LOG_LEVEL='trace' npm run test:dev", "test:dev:trace": "UCI_LOG_LEVEL='trace' npm run test:dev",
"test:dev:warn": "UCI_LOG_LEVEL='warn' npm run test:dev", "test:dev:warn": "UCI_LOG_LEVEL='warn' npm run test:dev",
"test:log": "UCI_ENV=pro UCI_LOG_PATH=./test/test.log 0 npm run test || exit 0" "test:log": "UCI_ENV=pro UCI_LOG_PATH=./test/test.log 0 npm run test || exit 0"
@ -32,13 +32,13 @@
"await-to-js": "^3.0.0" "await-to-js": "^3.0.0"
}, },
"devDependencies": { "devDependencies": {
"@babel/core": "^7.14.0", "@babel/core": "^7.14.3",
"@babel/eslint-parser": "^7.13.14", "@babel/eslint-parser": "^7.14.4",
"@babel/preset-env": "^7.14.1", "@babel/preset-env": "^7.14.4",
"chai": "^4.3.4", "chai": "^4.3.4",
"eslint": "^7.26.0", "eslint": "^7.27.0",
"eslint-config-standard": "^16.0.2", "eslint-config-standard": "^16.0.3",
"mocha": "^8.4.0", "mocha": "^8.4.0",
"nodemon": "^2.0.7" "nodemon": "^2.0.7"
} }
} }

View File

@ -1,6 +1,6 @@
# uCOMmandIt JSON Stream Utility # uCOMmandIt Object Buffer
A calls designed to serialize a js object (JSON) for steaming across a pipe Encode/Decode a js object via JSON for steaming. This allows one to send/receive packets that are a js object between processes or in any stream.

View File

@ -12,20 +12,21 @@ const decoder = new StringDecoder()
* JsonStream - Description * JsonStream - Description
* @extends EventEmitter * @extends EventEmitter
*/ */
class JsonStream extends EventEmitter { class ObjBuffer extends EventEmitter {
// private fields // private fields
#contentLength = null #contentLength = null
#buffer ='' #strData =''
#delimiter='#' #delimiter='_%#_'
#state='online' #state='online'
#queue=[] #queue=[]
#stream #event = 'decoded'
constructor (opts = {}) { constructor (opts = {}) {
super() super()
this.#delimiter = opts.delimiter || this.#delimiter this.#delimiter = opts.delimiter || this.#delimiter
this.onData = this.onData.bind(this) this.#event = opts.emit || this.#event
this.serialize = this.serialize.bind(this) this.decode = this.decode.bind(this)
this.resume = this.resume.bind(this)
} }
get state () { return this.#state } get state () { return this.#state }
@ -39,7 +40,7 @@ class JsonStream extends EventEmitter {
resume () { resume () {
// emit FIFO // emit FIFO
for (let index = 0; index < this.#queue.length; index++) { for (let index = 0; index < this.#queue.length; index++) {
this.emit('message', this.#queue[index]) this.emit(this.#event, this.#queue[index])
} }
this.#queue = [] this.#queue = []
this.#state = 'online' this.#state = 'online'
@ -47,88 +48,91 @@ class JsonStream extends EventEmitter {
online () { this.#state = 'online' } online () { this.#state = 'online' }
onData (data) { decode (buf) {
data = decoder.write(data) const str = decoder.write(buf)
try { try {
this.#handleData(data) this.#handleData(str)
} catch (err) { } catch (err) {
// emit an error // emit an error
this.emit('error', err) this.emit('error', err)
} }
} }
async serialize (message) { async encode (obj) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let messageData; let length let json; let length
try { messageData = JSON.stringify(message) } catch (err) { reject(err) } try { json = JSON.stringify(obj) } catch (err) { reject(err) }
try { length = Buffer.byteLength(messageData, 'utf8') } catch (err) { reject(err) } if (json.indexOf(this.#delimiter) != -1) reject(new Error(`object to serialize contains the buffer delimiter ${this.#delimiter}, set alternative delimiter`))
const data = length + this.#delimiter + messageData try { length = Buffer.byteLength(json, 'utf8') } catch (err) { reject(err) }
resolve(data) const buf = length + this.#delimiter + json
resolve(buf)
}) })
} }
#handleData (data) { #handleData (str) {
this.#buffer += data this.#strData += str
if (this.#contentLength == null) { if (this.#contentLength == null) {
const i = this.#buffer.indexOf(this.#delimiter) const i = this.#strData.indexOf(this.#delimiter)
// Check if the buffer has a this.#opts.delimeter or "#", if not, the end of the buffer string might be in the middle of a content length string if (i !== -1) { // delimiter found
if (i !== -1) { // Check if decoded string has delimiter, if not the end of the buffer string might be in the middle of a content length string
const rawContentLength = this.#buffer.substring(0, i) const rawContentLength = this.#strData.substring(0, i)
this.#contentLength = parseInt(rawContentLength) this.#contentLength = parseInt(rawContentLength)
if (isNaN(this.#contentLength)) { if (isNaN(this.#contentLength)) {
this.#contentLength = null this.#contentLength = null
this.#buffer = '' this.#strData = ''
const err = new Error( const err = new Error(
'Invalid content length supplied (' + 'Invalid content length supplied (' +
rawContentLength + rawContentLength +
') in: ' + ') in: ' +
this.#buffer this.#strData
) )
err.code = 'E#INVALID#CONTENT#LENGTH' err.code = 'E#INVALID#CONTENT#LENGTH'
throw err throw err
} }
this.#buffer = this.#buffer.substring(i + 1) // remove delimiter
this.#strData = this.#strData.substring(i + this.#delimiter.length)
} }
} }
if (this.#contentLength != null) { if (this.#contentLength != null) {
const length = Buffer.byteLength(this.#buffer, 'utf8') const length = Buffer.byteLength(this.#strData, 'utf8')
if (length === this.#contentLength) { if (length === this.#contentLength) {
this.#handleMessage(this.#buffer) // json to parse is complete
this.#makeObj(this.#strData)
} else if (length > this.#contentLength) { } else if (length > this.#contentLength) {
const message = this.#buffer.substring(0, this.#contentLength) const json = this.#strData.substring(0, this.#contentLength)
const rest = this.#buffer.substring(this.#contentLength) const more = this.#strData.substring(this.#contentLength)
this.#handleMessage(message) this.#makeObj(json)
this.onData(rest) this.decode(more)
} }
} }
} }
// private methods // private methods
#handleMessage (data) { #makeObj (json) {
this.#contentLength = null this.#contentLength = null
this.#buffer = '' this.#strData = ''
let message let obj
try { try {
message = JSON.parse(data) obj = JSON.parse(json)
} catch (e) { } catch (e) {
const err = new Error( const err = new Error(
'Could not parse JSON: ' + e.message + '\nRequest data: ' + data 'Could not parse JSON: ' + e.message + '\nRequest data: ' + json
) )
err.code = 'E#INVALID#JSON' err.code = 'E#INVALID#JSON'
throw err throw err
} }
message = message || {} obj = obj || {}
// console.log('stream message', message, this.#state) // console.log('stream message', message, this.#state)
if (this.#state === 'paused') { if (this.#state === 'paused') {
this.#queue.push(message) this.#queue.push(obj)
} }
if (this.#state === 'online') { if (this.#state === 'online') {
this.#queue = [] this.#queue = []
this.emit('message', message) this.emit(this.#event, obj)
} }
} }
} }
export { JsonStream } export { ObjBuffer }
export default JsonStream export default ObjBuffer

View File

@ -1,36 +1,56 @@
import { expect } from 'chai' import { expect } from 'chai'
import JsonStream from '../src/json-stream.js' import ObjBuffer from '../src/obj-buffer.js'
const obj = { cmd: 'test', _header: { name: 'a name', id: '12345' }, arg: 5 } const obj = { cmd: 'test', _header: { name: 'a # name', id: 'i12345' }, arg: 5 }
const ser = '63#{"cmd":"test","_header":{"name":"a name","id":"12345"},"arg":5}' const ser = '66_%#_{"cmd":"test","_header":{"name":"a # name","id":"i12345"},"arg":5}'
const jss = new JsonStream() const otb = new ObjBuffer()
const event = 'decoded'
describe('JSON stream check', function () { describe('Object to Buffer encode/decode', function () {
it('Should stringify and encode a message/object', async function () {
it('Should stringify and serialize a message/object', async function () { expect(await otb.encode(obj)).to.equal(ser)
expect(await jss.serialize(obj)).to.equal(ser)
}) })
it('Should serialize and decode a message', async function () { it('Should encode and decode a message', function () {
jss.on('message', mes => { return new Promise(async (resolve, reject) => {
// console.log(mes) otb.once(event, mes => {
expect(mes).to.deep.equal(obj) // console.log('message received', mes)
expect(mes).to.deep.equal(obj)
resolve()
})
const ser = await otb.encode(obj)
// console.log('encoded', ser)
otb.decode(ser)
}) })
const ser = await jss.serialize(obj)
jss.onData(ser)
}) })
it('Should queue messages if paused and resumed', async function () { it('encode throws an error if object contains the delmiter', async function () {
jss.on('message', mes => { obj._header.name = '_%#_'
// console.log('queued message', mes) return otb.encode(obj)
expect(mes).to.deep.equal(obj) .then(res => {
expect.fail('no error was thrown for')
})
.catch(error => {
// console.log('error', error)
expect(error).to.be.an('error')
}
)
})
it('Should queue message if paused and emit if resumed', function () {
return new Promise(async (resolve, reject) => {
otb.once(event, mes => {
// console.log('queued message', mes)
expect(mes).to.deep.equal(obj)
resolve()
})
obj._header.name = 'name'
const ser = await otb.encode(obj)
otb.pause()
otb.decode(ser)
// console.log(otb.state, otb.queue[0])
expect(otb.queue[0]).to.deep.equal(obj)
otb.resume()
}) })
const ser = await jss.serialize(obj)
jss.pause()
jss.onData(ser)
// console.log(jss.state, jss.queue[0])
expect(jss.queue[0]).to.deep.equal(obj)
jss.resume()
}) })
}) })