2019-01-30 20:14:00 -08:00
// node modules
import { unlink as fileDelete } from 'fs'
import { promisify } from 'util'
import path from 'path'
// npmjs modules
import mkdir from 'make-dir'
import btc from 'better-try-catch'
2020-02-10 21:38:18 -08:00
import pReflect from 'p-reflect'
2019-01-30 20:14:00 -08:00
import _ON _DEATH from 'death' //this is intentionally ugly
import JSONStream from './json-stream'
import clone from 'clone'
// uci modules
2019-02-14 14:01:14 -08:00
import logger from '@uci-utils/logger'
2019-01-30 20:14:00 -08:00
let log = { } // must declare here and set later for module wide access
export default function socketClass ( Server ) {
// TODO change default pipe dir depending on OS linux,windows,mac
/ * * @ c o n s t a n t { S t r i n g } D E F A U L T _ P I P E _ D I R
* @ description SOCKETS _DIR environment variable or '/tmp/UCI'
* /
const DEFAULT _PIPE _DIR = process . env . SOCKETS _DIR || '/tmp/UCI'
/ * * @ c o n s t a n t { S t r i n g } D E F A U L T _ S O C K E T _ N A M E
* @ description for named pipe 'uci-sock' if not set in options * /
const DEFAULT _SOCKET _NAME = 'uci-sock'
/ * *
* UCI Socket - class used to create a socket ( server ) that supports passing json packets
* supports both named pipes and tcp sockets
* also supports push of packets to all connected consumers ( clients )
* is extended from { @ link https : //nodejs.org/api/net.html#net_class_net_server | nodejs net.Server }
* @ extends Server
* /
return class Socket extends Server {
/ * *
* UCI Socket class constructor
* @ param { Object } opts hash of options
* @ param { String } options . host a tcp host name nornally not used as 0.0 . 0.0 is set by default
* @ param { String } options . port a tcp
* @ param { String | Boolean } options . path xeither full path to where socket should be created or if just 'true' then use default
2020-01-06 23:01:34 -08:00
* @ param { Boolean } options . consumerTracking track connected consumers for push notifications - default : true
2019-01-30 20:14:00 -08:00
* @ param { Object } options . conPacket A json operson ' s property
*
* /
constructor ( opts = { } ) {
super ( opts )
delete opts . key
delete opts . cert
2020-02-10 21:38:18 -08:00
this . name = opts . name
2019-01-30 20:14:00 -08:00
this . id = opts . id || opts . name || 'socket:' + new Date ( ) . getTime ( )
if ( ! opts . path ) {
opts . host = opts . host || '0.0.0.0'
opts . port = opts . port || 8080
} else {
if ( typeof opts . path === 'boolean' )
opts . path = path . join ( DEFAULT _PIPE _DIR , DEFAULT _SOCKET _NAME )
2019-03-17 13:51:36 -07:00
if ( path . dirname ( opts . path ) === '.' ) // relative path sent
2019-01-30 20:14:00 -08:00
opts . path = path . join ( DEFAULT _PIPE _DIR , opts . path )
}
2019-09-13 18:59:22 -07:00
this . defaultReturnCmd = opts . defaultReturnCmd
2019-08-28 09:02:12 -07:00
this . allowAnonymous = ( ! opts . tokens || ! ! process . env . UCI _ANON || opts . allowAnonymous ) ? true : false
2019-08-23 15:48:39 -07:00
this . tokens = opts . tokens || [ ]
2019-08-19 17:57:10 -07:00
this . keepAlive = 'keepAlive' in opts ? opts . keepAlive : true
2020-01-06 23:01:34 -08:00
this . pingInterval = opts . pingInterval === false ? 0 : ( opts . pingInterval * 1000 || 5000 )
this . consumers = new Map ( ) // track consumers (i.e. clients) TODO use a Map
this . nextConsumerID = 0 // incrementer for default initial consumer ID
2020-03-15 14:32:35 -07:00
this . conPackets = opts . conPackets || [ opts . conPacket ]
2019-01-30 20:14:00 -08:00
this . opts = opts // for use to recover from selected errors
2019-09-08 19:49:05 -07:00
this . errorCount = 0
2019-01-30 20:14:00 -08:00
//self bindings
this . create = this . create . bind ( this )
2020-01-06 23:01:34 -08:00
this . authenticateConsumer = this . authenticateConsumer . bind ( this )
2019-08-23 15:48:39 -07:00
this . _authenticate = this . _authenticate . bind ( this )
2019-03-17 13:51:36 -07:00
this . close = promisify ( this . close ) . bind ( this )
2019-01-30 20:14:00 -08:00
log = logger ( {
2019-11-21 09:35:09 -08:00
package : '@uci/socket' ,
2019-01-30 20:14:00 -08:00
file : 'src/socket.js' ,
class : 'Socket' ,
id : this . id
} )
} // end constructor
2019-11-21 09:35:09 -08:00
get active ( ) { return this . listening }
2019-01-30 20:14:00 -08:00
/ * *
* create - Description
*
* @ returns { type } Description
* /
async create ( ) {
2020-01-14 13:38:24 -08:00
this . emit ( 'socket' , { state : 'creating' , msg : 'creating socket for consumers to connect' } )
2019-01-30 20:14:00 -08:00
return new Promise ( async ( resolve , reject ) => {
_ON _DEATH ( async ( ) => {
2019-04-26 10:14:57 -07:00
log . error ( { method : 'create' , line : 84 , msg : '\nhe\'s dead jim' } )
2019-01-30 20:14:00 -08:00
await this . _destroy ( )
} )
process . once ( 'SIGUSR2' , async ( ) => {
2019-08-19 17:57:10 -07:00
await this . _destroy ( )
2019-01-30 20:14:00 -08:00
process . kill ( process . pid , 'SIGUSR2' )
} )
this . once ( 'error' , async err => {
// recover from socket file that was not removed
if ( err . code === 'EADDRINUSE' ) {
if ( this . opts . path ) {
// if TCP socket should already be dead
let [ err , res ] = await btc ( promisify ( fileDelete ) ) ( this . opts . path )
if ( ! err ) {
2019-09-08 19:49:05 -07:00
log . info ( { method : 'create' , line : 99 , res : res , socket : this . opts . path , msg : 'socket already exists.....deleted' } )
// try again
this . removeAllListeners ( 'listening' )
2020-03-15 14:32:35 -07:00
resolve ( await this . create ( ) )
2019-01-30 20:14:00 -08:00
}
2019-04-26 10:14:57 -07:00
log . error ( { method : 'create' , line : 102 , err : err , msg : 'error deleting socket. Can not establish a socket' } )
2019-01-30 20:14:00 -08:00
}
}
if ( err . code === 'EACCES' ) {
2019-04-26 10:14:57 -07:00
log . debug ( { method : 'create' , line : 107 , socket : this . opts . path , msg : 'directory does not exist...creating' } )
2019-01-30 20:14:00 -08:00
await mkdir ( path . dirname ( this . opts . path ) )
2019-04-26 10:14:57 -07:00
log . debug ( { method : 'create' , line : 109 , socket : this . opts . path , msg : 'directory created' } )
2019-09-08 19:49:05 -07:00
this . removeAllListeners ( 'listening' )
2020-03-15 14:32:35 -07:00
resolve ( await this . create ( ) )
2019-01-30 20:14:00 -08:00
}
// otherwise fatally exit
2019-09-08 19:49:05 -07:00
log . error ( { method : 'create' , line : 113 , err : err , opts : this . opts , msg : ` error creating socket server ${ this . name } ` } )
2019-01-30 20:14:00 -08:00
reject ( err )
} )
2019-09-08 19:49:05 -07:00
this . once ( 'listening' , ( ) => {
this . on ( 'error' , err => {
this . errorCount += 1 // log errors here
this . errors . push ( err )
2019-09-13 18:59:22 -07:00
if ( this . errorCount > 2 && this . errorCount < 6 ) {
2019-11-21 09:35:09 -08:00
let errors = { level : 'warn' , msg : 'something bad maybe going on, 3 errors' , errors : this . errors }
2020-01-14 13:38:24 -08:00
this . emit ( 'socket' , { state : 'error' , msg : '2 to 5 socket errors' , errors : this . errors } )
2019-12-05 14:42:41 -08:00
this . emit ( 'log' , errors )
2019-09-13 18:59:22 -07:00
log . error ( errors )
}
if ( this . errorCount > 5 ) {
let errors = { level : 'fatal' , msg : 'something fatal is going on, 6 errors' , errors : this . errors }
log . fatal ( errors )
2020-01-14 13:38:24 -08:00
this . removeAllListeners ( 'listening' )
2019-11-21 09:35:09 -08:00
this . listening = false
2020-01-14 13:38:24 -08:00
this . close ( ( ) => {
this . emit ( 'socket' , { state : 'offline' , msg : 'too many socket errors no longer listening for consumers to connect' } )
} )
2019-12-05 14:42:41 -08:00
this . emit ( 'log' , errors )
2019-09-13 18:59:22 -07:00
}
2019-09-08 19:49:05 -07:00
} )
2020-01-06 23:01:34 -08:00
let msg = ` socket ready and listening ${ typeof this . address ( ) === 'string' ? ` at ${ this . address ( ) } ` : ` on port ${ this . address ( ) . port } ` } `
2020-01-14 13:38:24 -08:00
this . emit ( 'socket' , { state : 'listening' , msg : msg } )
2020-01-06 23:01:34 -08:00
let obj = { method : 'create' , line : 54 , msg : msg }
2019-12-05 14:42:41 -08:00
log . info ( obj )
2019-09-08 19:49:05 -07:00
this . on ( 'connection' , this . _connectionHandler . bind ( this ) )
2020-01-06 23:01:34 -08:00
resolve ( msg )
2019-09-08 19:49:05 -07:00
} )
super . listen ( this . opts )
2019-08-23 15:48:39 -07:00
this . enablePing ( )
2019-09-08 19:49:05 -07:00
2019-01-30 20:14:00 -08:00
} ) // end creeate promise
} // end create
2020-01-14 13:38:24 -08:00
async stop ( ) {
return new Promise ( function ( resolve ) {
this . removeAllListeners ( 'listening' )
this . listening = false
this . close ( ( ) => {
this . emit ( 'socket' , { state : 'offline' , msg : 'manually closed socket on request' } )
resolve ( 'socket is offline' )
} )
} )
}
2019-01-30 20:14:00 -08:00
/ * *
* registerPacketProcessor - Description
* @ public
* @ param { func } Description
*
* /
registerPacketProcessor ( func ) {
this . _packetProcess = func
}
2019-08-23 15:48:39 -07:00
enablePing ( ) {
if ( this . pingInterval > 499 ) {
this . _ping = setInterval ( async ( ) => {
2020-02-10 21:38:18 -08:00
if ( this . consumers . size > 0 ) this . push ( { pingInterval : this . pingInterval } , { packetId : 'ping' } )
2019-08-23 15:48:39 -07:00
} , this . pingInterval )
}
}
disablePing ( ) {
clearInterval ( this . _ping )
}
2019-08-28 09:02:12 -07:00
addTokens ( tokens ) {
if ( typeof tokens === 'string' ) {
tokens = tokens . split ( ',' )
}
this . tokens = this . tokens . concat ( tokens )
if ( this . tokens . length > 0 ) this . allowAnonymous = false
}
removeTokens ( tokens ) {
if ( typeof tokens === 'string' ) {
if ( tokens === 'all' ) {
this . tokens = [ ]
this . allowAnonymous = true
return
}
tokens = tokens . split ( ',' )
}
this . tokens = this . tokens . filter ( token => ! tokens . includes ( token ) )
if ( this . tokens . length === 0 ) {
log . warn ( { msg : 'all tokens have been removed, switching to allow anonymous connections' } )
this . allowAnonymous = true
}
}
registerTokenValidator ( func ) {
this . allowAnonymous = false
this . _validateToken = func
}
registerAuthenticator ( func ) {
this . allowAnonymous = false
this . _authenticate = func
}
2019-01-30 20:14:00 -08:00
/ * *
2020-01-06 23:01:34 -08:00
* push - pushes a supplied UCI object packet to all connected consumers
2019-01-30 20:14:00 -08:00
*
* @ param { object } packet Description
* @ param { string } id the header id string of the pushed packet , default : 'pushed'
*
* /
2020-02-10 21:38:18 -08:00
// TODO support multiple consumers in options
async push ( packet = { } , opts = { } ) {
2020-01-06 23:01:34 -08:00
if ( this . consumers . size > 0 ) {
2020-02-10 21:38:18 -08:00
packet . _header = { id : opts . packetId || 'pushed' }
let consumers = [ ]
if ( opts . consumers || opts . consumer ) {
if ( opts . consumer ) opts . consumers = [ opts . consumer ]
consumers = Array . from ( this . consumers ) . filter ( ( [ sid , consumer ] ) =>
opts . consumers . some ( con => {
2020-03-15 14:32:35 -07:00
// console.log('filtering consumers', consumer.sid,consumer.data,con)
2020-02-10 21:38:18 -08:00
return (
con === ( ( consumer . data || { } ) . name || ( consumer . data || { } ) . id ) ||
con . sid === sid ||
con . name === ( consumer . data || { } ) . name ||
con . id === ( consumer . data || { } ) . id
)
}
)
) . map ( con => con [ 1 ] )
// console.log('custom consumers',consumers.length)
} else consumers = Array . from ( this . consumers . values ( ) )
// if (!opts.packetId) {
// console.log('socket class push',packet,opts,consumers.length)
// console.log('consumer for push', consumers.map(consumer=>(consumer.data ||{}).name))
// }
consumers = consumers . filter ( consumer => consumer . writable )
const send = consumer => this . _send ( consumer , packet )
const res = await Promise . all ( consumers . map ( send ) . map ( pReflect ) )
const success = res . filter ( result => result . isFulfilled ) . map ( ( result , index ) => [ consumers [ index ] . name , result . value ] )
const errors = res . filter ( result => result . isRejected ) . map ( ( result , index ) => [ consumers [ index ] . name , result . reason ] )
this . emit ( 'log' , { level : errors . length ? 'error' : packet . _header . id === 'ping' ? 'trace' : 'info' , msg : 'packet was pushed' , socket : this . name || this . id , errors : errors , packet : packet , success : success , headerId : packet . _header . id } )
2019-08-20 10:52:59 -07:00
} else {
2020-02-10 21:38:18 -08:00
this . emit ( 'log' , { level : 'debug' , msg : 'no connected consumers packet push ignored' , packet : packet } )
// log.debug({method:'push', id:packet._header.id, opts: this.opts, packet: packet, msg:'no connected consumers, push ignored'})
2019-08-20 10:52:59 -07:00
}
2019-01-30 20:14:00 -08:00
}
2019-09-13 18:59:22 -07:00
2020-01-06 23:01:34 -08:00
removeConsumer ( sid ) {
let consumer = this . consumers . get ( sid )
this . emit ( 'log' , { level : 'info' , msg : 'a consumer disconnected' , consumer : consumer . data , sid : consumer . sid } )
this . emit ( 'connection:consumer' , { state : 'disconnected' , msg : 'a consumer disconnected' , consumer : consumer . data , sid : consumer . sid } )
consumer . removeAllListeners ( )
consumer . stream . removeAllListeners ( )
this . consumers . delete ( sid )
log . warn ( { msg : 'consumer removed from tracking' , sid : sid , curConsumerCount : this . consumers . size } )
2019-08-23 15:48:39 -07:00
}
2020-01-06 23:01:34 -08:00
async authenticateConsumer ( consumer ) {
2019-12-02 14:20:57 -08:00
return new Promise ( async ( resolve , reject ) => {
2019-08-23 15:48:39 -07:00
// when consumer gets the handshake they must follow with authentication
2020-01-06 23:01:34 -08:00
consumer . stream . on ( 'message' , authenticate . bind ( this , consumer ) )
let [ err ] = await btc ( this . _send ) ( consumer , { _handshake : true , sid : consumer . sid } )
2019-08-23 15:48:39 -07:00
if ( err ) {
log . error ( { msg : 'error in handshake send' , error : err } )
reject ( err )
}
2020-01-06 23:01:34 -08:00
async function authenticate ( consumer , packet ) {
log . debug ( { msg : ` authentication packet from consumer ${ consumer . name } : ${ consumer . id } : ${ consumer . sid } ` , packet : packet } )
consumer . stream . removeAllListeners ( 'message' )
if ( ! packet . _authenticate ) reject ( 'first consumer packet was not authentication' )
2019-08-23 15:48:39 -07:00
else {
let [ err , res ] = await btc ( this . _authenticate ) ( packet )
2020-01-06 23:01:34 -08:00
consumer . authenticated = this . allowAnonymous ? 'anonymous' : ( err ? false : res )
2020-01-14 13:38:24 -08:00
consumer . data = packet . data || { }
consumer . name = packet . name || consumer . data . name
consumer . id = packet . id || consumer . data . id
// console.log('-------------------Inbound Consumer Authenticated---------------------------')
// console.log(packet)
// console.log(consumer.authenticated, consumer.name,consumer.id,consumer.data)
// console.log('--------------------------------------------------------')
2020-01-06 23:01:34 -08:00
packet . authenticated = consumer . authenticated
2019-08-28 09:02:12 -07:00
packet . reason = err || null
2020-01-06 23:01:34 -08:00
log . debug ( { msg : 'sending authorization result to consumer' , packet : packet } )
2020-01-18 22:34:38 -08:00
this . _send ( consumer , packet ) // send either way
2019-08-28 09:02:12 -07:00
if ( err && ! this . allowAnonymous ) {
2020-01-14 13:38:24 -08:00
log . info ( { msg : ` consumer ${ consumer . data . name } authentication failed ` , name : consumer . name , id : consumer . id , data : consumer . data , consumer _sid : consumer . sid , reason : err } )
2019-08-28 09:02:12 -07:00
reject ( packet . reason )
}
else {
2020-01-14 13:38:24 -08:00
log . info ( { msg : ` consumer ${ consumer . name } authenticated successfuly ` , name : consumer . name , id : consumer . id , data : consumer . data } )
if ( this . allowAnonymous ) log . warn ( { msg : ` consumer ${ consumer . data . name } , connected anonymously ` } )
2020-01-06 23:01:34 -08:00
resolve ( consumer . authenticated )
2019-08-28 09:02:12 -07:00
}
2019-08-23 15:48:39 -07:00
}
}
2019-12-02 14:20:57 -08:00
} )
2019-08-23 15:48:39 -07:00
}
// private methods
2019-08-28 09:02:12 -07:00
// default validator
_validateToken ( token ) {
if ( token ) return this . tokens . includes ( token )
return false
}
2020-01-06 23:01:34 -08:00
// default authenticator - reject value should be reason which is returned to consumer
2019-08-23 15:48:39 -07:00
async _authenticate ( packet ) {
2019-08-28 09:02:12 -07:00
if ( ! this . _validateToken ( packet . token ) ) return Promise . reject ( 'invalid token' )
2019-08-23 15:48:39 -07:00
return true
}
2019-08-19 17:57:10 -07:00
2020-01-06 23:01:34 -08:00
// async _connectionHandler({consumer, server}) { // this gets called for each consumer connection and is unique to
async _connectionHandler ( consumer ) { // this gets called for each consumer connection and is unique to each
2019-12-02 14:20:57 -08:00
2019-09-08 19:49:05 -07:00
const stream = new JSONStream ( )
2019-09-13 18:59:22 -07:00
consumer . stream = stream
2020-01-06 23:01:34 -08:00
consumer . data = { }
2019-12-18 18:14:21 -08:00
consumer . connected = true
2019-04-28 09:58:15 -07:00
2019-09-08 19:49:05 -07:00
// add listeners
2019-09-13 18:59:22 -07:00
consumer . on ( 'error' , ( err ) => {
2020-01-06 23:01:34 -08:00
log . error ( { msg : 'consumer connection error' , error : err } )
2019-09-08 19:49:05 -07:00
// TODO do more handling than just logging
} )
2019-08-23 15:48:39 -07:00
2019-12-18 18:14:21 -08:00
consumer . on ( 'end' , ( err ) => {
2020-01-06 23:01:34 -08:00
log . error ( { msg : ` 'consumer connection ended: ${ consumer . data . name } ` , error : err } )
if ( consumer . sid ) this . removeConsumer ( consumer . sid )
2019-12-18 18:14:21 -08:00
else {
consumer . removeAllListeners ( )
consumer . stream . removeAllListeners ( )
}
} )
2019-09-13 18:59:22 -07:00
consumer . on ( 'data' , stream . onData ) // send data to
2019-08-23 15:48:39 -07:00
2019-09-08 19:49:05 -07:00
stream . on ( 'error' , ( err ) => {
2019-09-13 18:59:22 -07:00
log . error ( { msg : 'consumer stream error during listen' , error : err } )
2019-09-08 19:49:05 -07:00
// TODO do more handling than just logging
} )
2019-08-23 15:48:39 -07:00
2020-01-06 23:01:34 -08:00
// consumer.authenticated = true
2019-12-02 14:20:57 -08:00
2020-01-06 23:01:34 -08:00
let [ err ] = await btc ( this . authenticateConsumer ) ( consumer )
2019-09-08 19:49:05 -07:00
if ( ! this . allowAnonymous ) {
if ( err ) {
2019-12-02 14:20:57 -08:00
consumer . removeAllListeners ( )
consumer . stream . removeAllListeners ( )
2019-09-13 18:59:22 -07:00
consumer . end ( ) // abort new connection consumer, cleanup, remove listeners
2019-12-18 18:14:21 -08:00
consumer . emit ( 'end' , err )
2019-09-08 19:49:05 -07:00
return
}
}
2020-01-06 23:01:34 -08:00
// authenticated consumer, add to list of consumers
consumer . sid = ++ this . nextConsumerID // server assigned ID
// consumer.authenticated = true
this . consumers . set ( consumer . sid , consumer ) // add current consumer to consumers
2019-12-02 14:20:57 -08:00
consumer . setKeepAlive ( this . keepAlive , 30 )
2020-02-10 21:38:18 -08:00
2020-01-06 23:01:34 -08:00
const consumerCloseHandler = ( sid ) => {
2019-12-05 14:42:41 -08:00
log . warn ( { msg : 'consumer connection was closed' , sid : sid } )
2020-01-06 23:01:34 -08:00
this . removeConsumer ( sid )
2019-12-05 14:42:41 -08:00
}
2020-01-06 23:01:34 -08:00
consumer . on ( 'close' , consumerCloseHandler . bind ( this , consumer . sid ) )
2020-02-10 21:38:18 -08:00
2020-01-06 23:01:34 -08:00
log . debug ( { method : '_listen' , line : 364 , msg : 'new consumer connected/authenticated' , cname : consumer . name , cid : consumer . id , totalConsumers : this . consumers . size } )
2019-12-02 14:20:57 -08:00
// all's set enable main incoming message processor
2019-09-13 18:59:22 -07:00
stream . on ( 'message' , messageProcess . bind ( this , consumer ) )
2019-08-19 17:57:10 -07:00
2020-03-15 14:32:35 -07:00
if ( this . conPackets . length ) {
2020-03-24 14:13:37 -07:00
setTimeout ( async ( ) => {
for ( let packet of this . conPackets ) {
packet . _header = { type : 'on connection packet' , id : 'pushed' }
await this . _send ( consumer , packet ) // send a packet command on to consumer on connection
}
} , 100 )
2019-09-08 19:49:05 -07:00
}
2019-08-23 15:48:39 -07:00
2020-03-15 14:32:35 -07:00
2019-12-05 14:42:41 -08:00
this . emit ( 'log' , { level : 'info' , msg : 'a consumer connected and authenticated' , name : consumer . name , id : consumer . id } )
2020-01-06 23:01:34 -08:00
this . emit ( 'connection:consumer' , { state : 'connected' , msg : ` consumer ${ ( consumer . data || { } ).name} connected and authenticated to socket ${ this . id } ` ,
name : ( consumer . data || { } ) . name || ( consumer . data || { } ) . id || consumer . sid ,
sid : consumer . sid ,
data : consumer . data ,
authenticated : consumer . authenticated
} )
2019-09-13 18:59:22 -07:00
2019-09-08 19:49:05 -07:00
// that's it. Connection is active
2019-08-23 15:48:39 -07:00
2020-01-06 23:01:34 -08:00
async function messageProcess ( consumer , packet ) {
2020-03-24 14:13:37 -07:00
this . emit ( 'log' , { level : 'mcp' , packet : packet , consumer : consumer . data , msg : 'incoming packet on socket side' } )
2019-09-08 19:49:05 -07:00
let res = ( await this . _packetProcess ( clone ( packet ) ) ) || { }
if ( Object . keys ( res ) . length === 0 )
res = {
error :
'socket packet command function likely did not return a promise' ,
packet : packet
}
if ( packet ) {
res . _header = clone ( packet . _header , false ) || { } //make sure return packet has header with id in case it was removed in processing
delete packet . _header // remove before adding to response header as request
} else res . _header = { }
res . _header . request = clone ( packet , false )
res . _header . responder = { name : this . name , instanceID : this . id }
res . _header . socket = this . address ( )
2019-09-13 18:59:22 -07:00
if ( ! res . cmd ) res . cmd = this . defaultReturnCmd || 'reply' // by default return command is 'reply'
2020-01-06 23:01:34 -08:00
let [ err ] = await btc ( this . _send ) ( consumer , res )
2019-09-08 19:49:05 -07:00
if ( err ) log . error ( { msg : err , error : err } )
} // end message process
2019-08-23 15:48:39 -07:00
2019-01-30 20:14:00 -08:00
} // end listen
2019-08-23 15:48:39 -07:00
// call when socket server is going down
2019-01-30 20:14:00 -08:00
async _destroy ( ) {
2019-08-23 15:48:39 -07:00
log . fatal ( { method : '_destroy' , line : 217 , msg : 'closing down socket server' } )
// this.push()
2019-08-19 17:57:10 -07:00
clearInterval ( this . _ping )
2019-01-30 20:14:00 -08:00
await this . close ( )
2020-01-06 23:01:34 -08:00
this . consumers . forEach ( consumer => {
consumer . removeAllListeners ( )
consumer . stream . removeAllListeners ( )
2019-08-23 15:48:39 -07:00
} )
2019-04-26 10:14:57 -07:00
log . debug ( { method : '_destroy' , line : 219 , msg : 'all connections closed....exiting' } )
2019-01-30 20:14:00 -08:00
process . exit ( )
}
2019-08-23 15:48:39 -07:00
// default packet process, just a simple echo, override with registerPacketProcessor
2019-01-30 20:14:00 -08:00
async _packetProcess ( packet ) {
return new Promise ( resolve => {
resolve ( packet )
} )
}
2020-01-06 23:01:34 -08:00
async _send ( consumer , packet ) {
log . trace ( { msg : ` sending to consumer: ${ consumer . sid } : ${ consumer . data . name } ` , consumer : consumer . data , packet : packet } )
2019-08-23 15:48:39 -07:00
return new Promise ( async ( resolve , reject ) => {
2020-07-12 18:28:19 -07:00
if ( ! consumer ) {
console . log ( 'no consumer rejecting packet send' )
reject ( 'no consumer specified can not send packet' )
return
}
2020-01-06 23:01:34 -08:00
if ( ! consumer . writable ) {
2020-07-12 18:28:19 -07:00
console . log ( 'no consumer writeable stream rejecting packet send' )
2019-12-18 18:14:21 -08:00
reject ( 'socket stream closed can not send packet' )
return
}
2020-01-06 23:01:34 -08:00
let [ err , ser ] = await btc ( consumer . stream . serialize ) ( packet )
2019-08-23 15:48:39 -07:00
if ( err ) reject ( 'unable to serialze the packet' )
2020-07-12 18:28:19 -07:00
if ( ! ser ) {
2020-07-26 15:32:04 -07:00
// console.log('empty-serialized packet', consumer.name, consumer.socketName)
2020-07-12 18:28:19 -07:00
reject ( 'empty packet rejecting send, nothing to send' )
return
}
2019-01-30 20:14:00 -08:00
const cb = ( ) => resolve ( 'packet written to socket stream' )
2020-01-06 23:01:34 -08:00
if ( ! consumer . write ( ser ) ) {
consumer . once ( 'drain' , cb )
2019-01-30 20:14:00 -08:00
} else {
process . nextTick ( cb )
}
2019-12-18 18:14:21 -08:00
2019-01-30 20:14:00 -08:00
} )
}
} // end class
} // end function makeSocketClass