diff --git a/package.json b/package.json index f666063..8214bc9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/base", - "version": "0.1.27", + "version": "0.1.28", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { diff --git a/src/base.js b/src/base.js index 3b3b8d9..ae5fb65 100644 --- a/src/base.js +++ b/src/base.js @@ -177,11 +177,16 @@ class Base extends EventEmitter { this._socket[name].transport = transport this._socket[name]._packetProcess = this._packetProcess.bind(this, name) - // bubble up any socket 'status' events to the base instance - this._socket[name].on('status', ev => { - ev.socketName=name - this.emit('status', ev) - }) + if (type==='c') { // bubble up events from client sockets + this._socket[name].on('status', ev => { + ev.socketName=name + this.emit('status', ev) + }) + this._socket[name].on('pushed', packet => { + packet._header.socketName=name + this.emit('pushed', packet) + }) + } // do this as .then promise then addSocket doesn't need to be async before init if (this._started) return await this._initSocket(name) @@ -227,7 +232,7 @@ class Base extends EventEmitter { if (this._socket[name].type === 'c') { let hookedPacket = {} hookedPacket = this._socket[name].beforeSend ? await this._socket[name].beforeSend.call(this,Object.assign({},packet)) : packet - log.debug({msg:'after possible hook packet to send', name:name, packet:hookedPacket, method:'send', line:217}) + log.debug({msg:'after hook, sending packet', name:name, packet:hookedPacket, method:'send', line:235}) sends.push(this._socket[name].send.bind(this._socket[name],hookedPacket)) } } @@ -329,8 +334,7 @@ class Base extends EventEmitter { } // TODO confirm Object.assign will be ok as it is not a deep copy - // one off add a command function or two to basic namespaces which is called before default - + // allow a single or arrary of single functions amendCommands(funcs, trans, type) { if (!trans && !type) type = 's' if (trans ==='c' || trans ==='s') { @@ -422,16 +426,27 @@ class Base extends EventEmitter { async _packetProcess(socket_name, packet) { log.debug({ socket:socket_name, packet:packet, method:'_packetProcess', line:393, msg:'processing incoming packet'}) + let header = packet._header ? packet._header : {} // retain header + let err, res + if (this._socket[socket_name].beforeProcess) { + [err,res] = await to(this._socket[socket_name].beforeProcess.call(this,packet)) + if (err) { // hook has forced an abort to processing + console.log('before error', packet) + packet.error = err + return packet + } + packet = res + } + // if (this._socket[socket_name].beforeProcess) packet = await this._socket[socket_name].beforeProcess.call(this,packet) // the processor can be set via the incoming packet // otherwise if one is set on the socket or the default found in processing.js - // TODO?? Try all each available packet processors in some order if fails try next one before trying the default - if (this._socket[socket_name].beforeProcess) packet = await this._socket[socket_name].beforeProcess.call(this,packet) - if (packet.error) return packet // hook invalidated the packet abort further processing + // TODO Try each "available" packet processor in some order if fails try next one before trying the default let processor = packet._processor || this._processors[socket_name] ? socket_name : '_default' - let res = await this._processors[processor].call(this,packet,socket_name) + res = (await this._processors[processor].call(this,packet,socket_name))|| packet // processor didn't return a packet then return the packet sent log.debug({ socket:socket_name, response:res, msg:'processed packet ready for hook'}) if (this._socket[socket_name].afterProcess) res = await this._socket[socket_name].afterProcess.call(this,res) log.debug({ socket:socket_name, response:res, msg:'packet after hook complete ready for return'}) + res._header = Object.assign(header,res._header) // re-apply header in case hooks or processor mangled or removed it return res } diff --git a/src/processing.js b/src/processing.js index af4dce2..92703d1 100644 --- a/src/processing.js +++ b/src/processing.js @@ -5,18 +5,20 @@ let log = logger({ package: 'base',file:'processing.js'}) // this._processing refers to this module/hash // processing errors that are caught should be sent back to consumer in packets with :error property -// but they might also throw local errors/execptions so they should bubble up here and get caught and logged +// but they might also throw local errors/execptions so they should get caught here, logged and pushed back // messaging errors on socket will not be fatal to the entire socket server // common processor, will call based on type s or c the ones below const processor = async function (packet,socket) { - let [err,res] = await to(_process[this.getSocket(socket).type].bind(this)(packet,socket)) + let [err,res] = await to(_process[this.getSocket(socket).type].bind(this)(packet,socket)) if (err) { - let error = {error:err, socket:socket, packet:packet, function:'processor', line: 15, msg:'some possibly unhandled badness happened during packet processing'} + let error = {cmd:'error', error:err, packet:packet, socket:socket, function:'processor', line: 15, msg:`'unhandled error in packet command function ${packet.cmd}`} log.error(error) + res = Object.assign({},packet,error) + if (process.env.UCI_PUSH_UNHANDLED==='true') this.push(res) if (process.env.UCI_SHOW_UNHANDLED==='true') console.log(error) } - else return res + return res } export { processor, defaultCmds, namespaces } @@ -33,13 +35,13 @@ const _process = { c: async function (packet,socket) { // the the end of life for a consumer packet that has been sent and returned or a packet that was pushed. - if (packet.error) return await this._c.error(packet) + if (packet.error) packet.cmd='error' if (packet.cmd) { let response = await this._callCmdFunc(packet,socket); if(response!=='failed') return response - packet = {error:'no consumer reply processing function supplied for command',packet:packet} + packet = {error:'no consumer processing function supplied in for command in returned packet',packet:packet} this._c.error(packet) } else { - packet = {error:'[consumer] no command in reply packet',packet:packet} + packet = {error:'[consumer] no command in returned packet',packet:packet} return await this._c.error(packet) } } @@ -81,11 +83,11 @@ const defaultCmds ={ } }, c:{ - error: function (packet) { - if (process.env.UCI_ENV==='dev') log.error({error:packet.error, packet:packet, msg:'==========Consumer Sent Packet returned with ERROR ========='}) + error: function (packet) { // default + log.error({error:packet.error, packet:packet, msg:'==========Consumer Sent Packet returned with ERROR ========='}) }, reply: function(packet) { - if (process.env.UCI_ENV==='dev') log.debug({packet:packet, msg:'====Packet returned from socket - debug default reply logger==='}) + if (process.env.UCI_ENV==='dev') log.info({packet:packet, msg:'====Packet returned from socket - default reply logger==='}) } } }