From a1a9eb08228217035c4fb7ef7ed821ef50ae7235 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sun, 8 Sep 2019 19:53:51 -0700 Subject: [PATCH] 0.1.13 add back keeping payload as is when passed in a packet emit consumer-connection events --- package.json | 8 ++++---- readme.md | 3 +++ src/mqtt.js | 24 +++++++++++++++++------- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/package.json b/package.json index fb9da8f..1fb95ae 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/mqtt", - "version": "0.1.12", + "version": "0.1.13", "description": "mqtt client with UCI json packet payloads", "main": "src", "scripts": { @@ -28,7 +28,7 @@ }, "homepage": "https://github.com/uCOMmandIt/uci-changeme#readme", "dependencies": { - "@uci-utils/logger": "^0.0.15", + "@uci-utils/logger": "^0.0.16", "better-try-catch": "0.6.2", "is-plain-object": "^3.0.0", "lodash.union": "^4.6.0", @@ -40,7 +40,7 @@ "chai": "^4.2.0", "delay": "^4.3.0", "esm": "^3.2.25", - "mocha": "^6.2.0", - "nodemon": "^1.19.1" + "mocha": "^6.2.2", + "nodemon": "^2.0.0" } } diff --git a/readme.md b/readme.md index 12bf7e5..cc91dff 100644 --- a/readme.md +++ b/readme.md @@ -2,4 +2,7 @@ This module integrates MQTT messaging with UCI Packet Messaging. +One can use it as a standard mqtt pub/sub but also provides a transport and functionality for UCI packets via send and push METHODS +In which case they instance can act like a consumer/client (send) or server/socket (push) based on client option + One can `send` a UCI JSON packet and by default the command will be translated into the MQTT Topic and the remainder into the payload and vice versa a listener waits for incoming MQTT messages translates them to UCI packages diff --git a/src/mqtt.js b/src/mqtt.js index 8b16ca5..b0c45ee 100644 --- a/src/mqtt.js +++ b/src/mqtt.js @@ -33,6 +33,7 @@ class MQTTClient extends EventEmitter { this.url = opts.url || null this.client = opts.client || false this.sendTopic = opts.sendTopic || 'uci/send' + this.opts = opts // these are the "listen" topics to which the instance will subscribe. By default that will include the instance ID // default subscription topics can be string of commna delimited or array of strings. If none they by default topic is id this.topics = opts.topics ? (Array.isArray(opts.topics) ? opts.topics : opts.topics.split(',')) : [] @@ -47,6 +48,8 @@ class MQTTClient extends EventEmitter { this.client ? this.send = this._send.bind(this) : this.push = this._push.bind(this) } + // TODO add UCI authentication and MQTT authentication + get connected() { return this._connected } /** @@ -73,11 +76,15 @@ class MQTTClient extends EventEmitter { log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'}) this._connected=true this._subscribe(this.topics) // resubscribe + this.emit('status',{level:'info', msg:'reconnected', id:this.id, opts:this.opts, connected:this._connected}) + this.emit('consumer-connection', {state:'reconnected', id:this.id}) + if (this.opts.conPacket) this.send(this.conPacket) }) this.mqtt.on('error', err => { this.emit('status',{level:'fatal', method:'connect', err: err, msg:'connection error to broker'}) log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'}) + this.emit('consumer-connection', {state:'disconnected', id:this.id}) this._connected=false }) @@ -97,7 +104,12 @@ class MQTTClient extends EventEmitter { this._connected=true let [err] = await btc(this._subscribe)(this.topics) // initial subscriptions if (err) reject({error:'unable to subscribe to topics after connection', err:err}) - resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`) + else{ + this.emit('status',{level:'info', msg:'connected', id:this.id, opts:this.opts, connected:this._connected}) + this.emit('consumer-connection', {state:'connected', id:this.id}) + if (this.opts.conPacket) this.send(this.conPacket) + resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`) + } }) }) //end promise } @@ -125,7 +137,7 @@ class MQTTClient extends EventEmitter { } // solely for primary of sending uci packets. Use publish for typical mqtt messaging - // if topic not passed, nor a severID supplied in the options then it will send to teh topic given by the cmd + // if topic not passed then it will send to the topic given by the cmd async _send(ipacket, topic, options) { // packet is required if (isPlainObject(topic)) { // if neither then assume none and what is passed is options @@ -199,7 +211,7 @@ class MQTTClient extends EventEmitter { // this is internal UCI publish async _publish(packet,topic,options) { if(!topic) return {error: 'no topic to which to publish packet'} - let payload = this._serialize(packet) // payload is entire packet + let payload = packet.payload ? (typeof packet.payload ==='string' ? packet.payload : this._serialize(packet)) : this._serialize(packet) // payload is entire packet if (!payload) { log.error({method:'send', line:163, msg:'not able to serilaize packet', packet:packet}) return {error:'not able to serilaize packet', packet:packet} @@ -254,8 +266,6 @@ class MQTTClient extends EventEmitter { } return payload } - // TODO if client set up reply subscription with message id and then emit that for resolving send - // if server then take id and send message back with that appended to topic _listen() { log.debug({method:'_listen', line:147, msg:`listening for incoming mqtt packets from broker on topics ${this.topics}`}) @@ -269,8 +279,8 @@ class MQTTClient extends EventEmitter { this.emit('payload',{topic:topic, payload:payload}) // emit 'payload' event for use for standard mqtt pub/sub let header = Object.assign({},packet._header) // save the header in case a packet processor deletes it let res = await this._packetProcess(packet) - if (header && this.client) this.emit(header.id,packet) - else { + if (header && this.client) this.emit(header.id,packet) // is acting like a client then emit returned packet + else { // act like a server, process and return reply res._header = header log.info({method:'_listen', line:159, msg:'processed packet - return only if sent not published', processed:res}) if (res._header) {