0.2.28 Prevent epipe error exception on socket stream write

master
David Kebler 2019-12-18 18:14:21 -08:00
parent d7fc89cafb
commit 70e16fa08d
3 changed files with 31 additions and 13 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@uci/socket", "name": "@uci/socket",
"version": "0.2.27", "version": "0.2.28",
"description": "JSON packet intra(named)/inter(TCP) host communication over socket", "description": "JSON packet intra(named)/inter(TCP) host communication over socket",
"main": "src", "main": "src",
"scripts": { "scripts": {

View File

@ -182,7 +182,9 @@ class SocketConsumer extends Socket {
let [err, res] = await btc(this.stream.serialize)(packet) let [err, res] = await btc(this.stream.serialize)(packet)
if (err) if (err)
resolve({error: 'unable to serialize packet for sending',packet: packet}) resolve({error: 'unable to serialize packet for sending',packet: packet})
await this.__write(res) // write errors will be caught by socket error listener and result in reconnect let res2 = await this.__write(res)
if (res2.error) resolve(res2)
// if no write error then wait for send response
this.once(packet._header.id, async function(reply) { this.once(packet._header.id, async function(reply) {
let res = await this._packetProcess(reply) let res = await this._packetProcess(reply)
if (!res) { // if packetProcess was not promise if (!res) { // if packetProcess was not promise
@ -236,16 +238,17 @@ class SocketConsumer extends Socket {
async __write(packet) { async __write(packet) {
// timeout already set if sockect can't be drained in 10 secs // timeout already set if sockect can't be drained in 10 secs
return new Promise(resolve => { return new Promise((resolve) => {
const cb = () => resolve('packet written to consumer side socket stream ') // most stream write errors will be caught by socket error listener but make sure
try { if (!this.writable) { // stream is writeable as can't catch epipe errors there
if (!super.write(packet)) { resolve({error:'socket stream closed can not send packet'})
this.once('drain', cb) return
} else { }
process.nextTick(cb) const cb = () => resolve({response:'packet written to consumer side socket stream '})
} if (!super.write(packet)) {
} catch (err){ this.once('drain', cb)
resolve({error:`error during write to socket - ${err.code}`, err:err}) } else {
process.nextTick(cb)
} }
}) })
} }
@ -261,7 +264,6 @@ class SocketConsumer extends Socket {
export default SocketConsumer export default SocketConsumer
// CONNECTION HANDLERS // CONNECTION HANDLERS
async function connectHandler () { async function connectHandler () {

View File

@ -311,6 +311,7 @@ export default function socketClass(Server) {
const stream = new JSONStream() const stream = new JSONStream()
consumer.stream = stream consumer.stream = stream
consumer.connected = true
// add listeners // add listeners
consumer.on('error', (err) => { consumer.on('error', (err) => {
@ -318,6 +319,15 @@ export default function socketClass(Server) {
// TODO do more handling than just logging // TODO do more handling than just logging
}) })
consumer.on('end', (err) => {
log.error({msg:'consumer connected ended',error:err})
if (consumer.sid) this.removeClient(consumer.sid)
else {
consumer.removeAllListeners()
consumer.stream.removeAllListeners()
}
})
consumer.on('data', stream.onData) // send data to consumer.on('data', stream.onData) // send data to
stream.on('error', (err) => { stream.on('error', (err) => {
@ -333,6 +343,7 @@ export default function socketClass(Server) {
consumer.removeAllListeners() consumer.removeAllListeners()
consumer.stream.removeAllListeners() consumer.stream.removeAllListeners()
consumer.end()// abort new connection consumer, cleanup, remove listeners consumer.end()// abort new connection consumer, cleanup, remove listeners
consumer.emit('end',err)
return return
} }
} }
@ -409,6 +420,10 @@ export default function socketClass(Server) {
async _send(client, packet) { async _send(client, packet) {
log.trace({msg:`sending to client:${client.id}`, packet:packet}) log.trace({msg:`sending to client:${client.id}`, packet:packet})
return new Promise(async (resolve, reject) => { return new Promise(async (resolve, reject) => {
if (!client.writable) {
reject('socket stream closed can not send packet')
return
}
let [err,ser] = await btc(client.stream.serialize)(packet) let [err,ser] = await btc(client.stream.serialize)(packet)
if (err) reject('unable to serialze the packet') if (err) reject('unable to serialze the packet')
const cb = () => resolve('packet written to socket stream') const cb = () => resolve('packet written to socket stream')
@ -417,6 +432,7 @@ export default function socketClass(Server) {
} else { } else {
process.nextTick(cb) process.nextTick(cb)
} }
}) })
} }
} // end class } // end class