404 lines
22 KiB
HTML
404 lines
22 KiB
HTML
<!DOCTYPE html>
|
||
|
||
<html>
|
||
<head>
|
||
<title>socket.js</title>
|
||
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
|
||
<meta name="viewport" content="width=device-width, target-densitydpi=160dpi, initial-scale=1.0; maximum-scale=1.0; user-scalable=0;">
|
||
<link rel="stylesheet" media="all" href="../docco.css" />
|
||
</head>
|
||
<body>
|
||
<div id="container">
|
||
<div id="background"></div>
|
||
|
||
<ul id="jump_to">
|
||
<li>
|
||
<a class="large" href="javascript:void(0);">Jump To …</a>
|
||
<a class="small" href="javascript:void(0);">+</a>
|
||
<div id="jump_wrapper">
|
||
<div id="jump_page_wrapper">
|
||
<div id="jump_page">
|
||
|
||
|
||
<a class="source" href="consumer.html">
|
||
./src/consumer.js
|
||
</a>
|
||
|
||
|
||
<a class="source" href="index.html">
|
||
./src/index.js
|
||
</a>
|
||
|
||
|
||
<a class="source" href="json-stream.html">
|
||
./src/json-stream.js
|
||
</a>
|
||
|
||
|
||
<a class="source" href="socket.html">
|
||
./src/socket.js
|
||
</a>
|
||
|
||
</div>
|
||
</div>
|
||
</li>
|
||
</ul>
|
||
|
||
<ul class="sections">
|
||
|
||
<li id="title">
|
||
<div class="annotation">
|
||
<h1>socket.js</h1>
|
||
</div>
|
||
</li>
|
||
|
||
|
||
|
||
<li id="section-1">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-1">¶</a>
|
||
</div>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> { Server } <span class="hljs-keyword">from</span> <span class="hljs-string">'net'</span>
|
||
<span class="hljs-keyword">import</span> { unlink <span class="hljs-keyword">as</span> fileDelete } <span class="hljs-keyword">from</span> <span class="hljs-string">'fs'</span>
|
||
<span class="hljs-keyword">import</span> { promisify } <span class="hljs-keyword">from</span> <span class="hljs-string">'util'</span>
|
||
<span class="hljs-keyword">import</span> path <span class="hljs-keyword">from</span> <span class="hljs-string">'path'</span>
|
||
<span class="hljs-keyword">import</span> mkdir <span class="hljs-keyword">from</span> <span class="hljs-string">'make-dir'</span>
|
||
<span class="hljs-keyword">import</span> btc <span class="hljs-keyword">from</span> <span class="hljs-string">'better-try-catch'</span>
|
||
<span class="hljs-keyword">import</span> _ON_DEATH <span class="hljs-keyword">from</span> <span class="hljs-string">'death'</span> <span class="hljs-comment">//this is intentionally ugly</span>
|
||
<span class="hljs-keyword">import</span> JSONStream <span class="hljs-keyword">from</span> <span class="hljs-string">'./json-stream'</span>
|
||
<span class="hljs-keyword">import</span> clone <span class="hljs-keyword">from</span> <span class="hljs-string">'clone'</span></pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-2">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-2">¶</a>
|
||
</div>
|
||
<p>import logger from ‘../../uci-logger/src/logger’</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">import</span> logger <span class="hljs-keyword">from</span> <span class="hljs-string">'@uci/logger'</span>
|
||
<span class="hljs-keyword">let</span> log = {}</pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-3">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-3">¶</a>
|
||
</div>
|
||
<p>TODO change default pipe dir for windows and mac os</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre><span class="hljs-keyword">const</span> DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || <span class="hljs-string">'/tmp/UCI'</span>)
|
||
<span class="hljs-keyword">const</span> DEFAULT_SOCKET_NAME = <span class="hljs-string">'uci-sock'</span>
|
||
|
||
<span class="hljs-keyword">export</span> <span class="hljs-keyword">default</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Socket</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">Server</span> </span>{
|
||
<span class="hljs-keyword">constructor</span> (opts={}) {
|
||
<span class="hljs-keyword">super</span>()
|
||
<span class="hljs-keyword">this</span>.id = opts.id || opts.name || <span class="hljs-string">'socket:'</span>+ <span class="hljs-keyword">new</span> <span class="hljs-built_in">Date</span>().getTime()
|
||
<span class="hljs-keyword">if</span> (!opts.path) {
|
||
opts.host = opts.host || <span class="hljs-string">'0.0.0.0'</span>
|
||
opts.port = opts.port || <span class="hljs-number">8080</span>
|
||
} <span class="hljs-keyword">else</span> {
|
||
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">typeof</span> opts.path === <span class="hljs-string">'boolean'</span>) opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
|
||
<span class="hljs-keyword">if</span> (path.dirname(opts.path)===<span class="hljs-string">'.'</span>) opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
|
||
}
|
||
<span class="hljs-keyword">this</span>.clientTracking = opts.clientTracking || <span class="hljs-literal">true</span>
|
||
<span class="hljs-keyword">this</span>.clients = [] <span class="hljs-comment">// track consumers (i.e. clients)</span>
|
||
<span class="hljs-keyword">this</span>.opts = opts <span class="hljs-comment">// for use to recover from selected errors</span></pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-4">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-4">¶</a>
|
||
</div>
|
||
<p>self bindings</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>._listen = <span class="hljs-keyword">this</span>._listen.bind(<span class="hljs-keyword">this</span>)
|
||
<span class="hljs-keyword">this</span>.create = <span class="hljs-keyword">this</span>.create.bind(<span class="hljs-keyword">this</span>)
|
||
log = logger({<span class="hljs-attr">file</span>:<span class="hljs-string">'src/socket.js'</span>,<span class="hljs-attr">class</span>:<span class="hljs-string">'Socket'</span>,<span class="hljs-attr">name</span>:<span class="hljs-string">'socket'</span>,<span class="hljs-attr">id</span>:<span class="hljs-keyword">this</span>.id})
|
||
} <span class="hljs-comment">// end constructor</span>
|
||
|
||
<span class="hljs-keyword">async</span> create () {
|
||
|
||
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>( <span class="hljs-keyword">async</span> (resolve,reject) => {</pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-5">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-5">¶</a>
|
||
</div>
|
||
<p>couple ways to kill socket process when needed</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre> _ON_DEATH( <span class="hljs-keyword">async</span> () => {
|
||
log.info(<span class="hljs-string">'\nhe\'s dead jim'</span>)
|
||
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._destroy()
|
||
})
|
||
process.once(<span class="hljs-string">'SIGUSR2'</span>, <span class="hljs-keyword">async</span> () => {
|
||
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._destroy
|
||
process.kill(process.pid, <span class="hljs-string">'SIGUSR2'</span>)
|
||
})
|
||
|
||
<span class="hljs-keyword">this</span>.once(<span class="hljs-string">'error'</span>, <span class="hljs-keyword">async</span> (err) => {</pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-6">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-6">¶</a>
|
||
</div>
|
||
<p>recover from socket file that was not removed</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">if</span> (err.code === <span class="hljs-string">'EADDRINUSE'</span>) {
|
||
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.opts.path) { <span class="hljs-comment">// if TCP socket should already be dead</span>
|
||
<span class="hljs-keyword">let</span> [err, res] = <span class="hljs-keyword">await</span> btc(promisify(fileDelete))(<span class="hljs-keyword">this</span>.opts.path)
|
||
<span class="hljs-keyword">if</span>(!err) {
|
||
log.info({<span class="hljs-attr">res</span>:res, <span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'socket already exists.....deleted'</span>)
|
||
<span class="hljs-keyword">return</span> <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._listen(<span class="hljs-keyword">this</span>.opts)
|
||
}
|
||
log.fatal({<span class="hljs-attr">err</span>:err},<span class="hljs-string">'error deleting socket. Can not establish a socket'</span>)
|
||
<span class="hljs-keyword">return</span> err
|
||
}
|
||
}
|
||
<span class="hljs-keyword">if</span> (err.code ===<span class="hljs-string">'EACCES'</span>){
|
||
<span class="hljs-built_in">console</span>.log({<span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'directory does not exist...creating'</span>)
|
||
<span class="hljs-keyword">await</span> mkdir(path.dirname(<span class="hljs-keyword">this</span>.opts.path))
|
||
<span class="hljs-built_in">console</span>.log({<span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'created'</span>)
|
||
log.warn({<span class="hljs-attr">socket</span>: <span class="hljs-keyword">this</span>.opts.path}, <span class="hljs-string">'directory does not exist...creating'</span>)
|
||
<span class="hljs-keyword">return</span> <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._listen(<span class="hljs-keyword">this</span>.opts)
|
||
}</pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-7">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-7">¶</a>
|
||
</div>
|
||
<p>otherwise fatally exit</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre> log.info(err, <span class="hljs-string">'creating socket'</span>)
|
||
reject(err)
|
||
})
|
||
|
||
<span class="hljs-keyword">let</span> [err, res] = <span class="hljs-keyword">await</span> btc(<span class="hljs-keyword">this</span>._listen)(<span class="hljs-keyword">this</span>.opts)
|
||
<span class="hljs-keyword">if</span> (err) reject(err)
|
||
resolve(res)
|
||
|
||
}) <span class="hljs-comment">// end creeate promise</span>
|
||
} <span class="hljs-comment">// end create</span>
|
||
|
||
registerPacketProcessor (func) {
|
||
<span class="hljs-keyword">this</span>._packetProcess = func
|
||
}
|
||
|
||
<span class="hljs-keyword">async</span> _listen (opts) {
|
||
<span class="hljs-keyword">super</span>.listen(opts, <span class="hljs-keyword">async</span> (err, res) => {
|
||
<span class="hljs-keyword">if</span> (err) <span class="hljs-keyword">return</span> err</pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-8">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-8">¶</a>
|
||
</div>
|
||
<p>this gets called for each client connection and is unique to each</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">this</span>.on(<span class="hljs-string">'connection'</span>, <span class="hljs-keyword">async</span> (socket) => {
|
||
<span class="hljs-keyword">const</span> stream = <span class="hljs-keyword">new</span> JSONStream()
|
||
socket.stream = stream <span class="hljs-comment">// need this to track clients</span>
|
||
<span class="hljs-keyword">let</span> send = <span class="hljs-keyword">this</span>._send.bind(socket)
|
||
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.clientTracking) <span class="hljs-keyword">this</span>.clients.push(socket)</pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-9">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-9">¶</a>
|
||
</div>
|
||
<p>TODO add ‘close’ listener to socket to remove from this.clients</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre> log.info(<span class="hljs-string">'new consumer connecting'</span>)
|
||
log.info(<span class="hljs-keyword">await</span> send(<span class="hljs-keyword">await</span> stream.serialize({<span class="hljs-string">'_handshake'</span>:<span class="hljs-literal">true</span>})))
|
||
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.opts.conPacket) {
|
||
<span class="hljs-keyword">this</span>.opts.conPacket._header = { <span class="hljs-attr">id</span>:<span class="hljs-string">'pushed'</span>}
|
||
log.info({<span class="hljs-attr">conPacket</span>:<span class="hljs-keyword">this</span>.opts.conPacket},<span class="hljs-string">'pushing a preset command to just connected consumer'</span>)
|
||
send(<span class="hljs-keyword">await</span> stream.serialize(<span class="hljs-keyword">this</span>.opts.conPacket)) <span class="hljs-comment">// send a packet command on to consumer on connection</span>
|
||
}
|
||
socket.on(<span class="hljs-string">'data'</span>, stream.onData)</pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-10">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-10">¶</a>
|
||
</div>
|
||
<p>TODO need to start error listener for stream so errors can be processed</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre> stream.on(<span class="hljs-string">'message'</span>, messageProcess.bind(<span class="hljs-keyword">this</span>,socket))
|
||
|
||
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">function</span> <span class="hljs-title">messageProcess</span> (<span class="hljs-params">client, packet</span>) </span>{
|
||
log.info({<span class="hljs-attr">packet</span>:packet},<span class="hljs-string">'incoming packet on socket side'</span>)
|
||
<span class="hljs-keyword">let</span> res = {}
|
||
<span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.clientTracking && packet.clientID) {
|
||
client.ID = packet.clientID
|
||
res.cmd=<span class="hljs-string">'ackID'</span>
|
||
}
|
||
<span class="hljs-keyword">else</span> {
|
||
res = <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._packetProcess(clone(packet)) || {}
|
||
<span class="hljs-keyword">if</span> (<span class="hljs-built_in">Object</span>.keys(res).length === <span class="hljs-number">0</span>) res = { <span class="hljs-attr">error</span>: <span class="hljs-string">'socket packet command function likely did not return a promise'</span>, <span class="hljs-attr">packet</span>:packet}
|
||
}
|
||
<span class="hljs-keyword">if</span> (packet) {
|
||
res._header = clone(packet._header,<span class="hljs-literal">false</span>) || {} <span class="hljs-comment">//make sure return packet has header with id in case it was removed in processing</span>
|
||
<span class="hljs-keyword">delete</span> packet._header <span class="hljs-comment">// remove before adding to response header as request</span>
|
||
} <span class="hljs-keyword">else</span> res._header = {}
|
||
res._header.request = clone(packet,<span class="hljs-literal">false</span>)
|
||
res._header.responder = {<span class="hljs-attr">name</span>:<span class="hljs-keyword">this</span>.name,<span class="hljs-attr">instanceID</span>:<span class="hljs-keyword">this</span>.id}
|
||
res._header.socket = <span class="hljs-keyword">this</span>.address()
|
||
<span class="hljs-keyword">if</span> (!res.cmd) res.cmd = <span class="hljs-string">'reply'</span> <span class="hljs-comment">// by default return command is 'reply'</span>
|
||
<span class="hljs-keyword">let</span> [err, ser] = <span class="hljs-keyword">await</span> btc(stream.serialize)(res)
|
||
<span class="hljs-keyword">if</span> (err) ser = <span class="hljs-keyword">await</span> stream.serialize({ <span class="hljs-attr">error</span>: <span class="hljs-string">'was not able to serialze the res packet'</span>, <span class="hljs-attr">err</span>:err, <span class="hljs-attr">_header</span>:{<span class="hljs-attr">id</span>:res._header.id}})
|
||
log.info(<span class="hljs-keyword">await</span> send(ser))
|
||
} <span class="hljs-comment">// end process message</span>
|
||
|
||
}) <span class="hljs-comment">// end connecttion consumer</span>
|
||
log.info({<span class="hljs-attr">opts</span>: <span class="hljs-keyword">this</span>.opts},<span class="hljs-string">'socket created'</span>)
|
||
<span class="hljs-keyword">return</span> res
|
||
}) <span class="hljs-comment">// end super listen callback</span>
|
||
|
||
} <span class="hljs-comment">// end listen</span>
|
||
|
||
<span class="hljs-keyword">async</span> _destroy () {
|
||
log.info(<span class="hljs-string">'closing down socket'</span>)
|
||
<span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>.close()
|
||
log.info(<span class="hljs-string">'all connections closed....exiting'</span>)
|
||
process.exit()
|
||
}</pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-11">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-11">¶</a>
|
||
</div>
|
||
<p>default packet process, just a simple echo</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">async</span> _packetProcess (packet) {
|
||
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>(<span class="hljs-function"><span class="hljs-params">resolve</span> =></span> {
|
||
resolve(packet)
|
||
})
|
||
}</pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-12">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-12">¶</a>
|
||
</div>
|
||
<p>must have a consumer socket bound to use</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">async</span> _send(packet) {</pre></div></div>
|
||
|
||
</li>
|
||
|
||
|
||
<li id="section-13">
|
||
<div class="annotation">
|
||
|
||
<div class="pilwrap ">
|
||
<a class="pilcrow" href="#section-13">¶</a>
|
||
</div>
|
||
<p>timeout already set if sockect can’t be drained in 10 secs</p>
|
||
|
||
</div>
|
||
|
||
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> <span class="hljs-built_in">Promise</span>(<span class="hljs-function"><span class="hljs-params">resolve</span> =></span> {
|
||
<span class="hljs-keyword">const</span> cb = <span class="hljs-function"><span class="hljs-params">()</span> =></span> resolve(<span class="hljs-string">'packet written to socket stream'</span>)
|
||
<span class="hljs-keyword">if</span> (!<span class="hljs-keyword">this</span>.write(packet)) {
|
||
<span class="hljs-keyword">this</span>.once(<span class="hljs-string">'drain'</span>,cb )
|
||
} <span class="hljs-keyword">else</span> {
|
||
process.nextTick(cb)
|
||
}
|
||
})
|
||
}
|
||
|
||
<span class="hljs-keyword">async</span> push (packet,id) {
|
||
packet._header = { <span class="hljs-attr">id</span>:<span class="hljs-string">'pushed'</span>}
|
||
log.info({<span class="hljs-attr">opts</span>:<span class="hljs-keyword">this</span>.opts,<span class="hljs-attr">packet</span>:packet},<span class="hljs-string">'pushing a packet to all connected consumers'</span>)
|
||
<span class="hljs-keyword">this</span>.clients.forEach(<span class="hljs-keyword">async</span> (client) => {
|
||
<span class="hljs-keyword">if</span> (client.writable) {
|
||
<span class="hljs-keyword">let</span> [err, ser] = <span class="hljs-keyword">await</span> btc(client.stream.serialize)(packet)
|
||
<span class="hljs-keyword">if</span> (err) ser = <span class="hljs-keyword">await</span> client.stream.serialize({ <span class="hljs-attr">error</span>: <span class="hljs-string">'was not able to serialze the res packet'</span>, <span class="hljs-attr">err</span>:err, <span class="hljs-attr">_header</span>:{<span class="hljs-attr">id</span>:packet._header.id}})
|
||
<span class="hljs-keyword">if</span> (!id || id ===client.ID ) <span class="hljs-keyword">await</span> <span class="hljs-keyword">this</span>._send.bind(client)(ser)
|
||
}
|
||
})
|
||
}
|
||
|
||
} <span class="hljs-comment">// end class</span></pre></div></div>
|
||
|
||
</li>
|
||
|
||
</ul>
|
||
</div>
|
||
</body>
|
||
</html>
|