// adapted from https://github.com/sebastianseilund/node-json-socket
import { StringDecoder } from 'string_decoder'
import EventEmitter from 'events'
import btc from 'better-try-catch'

/**
 * JsonStream - length-prefixed JSON framing on top of a raw data stream.
 *
 * A frame on the wire is `<length><delimiter><JSON text>`, where `<length>` is
 * the UTF-8 byte length of the JSON text (see `serialize`). Incoming chunks are
 * fed to `onData`; every complete frame is parsed and delivered through a
 * `'message'` event. Parsing failures are delivered through an `'error'` event.
 *
 * The stream has three states:
 *  - `'online'`  : messages are emitted as they arrive
 *  - `'paused'`  : messages (except pings) are queued until `resume()`
 *  - `'offline'` : messages are parsed and then dropped
 *
 * @extends EventEmitter
 */
class JsonStream extends EventEmitter {
  /**
   * @param {Object} [opts]
   * @param {string} [opts.delimiter='#'] separator between the length prefix
   *   and the JSON payload
   */
  constructor(opts = {}) {
    super()
    this._contentLength = null
    this._buffer = ''
    // NOTE: the property name keeps its historical spelling so that any code
    // reaching into the instance keeps working.
    this._delimeter = opts.delimiter || '#'
    // One decoder per stream: a decoder keeps the trailing bytes of an
    // incomplete multi-byte character between writes, so sharing one between
    // instances would mix bytes of unrelated streams.
    this._decoder = new StringDecoder('utf8')
    this.onData = this.onData.bind(this)
    this.serialize = this.serialize.bind(this)
    this._state = 'online'
    this._queue = []
  }

  /** @returns {string} current state: 'online', 'paused' or 'offline' */
  get state() {
    return this._state
  }

  /** Switch to 'offline': incoming messages are no longer emitted. */
  offline() {
    this._state = 'offline'
  }

  /** Switch to 'paused': incoming messages are queued by `_handleMessage`. */
  pause() {
    this._state = 'paused'
  }

  /**
   * Switch back to 'online' and emit, in arrival order, the messages that were
   * queued while paused. Draining stops early if a listener changes the state
   * again (e.g. calls `pause()`), leaving the remainder queued.
   */
  resume() {
    this._state = 'online'
    while (this._queue.length > 0 && this._state === 'online') {
      this.emit('message', this._queue.shift())
    }
  }

  /** Switch to 'online' without touching the queue. */
  online() {
    this._state = 'online'
  }

  /**
   * Feed a chunk of raw stream data into the parser.
   * Parsing errors are not thrown; they are emitted as an `'error'` event so
   * they can be handled together with other socket errors.
   *
   * @param {Buffer|string} data chunk received from the underlying stream
   */
  onData(data) {
    const text = this._decoder.write(data)
    try {
      this._handleData(text)
    } catch (err) {
      this.emit('error', err)
    }
  }

  /**
   * Encode a message as one frame: `<byte length><delimiter><JSON>`.
   *
   * @param {*} message value to serialize with `JSON.stringify`
   * @returns {Promise<string>} the framed message; the promise rejects when
   *   the value cannot be stringified or its byte length cannot be computed
   */
  async serialize(message) {
    const [stringifyErr, messageData] = btc(JSON.stringify)(message)
    if (stringifyErr) throw stringifyErr
    const [lengthErr, length] = btc(Buffer.byteLength)(messageData, 'utf8')
    if (lengthErr) throw lengthErr
    return `${length}${this._delimeter}${messageData}`
  }

  /**
   * Append decoded text to the internal buffer and process every complete
   * frame it now contains.
   *
   * @param {string} data decoded text
   * @throws {Error} code `E_INVALID_CONTENT_LENGTH` or `E_INVALID_JSON`; the
   *   parser state is reset before throwing
   */
  _handleData(data) {
    this._buffer += data
    for (;;) {
      // Need a length prefix first; bail out until the delimiter has arrived.
      if (this._contentLength == null && !this._readContentLength()) return

      // The prefix counts UTF-8 bytes, so the payload must be measured and
      // cut in bytes, not in UTF-16 code units.
      const bytes = Buffer.from(this._buffer, 'utf8')
      if (bytes.length < this._contentLength) return

      const message = bytes.toString('utf8', 0, this._contentLength)
      const rest = bytes.toString('utf8', this._contentLength)
      // _handleMessage resets the parser state; if it throws, `rest` is
      // dropped together with the bad frame.
      this._handleMessage(message)
      this._buffer = rest
    }
  }

  /**
   * Try to consume the `<length><delimiter>` prefix from the buffer.
   *
   * @returns {boolean} true when a prefix was consumed and `_contentLength` is
   *   set; false when the delimiter has not been received yet (the buffer may
   *   end in the middle of the length digits)
   * @throws {Error} code `E_INVALID_CONTENT_LENGTH` when the prefix is not a
   *   non-negative integer
   */
  _readContentLength() {
    const delimiterIndex = this._buffer.indexOf(this._delimeter)
    if (delimiterIndex === -1) return false

    const rawContentLength = this._buffer.substring(0, delimiterIndex)
    const contentLength = Number.parseInt(rawContentLength, 10)
    if (Number.isNaN(contentLength) || contentLength < 0) {
      // Capture the buffer for the error message before discarding it.
      const received = this._buffer
      this._contentLength = null
      this._buffer = ''
      const err = new Error(
        'Invalid content length supplied (' + rawContentLength + ') in: ' + received
      )
      err.code = 'E_INVALID_CONTENT_LENGTH'
      throw err
    }

    this._contentLength = contentLength
    this._buffer = this._buffer.substring(delimiterIndex + this._delimeter.length)
    return true
  }

  /**
   * Parse one frame payload and dispatch it according to the current state.
   * Resets the parser state (`_contentLength`, `_buffer`) before parsing.
   *
   * @param {string} data JSON text of a single frame
   * @throws {Error} code `E_INVALID_JSON` when the payload is not valid JSON
   */
  _handleMessage(data) {
    this._contentLength = null
    this._buffer = ''

    let message
    try {
      message = JSON.parse(data)
    } catch (e) {
      const err = new Error(
        'Could not parse JSON: ' + e.message + '\nRequest data: ' + data
      )
      err.code = 'E_INVALID_JSON'
      throw err
    }
    // Falsy JSON values (null, 0, false, "") are delivered as an empty object.
    message = message || {}

    if (this._state === 'paused') {
      // Hold everything except pings until resume().
      const header = message._header
      if (!header || header.id !== 'ping') this._queue.push(message)
      return
    }
    if (this._state === 'online') this.emit('message', message)
  }
}

export default JsonStream