223 lines
7.3 KiB
JavaScript
223 lines
7.3 KiB
JavaScript
import { connect } from 'async-mqtt'
|
|
import { promisify } from 'util'
|
|
import merge from 'lodash.merge'
|
|
import btc from 'better-try-catch'
|
|
import union from 'lodash.union'
|
|
import xor from 'lodash.xor'
|
|
import logger from '@uci-utils/logger'
|
|
import isPlainObject from 'is-plain-object'
|
|
|
|
let log = {}
|
|
|
|
/**
|
|
* MQTT - Client - An mqtt client that supports UCI packets
|
|
* it extends the {@link https://github.com/mqttjs/async-mqtt| async version this mqtt client } albiet in a clunky way with a private extend method because it was not written as an extendable class
|
|
* @extends MQTT.js
|
|
*/
|
|
class MQTTClient {
|
|
/**
|
|
* constructor - Description
|
|
*
|
|
* @param {object} [opts={}] Description
|
|
*
|
|
*/
|
|
constructor(opts = {}) {
|
|
this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime()
|
|
log = logger({
|
|
file: 'src/client.js',
|
|
package:'@uci/mqttjs',
|
|
class: 'MQTTClient',
|
|
name: 'mqtt',
|
|
id: this.id
|
|
})
|
|
this.url = opts.url || null
|
|
// subscription topics can be string of commna delimited or array of strings. If none they by default topic is id
|
|
this.topics = Array.isArray(opts.topics)
|
|
? opts.topics
|
|
: opts.topics
|
|
? opts.topics.split(',')
|
|
: [this.id]
|
|
this.opts = opts.connect || {} // see options for new mqtt.Client
|
|
// self bindings
|
|
this.connect = this.connect.bind(this)
|
|
this.push = this.send
|
|
}
|
|
|
|
/**
|
|
* connect - Description
|
|
*
|
|
* @returns {type} Description
|
|
*/
|
|
async connect() {
|
|
return new Promise((resolve, reject) => {
|
|
let mqtt = connect(
|
|
this.url,
|
|
this.opts
|
|
)
|
|
// version 2.0 and up messes this up. Need to look at another way
|
|
// maybe better not to merge but have mqtt client be a property of class
|
|
this._extend(mqtt, 'subscribe,unsubscribe') // merge mqtt client into class extend with given functions
|
|
this.close = promisify(this.end).bind(this)
|
|
let timeout = this.opts.connectTimeout || 5000
|
|
setTimeout(() => {
|
|
reject({msg:'ending mqtt connection attempt, no broker', url:this.url, opts:this.opts})
|
|
}, timeout)
|
|
|
|
this.once('connect', () => {
|
|
this._log()
|
|
this._listen()
|
|
|
|
this.on('reconnect', () => {
|
|
log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'})
|
|
this.subscribe(this.topics)
|
|
})
|
|
|
|
this.on('error', err => {
|
|
log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'})
|
|
})
|
|
|
|
this.subscribe(this.topics)
|
|
log.debug({method:'connect', line:80, options: this._client.options, msg:`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`})
|
|
resolve(`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`)
|
|
})
|
|
}) //end promise
|
|
}
|
|
|
|
async subscribe(topic, options = {}) {
|
|
if (options.pub) {
|
|
if (typeof topic === 'string') topic = topic.split(',')
|
|
this.topics = union(this.topics, topic)
|
|
}
|
|
log.debug({method:'subscribe', line:91, msg:`subscription for topic ${topic} added`})
|
|
this._subscribe(topic, options)
|
|
}
|
|
|
|
async unsubscribe(topic, options = {}) {
|
|
if (!options.pub) {
|
|
if (typeof topic === 'string') topic = topic.split(',')
|
|
this.topics = xor(this.topics, topic)
|
|
}
|
|
this._unsubscribe(topic)
|
|
}
|
|
|
|
async send(packet, topics, options) {
|
|
if (!this.hasOwnProperty('publish')) {
|
|
log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'connect method never called, initialization needed, aborting send'})
|
|
return
|
|
}
|
|
if (typeof topics !== 'string' && !Array.isArray(topics)) {
|
|
options = topics
|
|
topics = this.topics
|
|
} else {
|
|
if (typeof topics === 'string') topics = topics.split(',')
|
|
}
|
|
|
|
let payload
|
|
if (typeof packet !== 'string') {
|
|
if (packet.payload) payload = packet.payload.toString()
|
|
else {
|
|
payload = this._serialize(packet)
|
|
if (!payload) payload = this._serialize({ err: 'could not serialze packet' })
|
|
}
|
|
topics = packet.cmd ? [packet.cmd.replace(/\./gi,'/') ] : topics // if payload has a cmd use that as topic, convert any . to slash
|
|
} else payload = packet // packet was just a string
|
|
|
|
let pubs = []
|
|
topics.forEach(async topic => {
|
|
log.debug({method:'send', line:127, msg:'mqtt publishing', payload:payload, topic:topic})
|
|
pubs.push(this.publish(topic, payload, options))
|
|
})
|
|
return await Promise.all(pubs)
|
|
}
|
|
|
|
registerPacketProcessor(func) {
|
|
this._packetProcess = func
|
|
}
|
|
|
|
_serialize(json) {
|
|
let [err, payload] = btc(JSON.stringify)(json)
|
|
if (err) {
|
|
log.warn({method:'_serialize', line:140, msg:`unable to stringify json for payload:${json}`})
|
|
return null
|
|
}
|
|
return payload
|
|
}
|
|
|
|
_listen() {
|
|
log.info({method:'_listen', line:147, msg:`listening for incoming mqtt packets from broker on topics ${this.topics}`})
|
|
this.on('message', messageProcess.bind(this))
|
|
|
|
async function messageProcess(topic, payload) {
|
|
log.debug({method:'_listen', line:153 ,msg:'incoming mqtt message to process', topic:topic, payload:payload})
|
|
let packet = this._handlePayload(payload)
|
|
if (packet) packet.cmd = packet.cmd || topic
|
|
// payload had no command that use topic as cmd
|
|
else packet = { cmd: topic }
|
|
let res = (await this._packetProcess(packet, topic)) || {}
|
|
log.debug({method:'_listen', line:159, msg:'processed packet back to broker - cmd: should NOT be a subscribed topic or infinite loop!', res:res, topics:this.topics})
|
|
// TODO!!!!!!!!!! make sure cmd/topic is not the same as subscribed topics or can make an infinite loop
|
|
this.send(res)
|
|
}
|
|
}
|
|
|
|
_handlePayload(payload) {
|
|
let [err, packet] = btc(JSON.parse)(payload.toString())
|
|
if (err) log.debug({method:'_handlePayload', line:201, msg:'payload is not json returning as prop data:<payload>'})
|
|
if (!isPlainObject(packet)) return { data: payload.toString() }
|
|
return packet
|
|
}
|
|
|
|
// default packet process just a simple console logger. ignores any cmd: prop
|
|
_packetProcess(packet, topic) {
|
|
console.log('=========================')
|
|
console.log(
|
|
'default consumer processor\npacket from broker on topic:',
|
|
topic
|
|
)
|
|
console.dir(packet)
|
|
console.log(packet.status)
|
|
console.log('========================')
|
|
}
|
|
|
|
async _log() {
|
|
log = logger({
|
|
file: 'src/client.js',
|
|
class: 'Client',
|
|
package: '@uci/mqtt',
|
|
id: this.id
|
|
})
|
|
|
|
this.on('close', () => {
|
|
log.debug({method:'_log', line:193, msg:'connection to broker was closed'})
|
|
})
|
|
|
|
this.on('offline', () => {
|
|
log.warn({method:'_log', line:197, msg:'this client has gone offline from broker'})
|
|
})
|
|
|
|
this.on('packetsend', packet => {
|
|
log.debug({method:'_log', line:201, packet: packet, msg:'outgoing packet to mqtt broker'})
|
|
})
|
|
|
|
this.on('packetreceive', packet => {
|
|
log.debug({method:'_log', line:205, packet: packet, msg:'incoming packet from mqtt broker'})
|
|
})
|
|
}
|
|
|
|
_extend(obj, funcs) {
|
|
let temp = {}
|
|
funcs = funcs.split(',')
|
|
funcs.forEach(func => {
|
|
temp[func] = this[func]
|
|
})
|
|
merge(this, obj)
|
|
|
|
funcs.forEach(func => {
|
|
this['_' + func] = this[func]
|
|
this[func] = temp[func]
|
|
})
|
|
}
|
|
} // end mqtt client class
|
|
|
|
export default MQTTClient
|