uci-mqtt/src/mqtt.js

348 lines
13 KiB
JavaScript

import { connect } from 'mqtt'
import EventEmitter from 'events'
import btc from 'better-try-catch'
import union from 'lodash.union'
import xor from 'lodash.xor'
import logger from '@uci-utils/logger'
import isPlainObject from 'is-plain-object'
import isTopicMatch from 'mqtt-match'
let log = {}
/**
* MQTT - Client - An mqtt client that supports UCI packets
* uses mqtt.js as basis
*/
class MQTTClient extends EventEmitter {
/**
* constructor - Description
*
* @param {object} [opts={}] Description
*
*/
constructor(opts = {}) {
super(opts)
this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime()
log = logger({
file: 'src/client.js',
package:'@uci/mqttjs',
class: 'MQTTClient',
name: 'mqtt',
id: this.id
})
this.url = opts.url || null
this.client = opts.client || false
this.sendTopic = opts.sendTopic || 'uci/send'
this.opts = opts
// these are the "listen" topics to which the instance will subscribe. By default that will include the instance ID
// default subscription topics can be string of commna delimited or array of strings. If none they by default topic is id
this.topics = opts.topics ? (Array.isArray(opts.topics) ? opts.topics : opts.topics.split(',')) : []
this.topics.push(this.client ? `${this.sendTopic}/return` : `${this.sendTopic}/receive`)
this.connectOpts = opts.connect || {} // see options for new mqtt.Client
this.mqtt = {} // orginal mqtt.js object when created by connect
// self bindings
this._connected = false
this.conPackets = opts.conPackets || [opts.conPacket]
this.connect = this.connect.bind(this)
this.publish = this.publish.bind(this)
this._subscribe = this._subscribe.bind(this)
this.client ? this.send = this._send.bind(this) : this.push = this._push.bind(this)
}
// TODO add UCI authentication and MQTT authentication
get connected() { return this._connected }
/**
* connect - Description
*
* @returns {type} Description
*/
async connect() {
return new Promise((resolve, reject) => {
this.mqtt = connect(
this.url,
this.connectOpts
)
let timeout = this.connecOpts || 5000
setTimeout(() => {
reject({msg:'ending mqtt connection attempt, no broker', url:this.url, options:this.connectOpts})
}, timeout)
this.mqtt.once('connect', async () => {
this._log()
this._listen()
this.mqtt.on('reconnect', () => {
log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'})
this._connected=true
this._subscribe(this.topics) // resubscribe
this.emit('status',{level:'info', msg:'reconnected', id:this.id, opts:this.opts, connected:this._connected})
this.emit('connection:consumer', {state:'connected', id:this.id, mqtt:true})
if (this.opts.conPacket) this.send(this.conPacket)
})
this.mqtt.on('error', err => {
this.emit('status',{level:'fatal', method:'connect', err: err, msg:'connection error to broker'})
log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'})
this.emit('connection:consumer', {state:'disconnected', id:this.id, mqtt:true})
this._connected=false
})
this.mqtt.on('close', () => {
this.emit('status',{level:'fatal', msg:'mqtt connection to broker was closed'})
log.debug({method:'_log', line:193, msg:'connection to broker was closed'})
this._connected=false
})
this.mqtt.on('offline', () => {
this.emit('status',{level:'fatal',msg:'this client has gone offline from broker'})
log.warn({method:'_log', line:197, msg:'this client has gone offline from broker'})
this._connected=false
})
log.debug({method:'connect', line:80, options: this.mqtt.options, msg:`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`})
this._connected=true
let [err] = await btc(this._subscribe)(this.topics) // initial subscriptions
if (err) reject({error:'unable to subscribe to topics after connection', err:err})
else{
this.emit('status',{level:'info', msg:'connected', id:this.id, opts:this.opts, connected:this._connected})
this.emit('connection:consumer', {state:'connected', id:this.id, mqtt:true})
if (this.opts.conPacket) this.send(this.conPacket)
resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`)
}
})
}) //end promise
}
async subscribe(topic, options = {}) {
// topic is only comma delimited string or array of topics
if (typeof topic === 'string') topic = topic.split(',')
this.topics = union(this.topics, topic || [])
log.debug({method:'subscribe', line:91, msg:`subscription for topic ${topic} added`})
return this._subscribe(topic,options)
}
async unsubscribe(topic, options = {}) {
if (typeof topic === 'string') topic = topic.split(',')
this.topics = xor(this.topics, topic || [])
return new Promise((resolve, reject) => {
if (!topic) reject('no topic supplied')
if (!this._connected) reject('no connection to broker')
this.mqtt.unsubscribe(topic,options, (err, granted) => {
if (err) reject(err)
else resolve(granted)
})
})
}
// solely for primary of sending uci packets. Use publish for typical mqtt messaging
// if topic not passed then it will send to the topic given by the cmd
async _send(ipacket, topic, options) { // packet is required
if (!ipacket || !Object.keys(ipacket).length) return {error:'send aborted, no packet passed'}
if (isPlainObject(topic)) { // if neither then assume none and what is passed is options
options = topic || {}
topic = ''
}
return new Promise(async resolve => {
if (!this._connected) {
log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'})
resolve({ error: 'client not connected to a broker, aborting send' })
}
let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance
packet._header = {
id: Math.random()
.toString()
.slice(2), // need this for when multiple sends for different consumers use same packet instanceack
returnTopic: `${this.sendTopic}/return`
}
topic = topic || `${this.sendTopic}/receive`
let payload = this._serialize(packet) // payload is entire packet
if (!payload) {
log.error({method:'send', line:163, msg:'not able to serilaize packet', packet:packet})
resolve({error:'not able to serilaize packet', packet:packet})
}
setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000)
this.once(packet._header.id, async reply => {
let res = await this._packetProcess(reply)
if (!res) { // if packetProcess was not promise
res = reply
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
}
resolve(res) // resolves processed packet not returned packet
})
let res = await this._publish(packet,topic,options)
if (res) resolve(res)
}) // end promise
}
async _push(packet, topic, options) {
if (!packet || !Object.keys(packet).length) return {error:'push aborted, no packet passed', packet:packet, topic:topic}
if (!this._connected) {
log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'})
return {error:'no active connection to broker'}
}
if (isPlainObject(topic)) { // if neither then assume none and what is passed is options
options = topic || {}
topic = ''
}
topic = topic || packet.cmd ? packet.cmd.replace(/\./gi,'/') : ''
if (!topic) {
log.warn({msg:'push aborted, no topic pass nor command in packet', packet:packet, topic:topic})
return {error:'push aborted, no topic given or no command in packet', packet:packet, topic:topic}
}
packet._header ={id:'pushed'}
log.info({method:'push', line:194, msg:'mqtt publishing a uci pushed packet', packet:packet, topic:topic})
return await this._publish(packet, topic, options)
}
// this is internal UCI publish
async _publish(packet,topic,options) {
if(!topic) return {error: 'no topic to which to publish packet'}
let payload = packet.payload!=null ? ( (typeof packet.payload ==='string' ? packet.payload : this._serialize(packet.payload)) ) : this._serialize(packet) // payload is entire packet
if (!payload) {
log.error({method:'send', line:163, msg:'not able to serilaize packet', packet:packet})
return {error:'not able to serilaize packet', packet:packet}
}
// all ready publish packets
log.info(({msg:'sending a uci packet vis mqtt', packet:packet,topic:topic,payload:payload, options:options}))
let [err] = await btc(this.publish)(topic,payload,options)
if (err) return {error:err}
}
publish (...args) {
return new Promise((resolve, reject) => {
if (!this._connected) reject('publish failed - no connection to broker')
this.mqtt.publish(...args, (err, result) => {
if (err) {
reject(err)
}
else resolve(result)
})
})
}
end (...args) {
return new Promise((resolve, reject) => {
this.mqtt.end(...args, (err, result) => {
if (err) reject(err)
else resolve(result)
})
})
}
registerPacketProcessor(func) {
this._packetProcess = func
}
async _subscribe (topic, options={}) {
return new Promise((resolve, reject) => {
if (!this._connected) reject('no connection to broker during subscription attempt')
if (!topic) reject('no topic supplied')
this.mqtt.subscribe(topic,options, (err, granted) => {
if (err) reject(err)
else resolve(granted)
})
})
}
_serialize(json) {
let [err, payload] = btc(JSON.stringify)(json)
if (err) {
log.warn({method:'_serialize', line:140, msg:`unable to stringify json for payload:${json}`})
return null
}
return payload
}
_listen() {
const msg = `listening for incoming mqtt packets from broker on topics ${this.topics}`
log.debug({method:'_listen', line:147, msg:msg})
this.emit('socket',{state:'listening', msg:msg})
this.mqtt.on('message', messageProcess.bind(this))
async function messageProcess(topic, payload) {
log.debug({method:'_listen', line:153 ,msg:'incoming mqtt message to process', topic:topic, payload:payload})
let packet = this._handlePayload(payload)
if (isPlainObject(packet)) packet.cmd = packet.cmd || topic // if payload had no command so use topic as cmd
else packet = { cmd: topic, data:packet } // payload was not json packet, create a packet and emit payload
this.emit('payload',{topic:topic, payload:payload}) // emit 'payload' event for use for standard mqtt pub/sub
let header = Object.assign({},packet._header) // save the header in case a packet processor deletes it
let res = await this._packetProcess(packet)
if (header && this.client) this.emit(header.id,packet) // is acting like a client then emit returned packet
else { // act like a server, process and return reply
res._header = header
log.info({method:'_listen', line:159, msg:'processed packet - return only if sent not published', processed:res})
if (res._header) {
if(res._header.returnTopic) this._publish(res,res._header.returnTopic)
}
}
}
}
_handlePayload(payload) {
let [err, packet] = btc(JSON.parse)(payload.toString())
if (err) log.debug({method:'_handlePayload', line:201, msg:'payload is not json returning as prop data:<payload>'})
if (!isPlainObject(packet)) return payload.toString()
return packet
}
_validateTopic (topic) {
if (this.topics.some(ctopic => isTopicMatch(ctopic,topic))) {
log.warn({msg:'topic not valid for publishing, matches a subscribed topic or error topic', topic:topic, errorTopic:this.errorTopics, subscribed:this.topics})
return false
}
return true
}
// default packet process just a simple console logger. ignores any cmd: prop
_packetProcess(packet) {
console.log('=========================')
console.log('received packet to be processed=>',packet)
console.log('replace this by setting new function with .registerPacketProcessor')
packet.msg='the default packet processeor processed this packet'
console.log('========================')
return packet
}
async _log() {
log = logger({
file: 'src/client.js',
class: 'Client',
package: '@uci/mqtt',
id: this.id
})
this.mqtt.on('packetsend', packet => {
log.debug({method:'_log', line:201, packet: packet, msg:'outgoing packet to mqtt broker'})
})
this.mqtt.on('packetreceive', packet => {
log.debug({method:'_log', line:205, packet: packet, msg:'incoming packet from mqtt broker'})
})
}
} // end mqtt client class
export default MQTTClient