2019-01-01 16:39:08 -08:00
// Direct External Dependencies
// none
2018-05-16 11:17:38 -07:00
2019-01-01 16:39:08 -08:00
// UCI dependencies
// UCI communication transport communication modules
// TODO change to dynamic imports so a transport module loads only if that socket type is requested
import Socket from '@uci/socket' // tcp or named pipe
import MQTT from '@uci/mqtt' // requires broker
import WebSocket from '@uci/websocket' // server only - client is for web browser only
// UCI helpers
2019-02-14 14:01:21 -08:00
import { bindFuncs } from '@uci-utils/bind-funcs'
2019-01-01 16:39:08 -08:00
// UCI logger
2019-02-14 14:01:21 -08:00
import logger from '@uci-utils/logger'
2019-01-01 16:39:08 -08:00
let log = { } // declare module wide log to be set during construction
// Community dependencies
2019-03-17 13:45:19 -07:00
import to from 'await-to-js'
2018-01-18 21:32:07 -08:00
import EventEmitter from 'events'
2019-08-29 13:41:32 -07:00
// import pSettle from 'p-settle'
// import pReflect from 'p-reflect'
2018-01-27 23:20:33 -08:00
2019-01-01 16:39:08 -08:00
// Internal dependencies
2019-04-26 11:05:10 -07:00
import { processor , defaultCmds , namespaces } from './processing'
2018-02-05 22:05:38 -08:00
2019-09-16 18:05:03 -07:00
// Constants
2019-01-01 16:39:08 -08:00
// Order of the fields encoded in a socket definition string: '<name>#<type>><transport>'
const SOCKET_INFO_KEYS = Object.freeze(['name', 'type', 'transport'])

// Maps the single letter codes used for socket type/transport to a longer name.
// The 's' and 'c' entries are also used to pick the class exported by @uci/socket.
const TRANSLATE = Object.freeze({
  n: 'Named Pipe',
  t: 'TCP',
  s: 'Socket',
  c: 'Consumer',
  m: 'MQTT',
  w: 'Web Socket'
})
/**
 * @class Base
 * @description
 * An inter-process, inter-machine multi socket communication class.<br/>
 * It is extended to derive many other UCI classes, thus the name "Base".<br/>
 * The class itself is extended from the {@link https://nodejs.org/api/events.html#events_class_eventemitter nodejs EventEmitter} to support in-process communication as well.
 *
 * @extends EventEmitter
 * @param {Object} opts hash of options
 * @param {String} [opts.id='uci-base:'+timestamp] an id for this process/instance used for logging (opts.name is accepted as an alias)
 * @param {String} [opts.desc] additional description for humans, can be logged.
 * @param {String} [opts.sockets] comma (or pipe/whitespace) delimited string of sockets to be created, each of the form [name]#[c or s]>[n, t, m or w]
 * @param {Boolean} [opts.useRootNS=false] include the "root" of the created base instance as part of the namespaces searched for a function matching a packet cmd
 * @param {Number} [opts.initTimeout] passed to every consumer socket unless that socket's own options set it
 * @param {Number} [opts.retryWait] passed to every consumer socket unless that socket's own options set it
 * @param {String} [opts.defaultReturnCmd] passed to every server socket unless that socket's own options set it
 * @param {Object} [opts.(name of socket)] options for a particular socket created by opts.sockets. See UCI guide {@link ?content=guide#sockets|creating sockets}
 * @property {Object} _socket collection (keyed by name) of all sockets created by opts.sockets or the addSocket method
 *
 * @example
 * import Base from '@uci/base'
 * // options object example: created is a socket of each transport using their defaults
 * // and a tcp consumer socket that connects to a tcp socket on another host 'somehost' at port 8888
 * const opts = { id: 'mybase', sockets: 'us#s>n,tc#c>t,ts#s>t,mqtts#s>m,webs#s>w', tc: { host: 'somehost', port: 8888 } }
 * let mybaseprocess = new Base(opts)
 */
class Base extends EventEmitter {
  constructor(opts = {}) {
    super()
    this.id = opts.id || opts.name || 'uci-base:' + new Date().getTime()
    log = logger({ name: 'base', id: this.id })
    this.desc = opts.desc // additional details for humans
    this._socket = {} // holds all the various communication sockets
    // these if passed will get applied to all consumer/server sockets, otherwise socket defaults will be used
    this.initTimeout = opts.initTimeout
    this.retryWait = opts.retryWait
    this.defaultReturnCmd = opts.defaultReturnCmd
    this._started = false // flag to know when instance has been initialized
    this._processors = { _default: processor }
    // Each instance gets its own copy of the namespace lists and default command
    // hashes. They are mutated below (useRootNS) and by addNamespace/amendCommands,
    // so sharing the imported objects would leak changes between instances.
    this._namespaces = {}
    for (const key of Object.keys(namespaces)) {
      const list = namespaces[key]
      this._namespaces[key] = Array.isArray(list) ? list.slice() : list
    }
    // _c and _s are the default command namespaces
    this._c = Object.assign({}, defaultCmds.c)
    this._s = Object.assign({}, defaultCmds.s)
    if (opts.useRootNS) {
      // add root of instance to checking for command functions - not recommended!
      this._namespaces.s.splice(-1, 0, null)
      this._namespaces.c.splice(-1, 0, null)
    }
    // method that will bind a whole object tree of functions
    this.bindFuncs = bindFuncs
    // predefined sockets:
    // comma delimited list of this form '<name>#<c/p/s>><n=np/t=tcp/m=mqtt/w=web>'
    if (opts.sockets) {
      opts.sockets
        .split(/[,|\s]+/)
        .filter(Boolean) // leading/trailing delimiters would otherwise yield a nameless socket
        .forEach(socketStr => {
          const socket = {}
          socketStr.split(/[>#]+/).forEach((prop, index) => {
            socket[SOCKET_INFO_KEYS[index]] = prop
          })
          // addSocket is async and cannot be awaited in a constructor;
          // make sure a failure is reported instead of becoming an unhandled rejection
          this.addSocket(socket.name, socket.type, socket.transport, opts[socket.name]).catch(err => {
            log.warn({ socketName: socket.name, error: err, method: 'constructor', msg: 'unable to add socket from sockets option' })
          })
        })
    }
  } // end constructor

  /*
   * --------------------------------
   * CLASS METHODS
   * --------------------------------
   */
  // PUBLIC METHODS

  /**
   * initialize the instance with the set options. This must be called to initialize all sockets and connections.
   * A socket whose initialization fails is removed and reported in the returned errors hash.
   * @async
   * @public
   * @required
   * @returns {Object} { results, errors } - results keyed by socket name, errors keyed by socket name or false when none failed
   */
  async init() {
    const results = {}
    const errors = {}
    const settle = async socket => {
      try {
        results[socket.name] = await socket.init()
      } catch (error) {
        this.emit('status', { level: 'fatal', msg: 'socket init error', error: error }) // emit an error here, remove socket
        const res = await this.removeSocket(socket.name)
        errors[socket.name] = { error: error, remove: res }
      }
    }
    const sockets = Object.keys(this._socket).map(name => {
      return { name: name, init: this._socketInitializer(name) }
    })
    await Promise.all(sockets.map(settle))
    this._started = true
    return { results: results, errors: Object.keys(errors).length === 0 ? false : errors }
  }

  /**
   * addSocket - Add a socket at runtime as opposed to via the sockets option at creation
   *
   * @param {string} name Name of socket (usually something short but unique)
   * @param {string} [type=c] consumer/client 'c' or socket/server 's'
   * @param {string} [transport=n] transport: (n) named pipe/unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket
   * @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc). Not modified.
   *
   * @returns {string|Object} confirmation string (or the socket init result when the instance
   *   is already started), or { error } when the type/transport combination is not supported
   */
  async addSocket(name, type = 'c', transport = 'n', options = {}) {
    log.debug({ socketName: name, type: type, tranport: transport, options: options, method: 'addSocket', line: 147, msg: `adding socket ${name}` })
    options = Object.assign({}, options) // never mutate the caller's options hash
    options.id = options.id || this.id + ':' + name
    if (type === 'c') options = Object.assign({ initTimeout: this.initTimeout, retryWait: this.retryWait }, options)
    if (type === 's') options = Object.assign({ defaultReturnCmd: this.defaultReturnCmd }, options)

    let socket
    let unsupported = `socket not created, unsupported type '${type}' or transport '${transport}'`
    switch (transport) {
    case 'n':
      options.path = options.path || true
      // falls through
    case 't':
      // @uci/socket exports one class per socket type, keyed by the translated type name
      if (typeof Socket[TRANSLATE[type]] === 'function') socket = new Socket[TRANSLATE[type]](options)
      break
    case 'm':
      if (type === 'p') type = 'c'
      options.client = type === 'c'
      options.connect = Object.assign({}, options.connect)
      if (options.host) options.connect.host = options.host
      if (options.port) options.connect.port = options.port
      options.connect.connectTimeout = options.connect.connectTimeout || 5000
      socket = new MQTT(options)
      break
    case 'w':
      if (type === 's') socket = new WebSocket(options)
      else unsupported = 'Web socket not created Consumer/Client Web Socket not supported'
      break
    }
    if (!socket) {
      // nothing was constructed, so there is nothing to register or initialize
      log.warn({ name: name, type: type, transport: transport, method: 'addSocket', line: 167, msg: unsupported })
      return { error: unsupported }
    }

    socket.name = name
    socket.type = type
    socket.transport = transport
    socket._packetProcess = this._packetProcess.bind(this, name)
    this._socket[name] = socket

    // bubble up events from sockets to instance
    const EVENTS = ['status', 'consumer-connection'] // that should emit up from socket to instance
    EVENTS.forEach(event => {
      socket.on(event, obj => {
        // non plain-object payloads are wrapped so the socket name can be attached
        if (Object.prototype.toString.call(obj) !== '[object Object]') obj = { data: obj }
        obj.socketName = name
        this.emit(event, obj)
      })
    })
    if (type === 'c') {
      socket.on('pushed', packet => {
        packet._header = packet._header || {}
        packet._header.socketName = name
        this.emit('pushed', packet)
      })
    }

    // if instance already started then init this socket now
    if (this._started) return await this._initSocket(name)
    return `socket ${name} added`
  }

  /**
   * removeSocket - close a socket/consumer and remove it from the instance
   *
   * @param {string} name name of socket as created
   * @returns {String|Object} 'success' or an object describing the error
   */
  async removeSocket(name) {
    const socket = this._socket[name]
    if (!socket) {
      return { socket: name, error: new Error(`no socket of name ${name}`), msg: 'socket/consumer not found, nothing removed' }
    }
    // NOTE: uci consumers have .end renamed as .close to match socket method for convenience
    // the call is wrapped so a synchronous throw from close() is captured as well
    const [err] = await to(Promise.resolve().then(() => socket.close()))
    let closeError
    if (err && err.code !== 'ERR_SERVER_NOT_RUNNING') {
      closeError = { socket: socket.name, error: err, msg: 'socket/consumer closed with errors, but removed' }
    }
    this.emit('status', { level: 'warn', msg: `socket ${name} has been removed`, socket: socket.opts })
    delete this._socket[name]
    return closeError ? closeError : 'success'
  }

  /**
   * getSocket - access a socket by name
   *
   * @param {string} [name] name of socket
   * @returns {Object} the named socket (undefined if none) or, without a name, the hash of all sockets
   */
  getSocket(name) {
    if (name) return this._socket[name]
    return this._socket
  }

  /**
   * send - send a packet via one named socket, or via every consumer socket when no name is given.
   * A socket's beforeSend hook (if any) is applied to the packet first.
   *
   * @param {String} [name] name of socket for send (must be a consumer otherwise use push for server)
   * @param {Object} packet
   *
   * @returns {Object|Array} the reply of the single send, an array of replies when several
   *   consumers were sent to, or { error } when the named socket does not exist
   */
  async send(name, packet) {
    if (typeof name !== 'string') {
      packet = name
      const sends = []
      for (const socketName of Object.keys(this._socket)) {
        const socket = this._socket[socketName]
        if (socket.type !== 'c') continue
        // each hook gets its own shallow copy so hooks of different sockets don't interfere
        const hookedPacket = socket.beforeSend
          ? await socket.beforeSend.call(this, Object.assign({}, packet))
          : packet
        log.debug({ msg: 'after hook, sending packet', name: socketName, packet: hookedPacket, method: 'send', line: 235 })
        sends.push(socket.send.bind(socket, hookedPacket))
      }
      if (sends.length === 1) return await sends[0]()
      return Promise.all(sends.map(send => send()))
    }
    const socket = this._socket[name]
    if (!socket) return { error: `no consumer socket of name ${name}` }
    if (socket.beforeSend) packet = await socket.beforeSend.call(this, packet)
    log.debug({ msg: 'single socket hooked packet to send', name: name, packet: packet, method: 'send', line: 230 })
    return await socket.send(packet)
  }

  /**
   * push - push a packet from every server socket, applying each socket's beforeSend hook first
   *
   * @param {Object} packet
   * @returns {Array} results of each socket's push
   */
  async push(packet) {
    // TODO set like send to accept a name
    const broadcast = []
    for (const name of Object.keys(this._socket)) {
      const socket = this._socket[name]
      if (socket.type !== 's') continue
      // the extra `true` argument is passed to the hook only for pushes
      const hookedPacket = socket.beforeSend
        ? await socket.beforeSend.call(this, Object.assign({}, packet), true)
        : packet
      log.debug({ msg: 'hooked packet to push', name: name, packet: hookedPacket, method: 'push', line: 243 })
      broadcast.push(socket.push.bind(socket, hookedPacket))
    }
    return Promise.all(broadcast.map(push => push()))
  }

  // TODO make push version of all this sends
  // TODO accept alt transport string i.e. t or TCP
  /**
   * sendTransport - send a packet via every consumer socket of the given transport.
   * NOTE: unlike send, beforeSend hooks are not applied here.
   *
   * @param {Object} packet
   * @param {string} transport single letter transport code (n, t, m)
   * @returns {Object|Array} single reply when exactly one consumer matched, otherwise an array of replies
   */
  async sendTransport(packet, transport) {
    const sends = Object.keys(this._socket)
      .map(name => this._socket[name])
      .filter(socket => socket.type === 'c' && socket.transport === transport)
      .map(socket => socket.send.bind(socket))
    if (sends.length === 1) return sends[0](packet)
    return Promise.all(sends.map(send => send(packet)))
  }

  // async sendMQTT(topic, packet) {return this._socket.mqtt.send(topic, packet)}
  /** send a packet via all tcp consumers */
  async sendTCP(packet) {
    return this.sendTransport(packet, 't')
  }

  // TODO change this to PIPE
  /** send a packet via all named pipe consumers */
  async sendIPC(packet) {
    return this.sendTransport(packet, 'n')
  }

  // TODO add sendMQTT, sendWS
  /** add listener fn for event to all server sockets, pass 'stop' as fn to remove all listeners of event */
  socketsListen(event, fn) {
    this._eventListen('s', event, fn)
  }

  /** add listener fn for event to all consumer sockets, pass 'stop' as fn to remove all listeners of event */
  consumersListen(event, fn) {
    this._eventListen('c', event, fn)
  }

  /**
   * getPacketByName - find the packet whose _header.sender.name matches
   *
   * @param {string} name sender name to look for
   * @param {Object|Array} packets a single packet or an array of packets
   * @returns {Object} the first matching packet or an empty object
   */
  getPacketByName(name, packets) {
    // Array.isArray rather than .length so that an empty array is not wrapped as a packet
    const list = Array.isArray(packets) ? packets : [packets]
    const found = list.find(packet => {
      return Boolean(packet && packet._header && packet._header.sender && packet._header.sender.name === name)
    })
    return found || {}
  }

  /**
   * addNamespace - register a property name of this instance as a place to look for command functions.
   * It is put at the front of the list so it is searched first.
   *
   * @param {string} space name of the instance property holding the functions
   * @param {string} [type=s] 'c' or 's'; if anything else is passed it is taken as the transport
   * @param {string} [trans] transport (see _validateTransport), omit to register for all transports of the type
   * @returns {number} new length of the namespace list
   */
  addNamespace(space, type, trans) {
    if (type !== 'c' && type !== 's') {
      trans = type
      type = 's'
    }
    trans = this._validateTransport(trans, type)
    const key = trans ? type + trans : type
    if (!Array.isArray(this._namespaces[key])) this._namespaces[key] = []
    return this._namespaces[key].unshift(space)
  }

  // TODO confirm Object.assign will be ok as it is not a deep copy
  /**
   * amendCommands - merge a hash of command functions into one of the default command namespaces
   * i.e. this._s, this._c or a transport specific one like this._st
   *
   * @param {Object} funcs hash of functions to merge
   * @param {string} [trans] transport (see _validateTransport), or 'c'/'s' which is then taken as the type
   * @param {string} [type=s] 'c' or 's'
   */
  amendCommands(funcs, trans, type) {
    if (trans === 'c' || trans === 's') {
      type = trans
      trans = ''
    }
    // without this a missing type would build a bogus key such as '_undefinedt'
    if (type !== 'c' && type !== 's') type = 's'
    trans = this._validateTransport(trans, type)
    const key = '_' + type + trans
    if (!this[key]) this[key] = {}
    Object.assign(this[key], funcs)
    log.debug({ msg: 'amended namespace', default_key: key, functions: this[key] })
  }

  /** amendCommands for consumers */
  amendConsumerCommands(funcs, trans) {
    this.amendCommands(funcs, trans, 'c')
  }

  /** amendCommands for server sockets */
  amendSocketCommands(funcs, trans) {
    this.amendCommands(funcs, trans, 's')
  }

  // Hook functions should take and return a packet.
  // opts selects the sockets to hook: { name } or any of { type, trans, all }
  beforeSendHook(func, opts) {
    this._packetHook('beforeSend', func, opts)
  }

  beforeProcessHook(func, opts) {
    this._packetHook('beforeProcess', func, opts)
  }

  afterProcessHook(func, opts) {
    this._packetHook('afterProcess', func, opts)
  }

  // A Big Hammer - use only if necessary - default with hooks should suffice
  // these three will pre-empt default processor to be called in ._packetProcess

  // add and override default processor for ALL consumers, i.e. packets returning from a send or arrived from a push
  consumersProcessor(func) {
    for (const name of Object.keys(this._socket)) {
      if (this._socket[name].type === 'c') this.altProcessor(func, name)
    }
  }

  // add and override default processor for ALL sockets, i.e. packets coming in from consumer send
  socketsProcessor(func) {
    for (const name of Object.keys(this._socket)) {
      if (this._socket[name].type === 's') this.altProcessor(func, name)
    }
  }

  // add and override a processor for a particular socket/consumer to list of processors
  // if no socket name given it will replace the default processor in _processors from processing.js
  altProcessor(func, socket_name) {
    socket_name = socket_name || '_default'
    this._processors[socket_name] = func
  }

  //======================================================
  /*
   *
   * Private Methods
   *
   */

  /**
   * set a hook function on the selected sockets
   * @param {string} hook property name of the hook: beforeSend, beforeProcess or afterProcess
   * @param {Function} func the hook
   * @param {Object} [opts={}] { name } for one socket, otherwise every socket matching type or trans, or all of them when all is truthy
   */
  _packetHook(hook, func, opts = {}) {
    log.debug({ msg: 'hooking a socket(s)', method: '_packetHook', line: 334, hook: hook, function: func, options: opts })
    const { name, type, trans, all } = opts || {}
    if (name) {
      if (this._socket[name]) this._socket[name][hook] = func
      else log.warn({ msg: `no socket of name ${name} to hook`, method: '_packetHook', hook: hook })
      return
    }
    log.debug({ msg: 'sockets available to hook', method: '_packetHook', line: 338, sockets: Object.keys(this._socket) })
    for (const socketName of Object.keys(this._socket)) {
      const socket = this._socket[socketName]
      if (all || socket.type === type || socket.transport === trans) socket[hook] = func
      if (socket[hook]) log.debug({ msg: 'hooked socket', method: '_packetHook', line: 343, name: socketName, type: socket.type, trans: socket.transport })
    }
  }

  /*
   ********** default packet processor for all sockets
   * this can be hooked or replaced all together
   */
  /**
   * @param {string} socket_name name of the socket the packet arrived on (bound in addSocket)
   * @param {Object} packet the incoming packet
   * @returns {Object} the processed packet with the incoming header re-applied
   */
  async _packetProcess(socket_name, packet) {
    log.debug({ socket: socket_name, packet: packet, method: '_packetProcess', line: 393, msg: 'processing incoming packet' })
    const socket = this._socket[socket_name] || {}
    const header = packet._header ? packet._header : {} // retain header
    if (socket.beforeProcess) {
      const [err, hooked] = await to(Promise.resolve().then(() => socket.beforeProcess.call(this, packet)))
      if (err) {
        // hook has forced an abort to processing
        packet.error = err
        return packet
      }
      if (hooked) packet = hooked // a hook that returns nothing leaves the packet as is
    }
    // the processor can be set via the incoming packet
    // otherwise if one is set on the socket or the default found in processing.js
    // TODO Try each "available" packet processor in some order if fails try next one before trying the default
    const registered = key => {
      return Object.prototype.hasOwnProperty.call(this._processors, key) && typeof this._processors[key] === 'function'
    }
    let processorName = '_default'
    if (packet._processor && registered(packet._processor)) processorName = packet._processor
    else if (registered(socket_name)) processorName = socket_name
    // processor didn't return a packet then return the packet sent
    let res = (await this._processors[processorName].call(this, packet, socket_name)) || packet
    log.debug({ socket: socket_name, response: res, msg: 'processed packet ready for hook' })
    if (socket.afterProcess) res = await socket.afterProcess.call(this, res)
    log.debug({ socket: socket_name, response: res, msg: 'packet after hook complete ready for return' })
    res._header = Object.assign(header, res._header) // re-apply header in case hooks or processor mangled or removed it
    return res
  }

  /**
   * pick the method that initializes the named socket
   * (create for servers except mqtt, connect for everything else) bound to that socket
   * @param {string} name name of socket
   * @returns {Function} bound init function
   */
  _socketInitializer(name) {
    const socket = this._socket[name]
    const method = socket.type === 's' && socket.transport !== 'm' ? socket.create : socket.connect
    log.info({ msg: `initializing socket ${name}, ${socket.type}, ${socket.transport}` })
    // bound, since it is later invoked detached from the socket
    return method.bind(socket)
  }

  /**
   * @param {string} name name of socket
   * @returns {Object|Promise} before the instance is started { name, init } for later use,
   *   afterwards a promise of the init outcome (message string, or { msg, error } on failure)
   */
  _initSocket(name) {
    const socket = this._socket[name]
    const init = this._socketInitializer(name)
    if (!this._started) return { name: name, init: init }
    return init()
      .then(res => `socket ${name} added and initialzed, ${res}`)
      .catch(err => {
        this.emit('status', { level: 'fatal', msg: 'failed initialization', error: err, socket: socket, code: 'SOCKET_INIT' })
        return { msg: `socket ${name} failed initialization`, error: err }
      })
  }

  // all sockets are emitters. Adds a listener to all sockets of a type with given event.
  // now sockets can emit locally processed events
  _eventListen(type, event, fn) {
    for (const name of Object.keys(this._socket)) {
      const socket = this._socket[name]
      if (socket.type !== type) continue
      if (fn === 'stop') socket.removeAllListeners(event)
      else {
        log.debug({ socket: name, type: type, event: event, msg: 'adding listener to socket' })
        socket.on(event, fn)
      }
    }
  }

  /**
   * normalize a transport name to its single letter code
   * @param {string} trans transport name or code
   * @param {string} [type=s] socket type the transport is meant for
   * @returns {string} n, t, m or w, or '' if unknown or not supported for the type
   */
  _validateTransport(trans, type = 's') {
    const valids = {
      w: 'w',
      web: 'w',
      n: 'n',
      named: 'n',
      unix: 'n',
      pipe: 'n',
      t: 't',
      tcp: 't',
      net: 't',
      network: 't',
      m: 'm',
      mqtt: 'm'
    }
    const code = Object.prototype.hasOwnProperty.call(valids, trans) ? valids[trans] : ''
    // web sockets are server only (see addSocket), so 'w' is not a consumer transport
    if (type === 'c' && code === 'w') return ''
    return code
  }

  _transport(name) {
    return this._socket[name].transport
  } //getter for socket transport

  _type(name) {
    return this._socket[name].type
  } //getter for socket type

  // namespace list for the type+transport combination of the named socket
  _getTransportNamespaces(socket) {
    return this._namespaces[this._type(socket) + this._transport(socket)]
  }

  /**
   * look for a function matching cmd in each namespace in turn
   * @param {string} cmd command, may be a path like 'device.on'
   * @param {Array} namespaces property names of this instance to search, a null entry means the instance root
   * @returns {Function|null} first function found or null
   */
  _getCmdFuncNamespace(cmd, namespaces) {
    if (cmd === undefined || cmd === null || !Array.isArray(namespaces)) return null
    for (const namespace of namespaces) {
      const found = this._getCmdFunc(namespace ? namespace + '.' + cmd : String(cmd))
      if (typeof found === 'function') return found
    }
    return null
  }

  /**
   * takes command and returns corresponding entry in a hash by walking the command path
   * @param {string|Array} cmd path as string delimited by . : or / or as array of keys
   * @param {Object} [obj=this] hash to walk (defaults to this instance only for a string cmd)
   * @returns {*} value at the path, null if an intermediate key is missing
   */
  _getCmdFunc(cmd, obj) {
    let path
    if (typeof cmd === 'string') {
      if (!obj) obj = this
      path = cmd.split(/[.:/]+/)
    } else if (Array.isArray(cmd)) {
      path = cmd.slice() // don't consume the caller's array
    } else return null
    if (obj === undefined || obj === null) return null
    let node = obj
    const last = path.pop()
    for (const prop of path) {
      if (!node[prop]) return null
      log.debug({ cmd: cmd, prop: prop, objprop: node[prop], method: '_getCmdFunc', msg: 'command to corresponding function in a hash' })
      node = node[prop]
    }
    return node[last]
  }

  /**
   * primary function to find a function to call based on packet cmd.
   * The type+transport namespaces of the socket are searched first, then those of the type alone.
   * @param {Object} packet packet whose cmd property names the function
   * @param {string} socket name of the socket the packet belongs to
   * @returns {*} whatever the command function returns, or 'failed' if no function was found
   */
  async _callCmdFunc(packet, socket) {
    const type = this._type(socket)
    const lookups = [type + this._transport(socket), type]
    for (const key of lookups) {
      const cmd_func = this._getCmdFuncNamespace(packet.cmd, this._namespaces[key])
      if (cmd_func) return await cmd_func.call(this, packet)
    }
    return 'failed'
  }
} // end Base Class
2018-02-04 14:18:21 -08:00
2019-01-01 16:39:08 -08:00
export default Base