uci-utils-obj-buffer/src/json-stream.js

135 lines
3.5 KiB
JavaScript

// adapted from https://github.com/sebastianseilund/node-json-socket
// native
import { StringDecoder } from 'string_decoder'
import EventEmitter from 'events'
// import to from 'await-to-js'
const decoder = new StringDecoder()
/**
 * JsonStream - incremental parser/serializer for length-prefixed JSON frames.
 *
 * Wire format of one frame: `<byteLength><delimiter><json>`, where `byteLength`
 * is the UTF-8 byte length of `<json>` written in decimal (see `serialize`).
 *
 * Events emitted:
 *  - 'message' (value): one parsed frame; falsy parsed values are replaced by `{}`.
 *  - 'error' (Error): invalid length header (`code` 'E#INVALID#CONTENT#LENGTH'),
 *    unparsable JSON (`code` 'E#INVALID#JSON'), undecodable input, or anything
 *    thrown by a 'message' listener while a frame is being delivered.
 *
 * @extends EventEmitter
 */
class JsonStream extends EventEmitter {
  // private fields
  #contentLength = null // payload byte length of the frame being read; null while waiting for a header
  #buffer = '' // decoded text received but not yet consumed
  #delimiter = '#' // separates the length header from the JSON payload
  #state = 'online' // 'online' | 'paused' | 'offline'
  #queue = [] // messages held back while paused
  // One decoder per instance: a StringDecoder keeps the trailing bytes of an
  // incomplete multi-byte character, so sharing one between streams would mix their data.
  #decoder = new StringDecoder('utf8')

  /**
   * @param {object} [opts]
   * @param {string} [opts.delimiter='#'] separator between length header and payload;
   *   may be longer than one character. Falsy values keep the default.
   */
  constructor (opts = {}) {
    super()
    // Coerce to string so a numeric delimiter is concatenated, not added, in serialize().
    if (opts?.delimiter) this.#delimiter = String(opts.delimiter)
    this.onData = this.onData.bind(this)
    this.serialize = this.serialize.bind(this)
  }

  /** @returns {string} current state: 'online', 'paused' or 'offline' */
  get state () { return this.#state }

  /** @returns {Array} the live internal queue of messages received while paused */
  get queue () { return this.#queue }

  /** Parsed messages are neither emitted nor queued while offline. */
  offline () { this.#state = 'offline' }

  /** Parsed messages are queued instead of emitted until `resume()` is called. */
  pause () { this.#state = 'paused' }

  /** Switch to 'online' without delivering queued messages (the next message clears the queue). */
  online () { this.#state = 'online' }

  /**
   * Emit every queued message in FIFO order, then go back 'online'.
   * Each message is removed from the queue before it is emitted, so a throwing
   * listener cannot cause a later `resume()` to deliver the same message twice.
   */
  resume () {
    while (this.#queue.length > 0) {
      this.emit('message', this.#queue.shift())
    }
    this.#state = 'online'
  }

  /**
   * Feed a chunk of incoming data. Every complete frame found is parsed and
   * emitted (or queued); problems are reported through the 'error' event.
   * @param {Buffer|string} data raw chunk; strings are taken as already decoded
   */
  onData (data) {
    try {
      this.#buffer += this.#decoder.write(data)
    } catch (err) {
      this.emit('error', err)
      return
    }
    this.#drain()
  }

  /**
   * Build the wire representation of a message.
   * @param {*} message any JSON-serializable value
   * @returns {Promise<string>} `<byteLength><delimiter><json>`
   * Rejects with a TypeError when the value cannot be represented as JSON
   * (e.g. `undefined`, a function, or a circular structure).
   */
  async serialize (message) {
    const messageData = JSON.stringify(message)
    if (messageData === undefined) {
      throw new TypeError('Message is not serializable to JSON')
    }
    const length = Buffer.byteLength(messageData, 'utf8')
    return `${length}${this.#delimiter}${messageData}`
  }

  // private methods

  // Process buffered frames until the buffer holds no complete frame.
  // Every failure consumes input (bad header: whole buffer; bad JSON or throwing
  // listener: that frame), so the loop always makes progress and later frames
  // in the same chunk are still delivered.
  #drain () {
    let more = true
    while (more) {
      try {
        more = this.#processFrame()
      } catch (err) {
        this.emit('error', err)
      }
    }
  }

  // Handle at most one frame; returns false when more data is needed.
  #processFrame () {
    if (this.#contentLength === null && !this.#readHeader()) return false
    const frame = this.#takeFrame()
    if (frame === null) return false
    this.#handleMessage(frame)
    return true
  }

  // Parse `<length><delimiter>` off the front of the buffer.
  // Returns false if the delimiter has not arrived yet (the buffer may end in
  // the middle of the length digits). Throws on an unusable length.
  #readHeader () {
    const at = this.#buffer.indexOf(this.#delimiter)
    if (at === -1) return false
    const rawContentLength = this.#buffer.substring(0, at)
    const contentLength = Number.parseInt(rawContentLength, 10)
    if (Number.isNaN(contentLength) || contentLength < 0) {
      // Capture the data for the message before discarding it: without a valid
      // length there is no way to find the next frame boundary.
      const received = this.#buffer
      this.#buffer = ''
      const err = new Error(
        `Invalid content length supplied (${rawContentLength}) in: ${received}`
      )
      err.code = 'E#INVALID#CONTENT#LENGTH'
      throw err
    }
    this.#contentLength = contentLength
    this.#buffer = this.#buffer.substring(at + this.#delimiter.length)
    return true
  }

  // Remove and return the payload of the current frame, or null if it is not
  // complete yet. The header counts UTF-8 bytes, so the cut is made on bytes,
  // not on UTF-16 string indices.
  #takeFrame () {
    const expected = this.#contentLength
    const available = Buffer.byteLength(this.#buffer, 'utf8')
    if (available < expected) return null
    let frame
    let rest
    if (available === expected) {
      frame = this.#buffer
      rest = ''
    } else if (available === this.#buffer.length) {
      // Pure ASCII: one code unit per byte, so string indices are byte offsets.
      frame = this.#buffer.substring(0, expected)
      rest = this.#buffer.substring(expected)
    } else {
      const bytes = Buffer.from(this.#buffer, 'utf8')
      frame = bytes.toString('utf8', 0, expected)
      rest = bytes.toString('utf8', expected)
    }
    this.#buffer = rest
    this.#contentLength = null
    return frame
  }

  // Parse one payload and route it according to the current state.
  #handleMessage (data) {
    let message
    try {
      message = JSON.parse(data)
    } catch (e) {
      const err = new Error(
        `Could not parse JSON: ${e.message}\nRequest data: ${data}`,
        { cause: e }
      )
      err.code = 'E#INVALID#JSON'
      throw err
    }
    // Falsy payloads (null, 0, false, '') are delivered as an empty object.
    message = message || {}
    if (this.#state === 'paused') {
      this.#queue.push(message)
    } else if (this.#state === 'online') {
      // Anything still queued from an earlier pause is dropped once live delivery continues.
      this.#queue = []
      this.emit('message', message)
    }
    // 'offline': the message is discarded.
  }
}
// Expose the class as both a named export and the default export.
export { JsonStream }
export default JsonStream