uci-socket/src/json-stream.js

114 lines
3.3 KiB
JavaScript

// adapted from https://github.com/sebastianseilund/node-json-socket
import { StringDecoder } from 'string_decoder'
import EventEmitter from 'events'
import btc from 'better-try-catch'
const decoder = new StringDecoder()
/**
 * JsonStream - frames and parses JSON messages carried over a byte stream.
 *
 * Wire format of one frame: `<payload byte length><delimiter><JSON payload>`,
 * where the length counts UTF-8 bytes of the payload (not characters).
 *
 * Events:
 *  - 'message' (object): a parsed message, emitted while the state is 'online'
 *  - 'error'   (Error):  framing/parsing failure; `err.code` is
 *                        'E_INVALID_CONTENT_LENGTH' or 'E_INVALID_JSON'
 *
 * States: 'online' (messages are emitted), 'paused' (messages are queued until
 * resume(), pings are dropped) and 'offline' (messages are discarded).
 *
 * @extends EventEmitter
 */
class JsonStream extends EventEmitter {
  /**
   * @param {Object} [opts]
   * @param {string} [opts.delimiter='#'] separator between the length prefix and the payload
   */
  constructor(opts = {}) {
    super()
    this._contentLength = null // payload size in bytes of the frame being read, null while reading the prefix
    this._buffer = '' // decoded text received but not yet consumed
    this._delimeter = String(opts.delimiter || '#')
    // One decoder per stream: a shared decoder would mix the trailing partial
    // multi-byte character of one socket into the data of another.
    this._decoder = new StringDecoder('utf8')
    this.onData = this.onData.bind(this)
    this.serialize = this.serialize.bind(this)
    this._state = 'online'
    this._queue = [] // messages received while paused
  }

  /** @returns {string} current state: 'online', 'paused' or 'offline' */
  get state() {
    return this._state
  }

  /** Stop emitting; messages received while offline are discarded. */
  offline() {
    this._state = 'offline'
  }

  /** Stop emitting; messages received while paused are queued (see _handleMessage). */
  pause() {
    this._state = 'paused'
  }

  /** Go back online and emit, in arrival order, the messages queued while paused. */
  resume() {
    this._state = 'online'
    // Re-check the state on every turn: a listener may pause the stream again.
    while (this._state === 'online' && this._queue.length > 0) {
      this.emit('message', this._queue.shift())
    }
  }

  /** Go online without flushing the queue. */
  online() {
    this._state = 'online'
  }

  /**
   * Socket 'data' handler. Decodes the chunk and processes every complete frame.
   * Failures are emitted as 'error' so they are handled with other socket errors.
   * @param {Buffer|string} data chunk received from the socket
   */
  onData(data) {
    let chunk = this._decoder.write(data)
    for (;;) {
      try {
        this._handleData(chunk)
        return
      } catch (err) {
        this.emit('error', err)
        // The failing frame has already been consumed (or the buffer cleared),
        // so keep draining whatever complete frames were queued behind it.
        chunk = ''
      }
    }
  }

  /**
   * Build the wire frame for a message.
   * @param {*} message any JSON-serializable value
   * @returns {Promise<string>} `<byte length><delimiter><json>`; rejects when the
   *   message cannot be serialized (circular structure, undefined, function, ...)
   */
  async serialize(message) {
    const messageData = JSON.stringify(message)
    if (typeof messageData !== 'string') {
      throw new TypeError('Message is not JSON serializable')
    }
    return Buffer.byteLength(messageData, 'utf8') + this._delimeter + messageData
  }

  /**
   * Append decoded text to the buffer and dispatch every complete frame in it.
   * @param {string} data decoded text
   * @throws {Error} E_INVALID_CONTENT_LENGTH or E_INVALID_JSON
   */
  _handleData(data) {
    this._buffer += data
    let consumed = this._readFrame()
    while (consumed) {
      consumed = this._readFrame()
    }
  }

  /**
   * Try to consume one frame from the buffer.
   * @returns {boolean} true when a frame was consumed, false when more data is needed
   */
  _readFrame() {
    if (this._contentLength == null && !this._readContentLength()) return false

    const available = Buffer.byteLength(this._buffer, 'utf8')
    if (available < this._contentLength) return false

    let payload = this._buffer
    let rest = ''
    if (available > this._contentLength) {
      // The length prefix counts bytes, so the split must be done on bytes:
      // cutting the string by character index breaks on multi-byte characters.
      const bytes = Buffer.from(this._buffer, 'utf8')
      payload = bytes.toString('utf8', 0, this._contentLength)
      rest = bytes.toString('utf8', this._contentLength)
    }

    // Reset the framing state before dispatching so that a failing message
    // does not take the following frames down with it.
    this._contentLength = null
    this._buffer = rest
    this._handleMessage(payload)
    return true
  }

  /**
   * Parse the length prefix at the start of the buffer, if it is complete.
   * @returns {boolean} true when the prefix was read and removed from the buffer
   * @throws {Error} E_INVALID_CONTENT_LENGTH when the prefix is not a non-negative integer
   */
  _readContentLength() {
    const end = this._buffer.indexOf(this._delimeter)
    // No delimiter yet: the buffer may end in the middle of the length prefix.
    if (end === -1) return false

    const rawContentLength = this._buffer.substring(0, end)
    const contentLength = Number.parseInt(rawContentLength, 10)
    if (Number.isNaN(contentLength) || contentLength < 0) {
      const offending = this._buffer // keep it for the message before clearing
      this._contentLength = null
      this._buffer = ''
      const err = new Error(
        'Invalid content length supplied (' +
          rawContentLength +
          ') in: ' +
          offending
      )
      err.code = 'E_INVALID_CONTENT_LENGTH'
      throw err
    }

    this._contentLength = contentLength
    this._buffer = this._buffer.substring(end + this._delimeter.length)
    return true
  }

  /**
   * Parse one payload and dispatch it according to the current state.
   * @param {string} data JSON text of a single frame
   * @throws {Error} E_INVALID_JSON when the payload is not valid JSON
   */
  _handleMessage(data) {
    let message
    try {
      message = JSON.parse(data)
    } catch (e) {
      const err = new Error(
        'Could not parse JSON: ' + e.message + '\nRequest data: ' + data
      )
      err.code = 'E_INVALID_JSON'
      throw err
    }
    message = message || {}

    if (this._state === 'paused') {
      // Queue for resume(); pings are not queued.
      const header = message._header
      if (!header || header.id !== 'ping') this._queue.push(message)
      return
    }
    if (this._state === 'online') this.emit('message', message)
  }
}
export default JsonStream