// adapted from https://github.com/sebastianseilund/node-json-socket
// native
import { StringDecoder } from 'string_decoder'
import EventEmitter from 'events'
// import to from 'await-to-js'

/**
 * ObjBuffer - length-prefixed JSON framing on top of a byte stream.
 *
 * Wire format of one message: `<byteLength><delimiter><json>` where
 * `byteLength` is the UTF-8 byte length of `<json>`.
 *
 * Events:
 *  - `opts.emit` (default `'decoded'`): emitted with each decoded value.
 *  - `'error'`: emitted when a message cannot be decoded or a listener throws.
 *
 * States:
 *  - `'online'`: decoded values are emitted immediately.
 *  - `'paused'`: decoded values are queued until `resume()` is called.
 *  - `'offline'`: decoded values are dropped.
 *
 * @extends EventEmitter
 */
class ObjBuffer extends EventEmitter {
  // private fields
  #contentLength = null // byte length of the message being assembled, null while reading a header
  #strData = '' // decoded text not yet consumed
  #delimiter = '_%#_'
  #state = 'online'
  #queue = []
  #event = 'decoded'
  // One decoder per instance: a StringDecoder keeps the trailing bytes of an
  // incomplete multi-byte character, so it must never be shared across streams.
  #decoder = new StringDecoder()

  /**
   * @param {object} [opts]
   * @param {string} [opts.delimiter] - separator between the length prefix and the JSON payload
   * @param {string} [opts.emit] - name of the event emitted for each decoded value
   */
  constructor (opts = {}) {
    super()
    this.#delimiter = opts.delimiter || this.#delimiter
    this.#event = opts.emit || this.#event
    // bound so they can be passed directly as stream/event handlers
    this.decode = this.decode.bind(this)
    this.resume = this.resume.bind(this)
  }

  /** @returns {string} current state: 'online', 'paused' or 'offline' */
  get state () { return this.#state }

  /** @returns {Array} values decoded while paused and not yet emitted */
  get queue () { return this.#queue }

  /** Drop every value decoded from now on. */
  offline () { this.#state = 'offline' }

  /** Queue decoded values instead of emitting them. */
  pause () { this.#state = 'paused' }

  /** Emit everything queued while paused (FIFO), then go back online. */
  resume () {
    // Index loop on purpose: values queued by a listener during the flush are
    // picked up by the same pass, preserving arrival order.
    for (let index = 0; index < this.#queue.length; index++) {
      this.emit(this.#event, this.#queue[index])
    }
    this.#queue = []
    this.#state = 'online'
  }

  /** Emit decoded values immediately (does not flush the queue). */
  online () { this.#state = 'online' }

  /**
   * Feed a chunk of incoming data. Every complete message contained in the
   * data received so far is decoded and dispatched according to the state.
   * Failures are reported through the 'error' event; a malformed message does
   * not prevent the messages that follow it from being decoded.
   *
   * @param {Buffer|string} buf - raw chunk read from the stream
   */
  decode (buf) {
    this.#strData += this.#decoder.write(buf)
    let pending = true
    while (pending) {
      try {
        pending = this.#consumeNext()
      } catch (err) {
        // Every throwing path has already consumed or discarded the offending
        // data, so continuing the loop always makes progress.
        this.emit('error', err)
      }
    }
  }

  /**
   * Serialize a value into a framed message.
   *
   * @param {*} obj - JSON-serializable value
   * @returns {Promise<string>} `<byteLength><delimiter><json>`
   * @throws (rejects) when the value cannot be serialized or its JSON
   *   contains the delimiter
   */
  async encode (obj) {
    const json = JSON.stringify(obj)
    if (typeof json !== 'string') {
      // JSON.stringify returns undefined for undefined, functions and symbols
      throw new TypeError('object to serialize has no JSON representation')
    }
    if (json.includes(this.#delimiter)) {
      throw new Error(`object to serialize contains the buffer delimiter ${this.#delimiter}, set alternative delimiter`)
    }
    return Buffer.byteLength(json, 'utf8') + this.#delimiter + json
  }

  // private methods

  /**
   * Try to extract and dispatch one message from the buffered data.
   *
   * @returns {boolean} true when a message was consumed, false when more data is needed
   * @throws {Error} code 'E#INVALID#CONTENT#LENGTH' or 'E#INVALID#JSON'
   */
  #consumeNext () {
    if (this.#contentLength == null) {
      const i = this.#strData.indexOf(this.#delimiter)
      // No delimiter yet: the chunk may have ended in the middle of the header.
      if (i === -1) return false

      const rawContentLength = this.#strData.substring(0, i)
      const parsed = Number.parseInt(rawContentLength, 10)
      if (Number.isNaN(parsed) || parsed < 0) {
        // The stream cannot be resynchronised, so drop what is buffered; keep
        // a copy first so the error message can show the offending data.
        const data = this.#strData
        this.#strData = ''
        const err = new Error(
          'Invalid content length supplied (' + rawContentLength + ') in: ' + data
        )
        err.code = 'E#INVALID#CONTENT#LENGTH'
        throw err
      }
      this.#contentLength = parsed
      // remove header and delimiter
      this.#strData = this.#strData.substring(i + this.#delimiter.length)
    }

    const available = Buffer.byteLength(this.#strData, 'utf8')
    if (available < this.#contentLength) return false

    let json = this.#strData
    let rest = ''
    if (available > this.#contentLength) {
      // The length prefix counts bytes, not UTF-16 units, so the split has to
      // happen on the encoded bytes.
      const bytes = Buffer.from(this.#strData, 'utf8')
      json = bytes.toString('utf8', 0, this.#contentLength)
      rest = bytes.toString('utf8', this.#contentLength)
    }

    // Update the buffer before dispatching so that a parse error or a throwing
    // listener leaves the following messages intact.
    this.#contentLength = null
    this.#strData = rest
    this.#deliver(json)
    return true
  }

  /**
   * Parse one JSON payload and emit, queue or drop it depending on the state.
   *
   * @param {string} json
   * @throws {Error} code 'E#INVALID#JSON' when the payload is not valid JSON
   */
  #deliver (json) {
    let obj
    try {
      obj = JSON.parse(json)
    } catch (e) {
      const err = new Error(
        'Could not parse JSON: ' + e.message + '\nRequest data: ' + json
      )
      err.code = 'E#INVALID#JSON'
      throw err
    }
    // falsy payloads (null, 0, false, '') are normalised to an empty object
    obj = obj || {}

    if (this.#state === 'paused') {
      this.#queue.push(obj)
    }
    if (this.#state === 'online') {
      this.#queue = []
      this.emit(this.#event, obj)
    }
  }
}

export { ObjBuffer }
export default ObjBuffer