From 1e334022f478f10735cf48f186e9cdb8fb97dba4 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Thu, 15 Aug 2019 08:11:18 -0700 Subject: [PATCH] 0.1.12 refactored to use mqtt.js directly. Converts needed functions to promises. Updated other deps --- .gitignore | 2 +- .npmignore | 2 + .travis.yml | 12 --- example/example.js | 19 ---- examples/client.js | 14 +++ examples/server.js | 26 ++++++ package.json | 23 +++-- readme.md | 6 +- src/index.js | 7 +- src/{client.js => mqtt.js} | 181 +++++++++++++++++++++---------------- 10 files changed, 165 insertions(+), 127 deletions(-) delete mode 100644 .travis.yml delete mode 100644 example/example.js create mode 100644 examples/client.js create mode 100644 examples/server.js rename src/{client.js => mqtt.js} (50%) diff --git a/.gitignore b/.gitignore index faad3eb..aab1692 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ /node_modules/ -/coverage/ *.lock +/archive/ diff --git a/.npmignore b/.npmignore index f16fc41..480e0f5 100644 --- a/.npmignore +++ b/.npmignore @@ -2,3 +2,5 @@ tests/ test/ *.test.js testing/ +archive/ +example/ diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 5b0b93e..0000000 --- a/.travis.yml +++ /dev/null @@ -1,12 +0,0 @@ -language: node_js - -node_js: - - '7.10' - - 'node' - -sudo: false - -script: npm test - -after_success: - - bash <(curl -s https://codecov.io/bash) || echo "Codecov did not collect coverage reports" diff --git a/example/example.js b/example/example.js deleted file mode 100644 index fadffaa..0000000 --- a/example/example.js +++ /dev/null @@ -1,19 +0,0 @@ -import Client from '../src/client' -// import btc from 'better-try-catch' - -// let mqtt = new Client({id:'example-mqtt-client', url:'tcp://trantor:1883', topics:'test1'}) -let mqtt = new Client({id:'example-mqtt-client', topics:'lights'}) - -; -(async () => { - - console.log(await mqtt.connect()) - await mqtt.send({cmd:'lights/on', status:'sent to topics lights'}) - await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 explicitly but not listening'}) - // await mqtt.subscribe('topic2') - // await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 with subscription'}) - // now can send via some mqtt client - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/examples/client.js b/examples/client.js new file mode 100644 index 0000000..b0d1363 --- /dev/null +++ b/examples/client.js @@ -0,0 +1,14 @@ +import MQTT from '../src/mqtt' +// import btc from 'better-try-catch' +const HOST='nas.kebler.net' +let client = new MQTT({id:'example-mqtt-client', topics:'', connect:{hostname:HOST}}) + +; +(async () => { + // mqtt.registerPacketProcessor(logit) + console.log(await client.connect()) + client.send({cmd:'lights/on', id:'someidofalight'}) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/examples/server.js b/examples/server.js new file mode 100644 index 0000000..efa421c --- /dev/null +++ b/examples/server.js @@ -0,0 +1,26 @@ +import Client from '../src/client' +// import btc from 'better-try-catch' +const HOST='nas.kebler.net' +let mqtt = new Client({id:'example-mqtt-client', topics:'', connect:{hostname:HOST}}) + +// async function logit (packet) { +// if (packet.cmd === 'lights') { +// console.log(packet) +// } +// return null +// } + +; +(async () => { + // mqtt.registerPacketProcessor(logit) + console.log(await mqtt.connect()) + mqtt.send({cmd:'lights/status', id:'someidofalight'}) + mqtt.subscribe('test') + // await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 explicitly but not listening'}) + // await mqtt.subscribe('topic2') + // await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 with subscription'}) + // now can send via some mqtt client + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/package.json b/package.json index ff5b0f8..c8dfed3 100644 --- a/package.json +++ b/package.json @@ -1,13 +1,13 @@ { "name": "@uci/mqtt", - "version": "0.1.11", - "description": "mqtt client with json payloads and mqtt custom broker", + "version": "0.1.12", + "description": "mqtt client with UCI json packet payloads", "main": "src", "scripts": { "testw": "mocha -r esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r esm test/*.test.mjs", - "example": "node -r esm example/example", - "dev": "./node_modules/.bin/nodemon -r esm example/example" + "client": "node -r esm example/client", + "dev": "UCI_ENV=dev UCI_LOG_LEVEL=debug ./node_modules/.bin/nodemon -r esm example/example" }, "author": "David Kebler", "license": "MIT", @@ -25,19 +25,18 @@ }, "homepage": "https://github.com/uCOMmandIt/uci-changeme#readme", "dependencies": { - "@uci-utils/logger": "^0.0.14", - "async-mqtt": "1.0.1", + "@uci-utils/logger": "^0.0.15", "better-try-catch": "0.6.2", - "is-plain-object": "^2.0.4", - "lodash.merge": "^4.6.1", + "is-plain-object": "^3.0.0", "lodash.union": "^4.6.0", "lodash.xor": "^4.5.0", - "mqtt-match": "^2.0.2" + "mqtt": "^3.0.0", + "mqtt-match": "^2.0.3" }, "devDependencies": { "chai": "^4.2.0", - "esm": "^3.2.5", - "mocha": "^5.2.0", - "nodemon": "^1.18.10" + "esm": "^3.2.25", + "mocha": "^6.2.0", + "nodemon": "^1.19.1" } } diff --git a/readme.md b/readme.md index 64e4fc2..12bf7e5 100644 --- a/readme.md +++ b/readme.md @@ -1 +1,5 @@ -# uCOMmandIt MQTT +# uCOMmandIt MQTT client + +This module integrates MQTT messaging with UCI Packet Messaging. + +One can `send` a UCI JSON packet and by default the command will be translated into the MQTT Topic and the remainder into the payload and vice versa a listener waits for incoming MQTT messages translates them to UCI packages diff --git a/src/index.js b/src/index.js index 8ef5b60..96ec97f 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,5 @@ -import Client from './client' +import MQTT from './mqtt' // import Broker from './broker' -export { Client as Client } -// export { Broker as Broker } -export default Client +export { MQTT as MQTT } +export default MQTT diff --git a/src/client.js b/src/mqtt.js similarity index 50% rename from src/client.js rename to src/mqtt.js index 3d1d38b..2b41898 100644 --- a/src/client.js +++ b/src/mqtt.js @@ -1,6 +1,4 @@ -import { connect } from 'async-mqtt' -import { promisify } from 'util' -import merge from 'lodash.merge' +import { connect } from 'mqtt' import btc from 'better-try-catch' import union from 'lodash.union' import xor from 'lodash.xor' @@ -31,15 +29,18 @@ class MQTTClient { name: 'mqtt', id: this.id }) + this.type = opts.type || 'client' this.url = opts.url || null - this.error = opts.error || 'error' // error topic - // subscription topics can be string of commna delimited or array of strings. If none they by default topic is id + this.errorTopic = opts.errorTopic || 'error' // a generic error topic + // default subscription topics can be string of commna delimited or array of strings. If none they by default topic is id this.topics = Array.isArray(opts.topics) ? opts.topics : opts.topics ? opts.topics.split(',') : [this.id] - this.opts = opts.connect || {} // see options for new mqtt.Client + this.topics.push('reply') + this.connectOpts = opts.connect || {} // see options for new mqtt.Client + this.mqtt = {} // orginal mqtt.js object when created by connect // self bindings this.connect = this.connect.bind(this) this.push = this.send @@ -52,90 +53,129 @@ class MQTTClient { */ async connect() { return new Promise((resolve, reject) => { - let mqtt = connect( + this.mqtt = connect( this.url, - this.opts + this.connectOpts ) - // version 2.0 and up messes this up. Need to look at another way - // maybe better not to merge but have mqtt client be a property of class - this._extend(mqtt, 'subscribe,unsubscribe') // merge mqtt client into class extend with given functions - this.close = promisify(this.end).bind(this) - let timeout = this.opts.connectTimeout || 5000 + let timeout = this.connecOpts || 5000 setTimeout(() => { - reject({msg:'ending mqtt connection attempt, no broker', url:this.url, opts:this.opts}) + reject({msg:'ending mqtt connection attempt, no broker', url:this.url, options:this.connectOpts}) }, timeout) - this.once('connect', () => { + this.mqtt.once('connect', () => { this._log() this._listen() - this.on('reconnect', () => { + this.mqtt.on('reconnect', () => { log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'}) - this.subscribe(this.topics) + this._subscribe(this.topics) // resubscribe }) - this.on('error', err => { + this.mqtt.on('error', err => { log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'}) }) - this.subscribe(this.topics) - log.debug({method:'connect', line:80, options: this._client.options, msg:`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`}) - resolve(`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`) + this._subscribe(this.topics) // initial subscriptions + log.debug({method:'connect', line:80, options: this.mqtt.options, msg:`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`}) + resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`) }) }) //end promise } async subscribe(topic, options = {}) { - if (options.pub) { - if (typeof topic === 'string') topic = topic.split(',') - this.topics = union(this.topics, topic) - } + // topic is only comma delimited string or array of topics + if (typeof topic === 'string') topic = topic.split(',') + this.topics = union(this.topics, topic || []) log.debug({method:'subscribe', line:91, msg:`subscription for topic ${topic} added`}) - this._subscribe(topic, options) + this._subscribe(topic,options) } + async unsubscribe(topic, options = {}) { - if (!options.pub) { - if (typeof topic === 'string') topic = topic.split(',') - this.topics = xor(this.topics, topic) - } - this._unsubscribe(topic) + if (typeof topic === 'string') topic = topic.split(',') + this.topics = xor(this.topics, topic || []) + + return new Promise((resolve, reject) => { + if (!topic) reject('no topic supplied') + if (!this.mqtt.connected) reject('no connection to broker') + this.mqtt.unsubscribe(topic,options, (err, granted) => { + if (err) reject(err) + else resolve(granted) + }) + }) } - async send(packet, topics, options) { - if (!this.hasOwnProperty('publish')) { - log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'connect method never called, initialization needed, aborting send'}) - return + // sends a uci packet as an mqtt message where cmd: is topic + // TODO if sending as a client (default) add a _header with ID to packet and set subcription for id and in _listen emit id with payload when it comes back + // if a "server" then after processing publish back as /ID/theidnumber + async send(packet, topic, options) { + + log.debug(({msg:'sending a packet', packet:packet,topic:topic,options:options})) + + if (!this.mqtt.connected) { + log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'}) + return Promise.reject('no active connection to broker') } - if (typeof topics !== 'string' && !Array.isArray(topics)) { - options = topics - topics = this.topics - } else { - if (typeof topics === 'string') topics = topics.split(',') + + if (topic == null || typeof topic !== 'string') { // if neither then assume none and what is passed is options + options = topic || {} + topic ='' } let payload if (typeof packet !== 'string') { + // packet is an object + topic = packet.cmd ? packet.cmd.replace(/\./gi,'/') : topic if (packet.payload) payload = packet.payload.toString() else { - payload = this._serialize(packet) + payload = this._serialize(packet) // payload is entire packet if (!payload) payload = this._serialize({ err: 'could not serialze packet' }) } - topics = packet.cmd ? [packet.cmd.replace(/\./gi,'/') ] : topics // if payload has a cmd use that as topic, convert any . to slash - } else payload = packet // packet was just a string + } else payload = packet // packet was just a string so it's the payload - let pubs = [] - topics.forEach(async topic => { - log.debug({method:'send', line:127, msg:'mqtt publishing', payload:payload, topic:topic}) - pubs.push(this.publish(topic, payload, options)) + if (topic.length === 0) { + log.warn({msg:'publish aborted, no topic pass nor command in packet', packet:packet, topic:topic}) + return {error:'publish aborted, no topic given or no command in packet', packet:packet, topic:topic} + } + + log.debug({method:'send', line:127, msg:'mqtt publishing', payload:payload, topic:topic}) + return await this.publish(topic, payload, options) + } + + publish (...args) { + return new Promise((resolve, reject) => { + if (!this.mqtt.connected) reject('no connection to broker') + this.mqtt.publish(...args, (err, result) => { + if (err) reject(err) + else resolve(result) + }) + }) + } + + end (...args) { + return new Promise((resolve, reject) => { + this._client.end(...args, (err, result) => { + if (err) reject(err) + else resolve(result) + }) }) - return await Promise.all(pubs) } registerPacketProcessor(func) { this._packetProcess = func } + async _subscribe (topic, options={}) { + return new Promise((resolve, reject) => { + if (!this.mqtt.connected) reject('no connection to broker') + if (!topic) reject('no topic supplied') + this.mqtt.subscribe(topic,options, (err, granted) => { + if (err) reject(err) + else resolve(granted) + }) + }) + } + _serialize(json) { let [err, payload] = btc(JSON.stringify)(json) if (err) { @@ -144,22 +184,22 @@ class MQTTClient { } return payload } - + // TODO if client set up reply subscription with message id and then emit that for resolving send + // if server then take id and send message back with that appended to topic _listen() { log.info({method:'_listen', line:147, msg:`listening for incoming mqtt packets from broker on topics ${this.topics}`}) - this.on('message', messageProcess.bind(this)) + this.mqtt.on('message', messageProcess.bind(this)) async function messageProcess(topic, payload) { log.debug({method:'_listen', line:153 ,msg:'incoming mqtt message to process', topic:topic, payload:payload}) let packet = this._handlePayload(payload) - if (packet) packet.cmd = packet.cmd || topic - // payload had no command that use topic as cmd + if (packet) packet.cmd = packet.cmd || topic // if payload had no command so use topic as cmd else packet = { cmd: topic } let res = (await this._packetProcess(packet, topic)) || {} log.debug({method:'_listen', line:159, msg:'processed packet back to broker', res:res, topics:this.topics}) - if (this.topics.some(topic => isTopicMatch(topic,res.cmd))) { - if (this.topics.some( topic => isTopicMatch(topic,this.error))) { - log.warn({method:'_listen', line:165, msg:'unable to send error, subscried to error topic', subscribed:this.topics, errorTopic:this.error}) + if (this.topics.some(topic => isTopicMatch(topic,res.cmd || ''))) { + if (this.topics.some( topic => isTopicMatch(topic,this.errorTopic))) { + log.warn({method:'_listen', line:165, msg:'unable to send error, subscried to error topic', subscribed:this.topics, errorTopic:this.errorTopic}) return } else { @@ -167,7 +207,8 @@ class MQTTClient { log.warn({method:'_listen', line:168,msg:'sent mqtt error message', error:res}) } } - this.send(res) + // TODO if acting as a "server" then send back the result, if client and ID is matched then emit for send function + // if (res) this.send(res) } } @@ -181,12 +222,9 @@ class MQTTClient { // default packet process just a simple console logger. ignores any cmd: prop _packetProcess(packet, topic) { console.log('=========================') - console.log( - 'default consumer processor\npacket from broker on topic:', - topic - ) + console.log('default consumer processor\npacket from broker on topic:',topic) + console.log('replace this by setting new function with .registerPacketProcessor') console.dir(packet) - console.log(packet.status) console.log('========================') } @@ -198,36 +236,23 @@ class MQTTClient { id: this.id }) - this.on('close', () => { + this.mqtt.on('close', () => { log.debug({method:'_log', line:193, msg:'connection to broker was closed'}) }) - this.on('offline', () => { + this.mqtt.on('offline', () => { log.warn({method:'_log', line:197, msg:'this client has gone offline from broker'}) }) - this.on('packetsend', packet => { + this.mqtt.on('packetsend', packet => { log.debug({method:'_log', line:201, packet: packet, msg:'outgoing packet to mqtt broker'}) }) - this.on('packetreceive', packet => { + this.mqtt.on('packetreceive', packet => { log.debug({method:'_log', line:205, packet: packet, msg:'incoming packet from mqtt broker'}) }) } - _extend(obj, funcs) { - let temp = {} - funcs = funcs.split(',') - funcs.forEach(func => { - temp[func] = this[func] - }) - merge(this, obj) - - funcs.forEach(func => { - this['_' + func] = this[func] - this[func] = temp[func] - }) - } } // end mqtt client class export default MQTTClient