From d3b979b1fdd54fd522101e009bb3c36dc5ea5977 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sun, 11 Feb 2018 19:58:22 -0800 Subject: [PATCH] catching errors in json stream serialize added environment variable SOCKETS_DIR for socket directory --- src/consumer.mjs | 8 ++++---- src/json-stream.mjs | 16 +++++++++++----- src/socket.mjs | 15 ++++++++++----- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/consumer.mjs b/src/consumer.mjs index e57b838..9d6338c 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -13,7 +13,7 @@ const LOG_OPTS = { instance_created:new Date().getTime() } -const DEFAULT_PIPE = __dirname + '/unix.sock' +const DEFAULT_PIPE = (process.env.SOCKETS_DIR || __dirname) + '/uci-socket.sock' export default class Consumer extends Socket { constructor (opts={}) { @@ -58,7 +58,7 @@ export default class Consumer extends Socket { clearTimeout(timeout) this._listen() log.info({opts:this.opts},'connected waiting for socket ready handshake') - this.setKeepAlive(this.keepAlive) + this.setKeepAlive(this.keepAlive,100) let [err, res] = await btc(isReady).bind(this)(this.__ready, this.wait, this.timeout) if (err) reject(err) log.info('handshake done, authenticating') @@ -86,7 +86,7 @@ export default class Consumer extends Socket { } async send(packet) { - await this.write(this.stream.serialize(packet)) + await this.write(await this.stream.serialize(packet)) // TODO handle possible error // TODO await response if required by setting id to packet // then set a flag (and promise) that is resovled in the listener @@ -109,7 +109,7 @@ export default class Consumer extends Socket { this.on('data', this.stream.onData) this.stream.on('message', messageProcess.bind(this)) async function messageProcess (packet) { - if (packet.ready) { + if (packet._handshake) { this._ready = true return } await this._packetProcess(packet) diff --git a/src/json-stream.mjs b/src/json-stream.mjs index 152fa97..6cf06e6 100644 --- a/src/json-stream.mjs +++ b/src/json-stream.mjs @@ -2,6 +2,7 @@ import {StringDecoder} from 'string_decoder' import EventEmitter from 'events' +import btc from 'better-try-catch' const decoder = new StringDecoder() @@ -24,11 +25,16 @@ export default class JsonStream extends EventEmitter{ } } - serialize(message) { - var messageData = JSON.stringify(message) - var length = Buffer.byteLength(messageData, 'utf8') - var data = length + this._delimeter + messageData - return data + async serialize(message) { + return new Promise( (resolve,reject) => { + let [err,messageData] = btc(JSON.stringify)(message) + if (err) reject(err) + let [err2,length] = btc(Buffer.byteLength)(messageData, 'utf8') + if (err2) reject(err2) + let data = length + this._delimeter + messageData + // console.log('serialized',data) + return resolve(data) + }) } _handleData (data) { diff --git a/src/socket.mjs b/src/socket.mjs index 6454d64..5500031 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -14,7 +14,7 @@ const LOG_OPTS = { id:this.id, instance_created:new Date().getTime() } -const DEFAULT_PIPE = __dirname + '/unix.sock' +const DEFAULT_PIPE = (process.env.SOCKETS_DIR || __dirname) + '/uci-socket.sock' export default class Socket extends Server { constructor (opts={}) { @@ -64,7 +64,7 @@ export default class Socket extends Server { if (err) reject(err) resolve(res) - }) // end promise + }) // end creeate promise } // end create @@ -76,14 +76,19 @@ export default class Socket extends Server { super.listen(opts, async (err, res) => { if (err) return err // this gets called for each client connection and is unique to each - this.on('connection', (socket) => { + this.on('connection', async (socket) => { const stream = new JSONStream() log.info('new consumer connecting sending handshake') - socket.write(stream.serialize({ready:true})) + socket.write(await stream.serialize({'_handshake':true})) socket.on('data', stream.onData) + // TODO need to start error listener for stream so errors can be processed stream.on('message', messageProcess.bind(this)) async function messageProcess (packet) { - socket.write(stream.serialize(await this._packetProcess(packet))) + // console.log('before processing',packet) + let processed = await this._packetProcess(packet) + if (!processed) processed = { error: 'packet command function likely did not return a promise', packet:packet} + // console.log('after processing',processed) + socket.write(await stream.serialize(processed)) } }) // end connecttion consumer log.info({opts: this.opts},'socket created')