uci-base/src/processing.js

107 lines
4.2 KiB
JavaScript

import to from 'await-to-js'
import logger from '@uci-utils/logger'
// module-level logger, created with this package and file name as options;
// used below via log.error / log.debug
let log = logger({ package: 'base',file:'processing.js'})
// this._processing refers to this module/hash
// processing errors that are caught should be sent back to the consumer in packets with an :error property,
// but handlers might also throw local errors/exceptions, so those should get caught here, logged and pushed back
// messaging errors on a socket will not be fatal to the entire socket server
// common processor; based on the socket type (s or c) it calls one of the processors below
/**
 * Common packet processor. Looks up the processor for the socket's type
 * (`_process.s` or `_process.c`), runs it with `this` bound to the caller,
 * and converts any failure into an error packet instead of a rejection.
 *
 * Must be invoked with `this` set to an object providing `getSocket(socket)`
 * (whose result exposes a `type` property) and `push(packet)`.
 *
 * Environment switches read on failure:
 *   UCI_PUSH_UNHANDLED === 'true'  -> the error packet is passed to this.push
 *   UCI_SHOW_UNHANDLED === 'true'  -> the error details are written with console.log
 *
 * @param {Object} packet - incoming packet; `packet.cmd` is quoted in the error message
 * @param {*} socket - socket identifier handed to `this.getSocket` and to the processor
 * @returns {Promise<*>} the processor's result, or on failure a copy of the packet
 *   merged with `{cmd:'error', error, packet, socket, function, line, msg}`
 */
const cmdProcessor = async function (packet,socket) {
  // The lookup runs inside an async arrow (which keeps `this`) so that a synchronous
  // failure - unknown socket, unknown type - becomes a rejection that `to` captures,
  // rather than escaping before `to` is ever called.
  const runProcessor = async () => _process[this.getSocket(socket).type].call(this,packet,socket)
  let [err,res] = await to(runProcessor())
  if (err) {
    const error = {cmd:'error', error:err, packet:packet, socket:socket, function:'processor', line: 15, msg:`unhandled error in packet command function ${packet.cmd}`}
    log.error(error)
    // the error fields override same-named packet fields in the returned copy
    res = Object.assign({},packet,error)
    if (process.env.UCI_PUSH_UNHANDLED==='true') this.push(res)
    if (process.env.UCI_SHOW_UNHANDLED==='true') console.log(error)
  }
  return res
}
// default processors for socket/server and consumer/client
// Default packet processors keyed by socket type: `s` (socket/server) and `c` (consumer/client).
// Both are called with `this` bound to an object that provides `_callCmdFunc(packet, socket)`;
// `c` additionally uses `this._c.error(packet)`.
const _process = {
  /**
   * Socket-side processing.
   * @param {Object} packet - incoming packet; must carry a `cmd`
   * @param {*} socket - passed through to `this._callCmdFunc`
   * @returns {Promise<*>} whatever `_callCmdFunc` resolves to, or an `{error, packet}`
   *   object when `cmd` is missing or `_callCmdFunc` resolves to the string 'failed'
   */
  async s (packet, socket) {
    if (!packet.cmd) {
      return { error: 'no command (cmd:) in packet for socket', packet }
    }
    // _callCmdFunc resolves to 'failed' when it did not produce a response for the command
    const reply = await this._callCmdFunc(packet, socket)
    if (reply === 'failed') {
      return { error: 'no socket processing function supplied for command', packet }
    }
    return reply
  },
  /**
   * Consumer-side processing: the end of life for a packet that was sent and returned,
   * or for a packet that was pushed.
   * Sets `packet.cmd` to 'error' when the packet carries an `error`, otherwise to
   * 'reply' when no `cmd` is present.
   * @param {Object} packet - returned or pushed packet (its `cmd` may be rewritten)
   * @param {*} socket - passed through to `this._callCmdFunc`
   * @returns {Promise<*>} whatever `_callCmdFunc` resolves to; `undefined` when it
   *   resolves to 'failed' (in that case an `{error, packet}` object goes to `this._c.error`)
   */
  async c (packet, socket) {
    if (packet.error) {
      packet.cmd = 'error'
    } else if (!packet.cmd) {
      packet.cmd = 'reply'
    }
    const reply = await this._callCmdFunc(packet, socket)
    if (reply !== 'failed') return reply
    const unhandled = {
      error: `no consumer return processing function supplied for ${packet.cmd}`,
      packet
    }
    this._c.error(unhandled)
  }
}
// default name spaces
// Default namespace lists, one per socket type key.
// Every key maps to a one-element array holding the key prefixed with an underscore,
// e.g. s -> ['_s'], cn -> ['_cn'].
// `s` and `c` name the default command functions defined further down in this file.
// There is no `cw` key because the websocket client runs in the browser only.
const namespaces = Object.fromEntries(
  ['s', 'c', 'cn', 'ct', 'cm', 'sn', 'st', 'sm', 'sw'].map((key) => [key, [`_${key}`]])
)
/*
*
* Default packet command processing functions
*
*/
// Shared steps of the `ready` handlers for both sides:
// picks the event name (event, then name, then id), strips `_header` from the packet,
// emits the event with the ready value, emits a 'log' entry carrying `logMsg`, and
// schedules a second 'log' entry one second later reporting `emitter.ready.state`.
// Returns the event name that was emitted.
const emitReadyState = function (emitter, packet, logMsg) {
  const eventName = packet.event || packet.name || packet.id
  delete packet._header
  emitter.emit(eventName, packet.ready, packet)
  emitter.emit('log', { level: 'ready', msg: logMsg, ready: packet.ready, packet })
  setTimeout(() => {
    emitter.emit('log', { level: 'state', msg: 'new ready state', state: emitter.ready.state })
  }, 1000)
  return eventName
}

// Default packet command handlers, grouped by side: `s` (socket) and `c` (consumer).
// The `ready` handlers must run with `this` bound to an object that has `emit(...)`
// and a `ready.state` property.
const defaultCmds = {
  s: {
    // marks the packet as processed and hands the same packet back
    echo (packet) {
      return Object.assign(packet, { processed: true, msg: 'default socket echo' })
    },
    // TODO from original: add sending along an ack to any consumers and/or pushing
    // to other sockets on this device
    // turns the packet into an acknowledged reply (same object, mutated)
    async ack (packet) {
      return Object.assign(packet, {
        cmd: 'reply',
        ack: true,
        msg: 'this is the base default ack, superceed in your instance or extended class'
      })
    },
    // ready-state change that arrived via send; replies with the emitted event name
    async ready (packet) {
      const event = emitReadyState(this, packet, 'change in ready state received via send')
      return { cmd: 'reply', msg: 'consumer sent event was emitted event at socket process', event }
    }
  },
  c: {
    // default handler for packets that came back with an error: log it, return it unchanged
    error (packet) {
      log.error({
        error: packet.error,
        packet,
        msg: '==========Consumer Sent Packet returned with ERROR ========='
      })
      return packet
    },
    // default reply handler: returns the packet, logging it only when UCI_ENV is 'dev'
    reply (packet) {
      const isDev = process.env.UCI_ENV === 'dev'
      if (isDev) {
        log.debug({ packet, msg: '====Packet returned from socket - default reply logger===' })
      }
      return packet
    },
    // ready-state change that arrived via push
    async ready (packet) {
      emitReadyState(this, packet, 'change in ready state received via push')
      return { cmd: 'reply', msg: 'ready packet event was emitted at consumer process from push' }
    }
  }
}
// named exports: the common packet processor, the default command handlers and the default namespace lists
export { cmdProcessor, defaultCmds, namespaces }