diff --git a/package.json b/package.json index 0602cd6..a76e436 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/websocket", - "version": "0.3.12", + "version": "0.3.15", "description": "JSON packet host websocket server", "main": "src", "scripts": { diff --git a/src/socket.js b/src/socket.js index 6365bb2..eaedefa 100644 --- a/src/socket.js +++ b/src/socket.js @@ -1,6 +1,7 @@ import { Server as WSS } from 'ws' import { Server } from 'http' import btc from 'better-try-catch' +import pReflect from 'p-reflect' import { promisify } from 'util' import _ON_DEATH from 'death' //this is intentionally ugly import clone from 'clone' @@ -160,15 +161,37 @@ class Socket extends Server { * @param {} id - this is the parameter id * */ - async push(packet={},id) { - packet._header = {id: id || 'pushed'} + async push(packet={},opts={}) { if (this.consumers.size > 0) { - log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) - this.consumers.forEach(async consumer => { - this._send(consumer,packet) - }) + packet._header = {id: opts.packetId || 'pushed'} + let consumers = [] + if ( opts.consumers || opts.consumer ) { + if (opts.consumer) opts.consumers = [opts.consumer] + consumers = Array.from(this.consumers).filter(([sid,consumer]) => + opts.consumers.some(con=> { + console.log(consumer.sid,consumer.data,con) + return ( + con === ( (consumer.data ||{}).name || (consumer.data ||{}).id ) || + con.sid=== sid || + con.name === (consumer.data ||{}).name || + con.id === (consumer.data ||{}).id + ) + } + ) + ).map(con=>con[1]) + // console.log('custom consumers',consumers.length) + } else consumers = Array.from(this.consumers.values()) + + consumers = consumers.filter(consumer=>consumer.writable||consumer.readyState===1) + + const send = consumer => this._send(consumer,packet) + const res = await Promise.all(consumers.map(send).map(pReflect)) + + const success = res.filter(result => result.isFulfilled).map((result,index) => [consumers[index].name,result.value]) + const errors =res.filter(result => result.isRejected).map((result,index) => [consumers[index].name,result.reason]) + this.emit('log',{level:errors.length? 'error': packet._header.id==='ping'?'trace':'debug', msg:'packet was pushed', socket:this.name||this.id, errors:errors, packet:packet, success:success, headerId:packet._header.id}) } else { - log.debug({method:'push', line:165, id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'}) + this.emit('log',{level:'debug', msg:'no connected consumers packet push ignored', packet:packet}) } } @@ -270,7 +293,6 @@ class Socket extends Server { this.conPackets.forEach(packet => { if (packet) { packet._header = {type:'on connection packet', id: 'pushed'} - // console.log('new consumer',consumer.name,'pushing connection packet \n',packet) this._send(consumer,packet) // send a packet command on to consumer on connection } })