2019-01-01 16:39:08 -08:00
// Direct External Dependencies
// none
2018-05-16 11:17:38 -07:00
2019-01-01 16:39:08 -08:00
// UCI dependencies
// UCI transport modules (one per supported socket transport)
// TODO change to dynamic imports so a module loads only if that socket type is requested
import Socket from '@uci/socket' // tcp or named pipe
import MQTT from '@uci/mqtt' // requires broker
import WebSocket from '@uci/websocket' // server only - client is for web browser only
// UCI helpers
import { bindFuncs } from '@uci/utils/src/function'
// UCI logger
import logger from '@uci/logger'
// Module-wide logger handle. It starts as an empty placeholder and is replaced
// by the Base constructor with a logger created for that instance, so every
// instance created from this module logs through whichever logger was created last.
let log = { } // declare module wide log to be set during construction
// Community dependencies
2018-01-18 21:32:07 -08:00
import EventEmitter from 'events'
2018-05-20 15:44:31 -07:00
import pSettle from 'p-settle'
2018-01-27 23:20:33 -08:00
2019-01-01 16:39:08 -08:00
// Internal dependencies
2018-05-16 07:08:14 -07:00
import { processor , commands , namespaces } from './processing'
2018-02-05 22:05:38 -08:00
2019-01-01 16:39:08 -08:00
// Useful Constants
// Property names assigned, in order, to the pieces of a socket definition
// string of the form '<name>#<type>><transport>' (see the Base constructor).
const SOCKET_INFO_KEYS = ['name', 'type', 'transport']

// Human readable names for the single letter type/transport codes.
// The 's' and 'c' entries are also used by Base#addSocket to select the
// class exported by '@uci/socket' (Socket[TRANSLATE[type]]).
const TRANSLATE = {
  n: 'Named Pipe',
  t: 'TCP',
  s: 'Socket',
  c: 'Consumer',
  m: 'MQTT',
  w: 'Web Socket'
}
/**
 * @class Base
 * @description
 * An inter-process, inter-machine, multi socket communication class.</br>
 * It is extended to derive many other UCI classes, thus the name "Base".</br>
 * The class itself is extended from the {@link https://nodejs.org/api/events.html#events_class_eventemitter nodejs EventEmitter} to support in-process communication as well.
 *
 * @extends EventEmitter
 * @param {Object} opts hash of options
 * @param {String} [opts.id='uci-base:'+timestamp] an id for this process/instance, used for logging. Falls back to opts.name, then to 'uci-base:' + timestamp
 * @param {String} [opts.desc] additional description for humans, can be logged.
 * @param {String|Boolean} [opts.path] either full path to where a socket should be created or 'true' (which uses the default path).
 *   NOTE(review): the constructor does not read opts.path itself; a named pipe socket takes `path` from its own per-socket options (defaulting to true) - confirm intent
 * @param {String} [opts.sockets] comma (or pipe/whitespace) delimited string of sockets to be created, each socket of the form [name]#[c or s]>[n, t, m or w]
 * @param {Boolean} [opts.useRootNS=false] include the "root" of the created base instance as part of the namespaces in which a packet cmd is looked up as a function
 * @param {Object} [opts.(name of socket)] options for a particular socket created by opts.sockets. see UCI guide {@link ?content=guide#sockets|creating sockets}
 * @property {Object} socket collection of all sockets created and used by the base instance as per opts.sockets or the addSocket method, keyed by socket name
 *
 * @example
 * import Base from '@uci/base'
 * // options object example: created is a socket of each transport using their defaults
 * // and a tcp consumer socket that connects to a tcp socket on another host 'somehost' at port 8888
 * const opts = { id: 'mybase', sockets: 'us#s>n,tc#c>t,ts#s>t,mqtts#s>m,webs#s>w', tc: { host: 'somehost', port: 8888 } }
 * let mybaseprocess = new Base(opts)
 */
class Base extends EventEmitter {
  constructor(opts = {}) {
    super()
    this.id = opts.id || opts.name || 'uci-base:' + new Date().getTime()
    log = logger({ name: 'base', id: this.id })
    this.desc = opts.desc // additional details for humans
    this.socket = {} // holds all the various communication sockets
    this._started = false // flag to know when instance has been initialized
    this._processors = { _default: processor }
    // NOTE: this is the imported commands object itself, so amendments made
    // through amendConsumerProcessing/amendSocketProcessing are shared by all instances
    this._defaultCmds = commands
    // Each instance gets its own copy of the namespace lists so that useRootNS
    // and addNamespace do not alter the imported (module shared) arrays
    this._namespaces = {}
    Object.keys(namespaces).forEach(key => {
      const list = namespaces[key]
      this._namespaces[key] = Array.isArray(list) ? list.slice() : list
    })
    if (opts.useRootNS) {
      // add root of instance to checking for command functions - not recommended!
      // (a null namespace means "look the command up directly on the instance")
      this._namespaces.s.splice(-1, 0, null)
      this._namespaces.c.splice(-1, 0, null)
    }
    this.bindFuncs = bindFuncs
    // predefined sockets:
    // comma delimited list of this form '<name>#<c/p/s>><n=np/t=tcp/m=mqtt/w=web>'
    if (opts.sockets) {
      opts.sockets
        .split(/[,|\s]+/)
        .filter(Boolean) // leading/trailing delimiters would otherwise yield an empty definition
        .forEach(socketStr => {
          const socket = {}
          socketStr.split(/[>#]+/).forEach((prop, index) => {
            if (index < SOCKET_INFO_KEYS.length) socket[SOCKET_INFO_KEYS[index]] = prop
          })
          // addSocket is async; a failure must not be left as an unhandled rejection
          Promise.resolve(
            this.addSocket(
              socket.name,
              socket.type,
              socket.transport,
              opts[socket.name]
            )
          ).catch(err => {
            log.warn(
              { socketName: socket.name, err: err },
              `unable to add socket ${socket.name}`
            )
          })
        })
    }
  } // end constructor

  /*
   * --------------------------------
   * CLASS METHODS
   * --------------------------------
   */

  // PUBLIC METHODS

  /**
   * initialize the instance with the set options. This must be called to initialize all sockets and connections
   * @async
   * @public
   * @required
   * @returns {Promise<Array>} one { name, err } entry per socket whose initialization was rejected (empty when all succeeded)
   */
  async init() {
    const names = Object.keys(this.socket)
    const results = await pSettle(names.map(name => this._initSocket(name)))
    log.info(
      { sockets: results },
      'response from intializing sockets via instance options'
    )
    const errors = []
    results.forEach((result, index) => {
      if (result.isRejected) {
        errors.push({ name: names[index], err: result.reason })
      }
    })
    this._started = true
    return errors
    // TODO if a websocket server was working then push status
    // TODO if no mqtt broker then attempt to start one
  } // init

  /**
   * addSocket - Add a socket at runtime as opposed to via the sockets option at creation
   *
   * The passed options object is copied, never modified. If the socket can not be
   * created (unknown type/transport, or a consumer web socket) a warning is logged
   * and nothing is added.
   *
   * @param {string} name Name of socket (usually something short but unique)
   * @param {string} [type=c] consumer/client 'c' or socket/server 's'
   * @param {string} [transport=n] transport: (n) named pipe/unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket
   * @param {object} [options={}] options for that particular type/transport of socket (i.e. path, host, port, etc)
   *
   * @returns {Promise<string>} status message; when the instance is already started the socket is
   *   initialized right away and the initialization message is returned
   */
  async addSocket(name, type = 'c', transport = 'n', options = {}) {
    log.info(
      { socketName: name, type: type, transport: transport, options: options },
      `adding socket ${name}`
    )
    const socketOpts = Object.assign({}, options)
    socketOpts.id = this.id + ':' + name
    let socket
    switch (transport) {
      case 'n':
        socketOpts.path = socketOpts.path || true
      // falls through
      case 't': {
        const SocketClass = Socket[TRANSLATE[type]]
        if (typeof SocketClass === 'function') socket = new SocketClass(socketOpts)
        else
          log.warn(
            { name: name, type: type, transport: transport },
            `socket ${name} not created, unknown socket type ${type}`
          )
        break
      }
      case 'm':
        if (type === 'p') type = 'c'
        socketOpts.connect = Object.assign({}, socketOpts.connect)
        socketOpts.connect.connectTimeout =
          socketOpts.connect.connectTimeout || 5000
        socket = new MQTT(socketOpts)
        break
      case 'w':
        if (type === 's') socket = new WebSocket(socketOpts)
        else
          log.warn(
            { name: name, type: type, transport: transport },
            'Web socket not created Consumer/Client Web Socket not supported'
          )
        break
      default:
        log.warn(
          { name: name, type: type, transport: transport },
          `socket ${name} not created, unknown transport ${transport}`
        )
    }
    if (!socket) return `socket ${name} not added`
    socket.name = name
    socket.type = type
    socket.transport = transport
    socket._packetProcess = this._packetProcess.bind(this, name)
    this.socket[name] = socket
    if (this._started) {
      const result = await this._initSocket(name)
      log.info(result)
      return result
    }
    return `socket ${name} added`
  }

  /**
   * removeSocket - TODO not available (currently does nothing)
   *
   * @param {string} name name of socket as created
   * @returns {String|Object} success string or error object (once implemented)
   */
  async removeSocket(name) {
    //TODO
  }

  /**
   * send - send a packet through one named socket, or through every consumer socket
   *
   * @param {String} [name] name of socket for send (must be a consumer otherwise use push for server).
   *   When the first argument is not a string it is taken as the packet and the
   *   packet is sent through all consumer sockets.
   * @param {Object} packet
   *
   * @returns {Promise<*>} the socket's reply; an array of replies when sent through several
   *   consumers (a single reply when there is exactly one); { error } when no socket has that name
   */
  async send(name, packet) {
    if (typeof name === 'string') {
      const socket = this.socket[name]
      if (socket) return await socket.send(packet)
      return { error: `no consumer socket of name ${name}` }
    }
    packet = name
    const consumers = this._selectSockets('c')
    if (consumers.length === 1) return consumers[0].send(packet)
    return Promise.all(consumers.map(consumer => consumer.send(packet)))
  }

  /**
   * push - push a packet through every socket of type 's' (server)
   *
   * @param {Object} packet
   * @returns {Promise<Array>} the results of each socket's push
   */
  async push(packet) {
    return Promise.all(
      this._selectSockets('s').map(server => server.push(packet))
    )
  }

  /**
   * sendTransport - send a packet through every consumer socket of the given transport
   *
   * @param {Object} packet
   * @param {string} transport transport code as given to addSocket
   * @returns {Promise<*>} a single reply when exactly one socket matches, otherwise an array of replies
   */
  async sendTransport(packet, transport) {
    const consumers = this._selectSockets('c').filter(
      consumer => consumer.transport === transport
    )
    if (consumers.length === 1) return consumers[0].send(packet)
    return Promise.all(consumers.map(consumer => consumer.send(packet)))
  }

  // async sendMQTT(topic, packet) {return this.socket.mqtt.send(topic, packet)}

  /** send a packet through all tcp ('t') consumer sockets, see sendTransport */
  async sendTCP(packet) {
    return this.sendTransport(packet, 't')
  }

  /** send a packet through all named pipe ('n') consumer sockets, see sendTransport */
  async sendIPC(packet) {
    return this.sendTransport(packet, 'n')
  }

  /**
   * @param {string} name name of socket as created
   * @returns {Object|undefined} the socket instance registered under that name
   */
  getSocket(name) {
    return this.socket[name]
  }

  /**
   * getPacketByName - pick the packet whose _header.sender.name equals name
   *
   * @param {string} name sender name to look for
   * @param {Object|Object[]} packets a single packet or an array of packets
   * @returns {Object} the first matching packet, or an empty object when none matches
   */
  getPacketByName(name, packets) {
    const list = Array.isArray(packets) ? packets : [packets]
    const match = list.find(
      packet =>
        packet &&
        packet._header &&
        packet._header.sender &&
        packet._header.sender.name === name
    )
    return match || {}
  }

  // TODO confirm Object.assign will be ok as it is not a deep copy
  /**
   * amendConsumerProcessing - merge command functions into the default consumer commands
   *
   * @param {Object} funcs hash of command functions
   * @param {string} [trans] when given the functions are also merged into the _defaultCmds.c[trans] hash
   */
  amendConsumerProcessing(funcs, trans) {
    const consumerCmds = this._defaultCmds.c
    if (trans) {
      if (!consumerCmds[trans]) consumerCmds[trans] = {}
      Object.assign(consumerCmds[trans], funcs)
    }
    // NOTE(review): funcs are merged into the generic consumer commands even when
    // a transport was given - confirm whether that is intended
    Object.assign(consumerCmds, funcs)
  }

  /**
   * amendSocketProcessing - merge command functions into the default socket (server) commands
   *
   * @param {Object} funcs hash of command functions
   * @param {string} [trans] when given the functions are also merged into the _defaultCmds.s[trans] hash
   */
  amendSocketProcessing(funcs, trans) {
    const socketCmds = this._defaultCmds.s
    if (trans) {
      if (!socketCmds[trans]) socketCmds[trans] = {}
      Object.assign(socketCmds[trans], funcs)
    }
    // NOTE(review): funcs are merged into the generic socket commands even when
    // a transport was given - confirm whether that is intended
    Object.assign(socketCmds, funcs)
  }

  // use s: and c: keys TODO need to change this
  /**
   * addNamedProcessing - merge command functions into this._cmds[name], or into
   * this._cmds[name][type] when a type is given. Missing hashes are created.
   *
   * @param {string} name key under this._cmds
   * @param {Object} funcs hash of command functions
   * @param {string} [type] optional sub key
   */
  addNamedProcessing(name, funcs, type) {
    if (!this._cmds) this._cmds = {} // not set up by the constructor
    if (!this._cmds[name]) this._cmds[name] = {}
    let target = this._cmds[name]
    if (type) {
      if (!target[type]) target[type] = {}
      target = target[type]
    }
    Object.assign(target, funcs)
  }

  // func should take and return a packet
  // beforeSendHook (func,type,transport){} // TODO
  // afterReceiveHook(func,type,transport){} // TODO
  // afterProcessHook(func,type,transport){} // TODO

  // here you can add namespaced functions for packet commands
  /**
   * consumersProcessor - register func via socketNameProcessor for every consumer ('c') socket
   * @param {Function} func
   */
  consumersProcessor(func) {
    this._selectSockets('c').forEach(consumer => {
      this.socketNameProcessor(func, consumer.name)
    })
  }

  /**
   * socketsProcessor - register func via socketNameProcessor for every server ('s') socket
   * @param {Function} func
   */
  socketsProcessor(func) {
    this._selectSockets('s').forEach(server => {
      this.socketNameProcessor(func, server.name)
    })
  }

  /**
   * socketNameProcessor - attach func as the _process property of the processor
   * registered for a socket name ('_default' when no name is given).
   *
   * When the socket has no processor of its own yet, one is created that
   * delegates to the default processor.
   * NOTE(review): this assumes the default processor consults the _process
   * property of the processor it receives as third argument - confirm against ./processing
   * NOTE: for '_default' the property is set on the imported processor function itself.
   *
   * @param {Function} func
   * @param {string} [socket_name='_default']
   */
  socketNameProcessor(func, socket_name) {
    socket_name = socket_name || '_default'
    if (!this._hasProcessor(socket_name)) {
      const fallback = this._processors._default
      this._processors[socket_name] = function(...args) {
        return fallback.apply(this, args)
      }
    }
    this._processors[socket_name]._process = func
  }

  /**
   * addNamespace - put a namespace at the front of a namespace list so it is searched first
   *
   * @param {string} space namespace (dotted path on the instance) to add
   * @param {string} type socket type key, 's' or 'c'
   * @param {string} [trans] transport code; when given the list keyed type+trans is used
   * @returns {number} new length of that namespace list
   */
  addNamespace(space, type, trans) {
    const key = trans ? type + trans : type
    if (!this._namespaces[key]) this._namespaces[key] = []
    return this._namespaces[key].unshift(space)
  }

  /*
   *
   * Private Methods
   *
   */

  // all registered sockets of the given type ('s' or 'c'), in registration order
  _selectSockets(type) {
    return Object.keys(this.socket)
      .map(name => this.socket[name])
      .filter(socket => socket.type === type)
  }

  // true when a processor function is registered under key (own properties only)
  _hasProcessor(key) {
    return (
      typeof key === 'string' &&
      Object.prototype.hasOwnProperty.call(this._processors, key) &&
      typeof this._processors[key] === 'function'
    )
  }

  /**
   * _initSocket - initialize a registered socket: create() for servers (except mqtt), connect() otherwise
   *
   * @param {string} name name of socket as created
   * @returns {Promise<*>} the result of create/connect, wrapped in a status message when the instance is already started
   */
  async _initSocket(name) {
    const socket = this.socket[name]
    const isServer = socket.type === 's' && socket.transport !== 'm'
    const init = isServer ? socket.create : socket.connect
    log.info(`initializing socket ${name}, ${socket.type}, ${socket.transport}`)
    // call with the socket as `this`, a detached method would lose its instance
    if (this._started)
      return `socket ${name} added and initialzed, ${await init.call(socket)}`
    return init.call(socket)
  }

  _transport(name) {
    return this.socket[name].transport
  } //getter for socket transport

  _type(name) {
    return this.socket[name].type
  } //getter for socket type

  // namespace list for the type+transport combination of the named socket (may be undefined)
  _getTransportNamespaces(socket) {
    return this._namespaces[this._type(socket) + this._transport(socket)]
  }

  /**
   * _getCmdFuncNamespace - look a command up in each namespace in order
   *
   * @param {string} cmd command name
   * @param {Array} [namespaces] list of namespaces; a falsy entry means the root of the instance
   * @returns {Function|null} the first function found, otherwise null
   */
  _getCmdFuncNamespace(cmd, namespaces) {
    for (const namespace of namespaces || []) {
      const found = this._getCmdFunc(namespace ? namespace + '.' + cmd : cmd)
      if (typeof found === 'function') return found
    }
    return null
  }

  // takes command and returns corresponding function in a hash
  /**
   * @param {string|string[]} cmd path with segments separated by '.', ':' or '/', or an array of segments (left unmodified)
   * @param {Object} [obj=this] object in which the path is resolved
   * @returns {*} the value at the end of the path; null when an intermediate segment is missing
   */
  _getCmdFunc(cmd, obj) {
    const path = typeof cmd === 'string' ? cmd.split(/[.:/]+/) : cmd
    if (!Array.isArray(path) || path.length === 0) return null
    // command names arrive in packets, never walk into the prototype chain machinery
    const blocked = ['__proto__', 'prototype', 'constructor']
    let target = obj || this
    for (let index = 0; index < path.length; index++) {
      const prop = path[index]
      if (blocked.includes(prop)) return null
      if (index === path.length - 1) return target[prop]
      if (!target[prop]) return null
      target = target[prop]
    }
    return null
  }

  /**
   * _callCmdFunc - find and call the function for packet.cmd, looking first in the
   * type+transport namespaces of the socket and then in the namespaces of its type
   *
   * @param {Object} packet packet with a cmd property
   * @param {string} socket name of the socket the packet came in on
   * @returns {Promise<*>} result of the command function (called with the instance as `this`), or 'failed' when none is found
   */
  async _callCmdFunc(packet, socket) {
    let cmd_func = this._getCmdFuncNamespace(
      packet.cmd,
      this._getTransportNamespaces(socket)
    )
    if (cmd_func) return await cmd_func.call(this, packet)
    cmd_func = this._getCmdFuncNamespace(
      packet.cmd,
      this._namespaces[this._type(socket)]
    )
    if (cmd_func) return await cmd_func.call(this, packet)
    return 'failed'
  }

  /*
   **********default packet processor for all sockets
   */
  /**
   * Picks the processor named by packet._processor, else the one registered for
   * the socket, else '_default', and calls it with the instance as `this` and
   * (packet, socket_name, processor) as arguments.
   *
   * @param {string} socket_name name of the socket (bound in by addSocket)
   * @param {Object} packet
   * @returns {Promise<*>} whatever the processor returns
   */
  async _packetProcess(socket_name, packet) {
    let key = packet._processor
    if (!this._hasProcessor(key)) {
      key = this._hasProcessor(socket_name) ? socket_name : '_default'
    }
    const process = this._processors[key]
    return await process.call(this, packet, socket_name, process)
  }
} // end Base Class
2018-02-04 14:18:21 -08:00
2019-01-01 16:39:08 -08:00
export default Base