• Jump To … +
    ./src/consumer.js ./src/index.js ./src/json-stream.js ./src/socket.js
  • json-stream.js

  • ¶

    adpated from https://github.com/sebastianseilund/node-json-socket

    import {StringDecoder} from 'string_decoder'
    import EventEmitter from 'events'
    import btc from 'better-try-catch'
    
    const decoder = new StringDecoder()
    
    export default class JsonStream extends EventEmitter{
      constructor(opts={}){
        super()
        this._contentLength = null
        this._buffer = ''
        this._delimeter = opts.delimiter || '#'
        this.onData = this.onData.bind(this)
        this.serialize = this.serialize.bind(this)
      }
    
    
      onData (data) {
  • ¶

    console.log(‘a chunk arrived’, data)

        data = decoder.write(data)
        try {
          this._handleData(data)
        } catch (e) {
          this.emit('error', { error: e })
        }
      }
    
      async serialize(message) {
        return new Promise( (resolve,reject) => {
          let [err,messageData] = btc(JSON.stringify)(message)
          if (err) reject(err)
          let [err2,length] = btc(Buffer.byteLength)(messageData, 'utf8')
          if (err2) reject(err2)
          let data = length + this._delimeter + messageData
  • ¶

    console.log(‘serialized’,data)

          resolve(data)
        })
      }
    
      _handleData (data) {
        this._buffer += data
        if (this._contentLength == null) {
          var i = this._buffer.indexOf(this._delimeter)
  • ¶

    Check if the buffer has a this._opts.delimeter or “#”, if not, the end of the buffer string might be in the middle of a content length string

          if (i !== -1) {
            var rawContentLength = this._buffer.substring(0, i)
            this._contentLength = parseInt(rawContentLength)
            if (isNaN(this._contentLength)) {
              this._contentLength = null
              this._buffer = ''
              var err = new Error('Invalid content length supplied ('+rawContentLength+') in: '+this._buffer)
              err.code = 'E_INVALID_CONTENT_LENGTH'
              throw err
            }
            this._buffer = this._buffer.substring(i+1)
          }
        }
        if (this._contentLength != null) {
          var length = Buffer.byteLength(this._buffer, 'utf8')
          if (length == this._contentLength) {
            this._handleMessage(this._buffer)
          } else if (length > this._contentLength) {
            var message = this._buffer.substring(0, this._contentLength)
            var rest = this._buffer.substring(this._contentLength)
            this._handleMessage(message)
            this.onData(rest)
          }
        }
      }
    
      _handleMessage (data) {
        this._contentLength = null
        this._buffer = ''
        var message
        try {
          message = JSON.parse(data)
        } catch (e) {
          var err = new Error('Could not parse JSON: '+e.message+'\nRequest data: '+data)
          err.code = 'E_INVALID_JSON'
          throw err
        }
        message = message || {}
        this.emit('message', message)
      }
    
    }