0.3.15 refactor push for accepting specific consumers
parent
87e94b765b
commit
84cd362751
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "@uci/websocket",
|
"name": "@uci/websocket",
|
||||||
"version": "0.3.12",
|
"version": "0.3.15",
|
||||||
"description": "JSON packet host websocket server",
|
"description": "JSON packet host websocket server",
|
||||||
"main": "src",
|
"main": "src",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import { Server as WSS } from 'ws'
|
import { Server as WSS } from 'ws'
|
||||||
import { Server } from 'http'
|
import { Server } from 'http'
|
||||||
import btc from 'better-try-catch'
|
import btc from 'better-try-catch'
|
||||||
|
import pReflect from 'p-reflect'
|
||||||
import { promisify } from 'util'
|
import { promisify } from 'util'
|
||||||
import _ON_DEATH from 'death' //this is intentionally ugly
|
import _ON_DEATH from 'death' //this is intentionally ugly
|
||||||
import clone from 'clone'
|
import clone from 'clone'
|
||||||
|
@ -160,15 +161,37 @@ class Socket extends Server {
|
||||||
* @param {<type>} id - this is the parameter id
|
* @param {<type>} id - this is the parameter id
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
async push(packet={},id) {
|
async push(packet={},opts={}) {
|
||||||
packet._header = {id: id || 'pushed'}
|
|
||||||
if (this.consumers.size > 0) {
|
if (this.consumers.size > 0) {
|
||||||
log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'})
|
packet._header = {id: opts.packetId || 'pushed'}
|
||||||
this.consumers.forEach(async consumer => {
|
let consumers = []
|
||||||
this._send(consumer,packet)
|
if ( opts.consumers || opts.consumer ) {
|
||||||
})
|
if (opts.consumer) opts.consumers = [opts.consumer]
|
||||||
|
consumers = Array.from(this.consumers).filter(([sid,consumer]) =>
|
||||||
|
opts.consumers.some(con=> {
|
||||||
|
console.log(consumer.sid,consumer.data,con)
|
||||||
|
return (
|
||||||
|
con === ( (consumer.data ||{}).name || (consumer.data ||{}).id ) ||
|
||||||
|
con.sid=== sid ||
|
||||||
|
con.name === (consumer.data ||{}).name ||
|
||||||
|
con.id === (consumer.data ||{}).id
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
).map(con=>con[1])
|
||||||
|
// console.log('custom consumers',consumers.length)
|
||||||
|
} else consumers = Array.from(this.consumers.values())
|
||||||
|
|
||||||
|
consumers = consumers.filter(consumer=>consumer.writable||consumer.readyState===1)
|
||||||
|
|
||||||
|
const send = consumer => this._send(consumer,packet)
|
||||||
|
const res = await Promise.all(consumers.map(send).map(pReflect))
|
||||||
|
|
||||||
|
const success = res.filter(result => result.isFulfilled).map((result,index) => [consumers[index].name,result.value])
|
||||||
|
const errors =res.filter(result => result.isRejected).map((result,index) => [consumers[index].name,result.reason])
|
||||||
|
this.emit('log',{level:errors.length? 'error': packet._header.id==='ping'?'trace':'debug', msg:'packet was pushed', socket:this.name||this.id, errors:errors, packet:packet, success:success, headerId:packet._header.id})
|
||||||
} else {
|
} else {
|
||||||
log.debug({method:'push', line:165, id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'})
|
this.emit('log',{level:'debug', msg:'no connected consumers packet push ignored', packet:packet})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,7 +293,6 @@ class Socket extends Server {
|
||||||
this.conPackets.forEach(packet => {
|
this.conPackets.forEach(packet => {
|
||||||
if (packet) {
|
if (packet) {
|
||||||
packet._header = {type:'on connection packet', id: 'pushed'}
|
packet._header = {type:'on connection packet', id: 'pushed'}
|
||||||
// console.log('new consumer',consumer.name,'pushing connection packet \n',packet)
|
|
||||||
this._send(consumer,packet) // send a packet command on to consumer on connection
|
this._send(consumer,packet) // send a packet command on to consumer on connection
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue