From 5f28baaa74ce0e186d7bdf0da47c50a2f9400e3f Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sun, 8 Sep 2019 19:49:05 -0700 Subject: [PATCH] 0.2.20 emit status events in consumer refactor listen method into a handler --- docs/docco.css | 518 -------------------------- docs/public/stylesheets/normalize.css | 375 ------------------- docs/src/consumer.html | 494 ------------------------ docs/src/index.html | 78 ---- docs/src/json-stream.html | 199 ---------- docs/src/socket.html | 403 -------------------- examples/client.js | 10 +- examples/opts.yaml | 0 examples/server.js | 37 +- package.json | 2 +- src/consumer.js | 21 +- src/socket-class.js | 162 ++++---- 12 files changed, 113 insertions(+), 2186 deletions(-) delete mode 100644 docs/docco.css delete mode 100644 docs/public/stylesheets/normalize.css delete mode 100644 docs/src/consumer.html delete mode 100644 docs/src/index.html delete mode 100644 docs/src/json-stream.html delete mode 100644 docs/src/socket.html create mode 100644 examples/opts.yaml diff --git a/docs/docco.css b/docs/docco.css deleted file mode 100644 index b60f6fa..0000000 --- a/docs/docco.css +++ /dev/null @@ -1,518 +0,0 @@ -/*--------------------- Typography ----------------------------*/ - -@font-face { - font-family: 'aller-light'; - src: url('public/fonts/aller-light.eot'); - src: url('public/fonts/aller-light.eot?#iefix') format('embedded-opentype'), - url('public/fonts/aller-light.woff') format('woff'), - url('public/fonts/aller-light.ttf') format('truetype'); - font-weight: normal; - font-style: normal; -} - -@font-face { - font-family: 'aller-bold'; - src: url('public/fonts/aller-bold.eot'); - src: url('public/fonts/aller-bold.eot?#iefix') format('embedded-opentype'), - url('public/fonts/aller-bold.woff') format('woff'), - url('public/fonts/aller-bold.ttf') format('truetype'); - font-weight: normal; - font-style: normal; -} - -@font-face { - font-family: 'roboto-black'; - src: url('public/fonts/roboto-black.eot'); - src: url('public/fonts/roboto-black.eot?#iefix') format('embedded-opentype'), - url('public/fonts/roboto-black.woff') format('woff'), - url('public/fonts/roboto-black.ttf') format('truetype'); - font-weight: normal; - font-style: normal; -} - -/*--------------------- Layout ----------------------------*/ -html { height: 100%; } -body { - font-family: "aller-light"; - font-size: 14px; - line-height: 18px; - color: #30404f; - margin: 0; padding: 0; - height:100%; -} -#container { min-height: 100%; } - -a { - color: #000; -} - -b, strong { - font-weight: normal; - font-family: "aller-bold"; -} - -p { - margin: 15px 0 0px; -} - .annotation ul, .annotation ol { - margin: 25px 0; - } - .annotation ul li, .annotation ol li { - font-size: 14px; - line-height: 18px; - margin: 10px 0; - } - -h1, h2, h3, h4, h5, h6 { - color: #112233; - line-height: 1em; - font-weight: normal; - font-family: "roboto-black"; - text-transform: uppercase; - margin: 30px 0 15px 0; -} - -h1 { - margin-top: 40px; -} -h2 { - font-size: 1.26em; -} - -hr { - border: 0; - background: 1px #ddd; - height: 1px; - margin: 20px 0; -} - -pre, tt, code { - font-size: 12px; line-height: 16px; - font-family: Menlo, Monaco, Consolas, "Lucida Console", monospace; - margin: 0; padding: 0; -} - .annotation pre { - display: block; - margin: 0; - padding: 7px 10px; - background: #fcfcfc; - -moz-box-shadow: inset 0 0 10px rgba(0,0,0,0.1); - -webkit-box-shadow: inset 0 0 10px rgba(0,0,0,0.1); - box-shadow: inset 0 0 10px rgba(0,0,0,0.1); - overflow-x: auto; - } - .annotation pre code { - border: 0; - padding: 0; - background: transparent; - } - - -blockquote { - border-left: 5px solid #ccc; - margin: 0; - padding: 1px 0 1px 1em; -} - .sections blockquote p { - font-family: Menlo, Consolas, Monaco, monospace; - font-size: 12px; line-height: 16px; - color: #999; - margin: 10px 0 0; - white-space: pre-wrap; - } - -ul.sections { - list-style: none; - padding:0 0 5px 0;; - margin:0; -} - -/* - Force border-box so that % widths fit the parent - container without overlap because of margin/padding. - - More Info : http://www.quirksmode.org/css/box.html -*/ -ul.sections > li > div { - -moz-box-sizing: border-box; /* firefox */ - -ms-box-sizing: border-box; /* ie */ - -webkit-box-sizing: border-box; /* webkit */ - -khtml-box-sizing: border-box; /* konqueror */ - box-sizing: border-box; /* css3 */ -} - - -/*---------------------- Jump Page -----------------------------*/ -#jump_to, #jump_page { - margin: 0; - background: white; - -webkit-box-shadow: 0 0 25px #777; -moz-box-shadow: 0 0 25px #777; - -webkit-border-bottom-left-radius: 5px; -moz-border-radius-bottomleft: 5px; - font: 16px Arial; - cursor: pointer; - text-align: right; - list-style: none; -} - -#jump_to a { - text-decoration: none; -} - -#jump_to a.large { - display: none; -} -#jump_to a.small { - font-size: 22px; - font-weight: bold; - color: #676767; -} - -#jump_to, #jump_wrapper { - position: fixed; - right: 0; top: 0; - padding: 10px 15px; - margin:0; -} - -#jump_wrapper { - display: none; - padding:0; -} - -#jump_to:hover #jump_wrapper { - display: block; -} - -#jump_page_wrapper{ - position: fixed; - right: 0; - top: 0; - bottom: 0; -} - -#jump_page { - padding: 5px 0 3px; - margin: 0 0 25px 25px; - max-height: 100%; - overflow: auto; -} - -#jump_page .source { - display: block; - padding: 15px; - text-decoration: none; - border-top: 1px solid #eee; -} - -#jump_page .source:hover { - background: #f5f5ff; -} - -#jump_page .source:first-child { -} - -/*---------------------- Low resolutions (> 320px) ---------------------*/ -@media only screen and (min-width: 320px) { - .pilwrap { display: none; } - - ul.sections > li > div { - display: block; - padding:5px 10px 0 10px; - } - - ul.sections > li > div.annotation ul, ul.sections > li > div.annotation ol { - padding-left: 30px; - } - - ul.sections > li > div.content { - overflow-x:auto; - -webkit-box-shadow: inset 0 0 5px #e5e5ee; - box-shadow: inset 0 0 5px #e5e5ee; - border: 1px solid #dedede; - margin:5px 10px 5px 10px; - padding-bottom: 5px; - } - - ul.sections > li > div.annotation pre { - margin: 7px 0 7px; - padding-left: 15px; - } - - ul.sections > li > div.annotation p tt, .annotation code { - background: #f8f8ff; - border: 1px solid #dedede; - font-size: 12px; - padding: 0 0.2em; - } -} - -/*---------------------- (> 481px) ---------------------*/ -@media only screen and (min-width: 481px) { - #container { - position: relative; - } - body { - background-color: #F5F5FF; - font-size: 15px; - line-height: 21px; - } - pre, tt, code { - line-height: 18px; - } - p, ul, ol { - margin: 0 0 15px; - } - - - #jump_to { - padding: 5px 10px; - } - #jump_wrapper { - padding: 0; - } - #jump_to, #jump_page { - font: 10px Arial; - text-transform: uppercase; - } - #jump_page .source { - padding: 5px 10px; - } - #jump_to a.large { - display: inline-block; - } - #jump_to a.small { - display: none; - } - - - - #background { - position: absolute; - top: 0; bottom: 0; - width: 350px; - background: #fff; - border-right: 1px solid #e5e5ee; - z-index: -1; - } - - ul.sections > li > div.annotation ul, ul.sections > li > div.annotation ol { - padding-left: 40px; - } - - ul.sections > li { - white-space: nowrap; - } - - ul.sections > li > div { - display: inline-block; - } - - ul.sections > li > div.annotation { - max-width: 350px; - min-width: 350px; - min-height: 5px; - padding: 13px; - overflow-x: hidden; - white-space: normal; - vertical-align: top; - text-align: left; - } - ul.sections > li > div.annotation pre { - margin: 15px 0 15px; - padding-left: 15px; - } - - ul.sections > li > div.content { - padding: 13px; - vertical-align: top; - border: none; - -webkit-box-shadow: none; - box-shadow: none; - } - - .pilwrap { - position: relative; - display: inline; - } - - .pilcrow { - font: 12px Arial; - text-decoration: none; - color: #454545; - position: absolute; - top: 3px; left: -20px; - padding: 1px 2px; - opacity: 0; - -webkit-transition: opacity 0.2s linear; - } - .for-h1 .pilcrow { - top: 47px; - } - .for-h2 .pilcrow, .for-h3 .pilcrow, .for-h4 .pilcrow { - top: 35px; - } - - ul.sections > li > div.annotation:hover .pilcrow { - opacity: 1; - } -} - -/*---------------------- (> 1025px) ---------------------*/ -@media only screen and (min-width: 1025px) { - - body { - font-size: 16px; - line-height: 24px; - } - - #background { - width: 525px; - } - ul.sections > li > div.annotation { - max-width: 525px; - min-width: 525px; - padding: 10px 25px 1px 50px; - } - ul.sections > li > div.content { - padding: 9px 15px 16px 25px; - } -} - -/*---------------------- Syntax Highlighting -----------------------------*/ - -td.linenos { background-color: #f0f0f0; padding-right: 10px; } -span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } -/* - -github.com style (c) Vasily Polovnyov - -*/ - -pre code { - display: block; padding: 0.5em; - color: #000; - background: #f8f8ff -} - -pre .hljs-comment, -pre .hljs-template_comment, -pre .hljs-diff .hljs-header, -pre .hljs-javadoc { - color: #408080; - font-style: italic -} - -pre .hljs-keyword, -pre .hljs-assignment, -pre .hljs-literal, -pre .hljs-css .hljs-rule .hljs-keyword, -pre .hljs-winutils, -pre .hljs-javascript .hljs-title, -pre .hljs-lisp .hljs-title, -pre .hljs-subst { - color: #954121; - /*font-weight: bold*/ -} - -pre .hljs-number, -pre .hljs-hexcolor { - color: #40a070 -} - -pre .hljs-string, -pre .hljs-tag .hljs-value, -pre .hljs-phpdoc, -pre .hljs-tex .hljs-formula { - color: #219161; -} - -pre .hljs-title, -pre .hljs-id { - color: #19469D; -} -pre .hljs-params { - color: #00F; -} - -pre .hljs-javascript .hljs-title, -pre .hljs-lisp .hljs-title, -pre .hljs-subst { - font-weight: normal -} - -pre .hljs-class .hljs-title, -pre .hljs-haskell .hljs-label, -pre .hljs-tex .hljs-command { - color: #458; - font-weight: bold -} - -pre .hljs-tag, -pre .hljs-tag .hljs-title, -pre .hljs-rules .hljs-property, -pre .hljs-django .hljs-tag .hljs-keyword { - color: #000080; - font-weight: normal -} - -pre .hljs-attribute, -pre .hljs-variable, -pre .hljs-instancevar, -pre .hljs-lisp .hljs-body { - color: #008080 -} - -pre .hljs-regexp { - color: #B68 -} - -pre .hljs-class { - color: #458; - font-weight: bold -} - -pre .hljs-symbol, -pre .hljs-ruby .hljs-symbol .hljs-string, -pre .hljs-ruby .hljs-symbol .hljs-keyword, -pre .hljs-ruby .hljs-symbol .hljs-keymethods, -pre .hljs-lisp .hljs-keyword, -pre .hljs-tex .hljs-special, -pre .hljs-input_number { - color: #990073 -} - -pre .hljs-builtin, -pre .hljs-constructor, -pre .hljs-built_in, -pre .hljs-lisp .hljs-title { - color: #0086b3 -} - -pre .hljs-preprocessor, -pre .hljs-pi, -pre .hljs-doctype, -pre .hljs-shebang, -pre .hljs-cdata { - color: #999; - font-weight: bold -} - -pre .hljs-deletion { - background: #fdd -} - -pre .hljs-addition { - background: #dfd -} - -pre .hljs-diff .hljs-change { - background: #0086b3 -} - -pre .hljs-chunk { - color: #aaa -} - -pre .hljs-tex .hljs-formula { - opacity: 0.5; -} diff --git a/docs/public/stylesheets/normalize.css b/docs/public/stylesheets/normalize.css deleted file mode 100644 index 73abb76..0000000 --- a/docs/public/stylesheets/normalize.css +++ /dev/null @@ -1,375 +0,0 @@ -/*! normalize.css v2.0.1 | MIT License | git.io/normalize */ - -/* ========================================================================== - HTML5 display definitions - ========================================================================== */ - -/* - * Corrects `block` display not defined in IE 8/9. - */ - -article, -aside, -details, -figcaption, -figure, -footer, -header, -hgroup, -nav, -section, -summary { - display: block; -} - -/* - * Corrects `inline-block` display not defined in IE 8/9. - */ - -audio, -canvas, -video { - display: inline-block; -} - -/* - * Prevents modern browsers from displaying `audio` without controls. - * Remove excess height in iOS 5 devices. - */ - -audio:not([controls]) { - display: none; - height: 0; -} - -/* - * Addresses styling for `hidden` attribute not present in IE 8/9. - */ - -[hidden] { - display: none; -} - -/* ========================================================================== - Base - ========================================================================== */ - -/* - * 1. Sets default font family to sans-serif. - * 2. Prevents iOS text size adjust after orientation change, without disabling - * user zoom. - */ - -html { - font-family: sans-serif; /* 1 */ - -webkit-text-size-adjust: 100%; /* 2 */ - -ms-text-size-adjust: 100%; /* 2 */ -} - -/* - * Removes default margin. - */ - -body { - margin: 0; -} - -/* ========================================================================== - Links - ========================================================================== */ - -/* - * Addresses `outline` inconsistency between Chrome and other browsers. - */ - -a:focus { - outline: thin dotted; -} - -/* - * Improves readability when focused and also mouse hovered in all browsers. - */ - -a:active, -a:hover { - outline: 0; -} - -/* ========================================================================== - Typography - ========================================================================== */ - -/* - * Addresses `h1` font sizes within `section` and `article` in Firefox 4+, - * Safari 5, and Chrome. - */ - -h1 { - font-size: 2em; -} - -/* - * Addresses styling not present in IE 8/9, Safari 5, and Chrome. - */ - -abbr[title] { - border-bottom: 1px dotted; -} - -/* - * Addresses style set to `bolder` in Firefox 4+, Safari 5, and Chrome. - */ - -b, -strong { - font-weight: bold; -} - -/* - * Addresses styling not present in Safari 5 and Chrome. - */ - -dfn { - font-style: italic; -} - -/* - * Addresses styling not present in IE 8/9. - */ - -mark { - background: #ff0; - color: #000; -} - - -/* - * Corrects font family set oddly in Safari 5 and Chrome. - */ - -code, -kbd, -pre, -samp { - font-family: monospace, serif; - font-size: 1em; -} - -/* - * Improves readability of pre-formatted text in all browsers. - */ - -pre { - white-space: pre; - white-space: pre-wrap; - word-wrap: break-word; -} - -/* - * Sets consistent quote types. - */ - -q { - quotes: "\201C" "\201D" "\2018" "\2019"; -} - -/* - * Addresses inconsistent and variable font size in all browsers. - */ - -small { - font-size: 80%; -} - -/* - * Prevents `sub` and `sup` affecting `line-height` in all browsers. - */ - -sub, -sup { - font-size: 75%; - line-height: 0; - position: relative; - vertical-align: baseline; -} - -sup { - top: -0.5em; -} - -sub { - bottom: -0.25em; -} - -/* ========================================================================== - Embedded content - ========================================================================== */ - -/* - * Removes border when inside `a` element in IE 8/9. - */ - -img { - border: 0; -} - -/* - * Corrects overflow displayed oddly in IE 9. - */ - -svg:not(:root) { - overflow: hidden; -} - -/* ========================================================================== - Figures - ========================================================================== */ - -/* - * Addresses margin not present in IE 8/9 and Safari 5. - */ - -figure { - margin: 0; -} - -/* ========================================================================== - Forms - ========================================================================== */ - -/* - * Define consistent border, margin, and padding. - */ - -fieldset { - border: 1px solid #c0c0c0; - margin: 0 2px; - padding: 0.35em 0.625em 0.75em; -} - -/* - * 1. Corrects color not being inherited in IE 8/9. - * 2. Remove padding so people aren't caught out if they zero out fieldsets. - */ - -legend { - border: 0; /* 1 */ - padding: 0; /* 2 */ -} - -/* - * 1. Corrects font family not being inherited in all browsers. - * 2. Corrects font size not being inherited in all browsers. - * 3. Addresses margins set differently in Firefox 4+, Safari 5, and Chrome - */ - -button, -input, -select, -textarea { - font-family: inherit; /* 1 */ - font-size: 100%; /* 2 */ - margin: 0; /* 3 */ -} - -/* - * Addresses Firefox 4+ setting `line-height` on `input` using `!important` in - * the UA stylesheet. - */ - -button, -input { - line-height: normal; -} - -/* - * 1. Avoid the WebKit bug in Android 4.0.* where (2) destroys native `audio` - * and `video` controls. - * 2. Corrects inability to style clickable `input` types in iOS. - * 3. Improves usability and consistency of cursor style between image-type - * `input` and others. - */ - -button, -html input[type="button"], /* 1 */ -input[type="reset"], -input[type="submit"] { - -webkit-appearance: button; /* 2 */ - cursor: pointer; /* 3 */ -} - -/* - * Re-set default cursor for disabled elements. - */ - -button[disabled], -input[disabled] { - cursor: default; -} - -/* - * 1. Addresses box sizing set to `content-box` in IE 8/9. - * 2. Removes excess padding in IE 8/9. - */ - -input[type="checkbox"], -input[type="radio"] { - box-sizing: border-box; /* 1 */ - padding: 0; /* 2 */ -} - -/* - * 1. Addresses `appearance` set to `searchfield` in Safari 5 and Chrome. - * 2. Addresses `box-sizing` set to `border-box` in Safari 5 and Chrome - * (include `-moz` to future-proof). - */ - -input[type="search"] { - -webkit-appearance: textfield; /* 1 */ - -moz-box-sizing: content-box; - -webkit-box-sizing: content-box; /* 2 */ - box-sizing: content-box; -} - -/* - * Removes inner padding and search cancel button in Safari 5 and Chrome - * on OS X. - */ - -input[type="search"]::-webkit-search-cancel-button, -input[type="search"]::-webkit-search-decoration { - -webkit-appearance: none; -} - -/* - * Removes inner padding and border in Firefox 4+. - */ - -button::-moz-focus-inner, -input::-moz-focus-inner { - border: 0; - padding: 0; -} - -/* - * 1. Removes default vertical scrollbar in IE 8/9. - * 2. Improves readability and alignment in all browsers. - */ - -textarea { - overflow: auto; /* 1 */ - vertical-align: top; /* 2 */ -} - -/* ========================================================================== - Tables - ========================================================================== */ - -/* - * Remove most spacing between table cells. - */ - -table { - border-collapse: collapse; - border-spacing: 0; -} \ No newline at end of file diff --git a/docs/src/consumer.html b/docs/src/consumer.html deleted file mode 100644 index 932ea57..0000000 --- a/docs/src/consumer.html +++ /dev/null @@ -1,494 +0,0 @@ - - - - - consumer.js - - - - - -
-
- - - -
    - -
  • -
    -

    consumer.js

    -
    -
  • - - - -
  • -
    - -
    - -
    - -
    - -
    import { Socket } from 'net'
    -import path from 'path'
    -import btc from 'better-try-catch'
    -import JsonStream from './json-stream'
    - -
  • - - -
  • -
    - -
    - -
    -

    import logger from ‘../../uci-logger/src/logger’

    - -
    - -
    import logger from '@uci/logger'
    -
    -let log = {}
    - -
  • - - -
  • -
    - -
    - -
    -

    TODO change default pipe dir for windows and mac os

    - -
    - -
    const DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || '/tmp/UCI')
    -const DEFAULT_SOCKET_NAME = 'uci-sock'
    -
    -export default class Consumer extends Socket {
    -  constructor (opts={}) {
    -    super()
    -    log = logger({file:'src/consumer.js',class:'Consumer',name:'socket',id:this.id})
    -    this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
    -    if (!opts.path) {
    -      log.warn({opts:opts},'no host supplied using localhost...use named piped instead')
    -      opts.host = opts.host || '127.0.0.1'
    -      opts.port = opts.port || 8080
    -    } else {
    -      if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
    -      if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
    -    }
    -    this.opts=opts
    -    this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
    -    this._ready = false
    -    this.timeout = opts.timeout || 30
    -    this.wait = opts.wait || 1
    -    this.stream = new JsonStream()
    - -
  • - - -
  • -
    - -
    - -
    -

    bind to class for other class functions

    - -
    - -
        this.connect = this.connect.bind(this)
    -    this.__ready = this.__ready.bind(this)
    - -
  • - - -
  • -
    - -
    - -
    -

    this.write = this.write.bind(this)

    - -
    - -
      }
    -
    -  async connect () {
    -
    -    return new Promise( (resolve,reject) => {
    -
    -      const connect = () => {
    -        if (this.opts.host ==='127.0.0.1') log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead')
    -        log.info({opts:this.opts},`attempting to connect ${this.id} to socket`)
    -        super.connect(this.opts)
    -      }
    -
    -      let reconnect = {}
    -      const timeout = setTimeout(() =>{
    -        clearTimeout(reconnect)
    -        log.fatal({opts:this.opts},`unable to connect in ${this.timeout}s`)
    -        reject({opts:this.opts},`unable to connect to socket server in ${this.timeout}secs`)
    -      }
    -        ,this.timeout*1000)
    -
    -      this.once('connect', async () => {
    -        clearTimeout(timeout)
    -        this._listen()
    -        log.info({opts:this.opts},'connected waiting for socket ready handshake')
    -        this.setKeepAlive(this.keepAlive,100)
    -        let [err, res] = await btc(isReady).bind(this)(this.__ready, this.wait, this.timeout)
    -        if (err) reject(err)
    -        log.info('handshake done, authenticating')
    - -
  • - - -
  • -
    - -
    - -
    -

    TODO authenticate here by encrypting a payload with private key and sending that. -await btc(authenticate)

    - -
    - -
            resolve(res)
    -      })
    -
    -      this.on('error', async (err) => {
    -        log.warn({error:err.code},`connect error ${err.code}`)
    -        if (err.code === 'EISCONN') {
    -          return resolve('ready')
    -        }
    -
    -        reconnect = setTimeout( () =>{ connect()
    -        },this.wait*1000 )
    -
    -      })
    -
    -      this.on('end', async () => {
    -        log.warn('socket (server) terminated unexpectantly')
    -        if (this.keepAlive) {
    -          log.info('keep alive was set, so waiting on server to come online for reconnect')
    -          this.destroy()
    -          this.emit('error', {code:'DISCONNECTED'})
    -        }
    -      })
    -
    -      connect()  // initial connect request
    -
    -    }) //end promise
    -
    -  }
    -
    -
    -  async send(ipacket) {
    -    return new Promise( async (resolve) => {
    - -
  • - - -
  • -
    - -
    - -
    -

    need this for when multiple sends for different consumers use same packet instance

    - -
    - -
          let packet = Object.assign({},ipacket)
    -      setTimeout(() => {resolve({error:'no response from socket in 10sec'})},10000)
    -      packet._header =
    -      { id:Math.random().toString().slice(2), // need this for when multiple sends for different consumers use same packet instanceack
    -        sender:{ name:this.name, instanceID:this.id },
    -        path: this.opts.path,
    -        port: this.opts.port,
    -        host: this.opts.host
    -      }
    -      let [err, res] = await btc(this.stream.serialize)(packet)
    -      if (err) resolve({error:'unable to serialize packet for sending', packet:packet})
    -      await this.__write(res)
    -      this.once(packet._header.id,async function(reply){
    -        let res = await this._packetProcess(reply)
    -        if (!res) {   // if process was not promise returning like just logged to console
    -          res = reply
    - -
  • - - -
  • -
    - -
    - -
    -

    log.warn(‘consumer function was not promise returning further processing may be out of sequence’)

    - -
    - -
            }
    -        resolve(res)
    -      }) //end listener
    -    })
    -  }
    - -
  • - - -
  • -
    - -
    - -
    -

    TODO register alt stream processor (emit ‘message’ with JSON, serialize function, onData method for raw socket chucks) -TODO register authenciation function (set up default)

    - -
    - -
    -  registerPacketProcessor (func) {
    -    this._packetProcess = func
    -  }
    - -
  • - - -
  • -
    - -
    - -
    -

    PRIVATE METHODS

    - -
    - -
    -  async __write(packet) {
    - -
  • - - -
  • -
    - -
    - -
    -

    timeout already set if sockect can’t be drained in 10 secs

    - -
    - -
        return new Promise(resolve => {
    -      const cb = () => resolve('packet written to consumer side socket stream ')
    -      if (!super.write(packet)) {
    -        this.once('drain',cb )
    -      } else {
    -        process.nextTick(cb)
    -      }
    -
    -    })
    -  }
    -
    -  __ready() {return this._ready}
    -
    -  async _listen () {
    -    log.info('listening for incoming packets from socket')
    - -
  • - - -
  • -
    - -
    - -
    -

    listen for pushed packets

    - -
    - -
        this.on('pushed',async function(packet){
    - -
  • - - -
  • -
    - -
    - -
    -

    TODO do some extra security here?

    - -
    - -
          let res = await this._packetProcess(packet)
    -      if (!res) {   // if process was not promise returning like just logged to console
    - -
  • - - -
  • -
    - -
    - -
    -

    log.warn(‘consumer function was not promise returning’)

    - -
    - -
          }
    -    })
    - -
  • - - -
  • -
    - -
    - -
    -

    listen on socket stream

    - -
    - -
        this.on('data', this.stream.onData)
    -    this.stream.on('message', messageProcess.bind(this))
    -
    -    async function messageProcess (packet) {
    - -
  • - - -
  • -
    - -
    - -
    -

    console.log(‘incoming packet from socket’,packet)

    - -
    - -
          if (packet._handshake) {
    -        this._ready = true
    -        return }
    - -
  • - - -
  • -
    - -
    - -
    -

    TODO send back ack with consumer ID and authorization and wait -when authorized drop through here to emit

    - -
    - -
          this.emit(packet._header.id, packet)
    -    }
    -  }
    - -
  • - - -
  • -
    - -
    - -
    -

    default packet process just a simple console logger. ignores any cmd: prop

    - -
    - -
      _packetProcess (packet) {
    -    console.log('default consumer processor -- log packet from socket to console')
    -    console.dir(packet)
    -  }
    -
    -}  // end class
    - -
  • - - -
  • -
    - -
    - -
    -

    HELP FUNCTIONS -wait until a passed ready function returns true

    - -
    - -
    function isReady(ready, wait=30, timeout=1000) {
    -  let time = 0
    -  return new Promise((resolve, reject) => {
    -    (function waitReady(){
    -      if (time > timeout) return reject(`timeout waiting for socket ready handshake - ${timeout}ms`)
    -      if (ready()) return resolve('ready')
    -      log.info(`waiting ${wait}ms for handshake`)
    -      time += wait
    -      setTimeout(waitReady, wait)
    -    })()
    -  })
    -}
    - -
  • - -
-
- - diff --git a/docs/src/index.html b/docs/src/index.html deleted file mode 100644 index 8918163..0000000 --- a/docs/src/index.html +++ /dev/null @@ -1,78 +0,0 @@ - - - - - index.js - - - - - -
-
- - - -
    - -
  • -
    -

    index.js

    -
    -
  • - - - -
  • -
    - -
    - -
    - -
    - -
    import Socket from './socket'
    -import Consumer from  './consumer'
    -
    -export { Socket as Socket }
    -export { Consumer as Consumer }
    -export default { Socket, Consumer }
    - -
  • - -
-
- - diff --git a/docs/src/json-stream.html b/docs/src/json-stream.html deleted file mode 100644 index 35fe841..0000000 --- a/docs/src/json-stream.html +++ /dev/null @@ -1,199 +0,0 @@ - - - - - json-stream.js - - - - - -
-
- - - -
    - -
  • -
    -

    json-stream.js

    -
    -
  • - - - -
  • - - -
    -import {StringDecoder} from 'string_decoder'
    -import EventEmitter from 'events'
    -import btc from 'better-try-catch'
    -
    -const decoder = new StringDecoder()
    -
    -export default class JsonStream extends EventEmitter{
    -  constructor(opts={}){
    -    super()
    -    this._contentLength = null
    -    this._buffer = ''
    -    this._delimeter = opts.delimiter || '#'
    -    this.onData = this.onData.bind(this)
    -    this.serialize = this.serialize.bind(this)
    -  }
    -
    -
    -  onData (data) {
    - -
  • - - -
  • -
    - -
    - -
    -

    console.log(‘a chunk arrived’, data)

    - -
    - -
        data = decoder.write(data)
    -    try {
    -      this._handleData(data)
    -    } catch (e) {
    -      this.emit('error', { error: e })
    -    }
    -  }
    -
    -  async serialize(message) {
    -    return new Promise( (resolve,reject) => {
    -      let [err,messageData] = btc(JSON.stringify)(message)
    -      if (err) reject(err)
    -      let [err2,length] = btc(Buffer.byteLength)(messageData, 'utf8')
    -      if (err2) reject(err2)
    -      let data = length + this._delimeter + messageData
    - -
  • - - -
  • -
    - -
    - -
    -

    console.log(‘serialized’,data)

    - -
    - -
          resolve(data)
    -    })
    -  }
    -
    -  _handleData (data) {
    -    this._buffer += data
    -    if (this._contentLength == null) {
    -      var i = this._buffer.indexOf(this._delimeter)
    - -
  • - - -
  • -
    - -
    - -
    -

    Check if the buffer has a this._opts.delimeter or “#”, if not, the end of the buffer string might be in the middle of a content length string

    - -
    - -
          if (i !== -1) {
    -        var rawContentLength = this._buffer.substring(0, i)
    -        this._contentLength = parseInt(rawContentLength)
    -        if (isNaN(this._contentLength)) {
    -          this._contentLength = null
    -          this._buffer = ''
    -          var err = new Error('Invalid content length supplied ('+rawContentLength+') in: '+this._buffer)
    -          err.code = 'E_INVALID_CONTENT_LENGTH'
    -          throw err
    -        }
    -        this._buffer = this._buffer.substring(i+1)
    -      }
    -    }
    -    if (this._contentLength != null) {
    -      var length = Buffer.byteLength(this._buffer, 'utf8')
    -      if (length == this._contentLength) {
    -        this._handleMessage(this._buffer)
    -      } else if (length > this._contentLength) {
    -        var message = this._buffer.substring(0, this._contentLength)
    -        var rest = this._buffer.substring(this._contentLength)
    -        this._handleMessage(message)
    -        this.onData(rest)
    -      }
    -    }
    -  }
    -
    -  _handleMessage (data) {
    -    this._contentLength = null
    -    this._buffer = ''
    -    var message
    -    try {
    -      message = JSON.parse(data)
    -    } catch (e) {
    -      var err = new Error('Could not parse JSON: '+e.message+'\nRequest data: '+data)
    -      err.code = 'E_INVALID_JSON'
    -      throw err
    -    }
    -    message = message || {}
    -    this.emit('message', message)
    -  }
    -
    -}
    - -
  • - -
-
- - diff --git a/docs/src/socket.html b/docs/src/socket.html deleted file mode 100644 index a5d91ed..0000000 --- a/docs/src/socket.html +++ /dev/null @@ -1,403 +0,0 @@ - - - - - socket.js - - - - - -
-
- - - -
    - -
  • -
    -

    socket.js

    -
    -
  • - - - -
  • -
    - -
    - -
    - -
    - -
    import { Server } from 'net'
    -import { unlink as fileDelete } from 'fs'
    -import { promisify } from 'util'
    -import path from 'path'
    -import mkdir from 'make-dir'
    -import btc from 'better-try-catch'
    -import _ON_DEATH from 'death' //this is intentionally ugly
    -import JSONStream from './json-stream'
    -import clone from 'clone'
    - -
  • - - -
  • -
    - -
    - -
    -

    import logger from ‘../../uci-logger/src/logger’

    - -
    - -
    import logger from '@uci/logger'
    -let log = {}
    - -
  • - - -
  • -
    - -
    - -
    -

    TODO change default pipe dir for windows and mac os

    - -
    - -
    const DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || '/tmp/UCI')
    -const DEFAULT_SOCKET_NAME = 'uci-sock'
    -
    -export default class Socket extends Server {
    -  constructor (opts={}) {
    -    super()
    -    this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
    -    if (!opts.path) {
    -      opts.host = opts.host || '0.0.0.0'
    -      opts.port = opts.port || 8080
    -    } else {
    -      if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
    -      if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
    -    }
    -    this.clientTracking = opts.clientTracking || true
    -    this.clients = []  // track consumers (i.e. clients)
    -    this.opts = opts  // for use to recover from selected errors
    - -
  • - - -
  • -
    - -
    - -
    -

    self bindings

    - -
    - -
        this._listen = this._listen.bind(this)
    -    this.create = this.create.bind(this)
    -    log = logger({file:'src/socket.js',class:'Socket',name:'socket',id:this.id})
    -  } // end constructor
    -
    -  async create () {
    -
    -    return new Promise( async (resolve,reject) => {
    - -
  • - - -
  • -
    - -
    - -
    -

    couple ways to kill socket process when needed

    - -
    - -
          _ON_DEATH( async () => {
    -        log.info('\nhe\'s dead jim')
    -        await this._destroy()
    -      })
    -      process.once('SIGUSR2', async () => {
    -        await this._destroy
    -        process.kill(process.pid, 'SIGUSR2')
    -      })
    -
    -      this.once('error', async (err) => {
    - -
  • - - -
  • -
    - -
    - -
    -

    recover from socket file that was not removed

    - -
    - -
            if (err.code === 'EADDRINUSE') {
    -          if (this.opts.path) { // if TCP socket should already be dead
    -            let [err, res] = await btc(promisify(fileDelete))(this.opts.path)
    -            if(!err) {
    -              log.info({res:res, socket: this.opts.path}, 'socket already exists.....deleted')
    -              return await this._listen(this.opts)
    -            }
    -            log.fatal({err:err},'error deleting socket.  Can not establish a socket')
    -            return err
    -          }
    -        }
    -        if (err.code ==='EACCES'){
    -          console.log({socket: this.opts.path}, 'directory does not exist...creating')
    -          await mkdir(path.dirname(this.opts.path))
    -          console.log({socket: this.opts.path}, 'created')
    -          log.warn({socket: this.opts.path}, 'directory does not exist...creating')
    -          return await this._listen(this.opts)
    -        }
    - -
  • - - -
  • -
    - -
    - -
    -

    otherwise fatally exit

    - -
    - -
            log.info(err, 'creating socket')
    -        reject(err)
    -      })
    -
    -      let [err, res] = await btc(this._listen)(this.opts)
    -      if (err) reject(err)
    -      resolve(res)
    -
    -    }) // end creeate promise
    -  } // end create
    -
    -  registerPacketProcessor (func) {
    -    this._packetProcess = func
    -  }
    -
    -  async _listen (opts) {
    -    super.listen(opts, async (err, res) => {
    -      if (err) return err
    - -
  • - - -
  • -
    - -
    - -
    -

    this gets called for each client connection and is unique to each

    - -
    - -
          this.on('connection', async (socket) => {
    -        const stream = new JSONStream()
    -        socket.stream = stream // need this to track clients
    -        let send = this._send.bind(socket)
    -        if (this.clientTracking) this.clients.push(socket)
    - -
  • - - -
  • -
    - -
    - -
    -

    TODO add ‘close’ listener to socket to remove from this.clients

    - -
    - -
            log.info('new consumer connecting')
    -        log.info(await send(await stream.serialize({'_handshake':true})))
    -        if (this.opts.conPacket) {
    -          this.opts.conPacket._header = { id:'pushed'}
    -          log.info({conPacket:this.opts.conPacket},'pushing a preset command to just connected consumer')
    -          send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection
    -        }
    -        socket.on('data', stream.onData)
    - -
  • - - -
  • -
    - -
    - -
    -

    TODO need to start error listener for stream so errors can be processed

    - -
    - -
            stream.on('message', messageProcess.bind(this,socket))
    -
    -        async function messageProcess (client, packet) {
    -          log.info({packet:packet},'incoming packet on socket side')
    -          let res = {}
    -          if (this.clientTracking && packet.clientID) {
    -            client.ID = packet.clientID
    -            res.cmd='ackID'
    -          }
    -          else {
    -            res = await this._packetProcess(clone(packet)) || {}
    -            if (Object.keys(res).length === 0) res = { error: 'socket packet command function likely did not return a promise', packet:packet}
    -          }
    -          if (packet) {
    -            res._header = clone(packet._header,false) || {} //make sure return packet has header with id in case it was removed in processing
    -            delete packet._header  // remove before adding to response header as request
    -          } else res._header = {}
    -          res._header.request = clone(packet,false)
    -          res._header.responder = {name:this.name,instanceID:this.id}
    -          res._header.socket = this.address()
    -          if (!res.cmd) res.cmd = 'reply'  // by default return command is 'reply'
    -          let [err, ser] = await btc(stream.serialize)(res)
    -          if (err) ser = await stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:res._header.id}})
    -          log.info(await send(ser))
    -        } // end process message
    -
    -      }) // end connecttion consumer
    -      log.info({opts: this.opts},'socket created')
    -      return res
    -    }) // end super listen callback
    -
    -  } // end listen
    -
    -  async _destroy () {
    -    log.info('closing down socket')
    -    await this.close()
    -    log.info('all connections closed....exiting')
    -    process.exit()
    -  }
    - -
  • - - -
  • -
    - -
    - -
    -

    default packet process, just a simple echo

    - -
    - -
      async _packetProcess (packet) {
    -    return new Promise(resolve => {
    -      resolve(packet)
    -    })
    -  }
    - -
  • - - -
  • -
    - -
    - -
    -

    must have a consumer socket bound to use

    - -
    - -
      async _send(packet) {
    - -
  • - - -
  • -
    - -
    - -
    -

    timeout already set if sockect can’t be drained in 10 secs

    - -
    - -
        return new Promise(resolve => {
    -      const cb = () => resolve('packet written to socket stream')
    -      if (!this.write(packet)) {
    -        this.once('drain',cb )
    -      } else {
    -        process.nextTick(cb)
    -      }
    -    })
    -  }
    -
    -  async push (packet,id) {
    -    packet._header = { id:'pushed'}
    -    log.info({opts:this.opts,packet:packet},'pushing a packet to all connected consumers')
    -    this.clients.forEach(async (client) => {
    -      if (client.writable) {
    -        let [err, ser] = await btc(client.stream.serialize)(packet)
    -        if (err) ser = await client.stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:packet._header.id}})
    -        if (!id || id ===client.ID ) await this._send.bind(client)(ser)
    -      }
    -    })
    -  }
    -
    -} // end class
    - -
  • - -
-
- - diff --git a/examples/client.js b/examples/client.js index 3d11ea2..b2467b4 100644 --- a/examples/client.js +++ b/examples/client.js @@ -1,8 +1,9 @@ import Consumer from '../src/consumer' import btc from 'better-try-catch' -// const client1= new Consumer({name:'example-consumer1' }) -const client= new Consumer({path:true, name:'example-consumer', initTimeout:30 }) +// const PATH ='/opt/bogus/socket' +const PATH=true +const client= new Consumer({path:PATH, name:'example-consumer', initTimeout:30 }) // This is your client handler object waiting on a message to do something async function processor (packet) { @@ -44,8 +45,9 @@ client.pushed = (packet) => { -client.on('connection', event => { - console.log('============ connection update ============') +client.on('status', event => { + console.log('============ socket status ============') + console.log('status level',event.level) console.log(event.id) console.log(event.msg) console.log(`Consumer is ${event.connected ? 'connected' : 'disconnected'}`) diff --git a/examples/opts.yaml b/examples/opts.yaml new file mode 100644 index 0000000..e69de29 diff --git a/examples/server.js b/examples/server.js index e3d802f..aa29dd6 100644 --- a/examples/server.js +++ b/examples/server.js @@ -36,11 +36,11 @@ class Test extends Socket { // // This is necessary only if the client uses a self-signed certificate. // // ca: [ fs.readFileSync('client-cert.pem') ] // } - -let options = {path:true} +// const PATH = '/opt/bogus/socket' +const PATH = true // options.conPacket = {cmd:'onconnect', data:'this is a packet data sent consumer after handshake/authentification'} -options.tokens = ['cheetos'] -let test = new Test(options) +const TOKENS = ['cheetos'] +let test = new Test({path:PATH, tokens:TOKENS}) async function processor (packet) { // console.log('packet being processed at socket', packet) @@ -70,25 +70,20 @@ test.registerPacketProcessor(processor) await test.create() let count = 0 - // const push = setInterval( () => { - // count++ - // test.push({cmd:'pushed', count:count, status:`some pushed data ${count}`}) - // if (count >3) { - // clearInterval(push) - // test.push({cmd:'pushed',status:'now will simulate server going offline by stopping to send pingfor 10 seconds'}) - // test.disablePing() - // setTimeout( () => { - // test.enablePing() - // },10000) - // - // } - // },3000) + const push = setInterval( () => { + count++ + test.push({cmd:'pushed', count:count, status:`some pushed data ${count}`}) + if (count >3) { + clearInterval(push) + test.push({cmd:'pushed',status:'now will simulate server going offline by stopping to send ping for 10 seconds'}) + test.disablePing() + setTimeout( () => { + test.enablePing() + },10000) + } + },3000) - // setTimeout( () => { - // console.log('closing server') - // test._destroy() - // },20000) })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/package.json b/package.json index 1f81bd8..26a5945 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.19", + "version": "0.2.20", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { diff --git a/src/consumer.js b/src/consumer.js index 70546dc..001b162 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -73,12 +73,12 @@ class SocketConsumer extends Socket { log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'}) log.debug('first connnect attempt for', this.opts.name) - this.emit('connection',{msg:'attempting initial connection', id:this.id, opts:this.opts, ready:false}) + this.emit('status',{level:'info', msg:'attempting initial connection', id:this.id, opts:this.opts, ready:false}) let initTimeout = {} if (this.initTimeout > 499) { initTimeout = setTimeout(() => { - this.emit('connection',{msg:'initial connection timed out', id:this.id, timeout:this.initTimeout, wait:this.retryWait, opts:this.opts, ready:false}) + this.emit('status',{level:'info', msg:'initial connection timed out', id:this.id, timeout:this.initTimeout, wait:this.retryWait, opts:this.opts, ready:false}) this.removeAllListeners() log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to initially connect to ${this.opts.name} in ${this.initTimeout/1000} secs no more attempts!`}) this.stream.removeAllListeners() @@ -98,7 +98,7 @@ class SocketConsumer extends Socket { authPacket.clientName = this.id let res = (await this._authenticateSend(authPacket)) || {} if (!res.authenticated) { - this.emit('connection',{msg:`authentication failed: ${res.reason}`, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + this.emit('status',{level:'info', msg:`authentication failed: ${res.reason}`, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) reject('unable to authenticate') } else { @@ -106,7 +106,7 @@ class SocketConsumer extends Socket { this.removeListener('error',initialErrorHandler) this._listen() // setup for active connection log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'}) - this.emit('connection',{msg:'authentication succesfull', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + this.emit('status',{level:'info', msg:'authentication succesfull', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) resolve('initial connection successful') } } @@ -137,7 +137,6 @@ class SocketConsumer extends Socket { return new Promise(async resolve => { if (!this._connected) { resolve({ error: 'socket consumer not connected, aborting send' }) - return } let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) @@ -206,7 +205,7 @@ class SocketConsumer extends Socket { const reconnectHandler = () => { this.stream.once('message', handshake.bind(this)) log.debug({method:'connect', line:113, msg:'connected waiting for socket ready handshake'}) - this.emit('connection',{msg:'attemping reconnect', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + this.emit('status',{level:'info', msg:'attemping reconnect', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) } const handshake = async (packet) => { @@ -217,19 +216,19 @@ class SocketConsumer extends Socket { authPacket.clientName = this.id let res = (await this._authenticateSend(authPacket)) || {} if (!res.authenticated) { - this.emit('connection',{msg:`authentication failed: ${res.reason}`, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + this.emit('status',{level:'error', msg:`authentication failed: ${res.reason}`, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) this.emit('error',{code:'authentification failed'}) } else { this._authenticated = res.authenticated log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'}) - this.emit('connection',{msg:'authentication successful', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + this.emit('status',{level:'info', msg:'authentication successful', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default this.on('ping',pingHandler) this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled } this.stream.on('message', messageHandler.bind(this)) // reset default message handler - this.emit('connection',{msg:'reconnected', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + this.emit('status',{level:'info', msg:'reconnected', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) } } } @@ -238,7 +237,7 @@ class SocketConsumer extends Socket { log.debug({msg:'connection error emitted ', error:err}) this._connected = false this._authenticated = false - this.emit('connection',{msg:'connection(socket) error', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + this.emit('status',{level:'error', msg:'connection error', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect after ${this.retryWait/1000} secs`}) await pause(this.retryWait) this.stream.removeAllListeners('message') // remove regular message handler in prep for reconnect @@ -304,8 +303,6 @@ class SocketConsumer extends Socket { }) } - - // default packet process just a simple console logger. ignores any cmd: prop _packetProcess(packet) { console.log('default consumer processor -- log packet from socket to console') diff --git a/src/socket-class.js b/src/socket-class.js index cd4df60..ff57447 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -65,8 +65,8 @@ export default function socketClass(Server) { this.clients = [] // track consumers (i.e. clients) this.nextClientID = 0 // incrementer for default initial client ID this.opts = opts // for use to recover from selected errors + this.errorCount = 0 //self bindings - this._listen = this._listen.bind(this) this.create = this.create.bind(this) this.authenticateClient = this.authenticateClient.bind(this) this._authenticate = this._authenticate.bind(this) @@ -86,7 +86,6 @@ export default function socketClass(Server) { */ async create() { return new Promise(async (resolve, reject) => { - // set up a couple ways to gracefully destroy socket process is killed/aborted _ON_DEATH(async () => { log.error({method:'create', line:84, msg:'\nhe\'s dead jim'}) await this._destroy() @@ -103,28 +102,41 @@ export default function socketClass(Server) { // if TCP socket should already be dead let [err, res] = await btc(promisify(fileDelete))(this.opts.path) if (!err) { - log.debug({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'}) - return await this._listen(this.opts) + log.info({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'}) + // try again + this.removeAllListeners('listening') + return await this.create() } log.error({method:'create', line:102, err: err, msg:'error deleting socket. Can not establish a socket'}) - return err } } if (err.code === 'EACCES') { log.debug({method:'create', line:107, socket: this.opts.path, msg:'directory does not exist...creating'}) await mkdir(path.dirname(this.opts.path)) log.debug({method:'create', line:109, socket: this.opts.path, msg:'directory created'}) - return await this._listen(this.opts) + this.removeAllListeners('listening') + return await this.create() } // otherwise fatally exit - log.error({method:'create', line:113, err:err, msg:'error creating socket'}) + log.error({method:'create', line:113, err:err, opts:this.opts, msg:`error creating socket server ${this.name}`}) reject(err) }) - let [err, res] = await btc(this._listen)(this.opts) - if (err) reject(err) + this.once('listening', () => { + this.on('error', err => { + this.errorCount +=1 // log errors here + this.errors.push(err) + if(this.errorCount>2) this.emit('warn', {msg:'something bad maybe going on, 3 errors', errors:this.errors}) + if(this.errorCount>5) this.emit('fatal', {msg:'something fatal is going on, 6 errors', errors:this.errors}) + }) + log.info({method:'create', line:54, msg:'socket server created and listening at', address:this.address()}) + this.on('connection', this._connectionHandler.bind(this)) + resolve(`socket ready and listening at ${this.address().address}:${this.address().port}`) + }) + + super.listen(this.opts) this.enablePing() - resolve(res) + }) // end creeate promise } // end create @@ -268,88 +280,76 @@ export default function socketClass(Server) { return true } - async _listen(opts) { - return super.listen(opts, async (err, res) => { - if (err) return Promise.reject(err) - // this gets called for each client connection and is unique to each - this.on('connection', async socket => { - log.debug({method:'_listen', line:167, msg:'new consumer connecting'}) - socket.id = ++this.nextClientID // server assigned ID - socket.authenticated = false - this.clients.push(socket) // add client to list - const stream = new JSONStream() - socket.stream = stream - socket.setKeepAlive(this.keepAlive,3000) - // add listeners - const clientCloseHandler = (id) => { - log.warn({msg:'client connection closed during listen,',id:id}) - this.removeClient(id) - } + async _connectionHandler(socket) { // this gets called for each client connection and is unique to each + log.debug({method:'_listen', line:167, msg:'new consumer connecting'}) + socket.id = ++this.nextClientID // server assigned ID + socket.authenticated = false + this.clients.push(socket) // add client to list + const stream = new JSONStream() + socket.stream = stream + socket.setKeepAlive(this.keepAlive,3000) - socket.on('close', clientCloseHandler.bind(this,socket.id) ) + // add listeners + const clientCloseHandler = (id) => { + log.warn({msg:'client connection closed during listen,',id:id}) + this.removeClient(id) + } - socket.on('error', (err) => { - log.error({msg:'client connection error during listen',error:err}) - // TODO do more handling than just logging - }) + socket.on('close', clientCloseHandler.bind(this,socket.id) ) - socket.on('data', stream.onData) // send data to + socket.on('error', (err) => { + log.error({msg:'client connection error during listen',error:err}) + // TODO do more handling than just logging + }) - stream.on('error', (err) => { - log.error({msg:'client-socket stream error during listen',error:err}) - // TODO do more handling than just logging - }) + socket.on('data', stream.onData) // send data to - let [err] = await btc(this.authenticateClient)(socket) - if (!this.allowAnonymous) { - if (err) { - socket.end()// abort new connection socket, cleanup, remove listeners - this.removeClient(socket.id) - return - } - } + stream.on('error', (err) => { + log.error({msg:'client-socket stream error during listen',error:err}) + // TODO do more handling than just logging + }) - // all's set main message processor - stream.on('message', messageProcess.bind(this, socket)) + let [err] = await btc(this.authenticateClient)(socket) + if (!this.allowAnonymous) { + if (err) { + socket.end()// abort new connection socket, cleanup, remove listeners + this.removeClient(socket.id) + return + } + } - if (this.opts.conPacket) { - this.opts.conPacket._header = { id: 'pushed' } - log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'}) - this._send(socket,this.opts.conPacket) // send a packet command on to consumer on connection - } + // all's set main message processor + stream.on('message', messageProcess.bind(this, socket)) - // that's it. Connection is active + if (this.opts.conPacket) { + this.opts.conPacket._header = { id: 'pushed' } + log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'}) + this._send(socket,this.opts.conPacket) // send a packet command on to consumer on connection + } - async function messageProcess(client, packet) { - log.debug({method:'_listen', line:179, packet: packet, client:client.name, msg:'incoming packet on socket side'}) - let res = (await this._packetProcess(clone(packet))) || {} - if (Object.keys(res).length === 0) - res = { - error: + // that's it. Connection is active + + async function messageProcess(client, packet) { + log.debug({method:'_listen', line:179, packet: packet, client:client.name, msg:'incoming packet on socket side'}) + let res = (await this._packetProcess(clone(packet))) || {} + if (Object.keys(res).length === 0) + res = { + error: 'socket packet command function likely did not return a promise', - packet: packet - } - if (packet) { - res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing - delete packet._header // remove before adding to response header as request - } else res._header = {} - res._header.request = clone(packet, false) - res._header.responder = { name: this.name, instanceID: this.id } - res._header.socket = this.address() - if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' - let [err] = await btc(this._send)(client,res) - if (err) log.error({msg:err, error:err}) - } // end message process - - - }) // end connecttion handler - - - log.info({method:'_listen', line:255, opts: this.opt, msg:'socket server created and listening'}) - return res - - }) // end super listen callback + packet: packet + } + if (packet) { + res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing + delete packet._header // remove before adding to response header as request + } else res._header = {} + res._header.request = clone(packet, false) + res._header.responder = { name: this.name, instanceID: this.id } + res._header.socket = this.address() + if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' + let [err] = await btc(this._send)(client,res) + if (err) log.error({msg:err, error:err}) + } // end message process } // end listen