2019-08-15 08:11:18 -07:00
import { connect } from 'mqtt'
2019-09-01 18:14:16 -07:00
import EventEmitter from 'events'
2018-02-28 20:24:45 -08:00
import btc from 'better-try-catch'
import union from 'lodash.union'
import xor from 'lodash.xor'
2019-02-14 14:01:11 -08:00
import logger from '@uci-utils/logger'
2018-05-24 12:31:47 -07:00
import isPlainObject from 'is-plain-object'
2019-04-27 12:08:28 -07:00
import isTopicMatch from 'mqtt-match'
2018-02-28 20:24:45 -08:00
let log = { }
2019-01-01 19:20:51 -08:00
/ * *
* MQTT - Client - An mqtt client that supports UCI packets
2019-09-01 18:14:16 -07:00
* uses mqtt . js as basis
2019-01-01 19:20:51 -08:00
* /
2019-09-01 18:14:16 -07:00
class MQTTClient extends EventEmitter {
2019-01-01 19:20:51 -08:00
/ * *
* constructor - Description
*
* @ param { object } [ opts = { } ] Description
*
* /
constructor ( opts = { } ) {
2019-09-01 18:14:16 -07:00
super ( opts )
2019-01-01 19:20:51 -08:00
this . id = opts . id || opts . name || 'mqtt:' + new Date ( ) . getTime ( )
log = logger ( {
file : 'src/client.js' ,
2019-04-26 10:41:46 -07:00
package : '@uci/mqttjs' ,
class : 'MQTTClient' ,
2019-01-01 19:20:51 -08:00
name : 'mqtt' ,
id : this . id
} )
2019-03-17 13:50:04 -07:00
this . url = opts . url || null
2019-09-01 18:14:16 -07:00
this . client = opts . client || false
this . sendTopic = opts . sendTopic || 'uci/send'
2019-09-08 19:53:51 -07:00
this . opts = opts
2019-09-01 18:14:16 -07:00
// these are the "listen" topics to which the instance will subscribe. By default that will include the instance ID
2019-08-15 08:11:18 -07:00
// default subscription topics can be string of commna delimited or array of strings. If none they by default topic is id
2019-09-01 18:14:16 -07:00
this . topics = opts . topics ? ( Array . isArray ( opts . topics ) ? opts . topics : opts . topics . split ( ',' ) ) : [ ]
this . topics . push ( this . client ? ` ${ this . sendTopic } /return ` : ` ${ this . sendTopic } /receive ` )
2019-08-15 08:11:18 -07:00
this . connectOpts = opts . connect || { } // see options for new mqtt.Client
this . mqtt = { } // orginal mqtt.js object when created by connect
2018-02-28 20:24:45 -08:00
// self bindings
2019-09-01 18:14:16 -07:00
this . _connected = false
2020-03-15 15:56:48 -07:00
this . conPackets = opts . conPackets || [ opts . conPacket ]
2018-02-28 20:24:45 -08:00
this . connect = this . connect . bind ( this )
2019-09-01 18:14:16 -07:00
this . publish = this . publish . bind ( this )
this . _subscribe = this . _subscribe . bind ( this )
this . client ? this . send = this . _send . bind ( this ) : this . push = this . _push . bind ( this )
2018-02-28 20:24:45 -08:00
}
2019-09-08 19:53:51 -07:00
// TODO add UCI authentication and MQTT authentication
2019-09-01 18:14:16 -07:00
get connected ( ) { return this . _connected }
2019-01-01 19:20:51 -08:00
/ * *
* connect - Description
*
* @ returns { type } Description
* /
async connect ( ) {
return new Promise ( ( resolve , reject ) => {
2019-08-15 08:11:18 -07:00
this . mqtt = connect (
2019-01-01 19:20:51 -08:00
this . url ,
2019-08-15 08:11:18 -07:00
this . connectOpts
2019-01-01 19:20:51 -08:00
)
2019-08-15 08:11:18 -07:00
let timeout = this . connecOpts || 5000
2019-01-01 19:20:51 -08:00
setTimeout ( ( ) => {
2019-08-15 08:11:18 -07:00
reject ( { msg : 'ending mqtt connection attempt, no broker' , url : this . url , options : this . connectOpts } )
2019-01-01 19:20:51 -08:00
} , timeout )
2018-02-28 20:24:45 -08:00
2019-09-01 18:14:16 -07:00
this . mqtt . once ( 'connect' , async ( ) => {
2018-05-24 12:31:47 -07:00
this . _log ( )
this . _listen ( )
2018-02-28 20:24:45 -08:00
2019-08-15 08:11:18 -07:00
this . mqtt . on ( 'reconnect' , ( ) => {
2019-04-26 10:41:46 -07:00
log . debug ( { method : 'connect' , line : 71 , msg : 'mqtt client reconnected to broker' } )
2019-09-01 18:14:16 -07:00
this . _connected = true
2019-08-15 08:11:18 -07:00
this . _subscribe ( this . topics ) // resubscribe
2019-09-08 19:53:51 -07:00
this . emit ( 'status' , { level : 'info' , msg : 'reconnected' , id : this . id , opts : this . opts , connected : this . _connected } )
2020-03-15 15:56:48 -07:00
this . emit ( 'connection:consumer' , { state : 'connected' , id : this . id , mqtt : true } )
2019-09-08 19:53:51 -07:00
if ( this . opts . conPacket ) this . send ( this . conPacket )
2018-05-24 12:31:47 -07:00
} )
2019-08-15 08:11:18 -07:00
this . mqtt . on ( 'error' , err => {
2019-09-01 18:14:16 -07:00
this . emit ( 'status' , { level : 'fatal' , method : 'connect' , err : err , msg : 'connection error to broker' } )
2019-04-26 10:41:46 -07:00
log . fatal ( { method : 'connect' , line : 76 , err : err , msg : 'connection error to broker' } )
2020-03-15 15:56:48 -07:00
this . emit ( 'connection:consumer' , { state : 'disconnected' , id : this . id , mqtt : true } )
2019-09-01 18:14:16 -07:00
this . _connected = false
} )
this . mqtt . on ( 'close' , ( ) => {
this . emit ( 'status' , { level : 'fatal' , msg : 'mqtt connection to broker was closed' } )
log . debug ( { method : '_log' , line : 193 , msg : 'connection to broker was closed' } )
this . _connected = false
} )
this . mqtt . on ( 'offline' , ( ) => {
this . emit ( 'status' , { level : 'fatal' , msg : 'this client has gone offline from broker' } )
log . warn ( { method : '_log' , line : 197 , msg : 'this client has gone offline from broker' } )
this . _connected = false
2018-05-24 12:31:47 -07:00
} )
2019-08-15 08:11:18 -07:00
log . debug ( { method : 'connect' , line : 80 , options : this . mqtt . options , msg : ` mqtt client connected to broker at ${ this . mqtt . options . hostname } : ${ this . mqtt . options . port } ` } )
2019-09-01 18:14:16 -07:00
this . _connected = true
let [ err ] = await btc ( this . _subscribe ) ( this . topics ) // initial subscriptions
if ( err ) reject ( { error : 'unable to subscribe to topics after connection' , err : err } )
2019-09-08 19:53:51 -07:00
else {
this . emit ( 'status' , { level : 'info' , msg : 'connected' , id : this . id , opts : this . opts , connected : this . _connected } )
2020-03-15 15:56:48 -07:00
this . emit ( 'connection:consumer' , { state : 'connected' , id : this . id , mqtt : true } )
2019-09-08 19:53:51 -07:00
if ( this . opts . conPacket ) this . send ( this . conPacket )
resolve ( ` mqtt client connected to broker at ${ this . mqtt . options . hostname } : ${ this . mqtt . options . port } ` )
}
2018-02-28 20:24:45 -08:00
} )
} ) //end promise
}
2019-01-01 19:20:51 -08:00
async subscribe ( topic , options = { } ) {
2019-08-15 08:11:18 -07:00
// topic is only comma delimited string or array of topics
if ( typeof topic === 'string' ) topic = topic . split ( ',' )
this . topics = union ( this . topics , topic || [ ] )
2019-04-26 10:41:46 -07:00
log . debug ( { method : 'subscribe' , line : 91 , msg : ` subscription for topic ${ topic } added ` } )
2019-09-01 18:14:16 -07:00
return this . _subscribe ( topic , options )
2018-02-28 20:24:45 -08:00
}
2019-01-01 19:20:51 -08:00
async unsubscribe ( topic , options = { } ) {
2019-08-15 08:11:18 -07:00
if ( typeof topic === 'string' ) topic = topic . split ( ',' )
this . topics = xor ( this . topics , topic || [ ] )
return new Promise ( ( resolve , reject ) => {
if ( ! topic ) reject ( 'no topic supplied' )
2019-09-01 18:14:16 -07:00
if ( ! this . _connected ) reject ( 'no connection to broker' )
2019-08-15 08:11:18 -07:00
this . mqtt . unsubscribe ( topic , options , ( err , granted ) => {
if ( err ) reject ( err )
else resolve ( granted )
} )
} )
2018-02-28 20:24:45 -08:00
}
2019-09-01 18:14:16 -07:00
// solely for primary of sending uci packets. Use publish for typical mqtt messaging
2019-09-08 19:53:51 -07:00
// if topic not passed then it will send to the topic given by the cmd
2019-09-01 18:14:16 -07:00
async _send ( ipacket , topic , options ) { // packet is required
2020-03-15 15:56:48 -07:00
if ( ! ipacket || ! Object . keys ( ipacket ) . length ) return { error : 'send aborted, no packet passed' }
2019-09-01 18:14:16 -07:00
if ( isPlainObject ( topic ) ) { // if neither then assume none and what is passed is options
options = topic || { }
topic = ''
}
return new Promise ( async resolve => {
if ( ! this . _connected ) {
log . error ( { method : 'send' , line : 105 , url : this . url , opts : this . opts , msg : 'no active connection to broker' } )
resolve ( { error : 'client not connected to a broker, aborting send' } )
}
let packet = Object . assign ( { } , ipacket ) // need to avoid mutuation for different consumers using same packet instance
packet . _header = {
id : Math . random ( )
. toString ( )
. slice ( 2 ) , // need this for when multiple sends for different consumers use same packet instanceack
returnTopic : ` ${ this . sendTopic } /return `
}
topic = topic || ` ${ this . sendTopic } /receive `
let payload = this . _serialize ( packet ) // payload is entire packet
if ( ! payload ) {
log . error ( { method : 'send' , line : 163 , msg : 'not able to serilaize packet' , packet : packet } )
resolve ( { error : 'not able to serilaize packet' , packet : packet } )
}
setTimeout ( ( ) => { resolve ( { error : 'no response from socket in 10sec' } ) } , 10000 )
this . once ( packet . _header . id , async reply => {
let res = await this . _packetProcess ( reply )
if ( ! res ) { // if packetProcess was not promise
res = reply
log . debug ( { method : 'send' , line : 180 , msg : 'consumer function was not promise returning further processing may be out of sequence' } )
}
resolve ( res ) // resolves processed packet not returned packet
} )
let res = await this . _publish ( packet , topic , options )
if ( res ) resolve ( res )
2019-08-15 08:11:18 -07:00
2019-09-01 18:14:16 -07:00
} ) // end promise
}
2020-03-15 15:56:48 -07:00
async _push ( packet , topic , options ) {
if ( ! packet || ! Object . keys ( packet ) . length ) return { error : 'push aborted, no packet passed' , packet : packet , topic : topic }
2019-08-15 08:11:18 -07:00
2019-09-01 18:14:16 -07:00
if ( ! this . _connected ) {
2019-08-15 08:11:18 -07:00
log . error ( { method : 'send' , line : 105 , url : this . url , opts : this . opts , msg : 'no active connection to broker' } )
2019-09-01 18:14:16 -07:00
return { error : 'no active connection to broker' }
2019-02-14 14:01:11 -08:00
}
2019-08-15 08:11:18 -07:00
2019-09-01 18:14:16 -07:00
if ( isPlainObject ( topic ) ) { // if neither then assume none and what is passed is options
2019-08-15 08:11:18 -07:00
options = topic || { }
2019-09-01 18:14:16 -07:00
topic = ''
2019-01-01 19:20:51 -08:00
}
2019-04-26 10:41:46 -07:00
2019-09-01 18:14:16 -07:00
topic = topic || packet . cmd ? packet . cmd . replace ( /\./gi , '/' ) : ''
2019-08-15 08:11:18 -07:00
2019-09-01 18:14:16 -07:00
if ( ! topic ) {
log . warn ( { msg : 'push aborted, no topic pass nor command in packet' , packet : packet , topic : topic } )
return { error : 'push aborted, no topic given or no command in packet' , packet : packet , topic : topic }
2019-08-15 08:11:18 -07:00
}
2019-04-26 10:41:46 -07:00
2019-09-01 18:14:16 -07:00
packet . _header = { id : 'pushed' }
log . info ( { method : 'push' , line : 194 , msg : 'mqtt publishing a uci pushed packet' , packet : packet , topic : topic } )
return await this . _publish ( packet , topic , options )
}
// this is internal UCI publish
async _publish ( packet , topic , options ) {
if ( ! topic ) return { error : 'no topic to which to publish packet' }
2020-03-15 15:56:48 -07:00
let payload = packet . payload != null ? ( ( typeof packet . payload === 'string' ? packet . payload : this . _serialize ( packet . payload ) ) ) : this . _serialize ( packet ) // payload is entire packet
2019-09-01 18:14:16 -07:00
if ( ! payload ) {
log . error ( { method : 'send' , line : 163 , msg : 'not able to serilaize packet' , packet : packet } )
return { error : 'not able to serilaize packet' , packet : packet }
}
// all ready publish packets
log . info ( ( { msg : 'sending a uci packet vis mqtt' , packet : packet , topic : topic , payload : payload , options : options } ) )
let [ err ] = await btc ( this . publish ) ( topic , payload , options )
if ( err ) return { error : err }
2019-08-15 08:11:18 -07:00
}
publish ( ... args ) {
return new Promise ( ( resolve , reject ) => {
2019-09-01 18:14:16 -07:00
if ( ! this . _connected ) reject ( 'publish failed - no connection to broker' )
2019-08-15 08:11:18 -07:00
this . mqtt . publish ( ... args , ( err , result ) => {
2019-09-01 18:14:16 -07:00
if ( err ) {
reject ( err )
}
2019-08-15 08:11:18 -07:00
else resolve ( result )
} )
} )
}
end ( ... args ) {
return new Promise ( ( resolve , reject ) => {
2019-09-01 18:14:16 -07:00
this . mqtt . end ( ... args , ( err , result ) => {
2019-08-15 08:11:18 -07:00
if ( err ) reject ( err )
else resolve ( result )
} )
2018-05-24 12:31:47 -07:00
} )
2018-02-28 20:24:45 -08:00
}
2019-01-01 19:20:51 -08:00
registerPacketProcessor ( func ) {
2018-02-28 20:24:45 -08:00
this . _packetProcess = func
}
2019-08-15 08:11:18 -07:00
async _subscribe ( topic , options = { } ) {
return new Promise ( ( resolve , reject ) => {
2019-09-01 18:14:16 -07:00
if ( ! this . _connected ) reject ( 'no connection to broker during subscription attempt' )
2019-08-15 08:11:18 -07:00
if ( ! topic ) reject ( 'no topic supplied' )
this . mqtt . subscribe ( topic , options , ( err , granted ) => {
if ( err ) reject ( err )
else resolve ( granted )
} )
} )
}
2018-02-28 20:24:45 -08:00
_serialize ( json ) {
2019-01-01 19:20:51 -08:00
let [ err , payload ] = btc ( JSON . stringify ) ( json )
2018-05-24 12:31:47 -07:00
if ( err ) {
2019-04-26 10:41:46 -07:00
log . warn ( { method : '_serialize' , line : 140 , msg : ` unable to stringify json for payload: ${ json } ` } )
2018-02-28 20:24:45 -08:00
return null
}
return payload
}
2019-01-01 19:20:51 -08:00
_listen ( ) {
2020-03-15 17:00:19 -07:00
const msg = ` listening for incoming mqtt packets from broker on topics ${ this . topics } `
log . debug ( { method : '_listen' , line : 147 , msg : msg } )
this . emit ( 'socket' , { state : 'listening' , msg : msg } )
2019-09-01 18:14:16 -07:00
2019-08-15 08:11:18 -07:00
this . mqtt . on ( 'message' , messageProcess . bind ( this ) )
2018-02-28 20:24:45 -08:00
2019-01-01 19:20:51 -08:00
async function messageProcess ( topic , payload ) {
2019-04-26 10:41:46 -07:00
log . debug ( { method : '_listen' , line : 153 , msg : 'incoming mqtt message to process' , topic : topic , payload : payload } )
2018-02-28 20:24:45 -08:00
let packet = this . _handlePayload ( payload )
2019-09-01 18:14:16 -07:00
if ( isPlainObject ( packet ) ) packet . cmd = packet . cmd || topic // if payload had no command so use topic as cmd
else packet = { cmd : topic , data : packet } // payload was not json packet, create a packet and emit payload
this . emit ( 'payload' , { topic : topic , payload : payload } ) // emit 'payload' event for use for standard mqtt pub/sub
let header = Object . assign ( { } , packet . _header ) // save the header in case a packet processor deletes it
let res = await this . _packetProcess ( packet )
2019-09-08 19:53:51 -07:00
if ( header && this . client ) this . emit ( header . id , packet ) // is acting like a client then emit returned packet
else { // act like a server, process and return reply
2019-09-01 18:14:16 -07:00
res . _header = header
log . info ( { method : '_listen' , line : 159 , msg : 'processed packet - return only if sent not published' , processed : res } )
if ( res . _header ) {
if ( res . _header . returnTopic ) this . _publish ( res , res . _header . returnTopic )
2019-04-27 12:08:28 -07:00
}
}
2019-09-01 18:14:16 -07:00
2018-02-28 20:24:45 -08:00
}
}
2019-01-01 19:20:51 -08:00
_handlePayload ( payload ) {
let [ err , packet ] = btc ( JSON . parse ) ( payload . toString ( ) )
2019-04-26 10:41:46 -07:00
if ( err ) log . debug ( { method : '_handlePayload' , line : 201 , msg : 'payload is not json returning as prop data:<payload>' } )
2019-09-01 18:14:16 -07:00
if ( ! isPlainObject ( packet ) ) return payload . toString ( )
2018-02-28 20:24:45 -08:00
return packet
}
2019-09-01 18:14:16 -07:00
_validateTopic ( topic ) {
if ( this . topics . some ( ctopic => isTopicMatch ( ctopic , topic ) ) ) {
log . warn ( { msg : 'topic not valid for publishing, matches a subscribed topic or error topic' , topic : topic , errorTopic : this . errorTopics , subscribed : this . topics } )
return false
}
return true
}
2018-02-28 20:24:45 -08:00
// default packet process just a simple console logger. ignores any cmd: prop
2019-09-01 18:14:16 -07:00
_packetProcess ( packet ) {
2018-02-28 20:24:45 -08:00
console . log ( '=========================' )
2019-09-01 18:14:16 -07:00
console . log ( 'received packet to be processed=>' , packet )
2019-08-15 08:11:18 -07:00
console . log ( 'replace this by setting new function with .registerPacketProcessor' )
2019-09-01 18:14:16 -07:00
packet . msg = 'the default packet processeor processed this packet'
2018-02-28 20:24:45 -08:00
console . log ( '========================' )
2019-09-01 18:14:16 -07:00
return packet
2018-02-28 20:24:45 -08:00
}
async _log ( ) {
2019-01-01 19:20:51 -08:00
log = logger ( {
file : 'src/client.js' ,
class : 'Client' ,
2019-02-14 14:01:11 -08:00
package : '@uci/mqtt' ,
2019-01-01 19:20:51 -08:00
id : this . id
} )
2018-02-28 20:24:45 -08:00
2019-08-15 08:11:18 -07:00
this . mqtt . on ( 'packetsend' , packet => {
2019-04-26 10:41:46 -07:00
log . debug ( { method : '_log' , line : 201 , packet : packet , msg : 'outgoing packet to mqtt broker' } )
2018-02-28 20:24:45 -08:00
} )
2019-08-15 08:11:18 -07:00
this . mqtt . on ( 'packetreceive' , packet => {
2019-04-26 10:41:46 -07:00
log . debug ( { method : '_log' , line : 205 , packet : packet , msg : 'incoming packet from mqtt broker' } )
2018-02-28 20:24:45 -08:00
} )
}
2019-01-01 19:20:51 -08:00
} // end mqtt client class
2018-02-28 20:24:45 -08:00
2019-01-01 19:20:51 -08:00
export default MQTTClient