From 70e16fa08d2aa189a61b67bf942340129371e027 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Wed, 18 Dec 2019 18:14:21 -0800 Subject: [PATCH] 0.2.28 Prevent epipe error exception on socket stream write --- package.json | 2 +- src/consumer.js | 26 ++++++++++++++------------ src/socket-class.js | 16 ++++++++++++++++ 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/package.json b/package.json index fc16297..5648b0a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.27", + "version": "0.2.28", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { diff --git a/src/consumer.js b/src/consumer.js index aea66c3..cf47f22 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -182,7 +182,9 @@ class SocketConsumer extends Socket { let [err, res] = await btc(this.stream.serialize)(packet) if (err) resolve({error: 'unable to serialize packet for sending',packet: packet}) - await this.__write(res) // write errors will be caught by socket error listener and result in reconnect + let res2 = await this.__write(res) + if (res2.error) resolve(res2) + // if no write error then wait for send response this.once(packet._header.id, async function(reply) { let res = await this._packetProcess(reply) if (!res) { // if packetProcess was not promise @@ -236,16 +238,17 @@ class SocketConsumer extends Socket { async __write(packet) { // timeout already set if sockect can't be drained in 10 secs - return new Promise(resolve => { - const cb = () => resolve('packet written to consumer side socket stream ') - try { - if (!super.write(packet)) { - this.once('drain', cb) - } else { - process.nextTick(cb) - } - } catch (err){ - resolve({error:`error during write to socket - ${err.code}`, err:err}) + return new Promise((resolve) => { + // most stream write errors will be caught by socket error listener but make sure + if (!this.writable) { // stream is writeable as can't catch epipe errors there + resolve({error:'socket stream closed can not send packet'}) + return + } + const cb = () => resolve({response:'packet written to consumer side socket stream '}) + if (!super.write(packet)) { + this.once('drain', cb) + } else { + process.nextTick(cb) } }) } @@ -261,7 +264,6 @@ class SocketConsumer extends Socket { export default SocketConsumer - // CONNECTION HANDLERS async function connectHandler () { diff --git a/src/socket-class.js b/src/socket-class.js index 0d30b8c..d731f34 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -311,6 +311,7 @@ export default function socketClass(Server) { const stream = new JSONStream() consumer.stream = stream + consumer.connected = true // add listeners consumer.on('error', (err) => { @@ -318,6 +319,15 @@ export default function socketClass(Server) { // TODO do more handling than just logging }) + consumer.on('end', (err) => { + log.error({msg:'consumer connected ended',error:err}) + if (consumer.sid) this.removeClient(consumer.sid) + else { + consumer.removeAllListeners() + consumer.stream.removeAllListeners() + } + }) + consumer.on('data', stream.onData) // send data to stream.on('error', (err) => { @@ -333,6 +343,7 @@ export default function socketClass(Server) { consumer.removeAllListeners() consumer.stream.removeAllListeners() consumer.end()// abort new connection consumer, cleanup, remove listeners + consumer.emit('end',err) return } } @@ -409,6 +420,10 @@ export default function socketClass(Server) { async _send(client, packet) { log.trace({msg:`sending to client:${client.id}`, packet:packet}) return new Promise(async (resolve, reject) => { + if (!client.writable) { + reject('socket stream closed can not send packet') + return + } let [err,ser] = await btc(client.stream.serialize)(packet) if (err) reject('unable to serialze the packet') const cb = () => resolve('packet written to socket stream') @@ -417,6 +432,7 @@ export default function socketClass(Server) { } else { process.nextTick(cb) } + }) } } // end class