import { connect } from 'async-mqtt' import merge from 'lodash.merge' import btc from 'better-try-catch' import union from 'lodash.union' import xor from 'lodash.xor' import logger from '@uci/logger' let log = {} export default class Client { constructor (opts={}) { this.id = opts.id || opts.name || 'mqtt:'+ new Date().getTime() this.url = opts.url // subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs this.topics = Array.isArray(opts.topics) ? opts.topics : (opts.topics ? opts.topics.split(',') : [this.id]) console.log(this.topics) this.opts = opts.connect || {} // see options for new mqtt.Client // self bindings this.connect = this.connect.bind(this) } async connect () { return new Promise( (resolve,reject) => { // connect returns connected mqtt client instance so merge it to class // console.log(this) let mqtt = connect(this.url,this.opts) this._extend(mqtt,'subscribe,unsubscribe') this._log() this._listen() this.on('reconnect', () => { log.info('mqtt client reconnected to broker' ) this.subscribe(this.topics) }) this.once('connect', () => { this.subscribe(this.topics) log.info('mqtt client connected to broker' ) resolve(`mqtt client connected to broker at ${this._client.options.host}`) }) this.on('error', err => { log.fatal({err:err},'connection error to broker' ) console.log('connection error',err.code) reject(err) }) }) //end promise } async subscribe(topic,options={}) { if (options.pub) { if (typeof topic==='string') topic = topic.split(',') this.topics=union(this.topics,topic) } this._subscribe(topic,options) } async unsubscribe(topic,options={}) { if (!options.pub) { if (typeof topic==='string') topic = topic.split(',') this.topics=xor(this.topics,topic) } this._unsubscribe(topic) } async send(topics,payload,options) { if (typeof topics !=='string'|| !Array.isArray(topics)) { payload = topics topics = this.topics } if (typeof topics ==='string') topics = topics.split(',') let serial = this._serialize(payload) if (serial) { let pubs = [] topics.forEach( async topic => { pubs.push(this.publish(topic,serial,options)) }) return await Promise.all(pubs) } } registerPacketProcessor (func) { this._packetProcess = func } _serialize(json) { let [err,payload] = btc(JSON.stringify)(json) if (err) { // await mqtt.unsubscribe('test2') // await mqtt.send({cmd:'test', status:'I\'m good'}) // console.log('+++++++++++++++') log.warn(`error unable to stringify json:${json} - send aborted`) return null } return payload } _listen () { log.info('listening for incoming packets from broker') this.on('message',messageProcess.bind(this)) async function messageProcess (topic,payload) { console.log('incoming messeage on topic', topic) let packet = this._handlePayload(payload) if (packet) await this._packetProcess (packet,topic) } } _handlePayload (payload) { let [err,packet] = btc(JSON.parse)(payload.toString()) if (err) { log.fatal({payload:payload},'Could not parse JSON of payload') console.log('Could not parse JSON of payload', payload.toString()) return null } return packet } // default packet process just a simple console logger. ignores any cmd: prop _packetProcess (packet,topic) { console.log('=========================') console.log('default consumer processor\npacket from broker on topic:',topic) console.dir(packet) console.log(packet.status) console.log('========================') // await mqtt.send({cmd:'test', status:'I\'m good'}) // console.log('+++++++++++++++')=') } async _log() { const LOG_OPTS = { repo:'uci-mqtt', npm:'@uci/mqtt', file:'src/client.mjs', class:'Client', id:this.id, instance_created:new Date().getTime(), mqtt: this._client.options // await mqtt.unsubscribe('test2') // await mqtt.send({cmd:'test', status:'I\'m good'}) // console.log('+++++++++++++++') } log = logger.child(LOG_OPTS) this.on('close', () => { log.info('connection to broker was closed') }) this.on('offline', () => { log.info('this client has gone offline from broker') }) this.on('packetsend', packet => { log.info({packet:packet},'outgoing packet to mqtt broker') }) this.on('packetreceive', packet => { log.info({packet:packet},'incoming packet from mqtt broker') }) } _extend (obj, funcs) { let temp = {} funcs = funcs.split(',') funcs.forEach(func => { temp[func] = this[func] }) merge(this,obj) funcs.forEach(func => { this['_'+func]=this[func] this[func] = temp[func] }) } } // end mqtt client class