catching errors in json stream serialize

added environment variable SOCKETS_DIR for socket directory
tls
David Kebler 2018-02-11 19:58:22 -08:00
parent 751491ee00
commit d3b979b1fd
3 changed files with 25 additions and 14 deletions

View File

@ -13,7 +13,7 @@ const LOG_OPTS = {
instance_created:new Date().getTime() instance_created:new Date().getTime()
} }
const DEFAULT_PIPE = __dirname + '/unix.sock' const DEFAULT_PIPE = (process.env.SOCKETS_DIR || __dirname) + '/uci-socket.sock'
export default class Consumer extends Socket { export default class Consumer extends Socket {
constructor (opts={}) { constructor (opts={}) {
@ -58,7 +58,7 @@ export default class Consumer extends Socket {
clearTimeout(timeout) clearTimeout(timeout)
this._listen() this._listen()
log.info({opts:this.opts},'connected waiting for socket ready handshake') log.info({opts:this.opts},'connected waiting for socket ready handshake')
this.setKeepAlive(this.keepAlive) this.setKeepAlive(this.keepAlive,100)
let [err, res] = await btc(isReady).bind(this)(this.__ready, this.wait, this.timeout) let [err, res] = await btc(isReady).bind(this)(this.__ready, this.wait, this.timeout)
if (err) reject(err) if (err) reject(err)
log.info('handshake done, authenticating') log.info('handshake done, authenticating')
@ -86,7 +86,7 @@ export default class Consumer extends Socket {
} }
async send(packet) { async send(packet) {
await this.write(this.stream.serialize(packet)) await this.write(await this.stream.serialize(packet))
// TODO handle possible error // TODO handle possible error
// TODO await response if required by setting id to packet // TODO await response if required by setting id to packet
// then set a flag (and promise) that is resovled in the listener // then set a flag (and promise) that is resovled in the listener
@ -109,7 +109,7 @@ export default class Consumer extends Socket {
this.on('data', this.stream.onData) this.on('data', this.stream.onData)
this.stream.on('message', messageProcess.bind(this)) this.stream.on('message', messageProcess.bind(this))
async function messageProcess (packet) { async function messageProcess (packet) {
if (packet.ready) { if (packet._handshake) {
this._ready = true this._ready = true
return } return }
await this._packetProcess(packet) await this._packetProcess(packet)

View File

@ -2,6 +2,7 @@
import {StringDecoder} from 'string_decoder' import {StringDecoder} from 'string_decoder'
import EventEmitter from 'events' import EventEmitter from 'events'
import btc from 'better-try-catch'
const decoder = new StringDecoder() const decoder = new StringDecoder()
@ -24,11 +25,16 @@ export default class JsonStream extends EventEmitter{
} }
} }
serialize(message) { async serialize(message) {
var messageData = JSON.stringify(message) return new Promise( (resolve,reject) => {
var length = Buffer.byteLength(messageData, 'utf8') let [err,messageData] = btc(JSON.stringify)(message)
var data = length + this._delimeter + messageData if (err) reject(err)
return data let [err2,length] = btc(Buffer.byteLength)(messageData, 'utf8')
if (err2) reject(err2)
let data = length + this._delimeter + messageData
// console.log('serialized',data)
return resolve(data)
})
} }
_handleData (data) { _handleData (data) {

View File

@ -14,7 +14,7 @@ const LOG_OPTS = {
id:this.id, id:this.id,
instance_created:new Date().getTime() instance_created:new Date().getTime()
} }
const DEFAULT_PIPE = __dirname + '/unix.sock' const DEFAULT_PIPE = (process.env.SOCKETS_DIR || __dirname) + '/uci-socket.sock'
export default class Socket extends Server { export default class Socket extends Server {
constructor (opts={}) { constructor (opts={}) {
@ -64,7 +64,7 @@ export default class Socket extends Server {
if (err) reject(err) if (err) reject(err)
resolve(res) resolve(res)
}) // end promise }) // end creeate promise
} // end create } // end create
@ -76,14 +76,19 @@ export default class Socket extends Server {
super.listen(opts, async (err, res) => { super.listen(opts, async (err, res) => {
if (err) return err if (err) return err
// this gets called for each client connection and is unique to each // this gets called for each client connection and is unique to each
this.on('connection', (socket) => { this.on('connection', async (socket) => {
const stream = new JSONStream() const stream = new JSONStream()
log.info('new consumer connecting sending handshake') log.info('new consumer connecting sending handshake')
socket.write(stream.serialize({ready:true})) socket.write(await stream.serialize({'_handshake':true}))
socket.on('data', stream.onData) socket.on('data', stream.onData)
// TODO need to start error listener for stream so errors can be processed
stream.on('message', messageProcess.bind(this)) stream.on('message', messageProcess.bind(this))
async function messageProcess (packet) { async function messageProcess (packet) {
socket.write(stream.serialize(await this._packetProcess(packet))) // console.log('before processing',packet)
let processed = await this._packetProcess(packet)
if (!processed) processed = { error: 'packet command function likely did not return a promise', packet:packet}
// console.log('after processing',processed)
socket.write(await stream.serialize(processed))
} }
}) // end connecttion consumer }) // end connecttion consumer
log.info({opts: this.opts},'socket created') log.info({opts: this.opts},'socket created')