172 lines
4.7 KiB
JavaScript
172 lines
4.7 KiB
JavaScript
|
import { connect } from 'async-mqtt'
|
||
|
import merge from 'lodash.merge'
|
||
|
import btc from 'better-try-catch'
|
||
|
import union from 'lodash.union'
|
||
|
import xor from 'lodash.xor'
|
||
|
import logger from '@uci/logger'
|
||
|
|
||
|
// Module-level logger handle shared by every Client in this module.
// It starts as an empty object and is replaced with a real logger instance
// when Client#_log() runs (called from Client#connect()).
let log = {}
|
||
|
|
||
|
/**
 * MQTT client wrapper.
 *
 * `connect()` creates a broker client with async-mqtt's `connect` and merges
 * it into this instance, so after connecting the broker client's own members
 * (e.g. `on`, `once`, `publish`) are used directly on the instance. The broker
 * client's `subscribe`/`unsubscribe` are kept as `_subscribe`/`_unsubscribe`
 * and wrapped by the methods of the same name defined here.
 */
export default class Client {
  /**
   * @param {Object} [opts]
   * @param {string} [opts.id] - client id; falls back to `opts.name`, then to 'mqtt:' + current timestamp
   * @param {string} [opts.name] - used as the id when `opts.id` is not given
   * @param {string} [opts.url] - broker url handed to `connect`
   * @param {string|string[]} [opts.topics] - comma delimited string or array of topics; defaults to `[id]`
   * @param {Object} [opts.connect] - connection options handed to `connect` (see options for new mqtt.Client)
   */
  constructor (opts={}) {
    this.id = opts.id || opts.name || 'mqtt:'+ new Date().getTime()
    this.url = opts.url
    // topics may be given as a comma delimited string or as an array of strings
    this.topics = Array.isArray(opts.topics) ? opts.topics : (opts.topics ? opts.topics.split(',') : [this.id])
    this.opts = opts.connect || {} // see options for new mqtt.Client
    // self bindings
    this.connect = this.connect.bind(this)
  }

  /**
   * Create the broker client, merge it into this instance, attach the logging
   * and message listeners and subscribe to `this.topics`.
   *
   * @returns {Promise<string>} resolves with a status message on the first
   *   'connect' event; rejects with the error of an 'error' event
   */
  async connect () {
    return new Promise((resolve, reject) => {
      // connect returns an mqtt client instance, so merge it into this instance
      const mqtt = connect(this.url, this.opts)
      this._extend(mqtt, 'subscribe,unsubscribe')
      this._log()
      this._listen()

      // a failed (re)subscription is logged instead of left as an unhandled rejection
      const subscribeFailed = err => {
        log.warn({err:err}, 'subscription to topics failed')
      }

      this.on('reconnect', () => {
        log.info('mqtt client reconnected to broker' )
        this.subscribe(this.topics).catch(subscribeFailed)
      })

      this.once('connect', () => {
        this.subscribe(this.topics).catch(subscribeFailed)
        log.info('mqtt client connected to broker' )
        resolve(`mqtt client connected to broker at ${this._client.options.host}`)
      })

      this.on('error', err => {
        log.fatal({err:err},'connection error to broker' )
        console.log('connection error',err.code)
        // has no effect once the promise has already been resolved
        reject(err)
      })
    }) //end promise
  }

  /**
   * Subscribe to one or more topics.
   *
   * @param {string|string[]|Object} topic - comma delimited string, array of
   *   topics, or any other value the broker client's subscribe accepts (passed through untouched)
   * @param {Object} [options] - handed on to the broker client's subscribe
   * @param {boolean} [options.pub] - when truthy the topics are also added to
   *   `this.topics` (the default topics used by `send` and on (re)connect)
   * @returns {Promise<*>} whatever the broker client's subscribe returns
   */
  async subscribe (topic, options={}) {
    // always split, so 'a,b' means two topics here as it does in the constructor and send
    const list = typeof topic === 'string' ? topic.split(',') : topic
    if (options.pub) {
      this.topics = union(this.topics, list)
    }
    return this._subscribe(list, options)
  }

  /**
   * Unsubscribe from one or more topics.
   *
   * @param {string|string[]} topic - comma delimited string or array of topics
   * @param {Object} [options]
   * @param {boolean} [options.pub] - when truthy `this.topics` is left
   *   unchanged; otherwise the topics are removed from it
   * @returns {Promise<*>} whatever the broker client's unsubscribe returns
   */
  async unsubscribe (topic, options={}) {
    const list = typeof topic === 'string' ? topic.split(',') : topic
    if (!options.pub && Array.isArray(list)) {
      // plain difference: a symmetric difference would ADD topics that were
      // never tracked to this.topics
      this.topics = this.topics.filter(name => !list.includes(name))
    }
    return this._unsubscribe(list)
  }

  /**
   * Publish a JSON-serializable payload to one or more topics.
   *
   * The topics argument may be omitted: when the first argument is neither a
   * string nor an array it is taken as the payload and `this.topics` is used.
   *
   * @param {string|string[]} [topics] - comma delimited string or array of topics
   * @param {*} payload - value to serialize with JSON.stringify
   * @param {Object} [options] - handed on to `publish`
   * @returns {Promise<Array|undefined>} results of every publish, or
   *   undefined when the payload could not be serialized
   */
  async send (topics, payload, options) {
    let targets = topics
    let message = payload
    if (typeof topics !== 'string' && !Array.isArray(topics)) {
      message = topics
      targets = this.topics
    }
    if (typeof targets === 'string') targets = targets.split(',')

    const serial = this._serialize(message)
    if (!serial) return undefined

    return Promise.all(targets.map(topic => this.publish(topic, serial, options)))
  }

  /**
   * Replace the processor invoked for every parsed incoming packet.
   *
   * @param {Function} func - called as func(packet, topic)
   */
  registerPacketProcessor (func) {
    this._packetProcess = func
  }

  /**
   * JSON.stringify a payload without throwing.
   *
   * @param {*} json - value to serialize
   * @returns {string|null|undefined} the serialized payload, or null (after
   *   logging a warning) when JSON.stringify throws
   */
  _serialize (json) {
    const [err, payload] = btc(JSON.stringify)(json)
    if (err) {
      log.warn(`error unable to stringify json:${json} - send aborted`)
      return null
    }
    return payload
  }

  /**
   * Attach the 'message' listener: each incoming payload is parsed and, when
   * parsing succeeded with a truthy result, handed to `_packetProcess`.
   */
  _listen () {
    log.info('listening for incoming packets from broker')
    this.on('message', async (topic, payload) => {
      log.info('incoming messeage on topic', topic)
      const packet = this._handlePayload(payload)
      if (packet) await this._packetProcess(packet, topic)
    })
  }

  /**
   * Parse an incoming payload as JSON without throwing.
   *
   * @param {*} payload - incoming payload; its toString() result is parsed
   * @returns {*} the parsed packet, or null (after logging) when parsing fails
   */
  _handlePayload (payload) {
    const [err, packet] = btc(JSON.parse)(payload.toString())
    if (err) {
      log.fatal({payload:payload},'Could not parse JSON of payload')
      return null
    }
    return packet
  }

  /**
   * Default packet processor: just a simple console logger that ignores any
   * cmd: prop. Replace it with `registerPacketProcessor`.
   *
   * @param {Object} packet - parsed incoming packet
   * @param {string} topic - topic the packet arrived on
   */
  _packetProcess (packet, topic) {
    console.log('=========================')
    console.log('default consumer processor\npacket from broker on topic:',topic)
    console.dir(packet)
    console.log(packet.status)
    console.log('========================')
  }

  /**
   * Create the module logger for this client and log the 'close', 'offline',
   * 'packetsend' and 'packetreceive' events.
   */
  async _log () {
    log = logger({file:'src/client.js',class:'Client',name:'mqtt',id:this.id})

    this.on('close', () => {
      log.info('connection to broker was closed')
    })

    this.on('offline', () => {
      log.info('this client has gone offline from broker')
    })

    this.on('packetsend', packet => {
      log.info({packet:packet},'outgoing packet to mqtt broker')
    })

    this.on('packetreceive', packet => {
      log.info({packet:packet},'incoming packet from mqtt broker')
    })
  }

  /**
   * Merge `obj` into this instance while keeping this instance's own versions
   * of the listed methods.
   *
   * For every listed name, the version that `obj` brought in ends up under
   * '_' + name and this instance's version stays under the plain name.
   *
   * @param {Object} obj - object merged into this instance
   * @param {string} funcs - comma delimited list of method names to preserve
   */
  _extend (obj, funcs) {
    const names = funcs.split(',')
    // remember this instance's versions before the merge overwrites them
    const saved = {}
    names.forEach(name => {
      saved[name] = this[name]
    })

    merge(this, obj)

    names.forEach(name => {
      this['_'+name] = this[name]
      this[name] = saved[name]
    })
  }
} // end mqtt client class
|