From ec953c01232eeebdd59c729c9bd98e5d8422cd9a Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sun, 1 Sep 2019 18:14:16 -0700 Subject: [PATCH] refactor so send and push work like regular sockets except using mqtt as transport pub/sub still works traditionally with 'payload' event for incoming mqtt messages --- examples/client.js | 13 +-- examples/server.js | 31 +++---- package.json | 12 ++- src/mqtt.js | 220 +++++++++++++++++++++++++++++---------------- 4 files changed, 174 insertions(+), 102 deletions(-) diff --git a/examples/client.js b/examples/client.js index b0d1363..3f5ef95 100644 --- a/examples/client.js +++ b/examples/client.js @@ -1,13 +1,16 @@ import MQTT from '../src/mqtt' -// import btc from 'better-try-catch' -const HOST='nas.kebler.net' -let client = new MQTT({id:'example-mqtt-client', topics:'', connect:{hostname:HOST}}) +import pause from 'delay' +const BROKER='nas.kebler.net' +let client = new MQTT({id:'example-mqtt-client', client:true, connect:{host:BROKER}}) ; (async () => { - // mqtt.registerPacketProcessor(logit) + + client.on('pushed', (packet) => {console.log('pushed packet came\n', packet)}) console.log(await client.connect()) - client.send({cmd:'lights/on', id:'someidofalight'}) + await client.subscribe('test') + await pause(2000) // give time for server to start first + console.log('direct reply', await client.send({cmd:'test', msg:'this came from another mqtt client via broker'})) })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/examples/server.js b/examples/server.js index efa421c..66f4fa3 100644 --- a/examples/server.js +++ b/examples/server.js @@ -1,25 +1,20 @@ -import Client from '../src/client' +import MQTT from '../src/mqtt' // import btc from 'better-try-catch' -const HOST='nas.kebler.net' -let mqtt = new Client({id:'example-mqtt-client', topics:'', connect:{hostname:HOST}}) +const BROKER='nas.kebler.net' +let server = new MQTT({id:'example-mqtt-server', connect:{host:BROKER}}) -// async function logit (packet) { -// if (packet.cmd === 'lights') { -// console.log(packet) -// } -// return null -// } +async function test (packet) { + console.log('in packet processor', packet) + packet.success = true + packet.cmd ='reply' + packet.msg ='this mqtt client acting like a "server" processed the test message' + this.push({cmd:'test', data:'some data'}, 'pushed') + return packet +} -; (async () => { - // mqtt.registerPacketProcessor(logit) - console.log(await mqtt.connect()) - mqtt.send({cmd:'lights/status', id:'someidofalight'}) - mqtt.subscribe('test') - // await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 explicitly but not listening'}) - // await mqtt.subscribe('topic2') - // await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 with subscription'}) - // now can send via some mqtt client + server.registerPacketProcessor(test) + console.log(await server.connect()) })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/package.json b/package.json index c8dfed3..fb9da8f 100644 --- a/package.json +++ b/package.json @@ -4,10 +4,13 @@ "description": "mqtt client with UCI json packet payloads", "main": "src", "scripts": { - "testw": "mocha -r esm test/*.test.mjs --watch --recurse --watch-extensions mjs", - "test": "mocha -r esm test/*.test.mjs", - "client": "node -r esm example/client", - "dev": "UCI_ENV=dev UCI_LOG_LEVEL=debug ./node_modules/.bin/nodemon -r esm example/example" + "test": "mocha -r esm test/*.test.js", + "client": "./node_modules/.bin/nodemon -r esm examples/client", + "server": "./node_modules/.bin/nodemon -r esm examples/server", + "client:dev": "UCI_ENV=dev npm run client", + "server:dev": "UCI_ENV=dev npm run server", + "client:debug": "UCI_LOG_LEVEL=debug npm run client:dev", + "server:debug": "UCI_LOG_LEVEL=debug npm run server:dev" }, "author": "David Kebler", "license": "MIT", @@ -35,6 +38,7 @@ }, "devDependencies": { "chai": "^4.2.0", + "delay": "^4.3.0", "esm": "^3.2.25", "mocha": "^6.2.0", "nodemon": "^1.19.1" diff --git a/src/mqtt.js b/src/mqtt.js index 2b41898..8b16ca5 100644 --- a/src/mqtt.js +++ b/src/mqtt.js @@ -1,4 +1,5 @@ import { connect } from 'mqtt' +import EventEmitter from 'events' import btc from 'better-try-catch' import union from 'lodash.union' import xor from 'lodash.xor' @@ -10,10 +11,9 @@ let log = {} /** * MQTT - Client - An mqtt client that supports UCI packets - * it extends the {@link https://github.com/mqttjs/async-mqtt| async version this mqtt client } albiet in a clunky way with a private extend method because it was not written as an extendable class - * @extends MQTT.js + * uses mqtt.js as basis */ -class MQTTClient { +class MQTTClient extends EventEmitter { /** * constructor - Description * @@ -21,6 +21,7 @@ class MQTTClient { * */ constructor(opts = {}) { + super(opts) this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime() log = logger({ file: 'src/client.js', @@ -29,23 +30,25 @@ class MQTTClient { name: 'mqtt', id: this.id }) - this.type = opts.type || 'client' this.url = opts.url || null - this.errorTopic = opts.errorTopic || 'error' // a generic error topic + this.client = opts.client || false + this.sendTopic = opts.sendTopic || 'uci/send' + // these are the "listen" topics to which the instance will subscribe. By default that will include the instance ID // default subscription topics can be string of commna delimited or array of strings. If none they by default topic is id - this.topics = Array.isArray(opts.topics) - ? opts.topics - : opts.topics - ? opts.topics.split(',') - : [this.id] - this.topics.push('reply') + this.topics = opts.topics ? (Array.isArray(opts.topics) ? opts.topics : opts.topics.split(',')) : [] + this.topics.push(this.client ? `${this.sendTopic}/return` : `${this.sendTopic}/receive`) this.connectOpts = opts.connect || {} // see options for new mqtt.Client this.mqtt = {} // orginal mqtt.js object when created by connect // self bindings + this._connected = false this.connect = this.connect.bind(this) - this.push = this.send + this.publish = this.publish.bind(this) + this._subscribe = this._subscribe.bind(this) + this.client ? this.send = this._send.bind(this) : this.push = this._push.bind(this) } + get connected() { return this._connected } + /** * connect - Description * @@ -62,21 +65,38 @@ class MQTTClient { reject({msg:'ending mqtt connection attempt, no broker', url:this.url, options:this.connectOpts}) }, timeout) - this.mqtt.once('connect', () => { + this.mqtt.once('connect', async () => { this._log() this._listen() this.mqtt.on('reconnect', () => { log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'}) + this._connected=true this._subscribe(this.topics) // resubscribe }) this.mqtt.on('error', err => { + this.emit('status',{level:'fatal', method:'connect', err: err, msg:'connection error to broker'}) log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'}) + this._connected=false + }) + + this.mqtt.on('close', () => { + this.emit('status',{level:'fatal', msg:'mqtt connection to broker was closed'}) + log.debug({method:'_log', line:193, msg:'connection to broker was closed'}) + this._connected=false + }) + + this.mqtt.on('offline', () => { + this.emit('status',{level:'fatal',msg:'this client has gone offline from broker'}) + log.warn({method:'_log', line:197, msg:'this client has gone offline from broker'}) + this._connected=false }) - this._subscribe(this.topics) // initial subscriptions log.debug({method:'connect', line:80, options: this.mqtt.options, msg:`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`}) + this._connected=true + let [err] = await btc(this._subscribe)(this.topics) // initial subscriptions + if (err) reject({error:'unable to subscribe to topics after connection', err:err}) resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`) }) }) //end promise @@ -87,17 +107,16 @@ class MQTTClient { if (typeof topic === 'string') topic = topic.split(',') this.topics = union(this.topics, topic || []) log.debug({method:'subscribe', line:91, msg:`subscription for topic ${topic} added`}) - this._subscribe(topic,options) + return this._subscribe(topic,options) } - async unsubscribe(topic, options = {}) { if (typeof topic === 'string') topic = topic.split(',') this.topics = xor(this.topics, topic || []) return new Promise((resolve, reject) => { if (!topic) reject('no topic supplied') - if (!this.mqtt.connected) reject('no connection to broker') + if (!this._connected) reject('no connection to broker') this.mqtt.unsubscribe(topic,options, (err, granted) => { if (err) reject(err) else resolve(granted) @@ -105,48 +124,99 @@ class MQTTClient { }) } - // sends a uci packet as an mqtt message where cmd: is topic - // TODO if sending as a client (default) add a _header with ID to packet and set subcription for id and in _listen emit id with payload when it comes back - // if a "server" then after processing publish back as /ID/theidnumber - async send(packet, topic, options) { + // solely for primary of sending uci packets. Use publish for typical mqtt messaging + // if topic not passed, nor a severID supplied in the options then it will send to teh topic given by the cmd + async _send(ipacket, topic, options) { // packet is required - log.debug(({msg:'sending a packet', packet:packet,topic:topic,options:options})) - - if (!this.mqtt.connected) { - log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'}) - return Promise.reject('no active connection to broker') - } - - if (topic == null || typeof topic !== 'string') { // if neither then assume none and what is passed is options + if (isPlainObject(topic)) { // if neither then assume none and what is passed is options options = topic || {} - topic ='' + topic = '' } - let payload - if (typeof packet !== 'string') { - // packet is an object - topic = packet.cmd ? packet.cmd.replace(/\./gi,'/') : topic - if (packet.payload) payload = packet.payload.toString() - else { - payload = this._serialize(packet) // payload is entire packet - if (!payload) payload = this._serialize({ err: 'could not serialze packet' }) + return new Promise(async resolve => { + if (!this._connected) { + log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'}) + resolve({ error: 'client not connected to a broker, aborting send' }) } - } else payload = packet // packet was just a string so it's the payload - if (topic.length === 0) { - log.warn({msg:'publish aborted, no topic pass nor command in packet', packet:packet, topic:topic}) - return {error:'publish aborted, no topic given or no command in packet', packet:packet, topic:topic} + let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance + + packet._header = { + id: Math.random() + .toString() + .slice(2), // need this for when multiple sends for different consumers use same packet instanceack + returnTopic: `${this.sendTopic}/return` + } + + topic = topic || `${this.sendTopic}/receive` + + let payload = this._serialize(packet) // payload is entire packet + if (!payload) { + log.error({method:'send', line:163, msg:'not able to serilaize packet', packet:packet}) + resolve({error:'not able to serilaize packet', packet:packet}) + } + + setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000) + this.once(packet._header.id, async reply => { + let res = await this._packetProcess(reply) + if (!res) { // if packetProcess was not promise + res = reply + log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'}) + } + resolve(res) // resolves processed packet not returned packet + }) + + let res = await this._publish(packet,topic,options) + if (res) resolve(res) + + }) // end promise + } + + async _push(packet={}, topic, options) { + + if (!this._connected) { + log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'}) + return {error:'no active connection to broker'} } - log.debug({method:'send', line:127, msg:'mqtt publishing', payload:payload, topic:topic}) - return await this.publish(topic, payload, options) + if (isPlainObject(topic)) { // if neither then assume none and what is passed is options + options = topic || {} + topic = '' + } + + topic = topic || packet.cmd ? packet.cmd.replace(/\./gi,'/') : '' + + if (!topic) { + log.warn({msg:'push aborted, no topic pass nor command in packet', packet:packet, topic:topic}) + return {error:'push aborted, no topic given or no command in packet', packet:packet, topic:topic} + } + + packet._header ={id:'pushed'} + log.info({method:'push', line:194, msg:'mqtt publishing a uci pushed packet', packet:packet, topic:topic}) + return await this._publish(packet, topic, options) + } + + // this is internal UCI publish + async _publish(packet,topic,options) { + if(!topic) return {error: 'no topic to which to publish packet'} + let payload = this._serialize(packet) // payload is entire packet + if (!payload) { + log.error({method:'send', line:163, msg:'not able to serilaize packet', packet:packet}) + return {error:'not able to serilaize packet', packet:packet} + } + // all ready publish packets + log.info(({msg:'sending a uci packet vis mqtt', packet:packet,topic:topic,payload:payload, options:options})) + let [err] = await btc(this.publish)(topic,payload,options) + if (err) return {error:err} } publish (...args) { return new Promise((resolve, reject) => { - if (!this.mqtt.connected) reject('no connection to broker') + if (!this._connected) reject('publish failed - no connection to broker') this.mqtt.publish(...args, (err, result) => { - if (err) reject(err) + if (err) { + reject(err) + } else resolve(result) }) }) @@ -154,7 +224,7 @@ class MQTTClient { end (...args) { return new Promise((resolve, reject) => { - this._client.end(...args, (err, result) => { + this.mqtt.end(...args, (err, result) => { if (err) reject(err) else resolve(result) }) @@ -167,7 +237,7 @@ class MQTTClient { async _subscribe (topic, options={}) { return new Promise((resolve, reject) => { - if (!this.mqtt.connected) reject('no connection to broker') + if (!this._connected) reject('no connection to broker during subscription attempt') if (!topic) reject('no topic supplied') this.mqtt.subscribe(topic,options, (err, granted) => { if (err) reject(err) @@ -187,45 +257,53 @@ class MQTTClient { // TODO if client set up reply subscription with message id and then emit that for resolving send // if server then take id and send message back with that appended to topic _listen() { - log.info({method:'_listen', line:147, msg:`listening for incoming mqtt packets from broker on topics ${this.topics}`}) + log.debug({method:'_listen', line:147, msg:`listening for incoming mqtt packets from broker on topics ${this.topics}`}) + this.mqtt.on('message', messageProcess.bind(this)) async function messageProcess(topic, payload) { log.debug({method:'_listen', line:153 ,msg:'incoming mqtt message to process', topic:topic, payload:payload}) let packet = this._handlePayload(payload) - if (packet) packet.cmd = packet.cmd || topic // if payload had no command so use topic as cmd - else packet = { cmd: topic } - let res = (await this._packetProcess(packet, topic)) || {} - log.debug({method:'_listen', line:159, msg:'processed packet back to broker', res:res, topics:this.topics}) - if (this.topics.some(topic => isTopicMatch(topic,res.cmd || ''))) { - if (this.topics.some( topic => isTopicMatch(topic,this.errorTopic))) { - log.warn({method:'_listen', line:165, msg:'unable to send error, subscried to error topic', subscribed:this.topics, errorTopic:this.errorTopic}) - return - } - else { - res = {cmd:'error', msg:`unable to send this response, response topic '${res.cmd}'' matches a topic [${this.topics}] - subscribed to by sender`, response:res, subcribed:this.topics} - log.warn({method:'_listen', line:168,msg:'sent mqtt error message', error:res}) + if (isPlainObject(packet)) packet.cmd = packet.cmd || topic // if payload had no command so use topic as cmd + else packet = { cmd: topic, data:packet } // payload was not json packet, create a packet and emit payload + this.emit('payload',{topic:topic, payload:payload}) // emit 'payload' event for use for standard mqtt pub/sub + let header = Object.assign({},packet._header) // save the header in case a packet processor deletes it + let res = await this._packetProcess(packet) + if (header && this.client) this.emit(header.id,packet) + else { + res._header = header + log.info({method:'_listen', line:159, msg:'processed packet - return only if sent not published', processed:res}) + if (res._header) { + if(res._header.returnTopic) this._publish(res,res._header.returnTopic) } } - // TODO if acting as a "server" then send back the result, if client and ID is matched then emit for send function - // if (res) this.send(res) + } } _handlePayload(payload) { let [err, packet] = btc(JSON.parse)(payload.toString()) if (err) log.debug({method:'_handlePayload', line:201, msg:'payload is not json returning as prop data:'}) - if (!isPlainObject(packet)) return { data: payload.toString() } + if (!isPlainObject(packet)) return payload.toString() return packet } + _validateTopic (topic) { + if (this.topics.some(ctopic => isTopicMatch(ctopic,topic))) { + log.warn({msg:'topic not valid for publishing, matches a subscribed topic or error topic', topic:topic, errorTopic:this.errorTopics, subscribed:this.topics}) + return false + } + return true + } + // default packet process just a simple console logger. ignores any cmd: prop - _packetProcess(packet, topic) { + _packetProcess(packet) { console.log('=========================') - console.log('default consumer processor\npacket from broker on topic:',topic) + console.log('received packet to be processed=>',packet) console.log('replace this by setting new function with .registerPacketProcessor') - console.dir(packet) + packet.msg='the default packet processeor processed this packet' console.log('========================') + return packet } async _log() { @@ -236,14 +314,6 @@ class MQTTClient { id: this.id }) - this.mqtt.on('close', () => { - log.debug({method:'_log', line:193, msg:'connection to broker was closed'}) - }) - - this.mqtt.on('offline', () => { - log.warn({method:'_log', line:197, msg:'this client has gone offline from broker'}) - }) - this.mqtt.on('packetsend', packet => { log.debug({method:'_log', line:201, packet: packet, msg:'outgoing packet to mqtt broker'}) })