uci-mqtt/src/client.js

224 lines
6.3 KiB
JavaScript

import { connect } from 'async-mqtt'
import merge from 'lodash.merge'
import btc from 'better-try-catch'
import union from 'lodash.union'
import xor from 'lodash.xor'
import logger from '@uci-utils/logger'
import isPlainObject from 'is-plain-object'
let log = {}
/**
* MQTT - Client - An mqtt client that supports UCI packets
* it extends the {@link https://github.com/mqttjs/async-mqtt| async version this mqtt client } albiet in a clunky way with a private extend method because it was not written as an extendable class
* @extends MQTT.js
*/
class MQTTClient {
/**
* constructor - Description
*
* @param {object} [opts={}] Description
*
*/
constructor(opts = {}) {
this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime()
log = logger({
file: 'src/client.js',
class: 'Client',
name: 'mqtt',
id: this.id
})
this.url = opts.url || ''
// subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs
this.topics = Array.isArray(opts.topics)
? opts.topics
: opts.topics
? opts.topics.split(',')
: [this.id]
this.opts = opts.connect || {} // see options for new mqtt.Client
// self bindings
this.connect = this.connect.bind(this)
this.push = this.send
}
/**
* connect - Description
*
* @returns {type} Description
*/
async connect() {
return new Promise((resolve, reject) => {
let mqtt = connect(
this.url,
this.opts
)
this._extend(mqtt, 'subscribe,unsubscribe') // merge mqtt client into class extend with given functions
let timeout = this.opts.connectTimeout || 5000
setTimeout(() => {
reject({msg:'ending mqtt connection attempt, no broker', url:this.url, opts:this.opts})
}, timeout)
this.once('connect', () => {
this._log()
this._listen()
this.on('reconnect', () => {
log.info('mqtt client reconnected to broker')
this.subscribe(this.topics)
})
this.on('error', err => {
log.fatal({ err: err }, 'connection error to broker')
})
this.subscribe(this.topics)
log.info(
{ options: this._client.options },
`mqtt client connected to broker at ${
this._client.options.hostname
}:${this._client.options.port}`
)
resolve(
`mqtt client connected to broker at ${
this._client.options.hostname
}:${this._client.options.port}`
)
})
}) //end promise
}
async subscribe(topic, options = {}) {
if (options.pub) {
if (typeof topic === 'string') topic = topic.split(',')
this.topics = union(this.topics, topic)
}
log.info(`subscription for topic ${topic} added`)
this._subscribe(topic, options)
}
async unsubscribe(topic, options = {}) {
if (!options.pub) {
if (typeof topic === 'string') topic = topic.split(',')
this.topics = xor(this.topics, topic)
}
this._unsubscribe(topic)
}
async send(packet, topics, options) {
if (!this.hasOwnProperty('publish')) {
log.warn({url:this.url, opts:this.opts, msg:'connect method never called, initialization needed, aborting send'})
return
}
if (typeof topics !== 'string' && !Array.isArray(topics)) {
options = topics
topics = this.topics
} else {
if (typeof topics === 'string') topics = topics.split(',')
}
let payload = {}
if (typeof packet !== 'string') {
payload = this._serialize(packet)
if (!payload)
payload = this._serialize({ err: 'could not serialze packet' })
topics = [packet.cmd] || topics // if payload has a cmd use that as topic
} else payload = this._serialize({ data: packet })
let pubs = []
topics.forEach(async topic => {
log.info(`sending ${payload} to topic ${topic} with options ${options}`)
pubs.push(this.publish(topic, payload, options))
})
return await Promise.all(pubs)
}
registerPacketProcessor(func) {
this._packetProcess = func
}
_serialize(json) {
let [err, payload] = btc(JSON.stringify)(json)
if (err) {
log.warn(`error unable to stringify json:${json}`)
return null
}
return payload
}
_listen() {
log.info(
`listening for incoming packets from broker on topics ${this.topics}`
)
this.on('message', messageProcess.bind(this))
async function messageProcess(topic, payload) {
log.info('incoming message on topic', topic)
let packet = this._handlePayload(payload)
if (packet) packet.cmd = packet.cmd || topic
// payload had no command that use topic as cmd
else packet = { cmd: topic }
let res = (await this._packetProcess(packet, topic)) || {}
this.send(res)
}
}
_handlePayload(payload) {
let [err, packet] = btc(JSON.parse)(payload.toString())
if (err) {
log.info('payload not json returning as prop data:<payload>')
}
if (!isPlainObject(packet)) packet = { data: payload.toString() }
return packet
}
// default packet process just a simple console logger. ignores any cmd: prop
_packetProcess(packet, topic) {
console.log('=========================')
console.log(
'default consumer processor\npacket from broker on topic:',
topic
)
console.dir(packet)
console.log(packet.status)
console.log('========================')
}
async _log() {
log = logger({
file: 'src/client.js',
class: 'Client',
package: '@uci/mqtt',
id: this.id
})
this.on('close', () => {
log.info('connection to broker was closed')
})
this.on('offline', () => {
log.info('this client has gone offline from broker')
})
this.on('packetsend', packet => {
log.info({ packet: packet }, 'outgoing packet to mqtt broker')
})
this.on('packetreceive', packet => {
log.info({ packet: packet }, 'incoming packet from mqtt broker')
})
}
_extend(obj, funcs) {
let temp = {}
funcs = funcs.split(',')
funcs.forEach(func => {
temp[func] = this[func]
})
merge(this, obj)
funcs.forEach(func => {
this['_' + func] = this[func]
this[func] = temp[func]
})
}
} // end mqtt client class
export default MQTTClient