Compare commits

..

28 Commits
tls ... master

Author SHA1 Message Date
David Kebler 09da08b548 0.3.3 update deps 2020-07-26 15:32:48 -07:00
David Kebler b856710a3b 0.3.2 catch no consumer on send 2020-07-12 18:28:44 -07:00
David Kebler 9528e71abb 0.3.1 added 100ms delay to connection packet push to avoid issue where connection packet is not sent/received 2020-03-24 14:13:50 -07:00
David Kebler 70fee6fc7c 0.3.0 bumping to .3 with upcoming 3-2020 deployment of lighting system.
no apparent bugs
2020-03-15 15:25:46 -07:00
David Kebler 0d578958a8 0.2.33 refactor push, put in more send/write guards for closed stream 2020-02-10 21:38:56 -08:00
David Kebler 42d4b54008 0.2.31 support for multiple conPackets as an array 2020-01-18 22:34:52 -08:00
David Kebler 65169e3ded 0.2.30
socket-class.js
now emits 'socket' on socket listening state
added stop method to stop socket listening manually.  (no remote access via base yet)
added .name and .id to consumer from either packet props or packet data prop
consumer.js
  don't allow passed options mutation
  add .name prop
  merge in passed data option prop into authentication packet sent to socket
2020-01-14 13:38:50 -08:00
David Kebler 8a4fdf067c 0.2.29
socket:
  removed all 'client' names and replaced with 'consumer'
  switched to using MAP for holding consumers
  incoming consumer includes passing opts.data for passing consumer specific information to socket
consumer:
   supports opts.data for passing to socket
updated examples accordingly
2020-01-06 23:10:30 -08:00
David Kebler 70e16fa08d 0.2.28 Prevent epipe error exception on socket stream write 2019-12-18 18:15:58 -08:00
David Kebler d7fc89cafb 0.2.27 fix initial success handler that had the old event 2019-12-08 14:53:24 -08:00
David Kebler ba112c484e 0.2.26
refactor the connection events,  status is now log,
connection event is now connection:socket or connection:consumer
'online' to just 'connected'
2019-12-05 14:49:28 -08:00
David Kebler dcd178de7b 0.2.25 2019-12-02 15:11:26 -08:00
David Kebler 2314670c04 0.2.24 refactor consumer for more robust error/disconnect handling
add online/offline/pause/resume to json stream
removed bad 'connection' emits from socket-class as that event is already used by actual consumer connections.
2019-12-02 14:31:38 -08:00
David Kebler e29b4ba838 0.2.23 2019-11-27 14:32:54 -08:00
David Kebler 18a65b42c5 0.2.22
add active getter
emit status active when connect state changes
2019-11-21 10:04:48 -08:00
David Kebler 7502902a51 0.2.21 add ability to set the default return command as option defaultReturnCmd
add consumer-connection emits
change name in connection handler to consumer from socket
add getClientIndex and getClient to api methods
2019-09-16 18:10:57 -07:00
David Kebler 5f28baaa74 0.2.20 emit status events in consumer
refactor listen method into a handler
2019-09-08 19:49:35 -07:00
David Kebler 6862d912b2 refactored create to better handle the listening and promise resolution 2019-08-29 13:42:28 -07:00
David Kebler 8078831971 0.2.19
register authenticator added to consumer and socket
default consumer authenticator looks for token in environment or opts or sets to 'default'
default socket authenticator looks for token validator method
add and remove token methods
default token validator method just checks sent token against list
can register alt token validator
improved authenticate function to handle fail reason and anonymous connect
2019-08-28 09:02:27 -07:00
David Kebler 3299e9125e refactored _send in socket class
added authentifcation with default being a token
attach any passed client name to server side client socket
2019-08-23 15:48:39 -07:00
David Kebler 375790bfc4 Add client ID to connected client on server and remove client when it closes 2019-08-20 10:52:59 -07:00
David Kebler 3bf375c8fd 0.2.18 Major refactor of consumer/client. Initial connect with promise resolution now separate from listening and reconnect handling. Added server ping and conusmer ping monitoring to know when to try a reconnect. Other errors will give a reconnect as well. 2019-08-20 10:52:02 -07:00
David Kebler db3e2e8054 0.2.17 Update Deps 2019-08-14 14:06:44 -07:00
David Kebler a332219bdd 0.2.16 truly fixed the client connect/reconnect so it works even if initially no connection was made 2019-08-14 13:31:37 -07:00
David Kebler 47fa6380bf 0.2.14 2019-05-01 15:34:18 -07:00
David Kebler ccf9ab8c8a modify default timeout to 120 seconds and add in enironment variable to allow changing it 2019-05-01 15:34:07 -07:00
David Kebler 456b9568ff 0.2.13 add better error handling to avoid reconnrest execpetion 2019-04-28 09:58:35 -07:00
David Kebler 072dd25dc4 0.2.12 update deps, clean up logging, merged in tlc branch, TLS not implemented! but had many other changes to merge 2019-04-26 10:23:30 -07:00
16 changed files with 829 additions and 2142 deletions

View File

@ -9,11 +9,9 @@ module.exports = {
"node": true,
"mocha": true
},
"parser": "babel-eslint",
"parserOptions": {
"ecmaVersion": 2017,
"sourceType": "module",
"allowImportExportEverywhere": true
"sourceType": "module"
},
"extends": "eslint:recommended",
"rules": {

View File

@ -1,518 +0,0 @@
/*--------------------- Typography ----------------------------*/
@font-face {
font-family: 'aller-light';
src: url('public/fonts/aller-light.eot');
src: url('public/fonts/aller-light.eot?#iefix') format('embedded-opentype'),
url('public/fonts/aller-light.woff') format('woff'),
url('public/fonts/aller-light.ttf') format('truetype');
font-weight: normal;
font-style: normal;
}
@font-face {
font-family: 'aller-bold';
src: url('public/fonts/aller-bold.eot');
src: url('public/fonts/aller-bold.eot?#iefix') format('embedded-opentype'),
url('public/fonts/aller-bold.woff') format('woff'),
url('public/fonts/aller-bold.ttf') format('truetype');
font-weight: normal;
font-style: normal;
}
@font-face {
font-family: 'roboto-black';
src: url('public/fonts/roboto-black.eot');
src: url('public/fonts/roboto-black.eot?#iefix') format('embedded-opentype'),
url('public/fonts/roboto-black.woff') format('woff'),
url('public/fonts/roboto-black.ttf') format('truetype');
font-weight: normal;
font-style: normal;
}
/*--------------------- Layout ----------------------------*/
html { height: 100%; }
body {
font-family: "aller-light";
font-size: 14px;
line-height: 18px;
color: #30404f;
margin: 0; padding: 0;
height:100%;
}
#container { min-height: 100%; }
a {
color: #000;
}
b, strong {
font-weight: normal;
font-family: "aller-bold";
}
p {
margin: 15px 0 0px;
}
.annotation ul, .annotation ol {
margin: 25px 0;
}
.annotation ul li, .annotation ol li {
font-size: 14px;
line-height: 18px;
margin: 10px 0;
}
h1, h2, h3, h4, h5, h6 {
color: #112233;
line-height: 1em;
font-weight: normal;
font-family: "roboto-black";
text-transform: uppercase;
margin: 30px 0 15px 0;
}
h1 {
margin-top: 40px;
}
h2 {
font-size: 1.26em;
}
hr {
border: 0;
background: 1px #ddd;
height: 1px;
margin: 20px 0;
}
pre, tt, code {
font-size: 12px; line-height: 16px;
font-family: Menlo, Monaco, Consolas, "Lucida Console", monospace;
margin: 0; padding: 0;
}
.annotation pre {
display: block;
margin: 0;
padding: 7px 10px;
background: #fcfcfc;
-moz-box-shadow: inset 0 0 10px rgba(0,0,0,0.1);
-webkit-box-shadow: inset 0 0 10px rgba(0,0,0,0.1);
box-shadow: inset 0 0 10px rgba(0,0,0,0.1);
overflow-x: auto;
}
.annotation pre code {
border: 0;
padding: 0;
background: transparent;
}
blockquote {
border-left: 5px solid #ccc;
margin: 0;
padding: 1px 0 1px 1em;
}
.sections blockquote p {
font-family: Menlo, Consolas, Monaco, monospace;
font-size: 12px; line-height: 16px;
color: #999;
margin: 10px 0 0;
white-space: pre-wrap;
}
ul.sections {
list-style: none;
padding:0 0 5px 0;;
margin:0;
}
/*
Force border-box so that % widths fit the parent
container without overlap because of margin/padding.
More Info : http://www.quirksmode.org/css/box.html
*/
ul.sections > li > div {
-moz-box-sizing: border-box; /* firefox */
-ms-box-sizing: border-box; /* ie */
-webkit-box-sizing: border-box; /* webkit */
-khtml-box-sizing: border-box; /* konqueror */
box-sizing: border-box; /* css3 */
}
/*---------------------- Jump Page -----------------------------*/
#jump_to, #jump_page {
margin: 0;
background: white;
-webkit-box-shadow: 0 0 25px #777; -moz-box-shadow: 0 0 25px #777;
-webkit-border-bottom-left-radius: 5px; -moz-border-radius-bottomleft: 5px;
font: 16px Arial;
cursor: pointer;
text-align: right;
list-style: none;
}
#jump_to a {
text-decoration: none;
}
#jump_to a.large {
display: none;
}
#jump_to a.small {
font-size: 22px;
font-weight: bold;
color: #676767;
}
#jump_to, #jump_wrapper {
position: fixed;
right: 0; top: 0;
padding: 10px 15px;
margin:0;
}
#jump_wrapper {
display: none;
padding:0;
}
#jump_to:hover #jump_wrapper {
display: block;
}
#jump_page_wrapper{
position: fixed;
right: 0;
top: 0;
bottom: 0;
}
#jump_page {
padding: 5px 0 3px;
margin: 0 0 25px 25px;
max-height: 100%;
overflow: auto;
}
#jump_page .source {
display: block;
padding: 15px;
text-decoration: none;
border-top: 1px solid #eee;
}
#jump_page .source:hover {
background: #f5f5ff;
}
#jump_page .source:first-child {
}
/*---------------------- Low resolutions (> 320px) ---------------------*/
@media only screen and (min-width: 320px) {
.pilwrap { display: none; }
ul.sections > li > div {
display: block;
padding:5px 10px 0 10px;
}
ul.sections > li > div.annotation ul, ul.sections > li > div.annotation ol {
padding-left: 30px;
}
ul.sections > li > div.content {
overflow-x:auto;
-webkit-box-shadow: inset 0 0 5px #e5e5ee;
box-shadow: inset 0 0 5px #e5e5ee;
border: 1px solid #dedede;
margin:5px 10px 5px 10px;
padding-bottom: 5px;
}
ul.sections > li > div.annotation pre {
margin: 7px 0 7px;
padding-left: 15px;
}
ul.sections > li > div.annotation p tt, .annotation code {
background: #f8f8ff;
border: 1px solid #dedede;
font-size: 12px;
padding: 0 0.2em;
}
}
/*---------------------- (> 481px) ---------------------*/
@media only screen and (min-width: 481px) {
#container {
position: relative;
}
body {
background-color: #F5F5FF;
font-size: 15px;
line-height: 21px;
}
pre, tt, code {
line-height: 18px;
}
p, ul, ol {
margin: 0 0 15px;
}
#jump_to {
padding: 5px 10px;
}
#jump_wrapper {
padding: 0;
}
#jump_to, #jump_page {
font: 10px Arial;
text-transform: uppercase;
}
#jump_page .source {
padding: 5px 10px;
}
#jump_to a.large {
display: inline-block;
}
#jump_to a.small {
display: none;
}
#background {
position: absolute;
top: 0; bottom: 0;
width: 350px;
background: #fff;
border-right: 1px solid #e5e5ee;
z-index: -1;
}
ul.sections > li > div.annotation ul, ul.sections > li > div.annotation ol {
padding-left: 40px;
}
ul.sections > li {
white-space: nowrap;
}
ul.sections > li > div {
display: inline-block;
}
ul.sections > li > div.annotation {
max-width: 350px;
min-width: 350px;
min-height: 5px;
padding: 13px;
overflow-x: hidden;
white-space: normal;
vertical-align: top;
text-align: left;
}
ul.sections > li > div.annotation pre {
margin: 15px 0 15px;
padding-left: 15px;
}
ul.sections > li > div.content {
padding: 13px;
vertical-align: top;
border: none;
-webkit-box-shadow: none;
box-shadow: none;
}
.pilwrap {
position: relative;
display: inline;
}
.pilcrow {
font: 12px Arial;
text-decoration: none;
color: #454545;
position: absolute;
top: 3px; left: -20px;
padding: 1px 2px;
opacity: 0;
-webkit-transition: opacity 0.2s linear;
}
.for-h1 .pilcrow {
top: 47px;
}
.for-h2 .pilcrow, .for-h3 .pilcrow, .for-h4 .pilcrow {
top: 35px;
}
ul.sections > li > div.annotation:hover .pilcrow {
opacity: 1;
}
}
/*---------------------- (> 1025px) ---------------------*/
@media only screen and (min-width: 1025px) {
body {
font-size: 16px;
line-height: 24px;
}
#background {
width: 525px;
}
ul.sections > li > div.annotation {
max-width: 525px;
min-width: 525px;
padding: 10px 25px 1px 50px;
}
ul.sections > li > div.content {
padding: 9px 15px 16px 25px;
}
}
/*---------------------- Syntax Highlighting -----------------------------*/
td.linenos { background-color: #f0f0f0; padding-right: 10px; }
span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; }
/*
github.com style (c) Vasily Polovnyov <vast@whiteants.net>
*/
pre code {
display: block; padding: 0.5em;
color: #000;
background: #f8f8ff
}
pre .hljs-comment,
pre .hljs-template_comment,
pre .hljs-diff .hljs-header,
pre .hljs-javadoc {
color: #408080;
font-style: italic
}
pre .hljs-keyword,
pre .hljs-assignment,
pre .hljs-literal,
pre .hljs-css .hljs-rule .hljs-keyword,
pre .hljs-winutils,
pre .hljs-javascript .hljs-title,
pre .hljs-lisp .hljs-title,
pre .hljs-subst {
color: #954121;
/*font-weight: bold*/
}
pre .hljs-number,
pre .hljs-hexcolor {
color: #40a070
}
pre .hljs-string,
pre .hljs-tag .hljs-value,
pre .hljs-phpdoc,
pre .hljs-tex .hljs-formula {
color: #219161;
}
pre .hljs-title,
pre .hljs-id {
color: #19469D;
}
pre .hljs-params {
color: #00F;
}
pre .hljs-javascript .hljs-title,
pre .hljs-lisp .hljs-title,
pre .hljs-subst {
font-weight: normal
}
pre .hljs-class .hljs-title,
pre .hljs-haskell .hljs-label,
pre .hljs-tex .hljs-command {
color: #458;
font-weight: bold
}
pre .hljs-tag,
pre .hljs-tag .hljs-title,
pre .hljs-rules .hljs-property,
pre .hljs-django .hljs-tag .hljs-keyword {
color: #000080;
font-weight: normal
}
pre .hljs-attribute,
pre .hljs-variable,
pre .hljs-instancevar,
pre .hljs-lisp .hljs-body {
color: #008080
}
pre .hljs-regexp {
color: #B68
}
pre .hljs-class {
color: #458;
font-weight: bold
}
pre .hljs-symbol,
pre .hljs-ruby .hljs-symbol .hljs-string,
pre .hljs-ruby .hljs-symbol .hljs-keyword,
pre .hljs-ruby .hljs-symbol .hljs-keymethods,
pre .hljs-lisp .hljs-keyword,
pre .hljs-tex .hljs-special,
pre .hljs-input_number {
color: #990073
}
pre .hljs-builtin,
pre .hljs-constructor,
pre .hljs-built_in,
pre .hljs-lisp .hljs-title {
color: #0086b3
}
pre .hljs-preprocessor,
pre .hljs-pi,
pre .hljs-doctype,
pre .hljs-shebang,
pre .hljs-cdata {
color: #999;
font-weight: bold
}
pre .hljs-deletion {
background: #fdd
}
pre .hljs-addition {
background: #dfd
}
pre .hljs-diff .hljs-change {
background: #0086b3
}
pre .hljs-chunk {
color: #aaa
}
pre .hljs-tex .hljs-formula {
opacity: 0.5;
}

View File

@ -1,494 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>consumer.js</title>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, target-densitydpi=160dpi, initial-scale=1.0; maximum-scale=1.0; user-scalable=0;">
<link rel="stylesheet" media="all" href="../docco.css" />
</head>
<body>
<div id="container">
<div id="background"></div>
<ul id="jump_to">
<li>
<a class="large" href="javascript:void(0);">Jump To &hellip;</a>
<a class="small" href="javascript:void(0);">+</a>
<div id="jump_wrapper">
<div id="jump_page_wrapper">
<div id="jump_page">
<a class="source" href="consumer.html">
./src/consumer.js
</a>
<a class="source" href="index.html">
./src/index.js
</a>
<a class="source" href="json-stream.html">
./src/json-stream.js
</a>
<a class="source" href="socket.html">
./src/socket.js
</a>
</div>
</div>
</li>
</ul>
<ul class="sections">
<li id="title">
<div class="annotation">
<h1>consumer.js</h1>
</div>
</li>
<li id="section-1">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-1">&#182;</a>
</div>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> { Socket } <span class="hljs-keyword">from</span> <span class="hljs-string">'net'</span>
<span class="hljs-keyword">import</span> path <span class="hljs-keyword">from</span> <span class="hljs-string">'path'</span>
<span class="hljs-keyword">import</span> btc <span class="hljs-keyword">from</span> <span class="hljs-string">'better-try-catch'</span>
<span class="hljs-keyword">import</span> JsonStream <span class="hljs-keyword">from</span> <span class="hljs-string">'./json-stream'</span></pre></div></div>
</li>
<li id="section-2">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-2">&#182;</a>
</div>
<p>import logger from ../../uci-logger/src/logger</p>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> logger <span class="hljs-keyword">from</span> <span class="hljs-string">'@uci/logger'</span>
<span class="hljs-keyword">let</span> log = {}</pre></div></div>
</li>
<li id="section-3">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-3">&#182;</a>
</div>
<p>TODO change default pipe dir for windows and mac os</p>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">const</span> DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || <span class="hljs-string">'/tmp/UCI'</span>)
<span class="hljs-keyword">const</span> DEFAULT_SOCKET_NAME = <span class="hljs-string">'uci-sock'</span>
<span class="hljs-keyword">export</span> <span class="hljs-keyword">default</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Consumer</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">Socket</span> </span>{
<span class="hljs-keyword">constructor</span> (opts={}) {
<span class="hljs-keyword">super</span>()
log = logger({<span class="hljs-attr">file</span>:<span class="hljs-string">'src/consumer.js'</span>,<span class="hljs-attr">class</span>:<span class="hljs-string">'Consumer'</span>,<span class="hljs-attr">name</span>:<span class="hljs-string">'socket'</span>,<span class="hljs-attr">id</span>:<span class="hljs-keyword">this</span>.id})
<span class="hljs-keyword">this</span>.id = opts.id || opts.name || <span class="hljs-string">'socket:'</span>+ <span class="hljs-keyword">new</span> <span class="hljs-built_in">Date</span>().getTime()
<span class="hljs-keyword">if</span> (!opts.path) {
log.warn({<span class="hljs-attr">opts</span>:opts},<span class="hljs-string">'no host supplied using localhost...use named piped instead'</span>)
opts.host = opts.host || <span class="hljs-string">'127.0.0.1'</span>
opts.port = opts.port || <span class="hljs-number">8080</span>
} <span class="hljs-keyword">else</span> {
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">typeof</span> opts.path === <span class="hljs-string">'boolean'</span>) opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
<span class="hljs-keyword">if</span> (path.dirname(opts.path)===<span class="hljs-string">'.'</span>) opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
}
<span class="hljs-keyword">this</span>.opts=opts
<span class="hljs-keyword">this</span>.keepAlive = <span class="hljs-string">'keepAlive'</span> <span class="hljs-keyword">in</span> opts ? opts.keepAlive : <span class="hljs-literal">true</span>
<span class="hljs-keyword">this</span>._ready = <span class="hljs-literal">false</span>
<span class="hljs-keyword">this</span>.timeout = opts.timeout || <span class="hljs-number">30</span>
<span class="hljs-keyword">this</span>.wait = opts.wait || <span class="hljs-number">1</span>
<span class="hljs-keyword">this</span>.stream = <span class="hljs-keyword">new</span> JsonStream()</pre></div></div>
</li>
<li id="section-4">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-4">&#182;</a>
</div>
<p>bind to class for other class functions</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.connect = <span class="hljs-keyword">this</span>.connect.bind(<span class="hljs-keyword">this</span>)
<span class="hljs-keyword">this</span>.__ready = <span class="hljs-keyword">this</span>.__ready.bind(<span class="hljs-keyword">this</span>)</pre></div></div>
</li>
<li id="section-5">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-5">&#182;</a>
</div>
<p>this.<em>write = this.</em>write.bind(this)</p>
</div>
<div class="content"><div class='highlight'><pre> }
<span class="hljs-keyword">async</span> connect () {
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>( <span class="hljs-function">(<span class="hljs-params">resolve,reject</span>) =&gt;</span> {
<span class="hljs-keyword">const</span> connect = <span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span> {
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.opts.host ===<span class="hljs-string">'127.0.0.1'</span>) log.warn(<span class="hljs-string">'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'</span>)
log.info({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts},<span class="hljs-string">`attempting to connect <span class="hljs-subst">${<span class="hljs-keyword">this</span>.id}</span> to socket`</span>)
<span class="hljs-keyword">super</span>.connect(<span class="hljs-keyword">this</span>.opts)
}
<span class="hljs-keyword">let</span> reconnect = {}
<span class="hljs-keyword">const</span> timeout = setTimeout(<span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span>{
clearTimeout(reconnect)
log.fatal({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts},<span class="hljs-string">`unable to connect in <span class="hljs-subst">${<span class="hljs-keyword">this</span>.timeout}</span>s`</span>)
reject({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts},<span class="hljs-string">`unable to connect to socket server in <span class="hljs-subst">${<span class="hljs-keyword">this</span>.timeout}</span>secs`</span>)
}
,<span class="hljs-keyword">this</span>.timeout*<span class="hljs-number">1000</span>)
<span class="hljs-keyword">this</span>.once(<span class="hljs-string">'connect'</span>, <span class="hljs-keyword">async</span> () =&gt; {
clearTimeout(timeout)
<span class="hljs-keyword">this</span>._listen()
log.info({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts},<span class="hljs-string">'connected waiting for socket ready handshake'</span>)
<span class="hljs-keyword">this</span>.setKeepAlive(<span class="hljs-keyword">this</span>.keepAlive,<span class="hljs-number">100</span>)
<span class="hljs-keyword">let</span> [err, res] = <span class="hljs-keyword">await</span> btc(isReady).bind(<span class="hljs-keyword">this</span>)(<span class="hljs-keyword">this</span>.__ready, <span class="hljs-keyword">this</span>.wait, <span class="hljs-keyword">this</span>.timeout)
<span class="hljs-keyword">if</span> (err) reject(err)
log.info(<span class="hljs-string">'handshake done, authenticating'</span>)</pre></div></div>
</li>
<li id="section-6">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-6">&#182;</a>
</div>
<p>TODO authenticate here by encrypting a payload with private key and sending that.
await btc(authenticate)</p>
</div>
<div class="content"><div class='highlight'><pre> resolve(res)
})
<span class="hljs-keyword">this</span>.on(<span class="hljs-string">'error'</span>, <span class="hljs-keyword">async</span> (err) =&gt; {
log.warn({<span class="hljs-attr">error</span>:err.code},<span class="hljs-string">`connect error <span class="hljs-subst">${err.code}</span>`</span>)
<span class="hljs-keyword">if</span> (err.code === <span class="hljs-string">'EISCONN'</span>) {
<span class="hljs-keyword">return</span> resolve(<span class="hljs-string">'ready'</span>)
}
reconnect = setTimeout( <span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span>{ connect()
},<span class="hljs-keyword">this</span>.wait*<span class="hljs-number">1000</span> )
})
<span class="hljs-keyword">this</span>.on(<span class="hljs-string">'end'</span>, <span class="hljs-keyword">async</span> () =&gt; {
log.warn(<span class="hljs-string">'socket (server) terminated unexpectantly'</span>)
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.keepAlive) {
log.info(<span class="hljs-string">'keep alive was set, so waiting on server to come online for reconnect'</span>)
<span class="hljs-keyword">this</span>.destroy()
<span class="hljs-keyword">this</span>.emit(<span class="hljs-string">'error'</span>, {<span class="hljs-attr">code</span>:<span class="hljs-string">'DISCONNECTED'</span>})
}
})
connect() <span class="hljs-comment">// initial connect request</span>
}) <span class="hljs-comment">//end promise</span>
}
<span class="hljs-keyword">async</span> send(ipacket) {
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>( <span class="hljs-keyword">async</span> (resolve) =&gt; {</pre></div></div>
</li>
<li id="section-7">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-7">&#182;</a>
</div>
<p>need this for when multiple sends for different consumers use same packet instance</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> packet = <span class="hljs-built_in">Object</span>.assign({},ipacket)
setTimeout(<span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span> {resolve({<span class="hljs-attr">error</span>:<span class="hljs-string">'no response from socket in 10sec'</span>})},<span class="hljs-number">10000</span>)
packet._header =
{ <span class="hljs-attr">id</span>:<span class="hljs-built_in">Math</span>.random().toString().slice(<span class="hljs-number">2</span>), <span class="hljs-comment">// need this for when multiple sends for different consumers use same packet instanceack</span>
sender:{ <span class="hljs-attr">name</span>:<span class="hljs-keyword">this</span>.name, <span class="hljs-attr">instanceID</span>:<span class="hljs-keyword">this</span>.id },
<span class="hljs-attr">path</span>: <span class="hljs-keyword">this</span>.opts.path,
<span class="hljs-attr">port</span>: <span class="hljs-keyword">this</span>.opts.port,
<span class="hljs-attr">host</span>: <span class="hljs-keyword">this</span>.opts.host
}
<span class="hljs-keyword">let</span> [err, res] = <span class="hljs-keyword">await</span> btc(<span class="hljs-keyword">this</span>.stream.serialize)(packet)
<span class="hljs-keyword">if</span> (err) resolve({<span class="hljs-attr">error</span>:<span class="hljs-string">'unable to serialize packet for sending'</span>, <span class="hljs-attr">packet</span>:packet})
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>.__write(res)
<span class="hljs-keyword">this</span>.once(packet._header.id,<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params">reply</span>)</span>{
<span class="hljs-keyword">let</span> res = <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._packetProcess(reply)
<span class="hljs-keyword">if</span> (!res) { <span class="hljs-comment">// if process was not promise returning like just logged to console</span>
res = reply</pre></div></div>
</li>
<li id="section-8">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-8">&#182;</a>
</div>
<p>log.warn(consumer function was not promise returning further processing may be out of sequence)</p>
</div>
<div class="content"><div class='highlight'><pre> }
resolve(res)
}) <span class="hljs-comment">//end listener</span>
})
}</pre></div></div>
</li>
<li id="section-9">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-9">&#182;</a>
</div>
<p>TODO register alt stream processor (emit message with JSON, serialize function, onData method for raw socket chucks)
TODO register authenciation function (set up default)</p>
</div>
<div class="content"><div class='highlight'><pre>
registerPacketProcessor (func) {
<span class="hljs-keyword">this</span>._packetProcess = func
}</pre></div></div>
</li>
<li id="section-10">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-10">&#182;</a>
</div>
<p>PRIVATE METHODS</p>
</div>
<div class="content"><div class='highlight'><pre>
<span class="hljs-keyword">async</span> __write(packet) {</pre></div></div>
</li>
<li id="section-11">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-11">&#182;</a>
</div>
<p>timeout already set if sockect cant be drained in 10 secs</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>(<span class="hljs-function"><span class="hljs-params">resolve</span> =&gt;</span> {
<span class="hljs-keyword">const</span> cb = <span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span> resolve(<span class="hljs-string">'packet written to consumer side socket stream '</span>)
<span class="hljs-keyword">if</span> (!<span class="hljs-keyword">super</span>.write(packet)) {
<span class="hljs-keyword">this</span>.once(<span class="hljs-string">'drain'</span>,cb )
} <span class="hljs-keyword">else</span> {
process.nextTick(cb)
}
})
}
__ready() {<span class="hljs-keyword">return</span> <span class="hljs-keyword">this</span>._ready}
<span class="hljs-keyword">async</span> _listen () {
log.info(<span class="hljs-string">'listening for incoming packets from socket'</span>)</pre></div></div>
</li>
<li id="section-12">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-12">&#182;</a>
</div>
<p>listen for pushed packets</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.on(<span class="hljs-string">'pushed'</span>,<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params">packet</span>)</span>{</pre></div></div>
</li>
<li id="section-13">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-13">&#182;</a>
</div>
<p>TODO do some extra security here?</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> res = <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._packetProcess(packet)
<span class="hljs-keyword">if</span> (!res) { <span class="hljs-comment">// if process was not promise returning like just logged to console</span></pre></div></div>
</li>
<li id="section-14">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-14">&#182;</a>
</div>
<p>log.warn(consumer function was not promise returning)</p>
</div>
<div class="content"><div class='highlight'><pre> }
})</pre></div></div>
</li>
<li id="section-15">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-15">&#182;</a>
</div>
<p>listen on socket stream</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.on(<span class="hljs-string">'data'</span>, <span class="hljs-keyword">this</span>.stream.onData)
<span class="hljs-keyword">this</span>.stream.on(<span class="hljs-string">'message'</span>, messageProcess.bind(<span class="hljs-keyword">this</span>))
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">function</span> <span class="hljs-title">messageProcess</span> (<span class="hljs-params">packet</span>) </span>{</pre></div></div>
</li>
<li id="section-16">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-16">&#182;</a>
</div>
<p>console.log(incoming packet from socket,packet)</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">if</span> (packet._handshake) {
<span class="hljs-keyword">this</span>._ready = <span class="hljs-literal">true</span>
<span class="hljs-keyword">return</span> }</pre></div></div>
</li>
<li id="section-17">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-17">&#182;</a>
</div>
<p>TODO send back ack with consumer ID and authorization and wait
when authorized drop through here to emit</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.emit(packet._header.id, packet)
}
}</pre></div></div>
</li>
<li id="section-18">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-18">&#182;</a>
</div>
<p>default packet process just a simple console logger. ignores any cmd: prop</p>
</div>
<div class="content"><div class='highlight'><pre> _packetProcess (packet) {
<span class="hljs-built_in">console</span>.log(<span class="hljs-string">'default consumer processor -- log packet from socket to console'</span>)
<span class="hljs-built_in">console</span>.dir(packet)
}
} <span class="hljs-comment">// end class</span></pre></div></div>
</li>
<li id="section-19">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-19">&#182;</a>
</div>
<p>HELP FUNCTIONS
wait until a passed ready function returns true</p>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-function"><span class="hljs-keyword">function</span> <span class="hljs-title">isReady</span>(<span class="hljs-params">ready, wait=<span class="hljs-number">30</span>, timeout=<span class="hljs-number">1000</span></span>) </span>{
<span class="hljs-keyword">let</span> time = <span class="hljs-number">0</span>
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>(<span class="hljs-function">(<span class="hljs-params">resolve, reject</span>) =&gt;</span> {
(<span class="hljs-function"><span class="hljs-keyword">function</span> <span class="hljs-title">waitReady</span>(<span class="hljs-params"></span>)</span>{
<span class="hljs-keyword">if</span> (time &gt; timeout) <span class="hljs-keyword">return</span> reject(<span class="hljs-string">`timeout waiting for socket ready handshake - <span class="hljs-subst">${timeout}</span>ms`</span>)
<span class="hljs-keyword">if</span> (ready()) <span class="hljs-keyword">return</span> resolve(<span class="hljs-string">'ready'</span>)
log.info(<span class="hljs-string">`waiting <span class="hljs-subst">${wait}</span>ms for handshake`</span>)
time += wait
setTimeout(waitReady, wait)
})()
})
}</pre></div></div>
</li>
</ul>
</div>
</body>
</html>

View File

@ -1,78 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>index.js</title>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, target-densitydpi=160dpi, initial-scale=1.0; maximum-scale=1.0; user-scalable=0;">
<link rel="stylesheet" media="all" href="../docco.css" />
</head>
<body>
<div id="container">
<div id="background"></div>
<ul id="jump_to">
<li>
<a class="large" href="javascript:void(0);">Jump To &hellip;</a>
<a class="small" href="javascript:void(0);">+</a>
<div id="jump_wrapper">
<div id="jump_page_wrapper">
<div id="jump_page">
<a class="source" href="consumer.html">
./src/consumer.js
</a>
<a class="source" href="index.html">
./src/index.js
</a>
<a class="source" href="json-stream.html">
./src/json-stream.js
</a>
<a class="source" href="socket.html">
./src/socket.js
</a>
</div>
</div>
</li>
</ul>
<ul class="sections">
<li id="title">
<div class="annotation">
<h1>index.js</h1>
</div>
</li>
<li id="section-1">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-1">&#182;</a>
</div>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> Socket <span class="hljs-keyword">from</span> <span class="hljs-string">'./socket'</span>
<span class="hljs-keyword">import</span> Consumer <span class="hljs-keyword">from</span> <span class="hljs-string">'./consumer'</span>
<span class="hljs-keyword">export</span> { Socket <span class="hljs-keyword">as</span> Socket }
<span class="hljs-keyword">export</span> { Consumer <span class="hljs-keyword">as</span> Consumer }
<span class="hljs-keyword">export</span> <span class="hljs-keyword">default</span> { Socket, Consumer }</pre></div></div>
</li>
</ul>
</div>
</body>
</html>

View File

@ -1,199 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>json-stream.js</title>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, target-densitydpi=160dpi, initial-scale=1.0; maximum-scale=1.0; user-scalable=0;">
<link rel="stylesheet" media="all" href="../docco.css" />
</head>
<body>
<div id="container">
<div id="background"></div>
<ul id="jump_to">
<li>
<a class="large" href="javascript:void(0);">Jump To &hellip;</a>
<a class="small" href="javascript:void(0);">+</a>
<div id="jump_wrapper">
<div id="jump_page_wrapper">
<div id="jump_page">
<a class="source" href="consumer.html">
./src/consumer.js
</a>
<a class="source" href="index.html">
./src/index.js
</a>
<a class="source" href="json-stream.html">
./src/json-stream.js
</a>
<a class="source" href="socket.html">
./src/socket.js
</a>
</div>
</div>
</li>
</ul>
<ul class="sections">
<li id="title">
<div class="annotation">
<h1>json-stream.js</h1>
</div>
</li>
<li id="section-1">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-1">&#182;</a>
</div>
<p>adpated from <a href="https://github.com/sebastianseilund/node-json-socket">https://github.com/sebastianseilund/node-json-socket</a></p>
</div>
<div class="content"><div class='highlight'><pre>
<span class="hljs-keyword">import</span> {StringDecoder} <span class="hljs-keyword">from</span> <span class="hljs-string">'string_decoder'</span>
<span class="hljs-keyword">import</span> EventEmitter <span class="hljs-keyword">from</span> <span class="hljs-string">'events'</span>
<span class="hljs-keyword">import</span> btc <span class="hljs-keyword">from</span> <span class="hljs-string">'better-try-catch'</span>
<span class="hljs-keyword">const</span> decoder = <span class="hljs-keyword">new</span> StringDecoder()
<span class="hljs-keyword">export</span> <span class="hljs-keyword">default</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">JsonStream</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">EventEmitter</span></span>{
<span class="hljs-keyword">constructor</span>(opts={}){
<span class="hljs-keyword">super</span>()
<span class="hljs-keyword">this</span>._contentLength = <span class="hljs-literal">null</span>
<span class="hljs-keyword">this</span>._buffer = <span class="hljs-string">''</span>
<span class="hljs-keyword">this</span>._delimeter = opts.delimiter || <span class="hljs-string">'#'</span>
<span class="hljs-keyword">this</span>.onData = <span class="hljs-keyword">this</span>.onData.bind(<span class="hljs-keyword">this</span>)
<span class="hljs-keyword">this</span>.serialize = <span class="hljs-keyword">this</span>.serialize.bind(<span class="hljs-keyword">this</span>)
}
onData (data) {</pre></div></div>
</li>
<li id="section-2">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-2">&#182;</a>
</div>
<p>console.log(a chunk arrived, data)</p>
</div>
<div class="content"><div class='highlight'><pre> data = decoder.write(data)
<span class="hljs-keyword">try</span> {
<span class="hljs-keyword">this</span>._handleData(data)
} <span class="hljs-keyword">catch</span> (e) {
<span class="hljs-keyword">this</span>.emit(<span class="hljs-string">'error'</span>, { <span class="hljs-attr">error</span>: e })
}
}
<span class="hljs-keyword">async</span> serialize(message) {
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>( <span class="hljs-function">(<span class="hljs-params">resolve,reject</span>) =&gt;</span> {
<span class="hljs-keyword">let</span> [err,messageData] = btc(<span class="hljs-built_in">JSON</span>.stringify)(message)
<span class="hljs-keyword">if</span> (err) reject(err)
<span class="hljs-keyword">let</span> [err2,length] = btc(Buffer.byteLength)(messageData, <span class="hljs-string">'utf8'</span>)
<span class="hljs-keyword">if</span> (err2) reject(err2)
<span class="hljs-keyword">let</span> data = length + <span class="hljs-keyword">this</span>._delimeter + messageData</pre></div></div>
</li>
<li id="section-3">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-3">&#182;</a>
</div>
<p>console.log(serialized,data)</p>
</div>
<div class="content"><div class='highlight'><pre> resolve(data)
})
}
_handleData (data) {
<span class="hljs-keyword">this</span>._buffer += data
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>._contentLength == <span class="hljs-literal">null</span>) {
<span class="hljs-keyword">var</span> i = <span class="hljs-keyword">this</span>._buffer.indexOf(<span class="hljs-keyword">this</span>._delimeter)</pre></div></div>
</li>
<li id="section-4">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-4">&#182;</a>
</div>
<p>Check if the buffer has a this._opts.delimeter or “#”, if not, the end of the buffer string might be in the middle of a content length string</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">if</span> (i !== <span class="hljs-number">-1</span>) {
<span class="hljs-keyword">var</span> rawContentLength = <span class="hljs-keyword">this</span>._buffer.substring(<span class="hljs-number">0</span>, i)
<span class="hljs-keyword">this</span>._contentLength = <span class="hljs-built_in">parseInt</span>(rawContentLength)
<span class="hljs-keyword">if</span> (<span class="hljs-built_in">isNaN</span>(<span class="hljs-keyword">this</span>._contentLength)) {
<span class="hljs-keyword">this</span>._contentLength = <span class="hljs-literal">null</span>
<span class="hljs-keyword">this</span>._buffer = <span class="hljs-string">''</span>
<span class="hljs-keyword">var</span> err = <span class="hljs-keyword">new</span> <span class="hljs-built_in">Error</span>(<span class="hljs-string">'Invalid content length supplied ('</span>+rawContentLength+<span class="hljs-string">') in: '</span>+<span class="hljs-keyword">this</span>._buffer)
err.code = <span class="hljs-string">'E_INVALID_CONTENT_LENGTH'</span>
<span class="hljs-keyword">throw</span> err
}
<span class="hljs-keyword">this</span>._buffer = <span class="hljs-keyword">this</span>._buffer.substring(i+<span class="hljs-number">1</span>)
}
}
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>._contentLength != <span class="hljs-literal">null</span>) {
<span class="hljs-keyword">var</span> length = Buffer.byteLength(<span class="hljs-keyword">this</span>._buffer, <span class="hljs-string">'utf8'</span>)
<span class="hljs-keyword">if</span> (length == <span class="hljs-keyword">this</span>._contentLength) {
<span class="hljs-keyword">this</span>._handleMessage(<span class="hljs-keyword">this</span>._buffer)
} <span class="hljs-keyword">else</span> <span class="hljs-keyword">if</span> (length &gt; <span class="hljs-keyword">this</span>._contentLength) {
<span class="hljs-keyword">var</span> message = <span class="hljs-keyword">this</span>._buffer.substring(<span class="hljs-number">0</span>, <span class="hljs-keyword">this</span>._contentLength)
<span class="hljs-keyword">var</span> rest = <span class="hljs-keyword">this</span>._buffer.substring(<span class="hljs-keyword">this</span>._contentLength)
<span class="hljs-keyword">this</span>._handleMessage(message)
<span class="hljs-keyword">this</span>.onData(rest)
}
}
}
_handleMessage (data) {
<span class="hljs-keyword">this</span>._contentLength = <span class="hljs-literal">null</span>
<span class="hljs-keyword">this</span>._buffer = <span class="hljs-string">''</span>
<span class="hljs-keyword">var</span> message
<span class="hljs-keyword">try</span> {
message = <span class="hljs-built_in">JSON</span>.parse(data)
} <span class="hljs-keyword">catch</span> (e) {
<span class="hljs-keyword">var</span> err = <span class="hljs-keyword">new</span> <span class="hljs-built_in">Error</span>(<span class="hljs-string">'Could not parse JSON: '</span>+e.message+<span class="hljs-string">'\nRequest data: '</span>+data)
err.code = <span class="hljs-string">'E_INVALID_JSON'</span>
<span class="hljs-keyword">throw</span> err
}
message = message || {}
<span class="hljs-keyword">this</span>.emit(<span class="hljs-string">'message'</span>, message)
}
}</pre></div></div>
</li>
</ul>
</div>
</body>
</html>

View File

@ -1,403 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>socket.js</title>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, target-densitydpi=160dpi, initial-scale=1.0; maximum-scale=1.0; user-scalable=0;">
<link rel="stylesheet" media="all" href="../docco.css" />
</head>
<body>
<div id="container">
<div id="background"></div>
<ul id="jump_to">
<li>
<a class="large" href="javascript:void(0);">Jump To &hellip;</a>
<a class="small" href="javascript:void(0);">+</a>
<div id="jump_wrapper">
<div id="jump_page_wrapper">
<div id="jump_page">
<a class="source" href="consumer.html">
./src/consumer.js
</a>
<a class="source" href="index.html">
./src/index.js
</a>
<a class="source" href="json-stream.html">
./src/json-stream.js
</a>
<a class="source" href="socket.html">
./src/socket.js
</a>
</div>
</div>
</li>
</ul>
<ul class="sections">
<li id="title">
<div class="annotation">
<h1>socket.js</h1>
</div>
</li>
<li id="section-1">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-1">&#182;</a>
</div>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> { Server } <span class="hljs-keyword">from</span> <span class="hljs-string">'net'</span>
<span class="hljs-keyword">import</span> { unlink <span class="hljs-keyword">as</span> fileDelete } <span class="hljs-keyword">from</span> <span class="hljs-string">'fs'</span>
<span class="hljs-keyword">import</span> { promisify } <span class="hljs-keyword">from</span> <span class="hljs-string">'util'</span>
<span class="hljs-keyword">import</span> path <span class="hljs-keyword">from</span> <span class="hljs-string">'path'</span>
<span class="hljs-keyword">import</span> mkdir <span class="hljs-keyword">from</span> <span class="hljs-string">'make-dir'</span>
<span class="hljs-keyword">import</span> btc <span class="hljs-keyword">from</span> <span class="hljs-string">'better-try-catch'</span>
<span class="hljs-keyword">import</span> _ON_DEATH <span class="hljs-keyword">from</span> <span class="hljs-string">'death'</span> <span class="hljs-comment">//this is intentionally ugly</span>
<span class="hljs-keyword">import</span> JSONStream <span class="hljs-keyword">from</span> <span class="hljs-string">'./json-stream'</span>
<span class="hljs-keyword">import</span> clone <span class="hljs-keyword">from</span> <span class="hljs-string">'clone'</span></pre></div></div>
</li>
<li id="section-2">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-2">&#182;</a>
</div>
<p>import logger from ../../uci-logger/src/logger</p>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> logger <span class="hljs-keyword">from</span> <span class="hljs-string">'@uci/logger'</span>
<span class="hljs-keyword">let</span> log = {}</pre></div></div>
</li>
<li id="section-3">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-3">&#182;</a>
</div>
<p>TODO change default pipe dir for windows and mac os</p>
</div>
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">const</span> DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || <span class="hljs-string">'/tmp/UCI'</span>)
<span class="hljs-keyword">const</span> DEFAULT_SOCKET_NAME = <span class="hljs-string">'uci-sock'</span>
<span class="hljs-keyword">export</span> <span class="hljs-keyword">default</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Socket</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">Server</span> </span>{
<span class="hljs-keyword">constructor</span> (opts={}) {
<span class="hljs-keyword">super</span>()
<span class="hljs-keyword">this</span>.id = opts.id || opts.name || <span class="hljs-string">'socket:'</span>+ <span class="hljs-keyword">new</span> <span class="hljs-built_in">Date</span>().getTime()
<span class="hljs-keyword">if</span> (!opts.path) {
opts.host = opts.host || <span class="hljs-string">'0.0.0.0'</span>
opts.port = opts.port || <span class="hljs-number">8080</span>
} <span class="hljs-keyword">else</span> {
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">typeof</span> opts.path === <span class="hljs-string">'boolean'</span>) opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
<span class="hljs-keyword">if</span> (path.dirname(opts.path)===<span class="hljs-string">'.'</span>) opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
}
<span class="hljs-keyword">this</span>.clientTracking = opts.clientTracking || <span class="hljs-literal">true</span>
<span class="hljs-keyword">this</span>.clients = [] <span class="hljs-comment">// track consumers (i.e. clients)</span>
<span class="hljs-keyword">this</span>.opts = opts <span class="hljs-comment">// for use to recover from selected errors</span></pre></div></div>
</li>
<li id="section-4">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-4">&#182;</a>
</div>
<p>self bindings</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>._listen = <span class="hljs-keyword">this</span>._listen.bind(<span class="hljs-keyword">this</span>)
<span class="hljs-keyword">this</span>.create = <span class="hljs-keyword">this</span>.create.bind(<span class="hljs-keyword">this</span>)
log = logger({<span class="hljs-attr">file</span>:<span class="hljs-string">'src/socket.js'</span>,<span class="hljs-attr">class</span>:<span class="hljs-string">'Socket'</span>,<span class="hljs-attr">name</span>:<span class="hljs-string">'socket'</span>,<span class="hljs-attr">id</span>:<span class="hljs-keyword">this</span>.id})
} <span class="hljs-comment">// end constructor</span>
<span class="hljs-keyword">async</span> create () {
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>( <span class="hljs-keyword">async</span> (resolve,reject) =&gt; {</pre></div></div>
</li>
<li id="section-5">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-5">&#182;</a>
</div>
<p>couple ways to kill socket process when needed</p>
</div>
<div class="content"><div class='highlight'><pre> _ON_DEATH( <span class="hljs-keyword">async</span> () =&gt; {
log.info(<span class="hljs-string">'\nhe\'s dead jim'</span>)
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._destroy()
})
process.once(<span class="hljs-string">'SIGUSR2'</span>, <span class="hljs-keyword">async</span> () =&gt; {
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._destroy
process.kill(process.pid, <span class="hljs-string">'SIGUSR2'</span>)
})
<span class="hljs-keyword">this</span>.once(<span class="hljs-string">'error'</span>, <span class="hljs-keyword">async</span> (err) =&gt; {</pre></div></div>
</li>
<li id="section-6">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-6">&#182;</a>
</div>
<p>recover from socket file that was not removed</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">if</span> (err.code === <span class="hljs-string">'EADDRINUSE'</span>) {
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.opts.path) { <span class="hljs-comment">// if TCP socket should already be dead</span>
<span class="hljs-keyword">let</span> [err, res] = <span class="hljs-keyword">await</span> btc(promisify(fileDelete))(<span class="hljs-keyword">this</span>.opts.path)
<span class="hljs-keyword">if</span>(!err) {
log.info({<span class="hljs-attr">res</span>:res, <span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'socket already exists.....deleted'</span>)
<span class="hljs-keyword">return</span> <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._listen(<span class="hljs-keyword">this</span>.opts)
}
log.fatal({<span class="hljs-attr">err</span>:err},<span class="hljs-string">'error deleting socket. Can not establish a socket'</span>)
<span class="hljs-keyword">return</span> err
}
}
<span class="hljs-keyword">if</span> (err.code ===<span class="hljs-string">'EACCES'</span>){
<span class="hljs-built_in">console</span>.log({<span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'directory does not exist...creating'</span>)
<span class="hljs-keyword">await</span> mkdir(path.dirname(<span class="hljs-keyword">this</span>.opts.path))
<span class="hljs-built_in">console</span>.log({<span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'created'</span>)
log.warn({<span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'directory does not exist...creating'</span>)
<span class="hljs-keyword">return</span> <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._listen(<span class="hljs-keyword">this</span>.opts)
}</pre></div></div>
</li>
<li id="section-7">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-7">&#182;</a>
</div>
<p>otherwise fatally exit</p>
</div>
<div class="content"><div class='highlight'><pre> log.info(err, <span class="hljs-string">'creating socket'</span>)
reject(err)
})
<span class="hljs-keyword">let</span> [err, res] = <span class="hljs-keyword">await</span> btc(<span class="hljs-keyword">this</span>._listen)(<span class="hljs-keyword">this</span>.opts)
<span class="hljs-keyword">if</span> (err) reject(err)
resolve(res)
}) <span class="hljs-comment">// end creeate promise</span>
} <span class="hljs-comment">// end create</span>
registerPacketProcessor (func) {
<span class="hljs-keyword">this</span>._packetProcess = func
}
<span class="hljs-keyword">async</span> _listen (opts) {
<span class="hljs-keyword">super</span>.listen(opts, <span class="hljs-keyword">async</span> (err, res) =&gt; {
<span class="hljs-keyword">if</span> (err) <span class="hljs-keyword">return</span> err</pre></div></div>
</li>
<li id="section-8">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-8">&#182;</a>
</div>
<p>this gets called for each client connection and is unique to each</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.on(<span class="hljs-string">'connection'</span>, <span class="hljs-keyword">async</span> (socket) =&gt; {
<span class="hljs-keyword">const</span> stream = <span class="hljs-keyword">new</span> JSONStream()
socket.stream = stream <span class="hljs-comment">// need this to track clients</span>
<span class="hljs-keyword">let</span> send = <span class="hljs-keyword">this</span>._send.bind(socket)
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.clientTracking) <span class="hljs-keyword">this</span>.clients.push(socket)</pre></div></div>
</li>
<li id="section-9">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-9">&#182;</a>
</div>
<p>TODO add close listener to socket to remove from this.clients</p>
</div>
<div class="content"><div class='highlight'><pre> log.info(<span class="hljs-string">'new consumer connecting'</span>)
log.info(<span class="hljs-keyword">await</span> send(<span class="hljs-keyword">await</span> stream.serialize({<span class="hljs-string">'_handshake'</span>:<span class="hljs-literal">true</span>})))
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.opts.conPacket) {
<span class="hljs-keyword">this</span>.opts.conPacket._header = { <span class="hljs-attr">id</span>:<span class="hljs-string">'pushed'</span>}
log.info({<span class="hljs-attr">conPacket</span>:<span class="hljs-keyword">this</span>.opts.conPacket},<span class="hljs-string">'pushing a preset command to just connected consumer'</span>)
send(<span class="hljs-keyword">await</span> stream.serialize(<span class="hljs-keyword">this</span>.opts.conPacket)) <span class="hljs-comment">// send a packet command on to consumer on connection</span>
}
socket.on(<span class="hljs-string">'data'</span>, stream.onData)</pre></div></div>
</li>
<li id="section-10">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-10">&#182;</a>
</div>
<p>TODO need to start error listener for stream so errors can be processed</p>
</div>
<div class="content"><div class='highlight'><pre> stream.on(<span class="hljs-string">'message'</span>, messageProcess.bind(<span class="hljs-keyword">this</span>,socket))
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">function</span> <span class="hljs-title">messageProcess</span> (<span class="hljs-params">client, packet</span>) </span>{
log.info({<span class="hljs-attr">packet</span>:packet},<span class="hljs-string">'incoming packet on socket side'</span>)
<span class="hljs-keyword">let</span> res = {}
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.clientTracking &amp;&amp; packet.clientID) {
client.ID = packet.clientID
res.cmd=<span class="hljs-string">'ackID'</span>
}
<span class="hljs-keyword">else</span> {
res = <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._packetProcess(clone(packet)) || {}
<span class="hljs-keyword">if</span> (<span class="hljs-built_in">Object</span>.keys(res).length === <span class="hljs-number">0</span>) res = { <span class="hljs-attr">error</span>: <span class="hljs-string">'socket packet command function likely did not return a promise'</span>, <span class="hljs-attr">packet</span>:packet}
}
<span class="hljs-keyword">if</span> (packet) {
res._header = clone(packet._header,<span class="hljs-literal">false</span>) || {} <span class="hljs-comment">//make sure return packet has header with id in case it was removed in processing</span>
<span class="hljs-keyword">delete</span> packet._header <span class="hljs-comment">// remove before adding to response header as request</span>
} <span class="hljs-keyword">else</span> res._header = {}
res._header.request = clone(packet,<span class="hljs-literal">false</span>)
res._header.responder = {<span class="hljs-attr">name</span>:<span class="hljs-keyword">this</span>.name,<span class="hljs-attr">instanceID</span>:<span class="hljs-keyword">this</span>.id}
res._header.socket = <span class="hljs-keyword">this</span>.address()
<span class="hljs-keyword">if</span> (!res.cmd) res.cmd = <span class="hljs-string">'reply'</span> <span class="hljs-comment">// by default return command is 'reply'</span>
<span class="hljs-keyword">let</span> [err, ser] = <span class="hljs-keyword">await</span> btc(stream.serialize)(res)
<span class="hljs-keyword">if</span> (err) ser = <span class="hljs-keyword">await</span> stream.serialize({ <span class="hljs-attr">error</span>: <span class="hljs-string">'was not able to serialze the res packet'</span>, <span class="hljs-attr">err</span>:err, <span class="hljs-attr">_header</span>:{<span class="hljs-attr">id</span>:res._header.id}})
log.info(<span class="hljs-keyword">await</span> send(ser))
} <span class="hljs-comment">// end process message</span>
}) <span class="hljs-comment">// end connecttion consumer</span>
log.info({<span class="hljs-attr">opts</span>: <span class="hljs-keyword">this</span>.opts},<span class="hljs-string">'socket created'</span>)
<span class="hljs-keyword">return</span> res
}) <span class="hljs-comment">// end super listen callback</span>
} <span class="hljs-comment">// end listen</span>
<span class="hljs-keyword">async</span> _destroy () {
log.info(<span class="hljs-string">'closing down socket'</span>)
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>.close()
log.info(<span class="hljs-string">'all connections closed....exiting'</span>)
process.exit()
}</pre></div></div>
</li>
<li id="section-11">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-11">&#182;</a>
</div>
<p>default packet process, just a simple echo</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">async</span> _packetProcess (packet) {
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>(<span class="hljs-function"><span class="hljs-params">resolve</span> =&gt;</span> {
resolve(packet)
})
}</pre></div></div>
</li>
<li id="section-12">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-12">&#182;</a>
</div>
<p>must have a consumer socket bound to use</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">async</span> _send(packet) {</pre></div></div>
</li>
<li id="section-13">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-13">&#182;</a>
</div>
<p>timeout already set if sockect cant be drained in 10 secs</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>(<span class="hljs-function"><span class="hljs-params">resolve</span> =&gt;</span> {
<span class="hljs-keyword">const</span> cb = <span class="hljs-function"><span class="hljs-params">()</span> =&gt;</span> resolve(<span class="hljs-string">'packet written to socket stream'</span>)
<span class="hljs-keyword">if</span> (!<span class="hljs-keyword">this</span>.write(packet)) {
<span class="hljs-keyword">this</span>.once(<span class="hljs-string">'drain'</span>,cb )
} <span class="hljs-keyword">else</span> {
process.nextTick(cb)
}
})
}
<span class="hljs-keyword">async</span> push (packet,id) {
packet._header = { <span class="hljs-attr">id</span>:<span class="hljs-string">'pushed'</span>}
log.info({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts,<span class="hljs-attr">packet</span>:packet},<span class="hljs-string">'pushing a packet to all connected consumers'</span>)
<span class="hljs-keyword">this</span>.clients.forEach(<span class="hljs-keyword">async</span> (client) =&gt; {
<span class="hljs-keyword">if</span> (client.writable) {
<span class="hljs-keyword">let</span> [err, ser] = <span class="hljs-keyword">await</span> btc(client.stream.serialize)(packet)
<span class="hljs-keyword">if</span> (err) ser = <span class="hljs-keyword">await</span> client.stream.serialize({ <span class="hljs-attr">error</span>: <span class="hljs-string">'was not able to serialze the res packet'</span>, <span class="hljs-attr">err</span>:err, <span class="hljs-attr">_header</span>:{<span class="hljs-attr">id</span>:packet._header.id}})
<span class="hljs-keyword">if</span> (!id || id ===client.ID ) <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._send.bind(client)(ser)
}
})
}
} <span class="hljs-comment">// end class</span></pre></div></div>
</li>
</ul>
</div>
</body>
</html>

View File

@ -1,33 +0,0 @@
import Consumer from '../src/consumer'
// const client1= new Consumer({name:'example-consumer1' })
const client= new Consumer({path:true, name:'example-consumer'})
// let packet = {name: 'client', cmd:'doit', data:'data sent by client'}
// This is your client handler object waiting on a message to do something
const process = async function (packet) {
// return new Promise((resolve) => {
// console.log('====== packet pushed from server ======')
// console.dir(packet)
// // setTimeout(resolve('done'),100)
// resolve('done')
// })
console.log('====== packet pushed from server ======')
console.dir(packet)
// setTimeout(resolve('done'),100)
return Promise.resolve('done')
}
client.registerPacketProcessor(process)
;
(async () => {
console.log(await client.connect())
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View File

@ -1,25 +1,80 @@
import Consumer from '../src/consumer'
import btc from 'better-try-catch'
// const client1= new Consumer({name:'example-consumer1' })
const client= new Consumer({path:true, name:'example-consumer' })
let packet = {name: 'client', cmd:'doit', data:'data sent by client'}
// const PATH ='/opt/bogus/socket'
const PATH=true
const client= new Consumer({path:PATH, name:'example-consumer', initTimeout:30 })
// This is your client handler object waiting on a message to do something
const process = function (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
async function processor (packet) {
// console.log('packet being processed at socket', packet)
if (packet.cmd) {
if (this[packet.cmd]) return await this[packet.cmd](packet)
else {
console.log('no processing function for command', packet.cmd)
return {error: 'command has no processing function', packet: packet }
}
}
console.log('no command in packet', packet)
return {error: 'no command in packet', packet: packet }
}
client.registerPacketProcessor(process)
client.registerPacketProcessor(processor)
client.reply = (packet) => {
return new Promise(resolve => {
console.log('generic reply command from server\n',packet.data)
resolve()
})
}
client.onconnect = (packet) => {
return new Promise(resolve => {
console.log('on connect server sent command processed here\n',packet.data)
resolve()
})
}
client.pushed = (packet) => {
return new Promise(resolve => {
console.log('pushed packet\n',packet.status)
resolve()
})
}
client.on('status', event => {
console.log('============ socket status ============')
console.log('status level',event.level)
console.log(event.id)
console.log(event.msg)
console.log(`Consumer is ${event.connected ? 'connected' : 'disconnected'}`)
console.log(`Authenticated? ${event.authenticated || false}`)
console.log('======================================')
})
;
(async () => {
console.log('ready at start',client.ready)
// await Promise.all([client1.connect(),client2.connect()])
await client.connect()
console.log('sending packet ', packet)
console.log('=========\n',await client.send(packet))
// client.end()
let [err, res] = await btc(client.connect)()
if (err) {
console.log('error', err)
} else {
console.log(res)
let packet = {name: 'client', cmd:'doit', data:'sent by client'}
console.log('=============sending a test packet=========\n', packet)
console.log('can also await relay of any send too for processing in sync manner\n',await client.send(packet))
console.log('===============')
}
setTimeout(()=> {
console.log('=============Consumer now Offline=============')
process.exit(1)
}
,120000)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View File

@ -1,37 +0,0 @@
import Consumer from '../src/consumer'
const USOCKET = __dirname + '/sample.sock'
class Client extends Consumer {
constructor(path,opts) {
super(path,opts)
}
async _packetProcess (packet) {
this[packet.cmd](packet)
}
async reply (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
console.log(`Socket replied with data: ${packet.data}`)
}
}
const client1= new Client({path:true,name:'example-consumer1' })
const client2 = new Client({path:true,name:'example-consumer2'})
let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'}
let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
;
(async () => {
await Promise.all([client1.connect(),client2.connect()])
console.log(await Promise.all([client1.send(packet1),client2.send(packet2)]))
client1.end()
client2.end()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

0
examples/opts.yaml Normal file
View File

View File

@ -1,42 +0,0 @@
import { Socket } from '../src'
;
(async () => {
class Test extends Socket {
constructor(opts) {
super(opts)
}
async _packetProcess(packet) {
console.log('packet being processed at socket')
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
return {error: 'no command in packet', packet: packet }
}
async doit(data,name) {
return new Promise(resolve => {
let res = {}
console.log('data sent to doit = ', data)
res.status ='success'
res.name = name
res.cmd = 'reply'
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
}
// let test = new Test()
let test = new Test({path:true, conPacket:{onconnect:'this is a packet sent to consumer as soon as it connects'}})
await test.create()
let count = 0
setInterval( () => {
count++
test.push({name:'pushed', count:count, status:'some pushed data'}) },10000)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View File

@ -1,5 +1,4 @@
import { Socket as uSocket} from '../src'
// import { fs } from 'mz'
// made key cert into module that also uses environment variables
// const TLS = process.env.TLS || false
@ -10,6 +9,56 @@ import { Socket as uSocket} from '../src'
let Socket = uSocket
class Test extends Socket {
constructor(opts) {
super(opts)
}
async doit(packet) {
return new Promise(resolve => {
let res = {}
console.log('command doit sent with data = ', packet.data)
res.status ='success'
res.cmd = 'reply'
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
}
// const options = {
// tls: TLS,
// key: await fs.readFile(TLS_KEY_PATH),
// cert: await fs.readFile(TLS_CRT_PATH),
// // This is necessary only if using client certificate authentication.
// // requestCert: true,
// // This is necessary only if the client uses a self-signed certificate.
// // ca: [ fs.readFileSync('client-cert.pem') ]
// }
// const PATH = '/opt/bogus/socket'
const PATH = true
const PUSHES = 3
// options.conPacket = {cmd:'onconnect', data:'this is a packet data sent consumer after handshake/authentification'}
const TOKENS = ['cheetos']
let test = new Test({path:PATH, tokens:TOKENS})
async function processor (packet) {
// console.log('packet being processed at socket', packet)
if (packet.cmd) {
if (this[packet.cmd]) return await this[packet.cmd](packet)
else {
console.log('no processing function for command', packet.cmd)
return {error: 'command has no processing function', packet: packet }
}
}
console.log('no command in packet', packet)
return {error: 'no command in packet', packet: packet }
}
test.registerPacketProcessor(processor)
;
(async () => {
// TODO dynamic import
@ -18,49 +67,24 @@ let Socket = uSocket
// console.log('using TLS')
// }
class Test extends Socket {
constructor(opts) {
super(opts)
}
async _packetProcess(packet) {
console.log('packet being processed at socket')
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
return {error: 'no command in packet', packet: packet }
}
async doit(data,name) {
return new Promise(resolve => {
let res = {}
console.log('data sent to doit = ', data)
res.status ='success'
res.name = name
res.cmd = 'reply'
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
}
// const options = {
// tls: TLS,
// key: await fs.readFile(TLS_KEY_PATH),
// cert: await fs.readFile(TLS_CRT_PATH),
// // This is necessary only if using client certificate authentication.
// // requestCert: true,
// // This is necessary only if the client uses a self-signed certificate.
// // ca: [ fs.readFileSync('client-cert.pem') ]
// }
let options = {path:true}
// let test = new Test()
let test = new Test(options)
// test.addTokens('cheetos')
await test.create()
console.log('ready')
let count = 0
const push = setInterval( () => {
count++
test.push({cmd:'pushed', count:count, status:`pushing some data ${count} of ${PUSHES}`})
if (count >PUSHES) {
clearInterval(push)
test.push({cmd:'pushed',status:'now will simulate server going offline by stopping to send ping for 15 seconds'})
test.disablePing()
setTimeout( () => {
test.enablePing()
},15000)
}
},3000)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View File

@ -1,6 +1,6 @@
{
"name": "@uci/socket",
"version": "0.2.12",
"version": "0.3.3",
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
"main": "src",
"scripts": {
@ -8,13 +8,15 @@
"test": "mocha -r esm --timeout 10000 test/*.test.mjs",
"testlog": "UCI_DEV=true mocha -r esm --timeout 10000 test/*.test.mjs",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
"s": "UCI_ENV=dev node -r esm examples/server",
"sp": "UCI_DEV=true node -r esm examples/server-push",
"devs": "SOCKETS_DIR=/opt/sockets UCI_DEV=true ./node_modules/.bin/nodemon -r esm-e mjs examples/server",
"c": "UCI_ENV=dev node -r esm examples/client",
"cp": "UCI_DEV=true node -r esm examples/client-push",
"devc": "SOCKETS_DIR=/opt/sockets UCI_DEV=true node -r esm examples/client",
"c2": "node -r esm examples/client2"
"s": "node -r esm examples/server",
"devs": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/server",
"devs:anon": "UCI_ANON=true npm run devs",
"devs:anon:debug": "UCI_LOG_LEVEL=debug npm run devs:anon",
"devs:debug": "UCI_LOG_LEVEL=debug npm run devs",
"client": "node -r esm examples/client",
"devc": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/client",
"devc:token": "UCI_CLIENT_TOKEN='cheetos' npm run devc",
"devc:debug": "UCI_LOG_LEVEL=debug npm run devc"
},
"author": "David Kebler",
"license": "MIT",
@ -40,17 +42,17 @@
"devDependencies": {
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"codecov": "^3.1.0",
"esm": "^3.0.84",
"istanbul": "^0.4.5",
"mocha": "^5.2.0",
"nodemon": "^1.18.6"
"esm": "^3.2.25",
"mocha": "^8.0.1",
"nodemon": "^2.0.4"
},
"dependencies": {
"@uci-utils/logger": "0.0.14",
"@uci-utils/logger": "^0.0.18",
"better-try-catch": "^0.6.2",
"clone": "^2.1.2",
"death": "^1.1.0",
"make-dir": "^3.0.0"
"delay": "^4.4.0",
"make-dir": "^3.1.0",
"p-reflect": "^2.1.0"
}
}

View File

@ -1,6 +1,7 @@
import { Socket } from 'net'
import path from 'path'
import { promisify } from 'util'
// import pause from 'delay'
import btc from 'better-try-catch'
import JsonStream from './json-stream'
@ -25,15 +26,17 @@ class SocketConsumer extends Socket {
* @param {object} [opts={}] test
*/
constructor(opts = {}) {
constructor(options = {}) {
super()
const opts = Object.assign({},options) // don't allow mutation
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
this.name = opts.name || 'a socket consumer'
log = logger({
file: 'src/consumer.js',
class: 'Consumer',
name: 'socket',
id: this.id
})
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
if (!opts.path) {
if(!opts.host) log.warn({method:'constructor', line:38, opts: opts, msg:'no host supplied using localhost...use named piped instead - opts.path'})
opts.host = opts.host || '127.0.0.1'
@ -45,158 +48,210 @@ class SocketConsumer extends Socket {
opts.path = path.join(DEFAULT_PIPE_DIR, opts.path)
}
this.opts = opts
this._data = {id:this.id, name:opts.name||this.id}
Object.assign(this._data,opts.data||{}) // holds consumer specific data that will be passed to server in header and on connection
// default is keepAlive true, must set to false to explicitly disable
// if keepAlive is true then consumer will also be reconnecting consumer
// initTimeout > 4 means socketInit will return a promise
this.initTimeout = opts.initTimeout > 4 ? opts.initTimeout * 1000 : null
this.pingFailedTimeout = opts.pingFailedTimeout * 1000 || 5000
this.reconnectLimit = opts.reconnectLimit || 0
this.retryWait = opts.retryWait==null ? 5000 : opts.retryWait * 1000
this.heartBeat = !!process.env.HEARTBEAT || opts.heartBeat
this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
this._ready = false
this.timeout = opts.timeout || 60 // initial connect timeout in secs and then rejects
this.wait = opts.wait || 2
this.stream = new JsonStream()
// bind to class for other class functions
this.connect = this.connect.bind(this)
this.close = promisify(this.end).bind(this)
this.__ready = this.__ready.bind(this)
// this._write = this._write.bind(this)
this._reconnectCount = 0
this._connected = false
this._authenticated = false
this._active = false
this._connection = 'offline'
this._first = true // first connection or not
this._pingTimeout // value sent from socket upon connect
}
async connect() {
return new Promise((resolve, reject) => {
get connected() { return this._connected}
// FIXME change active
get active() { return this._authenticated }
get connection() { return this._connection }
let initial = true
notify(state, moreOpts={}) {
this._connection = state || this._connection
let opts = {state:this._connection, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}
Object.assign(opts,moreOpts)
this.emit('connection:socket',opts)
}
// this is only for initial connection
const initTimeout = setTimeout(() => {
log.fatal({method:'connect', line:69, opts: this.opts, msg:`unable to connect in ${this.timeout}s`})
reject(
{ opts: this.opts },
`unable to connect to socket server in ${this.timeout}secs`
)
}, this.timeout * 1000)
log(level='debug', msg) {
let opts = {level:level, msg:msg, id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected, state:this._connection}
if (typeof msg !== 'string') Object.assign(opts,msg)
this.emit('log',opts)
log[level](opts)
}
async connect(timeout=0) {
this.initTimeout = timeout > 4 ? timeout * 1000 : this.initTimeout
this.notify('starting')
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') {
let msg ='tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'
log.warn({method:'connect', msg:'msg'})
this.log('warn',msg)
}
this.once('error', errorHandler.bind(this))
this.once('connect',connectHandler.bind(this))
super.connect(this.opts)
// returns promise for initial connection when initTimeout is not zero with reject on timeout
if (this._first && this.initTimeout) {
let initTimeout = {}
this._first=false
return new Promise((resolve, reject) => {
initTimeout = setTimeout(async () => {
this.disconnect()
let msg=`unable to connect initially to socket server in ${this.initTimeout/1000} secs, giving up no more attempts`
this.log('fatal',msg)
this.notify('failed',{timeout:this.initTimeout, msg:msg})
reject({ error:msg, opts: this.opts})
}
, this.initTimeout)
const successHandler = (ev) => {
if (ev.state === 'connected') {
clearTimeout(initTimeout)
this.removeListener('connection:socket',successHandler)
this.log('info','initial connection successfull')
resolve({opts: this.opts, msg: 'initial connection successfull'})
}
}
this.on('connection:socket',successHandler.bind(this))
this.once('connect', async () => {
clearTimeout(initTimeout)
this._listen()
log.debug({method:'connect', line:80, opts: this.opts, msg:'initial connect waiting for socket ready handshake'})
this.setKeepAlive(this.keepAlive, 3000)
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
if (err) reject(err)
initial = false
log.debug({method:'connect', line:85, msg:'handshake to socket done, TODO authenticating'})
// TODO authenticate here by encrypting a payload with private key and sending that.
// await btc(authenticate)
this.emit('connected') // for end users to take action
resolve(res)
})
}
this._first=false
return 'connection in progress'
let reconTimeout
// function that sets a reconnect timeout
const reconnect = () => {
reconTimeout = setTimeout(() => {
this.removeAllListeners()
this.stream.removeAllListeners()
this.destroy()
connect()
}, this.wait * 1000)
}
} // end connect
// connection function that sets listeners and deals with reconnect
const connect = () => {
if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost')
log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'})
// manual disonnect
async disconnect() {
clearTimeout(this._errorRetry)
this.removeAllListeners('ping')
this.removeAllListeners('connect')
this.removeAllListeners('error')
this.removeAllListeners('data')
this.removeAllListeners('pushed')
this.stream.removeAllListeners()
this._connected=false
this._authenticated=false
this.notify('disconnected')
this._first = true
}
if(!initial) {
this.once('connect', async () => {
clearTimeout(reconTimeout)
this._listen()
log.debug({method:'connect', line:113, msg:'reconnected waiting for socket ready handshake'})
this.setKeepAlive(this.keepAlive, 3000)
let [err, res] = await btc(isReady).bind(this)(this.__ready,this.wait,this.timeout)
if (err) reject(err)
log.debug({method:'connect', line:69, msg:'rehandshake done, reauthenticating'})
// TODO authenticate here by encrypting a payload with private key and sending that.
// await btc(authenticate)
this.emit('connected')
this.emit('reconnected') // emit also reconnected for special end user action
resolve(res)
})
}
this.on('error', async err => {
if (err.code !== 'EISCONN') {
this._ready = false
this.emit('ready', false)
log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect`})
reconnect()
}
else {
this._ready = true
this.emit('ready', true)
log.error({method:'connect', line:69, msg:'reconnected to socket, ready to go again'})
}
})
if (this.keepAlive) { // only attempt reconnect is keepAlive is set which it is by default
this.on('end', async () => {
log.error({method:'connect', line:142, msg:'socket (server) terminated unexpectantly, keepalive set, wait for server to come online'})
this._ready = false
this.emit('error', { code: 'DISCONNECTED' })
})
}
// attempt connection
log.debug({method:'connect', line:149, opts: this.opts, msg:`attempting to connect ${this.id} to socket`})
super.connect(this.opts)
} // end connect function
connect() // initial connect request
}) //end promise
async reconnect () {
if (this._connected) this.disconnect()
this._first = false
if ( this.reconnectLimit === 0 || this._reconnectCount < this.reconnectLimit ) {
this._reconnectCount += 1
this.log('warn',`Attempting a reconnect: attempt ${this._reconnectCount} of ${this.reconnectLimit ||'unlimited'}`)
this.connect()
}
else {
this.log('fatal',`${this.reconnectLimit} reconnect attempts exceeded no more tries`)
}
}
async send(ipacket) {
return new Promise(async resolve => {
if (!this._ready) resolve({ error: 'socket consumer not connected, aborting send' })
let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance
setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000)
packet._header = {
id: Math.random()
.toString()
.slice(2), // need this for when multiple sends for different consumers use same packet instanceack
sender: { name: this.name, instanceID: this.id },
path: this.opts.path,
port: this.opts.port,
host: this.opts.host
}
let [err, res] = await btc(this.stream.serialize)(packet)
if (err)
resolve({error: 'unable to serialize packet for sending',packet: packet})
await this.__write(res)
this.once(packet._header.id, async function(reply) {
let res = await this._packetProcess(reply)
if (!res) { // if packetProcess was not promise
res = reply
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
if (!this.active) {
resolve({ error: 'socket consumer not connected, aborting send' })
} else {
let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance
setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000)
packet._header = {
id: Math.random()
.toString()
.slice(2), // need this for when multiple sends for different consumers use same packet instanceack
sender: { data: this._data, instanceID: this.id},
path: this.opts.path,
port: this.opts.port,
host: this.opts.host
}
resolve(res)
}) //end listener
let [err, res] = await btc(this.stream.serialize)(packet)
if (err)
resolve({error: 'unable to serialize packet for sending',packet: packet})
if (this.active && this.writable) {
let res2 = await this.__write(res)
if (res2.error) resolve(res2)
// if no write error then wait for send response
this.once(packet._header.id, async function(reply) {
let res = await this._packetProcess(reply)
if (!res) { // if packetProcess was not promise
res = reply
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
}
resolve(res) // resolves processed packet not return packet
}) //end listener
}
}
})
}
// TODO register user alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks)
// TODO register authenciation function (set up default)
registerPacketProcessor(func) {
this._packetProcess = func
}
// func should return an object the server expects
registerAuthenticator (func) {
this._authenticate = func
}
// stream needs an .onData method and will be stream bound handler for data event
resgisterStreamProcessor (func) {
this.stream = func
}
// PRIVATE METHODS
// default authentication using a simple token
_authenticate () {
return { token: process.env.UCI_CLIENT_TOKEN || this.token || 'default' }
}
async _authenticateSend (authPacket={}) {
return new Promise(async resolve => {
setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000)
let [err, serPacket] = await btc(this.stream.serialize)(authPacket)
if (err)
resolve({error: 'unable to serialize packet for sending',packet: authPacket})
this.stream.once('message',(res) => {
resolve(res)
})
let res = await this.__write(serPacket)
if (res.error) resolve(res)
// otherwise wait for message listener above to return
})
}
async __write(packet) {
// timeout already set if sockect can't be drained in 10 secs
return new Promise(resolve => {
const cb = () => resolve('packet written to consumer side socket stream ')
return new Promise((resolve) => {
// most stream write errors will be caught by socket error listener but make sure
if (!this.writable) { // stream is writeable as can't catch epipe errors there
resolve({error:'socket stream closed can not send packet'})
return
}
const cb = () => resolve({response:'packet written to consumer side socket stream '})
if (!super.write(packet)) {
this.once('drain', cb)
} else {
@ -205,61 +260,138 @@ class SocketConsumer extends Socket {
})
}
__ready() {
return this._ready
}
async _listen() {
log.debug('listening for incoming packets from socket')
// listen for pushed packets
this.on('pushed', async function(packet) {
// TODO do some extra security here?
let res = await this._packetProcess(packet)
if (!res) {
// if process was not promise returning then res will be undefined
log.debug('consumer packet processing function was not promise returning')
}
})
// listen on socket stream
this.on('data', this.stream.onData)
this.stream.on('message', messageProcess.bind(this))
async function messageProcess(packet) {
log.debug('incoming packet from socket',packet)
if (packet._handshake) {
this._ready = true
return
}
// TODO send back ack with consumer ID and authorization and wait
// when authorized drop through here to emit
this.emit(packet._header.id, packet)
}
}
// default packet process just a simple console logger. ignores any cmd: prop
_packetProcess(packet) {
console.log('default consumer processor -- log packet from socket to console')
console.log('replace by calling .registerPacketProcessor(func) with your function')
console.dir(packet)
}
} // end class
export default SocketConsumer
// Helper Functions
// wait until a passed ready function returns true
function isReady(ready, wait = 30, timeout = 1000) {
let time = 0
return new Promise((resolve, reject) => {
(function waitReady() {
if (time > timeout)
return reject(
`timeout waiting for socket ready handshake - ${timeout}ms`
)
if (ready()) return resolve('ready')
log.debug({function:'isReady', line:261, msg:`waiting ${wait}ms for handshake`})
time += wait
setTimeout(waitReady, wait)
})()
})
// CONNECTION HANDLERS
async function connectHandler () {
this.removeAllListeners('error') // turn off error handler during handshake
this._doneAuthenticate = setTimeout(() =>{
let msg ='authentication not completed in 3 secs, forcing reconnect attempt'
this.notify('failed',{msg:msg})
this.log('warn',msg)
this.reconnect()
},3000)
this.on('data', this.stream.onData)
this.stream.on('message', handshake.bind(this))
this.log('info','in process of connecting waiting for socket handshake/authenticate')
this.notify('connecting')
}
async function handshake (packet) {
if (packet._handshake) {
this._connected = true
this.notify('handshake')
const authPacket = Object.assign(this._authenticate() || {}, {_authenticate:true, data:this._data})
// console.log('----------------authentication packet---------------',authPacket)
let res ={}
if (!this.writable) res.error ='socket stream not writable'
else res = await this._authenticateSend(authPacket)
this.stream.removeAllListeners('message')
clearTimeout(this._doneAuthenticate)
if (res.error) {
let msg =`error during authentication ${res.error}, attempting reconnect in ${this.retryWait/1000}s to see if error clears`
this.notify('error',{msg:msg})
this.log('error',msg)
this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait)
} else {
if (!res.authenticated) {
let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect`
this.notify('failed',{msg:msg})
this.log('error',msg)
this.disconnect()
}
else {
this._authenticated = res.authenticated
let msg ='authentication succeeded connection ready'
this.notify('connected',{msg:msg})
this.log('info',msg)
this._reconnectCount = 0
this.stream.on('message', messageHandler.bind(this)) // reset default message handler
this.on('pushed', pushHandler.bind(this) )
this.once('error', errorHandler.bind(this)) // listen for errors on authenticated socket
if (this.keepAlive) { // only attempt reconnect if keepAlive is set which it is by default
this.on('ping',pingHandler.bind(this))
this.setKeepAlive(this.keepAlive,3000) // keep connection alive unless disabled
}
if (this.opts.conPacket) (this.send(this.conPacket))
}
}
}
}
// general message handler handler
function messageHandler(packet) {
if (packet._header.id !== 'ping') { // ping has it's own listner
let obj = { msg:'incoming packet from socket sever',packet:packet}
this.log('trace',obj)
}
this.emit(packet._header.id, packet)
}
// assume all errors are fatal and the socket needs to be disconnected/reconnected
async function errorHandler (err) {
this.disconnect()
let msg = `error, this consumer is disconnected, trying reconnect in ${this.retryWait/1000} secs`
this.notify('error',{error:err,msg:msg})
this.log('error',msg)
this._errorRetry = setTimeout(this.reconnect.bind(this),this.retryWait)
}
// const pushHandler = async (packet) => {
async function pushHandler (packet) {
// TODO do some extra security here?
let msg = {msg:'packed was pushed from socket sever, processing', packet:packet}
this.log('trace',msg)
let res = await this._packetProcess(packet)
if (!res) {
// if process was not promise returning then res will be undefined
log.debug('consumer packet processing function was not promise returning')
}
}
// const pingHandler = async (packet) => {
async function pingHandler (packet) {
if (this.heartBeat) console.log('lub dub')
clearTimeout(this._ping)
this._pingTimeout= (packet.pingInterval || 5000) + 1000 // set value from server
monitorPing.call(this)
}
async function pingFailedHandler () {
clearTimeout(this._pingFailed)
clearTimeout(this._ping) // clear any others set
this.removeAllListeners('ping')
this.on('ping',pingHandler.bind(this))
let msg = 'ping has been received again, back to normal connection'
this.notify('connected',{msg:msg})
this.log('info',msg)
}
function monitorPing () {
this._ping = setTimeout( () => {
this.removeAllListeners('ping')
let msg = `ping failed waiting ${this.pingFailedTimeout/1000} secs to before forced reconnect`
this.notify('offline',{msg:msg})
this.log('warn',msg)
this.on('ping',pingFailedHandler.bind(this))
this._pingFailed = setTimeout (() => {
this.removeAllListeners('ping')
let msg =`no ping received for ${this.pingFailedTimeout/1000} secs, force disconnect/reconnect`
this.notify('failed',{msg:msg})
this.log('warn',msg)
this.reconnect()
// this.emit('error',{code:'PING_FAILED'})
}, this.pingFailedTimeout)
},this._pingTimeout)
}

View File

@ -18,14 +18,26 @@ class JsonStream extends EventEmitter {
this._delimeter = opts.delimiter || '#'
this.onData = this.onData.bind(this)
this.serialize = this.serialize.bind(this)
this._state = 'online'
this._queue = []
}
get state () {return this._state}
offline () { this._state = 'offline' }
pause () {this._state = 'paused'} // queue messages in handler
resume () {
// emit the messages in the queue
this._state='online'
}
online() {this._state = 'online'}
onData(data) {
data = decoder.write(data)
try {
this._handleData(data)
} catch (e) {
this.emit('error', { error: e })
} catch (err) {
// emit an error on the socket that handled with other socket errors
this.emit('error', err)
}
}
@ -51,7 +63,7 @@ class JsonStream extends EventEmitter {
if (isNaN(this._contentLength)) {
this._contentLength = null
this._buffer = ''
var err = new Error(
let err = new Error(
'Invalid content length supplied (' +
rawContentLength +
') in: ' +
@ -64,12 +76,12 @@ class JsonStream extends EventEmitter {
}
}
if (this._contentLength != null) {
var length = Buffer.byteLength(this._buffer, 'utf8')
let length = Buffer.byteLength(this._buffer, 'utf8')
if (length == this._contentLength) {
this._handleMessage(this._buffer)
} else if (length > this._contentLength) {
var message = this._buffer.substring(0, this._contentLength)
var rest = this._buffer.substring(this._contentLength)
let message = this._buffer.substring(0, this._contentLength)
let rest = this._buffer.substring(this._contentLength)
this._handleMessage(message)
this.onData(rest)
}
@ -83,14 +95,18 @@ class JsonStream extends EventEmitter {
try {
message = JSON.parse(data)
} catch (e) {
var err = new Error(
let err = new Error(
'Could not parse JSON: ' + e.message + '\nRequest data: ' + data
)
err.code = 'E_INVALID_JSON'
throw err
}
message = message || {}
this.emit('message', message)
// console.log('stream message', message, this._state)
if (this._stream ==='pause') {
if (message._header.id !== 'ping') this.queue.shift(message)
}
if(this._state==='online') this.emit('message', message)
}
}

View File

@ -5,6 +5,7 @@ import path from 'path'
// npmjs modules
import mkdir from 'make-dir'
import btc from 'better-try-catch'
import pReflect from 'p-reflect'
import _ON_DEATH from 'death' //this is intentionally ugly
import JSONStream from './json-stream'
import clone from 'clone'
@ -39,7 +40,7 @@ export default function socketClass(Server) {
* @param {String} options.host a tcp host name nornally not used as 0.0.0.0 is set by default
* @param {String} options.port a tcp
* @param {String | Boolean} options.path xeither full path to where socket should be created or if just 'true' then use default
* @param {Boolean} options.clientTracking track connected clients for push notifications - default: true
* @param {Boolean} options.consumerTracking track connected consumers for push notifications - default: true
* @param {Object} options.conPacket A json operson's property
*
*/
@ -47,6 +48,7 @@ export default function socketClass(Server) {
super(opts)
delete opts.key
delete opts.cert
this.name = opts.name
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
if (!opts.path) {
opts.host = opts.host || '0.0.0.0'
@ -57,35 +59,46 @@ export default function socketClass(Server) {
if (path.dirname(opts.path) === '.') // relative path sent
opts.path = path.join(DEFAULT_PIPE_DIR, opts.path)
}
this.clientTracking = opts.clientTracking || true
this.clients = [] // track consumers (i.e. clients)
this.defaultReturnCmd = opts.defaultReturnCmd
this.allowAnonymous = (!opts.tokens || !!process.env.UCI_ANON || opts.allowAnonymous) ? true : false
this.tokens = opts.tokens || []
this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
this.pingInterval = opts.pingInterval === false ? 0 : (opts.pingInterval * 1000 || 5000)
this.consumers = new Map() // track consumers (i.e. clients) TODO use a Map
this.nextConsumerID = 0 // incrementer for default initial consumer ID
this.conPackets = opts.conPackets || [opts.conPacket]
this.opts = opts // for use to recover from selected errors
this.errorCount = 0
//self bindings
this._listen = this._listen.bind(this)
this.create = this.create.bind(this)
this.authenticateConsumer = this.authenticateConsumer.bind(this)
this._authenticate = this._authenticate.bind(this)
this.close = promisify(this.close).bind(this)
log = logger({
package:'@uci/socket',
file: 'src/socket.js',
class: 'Socket',
name: 'socket',
id: this.id
})
} // end constructor
get active() { return this.listening }
/**
* create - Description
*
* @returns {type} Description
*/
async create() {
this.emit('socket',{state:'creating', msg:'creating socket for consumers to connect'})
return new Promise(async (resolve, reject) => {
// set up a couple ways to gracefully destroy socket process is killed/aborted
_ON_DEATH(async () => {
log.error({method:'create', line:84, msg:'\nhe\'s dead jim'})
await this._destroy()
})
process.once('SIGUSR2', async () => {
await this._destroy
await this._destroy()
process.kill(process.pid, 'SIGUSR2')
})
@ -96,30 +109,72 @@ export default function socketClass(Server) {
// if TCP socket should already be dead
let [err, res] = await btc(promisify(fileDelete))(this.opts.path)
if (!err) {
log.debug({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'})
return await this._listen(this.opts)
log.info({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'})
// try again
this.removeAllListeners('listening')
resolve(await this.create())
}
log.error({method:'create', line:102, err: err, msg:'error deleting socket. Can not establish a socket'})
return err
}
}
if (err.code === 'EACCES') {
log.debug({method:'create', line:107, socket: this.opts.path, msg:'directory does not exist...creating'})
await mkdir(path.dirname(this.opts.path))
log.debug({method:'create', line:109, socket: this.opts.path, msg:'directory created'})
return await this._listen(this.opts)
this.removeAllListeners('listening')
resolve(await this.create())
}
// otherwise fatally exit
log.error({method:'create', line:113, err:err, msg:'error creating socket'})
log.error({method:'create', line:113, err:err, opts:this.opts, msg:`error creating socket server ${this.name}`})
reject(err)
})
let [err, res] = await btc(this._listen)(this.opts)
if (err) reject(err)
resolve(res)
this.once('listening', () => {
this.on('error', err => {
this.errorCount +=1 // log errors here
this.errors.push(err)
if(this.errorCount>2 && this.errorCount<6) {
let errors= {level:'warn',msg:'something bad maybe going on, 3 errors', errors:this.errors}
this.emit('socket',{state:'error', msg:'2 to 5 socket errors', errors:this.errors})
this.emit('log', errors)
log.error(errors)
}
if(this.errorCount>5) {
let errors = {level:'fatal',msg:'something fatal is going on, 6 errors', errors:this.errors}
log.fatal(errors)
this.removeAllListeners('listening')
this.listening=false
this.close(() => {
this.emit('socket',{state:'offline', msg:'too many socket errors no longer listening for consumers to connect'})
})
this.emit('log', errors)
}
})
let msg = `socket ready and listening ${typeof this.address() ==='string' ? `at ${this.address()}` : `on port ${this.address().port}`}`
this.emit('socket',{state:'listening', msg:msg})
let obj = {method:'create', line:54, msg:msg}
log.info(obj)
this.on('connection', this._connectionHandler.bind(this))
resolve(msg)
})
super.listen(this.opts)
this.enablePing()
}) // end creeate promise
} // end create
async stop() {
return new Promise(function(resolve) {
this.removeAllListeners('listening')
this.listening=false
this.close(() => {
this.emit('socket',{state:'offline', msg:'manually closed socket on request'})
resolve('socket is offline')
})
})
}
/**
* registerPacketProcessor - Description
* @public
@ -130,113 +185,322 @@ export default function socketClass(Server) {
this._packetProcess = func
}
enablePing () {
if (this.pingInterval > 499) {
this._ping = setInterval( async () =>{
if (this.consumers.size > 0) this.push({pingInterval:this.pingInterval},{packetId:'ping'})
},this.pingInterval)
}
}
disablePing() {
clearInterval(this._ping)
}
addTokens(tokens) {
if (typeof tokens ==='string'){
tokens = tokens.split(',')
}
this.tokens = this.tokens.concat(tokens)
if (this.tokens.length>0) this.allowAnonymous = false
}
removeTokens(tokens) {
if (typeof tokens ==='string'){
if (tokens === 'all') {
this.tokens = []
this.allowAnonymous = true
return
}
tokens = tokens.split(',')
}
this.tokens = this.tokens.filter(token => !tokens.includes(token))
if (this.tokens.length===0) {
log.warn({msg:'all tokens have been removed, switching to allow anonymous connections'})
this.allowAnonymous = true
}
}
registerTokenValidator (func) {
this.allowAnonymous = false
this._validateToken = func
}
registerAuthenticator (func) {
this.allowAnonymous = false
this._authenticate = func
}
/**
* push - pushes a supplied UCI object packet to all connected clients
* push - pushes a supplied UCI object packet to all connected consumers
*
* @param {object} packet Description
* @param {string} id the header id string of the pushed packet, default: 'pushed'
*
*/
async push(packet, id) {
packet._header = { id: id || 'pushed' }
log.debug({method:'push', line:142, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
this.clients.forEach(async client => {
if (client.writable) {
let [err, ser] = await btc(client.stream.serialize)(packet)
if (err)
ser = await client.stream.serialize({
error: 'was not able to serialze the res packet',
err: err,
_header: { id: packet._header.id }
})
if (!id || id === client.ID) await this._send.bind(client)(ser)
// TODO support multiple consumers in options
async push(packet={},opts={}) {
if (this.consumers.size > 0) {
packet._header = {id: opts.packetId || 'pushed'}
let consumers = []
if ( opts.consumers || opts.consumer ) {
if (opts.consumer) opts.consumers = [opts.consumer]
consumers = Array.from(this.consumers).filter(([sid,consumer]) =>
opts.consumers.some(con=> {
// console.log('filtering consumers', consumer.sid,consumer.data,con)
return (
con === ( (consumer.data ||{}).name || (consumer.data ||{}).id ) ||
con.sid=== sid ||
con.name === (consumer.data ||{}).name ||
con.id === (consumer.data ||{}).id
)
}
)
).map(con=>con[1])
// console.log('custom consumers',consumers.length)
} else consumers = Array.from(this.consumers.values())
// if (!opts.packetId) {
// console.log('socket class push',packet,opts,consumers.length)
// console.log('consumer for push', consumers.map(consumer=>(consumer.data ||{}).name))
// }
consumers = consumers.filter(consumer=>consumer.writable)
const send = consumer => this._send(consumer,packet)
const res = await Promise.all(consumers.map(send).map(pReflect))
const success = res.filter(result => result.isFulfilled).map((result,index) => [consumers[index].name,result.value])
const errors =res.filter(result => result.isRejected).map((result,index) => [consumers[index].name,result.reason])
this.emit('log',{level:errors.length? 'error': packet._header.id==='ping'?'trace':'info', msg:'packet was pushed', socket:this.name||this.id, errors:errors, packet:packet, success:success, headerId:packet._header.id})
} else {
this.emit('log',{level:'debug', msg:'no connected consumers packet push ignored', packet:packet})
// log.debug({method:'push', id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'})
}
}
removeConsumer (sid) {
let consumer=this.consumers.get(sid)
this.emit('log',{level:'info', msg:'a consumer disconnected', consumer:consumer.data, sid:consumer.sid})
this.emit('connection:consumer',{state:'disconnected', msg:'a consumer disconnected', consumer:consumer.data, sid:consumer.sid})
consumer.removeAllListeners()
consumer.stream.removeAllListeners()
this.consumers.delete(sid)
log.warn({msg:'consumer removed from tracking',sid:sid, curConsumerCount:this.consumers.size})
}
async authenticateConsumer(consumer) {
return new Promise(async (resolve, reject) => {
// when consumer gets the handshake they must follow with authentication
consumer.stream.on('message', authenticate.bind(this,consumer))
let [err] = await btc(this._send)(consumer,{_handshake: true, sid:consumer.sid})
if (err) {
log.error({msg:'error in handshake send', error:err})
reject(err)
}
async function authenticate (consumer,packet) {
log.debug({msg:`authentication packet from consumer ${consumer.name}:${consumer.id}:${consumer.sid}`, packet:packet})
consumer.stream.removeAllListeners('message')
if (!packet._authenticate) reject('first consumer packet was not authentication')
else {
let [err, res] = await btc(this._authenticate)(packet)
consumer.authenticated = this.allowAnonymous ? 'anonymous' : (err ? false : res)
consumer.data = packet.data || {}
consumer.name = packet.name || consumer.data.name
consumer.id = packet.id || consumer.data.id
// console.log('-------------------Inbound Consumer Authenticated---------------------------')
// console.log(packet)
// console.log(consumer.authenticated, consumer.name,consumer.id,consumer.data)
// console.log('--------------------------------------------------------')
packet.authenticated = consumer.authenticated
packet.reason = err || null
log.debug({msg:'sending authorization result to consumer', packet:packet})
this._send(consumer,packet) // send either way
if (err && !this.allowAnonymous) {
log.info({msg:`consumer ${consumer.data.name} authentication failed`, name:consumer.name, id:consumer.id, data:consumer.data, consumer_sid:consumer.sid, reason:err})
reject(packet.reason)
}
else {
log.info({msg:`consumer ${consumer.name} authenticated successfuly`, name:consumer.name, id:consumer.id, data:consumer.data})
if (this.allowAnonymous) log.warn({msg:`consumer ${consumer.data.name}, connected anonymously`})
resolve(consumer.authenticated)
}
}
}
})
}
async _listen(opts) {
super.listen(opts, async (err, res) => {
if (err) return err
// this gets called for each client connection and is unique to each
this.on('connection', async socket => {
const stream = new JSONStream()
socket.stream = stream // need this to track clients
let send = this._send.bind(socket)
if (this.clientTracking) this.clients.push(socket)
// TODO add 'close' listener to socket to remove from this.clients
log.debug({method:'_listen', line:167, msg:'new consumer connecting'})
log.debug(await send(await stream.serialize({ _handshake: true })))
if (this.opts.conPacket) {
this.opts.conPacket._header = { id: 'pushed' }
log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'})
send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection
}
socket.on('data', stream.onData)
// TODO need to start error listener for stream so errors can be processed
stream.on('message', messageProcess.bind(this, socket))
// private methods
async function messageProcess(client, packet) {
log.debug({method:'_listen', line:179, packet: packet, msg:'incoming packet on socket side'})
let res = {}
if (this.clientTracking && packet.clientID) {
client.ID = packet.clientID
res.cmd = 'ackID'
} else {
res = (await this._packetProcess(clone(packet))) || {}
if (Object.keys(res).length === 0)
res = {
error:
// default validator
_validateToken (token) {
if (token) return this.tokens.includes(token)
return false
}
// default authenticator - reject value should be reason which is returned to consumer
async _authenticate (packet) {
if (!this._validateToken(packet.token)) return Promise.reject('invalid token')
return true
}
// async _connectionHandler({consumer, server}) { // this gets called for each consumer connection and is unique to
async _connectionHandler(consumer) { // this gets called for each consumer connection and is unique to each
const stream = new JSONStream()
consumer.stream = stream
consumer.data = {}
consumer.connected = true
// add listeners
consumer.on('error', (err) => {
log.error({msg:'consumer connection error',error:err})
// TODO do more handling than just logging
})
consumer.on('end', (err) => {
log.error({msg:`'consumer connection ended: ${consumer.data.name}`, error:err})
if (consumer.sid) this.removeConsumer(consumer.sid)
else {
consumer.removeAllListeners()
consumer.stream.removeAllListeners()
}
})
consumer.on('data', stream.onData) // send data to
stream.on('error', (err) => {
log.error({msg:'consumer stream error during listen',error:err})
// TODO do more handling than just logging
})
// consumer.authenticated = true
let [err] = await btc(this.authenticateConsumer)(consumer)
if (!this.allowAnonymous) {
if (err) {
consumer.removeAllListeners()
consumer.stream.removeAllListeners()
consumer.end()// abort new connection consumer, cleanup, remove listeners
consumer.emit('end',err)
return
}
}
// authenticated consumer, add to list of consumers
consumer.sid = ++this.nextConsumerID // server assigned ID
// consumer.authenticated = true
this.consumers.set(consumer.sid, consumer) // add current consumer to consumers
consumer.setKeepAlive(this.keepAlive,30)
const consumerCloseHandler = (sid) => {
log.warn({msg:'consumer connection was closed',sid:sid})
this.removeConsumer(sid)
}
consumer.on('close', consumerCloseHandler.bind(this,consumer.sid))
log.debug({method:'_listen', line:364, msg:'new consumer connected/authenticated', cname:consumer.name, cid:consumer.id, totalConsumers:this.consumers.size})
// all's set enable main incoming message processor
stream.on('message', messageProcess.bind(this, consumer))
if (this.conPackets.length) {
setTimeout(async () => {
for (let packet of this.conPackets) {
packet._header = {type:'on connection packet', id: 'pushed'}
await this._send(consumer,packet) // send a packet command on to consumer on connection
}
},100)
}
this.emit('log',{level:'info', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id})
this.emit('connection:consumer',{state:'connected', msg:`consumer ${(consumer.data ||{}).name} connected and authenticated to socket ${this.id}`,
name:(consumer.data ||{}).name ||(consumer.data ||{}).id || consumer.sid,
sid:consumer.sid,
data:consumer.data,
authenticated:consumer.authenticated
})
// that's it. Connection is active
async function messageProcess(consumer, packet) {
this.emit('log',{level:'mcp', packet: packet, consumer:consumer.data, msg:'incoming packet on socket side'})
let res = (await this._packetProcess(clone(packet))) || {}
if (Object.keys(res).length === 0)
res = {
error:
'socket packet command function likely did not return a promise',
packet: packet
}
}
if (packet) {
res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing
delete packet._header // remove before adding to response header as request
} else res._header = {}
res._header.request = clone(packet, false)
res._header.responder = { name: this.name, instanceID: this.id }
res._header.socket = this.address()
if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply'
let [err, ser] = await btc(stream.serialize)(res)
if (err)
ser = await stream.serialize({
error: 'was not able to serialze the res packet',
err: err,
_header: { id: res._header.id }
})
await send(ser)
} // end process message
}) // end connecttion consumer
log.info({method:'_listen', line:211, opts: this.opt, msg:'socket created and listening'})
return res
}) // end super listen callback
packet: packet
}
if (packet) {
res._header = clone(packet._header, false) || {} //make sure return packet has header with id in case it was removed in processing
delete packet._header // remove before adding to response header as request
} else res._header = {}
res._header.request = clone(packet, false)
res._header.responder = { name: this.name, instanceID: this.id }
res._header.socket = this.address()
if (!res.cmd) res.cmd = this.defaultReturnCmd || 'reply' // by default return command is 'reply'
let [err] = await btc(this._send)(consumer,res)
if (err) log.error({msg:err, error:err})
} // end message process
} // end listen
// call when socket server is going down
async _destroy() {
log.debug({method:'_destroy', line:217, msg:'closing down socket'})
log.fatal({method:'_destroy', line:217, msg:'closing down socket server'})
// this.push()
clearInterval(this._ping)
await this.close()
this.consumers.forEach(consumer => {
consumer.removeAllListeners()
consumer.stream.removeAllListeners()
})
log.debug({method:'_destroy', line:219, msg:'all connections closed....exiting'})
process.exit()
}
// default packet process, just a simple echo
// default packet process, just a simple echo, override with registerPacketProcessor
async _packetProcess(packet) {
return new Promise(resolve => {
resolve(packet)
})
}
// consumer send, must have a consumer socket bound to use
async _send(packet) {
// timeout already set if sockect can't be drained in 10 secs
return new Promise(resolve => {
async _send(consumer, packet) {
log.trace({msg:`sending to consumer:${consumer.sid}:${consumer.data.name}`, consumer:consumer.data, packet:packet})
return new Promise(async (resolve, reject) => {
if (!consumer) {
console.log('no consumer rejecting packet send')
reject('no consumer specified can not send packet')
return
}
if (!consumer.writable) {
console.log('no consumer writeable stream rejecting packet send')
reject('socket stream closed can not send packet')
return
}
let [err,ser] = await btc(consumer.stream.serialize)(packet)
if (err) reject('unable to serialze the packet')
if (!ser) {
// console.log('empty-serialized packet', consumer.name, consumer.socketName)
reject('empty packet rejecting send, nothing to send')
return
}
const cb = () => resolve('packet written to socket stream')
if (!this.write(packet)) {
this.once('drain', cb)
if (!consumer.write(ser)) {
consumer.once('drain', cb)
} else {
process.nextTick(cb)
}
})
}
} // end class