0.1.13 add back keeping payload as is when passed in a packet

emit consumer-connection events
This commit is contained in:
David Kebler 2019-09-08 19:53:51 -07:00
parent ec953c0123
commit a1a9eb0822
3 changed files with 24 additions and 11 deletions

View file

@ -1,6 +1,6 @@
{
"name": "@uci/mqtt",
"version": "0.1.12",
"version": "0.1.13",
"description": "mqtt client with UCI json packet payloads",
"main": "src",
"scripts": {
@ -28,7 +28,7 @@
},
"homepage": "https://github.com/uCOMmandIt/uci-changeme#readme",
"dependencies": {
"@uci-utils/logger": "^0.0.15",
"@uci-utils/logger": "^0.0.16",
"better-try-catch": "0.6.2",
"is-plain-object": "^3.0.0",
"lodash.union": "^4.6.0",
@ -40,7 +40,7 @@
"chai": "^4.2.0",
"delay": "^4.3.0",
"esm": "^3.2.25",
"mocha": "^6.2.0",
"nodemon": "^1.19.1"
"mocha": "^6.2.2",
"nodemon": "^2.0.0"
}
}

View file

@ -2,4 +2,7 @@
This module integrates MQTT messaging with UCI Packet Messaging.
One can use it as a standard mqtt pub/sub but also provides a transport and functionality for UCI packets via send and push METHODS
In which case they instance can act like a consumer/client (send) or server/socket (push) based on client option
One can `send` a UCI JSON packet and by default the command will be translated into the MQTT Topic and the remainder into the payload and vice versa a listener waits for incoming MQTT messages translates them to UCI packages

View file

@ -33,6 +33,7 @@ class MQTTClient extends EventEmitter {
this.url = opts.url || null
this.client = opts.client || false
this.sendTopic = opts.sendTopic || 'uci/send'
this.opts = opts
// these are the "listen" topics to which the instance will subscribe. By default that will include the instance ID
// default subscription topics can be string of commna delimited or array of strings. If none they by default topic is id
this.topics = opts.topics ? (Array.isArray(opts.topics) ? opts.topics : opts.topics.split(',')) : []
@ -47,6 +48,8 @@ class MQTTClient extends EventEmitter {
this.client ? this.send = this._send.bind(this) : this.push = this._push.bind(this)
}
// TODO add UCI authentication and MQTT authentication
get connected() { return this._connected }
/**
@ -73,11 +76,15 @@ class MQTTClient extends EventEmitter {
log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'})
this._connected=true
this._subscribe(this.topics) // resubscribe
this.emit('status',{level:'info', msg:'reconnected', id:this.id, opts:this.opts, connected:this._connected})
this.emit('consumer-connection', {state:'reconnected', id:this.id})
if (this.opts.conPacket) this.send(this.conPacket)
})
this.mqtt.on('error', err => {
this.emit('status',{level:'fatal', method:'connect', err: err, msg:'connection error to broker'})
log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'})
this.emit('consumer-connection', {state:'disconnected', id:this.id})
this._connected=false
})
@ -97,7 +104,12 @@ class MQTTClient extends EventEmitter {
this._connected=true
let [err] = await btc(this._subscribe)(this.topics) // initial subscriptions
if (err) reject({error:'unable to subscribe to topics after connection', err:err})
resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`)
else{
this.emit('status',{level:'info', msg:'connected', id:this.id, opts:this.opts, connected:this._connected})
this.emit('consumer-connection', {state:'connected', id:this.id})
if (this.opts.conPacket) this.send(this.conPacket)
resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`)
}
})
}) //end promise
}
@ -125,7 +137,7 @@ class MQTTClient extends EventEmitter {
}
// solely for primary of sending uci packets. Use publish for typical mqtt messaging
// if topic not passed, nor a severID supplied in the options then it will send to teh topic given by the cmd
// if topic not passed then it will send to the topic given by the cmd
async _send(ipacket, topic, options) { // packet is required
if (isPlainObject(topic)) { // if neither then assume none and what is passed is options
@ -199,7 +211,7 @@ class MQTTClient extends EventEmitter {
// this is internal UCI publish
async _publish(packet,topic,options) {
if(!topic) return {error: 'no topic to which to publish packet'}
let payload = this._serialize(packet) // payload is entire packet
let payload = packet.payload ? (typeof packet.payload ==='string' ? packet.payload : this._serialize(packet)) : this._serialize(packet) // payload is entire packet
if (!payload) {
log.error({method:'send', line:163, msg:'not able to serilaize packet', packet:packet})
return {error:'not able to serilaize packet', packet:packet}
@ -254,8 +266,6 @@ class MQTTClient extends EventEmitter {
}
return payload
}
// TODO if client set up reply subscription with message id and then emit that for resolving send
// if server then take id and send message back with that appended to topic
_listen() {
log.debug({method:'_listen', line:147, msg:`listening for incoming mqtt packets from broker on topics ${this.topics}`})
@ -269,8 +279,8 @@ class MQTTClient extends EventEmitter {
this.emit('payload',{topic:topic, payload:payload}) // emit 'payload' event for use for standard mqtt pub/sub
let header = Object.assign({},packet._header) // save the header in case a packet processor deletes it
let res = await this._packetProcess(packet)
if (header && this.client) this.emit(header.id,packet)
else {
if (header && this.client) this.emit(header.id,packet) // is acting like a client then emit returned packet
else { // act like a server, process and return reply
res._header = header
log.info({method:'_listen', line:159, msg:'processed packet - return only if sent not published', processed:res})
if (res._header) {