2019-09-08 13:58:41 -07:00
import { Server as WSS } from 'ws'
import { Server } from 'http'
2018-04-05 15:05:59 -07:00
import btc from 'better-try-catch'
2020-02-10 21:39:33 -08:00
import pReflect from 'p-reflect'
2019-03-17 13:52:58 -07:00
import { promisify } from 'util'
2018-04-05 15:05:59 -07:00
import _ON _DEATH from 'death' //this is intentionally ugly
import clone from 'clone'
2019-02-14 14:01:17 -08:00
import logger from '@uci-utils/logger'
2018-04-05 15:05:59 -07:00
let log = { }
2019-01-01 19:11:16 -08:00
/ * *
2019-09-08 13:58:41 -07:00
* Socket - Description
* @ extends Server
2019-01-01 19:11:16 -08:00
* /
2019-09-08 13:58:41 -07:00
class Socket extends Server {
/ * *
* constructor - Description
*
* @ param { object } [ opts = { } ] Description
*
* @ returns { type } Description
* /
2019-01-01 19:11:16 -08:00
constructor ( opts = { } ) {
2019-09-08 13:58:41 -07:00
super ( opts )
2018-04-05 15:05:59 -07:00
opts . host = opts . host || '0.0.0.0'
2018-05-24 12:23:49 -07:00
opts . port = opts . port || 8090
2019-01-01 19:11:16 -08:00
this . id = opts . id || opts . name || 'Websocket:' + new Date ( ) . getTime ( )
this . opts = opts // for use to recover from selected errors
2019-09-08 13:58:41 -07:00
this . wss = { } // web socket instance goes here
this . allowAnonymous = ( ! opts . tokens || ! ! process . env . UCI _ANON || opts . allowAnonymous ) ? true : false
this . tokens = opts . tokens || [ ]
this . pingInterval = opts . pingInterval === false ? opts . pingInterval : ( opts . pingInterval * 1000 || 5000 )
this . nextconsumerID = 0 // incrementer for default initial consumer ID
this . consumers = new Map ( )
2020-03-15 15:48:34 -07:00
this . conPackets = opts . conPackets || [ opts . conPacket ]
2019-09-08 13:58:41 -07:00
this . errors = [ ]
this . errorCount = 0
2018-04-05 15:05:59 -07:00
this . create = this . create . bind ( this )
2019-09-08 13:58:41 -07:00
this . _destroy = this . _destroy . bind ( this )
2020-01-16 22:12:30 -08:00
this . _listening = false
2019-09-08 13:58:41 -07:00
this . authenticateconsumer = this . authenticateconsumer . bind ( this )
this . _authenticate = this . _authenticate . bind ( this )
2019-03-17 13:52:58 -07:00
this . close = promisify ( this . close ) . bind ( this )
2019-01-01 19:11:16 -08:00
log = logger ( {
2019-11-21 09:42:22 -08:00
package : '@uci/websocket' ,
2019-01-01 19:11:16 -08:00
file : 'src/socket.js' ,
class : 'Socket' ,
name : 'websocket' ,
id : this . id
} )
2018-04-05 15:05:59 -07:00
} // end constructor
2020-01-16 22:12:30 -08:00
get active ( ) {
return this . _listening
}
2019-11-21 09:42:22 -08:00
2019-09-08 13:58:41 -07:00
/ * *
* create - Description
*
* @ returns { type } Description
* /
2019-01-01 19:11:16 -08:00
async create ( ) {
2020-01-16 22:12:30 -08:00
this . emit ( 'socket' , { state : 'creating' , msg : 'creating socket for consumers to connect' } )
2018-04-05 15:05:59 -07:00
return new Promise ( ( resolve , reject ) => {
2019-01-01 19:11:16 -08:00
_ON _DEATH ( async ( ) => {
2019-09-08 13:58:41 -07:00
log . error ( { method : 'create' , line : 51 , msg : '\nhe\'s dead jim' } )
2018-04-05 15:05:59 -07:00
await this . _destroy ( )
} )
2019-09-08 13:58:41 -07:00
// kills nodemon properly
2018-04-05 15:05:59 -07:00
process . once ( 'SIGUSR2' , async ( ) => {
2019-09-08 13:58:41 -07:00
await this . _destroy ( )
2018-04-05 15:05:59 -07:00
process . kill ( process . pid , 'SIGUSR2' )
} )
2019-09-08 13:58:41 -07:00
this . once ( 'error' , async err => {
log . error ( { method : 'creaete' , line : 48 , err : err , msg : 'http server error' } )
2018-04-05 15:05:59 -07:00
reject ( err )
} )
2019-09-08 13:58:41 -07:00
this . once ( 'listening' , ( ) => {
2019-04-26 10:12:34 -07:00
log . info ( { method : 'create' , line : 54 , msg : 'websocket server created and listening at' , address : this . address ( ) } )
2019-09-08 13:58:41 -07:00
// emit ready
this . on ( 'error' , err => {
this . errorCount += 1 // log errors here
this . errors . push ( err )
2020-01-16 22:12:30 -08:00
if ( this . errorCount > 2 ) {
this . emit ( 'log' , { level : 'warn' , msg : 'something bad maybe going on, 3 errors' , errors : this . errors } )
this . emit ( 'socket' , { state : 'error' , msg : '2 to 5 socket errors' , errors : this . errors } )
}
2019-11-21 09:42:22 -08:00
if ( this . errorCount > 5 ) {
let errors = { level : 'fatal' , msg : 'something fatal is going on, 6 errors' , errors : this . errors }
log . fatal ( errors )
2020-01-16 22:12:30 -08:00
this . _listening = false
this . close ( ( ) => {
this . emit ( 'socket' , { state : 'offline' , msg : 'too many socket errors no longer listening for consumers to connect' } )
} )
2019-12-08 14:54:40 -08:00
this . emit ( 'log' , { active : this . active } )
this . emit ( 'log' , errors )
2019-11-21 09:42:22 -08:00
}
2019-09-08 13:58:41 -07:00
} )
this . wss = new WSS ( { server : this } )
this . wss . on ( 'error' , err => { this . emit ( 'error' , err ) } ) // bubble up errors
this . wss . on ( 'connection' , this . _connectionHandler . bind ( this ) )
2020-01-16 22:12:30 -08:00
this . _listening = true
let msg = ` socket ready and listening ${ typeof this . address ( ) === 'string' ? ` at ${ this . address ( ) } ` : ` on port ${ this . address ( ) . port } ` } `
2019-12-08 14:54:40 -08:00
this . emit ( 'log' , { active : this . active } )
2020-01-16 22:12:30 -08:00
this . emit ( 'socket' , { state : 'listening' , msg : msg } )
2019-09-08 13:58:41 -07:00
resolve ( ` websocket ready and listening at ${ this . address ( ) . address } : ${ this . address ( ) . port } ` )
2018-04-05 15:05:59 -07:00
} )
2019-09-08 13:58:41 -07:00
super . listen ( this . opts )
2018-04-05 15:05:59 -07:00
} )
} // end create
2019-01-01 19:11:16 -08:00
registerPacketProcessor ( func ) {
2019-09-08 13:58:41 -07:00
this . _packetProcess = func . bind ( this )
2018-04-05 15:05:59 -07:00
}
2019-09-08 13:58:41 -07:00
addTokens ( tokens ) {
if ( typeof tokens === 'string' ) {
tokens = tokens . split ( ',' )
}
this . tokens = this . tokens . concat ( tokens )
if ( this . tokens . length > 0 ) this . allowAnonymous = false
}
removeTokens ( tokens ) {
if ( typeof tokens === 'string' ) {
if ( tokens === 'all' ) {
this . tokens = [ ]
this . allowAnonymous = true
return
}
tokens = tokens . split ( ',' )
}
this . tokens = this . tokens . filter ( token => ! tokens . includes ( token ) )
if ( this . tokens . length === 0 ) {
log . warn ( { msg : 'all tokens have been removed, switching to allow anonymous connections' } )
this . allowAnonymous = true
}
}
registerTokenValidator ( func ) {
this . allowAnonymous = false
this . _validateToken = func
}
registerAuthenticator ( func ) {
this . allowAnonymous = false
this . _authenticate = func
}
2019-04-28 09:56:22 -07:00
2019-09-08 13:58:41 -07:00
/ * *
* push
*
* @ param { Object } packet - this is the parameter packet
* @ param { < type > } id - this is the parameter id
*
* /
2020-02-10 21:39:33 -08:00
async push ( packet = { } , opts = { } ) {
2019-09-08 13:58:41 -07:00
if ( this . consumers . size > 0 ) {
2020-02-10 21:39:33 -08:00
packet . _header = { id : opts . packetId || 'pushed' }
let consumers = [ ]
if ( opts . consumers || opts . consumer ) {
if ( opts . consumer ) opts . consumers = [ opts . consumer ]
consumers = Array . from ( this . consumers ) . filter ( ( [ sid , consumer ] ) =>
opts . consumers . some ( con => {
console . log ( consumer . sid , consumer . data , con )
return (
con === ( ( consumer . data || { } ) . name || ( consumer . data || { } ) . id ) ||
con . sid === sid ||
con . name === ( consumer . data || { } ) . name ||
con . id === ( consumer . data || { } ) . id
)
}
)
) . map ( con => con [ 1 ] )
// console.log('custom consumers',consumers.length)
} else consumers = Array . from ( this . consumers . values ( ) )
consumers = consumers . filter ( consumer => consumer . writable || consumer . readyState === 1 )
const send = consumer => this . _send ( consumer , packet )
const res = await Promise . all ( consumers . map ( send ) . map ( pReflect ) )
const success = res . filter ( result => result . isFulfilled ) . map ( ( result , index ) => [ consumers [ index ] . name , result . value ] )
const errors = res . filter ( result => result . isRejected ) . map ( ( result , index ) => [ consumers [ index ] . name , result . reason ] )
this . emit ( 'log' , { level : errors . length ? 'error' : packet . _header . id === 'ping' ? 'trace' : 'debug' , msg : 'packet was pushed' , socket : this . name || this . id , errors : errors , packet : packet , success : success , headerId : packet . _header . id } )
2019-09-08 13:58:41 -07:00
} else {
2020-02-10 21:39:33 -08:00
this . emit ( 'log' , { level : 'debug' , msg : 'no connected consumers packet push ignored' , packet : packet } )
2019-09-08 13:58:41 -07:00
}
}
2019-04-28 09:56:22 -07:00
2019-09-08 13:58:41 -07:00
removeconsumer ( id ) {
let consumer = this . consumers . get ( id )
2019-12-08 14:54:40 -08:00
this . emit ( 'connection:consumer' , { state : 'disconnected' , name : consumer . name } )
2019-09-08 13:58:41 -07:00
clearInterval ( consumer . _ping )
consumer . removeAllListeners ( )
log . warn ( { msg : ` consumer ${ id } : ${ consumer . name } removed from server tracking ` , id : id , name : consumer . name , curconsumerCount : this . consumers . size } )
this . consumers . delete ( id )
}
2019-04-28 09:56:22 -07:00
2019-09-08 13:58:41 -07:00
async authenticateconsumer ( consumer ) {
// let server = this
return new Promise ( async function ( resolve , reject ) {
// when consumer gets the handshake they must follow with authentication
consumer . on ( 'message' , authenticate . bind ( this , consumer ) )
let [ err ] = await btc ( this . _send ) ( consumer , { _handshake : true , id : consumer . id } )
if ( err ) {
log . error ( { msg : 'error in handshake send' , error : err } )
reject ( err )
}
async function authenticate ( consumer , message ) {
let [ err , packet ] = btc ( JSON . parse ) ( message )
if ( err ) reject ( 'unable to parse authentication packet from consumer' )
log . debug ( { msg : ` authentication packet from consumer ${ consumer . id } ` , packet : packet } )
if ( ! packet . _authenticate ) reject ( 'first consumer packet was not authentication' )
else {
let [ err , res ] = await btc ( this . _authenticate ) ( packet )
consumer . authenticated = this . allowAnonymous ? 'anonymous' : ( err ? false : res )
consumer . name = packet . consumerName
packet . authenticated = consumer . authenticated
if ( err && ! this . allowAnonymous ) packet . reason = err
log . debug ( { msg : 'sending authorization result to consumer' , packet : packet } )
await this . _send ( consumer , packet ) // send either way
if ( packet . reason ) {
log . info ( { msg : 'consumer authentication failed' , consumer : consumer . name , consumer _id : consumer . id , reason : err } )
reject ( packet . reason )
}
else {
log . info ( { msg : 'consumer authenticated successfuly' , consumer : consumer . name , consumer _id : consumer . id } )
2019-11-21 09:42:22 -08:00
if ( this . allowAnonymous ) log . warn ( { msg : 'web consumer connected anonymously' , consumer : consumer . name , consumer _id : consumer . id } )
2019-09-08 13:58:41 -07:00
resolve ( consumer . authenticated )
2018-05-24 12:23:49 -07:00
}
2018-04-05 15:05:59 -07:00
}
2019-09-08 13:58:41 -07:00
}
} . bind ( this ) )
}
// private methods
// default validator
_validateToken ( token ) {
if ( token ) return this . tokens . includes ( token )
return false
}
// default authenticator - reject value should be reason which is returned to consumer
async _authenticate ( packet ) {
if ( ! this . _validateToken ( packet . token ) ) return Promise . reject ( 'invalid token' )
return true
}
async _connectionHandler ( consumer ) {
log . debug ( { method : '_connectionHandler' , line : 76 , msg : 'new web consumer connecting' } )
consumer . id = ++ this . nextconsumerID // server assigned ID
this . consumers . set ( consumer . id , consumer )
consumer . authenticated = false
consumer . connected = true
// add listeners
const consumerCloseHandler = ( id ) => {
log . warn ( { msg : 'consumer connection closed during listen,' , id : id } )
this . removeconsumer ( id )
}
consumer . on ( 'close' , consumerCloseHandler . bind ( this , consumer . id ) )
consumer . on ( 'error' , ( err ) => {
log . error ( { msg : 'consumer connection error during listen' , error : err } )
// TODO do more handling than just logging
} )
let [ err ] = await btc ( this . authenticateconsumer ) ( consumer )
if ( ! this . allowAnonymous ) {
if ( err ) {
consumer . close ( ) // abort new connection consumer, cleanup, remove listeners
return
}
}
2020-03-15 15:48:34 -07:00
if ( this . conPackets . length ) {
for ( let packet of this . conPackets ) {
packet . _header = { type : 'on connection packet' , id : 'pushed' }
await this . _send ( consumer , packet ) // send a packet command on to consumer on connection
}
2019-09-08 13:58:41 -07:00
}
2019-12-08 14:54:40 -08:00
this . emit ( 'connection:consumer' , { state : 'connected' , name : consumer . name , id : consumer . id } )
2019-09-08 13:58:41 -07:00
consumer . _ping = setInterval ( ( ) => {
consumer . ping ( JSON . stringify ( { pingInterval : this . pingInterval } ) )
this . _send ( consumer , { _header : { id : 'ping' } , pingInterval : this . pingInterval } )
} , this . pingInterval )
consumer . on ( 'message' , messageProcess . bind ( this , consumer ) )
async function messageProcess ( consumer , strPacket ) {
log . debug ( { method : '_listen' , line : 76 , packet : strPacket , msg : ' incoming packet from web consumer' } )
let res = { }
let [ err , packet ] = btc ( JSON . parse ) ( strPacket )
if ( err ) {
res = { error : 'Could not JSON parse packet' , packet : strPacket }
}
else {
2019-11-21 09:42:22 -08:00
log . debug ( { method : '_listen' , line : 266 , packet : packet , msg : 'parsed packet' } )
2019-09-08 13:58:41 -07:00
res = ( await this . _packetProcess ( clone ( packet ) ) ) || { }
if ( Object . keys ( res ) . length === 0 )
res = {
error :
'consumer packet command function likely did not return a promise' ,
packet : packet
}
2018-05-24 12:23:49 -07:00
if ( packet ) {
2019-01-01 19:11:16 -08:00
res . _header = clone ( packet . _header , false ) || { } //make sure return packet has header with id in case it was removed in processing
delete packet . _header // remove before adding to response header as request
2018-05-24 12:23:49 -07:00
} else res . _header = { }
2019-01-01 19:11:16 -08:00
res . _header . request = clone ( packet , false )
res . _header . responder = { name : this . name , instanceID : this . id }
2018-04-05 15:05:59 -07:00
res . _header . socket = this . address ( )
2019-01-01 19:11:16 -08:00
if ( ! res . cmd ) res . cmd = 'reply' // by default return command is 'reply'
2019-09-08 13:58:41 -07:00
let [ err ] = await btc ( this . _send ) ( consumer , res )
if ( err ) log . error ( { msg : err , error : err } )
2018-04-05 15:05:59 -07:00
}
2019-09-08 13:58:41 -07:00
} // end message process
} // end connection Handler
2018-04-05 15:05:59 -07:00
2019-01-01 19:11:16 -08:00
async _destroy ( ) {
2019-09-08 13:58:41 -07:00
log . debug ( { method : '_destroy' , line : 302 , msg : 'closing down http server and attached sockets' , port : this . port } )
this . consumers . forEach ( consumer => {
consumer . terminate ( )
consumer . emit ( 'close' )
} )
2018-04-05 15:05:59 -07:00
await this . close ( )
2019-09-08 13:58:41 -07:00
log . debug ( { method : '_listen' , line : 105 , msg : 'all connections closed, all consumer sockets deleted....exiting' } )
2018-04-05 15:05:59 -07:00
process . exit ( )
}
// default packet process, just a simple echo - replace
2019-01-01 19:11:16 -08:00
async _packetProcess ( packet ) {
2018-04-05 15:05:59 -07:00
return new Promise ( resolve => {
resolve ( packet )
} )
}
2018-05-24 12:23:49 -07:00
2019-09-08 13:58:41 -07:00
async _send ( consumer , packet ) {
log . debug ( { msg : ` sending to client: ${ consumer . id } : ${ consumer . name } ` , packet : packet } )
return new Promise ( async ( resolve , reject ) => {
// if (!consumer._connected) reject('can not send no connection')
2019-01-01 19:11:16 -08:00
let [ err , message ] = btc ( JSON . stringify ) ( packet )
2019-09-08 13:58:41 -07:00
if ( err ) reject ( 'unable to serialze the packet' )
consumer . send ( message , ( err ) => {
if ( ! err ) resolve ( 'packet written to socket stream successfully' )
else reject ( ` error sending: ${ err } ` )
} )
} )
2018-04-05 15:05:59 -07:00
}
2019-09-08 13:58:41 -07:00
2018-04-05 15:05:59 -07:00
} // end class
2019-01-01 19:11:16 -08:00
export default Socket