diff --git a/package.json b/package.json index 26a5945..d763b05 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.2.20", + "version": "0.2.21", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { diff --git a/src/consumer.js b/src/consumer.js index 001b162..e46009a 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -72,7 +72,7 @@ class SocketConsumer extends Socket { if (this.opts.host === '127.0.0.1'|| this.opts.host ==='localhost') log.warn({method:'connect', line:107, msg:'tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead'}) - log.debug('first connnect attempt for', this.opts.name) + log.debug('first connnect attempt for', this.opts.id) this.emit('status',{level:'info', msg:'attempting initial connection', id:this.id, opts:this.opts, ready:false}) let initTimeout = {} @@ -107,6 +107,8 @@ class SocketConsumer extends Socket { this._listen() // setup for active connection log.info({method:'connect', line:87, msg:'initial connect/authentication complete ready for communication'}) this.emit('status',{level:'info', msg:'authentication succesfull', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + this.emit('consumer-connection',{state:'connected',name:this.id}) + if (this.opts.conPacket) (this.send(this.conPacket)) resolve('initial connection successful') } } @@ -229,6 +231,8 @@ class SocketConsumer extends Socket { } this.stream.on('message', messageHandler.bind(this)) // reset default message handler this.emit('status',{level:'info', msg:'reconnected', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) + this.emit('consumer-connection', {state:'reconnected', name:this.id}) + if (this.opts.conPacket) (this.send(this.conPacket)) } } } @@ -239,6 +243,7 @@ class SocketConsumer extends Socket { this._authenticated = false this.emit('status',{level:'error', msg:'connection error', id:this.id, opts:this.opts, authenticated:this._authenticated, connected:this._connected}) log.debug({ method:'connect', line:130, error: err.code, msg:`connect error ${err.code}, attempting reconnect after ${this.retryWait/1000} secs`}) + this.emit('consumer-connection', {state:'disconnected', name:this.id}) await pause(this.retryWait) this.stream.removeAllListeners('message') // remove regular message handler in prep for reconnect this.removeAllListeners('connect') @@ -259,7 +264,7 @@ class SocketConsumer extends Socket { const pingHandler = async (packet) => { clearTimeout(pingTimeout) - log.debug({method:'connect', line:191, msg:'received ping, restting timeout'}) + log.trace({method:'connect', line:191, msg:'received ping, restting timeout'}) this._pingTimeout= packet.pingInterval + 1000 monitorPing.call(this) } @@ -276,7 +281,7 @@ class SocketConsumer extends Socket { // general handler function messageHandler(packet) { - log.debug('incoming packet from socket sever',packet) + if (packet._header.id !== 'ping') log.debug('incoming packet from socket sever',packet) this.emit(packet._header.id, packet) } diff --git a/src/socket-class.js b/src/socket-class.js index ff57447..20969b4 100644 --- a/src/socket-class.js +++ b/src/socket-class.js @@ -57,12 +57,12 @@ export default function socketClass(Server) { if (path.dirname(opts.path) === '.') // relative path sent opts.path = path.join(DEFAULT_PIPE_DIR, opts.path) } + this.defaultReturnCmd = opts.defaultReturnCmd this.allowAnonymous = (!opts.tokens || !!process.env.UCI_ANON || opts.allowAnonymous) ? true : false this.tokens = opts.tokens || [] this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this.pingInterval = opts.pingInterval === false ? opts.pingInterval : (opts.pingInterval * 1000 || 5000) - // this.clientTracking = opts.clientTracking || true - this.clients = [] // track consumers (i.e. clients) + this.clients = [] // track consumers (i.e. clients) TODO use a Map this.nextClientID = 0 // incrementer for default initial client ID this.opts = opts // for use to recover from selected errors this.errorCount = 0 @@ -126,8 +126,16 @@ export default function socketClass(Server) { this.on('error', err => { this.errorCount +=1 // log errors here this.errors.push(err) - if(this.errorCount>2) this.emit('warn', {msg:'something bad maybe going on, 3 errors', errors:this.errors}) - if(this.errorCount>5) this.emit('fatal', {msg:'something fatal is going on, 6 errors', errors:this.errors}) + if(this.errorCount>2 && this.errorCount<6) { + let errors= {level:'warning',msg:'something bad maybe going on, 3 errors', errors:this.errors} + this.emit('status', errors) + log.error(errors) + } + if(this.errorCount>5) { + let errors = {level:'fatal',msg:'something fatal is going on, 6 errors', errors:this.errors} + log.fatal(errors) + this.emit('status', errors) + } }) log.info({method:'create', line:54, msg:'socket server created and listening at', address:this.address()}) this.on('connection', this._connectionHandler.bind(this)) @@ -207,7 +215,8 @@ export default function socketClass(Server) { async push(packet={},id) { packet._header = {id: id || 'pushed'} if (this.clients.length > 0) { - log.debug({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) + log.trace({method:'push', line:142, id:packet._header.id, opts: this.opts, packet: packet, msg:'pushing a packet to all connected consumers'}) + // TODO should do a map and single promise this.clients.forEach(async client => { if (client.writable) { let [err] = await btc(this._send)(client,packet) @@ -219,9 +228,19 @@ export default function socketClass(Server) { } } + // TODO won't need this if moving to a Map + getClientIndex(id) { + return this.clients.findIndex(client => {return client.id === id }) + } + + getClient(id) { + return this.clients[this._getClientIndex(id)] + } + removeClient (id) { - let index = this.clients.findIndex(client => {return client.id === id }) - let client = this.clients[index] + let index = this.getClientIndex(id) + let client=this.clients[index] + this.emit('consumer-connection',{state:'disconnected', id:client.id, name:client.name, socketSide:true}) client.removeAllListeners() client.stream.removeAllListeners() this.clients.splice(index,1) @@ -281,14 +300,16 @@ export default function socketClass(Server) { } - async _connectionHandler(socket) { // this gets called for each client connection and is unique to each - log.debug({method:'_listen', line:167, msg:'new consumer connecting'}) - socket.id = ++this.nextClientID // server assigned ID - socket.authenticated = false - this.clients.push(socket) // add client to list + async _connectionHandler(consumer) { // this gets called for each client connection and is unique to each + consumer.id = ++this.nextClientID // server assigned ID + log.debug({method:'_listen', line:167, msg:'new consumer connecting', id:consumer.id, totalConsumers:this.clients.length}) + consumer.socketSide = true + consumer.authenticated = false + this.clients.push(consumer) // add client to list const stream = new JSONStream() - socket.stream = stream - socket.setKeepAlive(this.keepAlive,3000) + consumer.stream = stream + console.log('new consumer connecting', consumer.id, this.clients.length) + consumer.setKeepAlive(this.keepAlive,3000) // add listeners const clientCloseHandler = (id) => { @@ -296,38 +317,41 @@ export default function socketClass(Server) { this.removeClient(id) } - socket.on('close', clientCloseHandler.bind(this,socket.id) ) + consumer.on('close', clientCloseHandler.bind(this,consumer.id) ) - socket.on('error', (err) => { + consumer.on('error', (err) => { log.error({msg:'client connection error during listen',error:err}) // TODO do more handling than just logging }) - socket.on('data', stream.onData) // send data to + consumer.on('data', stream.onData) // send data to stream.on('error', (err) => { - log.error({msg:'client-socket stream error during listen',error:err}) + log.error({msg:'consumer stream error during listen',error:err}) // TODO do more handling than just logging }) - let [err] = await btc(this.authenticateClient)(socket) + let [err] = await btc(this.authenticateClient)(consumer) if (!this.allowAnonymous) { if (err) { - socket.end()// abort new connection socket, cleanup, remove listeners - this.removeClient(socket.id) + consumer.end()// abort new connection consumer, cleanup, remove listeners + this.removeClient(consumer.id) return } } // all's set main message processor - stream.on('message', messageProcess.bind(this, socket)) + stream.on('message', messageProcess.bind(this, consumer)) if (this.opts.conPacket) { this.opts.conPacket._header = { id: 'pushed' } log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'}) - this._send(socket,this.opts.conPacket) // send a packet command on to consumer on connection + this._send(consumer,this.opts.conPacket) // send a packet command on to consumer on connection } + this.emit('consumer-connection',{state:'connected', id:consumer.id, name:consumer.name, socketSide:true}) + this.emit('status',{level:'info', msg:'a consumer connected', name:consumer.name, id:consumer.id}) + // that's it. Connection is active async function messageProcess(client, packet) { @@ -346,7 +370,7 @@ export default function socketClass(Server) { res._header.request = clone(packet, false) res._header.responder = { name: this.name, instanceID: this.id } res._header.socket = this.address() - if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply' + if (!res.cmd) res.cmd = this.defaultReturnCmd || 'reply' // by default return command is 'reply' let [err] = await btc(this._send)(client,res) if (err) log.error({msg:err, error:err}) } // end message process @@ -375,7 +399,7 @@ export default function socketClass(Server) { } async _send(client, packet) { - log.debug({msg:`sending to client:${client.id}`, packet:packet}) + log.trace({msg:`sending to client:${client.id}`, packet:packet}) return new Promise(async (resolve, reject) => { let [err,ser] = await btc(client.stream.serialize)(packet) if (err) reject('unable to serialze the packet')