
259 lines
9.3 KiB

import { connect } from 'mqtt'
import btc from 'better-try-catch'
import union from 'lodash.union'
import xor from 'lodash.xor'
import logger from '@uci-utils/logger'
import isPlainObject from 'is-plain-object'
import isTopicMatch from 'mqtt-match'
let log = {}
* MQTT - Client - An mqtt client that supports UCI packets
* it extends the {@link https://github.com/mqttjs/async-mqtt| async version this mqtt client } albiet in a clunky way with a private extend method because it was not written as an extendable class
* @extends MQTT.js
class MQTTClient {
* constructor - Description
* @param {object} [opts={}] Description
constructor(opts = {}) {
this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime()
log = logger({
file: 'src/client.js',
class: 'MQTTClient',
name: 'mqtt',
id: this.id
this.type = opts.type || 'client'
this.url = opts.url || null
this.errorTopic = opts.errorTopic || 'error' // a generic error topic
// default subscription topics can be string of commna delimited or array of strings. If none they by default topic is id
this.topics = Array.isArray(opts.topics)
? opts.topics
: opts.topics
? opts.topics.split(',')
: [this.id]
this.connectOpts = opts.connect || {} // see options for new mqtt.Client
this.mqtt = {} // orginal mqtt.js object when created by connect
// self bindings
this.connect = this.connect.bind(this)
this.push = this.send
* connect - Description
* @returns {type} Description
async connect() {
return new Promise((resolve, reject) => {
this.mqtt = connect(
let timeout = this.connecOpts || 5000
setTimeout(() => {
reject({msg:'ending mqtt connection attempt, no broker', url:this.url, options:this.connectOpts})
}, timeout)
this.mqtt.once('connect', () => {
this.mqtt.on('reconnect', () => {
log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'})
this._subscribe(this.topics) // resubscribe
this.mqtt.on('error', err => {
log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'})
this._subscribe(this.topics) // initial subscriptions
log.debug({method:'connect', line:80, options: this.mqtt.options, msg:`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`})
resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`)
}) //end promise
async subscribe(topic, options = {}) {
// topic is only comma delimited string or array of topics
if (typeof topic === 'string') topic = topic.split(',')
this.topics = union(this.topics, topic || [])
log.debug({method:'subscribe', line:91, msg:`subscription for topic ${topic} added`})
async unsubscribe(topic, options = {}) {
if (typeof topic === 'string') topic = topic.split(',')
this.topics = xor(this.topics, topic || [])
return new Promise((resolve, reject) => {
if (!topic) reject('no topic supplied')
if (!this.mqtt.connected) reject('no connection to broker')
this.mqtt.unsubscribe(topic,options, (err, granted) => {
if (err) reject(err)
else resolve(granted)
// sends a uci packet as an mqtt message where cmd: is topic
// TODO if sending as a client (default) add a _header with ID to packet and set subcription for id and in _listen emit id with payload when it comes back
// if a "server" then after processing publish back as /ID/theidnumber
async send(packet, topic, options) {
log.debug(({msg:'sending a packet', packet:packet,topic:topic,options:options}))
if (!this.mqtt.connected) {
log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'})
return Promise.reject('no active connection to broker')
if (topic == null || typeof topic !== 'string') { // if neither then assume none and what is passed is options
options = topic || {}
topic =''
let payload
if (typeof packet !== 'string') {
// packet is an object
topic = packet.cmd ? packet.cmd.replace(/\./gi,'/') : topic
if (packet.payload) payload = packet.payload.toString()
else {
payload = this._serialize(packet) // payload is entire packet
if (!payload) payload = this._serialize({ err: 'could not serialze packet' })
} else payload = packet // packet was just a string so it's the payload
if (topic.length === 0) {
log.warn({msg:'publish aborted, no topic pass nor command in packet', packet:packet, topic:topic})
return {error:'publish aborted, no topic given or no command in packet', packet:packet, topic:topic}
log.debug({method:'send', line:127, msg:'mqtt publishing', payload:payload, topic:topic})
return await this.publish(topic, payload, options)
publish (...args) {
return new Promise((resolve, reject) => {
if (!this.mqtt.connected) reject('no connection to broker')
this.mqtt.publish(...args, (err, result) => {
if (err) reject(err)
else resolve(result)
end (...args) {
return new Promise((resolve, reject) => {
this._client.end(...args, (err, result) => {
if (err) reject(err)
else resolve(result)
registerPacketProcessor(func) {
this._packetProcess = func
async _subscribe (topic, options={}) {
return new Promise((resolve, reject) => {
if (!this.mqtt.connected) reject('no connection to broker')
if (!topic) reject('no topic supplied')
this.mqtt.subscribe(topic,options, (err, granted) => {
if (err) reject(err)
else resolve(granted)
_serialize(json) {
let [err, payload] = btc(JSON.stringify)(json)
if (err) {
log.warn({method:'_serialize', line:140, msg:`unable to stringify json for payload:${json}`})
return null
return payload
// TODO if client set up reply subscription with message id and then emit that for resolving send
// if server then take id and send message back with that appended to topic
_listen() {
log.info({method:'_listen', line:147, msg:`listening for incoming mqtt packets from broker on topics ${this.topics}`})
this.mqtt.on('message', messageProcess.bind(this))
async function messageProcess(topic, payload) {
log.debug({method:'_listen', line:153 ,msg:'incoming mqtt message to process', topic:topic, payload:payload})
let packet = this._handlePayload(payload)
if (packet) packet.cmd = packet.cmd || topic // if payload had no command so use topic as cmd
else packet = { cmd: topic }
let res = (await this._packetProcess(packet, topic)) || {}
log.debug({method:'_listen', line:159, msg:'processed packet back to broker', res:res, topics:this.topics})
if (this.topics.some(topic => isTopicMatch(topic,res.cmd || ''))) {
if (this.topics.some( topic => isTopicMatch(topic,this.errorTopic))) {
log.warn({method:'_listen', line:165, msg:'unable to send error, subscried to error topic', subscribed:this.topics, errorTopic:this.errorTopic})
else {
res = {cmd:'error', msg:`unable to send this response, response topic '${res.cmd}'' matches a topic [${this.topics}] - subscribed to by sender`, response:res, subcribed:this.topics}
log.warn({method:'_listen', line:168,msg:'sent mqtt error message', error:res})
// TODO if acting as a "server" then send back the result, if client and ID is matched then emit for send function
// if (res) this.send(res)
_handlePayload(payload) {
let [err, packet] = btc(JSON.parse)(payload.toString())
if (err) log.debug({method:'_handlePayload', line:201, msg:'payload is not json returning as prop data:<payload>'})
if (!isPlainObject(packet)) return { data: payload.toString() }
return packet
// default packet process just a simple console logger. ignores any cmd: prop
_packetProcess(packet, topic) {
console.log('default consumer processor\npacket from broker on topic:',topic)
console.log('replace this by setting new function with .registerPacketProcessor')
async _log() {
log = logger({
file: 'src/client.js',
class: 'Client',
package: '@uci/mqtt',
id: this.id
this.mqtt.on('close', () => {
log.debug({method:'_log', line:193, msg:'connection to broker was closed'})
this.mqtt.on('offline', () => {
log.warn({method:'_log', line:197, msg:'this client has gone offline from broker'})
this.mqtt.on('packetsend', packet => {
log.debug({method:'_log', line:201, packet: packet, msg:'outgoing packet to mqtt broker'})
this.mqtt.on('packetreceive', packet => {
log.debug({method:'_log', line:205, packet: packet, msg:'incoming packet from mqtt broker'})
} // end mqtt client class
export default MQTTClient