139 lines
3.7 KiB
JavaScript
139 lines
3.7 KiB
JavaScript
|
// adpated from https://github.com/sebastianseilund/node-json-socket
|
||
|
|
||
|
// native
|
||
|
import { StringDecoder } from 'string_decoder'
|
||
|
import EventEmitter from 'events'
|
||
|
|
||
|
// import to from 'await-to-js'
|
||
|
|
||
|
const decoder = new StringDecoder()
|
||
|
|
||
|
/**
|
||
|
* JsonStream - Description
|
||
|
* @extends EventEmitter
|
||
|
*/
|
||
|
class ObjBuffer extends EventEmitter {
|
||
|
// private fields
|
||
|
#contentLength = null
|
||
|
#strData =''
|
||
|
#delimiter='_%#_'
|
||
|
#state='online'
|
||
|
#queue=[]
|
||
|
#event = 'decoded'
|
||
|
|
||
|
constructor (opts = {}) {
|
||
|
super()
|
||
|
this.#delimiter = opts.delimiter || this.#delimiter
|
||
|
this.#event = opts.emit || this.#event
|
||
|
this.decode = this.decode.bind(this)
|
||
|
this.resume = this.resume.bind(this)
|
||
|
}
|
||
|
|
||
|
get state () { return this.#state }
|
||
|
|
||
|
get queue () { return this.#queue }
|
||
|
|
||
|
offline () { this.#state = 'offline' }
|
||
|
|
||
|
pause () { this.#state = 'paused' } // queue messages in handler
|
||
|
|
||
|
resume () {
|
||
|
// emit FIFO
|
||
|
for (let index = 0; index < this.#queue.length; index++) {
|
||
|
this.emit(this.#event, this.#queue[index])
|
||
|
}
|
||
|
this.#queue = []
|
||
|
this.#state = 'online'
|
||
|
}
|
||
|
|
||
|
online () { this.#state = 'online' }
|
||
|
|
||
|
decode (buf) {
|
||
|
const str = decoder.write(buf)
|
||
|
try {
|
||
|
this.#handleData(str)
|
||
|
} catch (err) {
|
||
|
// emit an error
|
||
|
this.emit('error', err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
async encode (obj) {
|
||
|
return new Promise((resolve, reject) => {
|
||
|
let json; let length
|
||
|
try { json = JSON.stringify(obj) } catch (err) { reject(err) }
|
||
|
if (json.indexOf(this.#delimiter) != -1) reject(new Error(`object to serialize contains the buffer delimiter ${this.#delimiter}, set alternative delimiter`))
|
||
|
try { length = Buffer.byteLength(json, 'utf8') } catch (err) { reject(err) }
|
||
|
const buf = length + this.#delimiter + json
|
||
|
resolve(buf)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
#handleData (str) {
|
||
|
this.#strData += str
|
||
|
if (this.#contentLength == null) {
|
||
|
const i = this.#strData.indexOf(this.#delimiter)
|
||
|
if (i !== -1) { // delimiter found
|
||
|
// Check if decoded string has delimiter, if not the end of the buffer string might be in the middle of a content length string
|
||
|
const rawContentLength = this.#strData.substring(0, i)
|
||
|
this.#contentLength = parseInt(rawContentLength)
|
||
|
if (isNaN(this.#contentLength)) {
|
||
|
this.#contentLength = null
|
||
|
this.#strData = ''
|
||
|
const err = new Error(
|
||
|
'Invalid content length supplied (' +
|
||
|
rawContentLength +
|
||
|
') in: ' +
|
||
|
this.#strData
|
||
|
)
|
||
|
err.code = 'E#INVALID#CONTENT#LENGTH'
|
||
|
throw err
|
||
|
}
|
||
|
// remove delimiter
|
||
|
this.#strData = this.#strData.substring(i + this.#delimiter.length)
|
||
|
}
|
||
|
}
|
||
|
if (this.#contentLength != null) {
|
||
|
const length = Buffer.byteLength(this.#strData, 'utf8')
|
||
|
if (length === this.#contentLength) {
|
||
|
// json to parse is complete
|
||
|
this.#makeObj(this.#strData)
|
||
|
} else if (length > this.#contentLength) {
|
||
|
const json = this.#strData.substring(0, this.#contentLength)
|
||
|
const more = this.#strData.substring(this.#contentLength)
|
||
|
this.#makeObj(json)
|
||
|
this.decode(more)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// private methods
|
||
|
|
||
|
#makeObj (json) {
|
||
|
this.#contentLength = null
|
||
|
this.#strData = ''
|
||
|
let obj
|
||
|
try {
|
||
|
obj = JSON.parse(json)
|
||
|
} catch (e) {
|
||
|
const err = new Error(
|
||
|
'Could not parse JSON: ' + e.message + '\nRequest data: ' + json
|
||
|
)
|
||
|
err.code = 'E#INVALID#JSON'
|
||
|
throw err
|
||
|
}
|
||
|
obj = obj || {}
|
||
|
// console.log('stream message', message, this.#state)
|
||
|
if (this.#state === 'paused') {
|
||
|
this.#queue.push(obj)
|
||
|
}
|
||
|
if (this.#state === 'online') {
|
||
|
this.#queue = []
|
||
|
this.emit(this.#event, obj)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
export { ObjBuffer }
|
||
|
export default ObjBuffer
|