0.2.33 refactor push, put in more send/write guards for closed stream
parent
42d4b54008
commit
0d578958a8
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "@uci/socket",
|
"name": "@uci/socket",
|
||||||
"version": "0.2.31",
|
"version": "0.2.33",
|
||||||
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
|
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
|
||||||
"main": "src",
|
"main": "src",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
@ -52,6 +52,7 @@
|
||||||
"clone": "^2.1.2",
|
"clone": "^2.1.2",
|
||||||
"death": "^1.1.0",
|
"death": "^1.1.0",
|
||||||
"delay": "^4.3.0",
|
"delay": "^4.3.0",
|
||||||
"make-dir": "^3.0.0"
|
"make-dir": "^3.0.0",
|
||||||
|
"p-reflect": "^2.1.0"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,7 @@ class SocketConsumer extends Socket {
|
||||||
this._connection = 'offline'
|
this._connection = 'offline'
|
||||||
this._first = true // first connection or not
|
this._first = true // first connection or not
|
||||||
this._pingTimeout // value sent from socket upon connect
|
this._pingTimeout // value sent from socket upon connect
|
||||||
|
console.log('consumer.js @uci/socket package tag 0.2.33')
|
||||||
}
|
}
|
||||||
|
|
||||||
get connected() { return this._connected}
|
get connected() { return this._connected}
|
||||||
|
@ -188,17 +189,20 @@ class SocketConsumer extends Socket {
|
||||||
let [err, res] = await btc(this.stream.serialize)(packet)
|
let [err, res] = await btc(this.stream.serialize)(packet)
|
||||||
if (err)
|
if (err)
|
||||||
resolve({error: 'unable to serialize packet for sending',packet: packet})
|
resolve({error: 'unable to serialize packet for sending',packet: packet})
|
||||||
let res2 = await this.__write(res)
|
if (this.active && this.writable) {
|
||||||
if (res2.error) resolve(res2)
|
console.log('writable can write')
|
||||||
// if no write error then wait for send response
|
let res2 = await this.__write(res)
|
||||||
this.once(packet._header.id, async function(reply) {
|
if (res2.error) resolve(res2)
|
||||||
let res = await this._packetProcess(reply)
|
// if no write error then wait for send response
|
||||||
if (!res) { // if packetProcess was not promise
|
this.once(packet._header.id, async function(reply) {
|
||||||
res = reply
|
let res = await this._packetProcess(reply)
|
||||||
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
|
if (!res) { // if packetProcess was not promise
|
||||||
}
|
res = reply
|
||||||
resolve(res) // resolves processed packet not return packet
|
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
|
||||||
}) //end listener
|
}
|
||||||
|
resolve(res) // resolves processed packet not return packet
|
||||||
|
}) //end listener
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -291,7 +295,10 @@ async function handshake (packet) {
|
||||||
this.notify('handshake')
|
this.notify('handshake')
|
||||||
const authPacket = Object.assign(this._authenticate() || {}, {_authenticate:true, data:this._data})
|
const authPacket = Object.assign(this._authenticate() || {}, {_authenticate:true, data:this._data})
|
||||||
// console.log('----------------authentication packet---------------',authPacket)
|
// console.log('----------------authentication packet---------------',authPacket)
|
||||||
let res = await this._authenticateSend(authPacket)
|
console.log('authenticate writable',this.writable)
|
||||||
|
let res ={}
|
||||||
|
if (!this.writable) res.error ='socket stream not writable'
|
||||||
|
else res = await this._authenticateSend(authPacket)
|
||||||
this.stream.removeAllListeners('message')
|
this.stream.removeAllListeners('message')
|
||||||
clearTimeout(this._doneAuthenticate)
|
clearTimeout(this._doneAuthenticate)
|
||||||
if (res.error) {
|
if (res.error) {
|
||||||
|
@ -303,12 +310,14 @@ async function handshake (packet) {
|
||||||
if (!res.authenticated) {
|
if (!res.authenticated) {
|
||||||
let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect`
|
let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect`
|
||||||
this.notify('failed',{msg:msg})
|
this.notify('failed',{msg:msg})
|
||||||
|
this.log('error',msg)
|
||||||
this.disconnect()
|
this.disconnect()
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
this._authenticated = res.authenticated
|
this._authenticated = res.authenticated
|
||||||
let msg ='authentication succeeded connection ready'
|
let msg ='authentication succeeded connection ready'
|
||||||
this.notify('connected',{msg:msg})
|
this.notify('connected',{msg:msg})
|
||||||
|
this.log('info',msg)
|
||||||
this._reconnectCount = 0
|
this._reconnectCount = 0
|
||||||
this.stream.on('message', messageHandler.bind(this)) // reset default message handler
|
this.stream.on('message', messageHandler.bind(this)) // reset default message handler
|
||||||
this.on('pushed', pushHandler.bind(this) )
|
this.on('pushed', pushHandler.bind(this) )
|
||||||
|
|
|
@ -5,6 +5,7 @@ import path from 'path'
|
||||||
// npmjs modules
|
// npmjs modules
|
||||||
import mkdir from 'make-dir'
|
import mkdir from 'make-dir'
|
||||||
import btc from 'better-try-catch'
|
import btc from 'better-try-catch'
|
||||||
|
import pReflect from 'p-reflect'
|
||||||
import _ON_DEATH from 'death' //this is intentionally ugly
|
import _ON_DEATH from 'death' //this is intentionally ugly
|
||||||
import JSONStream from './json-stream'
|
import JSONStream from './json-stream'
|
||||||
import clone from 'clone'
|
import clone from 'clone'
|
||||||
|
@ -47,6 +48,7 @@ export default function socketClass(Server) {
|
||||||
super(opts)
|
super(opts)
|
||||||
delete opts.key
|
delete opts.key
|
||||||
delete opts.cert
|
delete opts.cert
|
||||||
|
this.name = opts.name
|
||||||
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
|
this.id = opts.id || opts.name || 'socket:' + new Date().getTime()
|
||||||
if (!opts.path) {
|
if (!opts.path) {
|
||||||
opts.host = opts.host || '0.0.0.0'
|
opts.host = opts.host || '0.0.0.0'
|
||||||
|
@ -78,6 +80,7 @@ export default function socketClass(Server) {
|
||||||
class: 'Socket',
|
class: 'Socket',
|
||||||
id: this.id
|
id: this.id
|
||||||
})
|
})
|
||||||
|
console.log('socket-class: @uci/socket package tag 0.2.33')
|
||||||
} // end constructor
|
} // end constructor
|
||||||
|
|
||||||
|
|
||||||
|
@ -187,7 +190,7 @@ export default function socketClass(Server) {
|
||||||
enablePing () {
|
enablePing () {
|
||||||
if (this.pingInterval > 499) {
|
if (this.pingInterval > 499) {
|
||||||
this._ping = setInterval( async () =>{
|
this._ping = setInterval( async () =>{
|
||||||
if (this.consumers.size > 0) this.push({pingInterval:this.pingInterval},'ping')
|
if (this.consumers.size > 0) this.push({pingInterval:this.pingInterval},{packetId:'ping'})
|
||||||
},this.pingInterval)
|
},this.pingInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -238,20 +241,46 @@ export default function socketClass(Server) {
|
||||||
* @param {string} id the header id string of the pushed packet, default: 'pushed'
|
* @param {string} id the header id string of the pushed packet, default: 'pushed'
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
// TODO push to a specific set of consumers only
|
// TODO support multiple consumers in options
|
||||||
async push(packet={},id) {
|
async push(packet={},opts={}) {
|
||||||
packet._header = {id: id || 'pushed'}
|
|
||||||
if (this.consumers.size > 0) {
|
if (this.consumers.size > 0) {
|
||||||
log.trace({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
|
packet._header = {id: opts.packetId || 'pushed'}
|
||||||
// FIXME async with forEach not supported, use loop or map
|
let consumers = []
|
||||||
this.consumers.forEach(async consumer => {
|
if ( opts.consumers || opts.consumer ) {
|
||||||
if (consumer.writable) {
|
if (opts.consumer) opts.consumers = [opts.consumer]
|
||||||
let [err] = await btc(this._send)(consumer,packet)
|
consumers = Array.from(this.consumers).filter(([sid,consumer]) =>
|
||||||
if (err) log.error({msg:err, error:err})
|
opts.consumers.some(con=> {
|
||||||
}
|
console.log(consumer.sid,consumer.data,con)
|
||||||
})
|
return (
|
||||||
|
con === ( (consumer.data ||{}).name || (consumer.data ||{}).id ) ||
|
||||||
|
con.sid=== sid ||
|
||||||
|
con.name === (consumer.data ||{}).name ||
|
||||||
|
con.id === (consumer.data ||{}).id
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
).map(con=>con[1])
|
||||||
|
// console.log('custom consumers',consumers.length)
|
||||||
|
} else consumers = Array.from(this.consumers.values())
|
||||||
|
|
||||||
|
// if (!opts.packetId) {
|
||||||
|
// console.log('socket class push',packet,opts,consumers.length)
|
||||||
|
// console.log('consumer for push', consumers.map(consumer=>(consumer.data ||{}).name))
|
||||||
|
// }
|
||||||
|
|
||||||
|
consumers = consumers.filter(consumer=>consumer.writable)
|
||||||
|
|
||||||
|
const send = consumer => this._send(consumer,packet)
|
||||||
|
const res = await Promise.all(consumers.map(send).map(pReflect))
|
||||||
|
|
||||||
|
const success = res.filter(result => result.isFulfilled).map((result,index) => [consumers[index].name,result.value])
|
||||||
|
const errors =res.filter(result => result.isRejected).map((result,index) => [consumers[index].name,result.reason])
|
||||||
|
this.emit('log',{level:errors.length? 'error': packet._header.id==='ping'?'trace':'info', msg:'packet was pushed', socket:this.name||this.id, errors:errors, packet:packet, success:success, headerId:packet._header.id})
|
||||||
} else {
|
} else {
|
||||||
log.debug({method:'push', line:165, id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'})
|
this.emit('log',{level:'debug', msg:'no connected consumers packet push ignored', packet:packet})
|
||||||
|
|
||||||
|
// log.debug({method:'push', id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -371,11 +400,14 @@ export default function socketClass(Server) {
|
||||||
// consumer.authenticated = true
|
// consumer.authenticated = true
|
||||||
this.consumers.set(consumer.sid, consumer) // add current consumer to consumers
|
this.consumers.set(consumer.sid, consumer) // add current consumer to consumers
|
||||||
consumer.setKeepAlive(this.keepAlive,30)
|
consumer.setKeepAlive(this.keepAlive,30)
|
||||||
|
|
||||||
const consumerCloseHandler = (sid) => {
|
const consumerCloseHandler = (sid) => {
|
||||||
log.warn({msg:'consumer connection was closed',sid:sid})
|
log.warn({msg:'consumer connection was closed',sid:sid})
|
||||||
this.removeConsumer(sid)
|
this.removeConsumer(sid)
|
||||||
}
|
}
|
||||||
consumer.on('close', consumerCloseHandler.bind(this,consumer.sid))
|
consumer.on('close', consumerCloseHandler.bind(this,consumer.sid))
|
||||||
|
|
||||||
|
|
||||||
log.debug({method:'_listen', line:364, msg:'new consumer connected/authenticated', cname:consumer.name, cid:consumer.id, totalConsumers:this.consumers.size})
|
log.debug({method:'_listen', line:364, msg:'new consumer connected/authenticated', cname:consumer.name, cid:consumer.id, totalConsumers:this.consumers.size})
|
||||||
// all's set enable main incoming message processor
|
// all's set enable main incoming message processor
|
||||||
stream.on('message', messageProcess.bind(this, consumer))
|
stream.on('message', messageProcess.bind(this, consumer))
|
||||||
|
@ -383,6 +415,7 @@ export default function socketClass(Server) {
|
||||||
if (this.conPackets) {
|
if (this.conPackets) {
|
||||||
this.conPackets = Array.isArray(this.conPackets) ? this.conPackets : [this.conPackets]
|
this.conPackets = Array.isArray(this.conPackets) ? this.conPackets : [this.conPackets]
|
||||||
log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'})
|
log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'})
|
||||||
|
// FIXME, don't use forEach and use promise map.
|
||||||
this.conPackets.forEach(packet => {
|
this.conPackets.forEach(packet => {
|
||||||
if (packet) {
|
if (packet) {
|
||||||
packet._header = {type:'on connection packet', id: 'pushed'}
|
packet._header = {type:'on connection packet', id: 'pushed'}
|
||||||
|
|
Loading…
Reference in New Issue