Compare commits

..

1 Commits
master ... tls

Author SHA1 Message Date
David Kebler cec399ecc5 0.2.12 update deps, clean up logging 2019-04-26 10:15:11 -07:00
16 changed files with 2140 additions and 827 deletions

View File

@ -9,9 +9,11 @@ module.exports = {
"node": true,
"mocha": true
},
"parser": "babel-eslint",
"parserOptions": {
"ecmaVersion": 2017,
"sourceType": "module"
"sourceType": "module",
"allowImportExportEverywhere": true
},
"extends": "eslint:recommended",
"rules": {

518
docs/docco.css Normal file
View File

@ -0,0 +1,518 @@
/*--------------------- Typography ----------------------------*/
@font-face {
font-family: 'aller-light';
src: url('public/fonts/aller-light.eot');
src: url('public/fonts/aller-light.eot?#iefix') format('embedded-opentype'),
url('public/fonts/aller-light.woff') format('woff'),
url('public/fonts/aller-light.ttf') format('truetype');
font-weight: normal;
font-style: normal;
}
@font-face {
font-family: 'aller-bold';
src: url('public/fonts/aller-bold.eot');
src: url('public/fonts/aller-bold.eot?#iefix') format('embedded-opentype'),
url('public/fonts/aller-bold.woff') format('woff'),
url('public/fonts/aller-bold.ttf') format('truetype');
font-weight: normal;
font-style: normal;
}
@font-face {
font-family: 'roboto-black';
src: url('public/fonts/roboto-black.eot');
src: url('public/fonts/roboto-black.eot?#iefix') format('embedded-opentype'),
url('public/fonts/roboto-black.woff') format('woff'),
url('public/fonts/roboto-black.ttf') format('truetype');
font-weight: normal;
font-style: normal;
}
/*--------------------- Layout ----------------------------*/
html { height: 100%; }
body {
font-family: "aller-light";
font-size: 14px;
line-height: 18px;
color: #30404f;
margin: 0; padding: 0;
height:100%;
}
#container { min-height: 100%; }
a {
color: #000;
}
b, strong {
font-weight: normal;
font-family: "aller-bold";
}
p {
margin: 15px 0 0px;
}
.annotation ul, .annotation ol {
margin: 25px 0;
}
.annotation ul li, .annotation ol li {
font-size: 14px;
line-height: 18px;
margin: 10px 0;
}
h1, h2, h3, h4, h5, h6 {
color: #112233;
line-height: 1em;
font-weight: normal;
font-family: "roboto-black";
text-transform: uppercase;
margin: 30px 0 15px 0;
}
h1 {
margin-top: 40px;
}
h2 {
font-size: 1.26em;
}
hr {
border: 0;
background: 1px #ddd;
height: 1px;
margin: 20px 0;
}
pre, tt, code {
font-size: 12px; line-height: 16px;
font-family: Menlo, Monaco, Consolas, "Lucida Console", monospace;
margin: 0; padding: 0;
}
.annotation pre {
display: block;
margin: 0;
padding: 7px 10px;
background: #fcfcfc;
-moz-box-shadow: inset 0 0 10px rgba(0,0,0,0.1);
-webkit-box-shadow: inset 0 0 10px rgba(0,0,0,0.1);
box-shadow: inset 0 0 10px rgba(0,0,0,0.1);
overflow-x: auto;
}
.annotation pre code {
border: 0;
padding: 0;
background: transparent;
}
blockquote {
border-left: 5px solid #ccc;
margin: 0;
padding: 1px 0 1px 1em;
}
.sections blockquote p {
font-family: Menlo, Consolas, Monaco, monospace;
font-size: 12px; line-height: 16px;
color: #999;
margin: 10px 0 0;
white-space: pre-wrap;
}
ul.sections {
list-style: none;
padding:0 0 5px 0;;
margin:0;
}
/*
Force border-box so that % widths fit the parent
container without overlap because of margin/padding.
More Info : http://www.quirksmode.org/css/box.html
*/
ul.sections > li > div {
-moz-box-sizing: border-box; /* firefox */
-ms-box-sizing: border-box; /* ie */
-webkit-box-sizing: border-box; /* webkit */
-khtml-box-sizing: border-box; /* konqueror */
box-sizing: border-box; /* css3 */
}
/*---------------------- Jump Page -----------------------------*/
#jump_to, #jump_page {
margin: 0;
background: white;
-webkit-box-shadow: 0 0 25px #777; -moz-box-shadow: 0 0 25px #777;
-webkit-border-bottom-left-radius: 5px; -moz-border-radius-bottomleft: 5px;
font: 16px Arial;
cursor: pointer;
text-align: right;
list-style: none;
}
#jump_to a {
text-decoration: none;
}
#jump_to a.large {
display: none;
}
#jump_to a.small {
font-size: 22px;
font-weight: bold;
color: #676767;
}
#jump_to, #jump_wrapper {
position: fixed;
right: 0; top: 0;
padding: 10px 15px;
margin:0;
}
#jump_wrapper {
display: none;
padding:0;
}
#jump_to:hover #jump_wrapper {
display: block;
}
#jump_page_wrapper{
position: fixed;
right: 0;
top: 0;
bottom: 0;
}
#jump_page {
padding: 5px 0 3px;
margin: 0 0 25px 25px;
max-height: 100%;
overflow: auto;
}
#jump_page .source {
display: block;
padding: 15px;
text-decoration: none;
border-top: 1px solid #eee;
}
#jump_page .source:hover {
background: #f5f5ff;
}
#jump_page .source:first-child {
}
/*---------------------- Low resolutions (> 320px) ---------------------*/
@media only screen and (min-width: 320px) {
.pilwrap { display: none; }
ul.sections > li > div {
display: block;
padding:5px 10px 0 10px;
}
ul.sections > li > div.annotation ul, ul.sections > li > div.annotation ol {
padding-left: 30px;
}
ul.sections > li > div.content {
overflow-x:auto;
-webkit-box-shadow: inset 0 0 5px #e5e5ee;
box-shadow: inset 0 0 5px #e5e5ee;
border: 1px solid #dedede;
margin:5px 10px 5px 10px;
padding-bottom: 5px;
}
ul.sections > li > div.annotation pre {
margin: 7px 0 7px;
padding-left: 15px;
}
ul.sections > li > div.annotation p tt, .annotation code {
background: #f8f8ff;
border: 1px solid #dedede;
font-size: 12px;
padding: 0 0.2em;
}
}
/*---------------------- (> 481px) ---------------------*/
@media only screen and (min-width: 481px) {
#container {
position: relative;
}
body {
background-color: #F5F5FF;
font-size: 15px;
line-height: 21px;
}
pre, tt, code {
line-height: 18px;
}
p, ul, ol {
margin: 0 0 15px;
}
#jump_to {
padding: 5px 10px;
}
#jump_wrapper {
padding: 0;
}
#jump_to, #jump_page {
font: 10px Arial;
text-transform: uppercase;
}
#jump_page .source {
padding: 5px 10px;
}
#jump_to a.large {
display: inline-block;
}
#jump_to a.small {
display: none;
}
#background {
position: absolute;
top: 0; bottom: 0;
width: 350px;
background: #fff;
border-right: 1px solid #e5e5ee;
z-index: -1;
}
ul.sections > li > div.annotation ul, ul.sections > li > div.annotation ol {
padding-left: 40px;
}
ul.sections > li {
white-space: nowrap;
}
ul.sections > li > div {
display: inline-block;
}
ul.sections > li > div.annotation {
max-width: 350px;
min-width: 350px;
min-height: 5px;
padding: 13px;
overflow-x: hidden;
white-space: normal;
vertical-align: top;
text-align: left;
}
ul.sections > li > div.annotation pre {
margin: 15px 0 15px;
padding-left: 15px;
}
ul.sections > li > div.content {
padding: 13px;
vertical-align: top;
border: none;
-webkit-box-shadow: none;
box-shadow: none;
}
.pilwrap {
position: relative;
display: inline;
}
.pilcrow {
font: 12px Arial;
text-decoration: none;
color: #454545;
position: absolute;
top: 3px; left: -20px;
padding: 1px 2px;
opacity: 0;
-webkit-transition: opacity 0.2s linear;
}
.for-h1 .pilcrow {
top: 47px;
}
.for-h2 .pilcrow, .for-h3 .pilcrow, .for-h4 .pilcrow {
top: 35px;
}
ul.sections > li > div.annotation:hover .pilcrow {
opacity: 1;
}
}
/*---------------------- (> 1025px) ---------------------*/
@media only screen and (min-width: 1025px) {
body {
font-size: 16px;
line-height: 24px;
}
#background {
width: 525px;
}
ul.sections > li > div.annotation {
max-width: 525px;
min-width: 525px;
padding: 10px 25px 1px 50px;
}
ul.sections > li > div.content {
padding: 9px 15px 16px 25px;
}
}
/*---------------------- Syntax Highlighting -----------------------------*/
td.linenos { background-color: #f0f0f0; padding-right: 10px; }
span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; }
/*
github.com style (c) Vasily Polovnyov <vast@whiteants.net>
*/
pre code {
display: block; padding: 0.5em;
color: #000;
background: #f8f8ff
}
pre .hljs-comment,
pre .hljs-template_comment,
pre .hljs-diff .hljs-header,
pre .hljs-javadoc {
color: #408080;
font-style: italic
}
pre .hljs-keyword,
pre .hljs-assignment,
pre .hljs-literal,
pre .hljs-css .hljs-rule .hljs-keyword,
pre .hljs-winutils,
pre .hljs-javascript .hljs-title,
pre .hljs-lisp .hljs-title,
pre .hljs-subst {
color: #954121;
/*font-weight: bold*/
}
pre .hljs-number,
pre .hljs-hexcolor {
color: #40a070
}
pre .hljs-string,
pre .hljs-tag .hljs-value,
pre .hljs-phpdoc,
pre .hljs-tex .hljs-formula {
color: #219161;
}
pre .hljs-title,
pre .hljs-id {
color: #19469D;
}
pre .hljs-params {
color: #00F;
}
pre .hljs-javascript .hljs-title,
pre .hljs-lisp .hljs-title,
pre .hljs-subst {
font-weight: normal
}
pre .hljs-class .hljs-title,
pre .hljs-haskell .hljs-label,
pre .hljs-tex .hljs-command {
color: #458;
font-weight: bold
}
pre .hljs-tag,
pre .hljs-tag .hljs-title,
pre .hljs-rules .hljs-property,
pre .hljs-django .hljs-tag .hljs-keyword {
color: #000080;
font-weight: normal
}
pre .hljs-attribute,
pre .hljs-variable,
pre .hljs-instancevar,
pre .hljs-lisp .hljs-body {
color: #008080
}
pre .hljs-regexp {
color: #B68
}
pre .hljs-class {
color: #458;
font-weight: bold
}
pre .hljs-symbol,
pre .hljs-ruby .hljs-symbol .hljs-string,
pre .hljs-ruby .hljs-symbol .hljs-keyword,
pre .hljs-ruby .hljs-symbol .hljs-keymethods,
pre .hljs-lisp .hljs-keyword,
pre .hljs-tex .hljs-special,
pre .hljs-input_number {
color: #990073
}
pre .hljs-builtin,
pre .hljs-constructor,
pre .hljs-built_in,
pre .hljs-lisp .hljs-title {
color: #0086b3
}
pre .hljs-preprocessor,
pre .hljs-pi,
pre .hljs-doctype,
pre .hljs-shebang,
pre .hljs-cdata {
color: #999;
font-weight: bold
}
pre .hljs-deletion {
background: #fdd
}
pre .hljs-addition {
background: #dfd
}
pre .hljs-diff .hljs-change {
background: #0086b3
}
pre .hljs-chunk {
color: #aaa
}
pre .hljs-tex .hljs-formula {
opacity: 0.5;
}

494
docs/src/consumer.html Normal file
View File

@ -0,0 +1,494 @@
<!DOCTYPE html>
<html>
<head>
<title>consumer.js</title>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, target-densitydpi=160dpi, initial-scale=1.0; maximum-scale=1.0; user-scalable=0;">
<link rel="stylesheet" media="all" href="../docco.css" />
</head>
<body>
<div id="container">
<div id="background"></div>
<ul id="jump_to">
<li>
<a class="large" href="javascript:void(0);">Jump To &hellip;</a>
<a class="small" href="javascript:void(0);">+</a>
<div id="jump_wrapper">
<div id="jump_page_wrapper">
<div id="jump_page">
<a class="source" href="consumer.html">
./src/consumer.js
</a>
<a class="source" href="index.html">
./src/index.js
</a>
<a class="source" href="json-stream.html">
./src/json-stream.js
</a>
<a class="source" href="socket.html">
./src/socket.js
</a>
</div>
</div>
</li>
</ul>
<ul class="sections">
<li id="title">
<div class="annotation">
<h1>consumer.js</h1>
</div>
</li>
<li id="section-1">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-1">&#182;</a>
</div>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> { Socket } <span class="hljs-keyword">from</span> <span class="hljs-string">'net'</span>
<span class="hljs-keyword">import</span> path <span class="hljs-keyword">from</span> <span class="hljs-string">'path'</span>
<span class="hljs-keyword">import</span> btc <span class="hljs-keyword">from</span> <span class="hljs-string">'better-try-catch'</span>
<span class="hljs-keyword">import</span> JsonStream <span class="hljs-keyword">from</span> <span class="hljs-string">'./json-stream'</span></pre></div></div>
</li>
<li id="section-2">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-2">&#182;</a>
</div>
<p>import logger from ../../uci-logger/src/logger</p>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> logger <span class="hljs-keyword">from</span> <span class="hljs-string">'@uci/logger'</span>
<span class="hljs-keyword">let</span> log = {}</pre></div></div>
</li>
<li id="section-3">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-3">&#182;</a>
</div>
<p>TODO change default pipe dir for windows and mac os</p>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">const</span> DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || <span class="hljs-string">'/tmp/UCI'</span>)
<span class="hljs-keyword">const</span> DEFAULT_SOCKET_NAME = <span class="hljs-string">'uci-sock'</span>
<span class="hljs-keyword">export</span> <span class="hljs-keyword">default</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Consumer</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">Socket</span> </span>{
<span class="hljs-keyword">constructor</span> (opts={}) {
<span class="hljs-keyword">super</span>()
log = logger({<span class="hljs-attr">file</span>:<span class="hljs-string">'src/consumer.js'</span>,<span class="hljs-attr">class</span>:<span class="hljs-string">'Consumer'</span>,<span class="hljs-attr">name</span>:<span class="hljs-string">'socket'</span>,<span class="hljs-attr">id</span>:<span class="hljs-keyword">this</span>.id})
<span class="hljs-keyword">this</span>.id = opts.id || opts.name || <span class="hljs-string">'socket:'</span>+ <span class="hljs-keyword">new</span> <span class="hljs-built_in">Date</span>().getTime()
<span class="hljs-keyword">if</span> (!opts.path) {
log.warn({<span class="hljs-attr">opts</span>:opts},<span class="hljs-string">'no host supplied using localhost...use named piped instead'</span>)
opts.host = opts.host || <span class="hljs-string">'127.0.0.1'</span>
opts.port = opts.port || <span class="hljs-number">8080</span>
} <span class="hljs-keyword">else</span> {
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">typeof</span> opts.path === <span class="hljs-string">'boolean'</span>) opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
<span class="hljs-keyword">if</span> (path.dirname(opts.path)===<span class="hljs-string">'.'</span>) opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
}
<span class="hljs-keyword">this</span>.opts=opts
<span class="hljs-keyword">this</span>.keepAlive = <span class="hljs-string">'keepAlive'</span> <span class="hljs-keyword">in</span> opts ? opts.keepAlive : <span class="hljs-literal">true</span>
<span class="hljs-keyword">this</span>._ready = <span class="hljs-literal">false</span>
<span class="hljs-keyword">this</span>.timeout = opts.timeout || <span class="hljs-number">30</span>
<span class="hljs-keyword">this</span>.wait = opts.wait || <span class="hljs-number">1</span>
<span class="hljs-keyword">this</span>.stream = <span class="hljs-keyword">new</span> JsonStream()</pre></div></div>
</li>
<li id="section-4">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-4">&#182;</a>
</div>
<p>bind to class for other class functions</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.connect = <span class="hljs-keyword">this</span>.connect.bind(<span class="hljs-keyword">this</span>)
<span class="hljs-keyword">this</span>.__ready = <span class="hljs-keyword">this</span>.__ready.bind(<span class="hljs-keyword">this</span>)</pre></div></div>
</li>
<li id="section-5">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-5">&#182;</a>
</div>
<p>this.<em>write = this.</em>write.bind(this)</p>
</div>
<div class="content"><div class='highlight'><pre> }
<span class="hljs-keyword">async</span> connect () {
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>( <span class="hljs-function">(<span class="hljs-params">resolve,reject</span>) =&gt;</span> {
<span class="hljs-keyword">const</span> connect = <span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span> {
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.opts.host ===<span class="hljs-string">'127.0.0.1'</span>) log.warn(<span class="hljs-string">'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'</span>)
log.info({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts},<span class="hljs-string">`attempting to connect <span class="hljs-subst">${<span class="hljs-keyword">this</span>.id}</span> to socket`</span>)
<span class="hljs-keyword">super</span>.connect(<span class="hljs-keyword">this</span>.opts)
}
<span class="hljs-keyword">let</span> reconnect = {}
<span class="hljs-keyword">const</span> timeout = setTimeout(<span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span>{
clearTimeout(reconnect)
log.fatal({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts},<span class="hljs-string">`unable to connect in <span class="hljs-subst">${<span class="hljs-keyword">this</span>.timeout}</span>s`</span>)
reject({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts},<span class="hljs-string">`unable to connect to socket server in <span class="hljs-subst">${<span class="hljs-keyword">this</span>.timeout}</span>secs`</span>)
}
,<span class="hljs-keyword">this</span>.timeout*<span class="hljs-number">1000</span>)
<span class="hljs-keyword">this</span>.once(<span class="hljs-string">'connect'</span>, <span class="hljs-keyword">async</span> () =&gt; {
clearTimeout(timeout)
<span class="hljs-keyword">this</span>._listen()
log.info({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts},<span class="hljs-string">'connected waiting for socket ready handshake'</span>)
<span class="hljs-keyword">this</span>.setKeepAlive(<span class="hljs-keyword">this</span>.keepAlive,<span class="hljs-number">100</span>)
<span class="hljs-keyword">let</span> [err, res] = <span class="hljs-keyword">await</span> btc(isReady).bind(<span class="hljs-keyword">this</span>)(<span class="hljs-keyword">this</span>.__ready, <span class="hljs-keyword">this</span>.wait, <span class="hljs-keyword">this</span>.timeout)
<span class="hljs-keyword">if</span> (err) reject(err)
log.info(<span class="hljs-string">'handshake done, authenticating'</span>)</pre></div></div>
</li>
<li id="section-6">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-6">&#182;</a>
</div>
<p>TODO authenticate here by encrypting a payload with private key and sending that.
await btc(authenticate)</p>
</div>
<div class="content"><div class='highlight'><pre> resolve(res)
})
<span class="hljs-keyword">this</span>.on(<span class="hljs-string">'error'</span>, <span class="hljs-keyword">async</span> (err) =&gt; {
log.warn({<span class="hljs-attr">error</span>:err.code},<span class="hljs-string">`connect error <span class="hljs-subst">${err.code}</span>`</span>)
<span class="hljs-keyword">if</span> (err.code === <span class="hljs-string">'EISCONN'</span>) {
<span class="hljs-keyword">return</span> resolve(<span class="hljs-string">'ready'</span>)
}
reconnect = setTimeout( <span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span>{ connect()
},<span class="hljs-keyword">this</span>.wait*<span class="hljs-number">1000</span> )
})
<span class="hljs-keyword">this</span>.on(<span class="hljs-string">'end'</span>, <span class="hljs-keyword">async</span> () =&gt; {
log.warn(<span class="hljs-string">'socket (server) terminated unexpectantly'</span>)
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.keepAlive) {
log.info(<span class="hljs-string">'keep alive was set, so waiting on server to come online for reconnect'</span>)
<span class="hljs-keyword">this</span>.destroy()
<span class="hljs-keyword">this</span>.emit(<span class="hljs-string">'error'</span>, {<span class="hljs-attr">code</span>:<span class="hljs-string">'DISCONNECTED'</span>})
}
})
connect() <span class="hljs-comment">// initial connect request</span>
}) <span class="hljs-comment">//end promise</span>
}
<span class="hljs-keyword">async</span> send(ipacket) {
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>( <span class="hljs-keyword">async</span> (resolve) =&gt; {</pre></div></div>
</li>
<li id="section-7">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-7">&#182;</a>
</div>
<p>need this for when multiple sends for different consumers use same packet instance</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> packet = <span class="hljs-built_in">Object</span>.assign({},ipacket)
setTimeout(<span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span> {resolve({<span class="hljs-attr">error</span>:<span class="hljs-string">'no response from socket in 10sec'</span>})},<span class="hljs-number">10000</span>)
packet._header =
{ <span class="hljs-attr">id</span>:<span class="hljs-built_in">Math</span>.random().toString().slice(<span class="hljs-number">2</span>), <span class="hljs-comment">// need this for when multiple sends for different consumers use same packet instanceack</span>
sender:{ <span class="hljs-attr">name</span>:<span class="hljs-keyword">this</span>.name, <span class="hljs-attr">instanceID</span>:<span class="hljs-keyword">this</span>.id },
<span class="hljs-attr">path</span>: <span class="hljs-keyword">this</span>.opts.path,
<span class="hljs-attr">port</span>: <span class="hljs-keyword">this</span>.opts.port,
<span class="hljs-attr">host</span>: <span class="hljs-keyword">this</span>.opts.host
}
<span class="hljs-keyword">let</span> [err, res] = <span class="hljs-keyword">await</span> btc(<span class="hljs-keyword">this</span>.stream.serialize)(packet)
<span class="hljs-keyword">if</span> (err) resolve({<span class="hljs-attr">error</span>:<span class="hljs-string">'unable to serialize packet for sending'</span>, <span class="hljs-attr">packet</span>:packet})
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>.__write(res)
<span class="hljs-keyword">this</span>.once(packet._header.id,<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params">reply</span>)</span>{
<span class="hljs-keyword">let</span> res = <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._packetProcess(reply)
<span class="hljs-keyword">if</span> (!res) { <span class="hljs-comment">// if process was not promise returning like just logged to console</span>
res = reply</pre></div></div>
</li>
<li id="section-8">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-8">&#182;</a>
</div>
<p>log.warn(consumer function was not promise returning further processing may be out of sequence)</p>
</div>
<div class="content"><div class='highlight'><pre> }
resolve(res)
}) <span class="hljs-comment">//end listener</span>
})
}</pre></div></div>
</li>
<li id="section-9">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-9">&#182;</a>
</div>
<p>TODO register alt stream processor (emit message with JSON, serialize function, onData method for raw socket chucks)
TODO register authenciation function (set up default)</p>
</div>
<div class="content"><div class='highlight'><pre>
registerPacketProcessor (func) {
<span class="hljs-keyword">this</span>._packetProcess = func
}</pre></div></div>
</li>
<li id="section-10">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-10">&#182;</a>
</div>
<p>PRIVATE METHODS</p>
</div>
<div class="content"><div class='highlight'><pre>
<span class="hljs-keyword">async</span> __write(packet) {</pre></div></div>
</li>
<li id="section-11">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-11">&#182;</a>
</div>
<p>timeout already set if sockect cant be drained in 10 secs</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>(<span class="hljs-function"><span class="hljs-params">resolve</span> =&gt;</span> {
<span class="hljs-keyword">const</span> cb = <span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span> resolve(<span class="hljs-string">'packet written to consumer side socket stream '</span>)
<span class="hljs-keyword">if</span> (!<span class="hljs-keyword">super</span>.write(packet)) {
<span class="hljs-keyword">this</span>.once(<span class="hljs-string">'drain'</span>,cb )
} <span class="hljs-keyword">else</span> {
process.nextTick(cb)
}
})
}
__ready() {<span class="hljs-keyword">return</span> <span class="hljs-keyword">this</span>._ready}
<span class="hljs-keyword">async</span> _listen () {
log.info(<span class="hljs-string">'listening for incoming packets from socket'</span>)</pre></div></div>
</li>
<li id="section-12">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-12">&#182;</a>
</div>
<p>listen for pushed packets</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.on(<span class="hljs-string">'pushed'</span>,<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params">packet</span>)</span>{</pre></div></div>
</li>
<li id="section-13">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-13">&#182;</a>
</div>
<p>TODO do some extra security here?</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> res = <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._packetProcess(packet)
<span class="hljs-keyword">if</span> (!res) { <span class="hljs-comment">// if process was not promise returning like just logged to console</span></pre></div></div>
</li>
<li id="section-14">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-14">&#182;</a>
</div>
<p>log.warn(consumer function was not promise returning)</p>
</div>
<div class="content"><div class='highlight'><pre> }
})</pre></div></div>
</li>
<li id="section-15">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-15">&#182;</a>
</div>
<p>listen on socket stream</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.on(<span class="hljs-string">'data'</span>, <span class="hljs-keyword">this</span>.stream.onData)
<span class="hljs-keyword">this</span>.stream.on(<span class="hljs-string">'message'</span>, messageProcess.bind(<span class="hljs-keyword">this</span>))
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">function</span> <span class="hljs-title">messageProcess</span> (<span class="hljs-params">packet</span>) </span>{</pre></div></div>
</li>
<li id="section-16">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-16">&#182;</a>
</div>
<p>console.log(incoming packet from socket,packet)</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">if</span> (packet._handshake) {
<span class="hljs-keyword">this</span>._ready = <span class="hljs-literal">true</span>
<span class="hljs-keyword">return</span> }</pre></div></div>
</li>
<li id="section-17">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-17">&#182;</a>
</div>
<p>TODO send back ack with consumer ID and authorization and wait
when authorized drop through here to emit</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.emit(packet._header.id, packet)
}
}</pre></div></div>
</li>
<li id="section-18">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-18">&#182;</a>
</div>
<p>default packet process just a simple console logger. ignores any cmd: prop</p>
</div>
<div class="content"><div class='highlight'><pre> _packetProcess (packet) {
<span class="hljs-built_in">console</span>.log(<span class="hljs-string">'default consumer processor -- log packet from socket to console'</span>)
<span class="hljs-built_in">console</span>.dir(packet)
}
} <span class="hljs-comment">// end class</span></pre></div></div>
</li>
<li id="section-19">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-19">&#182;</a>
</div>
<p>HELP FUNCTIONS
wait until a passed ready function returns true</p>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-function"><span class="hljs-keyword">function</span> <span class="hljs-title">isReady</span>(<span class="hljs-params">ready, wait=<span class="hljs-number">30</span>, timeout=<span class="hljs-number">1000</span></span>) </span>{
<span class="hljs-keyword">let</span> time = <span class="hljs-number">0</span>
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>(<span class="hljs-function">(<span class="hljs-params">resolve, reject</span>) =&gt;</span> {
(<span class="hljs-function"><span class="hljs-keyword">function</span> <span class="hljs-title">waitReady</span>(<span class="hljs-params"></span>)</span>{
<span class="hljs-keyword">if</span> (time &gt; timeout) <span class="hljs-keyword">return</span> reject(<span class="hljs-string">`timeout waiting for socket ready handshake - <span class="hljs-subst">${timeout}</span>ms`</span>)
<span class="hljs-keyword">if</span> (ready()) <span class="hljs-keyword">return</span> resolve(<span class="hljs-string">'ready'</span>)
log.info(<span class="hljs-string">`waiting <span class="hljs-subst">${wait}</span>ms for handshake`</span>)
time += wait
setTimeout(waitReady, wait)
})()
})
}</pre></div></div>
</li>
</ul>
</div>
</body>
</html>

78
docs/src/index.html Normal file
View File

@ -0,0 +1,78 @@
<!DOCTYPE html>
<html>
<head>
<title>index.js</title>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, target-densitydpi=160dpi, initial-scale=1.0; maximum-scale=1.0; user-scalable=0;">
<link rel="stylesheet" media="all" href="../docco.css" />
</head>
<body>
<div id="container">
<div id="background"></div>
<ul id="jump_to">
<li>
<a class="large" href="javascript:void(0);">Jump To &hellip;</a>
<a class="small" href="javascript:void(0);">+</a>
<div id="jump_wrapper">
<div id="jump_page_wrapper">
<div id="jump_page">
<a class="source" href="consumer.html">
./src/consumer.js
</a>
<a class="source" href="index.html">
./src/index.js
</a>
<a class="source" href="json-stream.html">
./src/json-stream.js
</a>
<a class="source" href="socket.html">
./src/socket.js
</a>
</div>
</div>
</li>
</ul>
<ul class="sections">
<li id="title">
<div class="annotation">
<h1>index.js</h1>
</div>
</li>
<li id="section-1">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-1">&#182;</a>
</div>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> Socket <span class="hljs-keyword">from</span> <span class="hljs-string">'./socket'</span>
<span class="hljs-keyword">import</span> Consumer <span class="hljs-keyword">from</span> <span class="hljs-string">'./consumer'</span>
<span class="hljs-keyword">export</span> { Socket <span class="hljs-keyword">as</span> Socket }
<span class="hljs-keyword">export</span> { Consumer <span class="hljs-keyword">as</span> Consumer }
<span class="hljs-keyword">export</span> <span class="hljs-keyword">default</span> { Socket, Consumer }</pre></div></div>
</li>
</ul>
</div>
</body>
</html>

199
docs/src/json-stream.html Normal file
View File

@ -0,0 +1,199 @@
<!DOCTYPE html>
<html>
<head>
<title>json-stream.js</title>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, target-densitydpi=160dpi, initial-scale=1.0; maximum-scale=1.0; user-scalable=0;">
<link rel="stylesheet" media="all" href="../docco.css" />
</head>
<body>
<div id="container">
<div id="background"></div>
<ul id="jump_to">
<li>
<a class="large" href="javascript:void(0);">Jump To &hellip;</a>
<a class="small" href="javascript:void(0);">+</a>
<div id="jump_wrapper">
<div id="jump_page_wrapper">
<div id="jump_page">
<a class="source" href="consumer.html">
./src/consumer.js
</a>
<a class="source" href="index.html">
./src/index.js
</a>
<a class="source" href="json-stream.html">
./src/json-stream.js
</a>
<a class="source" href="socket.html">
./src/socket.js
</a>
</div>
</div>
</li>
</ul>
<ul class="sections">
<li id="title">
<div class="annotation">
<h1>json-stream.js</h1>
</div>
</li>
<li id="section-1">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-1">&#182;</a>
</div>
<p>adpated from <a href="https://github.com/sebastianseilund/node-json-socket">https://github.com/sebastianseilund/node-json-socket</a></p>
</div>
<div class="content"><div class='highlight'><pre>
<span class="hljs-keyword">import</span> {StringDecoder} <span class="hljs-keyword">from</span> <span class="hljs-string">'string_decoder'</span>
<span class="hljs-keyword">import</span> EventEmitter <span class="hljs-keyword">from</span> <span class="hljs-string">'events'</span>
<span class="hljs-keyword">import</span> btc <span class="hljs-keyword">from</span> <span class="hljs-string">'better-try-catch'</span>
<span class="hljs-keyword">const</span> decoder = <span class="hljs-keyword">new</span> StringDecoder()
<span class="hljs-keyword">export</span> <span class="hljs-keyword">default</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">JsonStream</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">EventEmitter</span></span>{
<span class="hljs-keyword">constructor</span>(opts={}){
<span class="hljs-keyword">super</span>()
<span class="hljs-keyword">this</span>._contentLength = <span class="hljs-literal">null</span>
<span class="hljs-keyword">this</span>._buffer = <span class="hljs-string">''</span>
<span class="hljs-keyword">this</span>._delimeter = opts.delimiter || <span class="hljs-string">'#'</span>
<span class="hljs-keyword">this</span>.onData = <span class="hljs-keyword">this</span>.onData.bind(<span class="hljs-keyword">this</span>)
<span class="hljs-keyword">this</span>.serialize = <span class="hljs-keyword">this</span>.serialize.bind(<span class="hljs-keyword">this</span>)
}
onData (data) {</pre></div></div>
</li>
<li id="section-2">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-2">&#182;</a>
</div>
<p>console.log(a chunk arrived, data)</p>
</div>
<div class="content"><div class='highlight'><pre> data = decoder.write(data)
<span class="hljs-keyword">try</span> {
<span class="hljs-keyword">this</span>._handleData(data)
} <span class="hljs-keyword">catch</span> (e) {
<span class="hljs-keyword">this</span>.emit(<span class="hljs-string">'error'</span>, { <span class="hljs-attr">error</span>: e })
}
}
<span class="hljs-keyword">async</span> serialize(message) {
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>( <span class="hljs-function">(<span class="hljs-params">resolve,reject</span>) =&gt;</span> {
<span class="hljs-keyword">let</span> [err,messageData] = btc(<span class="hljs-built_in">JSON</span>.stringify)(message)
<span class="hljs-keyword">if</span> (err) reject(err)
<span class="hljs-keyword">let</span> [err2,length] = btc(Buffer.byteLength)(messageData, <span class="hljs-string">'utf8'</span>)
<span class="hljs-keyword">if</span> (err2) reject(err2)
<span class="hljs-keyword">let</span> data = length + <span class="hljs-keyword">this</span>._delimeter + messageData</pre></div></div>
</li>
<li id="section-3">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-3">&#182;</a>
</div>
<p>console.log(serialized,data)</p>
</div>
<div class="content"><div class='highlight'><pre> resolve(data)
})
}
_handleData (data) {
<span class="hljs-keyword">this</span>._buffer += data
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>._contentLength == <span class="hljs-literal">null</span>) {
<span class="hljs-keyword">var</span> i = <span class="hljs-keyword">this</span>._buffer.indexOf(<span class="hljs-keyword">this</span>._delimeter)</pre></div></div>
</li>
<li id="section-4">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-4">&#182;</a>
</div>
<p>Check if the buffer has a this._opts.delimeter or “#”, if not, the end of the buffer string might be in the middle of a content length string</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">if</span> (i !== <span class="hljs-number">-1</span>) {
<span class="hljs-keyword">var</span> rawContentLength = <span class="hljs-keyword">this</span>._buffer.substring(<span class="hljs-number">0</span>, i)
<span class="hljs-keyword">this</span>._contentLength = <span class="hljs-built_in">parseInt</span>(rawContentLength)
<span class="hljs-keyword">if</span> (<span class="hljs-built_in">isNaN</span>(<span class="hljs-keyword">this</span>._contentLength)) {
<span class="hljs-keyword">this</span>._contentLength = <span class="hljs-literal">null</span>
<span class="hljs-keyword">this</span>._buffer = <span class="hljs-string">''</span>
<span class="hljs-keyword">var</span> err = <span class="hljs-keyword">new</span> <span class="hljs-built_in">Error</span>(<span class="hljs-string">'Invalid content length supplied ('</span>+rawContentLength+<span class="hljs-string">') in: '</span>+<span class="hljs-keyword">this</span>._buffer)
err.code = <span class="hljs-string">'E_INVALID_CONTENT_LENGTH'</span>
<span class="hljs-keyword">throw</span> err
}
<span class="hljs-keyword">this</span>._buffer = <span class="hljs-keyword">this</span>._buffer.substring(i+<span class="hljs-number">1</span>)
}
}
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>._contentLength != <span class="hljs-literal">null</span>) {
<span class="hljs-keyword">var</span> length = Buffer.byteLength(<span class="hljs-keyword">this</span>._buffer, <span class="hljs-string">'utf8'</span>)
<span class="hljs-keyword">if</span> (length == <span class="hljs-keyword">this</span>._contentLength) {
<span class="hljs-keyword">this</span>._handleMessage(<span class="hljs-keyword">this</span>._buffer)
} <span class="hljs-keyword">else</span> <span class="hljs-keyword">if</span> (length &gt; <span class="hljs-keyword">this</span>._contentLength) {
<span class="hljs-keyword">var</span> message = <span class="hljs-keyword">this</span>._buffer.substring(<span class="hljs-number">0</span>, <span class="hljs-keyword">this</span>._contentLength)
<span class="hljs-keyword">var</span> rest = <span class="hljs-keyword">this</span>._buffer.substring(<span class="hljs-keyword">this</span>._contentLength)
<span class="hljs-keyword">this</span>._handleMessage(message)
<span class="hljs-keyword">this</span>.onData(rest)
}
}
}
_handleMessage (data) {
<span class="hljs-keyword">this</span>._contentLength = <span class="hljs-literal">null</span>
<span class="hljs-keyword">this</span>._buffer = <span class="hljs-string">''</span>
<span class="hljs-keyword">var</span> message
<span class="hljs-keyword">try</span> {
message = <span class="hljs-built_in">JSON</span>.parse(data)
} <span class="hljs-keyword">catch</span> (e) {
<span class="hljs-keyword">var</span> err = <span class="hljs-keyword">new</span> <span class="hljs-built_in">Error</span>(<span class="hljs-string">'Could not parse JSON: '</span>+e.message+<span class="hljs-string">'\nRequest data: '</span>+data)
err.code = <span class="hljs-string">'E_INVALID_JSON'</span>
<span class="hljs-keyword">throw</span> err
}
message = message || {}
<span class="hljs-keyword">this</span>.emit(<span class="hljs-string">'message'</span>, message)
}
}</pre></div></div>
</li>
</ul>
</div>
</body>
</html>

403
docs/src/socket.html Normal file
View File

@ -0,0 +1,403 @@
<!DOCTYPE html>
<html>
<head>
<title>socket.js</title>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, target-densitydpi=160dpi, initial-scale=1.0; maximum-scale=1.0; user-scalable=0;">
<link rel="stylesheet" media="all" href="../docco.css" />
</head>
<body>
<div id="container">
<div id="background"></div>
<ul id="jump_to">
<li>
<a class="large" href="javascript:void(0);">Jump To &hellip;</a>
<a class="small" href="javascript:void(0);">+</a>
<div id="jump_wrapper">
<div id="jump_page_wrapper">
<div id="jump_page">
<a class="source" href="consumer.html">
./src/consumer.js
</a>
<a class="source" href="index.html">
./src/index.js
</a>
<a class="source" href="json-stream.html">
./src/json-stream.js
</a>
<a class="source" href="socket.html">
./src/socket.js
</a>
</div>
</div>
</li>
</ul>
<ul class="sections">
<li id="title">
<div class="annotation">
<h1>socket.js</h1>
</div>
</li>
<li id="section-1">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-1">&#182;</a>
</div>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> { Server } <span class="hljs-keyword">from</span> <span class="hljs-string">'net'</span>
<span class="hljs-keyword">import</span> { unlink <span class="hljs-keyword">as</span> fileDelete } <span class="hljs-keyword">from</span> <span class="hljs-string">'fs'</span>
<span class="hljs-keyword">import</span> { promisify } <span class="hljs-keyword">from</span> <span class="hljs-string">'util'</span>
<span class="hljs-keyword">import</span> path <span class="hljs-keyword">from</span> <span class="hljs-string">'path'</span>
<span class="hljs-keyword">import</span> mkdir <span class="hljs-keyword">from</span> <span class="hljs-string">'make-dir'</span>
<span class="hljs-keyword">import</span> btc <span class="hljs-keyword">from</span> <span class="hljs-string">'better-try-catch'</span>
<span class="hljs-keyword">import</span> _ON_DEATH <span class="hljs-keyword">from</span> <span class="hljs-string">'death'</span> <span class="hljs-comment">//this is intentionally ugly</span>
<span class="hljs-keyword">import</span> JSONStream <span class="hljs-keyword">from</span> <span class="hljs-string">'./json-stream'</span>
<span class="hljs-keyword">import</span> clone <span class="hljs-keyword">from</span> <span class="hljs-string">'clone'</span></pre></div></div>
</li>
<li id="section-2">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-2">&#182;</a>
</div>
<p>import logger from ../../uci-logger/src/logger</p>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> logger <span class="hljs-keyword">from</span> <span class="hljs-string">'@uci/logger'</span>
<span class="hljs-keyword">let</span> log = {}</pre></div></div>
</li>
<li id="section-3">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-3">&#182;</a>
</div>
<p>TODO change default pipe dir for windows and mac os</p>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">const</span> DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || <span class="hljs-string">'/tmp/UCI'</span>)
<span class="hljs-keyword">const</span> DEFAULT_SOCKET_NAME = <span class="hljs-string">'uci-sock'</span>
<span class="hljs-keyword">export</span> <span class="hljs-keyword">default</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Socket</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">Server</span> </span>{
<span class="hljs-keyword">constructor</span> (opts={}) {
<span class="hljs-keyword">super</span>()
<span class="hljs-keyword">this</span>.id = opts.id || opts.name || <span class="hljs-string">'socket:'</span>+ <span class="hljs-keyword">new</span> <span class="hljs-built_in">Date</span>().getTime()
<span class="hljs-keyword">if</span> (!opts.path) {
opts.host = opts.host || <span class="hljs-string">'0.0.0.0'</span>
opts.port = opts.port || <span class="hljs-number">8080</span>
} <span class="hljs-keyword">else</span> {
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">typeof</span> opts.path === <span class="hljs-string">'boolean'</span>) opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
<span class="hljs-keyword">if</span> (path.dirname(opts.path)===<span class="hljs-string">'.'</span>) opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
}
<span class="hljs-keyword">this</span>.clientTracking = opts.clientTracking || <span class="hljs-literal">true</span>
<span class="hljs-keyword">this</span>.clients = [] <span class="hljs-comment">// track consumers (i.e. clients)</span>
<span class="hljs-keyword">this</span>.opts = opts <span class="hljs-comment">// for use to recover from selected errors</span></pre></div></div>
</li>
<li id="section-4">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-4">&#182;</a>
</div>
<p>self bindings</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>._listen = <span class="hljs-keyword">this</span>._listen.bind(<span class="hljs-keyword">this</span>)
<span class="hljs-keyword">this</span>.create = <span class="hljs-keyword">this</span>.create.bind(<span class="hljs-keyword">this</span>)
log = logger({<span class="hljs-attr">file</span>:<span class="hljs-string">'src/socket.js'</span>,<span class="hljs-attr">class</span>:<span class="hljs-string">'Socket'</span>,<span class="hljs-attr">name</span>:<span class="hljs-string">'socket'</span>,<span class="hljs-attr">id</span>:<span class="hljs-keyword">this</span>.id})
} <span class="hljs-comment">// end constructor</span>
<span class="hljs-keyword">async</span> create () {
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>( <span class="hljs-keyword">async</span> (resolve,reject) =&gt; {</pre></div></div>
</li>
<li id="section-5">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-5">&#182;</a>
</div>
<p>couple ways to kill socket process when needed</p>
</div>
<div class="content"><div class='highlight'><pre> _ON_DEATH( <span class="hljs-keyword">async</span> () =&gt; {
log.info(<span class="hljs-string">'\nhe\'s dead jim'</span>)
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._destroy()
})
process.once(<span class="hljs-string">'SIGUSR2'</span>, <span class="hljs-keyword">async</span> () =&gt; {
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._destroy
process.kill(process.pid, <span class="hljs-string">'SIGUSR2'</span>)
})
<span class="hljs-keyword">this</span>.once(<span class="hljs-string">'error'</span>, <span class="hljs-keyword">async</span> (err) =&gt; {</pre></div></div>
</li>
<li id="section-6">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-6">&#182;</a>
</div>
<p>recover from socket file that was not removed</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">if</span> (err.code === <span class="hljs-string">'EADDRINUSE'</span>) {
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.opts.path) { <span class="hljs-comment">// if TCP socket should already be dead</span>
<span class="hljs-keyword">let</span> [err, res] = <span class="hljs-keyword">await</span> btc(promisify(fileDelete))(<span class="hljs-keyword">this</span>.opts.path)
<span class="hljs-keyword">if</span>(!err) {
log.info({<span class="hljs-attr">res</span>:res, <span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'socket already exists.....deleted'</span>)
<span class="hljs-keyword">return</span> <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._listen(<span class="hljs-keyword">this</span>.opts)
}
log.fatal({<span class="hljs-attr">err</span>:err},<span class="hljs-string">'error deleting socket. Can not establish a socket'</span>)
<span class="hljs-keyword">return</span> err
}
}
<span class="hljs-keyword">if</span> (err.code ===<span class="hljs-string">'EACCES'</span>){
<span class="hljs-built_in">console</span>.log({<span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'directory does not exist...creating'</span>)
<span class="hljs-keyword">await</span> mkdir(path.dirname(<span class="hljs-keyword">this</span>.opts.path))
<span class="hljs-built_in">console</span>.log({<span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'created'</span>)
log.warn({<span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'directory does not exist...creating'</span>)
<span class="hljs-keyword">return</span> <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._listen(<span class="hljs-keyword">this</span>.opts)
}</pre></div></div>
</li>
<li id="section-7">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-7">&#182;</a>
</div>
<p>otherwise fatally exit</p>
</div>
<div class="content"><div class='highlight'><pre> log.info(err, <span class="hljs-string">'creating socket'</span>)
reject(err)
})
<span class="hljs-keyword">let</span> [err, res] = <span class="hljs-keyword">await</span> btc(<span class="hljs-keyword">this</span>._listen)(<span class="hljs-keyword">this</span>.opts)
<span class="hljs-keyword">if</span> (err) reject(err)
resolve(res)
}) <span class="hljs-comment">// end creeate promise</span>
} <span class="hljs-comment">// end create</span>
registerPacketProcessor (func) {
<span class="hljs-keyword">this</span>._packetProcess = func
}
<span class="hljs-keyword">async</span> _listen (opts) {
<span class="hljs-keyword">super</span>.listen(opts, <span class="hljs-keyword">async</span> (err, res) =&gt; {
<span class="hljs-keyword">if</span> (err) <span class="hljs-keyword">return</span> err</pre></div></div>
</li>
<li id="section-8">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-8">&#182;</a>
</div>
<p>this gets called for each client connection and is unique to each</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.on(<span class="hljs-string">'connection'</span>, <span class="hljs-keyword">async</span> (socket) =&gt; {
<span class="hljs-keyword">const</span> stream = <span class="hljs-keyword">new</span> JSONStream()
socket.stream = stream <span class="hljs-comment">// need this to track clients</span>
<span class="hljs-keyword">let</span> send = <span class="hljs-keyword">this</span>._send.bind(socket)
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.clientTracking) <span class="hljs-keyword">this</span>.clients.push(socket)</pre></div></div>
</li>
<li id="section-9">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-9">&#182;</a>
</div>
<p>TODO add close listener to socket to remove from this.clients</p>
</div>
<div class="content"><div class='highlight'><pre> log.info(<span class="hljs-string">'new consumer connecting'</span>)
log.info(<span class="hljs-keyword">await</span> send(<span class="hljs-keyword">await</span> stream.serialize({<span class="hljs-string">'_handshake'</span>:<span class="hljs-literal">true</span>})))
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.opts.conPacket) {
<span class="hljs-keyword">this</span>.opts.conPacket._header = { <span class="hljs-attr">id</span>:<span class="hljs-string">'pushed'</span>}
log.info({<span class="hljs-attr">conPacket</span>:<span class="hljs-keyword">this</span>.opts.conPacket},<span class="hljs-string">'pushing a preset command to just connected consumer'</span>)
send(<span class="hljs-keyword">await</span> stream.serialize(<span class="hljs-keyword">this</span>.opts.conPacket)) <span class="hljs-comment">// send a packet command on to consumer on connection</span>
}
socket.on(<span class="hljs-string">'data'</span>, stream.onData)</pre></div></div>
</li>
<li id="section-10">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-10">&#182;</a>
</div>
<p>TODO need to start error listener for stream so errors can be processed</p>
</div>
<div class="content"><div class='highlight'><pre> stream.on(<span class="hljs-string">'message'</span>, messageProcess.bind(<span class="hljs-keyword">this</span>,socket))
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">function</span> <span class="hljs-title">messageProcess</span> (<span class="hljs-params">client, packet</span>) </span>{
log.info({<span class="hljs-attr">packet</span>:packet},<span class="hljs-string">'incoming packet on socket side'</span>)
<span class="hljs-keyword">let</span> res = {}
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.clientTracking &amp;&amp; packet.clientID) {
client.ID = packet.clientID
res.cmd=<span class="hljs-string">'ackID'</span>
}
<span class="hljs-keyword">else</span> {
res = <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._packetProcess(clone(packet)) || {}
<span class="hljs-keyword">if</span> (<span class="hljs-built_in">Object</span>.keys(res).length === <span class="hljs-number">0</span>) res = { <span class="hljs-attr">error</span>: <span class="hljs-string">'socket packet command function likely did not return a promise'</span>, <span class="hljs-attr">packet</span>:packet}
}
<span class="hljs-keyword">if</span> (packet) {
res._header = clone(packet._header,<span class="hljs-literal">false</span>) || {} <span class="hljs-comment">//make sure return packet has header with id in case it was removed in processing</span>
<span class="hljs-keyword">delete</span> packet._header <span class="hljs-comment">// remove before adding to response header as request</span>
} <span class="hljs-keyword">else</span> res._header = {}
res._header.request = clone(packet,<span class="hljs-literal">false</span>)
res._header.responder = {<span class="hljs-attr">name</span>:<span class="hljs-keyword">this</span>.name,<span class="hljs-attr">instanceID</span>:<span class="hljs-keyword">this</span>.id}
res._header.socket = <span class="hljs-keyword">this</span>.address()
<span class="hljs-keyword">if</span> (!res.cmd) res.cmd = <span class="hljs-string">'reply'</span> <span class="hljs-comment">// by default return command is 'reply'</span>
<span class="hljs-keyword">let</span> [err, ser] = <span class="hljs-keyword">await</span> btc(stream.serialize)(res)
<span class="hljs-keyword">if</span> (err) ser = <span class="hljs-keyword">await</span> stream.serialize({ <span class="hljs-attr">error</span>: <span class="hljs-string">'was not able to serialze the res packet'</span>, <span class="hljs-attr">err</span>:err, <span class="hljs-attr">_header</span>:{<span class="hljs-attr">id</span>:res._header.id}})
log.info(<span class="hljs-keyword">await</span> send(ser))
} <span class="hljs-comment">// end process message</span>
}) <span class="hljs-comment">// end connecttion consumer</span>
log.info({<span class="hljs-attr">opts</span>: <span class="hljs-keyword">this</span>.opts},<span class="hljs-string">'socket created'</span>)
<span class="hljs-keyword">return</span> res
}) <span class="hljs-comment">// end super listen callback</span>
} <span class="hljs-comment">// end listen</span>
<span class="hljs-keyword">async</span> _destroy () {
log.info(<span class="hljs-string">'closing down socket'</span>)
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>.close()
log.info(<span class="hljs-string">'all connections closed....exiting'</span>)
process.exit()
}</pre></div></div>
</li>
<li id="section-11">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-11">&#182;</a>
</div>
<p>default packet process, just a simple echo</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">async</span> _packetProcess (packet) {
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>(<span class="hljs-function"><span class="hljs-params">resolve</span> =&gt;</span> {
resolve(packet)
})
}</pre></div></div>
</li>
<li id="section-12">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-12">&#182;</a>
</div>
<p>must have a consumer socket bound to use</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">async</span> _send(packet) {</pre></div></div>
</li>
<li id="section-13">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-13">&#182;</a>
</div>
<p>timeout already set if sockect cant be drained in 10 secs</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>(<span class="hljs-function"><span class="hljs-params">resolve</span> =&gt;</span> {
<span class="hljs-keyword">const</span> cb = <span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span> resolve(<span class="hljs-string">'packet written to socket stream'</span>)
<span class="hljs-keyword">if</span> (!<span class="hljs-keyword">this</span>.write(packet)) {
<span class="hljs-keyword">this</span>.once(<span class="hljs-string">'drain'</span>,cb )
} <span class="hljs-keyword">else</span> {
process.nextTick(cb)
}
})
}
<span class="hljs-keyword">async</span> push (packet,id) {
packet._header = { <span class="hljs-attr">id</span>:<span class="hljs-string">'pushed'</span>}
log.info({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts,<span class="hljs-attr">packet</span>:packet},<span class="hljs-string">'pushing a packet to all connected consumers'</span>)
<span class="hljs-keyword">this</span>.clients.forEach(<span class="hljs-keyword">async</span> (client) =&gt; {
<span class="hljs-keyword">if</span> (client.writable) {
<span class="hljs-keyword">let</span> [err, ser] = <span class="hljs-keyword">await</span> btc(client.stream.serialize)(packet)
<span class="hljs-keyword">if</span> (err) ser = <span class="hljs-keyword">await</span> client.stream.serialize({ <span class="hljs-attr">error</span>: <span class="hljs-string">'was not able to serialze the res packet'</span>, <span class="hljs-attr">err</span>:err, <span class="hljs-attr">_header</span>:{<span class="hljs-attr">id</span>:packet._header.id}})
<span class="hljs-keyword">if</span> (!id || id ===client.ID ) <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._send.bind(client)(ser)
}
})
}
} <span class="hljs-comment">// end class</span></pre></div></div>
</li>
</ul>
</div>
</body>
</html>

33
examples/client-push.js Normal file
View File

@ -0,0 +1,33 @@
import Consumer from '../src/consumer'
// const client1= new Consumer({name:'example-consumer1' })
const client= new Consumer({path:true, name:'example-consumer'})
// let packet = {name: 'client', cmd:'doit', data:'data sent by client'}
// This is your client handler object waiting on a message to do something
const process = async function (packet) {
// return new Promise((resolve) => {
// console.log('====== packet pushed from server ======')
// console.dir(packet)
// // setTimeout(resolve('done'),100)
// resolve('done')
// })
console.log('====== packet pushed from server ======')
console.dir(packet)
// setTimeout(resolve('done'),100)
return Promise.resolve('done')
}
client.registerPacketProcessor(process)
;
(async () => {
console.log(await client.connect())
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View File

@ -1,80 +1,25 @@
import Consumer from '../src/consumer'
import btc from 'better-try-catch'
// const PATH ='/opt/bogus/socket'
const PATH=true
const client= new Consumer({path:PATH, name:'example-consumer', initTimeout:30 })
// const client1= new Consumer({name:'example-consumer1' })
const client= new Consumer({path:true, name:'example-consumer' })
let packet = {name: 'client', cmd:'doit', data:'data sent by client'}
// This is your client handler object waiting on a message to do something
async function processor (packet) {
// console.log('packet being processed at socket', packet)
if (packet.cmd) {
if (this[packet.cmd]) return await this[packet.cmd](packet)
else {
console.log('no processing function for command', packet.cmd)
return {error: 'command has no processing function', packet: packet }
}
}
console.log('no command in packet', packet)
return {error: 'no command in packet', packet: packet }
const process = function (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
}
client.registerPacketProcessor(processor)
client.reply = (packet) => {
return new Promise(resolve => {
console.log('generic reply command from server\n',packet.data)
resolve()
})
}
client.onconnect = (packet) => {
return new Promise(resolve => {
console.log('on connect server sent command processed here\n',packet.data)
resolve()
})
}
client.pushed = (packet) => {
return new Promise(resolve => {
console.log('pushed packet\n',packet.status)
resolve()
})
}
client.on('status', event => {
console.log('============ socket status ============')
console.log('status level',event.level)
console.log(event.id)
console.log(event.msg)
console.log(`Consumer is ${event.connected ? 'connected' : 'disconnected'}`)
console.log(`Authenticated? ${event.authenticated || false}`)
console.log('======================================')
})
client.registerPacketProcessor(process)
;
(async () => {
console.log('ready at start',client.ready)
// await Promise.all([client1.connect(),client2.connect()])
let [err, res] = await btc(client.connect)()
if (err) {
console.log('error', err)
} else {
console.log(res)
let packet = {name: 'client', cmd:'doit', data:'sent by client'}
console.log('=============sending a test packet=========\n', packet)
console.log('can also await relay of any send too for processing in sync manner\n',await client.send(packet))
console.log('===============')
}
setTimeout(()=> {
console.log('=============Consumer now Offline=============')
process.exit(1)
}
,120000)
// await Promise.all([client1.connect(),client2.connect()])
await client.connect()
console.log('sending packet ', packet)
console.log('=========\n',await client.send(packet))
// client.end()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

37
examples/client2.js Normal file
View File

@ -0,0 +1,37 @@
import Consumer from '../src/consumer'
const USOCKET = __dirname + '/sample.sock'
class Client extends Consumer {
constructor(path,opts) {
super(path,opts)
}
async _packetProcess (packet) {
this[packet.cmd](packet)
}
async reply (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
console.log(`Socket replied with data: ${packet.data}`)
}
}
const client1= new Client({path:true,name:'example-consumer1' })
const client2 = new Client({path:true,name:'example-consumer2'})
let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'}
let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
;
(async () => {
await Promise.all([client1.connect(),client2.connect()])
console.log(await Promise.all([client1.send(packet1),client2.send(packet2)]))
client1.end()
client2.end()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View File

42
examples/server-push.js Normal file
View File

@ -0,0 +1,42 @@
import { Socket } from '../src'
;
(async () => {
class Test extends Socket {
constructor(opts) {
super(opts)
}
async _packetProcess(packet) {
console.log('packet being processed at socket')
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
return {error: 'no command in packet', packet: packet }
}
async doit(data,name) {
return new Promise(resolve => {
let res = {}
console.log('data sent to doit = ', data)
res.status ='success'
res.name = name
res.cmd = 'reply'
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
}
// let test = new Test()
let test = new Test({path:true, conPacket:{onconnect:'this is a packet sent to consumer as soon as it connects'}})
await test.create()
let count = 0
setInterval( () => {
count++
test.push({name:'pushed', count:count, status:'some pushed data'}) },10000)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View File

@ -1,4 +1,5 @@
import { Socket as uSocket} from '../src'
// import { fs } from 'mz'
// made key cert into module that also uses environment variables
// const TLS = process.env.TLS || false
@ -9,56 +10,6 @@ import { Socket as uSocket} from '../src'
let Socket = uSocket
class Test extends Socket {
constructor(opts) {
super(opts)
}
async doit(packet) {
return new Promise(resolve => {
let res = {}
console.log('command doit sent with data = ', packet.data)
res.status ='success'
res.cmd = 'reply'
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
}
// const options = {
// tls: TLS,
// key: await fs.readFile(TLS_KEY_PATH),
// cert: await fs.readFile(TLS_CRT_PATH),
// // This is necessary only if using client certificate authentication.
// // requestCert: true,
// // This is necessary only if the client uses a self-signed certificate.
// // ca: [ fs.readFileSync('client-cert.pem') ]
// }
// const PATH = '/opt/bogus/socket'
const PATH = true
const PUSHES = 3
// options.conPacket = {cmd:'onconnect', data:'this is a packet data sent consumer after handshake/authentification'}
const TOKENS = ['cheetos']
let test = new Test({path:PATH, tokens:TOKENS})
async function processor (packet) {
// console.log('packet being processed at socket', packet)
if (packet.cmd) {
if (this[packet.cmd]) return await this[packet.cmd](packet)
else {
console.log('no processing function for command', packet.cmd)
return {error: 'command has no processing function', packet: packet }
}
}
console.log('no command in packet', packet)
return {error: 'no command in packet', packet: packet }
}
test.registerPacketProcessor(processor)
;
(async () => {
// TODO dynamic import
@ -67,24 +18,49 @@ test.registerPacketProcessor(processor)
// console.log('using TLS')
// }
// test.addTokens('cheetos')
await test.create()
class Test extends Socket {
constructor(opts) {
super(opts)
}
let count = 0
const push = setInterval( () => {
count++
test.push({cmd:'pushed', count:count, status:`pushing some data ${count} of ${PUSHES}`})
if (count >PUSHES) {
clearInterval(push)
test.push({cmd:'pushed',status:'now will simulate server going offline by stopping to send ping for 15 seconds'})
test.disablePing()
setTimeout( () => {
test.enablePing()
},15000)
async _packetProcess(packet) {
console.log('packet being processed at socket')
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
return {error: 'no command in packet', packet: packet }
}
async doit(data,name) {
return new Promise(resolve => {
let res = {}
console.log('data sent to doit = ', data)
res.status ='success'
res.name = name
res.cmd = 'reply'
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
},3000)
}
// const options = {
// tls: TLS,
// key: await fs.readFile(TLS_KEY_PATH),
// cert: await fs.readFile(TLS_CRT_PATH),
// // This is necessary only if using client certificate authentication.
// // requestCert: true,
// // This is necessary only if the client uses a self-signed certificate.
// // ca: [ fs.readFileSync('client-cert.pem') ]
// }
let options = {path:true}
// let test = new Test()
let test = new Test(options)
await test.create()
console.log('ready')
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View File

@ -1,6 +1,6 @@
{
"name": "@uci/socket",
"version": "0.3.3",
"version": "0.2.12",
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
"main": "src",
"scripts": {
@ -8,15 +8,13 @@
"test": "mocha -r esm --timeout 10000 test/*.test.mjs",
"testlog": "UCI_DEV=true mocha -r esm --timeout 10000 test/*.test.mjs",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
"s": "node -r esm examples/server",
"devs": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/server",
"devs:anon": "UCI_ANON=true npm run devs",
"devs:anon:debug": "UCI_LOG_LEVEL=debug npm run devs:anon",
"devs:debug": "UCI_LOG_LEVEL=debug npm run devs",
"client": "node -r esm examples/client",
"devc": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/client",
"devc:token": "UCI_CLIENT_TOKEN='cheetos' npm run devc",
"devc:debug": "UCI_LOG_LEVEL=debug npm run devc"
"s": "UCI_ENV=dev node -r esm examples/server",
"sp": "UCI_DEV=true node -r esm examples/server-push",
"devs": "SOCKETS_DIR=/opt/sockets UCI_DEV=true ./node_modules/.bin/nodemon -r esm-e mjs examples/server",
"c": "UCI_ENV=dev node -r esm examples/client",
"cp": "UCI_DEV=true node -r esm examples/client-push",
"devc": "SOCKETS_DIR=/opt/sockets UCI_DEV=true node -r esm examples/client",
"c2": "node -r esm examples/client2"
},
"author": "David Kebler",
"license": "MIT",
@ -42,17 +40,17 @@
"devDependencies": {
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"esm": "^3.2.25",
"mocha": "^8.0.1",
"nodemon": "^2.0.4"
"codecov": "^3.1.0",
"esm": "^3.0.84",
"istanbul": "^0.4.5",
"mocha": "^5.2.0",
"nodemon": "^1.18.6"
},
"dependencies": {
"@uci-utils/logger": "^0.0.18",
"@uci-utils/logger": "0.0.14",
"better-try-catch": "^0.6.2",
"clone": "^2.1.2",
"death": "^1.1.0",
"delay": "^4.4.0",
"make-dir": "^3.1.0",
"p-reflect": "^2.1.0"
"make-dir": "^3.0.0"
}
}

View File

@ -1,7 +1,6 @@
import { Socket } from 'net'
import path from 'path'
import { promisify } from 'util'
// import pause from 'delay'
import btc from 'better-try-catch'
import JsonStream from './json-stream'
@ -26,17 +25,15 @@ class SocketConsumer extends Socket {
* @param {object} [opts={}] test
*/
constructor(options = {}) {
constructor(opts = {}) {
super()
const opts = Object.assign({},options) // don't allow mutation
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
this.name = opts.name || 'a socket consumer'
log = logger({
file: 'src/consumer.js',
class: 'Consumer',
name: 'socket',
id: this.id
})
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
if (!opts.path) {
if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'})
opts.host = opts.host || '127.0.0.1'
@ -48,139 +45,126 @@ class SocketConsumer extends Socket {
opts.path = path.join(DEFAULT_PIPE_DIR, opts.path)
}
this.opts = opts
this._data = {id:this.id, name:opts.name||this.id}
Object.assign(this._data,opts.data||{}) // holds consumer specific data that will be passed to server in header and on connection
// default is keepAlive true, must set to false to explicitly disable
// if keepAlive is true then consumer will also be reconnecting consumer
// initTimeout > 4 means socketInit will return a promise
this.initTimeout = opts.initTimeout > 4 ? opts.initTimeout * 1000 : null
this.pingFailedTimeout = opts.pingFailedTimeout * 1000 || 5000
this.reconnectLimit = opts.reconnectLimit || 0
this.retryWait = opts.retryWait==null ? 5000 : opts.retryWait * 1000
this.heartBeat = !!process.env.HEARTBEAT || opts.heartBeat
this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
this._ready = false
this.timeout = opts.timeout || 60 // initial connect timeout in secs and then rejects
this.wait = opts.wait || 2
this.stream = new JsonStream()
// bind to class for other class functions
this.connect = this.connect.bind(this)
this.close = promisify(this.end).bind(this)
this._reconnectCount = 0
this._connected = false
this._authenticated = false
this._active = false
this._connection = 'offline'
this._first = true // first connection or not
this._pingTimeout // value sent from socket upon connect
this.__ready = this.__ready.bind(this)
// this._write = this._write.bind(this)
}
get connected() { return this._connected}
// FIXME change active
get active() { return this._authenticated }
get connection() { return this._connection }
notify(state, moreOpts={}) {
this._connection = state || this._connection
let opts = {state:this._connection, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}
Object.assign(opts,moreOpts)
this.emit('connection:socket',opts)
}
log(level='debug', msg) {
let opts = {level:level, msg:msg, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected, state:this._connection}
if (typeof msg !== 'string') Object.assign(opts,msg)
this.emit('log',opts)
log[level](opts)
}
async connect(timeout=0) {
this.initTimeout = timeout > 4 ? timeout * 1000 : this.initTimeout
this.notify('starting')
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') {
let msg ='tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'
log.warn({method:'connect', msg:'msg'})
this.log('warn',msg)
}
this.once('error', errorHandler.bind(this))
this.once('connect',connectHandler.bind(this))
super.connect(this.opts)
// returns promise for initial connection when initTimeout is not zero with reject on timeout
if (this._first && this.initTimeout) {
let initTimeout = {}
this._first=false
async connect() {
return new Promise((resolve, reject) => {
initTimeout = setTimeout(async () => {
this.disconnect()
let msg=`unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`
this.log('fatal',msg)
this.notify('failed',{timeout:this.initTimeout, msg:msg})
reject({ error:msg, opts: this.opts})
}
, this.initTimeout)
let initial = true
const successHandler = (ev) => {
if (ev.state === 'connected') {
// this is only for initial connection
const initTimeout = setTimeout(() => {
log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to connect in ${this.timeout}s`})
reject(
{ opts: this.opts },
`unable to connect to socket server in ${this.timeout}secs`
)
}, this.timeout * 1000)
this.once('connect', async () => {
clearTimeout(initTimeout)
this.removeListener('connection:socket',successHandler)
this.log('info','initial connection successfull')
resolve({opts: this.opts, msg: 'initial connection successfull'})
}
this._listen()
log.debug({method:'connect', line:80, opts: this.opts, msg:'initial connect waiting for socket ready handshake'})
this.setKeepAlive(this.keepAlive, 3000)
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
if (err) reject(err)
initial = false
log.debug({method:'connect', line:85, msg:'handshake to socket done, TODO authenticating'})
// TODO authenticate here by encrypting a payload with private key and sending that.
// await btc(authenticate)
this.emit('connected') // for end users to take action
resolve(res)
})
let reconTimeout
// function that sets a reconnect timeout
const reconnect = () => {
reconTimeout = setTimeout(() => {
this.removeAllListeners()
this.stream.removeAllListeners()
this.destroy()
connect()
}, this.wait * 1000)
}
this.on('connection:socket',successHandler.bind(this))
// connection function that sets listeners and deals with reconnect
const connect = () => {
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost')
log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'})
if(!initial) {
this.once('connect', async () => {
clearTimeout(reconTimeout)
this._listen()
log.debug({method:'connect', line:113, msg:'reconnected waiting for socket ready handshake'})
this.setKeepAlive(this.keepAlive, 3000)
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
if (err) reject(err)
log.debug({method:'connect', line:69, msg:'rehandshake done, reauthenticating'})
// TODO authenticate here by encrypting a payload with private key and sending that.
// await btc(authenticate)
this.emit('connected')
this.emit('reconnected') // emit also reconnected for special end user action
resolve(res)
})
}
this._first=false
return 'connection in progress'
} // end connect
// manual disonnect
async disconnect() {
clearTimeout(this._errorRetry)
this.removeAllListeners('ping')
this.removeAllListeners('connect')
this.removeAllListeners('error')
this.removeAllListeners('data')
this.removeAllListeners('pushed')
this.stream.removeAllListeners()
this._connected=false
this._authenticated=false
this.notify('disconnected')
this._first = true
}
async reconnect () {
if (this._connected) this.disconnect()
this._first = false
if ( this.reconnectLimit === 0 || this._reconnectCount < this.reconnectLimit ) {
this._reconnectCount += 1
this.log('warn',`Attempting a reconnect: attempt ${this._reconnectCount} of ${this.reconnectLimit ||'unlimited'}`)
this.connect()
this.on('error', async err => {
if (err.code !== 'EISCONN') {
this._ready = false
this.emit('ready', false)
log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect`})
reconnect()
}
else {
this.log('fatal',`${this.reconnectLimit} reconnect attempts exceeded no more tries`)
this._ready = true
this.emit('ready', true)
log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'})
}
})
if (this.keepAlive) { // only attempt reconnect is keepAlive is set which it is by default
this.on('end', async () => {
log.error({method:'connect', line:142, msg:'socket (server) terminated unexpectantly, keepalive set, wait for server to come online'})
this._ready = false
this.emit('error', { code: 'DISCONNECTED' })
})
}
// attempt connection
log.debug({method:'connect', line:149, opts: this.opts, msg:`attempting to connect ${this.id} to socket`})
super.connect(this.opts)
} // end connect function
connect() // initial connect request
}) //end promise
}
async send(ipacket) {
return new Promise(async resolve => {
if (!this.active) {
resolve({ error: 'socket consumer not connected, aborting send' })
} else {
if (!this._ready) resolve({ error: 'socket consumer not connected, aborting send' })
let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance
setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000)
packet._header = {
id: Math.random()
.toString()
.slice(2), // need this for when multiple sends for different consumers use same packet instanceack
sender: { data: this._data, instanceID: this.id},
sender: { name: this.name, instanceID: this.id },
path: this.opts.path,
port: this.opts.port,
host: this.opts.host
@ -188,70 +172,31 @@ class SocketConsumer extends Socket {
let [err, res] = await btc(this.stream.serialize)(packet)
if (err)
resolve({error: 'unable to serialize packet for sending',packet: packet})
if (this.active && this.writable) {
let res2 = await this.__write(res)
if (res2.error) resolve(res2)
// if no write error then wait for send response
await this.__write(res)
this.once(packet._header.id, async function(reply) {
let res = await this._packetProcess(reply)
if (!res) { // if packetProcess was not promise
res = reply
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
}
resolve(res) // resolves processed packet not return packet
resolve(res)
}) //end listener
}
}
})
}
// TODO register user alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks)
// TODO register authenciation function (set up default)
registerPacketProcessor(func) {
this._packetProcess = func
}
// func should return an object the server expects
registerAuthenticator (func) {
this._authenticate = func
}
// stream needs an .onData method and will be stream bound handler for data event
resgisterStreamProcessor (func) {
this.stream = func
}
// PRIVATE METHODS
// default authentication using a simple token
_authenticate () {
return { token: process.env.UCI_CLIENT_TOKEN || this.token || 'default' }
}
async _authenticateSend (authPacket={}) {
return new Promise(async resolve => {
setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000)
let [err, serPacket] = await btc(this.stream.serialize)(authPacket)
if (err)
resolve({error: 'unable to serialize packet for sending',packet: authPacket})
this.stream.once('message',(res) => {
resolve(res)
})
let res = await this.__write(serPacket)
if (res.error) resolve(res)
// otherwise wait for message listener above to return
})
}
async __write(packet) {
// timeout already set if sockect can't be drained in 10 secs
return new Promise((resolve) => {
// most stream write errors will be caught by socket error listener but make sure
if (!this.writable) { // stream is writeable as can't catch epipe errors there
resolve({error:'socket stream closed can not send packet'})
return
}
const cb = () => resolve({response:'packet written to consumer side socket stream '})
return new Promise(resolve => {
const cb = () => resolve('packet written to consumer side socket stream ')
if (!super.write(packet)) {
this.once('drain', cb)
} else {
@ -260,138 +205,61 @@ class SocketConsumer extends Socket {
})
}
__ready() {
return this._ready
}
async _listen() {
log.debug('listening for incoming packets from socket')
// listen for pushed packets
this.on('pushed', async function(packet) {
// TODO do some extra security here?
let res = await this._packetProcess(packet)
if (!res) {
// if process was not promise returning then res will be undefined
log.debug('consumer packet processing function was not promise returning')
}
})
// listen on socket stream
this.on('data', this.stream.onData)
this.stream.on('message', messageProcess.bind(this))
async function messageProcess(packet) {
log.debug('incoming packet from socket',packet)
if (packet._handshake) {
this._ready = true
return
}
// TODO send back ack with consumer ID and authorization and wait
// when authorized drop through here to emit
this.emit(packet._header.id, packet)
}
}
// default packet process just a simple console logger. ignores any cmd: prop
_packetProcess(packet) {
console.log('default consumer processor -- log packet from socket to console')
console.log('replace by calling .registerPacketProcessor(func) with your function')
console.dir(packet)
}
} // end class
export default SocketConsumer
// CONNECTION HANDLERS
async function connectHandler () {
this.removeAllListeners('error') // turn off error handler during handshake
this._doneAuthenticate = setTimeout(() =>{
let msg ='authentication not completed in 3 secs, forcing reconnect attempt'
this.notify('failed',{msg:msg})
this.log('warn',msg)
this.reconnect()
},3000)
this.on('data', this.stream.onData)
this.stream.on('message', handshake.bind(this))
this.log('info','in process of connecting waiting for socket handshake/authenticate')
this.notify('connecting')
}
async function handshake (packet) {
if (packet._handshake) {
this._connected = true
this.notify('handshake')
const authPacket = Object.assign(this._authenticate() || {}, {_authenticate:true, data:this._data})
// console.log('----------------authentication packet---------------',authPacket)
let res ={}
if (!this.writable) res.error ='socket stream not writable'
else res = await this._authenticateSend(authPacket)
this.stream.removeAllListeners('message')
clearTimeout(this._doneAuthenticate)
if (res.error) {
let msg =`error during authentication ${res.error}, attempting reconnect in ${this.retryWait/1000}s to see if error clears`
this.notify('error',{msg:msg})
this.log('error',msg)
this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait)
} else {
if (!res.authenticated) {
let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect`
this.notify('failed',{msg:msg})
this.log('error',msg)
this.disconnect()
}
else {
this._authenticated = res.authenticated
let msg ='authentication succeeded connection ready'
this.notify('connected',{msg:msg})
this.log('info',msg)
this._reconnectCount = 0
this.stream.on('message', messageHandler.bind(this)) // reset default message handler
this.on('pushed', pushHandler.bind(this) )
this.once('error', errorHandler.bind(this)) // listen for errors on authenticated socket
if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default
this.on('ping',pingHandler.bind(this))
this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled
}
if (this.opts.conPacket) (this.send(this.conPacket))
}
}
}
}
// general message handler handler
function messageHandler(packet) {
if (packet._header.id !== 'ping') { // ping has it's own listner
let obj = { msg:'incoming packet from socket sever',packet:packet}
this.log('trace',obj)
}
this.emit(packet._header.id, packet)
}
// assume all errors are fatal and the socket needs to be disconnected/reconnected
async function errorHandler (err) {
this.disconnect()
let msg = `error, this consumer is disconnected, trying reconnect in ${this.retryWait/1000} secs`
this.notify('error',{error:err,msg:msg})
this.log('error',msg)
this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait)
}
// const pushHandler = async (packet) => {
async function pushHandler (packet) {
// TODO do some extra security here?
let msg = {msg:'packed was pushed from socket sever, processing', packet:packet}
this.log('trace',msg)
let res = await this._packetProcess(packet)
if (!res) {
// if process was not promise returning then res will be undefined
log.debug('consumer packet processing function was not promise returning')
}
}
// const pingHandler = async (packet) => {
async function pingHandler (packet) {
if (this.heartBeat) console.log('lub dub')
clearTimeout(this._ping)
this._pingTimeout= (packet.pingInterval || 5000) + 1000 // set value from server
monitorPing.call(this)
}
async function pingFailedHandler () {
clearTimeout(this._pingFailed)
clearTimeout(this._ping) // clear any others set
this.removeAllListeners('ping')
this.on('ping',pingHandler.bind(this))
let msg = 'ping has been received again, back to normal connection'
this.notify('connected',{msg:msg})
this.log('info',msg)
}
function monitorPing () {
this._ping = setTimeout( () => {
this.removeAllListeners('ping')
let msg = `ping failed waiting ${this.pingFailedTimeout/1000} secs to before forced reconnect`
this.notify('offline',{msg:msg})
this.log('warn',msg)
this.on('ping',pingFailedHandler.bind(this))
this._pingFailed = setTimeout (() => {
this.removeAllListeners('ping')
let msg =`no ping received for ${this.pingFailedTimeout/1000} secs, force disconnect/reconnect`
this.notify('failed',{msg:msg})
this.log('warn',msg)
this.reconnect()
// this.emit('error',{code:'PING_FAILED'})
}, this.pingFailedTimeout)
},this._pingTimeout)
// Helper Functions
// wait until a passed ready function returns true
function isReady(ready, wait = 30, timeout = 1000) {
let time = 0
return new Promise((resolve, reject) => {
(function waitReady() {
if (time > timeout)
return reject(
`timeout waiting for socket ready handshake - ${timeout}ms`
)
if (ready()) return resolve('ready')
log.debug({function:'isReady', line:261, msg:`waiting ${wait}ms for handshake`})
time += wait
setTimeout(waitReady, wait)
})()
})
}

View File

@ -18,26 +18,14 @@ class JsonStream extends EventEmitter {
this._delimeter = opts.delimiter || '#'
this.onData = this.onData.bind(this)
this.serialize = this.serialize.bind(this)
this._state = 'online'
this._queue = []
}
get state () {return this._state}
offline () { this._state = 'offline' }
pause () {this._state = 'paused'} // queue messages in handler
resume () {
// emit the messages in the queue
this._state='online'
}
online() {this._state = 'online'}
onData(data) {
data = decoder.write(data)
try {
this._handleData(data)
} catch (err) {
// emit an error on the socket that handled with other socket errors
this.emit('error', err)
} catch (e) {
this.emit('error', { error: e })
}
}
@ -63,7 +51,7 @@ class JsonStream extends EventEmitter {
if (isNaN(this._contentLength)) {
this._contentLength = null
this._buffer = ''
let err = new Error(
var err = new Error(
'Invalid content length supplied (' +
rawContentLength +
') in: ' +
@ -76,12 +64,12 @@ class JsonStream extends EventEmitter {
}
}
if (this._contentLength != null) {
let length = Buffer.byteLength(this._buffer, 'utf8')
var length = Buffer.byteLength(this._buffer, 'utf8')
if (length == this._contentLength) {
this._handleMessage(this._buffer)
} else if (length > this._contentLength) {
let message = this._buffer.substring(0, this._contentLength)
let rest = this._buffer.substring(this._contentLength)
var message = this._buffer.substring(0, this._contentLength)
var rest = this._buffer.substring(this._contentLength)
this._handleMessage(message)
this.onData(rest)
}
@ -95,18 +83,14 @@ class JsonStream extends EventEmitter {
try {
message = JSON.parse(data)
} catch (e) {
let err = new Error(
var err = new Error(
'Could not parse JSON: ' + e.message + '\nRequest data: ' + data
)
err.code = 'E_INVALID_JSON'
throw err
}
message = message || {}
// console.log('stream message', message, this._state)
if (this._stream ==='pause') {
if (message._header.id !== 'ping') this.queue.shift(message)
}
if(this._state==='online') this.emit('message', message)
this.emit('message', message)
}
}

View File

@ -5,7 +5,6 @@ import path from 'path'
// npmjs modules
import mkdir from 'make-dir'
import btc from 'better-try-catch'
import pReflect from 'p-reflect'
import _ON_DEATH from 'death' //this is intentionally ugly
import JSONStream from './json-stream'
import clone from 'clone'
@ -40,7 +39,7 @@ export default function socketClass(Server) {
* @param {String} options.host a tcp host name nornally not used as 0.0.0.0 is set by default
* @param {String} options.port a tcp
* @param {String | Boolean} options.path xeither full path to where socket should be created or if just 'true' then use default
* @param {Boolean} options.consumerTracking track connected consumers for push notifications - default: true
* @param {Boolean} options.clientTracking track connected clients for push notifications - default: true
* @param {Object} options.conPacket A json operson's property
*
*/
@ -48,7 +47,6 @@ export default function socketClass(Server) {
super(opts)
delete opts.key
delete opts.cert
this.name = opts.name
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
if (!opts.path) {
opts.host = opts.host || '0.0.0.0'
@ -59,46 +57,35 @@ export default function socketClass(Server) {
if (path.dirname(opts.path) === '.') // relative path sent
opts.path = path.join(DEFAULT_PIPE_DIR, opts.path)
}
this.defaultReturnCmd = opts.defaultReturnCmd
this.allowAnonymous = (!opts.tokens || !!process.env.UCI_ANON || opts.allowAnonymous) ? true : false
this.tokens = opts.tokens || []
this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
this.pingInterval = opts.pingInterval === false ? 0 : (opts.pingInterval * 1000 || 5000)
this.consumers = new Map() // track consumers (i.e. clients) TODO use a Map
this.nextConsumerID = 0 // incrementer for default initial consumer ID
this.conPackets = opts.conPackets || [opts.conPacket]
this.clientTracking = opts.clientTracking || true
this.clients = [] // track consumers (i.e. clients)
this.opts = opts // for use to recover from selected errors
this.errorCount = 0
//self bindings
this._listen = this._listen.bind(this)
this.create = this.create.bind(this)
this.authenticateConsumer = this.authenticateConsumer.bind(this)
this._authenticate = this._authenticate.bind(this)
this.close = promisify(this.close).bind(this)
log = logger({
package:'@uci/socket',
file: 'src/socket.js',
class: 'Socket',
name: 'socket',
id: this.id
})
} // end constructor
get active() { return this.listening }
/**
* create - Description
*
* @returns {type} Description
*/
async create() {
this.emit('socket',{state:'creating', msg:'creating socket for consumers to connect'})
return new Promise(async (resolve, reject) => {
// set up a couple ways to gracefully destroy socket process is killed/aborted
_ON_DEATH(async () => {
log.error({method:'create', line:84, msg:'\nhe\'s dead jim'})
await this._destroy()
})
process.once('SIGUSR2', async () => {
await this._destroy()
await this._destroy
process.kill(process.pid, 'SIGUSR2')
})
@ -109,72 +96,30 @@ export default function socketClass(Server) {
// if TCP socket should already be dead
let [err, res] = await btc(promisify(fileDelete))(this.opts.path)
if (!err) {
log.info({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'})
// try again
this.removeAllListeners('listening')
resolve(await this.create())
log.debug({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'})
return await this._listen(this.opts)
}
log.error({method:'create', line:102, err: err, msg:'error deleting socket. Can not establish a socket'})
return err
}
}
if (err.code === 'EACCES') {
log.debug({method:'create', line:107, socket: this.opts.path, msg:'directory does not exist...creating'})
await mkdir(path.dirname(this.opts.path))
log.debug({method:'create', line:109, socket: this.opts.path, msg:'directory created'})
this.removeAllListeners('listening')
resolve(await this.create())
return await this._listen(this.opts)
}
// otherwise fatally exit
log.error({method:'create', line:113, err:err, opts:this.opts, msg:`error creating socket server ${this.name}`})
log.error({method:'create', line:113, err:err, msg:'error creating socket'})
reject(err)
})
this.once('listening', () => {
this.on('error', err => {
this.errorCount +=1 // log errors here
this.errors.push(err)
if(this.errorCount>2 && this.errorCount<6) {
let errors= {level:'warn',msg:'something bad maybe going on, 3 errors', errors:this.errors}
this.emit('socket',{state:'error', msg:'2 to 5 socket errors', errors:this.errors})
this.emit('log', errors)
log.error(errors)
}
if(this.errorCount>5) {
let errors = {level:'fatal',msg:'something fatal is going on, 6 errors', errors:this.errors}
log.fatal(errors)
this.removeAllListeners('listening')
this.listening=false
this.close(() => {
this.emit('socket',{state:'offline', msg:'too many socket errors no longer listening for consumers to connect'})
})
this.emit('log', errors)
}
})
let msg = `socket ready and listening ${typeof this.address() ==='string' ? `at ${this.address()}` : `on port ${this.address().port}`}`
this.emit('socket',{state:'listening', msg:msg})
let obj = {method:'create', line:54, msg:msg}
log.info(obj)
this.on('connection', this._connectionHandler.bind(this))
resolve(msg)
})
super.listen(this.opts)
this.enablePing()
let [err, res] = await btc(this._listen)(this.opts)
if (err) reject(err)
resolve(res)
}) // end creeate promise
} // end create
async stop() {
return new Promise(function(resolve) {
this.removeAllListeners('listening')
this.listening=false
this.close(() => {
this.emit('socket',{state:'offline', msg:'manually closed socket on request'})
resolve('socket is offline')
})
})
}
/**
* registerPacketProcessor - Description
* @public
@ -185,260 +130,66 @@ export default function socketClass(Server) {
this._packetProcess = func
}
enablePing () {
if (this.pingInterval > 499) {
this._ping = setInterval( async () =>{
if (this.consumers.size > 0) this.push({pingInterval:this.pingInterval},{packetId:'ping'})
},this.pingInterval)
}
}
disablePing() {
clearInterval(this._ping)
}
addTokens(tokens) {
if (typeof tokens ==='string'){
tokens = tokens.split(',')
}
this.tokens = this.tokens.concat(tokens)
if (this.tokens.length>0) this.allowAnonymous = false
}
removeTokens(tokens) {
if (typeof tokens ==='string'){
if (tokens === 'all') {
this.tokens = []
this.allowAnonymous = true
return
}
tokens = tokens.split(',')
}
this.tokens = this.tokens.filter(token => !tokens.includes(token))
if (this.tokens.length===0) {
log.warn({msg:'all tokens have been removed, switching to allow anonymous connections'})
this.allowAnonymous = true
}
}
registerTokenValidator (func) {
this.allowAnonymous = false
this._validateToken = func
}
registerAuthenticator (func) {
this.allowAnonymous = false
this._authenticate = func
}
/**
* push - pushes a supplied UCI object packet to all connected consumers
* push - pushes a supplied UCI object packet to all connected clients
*
* @param {object} packet Description
* @param {string} id the header id string of the pushed packet, default: 'pushed'
*
*/
// TODO support multiple consumers in options
async push(packet={},opts={}) {
if (this.consumers.size > 0) {
packet._header = {id: opts.packetId || 'pushed'}
let consumers = []
if ( opts.consumers || opts.consumer ) {
if (opts.consumer) opts.consumers = [opts.consumer]
consumers = Array.from(this.consumers).filter(([sid,consumer]) =>
opts.consumers.some(con=> {
// console.log('filtering consumers', consumer.sid,consumer.data,con)
return (
con === ( (consumer.data ||{}).name || (consumer.data ||{}).id ) ||
con.sid=== sid ||
con.name === (consumer.data ||{}).name ||
con.id === (consumer.data ||{}).id
)
}
)
).map(con=>con[1])
// console.log('custom consumers',consumers.length)
} else consumers = Array.from(this.consumers.values())
// if (!opts.packetId) {
// console.log('socket class push',packet,opts,consumers.length)
// console.log('consumer for push', consumers.map(consumer=>(consumer.data ||{}).name))
// }
consumers = consumers.filter(consumer=>consumer.writable)
const send = consumer => this._send(consumer,packet)
const res = await Promise.all(consumers.map(send).map(pReflect))
const success = res.filter(result => result.isFulfilled).map((result,index) => [consumers[index].name,result.value])
const errors =res.filter(result => result.isRejected).map((result,index) => [consumers[index].name,result.reason])
this.emit('log',{level:errors.length? 'error': packet._header.id==='ping'?'trace':'info', msg:'packet was pushed', socket:this.name||this.id, errors:errors, packet:packet, success:success, headerId:packet._header.id})
} else {
this.emit('log',{level:'debug', msg:'no connected consumers packet push ignored', packet:packet})
// log.debug({method:'push', id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'})
}
}
removeConsumer (sid) {
let consumer=this.consumers.get(sid)
this.emit('log',{level:'info', msg:'a consumer disconnected', consumer:consumer.data, sid:consumer.sid})
this.emit('connection:consumer',{state:'disconnected', msg:'a consumer disconnected', consumer:consumer.data, sid:consumer.sid})
consumer.removeAllListeners()
consumer.stream.removeAllListeners()
this.consumers.delete(sid)
log.warn({msg:'consumer removed from tracking',sid:sid, curConsumerCount:this.consumers.size})
}
async authenticateConsumer(consumer) {
return new Promise(async (resolve, reject) => {
// when consumer gets the handshake they must follow with authentication
consumer.stream.on('message', authenticate.bind(this,consumer))
let [err] = await btc(this._send)(consumer,{_handshake: true, sid:consumer.sid})
if (err) {
log.error({msg:'error in handshake send', error:err})
reject(err)
}
async function authenticate (consumer,packet) {
log.debug({msg:`authentication packet from consumer ${consumer.name}:${consumer.id}:${consumer.sid}`, packet:packet})
consumer.stream.removeAllListeners('message')
if (!packet._authenticate) reject('first consumer packet was not authentication')
else {
let [err, res] = await btc(this._authenticate)(packet)
consumer.authenticated = this.allowAnonymous ? 'anonymous' : (err ? false : res)
consumer.data = packet.data || {}
consumer.name = packet.name || consumer.data.name
consumer.id = packet.id || consumer.data.id
// console.log('-------------------Inbound Consumer Authenticated---------------------------')
// console.log(packet)
// console.log(consumer.authenticated, consumer.name,consumer.id,consumer.data)
// console.log('--------------------------------------------------------')
packet.authenticated = consumer.authenticated
packet.reason = err || null
log.debug({msg:'sending authorization result to consumer', packet:packet})
this._send(consumer,packet) // send either way
if (err && !this.allowAnonymous) {
log.info({msg:`consumer ${consumer.data.name} authentication failed`, name:consumer.name, id:consumer.id, data:consumer.data, consumer_sid:consumer.sid, reason:err})
reject(packet.reason)
}
else {
log.info({msg:`consumer ${consumer.name} authenticated successfuly`, name:consumer.name, id:consumer.id, data:consumer.data})
if (this.allowAnonymous) log.warn({msg:`consumer ${consumer.data.name}, connected anonymously`})
resolve(consumer.authenticated)
}
}
async push(packet, id) {
packet._header = { id: id || 'pushed' }
log.debug({method:'push', line:142, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
this.clients.forEach(async client => {
if (client.writable) {
let [err, ser] = await btc(client.stream.serialize)(packet)
if (err)
ser = await client.stream.serialize({
error: 'was not able to serialze the res packet',
err: err,
_header: { id: packet._header.id }
})
if (!id || id === client.ID) await this._send.bind(client)(ser)
}
})
}
// private methods
// default validator
_validateToken (token) {
if (token) return this.tokens.includes(token)
return false
}
// default authenticator - reject value should be reason which is returned to consumer
async _authenticate (packet) {
if (!this._validateToken(packet.token)) return Promise.reject('invalid token')
return true
}
// async _connectionHandler({consumer, server}) { // this gets called for each consumer connection and is unique to
async _connectionHandler(consumer) { // this gets called for each consumer connection and is unique to each
async _listen(opts) {
super.listen(opts, async (err, res) => {
if (err) return err
// this gets called for each client connection and is unique to each
this.on('connection', async socket => {
const stream = new JSONStream()
consumer.stream = stream
consumer.data = {}
consumer.connected = true
// add listeners
consumer.on('error', (err) => {
log.error({msg:'consumer connection error',error:err})
// TODO do more handling than just logging
})
consumer.on('end', (err) => {
log.error({msg:`'consumer connection ended: ${consumer.data.name}`, error:err})
if (consumer.sid) this.removeConsumer(consumer.sid)
else {
consumer.removeAllListeners()
consumer.stream.removeAllListeners()
socket.stream = stream // need this to track clients
let send = this._send.bind(socket)
if (this.clientTracking) this.clients.push(socket)
// TODO add 'close' listener to socket to remove from this.clients
log.debug({method:'_listen', line:167, msg:'new consumer connecting'})
log.debug(await send(await stream.serialize({ _handshake: true })))
if (this.opts.conPacket) {
this.opts.conPacket._header = { id: 'pushed' }
log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'})
send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection
}
})
socket.on('data', stream.onData)
// TODO need to start error listener for stream so errors can be processed
stream.on('message', messageProcess.bind(this, socket))
consumer.on('data', stream.onData) // send data to
stream.on('error', (err) => {
log.error({msg:'consumer stream error during listen',error:err})
// TODO do more handling than just logging
})
// consumer.authenticated = true
let [err] = await btc(this.authenticateConsumer)(consumer)
if (!this.allowAnonymous) {
if (err) {
consumer.removeAllListeners()
consumer.stream.removeAllListeners()
consumer.end()// abort new connection consumer, cleanup, remove listeners
consumer.emit('end',err)
return
}
}
// authenticated consumer, add to list of consumers
consumer.sid = ++this.nextConsumerID // server assigned ID
// consumer.authenticated = true
this.consumers.set(consumer.sid, consumer) // add current consumer to consumers
consumer.setKeepAlive(this.keepAlive,30)
const consumerCloseHandler = (sid) => {
log.warn({msg:'consumer connection was closed',sid:sid})
this.removeConsumer(sid)
}
consumer.on('close', consumerCloseHandler.bind(this,consumer.sid))
log.debug({method:'_listen', line:364, msg:'new consumer connected/authenticated', cname:consumer.name, cid:consumer.id, totalConsumers:this.consumers.size})
// all's set enable main incoming message processor
stream.on('message', messageProcess.bind(this, consumer))
if (this.conPackets.length) {
setTimeout(async () => {
for (let packet of this.conPackets) {
packet._header = {type:'on connection packet', id: 'pushed'}
await this._send(consumer,packet) // send a packet command on to consumer on connection
}
},100)
}
this.emit('log',{level:'info', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id})
this.emit('connection:consumer',{state:'connected', msg:`consumer ${(consumer.data ||{}).name} connected and authenticated to socket ${this.id}`,
name:(consumer.data ||{}).name ||(consumer.data ||{}).id || consumer.sid,
sid:consumer.sid,
data:consumer.data,
authenticated:consumer.authenticated
})
// that's it. Connection is active
async function messageProcess(consumer, packet) {
this.emit('log',{level:'mcp', packet: packet, consumer:consumer.data, msg:'incoming packet on socket side'})
let res = (await this._packetProcess(clone(packet))) || {}
async function messageProcess(client, packet) {
log.debug({method:'_listen', line:179, packet: packet, msg:'incoming packet on socket side'})
let res = {}
if (this.clientTracking && packet.clientID) {
client.ID = packet.clientID
res.cmd = 'ackID'
} else {
res = (await this._packetProcess(clone(packet))) || {}
if (Object.keys(res).length === 0)
res = {
error:
'socket packet command function likely did not return a promise',
packet: packet
}
}
if (packet) {
res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing
delete packet._header // remove before adding to response header as request
@ -446,61 +197,46 @@ export default function socketClass(Server) {
res._header.request = clone(packet, false)
res._header.responder = { name: this.name, instanceID: this.id }
res._header.socket = this.address()
if (!res.cmd) res.cmd = this.defaultReturnCmd || 'reply' // by default return command is 'reply'
let [err] = await btc(this._send)(consumer,res)
if (err) log.error({msg:err, error:err})
} // end message process
if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply'
let [err, ser] = await btc(stream.serialize)(res)
if (err)
ser = await stream.serialize({
error: 'was not able to serialze the res packet',
err: err,
_header: { id: res._header.id }
})
await send(ser)
} // end process message
}) // end connecttion consumer
log.info({method:'_listen', line:211, opts: this.opt, msg:'socket created and listening'})
return res
}) // end super listen callback
} // end listen
// call when socket server is going down
async _destroy() {
log.fatal({method:'_destroy', line:217, msg:'closing down socket server'})
// this.push()
clearInterval(this._ping)
log.debug({method:'_destroy', line:217, msg:'closing down socket'})
await this.close()
this.consumers.forEach(consumer => {
consumer.removeAllListeners()
consumer.stream.removeAllListeners()
})
log.debug({method:'_destroy', line:219, msg:'all connections closed....exiting'})
process.exit()
}
// default packet process, just a simple echo, override with registerPacketProcessor
// default packet process, just a simple echo
async _packetProcess(packet) {
return new Promise(resolve => {
resolve(packet)
})
}
async _send(consumer, packet) {
log.trace({msg:`sending to consumer:${consumer.sid}:${consumer.data.name}`, consumer:consumer.data, packet:packet})
return new Promise(async (resolve, reject) => {
if (!consumer) {
console.log('no consumer rejecting packet send')
reject('no consumer specified can not send packet')
return
}
if (!consumer.writable) {
console.log('no consumer writeable stream rejecting packet send')
reject('socket stream closed can not send packet')
return
}
let [err,ser] = await btc(consumer.stream.serialize)(packet)
if (err) reject('unable to serialze the packet')
if (!ser) {
// console.log('empty-serialized packet', consumer.name, consumer.socketName)
reject('empty packet rejecting send, nothing to send')
return
}
// consumer send, must have a consumer socket bound to use
async _send(packet) {
// timeout already set if sockect can't be drained in 10 secs
return new Promise(resolve => {
const cb = () => resolve('packet written to socket stream')
if (!consumer.write(ser)) {
consumer.once('drain', cb)
if (!this.write(packet)) {
this.once('drain', cb)
} else {
process.nextTick(cb)
}
})
}
} // end class