From 63beca4199dd3c46bc6882262bf4d64d49a92c09 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Tue, 1 Jan 2019 16:53:12 -0800 Subject: [PATCH] Added server push method. Pushes to any/all connected clients with id='pushed' which clients can listen for Add jsdoc documenation to source and did a few experiments with doc generators --- docs/docco.css | 518 ++++++++++++++++++++++++++++++++++++++ docs/src/consumer.html | 494 ++++++++++++++++++++++++++++++++++++ docs/src/index.html | 78 ++++++ docs/src/json-stream.html | 199 +++++++++++++++ docs/src/socket.html | 403 +++++++++++++++++++++++++++++ package.json | 14 +- src/consumer.js | 169 ++++++++----- src/json-stream.js | 39 +-- src/socket.js | 213 +++++++++++----- 9 files changed, 1980 insertions(+), 147 deletions(-) create mode 100644 docs/docco.css create mode 100644 docs/src/consumer.html create mode 100644 docs/src/index.html create mode 100644 docs/src/json-stream.html create mode 100644 docs/src/socket.html diff --git a/docs/docco.css b/docs/docco.css new file mode 100644 index 0000000..b60f6fa --- /dev/null +++ b/docs/docco.css @@ -0,0 +1,518 @@ +/*--------------------- Typography ----------------------------*/ + +@font-face { + font-family: 'aller-light'; + src: url('public/fonts/aller-light.eot'); + src: url('public/fonts/aller-light.eot?#iefix') format('embedded-opentype'), + url('public/fonts/aller-light.woff') format('woff'), + url('public/fonts/aller-light.ttf') format('truetype'); + font-weight: normal; + font-style: normal; +} + +@font-face { + font-family: 'aller-bold'; + src: url('public/fonts/aller-bold.eot'); + src: url('public/fonts/aller-bold.eot?#iefix') format('embedded-opentype'), + url('public/fonts/aller-bold.woff') format('woff'), + url('public/fonts/aller-bold.ttf') format('truetype'); + font-weight: normal; + font-style: normal; +} + +@font-face { + font-family: 'roboto-black'; + src: url('public/fonts/roboto-black.eot'); + src: url('public/fonts/roboto-black.eot?#iefix') format('embedded-opentype'), + url('public/fonts/roboto-black.woff') format('woff'), + url('public/fonts/roboto-black.ttf') format('truetype'); + font-weight: normal; + font-style: normal; +} + +/*--------------------- Layout ----------------------------*/ +html { height: 100%; } +body { + font-family: "aller-light"; + font-size: 14px; + line-height: 18px; + color: #30404f; + margin: 0; padding: 0; + height:100%; +} +#container { min-height: 100%; } + +a { + color: #000; +} + +b, strong { + font-weight: normal; + font-family: "aller-bold"; +} + +p { + margin: 15px 0 0px; +} + .annotation ul, .annotation ol { + margin: 25px 0; + } + .annotation ul li, .annotation ol li { + font-size: 14px; + line-height: 18px; + margin: 10px 0; + } + +h1, h2, h3, h4, h5, h6 { + color: #112233; + line-height: 1em; + font-weight: normal; + font-family: "roboto-black"; + text-transform: uppercase; + margin: 30px 0 15px 0; +} + +h1 { + margin-top: 40px; +} +h2 { + font-size: 1.26em; +} + +hr { + border: 0; + background: 1px #ddd; + height: 1px; + margin: 20px 0; +} + +pre, tt, code { + font-size: 12px; line-height: 16px; + font-family: Menlo, Monaco, Consolas, "Lucida Console", monospace; + margin: 0; padding: 0; +} + .annotation pre { + display: block; + margin: 0; + padding: 7px 10px; + background: #fcfcfc; + -moz-box-shadow: inset 0 0 10px rgba(0,0,0,0.1); + -webkit-box-shadow: inset 0 0 10px rgba(0,0,0,0.1); + box-shadow: inset 0 0 10px rgba(0,0,0,0.1); + overflow-x: auto; + } + .annotation pre code { + border: 0; + padding: 0; + background: transparent; + } + + +blockquote { + border-left: 5px solid #ccc; + margin: 0; + padding: 1px 0 1px 1em; +} + .sections blockquote p { + font-family: Menlo, Consolas, Monaco, monospace; + font-size: 12px; line-height: 16px; + color: #999; + margin: 10px 0 0; + white-space: pre-wrap; + } + +ul.sections { + list-style: none; + padding:0 0 5px 0;; + margin:0; +} + +/* + Force border-box so that % widths fit the parent + container without overlap because of margin/padding. + + More Info : http://www.quirksmode.org/css/box.html +*/ +ul.sections > li > div { + -moz-box-sizing: border-box; /* firefox */ + -ms-box-sizing: border-box; /* ie */ + -webkit-box-sizing: border-box; /* webkit */ + -khtml-box-sizing: border-box; /* konqueror */ + box-sizing: border-box; /* css3 */ +} + + +/*---------------------- Jump Page -----------------------------*/ +#jump_to, #jump_page { + margin: 0; + background: white; + -webkit-box-shadow: 0 0 25px #777; -moz-box-shadow: 0 0 25px #777; + -webkit-border-bottom-left-radius: 5px; -moz-border-radius-bottomleft: 5px; + font: 16px Arial; + cursor: pointer; + text-align: right; + list-style: none; +} + +#jump_to a { + text-decoration: none; +} + +#jump_to a.large { + display: none; +} +#jump_to a.small { + font-size: 22px; + font-weight: bold; + color: #676767; +} + +#jump_to, #jump_wrapper { + position: fixed; + right: 0; top: 0; + padding: 10px 15px; + margin:0; +} + +#jump_wrapper { + display: none; + padding:0; +} + +#jump_to:hover #jump_wrapper { + display: block; +} + +#jump_page_wrapper{ + position: fixed; + right: 0; + top: 0; + bottom: 0; +} + +#jump_page { + padding: 5px 0 3px; + margin: 0 0 25px 25px; + max-height: 100%; + overflow: auto; +} + +#jump_page .source { + display: block; + padding: 15px; + text-decoration: none; + border-top: 1px solid #eee; +} + +#jump_page .source:hover { + background: #f5f5ff; +} + +#jump_page .source:first-child { +} + +/*---------------------- Low resolutions (> 320px) ---------------------*/ +@media only screen and (min-width: 320px) { + .pilwrap { display: none; } + + ul.sections > li > div { + display: block; + padding:5px 10px 0 10px; + } + + ul.sections > li > div.annotation ul, ul.sections > li > div.annotation ol { + padding-left: 30px; + } + + ul.sections > li > div.content { + overflow-x:auto; + -webkit-box-shadow: inset 0 0 5px #e5e5ee; + box-shadow: inset 0 0 5px #e5e5ee; + border: 1px solid #dedede; + margin:5px 10px 5px 10px; + padding-bottom: 5px; + } + + ul.sections > li > div.annotation pre { + margin: 7px 0 7px; + padding-left: 15px; + } + + ul.sections > li > div.annotation p tt, .annotation code { + background: #f8f8ff; + border: 1px solid #dedede; + font-size: 12px; + padding: 0 0.2em; + } +} + +/*---------------------- (> 481px) ---------------------*/ +@media only screen and (min-width: 481px) { + #container { + position: relative; + } + body { + background-color: #F5F5FF; + font-size: 15px; + line-height: 21px; + } + pre, tt, code { + line-height: 18px; + } + p, ul, ol { + margin: 0 0 15px; + } + + + #jump_to { + padding: 5px 10px; + } + #jump_wrapper { + padding: 0; + } + #jump_to, #jump_page { + font: 10px Arial; + text-transform: uppercase; + } + #jump_page .source { + padding: 5px 10px; + } + #jump_to a.large { + display: inline-block; + } + #jump_to a.small { + display: none; + } + + + + #background { + position: absolute; + top: 0; bottom: 0; + width: 350px; + background: #fff; + border-right: 1px solid #e5e5ee; + z-index: -1; + } + + ul.sections > li > div.annotation ul, ul.sections > li > div.annotation ol { + padding-left: 40px; + } + + ul.sections > li { + white-space: nowrap; + } + + ul.sections > li > div { + display: inline-block; + } + + ul.sections > li > div.annotation { + max-width: 350px; + min-width: 350px; + min-height: 5px; + padding: 13px; + overflow-x: hidden; + white-space: normal; + vertical-align: top; + text-align: left; + } + ul.sections > li > div.annotation pre { + margin: 15px 0 15px; + padding-left: 15px; + } + + ul.sections > li > div.content { + padding: 13px; + vertical-align: top; + border: none; + -webkit-box-shadow: none; + box-shadow: none; + } + + .pilwrap { + position: relative; + display: inline; + } + + .pilcrow { + font: 12px Arial; + text-decoration: none; + color: #454545; + position: absolute; + top: 3px; left: -20px; + padding: 1px 2px; + opacity: 0; + -webkit-transition: opacity 0.2s linear; + } + .for-h1 .pilcrow { + top: 47px; + } + .for-h2 .pilcrow, .for-h3 .pilcrow, .for-h4 .pilcrow { + top: 35px; + } + + ul.sections > li > div.annotation:hover .pilcrow { + opacity: 1; + } +} + +/*---------------------- (> 1025px) ---------------------*/ +@media only screen and (min-width: 1025px) { + + body { + font-size: 16px; + line-height: 24px; + } + + #background { + width: 525px; + } + ul.sections > li > div.annotation { + max-width: 525px; + min-width: 525px; + padding: 10px 25px 1px 50px; + } + ul.sections > li > div.content { + padding: 9px 15px 16px 25px; + } +} + +/*---------------------- Syntax Highlighting -----------------------------*/ + +td.linenos { background-color: #f0f0f0; padding-right: 10px; } +span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } +/* + +github.com style (c) Vasily Polovnyov + +*/ + +pre code { + display: block; padding: 0.5em; + color: #000; + background: #f8f8ff +} + +pre .hljs-comment, +pre .hljs-template_comment, +pre .hljs-diff .hljs-header, +pre .hljs-javadoc { + color: #408080; + font-style: italic +} + +pre .hljs-keyword, +pre .hljs-assignment, +pre .hljs-literal, +pre .hljs-css .hljs-rule .hljs-keyword, +pre .hljs-winutils, +pre .hljs-javascript .hljs-title, +pre .hljs-lisp .hljs-title, +pre .hljs-subst { + color: #954121; + /*font-weight: bold*/ +} + +pre .hljs-number, +pre .hljs-hexcolor { + color: #40a070 +} + +pre .hljs-string, +pre .hljs-tag .hljs-value, +pre .hljs-phpdoc, +pre .hljs-tex .hljs-formula { + color: #219161; +} + +pre .hljs-title, +pre .hljs-id { + color: #19469D; +} +pre .hljs-params { + color: #00F; +} + +pre .hljs-javascript .hljs-title, +pre .hljs-lisp .hljs-title, +pre .hljs-subst { + font-weight: normal +} + +pre .hljs-class .hljs-title, +pre .hljs-haskell .hljs-label, +pre .hljs-tex .hljs-command { + color: #458; + font-weight: bold +} + +pre .hljs-tag, +pre .hljs-tag .hljs-title, +pre .hljs-rules .hljs-property, +pre .hljs-django .hljs-tag .hljs-keyword { + color: #000080; + font-weight: normal +} + +pre .hljs-attribute, +pre .hljs-variable, +pre .hljs-instancevar, +pre .hljs-lisp .hljs-body { + color: #008080 +} + +pre .hljs-regexp { + color: #B68 +} + +pre .hljs-class { + color: #458; + font-weight: bold +} + +pre .hljs-symbol, +pre .hljs-ruby .hljs-symbol .hljs-string, +pre .hljs-ruby .hljs-symbol .hljs-keyword, +pre .hljs-ruby .hljs-symbol .hljs-keymethods, +pre .hljs-lisp .hljs-keyword, +pre .hljs-tex .hljs-special, +pre .hljs-input_number { + color: #990073 +} + +pre .hljs-builtin, +pre .hljs-constructor, +pre .hljs-built_in, +pre .hljs-lisp .hljs-title { + color: #0086b3 +} + +pre .hljs-preprocessor, +pre .hljs-pi, +pre .hljs-doctype, +pre .hljs-shebang, +pre .hljs-cdata { + color: #999; + font-weight: bold +} + +pre .hljs-deletion { + background: #fdd +} + +pre .hljs-addition { + background: #dfd +} + +pre .hljs-diff .hljs-change { + background: #0086b3 +} + +pre .hljs-chunk { + color: #aaa +} + +pre .hljs-tex .hljs-formula { + opacity: 0.5; +} diff --git a/docs/src/consumer.html b/docs/src/consumer.html new file mode 100644 index 0000000..932ea57 --- /dev/null +++ b/docs/src/consumer.html @@ -0,0 +1,494 @@ + + + + + consumer.js + + + + + +
+
+ + + +
    + +
  • +
    +

    consumer.js

    +
    +
  • + + + +
  • +
    + +
    + +
    + +
    + +
    import { Socket } from 'net'
    +import path from 'path'
    +import btc from 'better-try-catch'
    +import JsonStream from './json-stream'
    + +
  • + + +
  • +
    + +
    + +
    +

    import logger from ‘../../uci-logger/src/logger’

    + +
    + +
    import logger from '@uci/logger'
    +
    +let log = {}
    + +
  • + + +
  • +
    + +
    + +
    +

    TODO change default pipe dir for windows and mac os

    + +
    + +
    const DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || '/tmp/UCI')
    +const DEFAULT_SOCKET_NAME = 'uci-sock'
    +
    +export default class Consumer extends Socket {
    +  constructor (opts={}) {
    +    super()
    +    log = logger({file:'src/consumer.js',class:'Consumer',name:'socket',id:this.id})
    +    this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
    +    if (!opts.path) {
    +      log.warn({opts:opts},'no host supplied using localhost...use named piped instead')
    +      opts.host = opts.host || '127.0.0.1'
    +      opts.port = opts.port || 8080
    +    } else {
    +      if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
    +      if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
    +    }
    +    this.opts=opts
    +    this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
    +    this._ready = false
    +    this.timeout = opts.timeout || 30
    +    this.wait = opts.wait || 1
    +    this.stream = new JsonStream()
    + +
  • + + +
  • +
    + +
    + +
    +

    bind to class for other class functions

    + +
    + +
        this.connect = this.connect.bind(this)
    +    this.__ready = this.__ready.bind(this)
    + +
  • + + +
  • +
    + +
    + +
    +

    this.write = this.write.bind(this)

    + +
    + +
      }
    +
    +  async connect () {
    +
    +    return new Promise( (resolve,reject) => {
    +
    +      const connect = () => {
    +        if (this.opts.host ==='127.0.0.1') log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead')
    +        log.info({opts:this.opts},`attempting to connect ${this.id} to socket`)
    +        super.connect(this.opts)
    +      }
    +
    +      let reconnect = {}
    +      const timeout = setTimeout(() =>{
    +        clearTimeout(reconnect)
    +        log.fatal({opts:this.opts},`unable to connect in ${this.timeout}s`)
    +        reject({opts:this.opts},`unable to connect to socket server in ${this.timeout}secs`)
    +      }
    +        ,this.timeout*1000)
    +
    +      this.once('connect', async () => {
    +        clearTimeout(timeout)
    +        this._listen()
    +        log.info({opts:this.opts},'connected waiting for socket ready handshake')
    +        this.setKeepAlive(this.keepAlive,100)
    +        let [err, res] = await btc(isReady).bind(this)(this.__ready, this.wait, this.timeout)
    +        if (err) reject(err)
    +        log.info('handshake done, authenticating')
    + +
  • + + +
  • +
    + +
    + +
    +

    TODO authenticate here by encrypting a payload with private key and sending that. +await btc(authenticate)

    + +
    + +
            resolve(res)
    +      })
    +
    +      this.on('error', async (err) => {
    +        log.warn({error:err.code},`connect error ${err.code}`)
    +        if (err.code === 'EISCONN') {
    +          return resolve('ready')
    +        }
    +
    +        reconnect = setTimeout( () =>{ connect()
    +        },this.wait*1000 )
    +
    +      })
    +
    +      this.on('end', async () => {
    +        log.warn('socket (server) terminated unexpectantly')
    +        if (this.keepAlive) {
    +          log.info('keep alive was set, so waiting on server to come online for reconnect')
    +          this.destroy()
    +          this.emit('error', {code:'DISCONNECTED'})
    +        }
    +      })
    +
    +      connect()  // initial connect request
    +
    +    }) //end promise
    +
    +  }
    +
    +
    +  async send(ipacket) {
    +    return new Promise( async (resolve) => {
    + +
  • + + +
  • +
    + +
    + +
    +

    need this for when multiple sends for different consumers use same packet instance

    + +
    + +
          let packet = Object.assign({},ipacket)
    +      setTimeout(() => {resolve({error:'no response from socket in 10sec'})},10000)
    +      packet._header =
    +      { id:Math.random().toString().slice(2), // need this for when multiple sends for different consumers use same packet instanceack
    +        sender:{ name:this.name, instanceID:this.id },
    +        path: this.opts.path,
    +        port: this.opts.port,
    +        host: this.opts.host
    +      }
    +      let [err, res] = await btc(this.stream.serialize)(packet)
    +      if (err) resolve({error:'unable to serialize packet for sending', packet:packet})
    +      await this.__write(res)
    +      this.once(packet._header.id,async function(reply){
    +        let res = await this._packetProcess(reply)
    +        if (!res) {   // if process was not promise returning like just logged to console
    +          res = reply
    + +
  • + + +
  • +
    + +
    + +
    +

    log.warn(‘consumer function was not promise returning further processing may be out of sequence’)

    + +
    + +
            }
    +        resolve(res)
    +      }) //end listener
    +    })
    +  }
    + +
  • + + +
  • +
    + +
    + +
    +

    TODO register alt stream processor (emit ‘message’ with JSON, serialize function, onData method for raw socket chucks) +TODO register authenciation function (set up default)

    + +
    + +
    +  registerPacketProcessor (func) {
    +    this._packetProcess = func
    +  }
    + +
  • + + +
  • +
    + +
    + +
    +

    PRIVATE METHODS

    + +
    + +
    +  async __write(packet) {
    + +
  • + + +
  • +
    + +
    + +
    +

    timeout already set if sockect can’t be drained in 10 secs

    + +
    + +
        return new Promise(resolve => {
    +      const cb = () => resolve('packet written to consumer side socket stream ')
    +      if (!super.write(packet)) {
    +        this.once('drain',cb )
    +      } else {
    +        process.nextTick(cb)
    +      }
    +
    +    })
    +  }
    +
    +  __ready() {return this._ready}
    +
    +  async _listen () {
    +    log.info('listening for incoming packets from socket')
    + +
  • + + +
  • +
    + +
    + +
    +

    listen for pushed packets

    + +
    + +
        this.on('pushed',async function(packet){
    + +
  • + + +
  • +
    + +
    + +
    +

    TODO do some extra security here?

    + +
    + +
          let res = await this._packetProcess(packet)
    +      if (!res) {   // if process was not promise returning like just logged to console
    + +
  • + + +
  • +
    + +
    + +
    +

    log.warn(‘consumer function was not promise returning’)

    + +
    + +
          }
    +    })
    + +
  • + + +
  • +
    + +
    + +
    +

    listen on socket stream

    + +
    + +
        this.on('data', this.stream.onData)
    +    this.stream.on('message', messageProcess.bind(this))
    +
    +    async function messageProcess (packet) {
    + +
  • + + +
  • +
    + +
    + +
    +

    console.log(‘incoming packet from socket’,packet)

    + +
    + +
          if (packet._handshake) {
    +        this._ready = true
    +        return }
    + +
  • + + +
  • +
    + +
    + +
    +

    TODO send back ack with consumer ID and authorization and wait +when authorized drop through here to emit

    + +
    + +
          this.emit(packet._header.id, packet)
    +    }
    +  }
    + +
  • + + +
  • +
    + +
    + +
    +

    default packet process just a simple console logger. ignores any cmd: prop

    + +
    + +
      _packetProcess (packet) {
    +    console.log('default consumer processor -- log packet from socket to console')
    +    console.dir(packet)
    +  }
    +
    +}  // end class
    + +
  • + + +
  • +
    + +
    + +
    +

    HELP FUNCTIONS +wait until a passed ready function returns true

    + +
    + +
    function isReady(ready, wait=30, timeout=1000) {
    +  let time = 0
    +  return new Promise((resolve, reject) => {
    +    (function waitReady(){
    +      if (time > timeout) return reject(`timeout waiting for socket ready handshake - ${timeout}ms`)
    +      if (ready()) return resolve('ready')
    +      log.info(`waiting ${wait}ms for handshake`)
    +      time += wait
    +      setTimeout(waitReady, wait)
    +    })()
    +  })
    +}
    + +
  • + +
+
+ + diff --git a/docs/src/index.html b/docs/src/index.html new file mode 100644 index 0000000..8918163 --- /dev/null +++ b/docs/src/index.html @@ -0,0 +1,78 @@ + + + + + index.js + + + + + +
+
+ + + +
    + +
  • +
    +

    index.js

    +
    +
  • + + + +
  • +
    + +
    + +
    + +
    + +
    import Socket from './socket'
    +import Consumer from  './consumer'
    +
    +export { Socket as Socket }
    +export { Consumer as Consumer }
    +export default { Socket, Consumer }
    + +
  • + +
+
+ + diff --git a/docs/src/json-stream.html b/docs/src/json-stream.html new file mode 100644 index 0000000..35fe841 --- /dev/null +++ b/docs/src/json-stream.html @@ -0,0 +1,199 @@ + + + + + json-stream.js + + + + + +
+
+ + + +
    + +
  • +
    +

    json-stream.js

    +
    +
  • + + + +
  • + + +
    +import {StringDecoder} from 'string_decoder'
    +import EventEmitter from 'events'
    +import btc from 'better-try-catch'
    +
    +const decoder = new StringDecoder()
    +
    +export default class JsonStream extends EventEmitter{
    +  constructor(opts={}){
    +    super()
    +    this._contentLength = null
    +    this._buffer = ''
    +    this._delimeter = opts.delimiter || '#'
    +    this.onData = this.onData.bind(this)
    +    this.serialize = this.serialize.bind(this)
    +  }
    +
    +
    +  onData (data) {
    + +
  • + + +
  • +
    + +
    + +
    +

    console.log(‘a chunk arrived’, data)

    + +
    + +
        data = decoder.write(data)
    +    try {
    +      this._handleData(data)
    +    } catch (e) {
    +      this.emit('error', { error: e })
    +    }
    +  }
    +
    +  async serialize(message) {
    +    return new Promise( (resolve,reject) => {
    +      let [err,messageData] = btc(JSON.stringify)(message)
    +      if (err) reject(err)
    +      let [err2,length] = btc(Buffer.byteLength)(messageData, 'utf8')
    +      if (err2) reject(err2)
    +      let data = length + this._delimeter + messageData
    + +
  • + + +
  • +
    + +
    + +
    +

    console.log(‘serialized’,data)

    + +
    + +
          resolve(data)
    +    })
    +  }
    +
    +  _handleData (data) {
    +    this._buffer += data
    +    if (this._contentLength == null) {
    +      var i = this._buffer.indexOf(this._delimeter)
    + +
  • + + +
  • +
    + +
    + +
    +

    Check if the buffer has a this._opts.delimeter or “#”, if not, the end of the buffer string might be in the middle of a content length string

    + +
    + +
          if (i !== -1) {
    +        var rawContentLength = this._buffer.substring(0, i)
    +        this._contentLength = parseInt(rawContentLength)
    +        if (isNaN(this._contentLength)) {
    +          this._contentLength = null
    +          this._buffer = ''
    +          var err = new Error('Invalid content length supplied ('+rawContentLength+') in: '+this._buffer)
    +          err.code = 'E_INVALID_CONTENT_LENGTH'
    +          throw err
    +        }
    +        this._buffer = this._buffer.substring(i+1)
    +      }
    +    }
    +    if (this._contentLength != null) {
    +      var length = Buffer.byteLength(this._buffer, 'utf8')
    +      if (length == this._contentLength) {
    +        this._handleMessage(this._buffer)
    +      } else if (length > this._contentLength) {
    +        var message = this._buffer.substring(0, this._contentLength)
    +        var rest = this._buffer.substring(this._contentLength)
    +        this._handleMessage(message)
    +        this.onData(rest)
    +      }
    +    }
    +  }
    +
    +  _handleMessage (data) {
    +    this._contentLength = null
    +    this._buffer = ''
    +    var message
    +    try {
    +      message = JSON.parse(data)
    +    } catch (e) {
    +      var err = new Error('Could not parse JSON: '+e.message+'\nRequest data: '+data)
    +      err.code = 'E_INVALID_JSON'
    +      throw err
    +    }
    +    message = message || {}
    +    this.emit('message', message)
    +  }
    +
    +}
    + +
  • + +
+
+ + diff --git a/docs/src/socket.html b/docs/src/socket.html new file mode 100644 index 0000000..a5d91ed --- /dev/null +++ b/docs/src/socket.html @@ -0,0 +1,403 @@ + + + + + socket.js + + + + + +
+
+ + + +
    + +
  • +
    +

    socket.js

    +
    +
  • + + + +
  • +
    + +
    + +
    + +
    + +
    import { Server } from 'net'
    +import { unlink as fileDelete } from 'fs'
    +import { promisify } from 'util'
    +import path from 'path'
    +import mkdir from 'make-dir'
    +import btc from 'better-try-catch'
    +import _ON_DEATH from 'death' //this is intentionally ugly
    +import JSONStream from './json-stream'
    +import clone from 'clone'
    + +
  • + + +
  • +
    + +
    + +
    +

    import logger from ‘../../uci-logger/src/logger’

    + +
    + +
    import logger from '@uci/logger'
    +let log = {}
    + +
  • + + +
  • +
    + +
    + +
    +

    TODO change default pipe dir for windows and mac os

    + +
    + +
    const DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || '/tmp/UCI')
    +const DEFAULT_SOCKET_NAME = 'uci-sock'
    +
    +export default class Socket extends Server {
    +  constructor (opts={}) {
    +    super()
    +    this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
    +    if (!opts.path) {
    +      opts.host = opts.host || '0.0.0.0'
    +      opts.port = opts.port || 8080
    +    } else {
    +      if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
    +      if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
    +    }
    +    this.clientTracking = opts.clientTracking || true
    +    this.clients = []  // track consumers (i.e. clients)
    +    this.opts = opts  // for use to recover from selected errors
    + +
  • + + +
  • +
    + +
    + +
    +

    self bindings

    + +
    + +
        this._listen = this._listen.bind(this)
    +    this.create = this.create.bind(this)
    +    log = logger({file:'src/socket.js',class:'Socket',name:'socket',id:this.id})
    +  } // end constructor
    +
    +  async create () {
    +
    +    return new Promise( async (resolve,reject) => {
    + +
  • + + +
  • +
    + +
    + +
    +

    couple ways to kill socket process when needed

    + +
    + +
          _ON_DEATH( async () => {
    +        log.info('\nhe\'s dead jim')
    +        await this._destroy()
    +      })
    +      process.once('SIGUSR2', async () => {
    +        await this._destroy
    +        process.kill(process.pid, 'SIGUSR2')
    +      })
    +
    +      this.once('error', async (err) => {
    + +
  • + + +
  • +
    + +
    + +
    +

    recover from socket file that was not removed

    + +
    + +
            if (err.code === 'EADDRINUSE') {
    +          if (this.opts.path) { // if TCP socket should already be dead
    +            let [err, res] = await btc(promisify(fileDelete))(this.opts.path)
    +            if(!err) {
    +              log.info({res:res, socket: this.opts.path}, 'socket already exists.....deleted')
    +              return await this._listen(this.opts)
    +            }
    +            log.fatal({err:err},'error deleting socket.  Can not establish a socket')
    +            return err
    +          }
    +        }
    +        if (err.code ==='EACCES'){
    +          console.log({socket: this.opts.path}, 'directory does not exist...creating')
    +          await mkdir(path.dirname(this.opts.path))
    +          console.log({socket: this.opts.path}, 'created')
    +          log.warn({socket: this.opts.path}, 'directory does not exist...creating')
    +          return await this._listen(this.opts)
    +        }
    + +
  • + + +
  • +
    + +
    + +
    +

    otherwise fatally exit

    + +
    + +
            log.info(err, 'creating socket')
    +        reject(err)
    +      })
    +
    +      let [err, res] = await btc(this._listen)(this.opts)
    +      if (err) reject(err)
    +      resolve(res)
    +
    +    }) // end creeate promise
    +  } // end create
    +
    +  registerPacketProcessor (func) {
    +    this._packetProcess = func
    +  }
    +
    +  async _listen (opts) {
    +    super.listen(opts, async (err, res) => {
    +      if (err) return err
    + +
  • + + +
  • +
    + +
    + +
    +

    this gets called for each client connection and is unique to each

    + +
    + +
          this.on('connection', async (socket) => {
    +        const stream = new JSONStream()
    +        socket.stream = stream // need this to track clients
    +        let send = this._send.bind(socket)
    +        if (this.clientTracking) this.clients.push(socket)
    + +
  • + + +
  • +
    + +
    + +
    +

    TODO add ‘close’ listener to socket to remove from this.clients

    + +
    + +
            log.info('new consumer connecting')
    +        log.info(await send(await stream.serialize({'_handshake':true})))
    +        if (this.opts.conPacket) {
    +          this.opts.conPacket._header = { id:'pushed'}
    +          log.info({conPacket:this.opts.conPacket},'pushing a preset command to just connected consumer')
    +          send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection
    +        }
    +        socket.on('data', stream.onData)
    + +
  • + + +
  • +
    + +
    + +
    +

    TODO need to start error listener for stream so errors can be processed

    + +
    + +
            stream.on('message', messageProcess.bind(this,socket))
    +
    +        async function messageProcess (client, packet) {
    +          log.info({packet:packet},'incoming packet on socket side')
    +          let res = {}
    +          if (this.clientTracking && packet.clientID) {
    +            client.ID = packet.clientID
    +            res.cmd='ackID'
    +          }
    +          else {
    +            res = await this._packetProcess(clone(packet)) || {}
    +            if (Object.keys(res).length === 0) res = { error: 'socket packet command function likely did not return a promise', packet:packet}
    +          }
    +          if (packet) {
    +            res._header = clone(packet._header,false) || {} //make sure return packet has header with id in case it was removed in processing
    +            delete packet._header  // remove before adding to response header as request
    +          } else res._header = {}
    +          res._header.request = clone(packet,false)
    +          res._header.responder = {name:this.name,instanceID:this.id}
    +          res._header.socket = this.address()
    +          if (!res.cmd) res.cmd = 'reply'  // by default return command is 'reply'
    +          let [err, ser] = await btc(stream.serialize)(res)
    +          if (err) ser = await stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:res._header.id}})
    +          log.info(await send(ser))
    +        } // end process message
    +
    +      }) // end connecttion consumer
    +      log.info({opts: this.opts},'socket created')
    +      return res
    +    }) // end super listen callback
    +
    +  } // end listen
    +
    +  async _destroy () {
    +    log.info('closing down socket')
    +    await this.close()
    +    log.info('all connections closed....exiting')
    +    process.exit()
    +  }
    + +
  • + + +
  • +
    + +
    + +
    +

    default packet process, just a simple echo

    + +
    + +
      async _packetProcess (packet) {
    +    return new Promise(resolve => {
    +      resolve(packet)
    +    })
    +  }
    + +
  • + + +
  • +
    + +
    + +
    +

    must have a consumer socket bound to use

    + +
    + +
      async _send(packet) {
    + +
  • + + +
  • +
    + +
    + +
    +

    timeout already set if sockect can’t be drained in 10 secs

    + +
    + +
        return new Promise(resolve => {
    +      const cb = () => resolve('packet written to socket stream')
    +      if (!this.write(packet)) {
    +        this.once('drain',cb )
    +      } else {
    +        process.nextTick(cb)
    +      }
    +    })
    +  }
    +
    +  async push (packet,id) {
    +    packet._header = { id:'pushed'}
    +    log.info({opts:this.opts,packet:packet},'pushing a packet to all connected consumers')
    +    this.clients.forEach(async (client) => {
    +      if (client.writable) {
    +        let [err, ser] = await btc(client.stream.serialize)(packet)
    +        if (err) ser = await client.stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:packet._header.id}})
    +        if (!id || id ===client.ID ) await this._send.bind(client)(ser)
    +      }
    +    })
    +  }
    +
    +} // end class
    + +
  • + +
+
+ + diff --git a/package.json b/package.json index af1327e..e2352dc 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.1.9", + "version": "0.2.1", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { @@ -38,18 +38,18 @@ }, "homepage": "https://github.com/uCOMmandIt/message#readme", "devDependencies": { - "chai": "^4.1.2", + "chai": "^4.2.0", "chai-as-promised": "^7.1.1", - "codecov": "^3.0.2", - "esm": "^3.0.37", + "codecov": "^3.1.0", + "esm": "^3.0.84", "istanbul": "^0.4.5", "mocha": "^5.2.0", - "nodemon": "^1.17.5" + "nodemon": "^1.18.6" }, "dependencies": { - "@uci/logger": "0.0.3", + "@uci/logger": "0.0.6", "better-try-catch": "^0.6.2", - "clone": "^2.1.1", + "clone": "^2.1.2", "death": "^1.1.0", "make-dir": "^1.3.0" } diff --git a/src/consumer.js b/src/consumer.js index 51c8ae3..c6cc1f1 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -9,23 +9,44 @@ import logger from '@uci/logger' let log = {} // TODO change default pipe dir for windows and mac os -const DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || '/tmp/UCI') +const DEFAULT_PIPE_DIR = process.env.SOCKETS_DIR || '/tmp/UCI' const DEFAULT_SOCKET_NAME = 'uci-sock' -export default class Consumer extends Socket { - constructor (opts={}) { +/** + * Socket Consumer - connects to UCI TCP or Named Pipe Sockets and coummunicates with uci packet.
+ * Extends {@link https://nodejs.org/api/net.html#net_class_net_socket | nodejs net.Socket} + * @extends Socket + */ +class SocketConsumer extends Socket { + /** + * constructor - Description + * + * @param {object} [opts={}] test + */ + + constructor(opts = {}) { super() - log = logger({file:'src/consumer.js',class:'Consumer',name:'socket',id:this.id}) - this.id = opts.id || opts.name || 'socket:'+ new Date().getTime() + log = logger({ + file: 'src/consumer.js', + class: 'Consumer', + name: 'socket', + id: this.id + }) + this.id = opts.id || opts.name || 'socket:' + new Date().getTime() if (!opts.path) { - log.warn({opts:opts},'no host supplied using localhost...use named piped instead') + log.warn( + { opts: opts }, + 'no host supplied using localhost...use named piped instead' + ) opts.host = opts.host || '127.0.0.1' opts.port = opts.port || 8080 } else { - if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME ) - if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path ) + if (typeof opts.path === 'boolean') + opts.path = path.join(DEFAULT_PIPE_DIR, DEFAULT_SOCKET_NAME) + if (path.dirname(opts.path) === '.') + opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) } - this.opts=opts + this.opts = opts this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this._ready = false this.timeout = opts.timeout || 30 @@ -37,30 +58,43 @@ export default class Consumer extends Socket { // this._write = this._write.bind(this) } - async connect () { - - return new Promise( (resolve,reject) => { - + async connect() { + return new Promise((resolve, reject) => { const connect = () => { - if (this.opts.host ==='127.0.0.1') log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead') - log.info({opts:this.opts},`attempting to connect ${this.id} to socket`) + if (this.opts.host === '127.0.0.1') + log.warn( + 'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead' + ) + log.info( + { opts: this.opts }, + `attempting to connect ${this.id} to socket` + ) super.connect(this.opts) } let reconnect = {} - const timeout = setTimeout(() =>{ + const timeout = setTimeout(() => { clearTimeout(reconnect) - log.fatal({opts:this.opts},`unable to connect in ${this.timeout}s`) - reject({opts:this.opts},`unable to connect to socket server in ${this.timeout}secs`) - } - ,this.timeout*1000) + log.fatal({ opts: this.opts }, `unable to connect in ${this.timeout}s`) + reject( + { opts: this.opts }, + `unable to connect to socket server in ${this.timeout}secs` + ) + }, this.timeout * 1000) this.once('connect', async () => { clearTimeout(timeout) this._listen() - log.info({opts:this.opts},'connected waiting for socket ready handshake') - this.setKeepAlive(this.keepAlive,100) - let [err, res] = await btc(isReady).bind(this)(this.__ready, this.wait, this.timeout) + log.info( + { opts: this.opts }, + 'connected waiting for socket ready handshake' + ) + this.setKeepAlive(this.keepAlive, 100) + let [err, res] = await btc(isReady).bind(this)( + this.__ready, + this.wait, + this.timeout + ) if (err) reject(err) log.info('handshake done, authenticating') // TODO authenticate here by encrypting a payload with private key and sending that. @@ -68,51 +102,59 @@ export default class Consumer extends Socket { resolve(res) }) - this.on('error', async (err) => { - log.warn({error:err.code},`connect error ${err.code}`) + this.on('error', async err => { + log.warn({ error: err.code }, `connect error ${err.code}`) if (err.code === 'EISCONN') { return resolve('ready') } - reconnect = setTimeout( () =>{ connect() - },this.wait*1000 ) - + reconnect = setTimeout(() => { + connect() + }, this.wait * 1000) }) this.on('end', async () => { log.warn('socket (server) terminated unexpectantly') if (this.keepAlive) { - log.info('keep alive was set, so waiting on server to come online for reconnect') + log.info( + 'keep alive was set, so waiting on server to come online for reconnect' + ) this.destroy() - this.emit('error', {code:'DISCONNECTED'}) + this.emit('error', { code: 'DISCONNECTED' }) } }) - connect() // initial connect request - + connect() // initial connect request }) //end promise - } - async send(ipacket) { - return new Promise( async (resolve) => { + return new Promise(async resolve => { // need this for when multiple sends for different consumers use same packet instance - let packet = Object.assign({},ipacket) - setTimeout(() => {resolve({error:'no response from socket in 10sec'})},10000) - packet._header = - { id:Math.random().toString().slice(2), // need this for when multiple sends for different consumers use same packet instanceack - sender:{ name:this.name, instanceID:this.id }, + let packet = Object.assign({}, ipacket) + setTimeout(() => { + resolve({ error: 'no response from socket in 10sec' }) + }, 10000) + packet._header = { + id: Math.random() + .toString() + .slice(2), // need this for when multiple sends for different consumers use same packet instanceack + sender: { name: this.name, instanceID: this.id }, path: this.opts.path, port: this.opts.port, host: this.opts.host } let [err, res] = await btc(this.stream.serialize)(packet) - if (err) resolve({error:'unable to serialize packet for sending', packet:packet}) + if (err) + resolve({ + error: 'unable to serialize packet for sending', + packet: packet + }) await this.__write(res) - this.once(packet._header.id,async function(reply){ + this.once(packet._header.id, async function(reply) { let res = await this._packetProcess(reply) - if (!res) { // if process was not promise returning like just logged to console + if (!res) { + // if process was not promise returning like just logged to console res = reply // log.warn('consumer function was not promise returning further processing may be out of sequence') } @@ -124,7 +166,7 @@ export default class Consumer extends Socket { // TODO register alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) // TODO register authenciation function (set up default) - registerPacketProcessor (func) { + registerPacketProcessor(func) { this._packetProcess = func } @@ -135,23 +177,25 @@ export default class Consumer extends Socket { return new Promise(resolve => { const cb = () => resolve('packet written to consumer side socket stream ') if (!super.write(packet)) { - this.once('drain',cb ) + this.once('drain', cb) } else { process.nextTick(cb) } - }) } - __ready() {return this._ready} + __ready() { + return this._ready + } - async _listen () { + async _listen() { log.info('listening for incoming packets from socket') // listen for pushed packets - this.on('pushed',async function(packet){ + this.on('pushed', async function(packet) { // TODO do some extra security here? let res = await this._packetProcess(packet) - if (!res) { // if process was not promise returning like just logged to console + if (!res) { + // if process was not promise returning like just logged to console // log.warn('consumer function was not promise returning') } }) @@ -159,11 +203,12 @@ export default class Consumer extends Socket { this.on('data', this.stream.onData) this.stream.on('message', messageProcess.bind(this)) - async function messageProcess (packet) { + async function messageProcess(packet) { // console.log('incoming packet from socket',packet) if (packet._handshake) { this._ready = true - return } + return + } // TODO send back ack with consumer ID and authorization and wait // when authorized drop through here to emit this.emit(packet._header.id, packet) @@ -171,20 +216,26 @@ export default class Consumer extends Socket { } // default packet process just a simple console logger. ignores any cmd: prop - _packetProcess (packet) { - console.log('default consumer processor -- log packet from socket to console') + _packetProcess(packet) { + console.log( + 'default consumer processor -- log packet from socket to console' + ) console.dir(packet) } +} // end class -} // end class +export default SocketConsumer -// HELP FUNCTIONS +// Helper Functions // wait until a passed ready function returns true -function isReady(ready, wait=30, timeout=1000) { +function isReady(ready, wait = 30, timeout = 1000) { let time = 0 return new Promise((resolve, reject) => { - (function waitReady(){ - if (time > timeout) return reject(`timeout waiting for socket ready handshake - ${timeout}ms`) + (function waitReady() { + if (time > timeout) + return reject( + `timeout waiting for socket ready handshake - ${timeout}ms` + ) if (ready()) return resolve('ready') log.info(`waiting ${wait}ms for handshake`) time += wait diff --git a/src/json-stream.js b/src/json-stream.js index 8357da8..74c7d92 100644 --- a/src/json-stream.js +++ b/src/json-stream.js @@ -1,13 +1,17 @@ // adpated from https://github.com/sebastianseilund/node-json-socket -import {StringDecoder} from 'string_decoder' +import { StringDecoder } from 'string_decoder' import EventEmitter from 'events' import btc from 'better-try-catch' const decoder = new StringDecoder() -export default class JsonStream extends EventEmitter{ - constructor(opts={}){ +/** + * JsonStream - Description + * @extends EventEmitter + */ +class JsonStream extends EventEmitter { + constructor(opts = {}) { super() this._contentLength = null this._buffer = '' @@ -16,8 +20,7 @@ export default class JsonStream extends EventEmitter{ this.serialize = this.serialize.bind(this) } - - onData (data) { + onData(data) { // console.log('a chunk arrived', data) data = decoder.write(data) try { @@ -28,10 +31,10 @@ export default class JsonStream extends EventEmitter{ } async serialize(message) { - return new Promise( (resolve,reject) => { - let [err,messageData] = btc(JSON.stringify)(message) + return new Promise((resolve, reject) => { + let [err, messageData] = btc(JSON.stringify)(message) if (err) reject(err) - let [err2,length] = btc(Buffer.byteLength)(messageData, 'utf8') + let [err2, length] = btc(Buffer.byteLength)(messageData, 'utf8') if (err2) reject(err2) let data = length + this._delimeter + messageData // console.log('serialized',data) @@ -39,7 +42,7 @@ export default class JsonStream extends EventEmitter{ }) } - _handleData (data) { + _handleData(data) { this._buffer += data if (this._contentLength == null) { var i = this._buffer.indexOf(this._delimeter) @@ -50,11 +53,16 @@ export default class JsonStream extends EventEmitter{ if (isNaN(this._contentLength)) { this._contentLength = null this._buffer = '' - var err = new Error('Invalid content length supplied ('+rawContentLength+') in: '+this._buffer) + var err = new Error( + 'Invalid content length supplied (' + + rawContentLength + + ') in: ' + + this._buffer + ) err.code = 'E_INVALID_CONTENT_LENGTH' throw err } - this._buffer = this._buffer.substring(i+1) + this._buffer = this._buffer.substring(i + 1) } } if (this._contentLength != null) { @@ -70,19 +78,22 @@ export default class JsonStream extends EventEmitter{ } } - _handleMessage (data) { + _handleMessage(data) { this._contentLength = null this._buffer = '' var message try { message = JSON.parse(data) } catch (e) { - var err = new Error('Could not parse JSON: '+e.message+'\nRequest data: '+data) + var err = new Error( + 'Could not parse JSON: ' + e.message + '\nRequest data: ' + data + ) err.code = 'E_INVALID_JSON' throw err } message = message || {} this.emit('message', message) } - } + +export default JsonStream diff --git a/src/socket.js b/src/socket.js index f6bac98..5e7d5e1 100644 --- a/src/socket.js +++ b/src/socket.js @@ -1,46 +1,81 @@ +// node modules import { Server } from 'net' import { unlink as fileDelete } from 'fs' import { promisify } from 'util' import path from 'path' +// npmjs modules import mkdir from 'make-dir' import btc from 'better-try-catch' import _ON_DEATH from 'death' //this is intentionally ugly import JSONStream from './json-stream' import clone from 'clone' - -// import logger from '../../uci-logger/src/logger' +// uci modules import logger from '@uci/logger' -let log = {} +let log = {} // must declare here and set later for module wide access -// TODO change default pipe dir for windows and mac os -const DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || '/tmp/UCI') +// TODO change default pipe dir depending on OS linux,windows,mac +/** @constant {String} DEFAULT_PIPE_DIR + * @description SOCKETS_DIR environment variable or '/tmp/UCI' + */ +const DEFAULT_PIPE_DIR = process.env.SOCKETS_DIR || '/tmp/UCI' +/** @constant {String} DEFAULT_SOCKET_NAME + * @description for named pipe 'uci-sock' if not set in options */ const DEFAULT_SOCKET_NAME = 'uci-sock' -export default class Socket extends Server { - constructor (opts={}) { +/** + * UCI Socket - class used to create a socket (server) that supports passing json packets + * supports both named pipes and tcp sockets + * also supports push of packets to all connected consumers (clients) + * is extended from {@link https://nodejs.org/api/net.html#net_class_net_server | nodejs net.Server } + * @extends Server + */ + +class Socket extends Server { + /** + * UCI Socket class constructor + * @param {Object} opts hash of options + * @param {String} options.host a tcp host name nornally not used as 0.0.0.0 is set by default + * @param {String} options.port a tcp + * @param {String | Boolean} options.path xeither full path to where socket should be created or if just 'true' then use default + * @param {Boolean} options.clientTracking track connected clients for push notifications - default: true + * @param {Object} options.conPacket A json operson's property + * + */ + constructor(opts = {}) { super() - this.id = opts.id || opts.name || 'socket:'+ new Date().getTime() + this.id = opts.id || opts.name || 'socket:' + new Date().getTime() if (!opts.path) { opts.host = opts.host || '0.0.0.0' opts.port = opts.port || 8080 } else { - if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME ) - if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path ) + if (typeof opts.path === 'boolean') + opts.path = path.join(DEFAULT_PIPE_DIR, DEFAULT_SOCKET_NAME) + if (path.dirname(opts.path) === '.') + opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) } this.clientTracking = opts.clientTracking || true - this.clients = [] // track consumers (i.e. clients) - this.opts = opts // for use to recover from selected errors + this.clients = [] // track consumers (i.e. clients) + this.opts = opts // for use to recover from selected errors //self bindings this._listen = this._listen.bind(this) this.create = this.create.bind(this) - log = logger({file:'src/socket.js',class:'Socket',name:'socket',id:this.id}) + log = logger({ + file: 'src/socket.js', + class: 'Socket', + name: 'socket', + id: this.id + }) } // end constructor - async create () { - - return new Promise( async (resolve,reject) => { - // couple ways to kill socket process when needed - _ON_DEATH( async () => { + /** + * create - Description + * + * @returns {type} Description + */ + async create() { + return new Promise(async (resolve, reject) => { + // set up a couple ways to gracefully destroy socket process is killed/aborted + _ON_DEATH(async () => { log.info('\nhe\'s dead jim') await this._destroy() }) @@ -49,24 +84,37 @@ export default class Socket extends Server { process.kill(process.pid, 'SIGUSR2') }) - this.once('error', async (err) => { + this.once('error', async err => { // recover from socket file that was not removed if (err.code === 'EADDRINUSE') { - if (this.opts.path) { // if TCP socket should already be dead + if (this.opts.path) { + // if TCP socket should already be dead let [err, res] = await btc(promisify(fileDelete))(this.opts.path) - if(!err) { - log.info({res:res, socket: this.opts.path}, 'socket already exists.....deleted') + if (!err) { + log.info( + { res: res, socket: this.opts.path }, + 'socket already exists.....deleted' + ) return await this._listen(this.opts) } - log.fatal({err:err},'error deleting socket. Can not establish a socket') + log.fatal( + { err: err }, + 'error deleting socket. Can not establish a socket' + ) return err } } - if (err.code ==='EACCES'){ - console.log({socket: this.opts.path}, 'directory does not exist...creating') + if (err.code === 'EACCES') { + console.log( + { socket: this.opts.path }, + 'directory does not exist...creating' + ) await mkdir(path.dirname(this.opts.path)) - console.log({socket: this.opts.path}, 'created') - log.warn({socket: this.opts.path}, 'directory does not exist...creating') + console.log({ socket: this.opts.path }, 'created') + log.warn( + { socket: this.opts.path }, + 'directory does not exist...creating' + ) return await this._listen(this.opts) } // otherwise fatally exit @@ -77,67 +125,109 @@ export default class Socket extends Server { let [err, res] = await btc(this._listen)(this.opts) if (err) reject(err) resolve(res) - }) // end creeate promise } // end create - registerPacketProcessor (func) { + /** + * registerPacketProcessor - Description + * @public + * @param {func} Description + * + */ + registerPacketProcessor(func) { this._packetProcess = func } - async _listen (opts) { + /** + * push - pushes a supplied UCI object packet to all connected clients + * + * @param {object} packet Description + * @param {string} id the header id string of the pushed packet, default: 'pushed' + * + */ + async push(packet, id) { + packet._header = { id: id || 'pushed' } + log.info( + { opts: this.opts, packet: packet }, + 'pushing a packet to all connected consumers' + ) + this.clients.forEach(async client => { + if (client.writable) { + let [err, ser] = await btc(client.stream.serialize)(packet) + if (err) + ser = await client.stream.serialize({ + error: 'was not able to serialze the res packet', + err: err, + _header: { id: packet._header.id } + }) + if (!id || id === client.ID) await this._send.bind(client)(ser) + } + }) + } + + async _listen(opts) { super.listen(opts, async (err, res) => { if (err) return err // this gets called for each client connection and is unique to each - this.on('connection', async (socket) => { + this.on('connection', async socket => { const stream = new JSONStream() socket.stream = stream // need this to track clients let send = this._send.bind(socket) if (this.clientTracking) this.clients.push(socket) // TODO add 'close' listener to socket to remove from this.clients log.info('new consumer connecting') - log.info(await send(await stream.serialize({'_handshake':true}))) + log.info(await send(await stream.serialize({ _handshake: true }))) if (this.opts.conPacket) { - this.opts.conPacket._header = { id:'pushed'} - log.info({conPacket:this.opts.conPacket},'pushing a preset command to just connected consumer') + this.opts.conPacket._header = { id: 'pushed' } + log.info( + { conPacket: this.opts.conPacket }, + 'pushing a preset command to just connected consumer' + ) send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection } socket.on('data', stream.onData) // TODO need to start error listener for stream so errors can be processed - stream.on('message', messageProcess.bind(this,socket)) + stream.on('message', messageProcess.bind(this, socket)) - async function messageProcess (client, packet) { - log.info({packet:packet},'incoming packet on socket side') + async function messageProcess(client, packet) { + log.info({ packet: packet }, 'incoming packet on socket side') let res = {} if (this.clientTracking && packet.clientID) { client.ID = packet.clientID - res.cmd='ackID' - } - else { - res = await this._packetProcess(clone(packet)) || {} - if (Object.keys(res).length === 0) res = { error: 'socket packet command function likely did not return a promise', packet:packet} + res.cmd = 'ackID' + } else { + res = (await this._packetProcess(clone(packet))) || {} + if (Object.keys(res).length === 0) + res = { + error: + 'socket packet command function likely did not return a promise', + packet: packet + } } if (packet) { - res._header = clone(packet._header,false) || {} //make sure return packet has header with id in case it was removed in processing - delete packet._header // remove before adding to response header as request + res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing + delete packet._header // remove before adding to response header as request } else res._header = {} - res._header.request = clone(packet,false) - res._header.responder = {name:this.name,instanceID:this.id} + res._header.request = clone(packet, false) + res._header.responder = { name: this.name, instanceID: this.id } res._header.socket = this.address() - if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' + if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' let [err, ser] = await btc(stream.serialize)(res) - if (err) ser = await stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:res._header.id}}) + if (err) + ser = await stream.serialize({ + error: 'was not able to serialze the res packet', + err: err, + _header: { id: res._header.id } + }) log.info(await send(ser)) } // end process message - }) // end connecttion consumer - log.info({opts: this.opts},'socket created') + log.info({ opts: this.opts }, 'socket created') return res }) // end super listen callback - } // end listen - async _destroy () { + async _destroy() { log.info('closing down socket') await this.close() log.info('all connections closed....exiting') @@ -145,7 +235,7 @@ export default class Socket extends Server { } // default packet process, just a simple echo - async _packetProcess (packet) { + async _packetProcess(packet) { return new Promise(resolve => { resolve(packet) }) @@ -157,23 +247,12 @@ export default class Socket extends Server { return new Promise(resolve => { const cb = () => resolve('packet written to socket stream') if (!this.write(packet)) { - this.once('drain',cb ) + this.once('drain', cb) } else { process.nextTick(cb) } }) } - - async push (packet,id) { - packet._header = { id:'pushed'} - log.info({opts:this.opts,packet:packet},'pushing a packet to all connected consumers') - this.clients.forEach(async (client) => { - if (client.writable) { - let [err, ser] = await btc(client.stream.serialize)(packet) - if (err) ser = await client.stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:packet._header.id}}) - if (!id || id ===client.ID ) await this._send.bind(client)(ser) - } - }) - } - } // end class + +export default Socket