diff --git a/package.json b/package.json index bd622fa..58bbb82 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.31", + "version": "0.2.33", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { @@ -52,6 +52,7 @@ "clone": "^2.1.2", "death": "^1.1.0", "delay": "^4.3.0", - "make-dir": "^3.0.0" + "make-dir": "^3.0.0", + "p-reflect": "^2.1.0" } } diff --git a/src/consumer.js b/src/consumer.js index a224f28..4daa344 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -70,6 +70,7 @@ class SocketConsumer extends Socket { this._connection = 'offline' this._first = true // first connection or not this._pingTimeout // value sent from socket upon connect + console.log('consumer.js @uci/socket package tag 0.2.33') } get connected() { return this._connected} @@ -188,17 +189,20 @@ class SocketConsumer extends Socket { let [err, res] = await btc(this.stream.serialize)(packet) if (err) resolve({error: 'unable to serialize packet for sending',packet: packet}) - let res2 = await this.__write(res) - if (res2.error) resolve(res2) - // if no write error then wait for send response - this.once(packet._header.id, async function(reply) { - let res = await this._packetProcess(reply) - if (!res) { // if packetProcess was not promise - res = reply - log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) - } - resolve(res) // resolves processed packet not return packet - }) //end listener + if (this.active && this.writable) { + console.log('writable can write') + let res2 = await this.__write(res) + if (res2.error) resolve(res2) + // if no write error then wait for send response + this.once(packet._header.id, async function(reply) { + let res = await this._packetProcess(reply) + if (!res) { // if packetProcess was not promise + res = reply + log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) + } + resolve(res) // resolves processed packet not return packet + }) //end listener + } } }) } @@ -291,7 +295,10 @@ async function handshake (packet) { this.notify('handshake') const authPacket = Object.assign(this._authenticate() || {}, {_authenticate:true, data:this._data}) // console.log('----------------authentication packet---------------',authPacket) - let res = await this._authenticateSend(authPacket) + console.log('authenticate writable',this.writable) + let res ={} + if (!this.writable) res.error ='socket stream not writable' + else res = await this._authenticateSend(authPacket) this.stream.removeAllListeners('message') clearTimeout(this._doneAuthenticate) if (res.error) { @@ -303,12 +310,14 @@ async function handshake (packet) { if (!res.authenticated) { let msg =`authentication failed: ${res.reason}, disconnecting with no reconnect` this.notify('failed',{msg:msg}) + this.log('error',msg) this.disconnect() } else { this._authenticated = res.authenticated let msg ='authentication succeeded connection ready' this.notify('connected',{msg:msg}) + this.log('info',msg) this._reconnectCount = 0 this.stream.on('message', messageHandler.bind(this)) // reset default message handler this.on('pushed', pushHandler.bind(this) ) diff --git a/src/socket-class.js b/src/socket-class.js index f911a70..80ee406 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -5,6 +5,7 @@ import path from 'path' // npmjs modules import mkdir from 'make-dir' import btc from 'better-try-catch' +import pReflect from 'p-reflect' import _ON_DEATH from 'death' //this is intentionally ugly import JSONStream from './json-stream' import clone from 'clone' @@ -47,6 +48,7 @@ export default function socketClass(Server) { super(opts) delete opts.key delete opts.cert + this.name = opts.name this.id = opts.id || opts.name || 'socket:' + new Date().getTime() if (!opts.path) { opts.host = opts.host || '0.0.0.0' @@ -78,6 +80,7 @@ export default function socketClass(Server) { class: 'Socket', id: this.id }) + console.log('socket-class: @uci/socket package tag 0.2.33') } // end constructor @@ -187,7 +190,7 @@ export default function socketClass(Server) { enablePing () { if (this.pingInterval > 499) { this._ping = setInterval( async () =>{ - if (this.consumers.size > 0) this.push({pingInterval:this.pingInterval},'ping') + if (this.consumers.size > 0) this.push({pingInterval:this.pingInterval},{packetId:'ping'}) },this.pingInterval) } } @@ -238,20 +241,46 @@ export default function socketClass(Server) { * @param {string} id the header id string of the pushed packet, default: 'pushed' * */ - // TODO push to a specific set of consumers only - async push(packet={},id) { - packet._header = {id: id || 'pushed'} + // TODO support multiple consumers in options + async push(packet={},opts={}) { + if (this.consumers.size > 0) { - log.trace({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) - // FIXME async with forEach not supported, use loop or map - this.consumers.forEach(async consumer => { - if (consumer.writable) { - let [err] = await btc(this._send)(consumer,packet) - if (err) log.error({msg:err, error:err}) - } - }) + packet._header = {id: opts.packetId || 'pushed'} + let consumers = [] + if ( opts.consumers || opts.consumer ) { + if (opts.consumer) opts.consumers = [opts.consumer] + consumers = Array.from(this.consumers).filter(([sid,consumer]) => + opts.consumers.some(con=> { + console.log(consumer.sid,consumer.data,con) + return ( + con === ( (consumer.data ||{}).name || (consumer.data ||{}).id ) || + con.sid=== sid || + con.name === (consumer.data ||{}).name || + con.id === (consumer.data ||{}).id + ) + } + ) + ).map(con=>con[1]) + // console.log('custom consumers',consumers.length) + } else consumers = Array.from(this.consumers.values()) + + // if (!opts.packetId) { + // console.log('socket class push',packet,opts,consumers.length) + // console.log('consumer for push', consumers.map(consumer=>(consumer.data ||{}).name)) + // } + + consumers = consumers.filter(consumer=>consumer.writable) + + const send = consumer => this._send(consumer,packet) + const res = await Promise.all(consumers.map(send).map(pReflect)) + + const success = res.filter(result => result.isFulfilled).map((result,index) => [consumers[index].name,result.value]) + const errors =res.filter(result => result.isRejected).map((result,index) => [consumers[index].name,result.reason]) + this.emit('log',{level:errors.length? 'error': packet._header.id==='ping'?'trace':'info', msg:'packet was pushed', socket:this.name||this.id, errors:errors, packet:packet, success:success, headerId:packet._header.id}) } else { - log.debug({method:'push', line:165, id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'}) + this.emit('log',{level:'debug', msg:'no connected consumers packet push ignored', packet:packet}) + + // log.debug({method:'push', id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'}) } } @@ -371,11 +400,14 @@ export default function socketClass(Server) { // consumer.authenticated = true this.consumers.set(consumer.sid, consumer) // add current consumer to consumers consumer.setKeepAlive(this.keepAlive,30) + const consumerCloseHandler = (sid) => { log.warn({msg:'consumer connection was closed',sid:sid}) this.removeConsumer(sid) } consumer.on('close', consumerCloseHandler.bind(this,consumer.sid)) + + log.debug({method:'_listen', line:364, msg:'new consumer connected/authenticated', cname:consumer.name, cid:consumer.id, totalConsumers:this.consumers.size}) // all's set enable main incoming message processor stream.on('message', messageProcess.bind(this, consumer)) @@ -383,6 +415,7 @@ export default function socketClass(Server) { if (this.conPackets) { this.conPackets = Array.isArray(this.conPackets) ? this.conPackets : [this.conPackets] log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'}) + // FIXME, don't use forEach and use promise map. this.conPackets.forEach(packet => { if (packet) { packet._header = {type:'on connection packet', id: 'pushed'}