import { connect } from 'mqtt' import EventEmitter from 'events' import btc from 'better-try-catch' import union from 'lodash.union' import xor from 'lodash.xor' import logger from '@uci-utils/logger' import isPlainObject from 'is-plain-object' import isTopicMatch from 'mqtt-match' let log = {} /** * MQTT - Client - An mqtt client that supports UCI packets * uses mqtt.js as basis */ class MQTTClient extends EventEmitter { /** * constructor - Description * * @param {object} [opts={}] Description * */ constructor(opts = {}) { super(opts) this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime() log = logger({ file: 'src/client.js', package:'@uci/mqttjs', class: 'MQTTClient', name: 'mqtt', id: this.id }) this.url = opts.url || null this.client = opts.client || false this.sendTopic = opts.sendTopic || 'uci/send' this.opts = opts // these are the "listen" topics to which the instance will subscribe. By default that will include the instance ID // default subscription topics can be string of commna delimited or array of strings. If none they by default topic is id this.topics = opts.topics ? (Array.isArray(opts.topics) ? opts.topics : opts.topics.split(',')) : [] this.topics.push(this.client ? `${this.sendTopic}/return` : `${this.sendTopic}/receive`) this.connectOpts = opts.connect || {} // see options for new mqtt.Client this.mqtt = {} // orginal mqtt.js object when created by connect // self bindings this._connected = false this.conPackets = opts.conPackets || [opts.conPacket] this.connect = this.connect.bind(this) this.publish = this.publish.bind(this) this._subscribe = this._subscribe.bind(this) this.client ? this.send = this._send.bind(this) : this.push = this._push.bind(this) } // TODO add UCI authentication and MQTT authentication get connected() { return this._connected } /** * connect - Description * * @returns {type} Description */ async connect() { return new Promise((resolve, reject) => { this.mqtt = connect( this.url, this.connectOpts ) let timeout = this.connecOpts || 5000 setTimeout(() => { reject({msg:'ending mqtt connection attempt, no broker', url:this.url, options:this.connectOpts}) }, timeout) this.mqtt.once('connect', async () => { this._log() this._listen() this.mqtt.on('reconnect', () => { log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'}) this._connected=true this._subscribe(this.topics) // resubscribe this.emit('status',{level:'info', msg:'reconnected', id:this.id, opts:this.opts, connected:this._connected}) this.emit('connection:consumer', {state:'connected', id:this.id, mqtt:true}) if (this.opts.conPacket) this.send(this.conPacket) }) this.mqtt.on('error', err => { this.emit('status',{level:'fatal', method:'connect', err: err, msg:'connection error to broker'}) log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'}) this.emit('connection:consumer', {state:'disconnected', id:this.id, mqtt:true}) this._connected=false }) this.mqtt.on('close', () => { this.emit('status',{level:'fatal', msg:'mqtt connection to broker was closed'}) log.debug({method:'_log', line:193, msg:'connection to broker was closed'}) this._connected=false }) this.mqtt.on('offline', () => { this.emit('status',{level:'fatal',msg:'this client has gone offline from broker'}) log.warn({method:'_log', line:197, msg:'this client has gone offline from broker'}) this._connected=false }) log.debug({method:'connect', line:80, options: this.mqtt.options, msg:`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`}) this._connected=true let [err] = await btc(this._subscribe)(this.topics) // initial subscriptions if (err) reject({error:'unable to subscribe to topics after connection', err:err}) else{ this.emit('status',{level:'info', msg:'connected', id:this.id, opts:this.opts, connected:this._connected}) this.emit('connection:consumer', {state:'connected', id:this.id, mqtt:true}) if (this.opts.conPacket) this.send(this.conPacket) resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`) } }) }) //end promise } async subscribe(topic, options = {}) { // topic is only comma delimited string or array of topics if (typeof topic === 'string') topic = topic.split(',') this.topics = union(this.topics, topic || []) log.debug({method:'subscribe', line:91, msg:`subscription for topic ${topic} added`}) return this._subscribe(topic,options) } async unsubscribe(topic, options = {}) { if (typeof topic === 'string') topic = topic.split(',') this.topics = xor(this.topics, topic || []) return new Promise((resolve, reject) => { if (!topic) reject('no topic supplied') if (!this._connected) reject('no connection to broker') this.mqtt.unsubscribe(topic,options, (err, granted) => { if (err) reject(err) else resolve(granted) }) }) } // solely for primary of sending uci packets. Use publish for typical mqtt messaging // if topic not passed then it will send to the topic given by the cmd async _send(ipacket, topic, options) { // packet is required if (!ipacket || !Object.keys(ipacket).length) return {error:'send aborted, no packet passed'} if (isPlainObject(topic)) { // if neither then assume none and what is passed is options options = topic || {} topic = '' } return new Promise(async resolve => { if (!this._connected) { log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'}) resolve({ error: 'client not connected to a broker, aborting send' }) } let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance packet._header = { id: Math.random() .toString() .slice(2), // need this for when multiple sends for different consumers use same packet instanceack returnTopic: `${this.sendTopic}/return` } topic = topic || `${this.sendTopic}/receive` let payload = this._serialize(packet) // payload is entire packet if (!payload) { log.error({method:'send', line:163, msg:'not able to serilaize packet', packet:packet}) resolve({error:'not able to serilaize packet', packet:packet}) } setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) this.once(packet._header.id, async reply => { let res = await this._packetProcess(reply) if (!res) { // if packetProcess was not promise res = reply log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) } resolve(res) // resolves processed packet not returned packet }) let res = await this._publish(packet,topic,options) if (res) resolve(res) }) // end promise } async _push(packet, topic, options) { if (!packet || !Object.keys(packet).length) return {error:'push aborted, no packet passed', packet:packet, topic:topic} if (!this._connected) { log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'}) return {error:'no active connection to broker'} } if (isPlainObject(topic)) { // if neither then assume none and what is passed is options options = topic || {} topic = '' } topic = topic || packet.cmd ? packet.cmd.replace(/\./gi,'/') : '' if (!topic) { log.warn({msg:'push aborted, no topic pass nor command in packet', packet:packet, topic:topic}) return {error:'push aborted, no topic given or no command in packet', packet:packet, topic:topic} } packet._header ={id:'pushed'} log.info({method:'push', line:194, msg:'mqtt publishing a uci pushed packet', packet:packet, topic:topic}) return await this._publish(packet, topic, options) } // this is internal UCI publish async _publish(packet,topic,options) { if(!topic) return {error: 'no topic to which to publish packet'} let payload = packet.payload!=null ? ( (typeof packet.payload ==='string' ? packet.payload : this._serialize(packet.payload)) ) : this._serialize(packet) // payload is entire packet if (!payload) { log.error({method:'send', line:163, msg:'not able to serilaize packet', packet:packet}) return {error:'not able to serilaize packet', packet:packet} } // all ready publish packets log.info(({msg:'sending a uci packet vis mqtt', packet:packet,topic:topic,payload:payload, options:options})) let [err] = await btc(this.publish)(topic,payload,options) if (err) return {error:err} } publish (...args) { return new Promise((resolve, reject) => { if (!this._connected) reject('publish failed - no connection to broker') this.mqtt.publish(...args, (err, result) => { if (err) { reject(err) } else resolve(result) }) }) } end (...args) { return new Promise((resolve, reject) => { this.mqtt.end(...args, (err, result) => { if (err) reject(err) else resolve(result) }) }) } registerPacketProcessor(func) { this._packetProcess = func } async _subscribe (topic, options={}) { return new Promise((resolve, reject) => { if (!this._connected) reject('no connection to broker during subscription attempt') if (!topic) reject('no topic supplied') this.mqtt.subscribe(topic,options, (err, granted) => { if (err) reject(err) else resolve(granted) }) }) } _serialize(json) { let [err, payload] = btc(JSON.stringify)(json) if (err) { log.warn({method:'_serialize', line:140, msg:`unable to stringify json for payload:${json}`}) return null } return payload } _listen() { const msg = `listening for incoming mqtt packets from broker on topics ${this.topics}` log.debug({method:'_listen', line:147, msg:msg}) this.emit('socket',{state:'listening', msg:msg}) this.mqtt.on('message', messageProcess.bind(this)) async function messageProcess(topic, payload) { log.debug({method:'_listen', line:153 ,msg:'incoming mqtt message to process', topic:topic, payload:payload}) let packet = this._handlePayload(payload) if (isPlainObject(packet)) packet.cmd = packet.cmd || topic // if payload had no command so use topic as cmd else packet = { cmd: topic, data:packet } // payload was not json packet, create a packet and emit payload this.emit('payload',{topic:topic, payload:payload}) // emit 'payload' event for use for standard mqtt pub/sub let header = Object.assign({},packet._header) // save the header in case a packet processor deletes it let res = await this._packetProcess(packet) if (header && this.client) this.emit(header.id,packet) // is acting like a client then emit returned packet else { // act like a server, process and return reply res._header = header log.info({method:'_listen', line:159, msg:'processed packet - return only if sent not published', processed:res}) if (res._header) { if(res._header.returnTopic) this._publish(res,res._header.returnTopic) } } } } _handlePayload(payload) { let [err, packet] = btc(JSON.parse)(payload.toString()) if (err) log.debug({method:'_handlePayload', line:201, msg:'payload is not json returning as prop data:'}) if (!isPlainObject(packet)) return payload.toString() return packet } _validateTopic (topic) { if (this.topics.some(ctopic => isTopicMatch(ctopic,topic))) { log.warn({msg:'topic not valid for publishing, matches a subscribed topic or error topic', topic:topic, errorTopic:this.errorTopics, subscribed:this.topics}) return false } return true } // default packet process just a simple console logger. ignores any cmd: prop _packetProcess(packet) { console.log('=========================') console.log('received packet to be processed=>',packet) console.log('replace this by setting new function with .registerPacketProcessor') packet.msg='the default packet processeor processed this packet' console.log('========================') return packet } async _log() { log = logger({ file: 'src/client.js', class: 'Client', package: '@uci/mqtt', id: this.id }) this.mqtt.on('packetsend', packet => { log.debug({method:'_log', line:201, packet: packet, msg:'outgoing packet to mqtt broker'}) }) this.mqtt.on('packetreceive', packet => { log.debug({method:'_log', line:205, packet: packet, msg:'incoming packet from mqtt broker'}) }) } } // end mqtt client class export default MQTTClient