From 4a63a53a3deab8111c53d42532e7b4bbed797053 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Tue, 1 Jan 2019 19:20:51 -0800 Subject: [PATCH] update dependencies little cleanup add jsdoc comments --- package.json | 16 ++--- src/client.js | 169 +++++++++++++++++++++++++++++++++----------------- 2 files changed, 119 insertions(+), 66 deletions(-) diff --git a/package.json b/package.json index 785193c..41685c3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/mqtt", - "version": "0.1.0", + "version": "0.1.2", "description": "mqtt client with json payloads and mqtt custom broker", "main": "src", "scripts": { @@ -26,8 +26,8 @@ }, "homepage": "https://github.com/uCOMmandIt/uci-changeme#readme", "dependencies": { - "@uci/logger": "^0.0.3", - "async-mqtt": "^1.0.1", + "@uci/logger": "0.0.6", + "async-mqtt": "^2.0.0", "better-try-catch": "^0.6.2", "is-plain-object": "^2.0.4", "lodash.merge": "^4.6.1", @@ -35,12 +35,12 @@ "lodash.xor": "^4.5.0" }, "devDependencies": { - "chai": "^4.1.2", + "chai": "^4.2.0", "chai-as-promised": "^7.1.1", - "codecov": "^3.0.0", - "esm": "^3.0.21", + "codecov": "^3.1.0", + "esm": "^3.0.84", "istanbul": "^0.4.5", - "mocha": "^4.0.1", - "nodemon": "^1.14.3" + "mocha": "^5.2.0", + "nodemon": "^1.18.6" } } diff --git a/src/client.js b/src/client.js index adc8419..4ca3c9b 100644 --- a/src/client.js +++ b/src/client.js @@ -8,94 +8,137 @@ import isPlainObject from 'is-plain-object' let log = {} -export default class Client { - constructor (opts={}) { - this.id = opts.id || opts.name || 'mqtt:'+ new Date().getTime() +/** + * MQTT - Client - An mqtt client that supports UCI packets + * it extends the {@link https://github.com/mqttjs/async-mqtt| async version this mqtt client } albiet in a clunky way with a private extend method because it was not written as an extendable class + * @extends MQTT.js + */ +class MQTTClient { + /** + * constructor - Description + * + * @param {object} [opts={}] Description + * + */ + constructor(opts = {}) { + this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime() + log = logger({ + file: 'src/client.js', + class: 'Client', + name: 'mqtt', + id: this.id + }) this.url = opts.url || '' // subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs - this.topics = Array.isArray(opts.topics) ? opts.topics : (opts.topics ? opts.topics.split(',') : [this.id]) - this.opts = opts.connect || {} // see options for new mqtt.Client + this.topics = Array.isArray(opts.topics) + ? opts.topics + : opts.topics + ? opts.topics.split(',') + : [this.id] + this.opts = opts.connect || {} // see options for new mqtt.Client // self bindings this.connect = this.connect.bind(this) this.push = this.send } - async connect () { - - return new Promise( (resolve,reject) => { - - let mqtt = connect(this.url,this.opts) - this._extend(mqtt,'subscribe,unsubscribe') // merge mqtt client into class extend with given functions + /** + * connect - Description + * + * @returns {type} Description + */ + async connect() { + return new Promise((resolve, reject) => { + let mqtt = connect( + this.url, + this.opts + ) + this._extend(mqtt, 'subscribe,unsubscribe') // merge mqtt client into class extend with given functions + console.log('publish,', this.publish) let timeout = this.opts.connectTimeout || 5000 - setTimeout(()=>{ - reject(`ending mqtt connection attempt, no broker at ${this._client.options.hostname}:${this._client.options.port}`) - },timeout) + setTimeout(() => { + reject( + `ending mqtt connection attempt, no broker at ${ + this._client.options.hostname + }:${this._client.options.port}` + ) + }, timeout) this.once('connect', () => { this._log() this._listen() this.on('reconnect', () => { - log.info('mqtt client reconnected to broker' ) + log.info('mqtt client reconnected to broker') this.subscribe(this.topics) }) this.on('error', err => { - log.fatal({err:err},'connection error to broker' ) - console.log('connection error',err.code) + log.fatal({ err: err }, 'connection error to broker') + console.log('connection error', err.code) }) this.subscribe(this.topics) - log.info({options:this._client.options},`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`) - resolve(`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`) + log.info( + { options: this._client.options }, + `mqtt client connected to broker at ${ + this._client.options.hostname + }:${this._client.options.port}` + ) + resolve( + `mqtt client connected to broker at ${ + this._client.options.hostname + }:${this._client.options.port}` + ) }) - }) //end promise } - async subscribe(topic,options={}) { + async subscribe(topic, options = {}) { if (options.pub) { - if (typeof topic==='string') topic = topic.split(',') - this.topics=union(this.topics,topic) + if (typeof topic === 'string') topic = topic.split(',') + this.topics = union(this.topics, topic) } log.info(`subscription for topic ${topic} added`) - this._subscribe(topic,options) + this._subscribe(topic, options) } - async unsubscribe(topic,options={}) { + async unsubscribe(topic, options = {}) { if (!options.pub) { - if (typeof topic==='string') topic = topic.split(',') - this.topics=xor(this.topics,topic) + if (typeof topic === 'string') topic = topic.split(',') + this.topics = xor(this.topics, topic) } this._unsubscribe(topic) } - async send(packet,topics,options) { - if (typeof topics !=='string' && !Array.isArray(topics)) { + async send(packet, topics, options) { + if (typeof topics !== 'string' && !Array.isArray(topics)) { options = topics topics = this.topics - } else { if (typeof topics ==='string') topics=topics.split(',') } + } else { + if (typeof topics === 'string') topics = topics.split(',') + } let payload = {} if (typeof packet !== 'string') { payload = this._serialize(packet) - if (!payload) payload = this._serialize({ err: 'could not serialze packet'}) + if (!payload) + payload = this._serialize({ err: 'could not serialze packet' }) topics = [packet.cmd] || topics // if payload has a cmd use that as topic - } - else payload = this._serialize({ data:packet }) + } else payload = this._serialize({ data: packet }) let pubs = [] - topics.forEach( async topic => { + topics.forEach(async topic => { log.info(`sending ${payload} to topic ${topic} with options ${options}`) - pubs.push(this.publish(topic,payload,options)) + console.log('publish,', this.publish) + pubs.push(this.publish(topic, payload, options)) }) return await Promise.all(pubs) } - registerPacketProcessor (func) { + registerPacketProcessor(func) { this._packetProcess = func } _serialize(json) { - let [err,payload] = btc(JSON.stringify)(json) + let [err, payload] = btc(JSON.stringify)(json) if (err) { log.warn(`error unable to stringify json:${json}`) return null @@ -103,41 +146,51 @@ export default class Client { return payload } - _listen () { - log.info(`listening for incoming packets from broker on topics ${this.topics}`) - this.on('message',messageProcess.bind(this)) + _listen() { + log.info( + `listening for incoming packets from broker on topics ${this.topics}` + ) + this.on('message', messageProcess.bind(this)) - async function messageProcess (topic,payload) { + async function messageProcess(topic, payload) { log.info('incoming message on topic', topic) let packet = this._handlePayload(payload) - if(packet) packet.cmd = packet.cmd || topic // payload had no command that use topic as cmd - else packet = {cmd:topic} - let res = await this._packetProcess(packet,topic) || {} + if (packet) packet.cmd = packet.cmd || topic + // payload had no command that use topic as cmd + else packet = { cmd: topic } + let res = (await this._packetProcess(packet, topic)) || {} this.send(res) } } - _handlePayload (payload) { - let [err,packet] = btc(JSON.parse)(payload.toString()) + _handlePayload(payload) { + let [err, packet] = btc(JSON.parse)(payload.toString()) if (err) { log.info('payload not json returning as prop data:') } - if (!isPlainObject(packet)) packet = { data:payload.toString() } + if (!isPlainObject(packet)) packet = { data: payload.toString() } return packet } // default packet process just a simple console logger. ignores any cmd: prop - _packetProcess (packet,topic) { + _packetProcess(packet, topic) { console.log('=========================') - console.log('default consumer processor\npacket from broker on topic:',topic) + console.log( + 'default consumer processor\npacket from broker on topic:', + topic + ) console.dir(packet) console.log(packet.status) console.log('========================') } async _log() { - - log = logger({file:'src/client.js',class:'Client',name:'mqtt',id:this.id}) + log = logger({ + file: 'src/client.js', + class: 'Client', + name: 'mqtt', + id: this.id + }) this.on('close', () => { log.info('connection to broker was closed') @@ -148,26 +201,26 @@ export default class Client { }) this.on('packetsend', packet => { - log.info({packet:packet},'outgoing packet to mqtt broker') + log.info({ packet: packet }, 'outgoing packet to mqtt broker') }) this.on('packetreceive', packet => { - log.info({packet:packet},'incoming packet from mqtt broker') + log.info({ packet: packet }, 'incoming packet from mqtt broker') }) - } - _extend (obj, funcs) { + _extend(obj, funcs) { let temp = {} funcs = funcs.split(',') funcs.forEach(func => { temp[func] = this[func] }) - merge(this,obj) + merge(this, obj) funcs.forEach(func => { - this['_'+func]=this[func] + this['_' + func] = this[func] this[func] = temp[func] }) } +} // end mqtt client class -} // end mqtt client class +export default MQTTClient