update dependencies

little cleanup
add jsdoc comments
master
David Kebler 2019-01-01 19:20:51 -08:00
parent f08e361d2e
commit 4a63a53a3d
2 changed files with 119 additions and 66 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@uci/mqtt", "name": "@uci/mqtt",
"version": "0.1.0", "version": "0.1.2",
"description": "mqtt client with json payloads and mqtt custom broker", "description": "mqtt client with json payloads and mqtt custom broker",
"main": "src", "main": "src",
"scripts": { "scripts": {
@ -26,8 +26,8 @@
}, },
"homepage": "https://github.com/uCOMmandIt/uci-changeme#readme", "homepage": "https://github.com/uCOMmandIt/uci-changeme#readme",
"dependencies": { "dependencies": {
"@uci/logger": "^0.0.3", "@uci/logger": "0.0.6",
"async-mqtt": "^1.0.1", "async-mqtt": "^2.0.0",
"better-try-catch": "^0.6.2", "better-try-catch": "^0.6.2",
"is-plain-object": "^2.0.4", "is-plain-object": "^2.0.4",
"lodash.merge": "^4.6.1", "lodash.merge": "^4.6.1",
@ -35,12 +35,12 @@
"lodash.xor": "^4.5.0" "lodash.xor": "^4.5.0"
}, },
"devDependencies": { "devDependencies": {
"chai": "^4.1.2", "chai": "^4.2.0",
"chai-as-promised": "^7.1.1", "chai-as-promised": "^7.1.1",
"codecov": "^3.0.0", "codecov": "^3.1.0",
"esm": "^3.0.21", "esm": "^3.0.84",
"istanbul": "^0.4.5", "istanbul": "^0.4.5",
"mocha": "^4.0.1", "mocha": "^5.2.0",
"nodemon": "^1.14.3" "nodemon": "^1.18.6"
} }
} }

View File

@ -8,94 +8,137 @@ import isPlainObject from 'is-plain-object'
let log = {} let log = {}
export default class Client { /**
constructor (opts={}) { * MQTT - Client - An mqtt client that supports UCI packets
this.id = opts.id || opts.name || 'mqtt:'+ new Date().getTime() * it extends the {@link https://github.com/mqttjs/async-mqtt| async version this mqtt client } albiet in a clunky way with a private extend method because it was not written as an extendable class
* @extends MQTT.js
*/
class MQTTClient {
/**
* constructor - Description
*
* @param {object} [opts={}] Description
*
*/
constructor(opts = {}) {
this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime()
log = logger({
file: 'src/client.js',
class: 'Client',
name: 'mqtt',
id: this.id
})
this.url = opts.url || '' this.url = opts.url || ''
// subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs // subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs
this.topics = Array.isArray(opts.topics) ? opts.topics : (opts.topics ? opts.topics.split(',') : [this.id]) this.topics = Array.isArray(opts.topics)
this.opts = opts.connect || {} // see options for new mqtt.Client ? opts.topics
: opts.topics
? opts.topics.split(',')
: [this.id]
this.opts = opts.connect || {} // see options for new mqtt.Client
// self bindings // self bindings
this.connect = this.connect.bind(this) this.connect = this.connect.bind(this)
this.push = this.send this.push = this.send
} }
async connect () { /**
* connect - Description
return new Promise( (resolve,reject) => { *
* @returns {type} Description
let mqtt = connect(this.url,this.opts) */
this._extend(mqtt,'subscribe,unsubscribe') // merge mqtt client into class extend with given functions async connect() {
return new Promise((resolve, reject) => {
let mqtt = connect(
this.url,
this.opts
)
this._extend(mqtt, 'subscribe,unsubscribe') // merge mqtt client into class extend with given functions
console.log('publish,', this.publish)
let timeout = this.opts.connectTimeout || 5000 let timeout = this.opts.connectTimeout || 5000
setTimeout(()=>{ setTimeout(() => {
reject(`ending mqtt connection attempt, no broker at ${this._client.options.hostname}:${this._client.options.port}`) reject(
},timeout) `ending mqtt connection attempt, no broker at ${
this._client.options.hostname
}:${this._client.options.port}`
)
}, timeout)
this.once('connect', () => { this.once('connect', () => {
this._log() this._log()
this._listen() this._listen()
this.on('reconnect', () => { this.on('reconnect', () => {
log.info('mqtt client reconnected to broker' ) log.info('mqtt client reconnected to broker')
this.subscribe(this.topics) this.subscribe(this.topics)
}) })
this.on('error', err => { this.on('error', err => {
log.fatal({err:err},'connection error to broker' ) log.fatal({ err: err }, 'connection error to broker')
console.log('connection error',err.code) console.log('connection error', err.code)
}) })
this.subscribe(this.topics) this.subscribe(this.topics)
log.info({options:this._client.options},`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`) log.info(
resolve(`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`) { options: this._client.options },
`mqtt client connected to broker at ${
this._client.options.hostname
}:${this._client.options.port}`
)
resolve(
`mqtt client connected to broker at ${
this._client.options.hostname
}:${this._client.options.port}`
)
}) })
}) //end promise }) //end promise
} }
async subscribe(topic,options={}) { async subscribe(topic, options = {}) {
if (options.pub) { if (options.pub) {
if (typeof topic==='string') topic = topic.split(',') if (typeof topic === 'string') topic = topic.split(',')
this.topics=union(this.topics,topic) this.topics = union(this.topics, topic)
} }
log.info(`subscription for topic ${topic} added`) log.info(`subscription for topic ${topic} added`)
this._subscribe(topic,options) this._subscribe(topic, options)
} }
async unsubscribe(topic,options={}) { async unsubscribe(topic, options = {}) {
if (!options.pub) { if (!options.pub) {
if (typeof topic==='string') topic = topic.split(',') if (typeof topic === 'string') topic = topic.split(',')
this.topics=xor(this.topics,topic) this.topics = xor(this.topics, topic)
} }
this._unsubscribe(topic) this._unsubscribe(topic)
} }
async send(packet,topics,options) { async send(packet, topics, options) {
if (typeof topics !=='string' && !Array.isArray(topics)) { if (typeof topics !== 'string' && !Array.isArray(topics)) {
options = topics options = topics
topics = this.topics topics = this.topics
} else { if (typeof topics ==='string') topics=topics.split(',') } } else {
if (typeof topics === 'string') topics = topics.split(',')
}
let payload = {} let payload = {}
if (typeof packet !== 'string') { if (typeof packet !== 'string') {
payload = this._serialize(packet) payload = this._serialize(packet)
if (!payload) payload = this._serialize({ err: 'could not serialze packet'}) if (!payload)
payload = this._serialize({ err: 'could not serialze packet' })
topics = [packet.cmd] || topics // if payload has a cmd use that as topic topics = [packet.cmd] || topics // if payload has a cmd use that as topic
} } else payload = this._serialize({ data: packet })
else payload = this._serialize({ data:packet })
let pubs = [] let pubs = []
topics.forEach( async topic => { topics.forEach(async topic => {
log.info(`sending ${payload} to topic ${topic} with options ${options}`) log.info(`sending ${payload} to topic ${topic} with options ${options}`)
pubs.push(this.publish(topic,payload,options)) console.log('publish,', this.publish)
pubs.push(this.publish(topic, payload, options))
}) })
return await Promise.all(pubs) return await Promise.all(pubs)
} }
registerPacketProcessor (func) { registerPacketProcessor(func) {
this._packetProcess = func this._packetProcess = func
} }
_serialize(json) { _serialize(json) {
let [err,payload] = btc(JSON.stringify)(json) let [err, payload] = btc(JSON.stringify)(json)
if (err) { if (err) {
log.warn(`error unable to stringify json:${json}`) log.warn(`error unable to stringify json:${json}`)
return null return null
@ -103,41 +146,51 @@ export default class Client {
return payload return payload
} }
_listen () { _listen() {
log.info(`listening for incoming packets from broker on topics ${this.topics}`) log.info(
this.on('message',messageProcess.bind(this)) `listening for incoming packets from broker on topics ${this.topics}`
)
this.on('message', messageProcess.bind(this))
async function messageProcess (topic,payload) { async function messageProcess(topic, payload) {
log.info('incoming message on topic', topic) log.info('incoming message on topic', topic)
let packet = this._handlePayload(payload) let packet = this._handlePayload(payload)
if(packet) packet.cmd = packet.cmd || topic // payload had no command that use topic as cmd if (packet) packet.cmd = packet.cmd || topic
else packet = {cmd:topic} // payload had no command that use topic as cmd
let res = await this._packetProcess(packet,topic) || {} else packet = { cmd: topic }
let res = (await this._packetProcess(packet, topic)) || {}
this.send(res) this.send(res)
} }
} }
_handlePayload (payload) { _handlePayload(payload) {
let [err,packet] = btc(JSON.parse)(payload.toString()) let [err, packet] = btc(JSON.parse)(payload.toString())
if (err) { if (err) {
log.info('payload not json returning as prop data:<payload>') log.info('payload not json returning as prop data:<payload>')
} }
if (!isPlainObject(packet)) packet = { data:payload.toString() } if (!isPlainObject(packet)) packet = { data: payload.toString() }
return packet return packet
} }
// default packet process just a simple console logger. ignores any cmd: prop // default packet process just a simple console logger. ignores any cmd: prop
_packetProcess (packet,topic) { _packetProcess(packet, topic) {
console.log('=========================') console.log('=========================')
console.log('default consumer processor\npacket from broker on topic:',topic) console.log(
'default consumer processor\npacket from broker on topic:',
topic
)
console.dir(packet) console.dir(packet)
console.log(packet.status) console.log(packet.status)
console.log('========================') console.log('========================')
} }
async _log() { async _log() {
log = logger({
log = logger({file:'src/client.js',class:'Client',name:'mqtt',id:this.id}) file: 'src/client.js',
class: 'Client',
name: 'mqtt',
id: this.id
})
this.on('close', () => { this.on('close', () => {
log.info('connection to broker was closed') log.info('connection to broker was closed')
@ -148,26 +201,26 @@ export default class Client {
}) })
this.on('packetsend', packet => { this.on('packetsend', packet => {
log.info({packet:packet},'outgoing packet to mqtt broker') log.info({ packet: packet }, 'outgoing packet to mqtt broker')
}) })
this.on('packetreceive', packet => { this.on('packetreceive', packet => {
log.info({packet:packet},'incoming packet from mqtt broker') log.info({ packet: packet }, 'incoming packet from mqtt broker')
}) })
} }
_extend (obj, funcs) { _extend(obj, funcs) {
let temp = {} let temp = {}
funcs = funcs.split(',') funcs = funcs.split(',')
funcs.forEach(func => { funcs.forEach(func => {
temp[func] = this[func] temp[func] = this[func]
}) })
merge(this,obj) merge(this, obj)
funcs.forEach(func => { funcs.forEach(func => {
this['_'+func]=this[func] this['_' + func] = this[func]
this[func] = temp[func] this[func] = temp[func]
}) })
} }
} // end mqtt client class
} // end mqtt client class export default MQTTClient