refactor so send and push work like regular sockets except using mqtt as transport

pub/sub still works traditionally with 'payload' event for incoming mqtt messages
master
David Kebler 2019-09-01 18:14:16 -07:00
parent 1e334022f4
commit ec953c0123
4 changed files with 174 additions and 102 deletions

View File

@ -1,13 +1,16 @@
import MQTT from '../src/mqtt' import MQTT from '../src/mqtt'
// import btc from 'better-try-catch' import pause from 'delay'
const HOST='nas.kebler.net' const BROKER='nas.kebler.net'
let client = new MQTT({id:'example-mqtt-client', topics:'', connect:{hostname:HOST}}) let client = new MQTT({id:'example-mqtt-client', client:true, connect:{host:BROKER}})
; ;
(async () => { (async () => {
// mqtt.registerPacketProcessor(logit)
client.on('pushed', (packet) => {console.log('pushed packet came\n', packet)})
console.log(await client.connect()) console.log(await client.connect())
client.send({cmd:'lights/on', id:'someidofalight'}) await client.subscribe('test')
await pause(2000) // give time for server to start first
console.log('direct reply', await client.send({cmd:'test', msg:'this came from another mqtt client via broker'}))
})().catch(err => { })().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err) console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View File

@ -1,25 +1,20 @@
import Client from '../src/client' import MQTT from '../src/mqtt'
// import btc from 'better-try-catch' // import btc from 'better-try-catch'
const HOST='nas.kebler.net' const BROKER='nas.kebler.net'
let mqtt = new Client({id:'example-mqtt-client', topics:'', connect:{hostname:HOST}}) let server = new MQTT({id:'example-mqtt-server', connect:{host:BROKER}})
// async function logit (packet) { async function test (packet) {
// if (packet.cmd === 'lights') { console.log('in packet processor', packet)
// console.log(packet) packet.success = true
// } packet.cmd ='reply'
// return null packet.msg ='this mqtt client acting like a "server" processed the test message'
// } this.push({cmd:'test', data:'some data'}, 'pushed')
return packet
}
;
(async () => { (async () => {
// mqtt.registerPacketProcessor(logit) server.registerPacketProcessor(test)
console.log(await mqtt.connect()) console.log(await server.connect())
mqtt.send({cmd:'lights/status', id:'someidofalight'})
mqtt.subscribe('test')
// await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 explicitly but not listening'})
// await mqtt.subscribe('topic2')
// await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 with subscription'})
// now can send via some mqtt client
})().catch(err => { })().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err) console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View File

@ -4,10 +4,13 @@
"description": "mqtt client with UCI json packet payloads", "description": "mqtt client with UCI json packet payloads",
"main": "src", "main": "src",
"scripts": { "scripts": {
"testw": "mocha -r esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r esm test/*.test.js",
"test": "mocha -r esm test/*.test.mjs", "client": "./node_modules/.bin/nodemon -r esm examples/client",
"client": "node -r esm example/client", "server": "./node_modules/.bin/nodemon -r esm examples/server",
"dev": "UCI_ENV=dev UCI_LOG_LEVEL=debug ./node_modules/.bin/nodemon -r esm example/example" "client:dev": "UCI_ENV=dev npm run client",
"server:dev": "UCI_ENV=dev npm run server",
"client:debug": "UCI_LOG_LEVEL=debug npm run client:dev",
"server:debug": "UCI_LOG_LEVEL=debug npm run server:dev"
}, },
"author": "David Kebler", "author": "David Kebler",
"license": "MIT", "license": "MIT",
@ -35,6 +38,7 @@
}, },
"devDependencies": { "devDependencies": {
"chai": "^4.2.0", "chai": "^4.2.0",
"delay": "^4.3.0",
"esm": "^3.2.25", "esm": "^3.2.25",
"mocha": "^6.2.0", "mocha": "^6.2.0",
"nodemon": "^1.19.1" "nodemon": "^1.19.1"

View File

@ -1,4 +1,5 @@
import { connect } from 'mqtt' import { connect } from 'mqtt'
import EventEmitter from 'events'
import btc from 'better-try-catch' import btc from 'better-try-catch'
import union from 'lodash.union' import union from 'lodash.union'
import xor from 'lodash.xor' import xor from 'lodash.xor'
@ -10,10 +11,9 @@ let log = {}
/** /**
* MQTT - Client - An mqtt client that supports UCI packets * MQTT - Client - An mqtt client that supports UCI packets
* it extends the {@link https://github.com/mqttjs/async-mqtt| async version this mqtt client } albiet in a clunky way with a private extend method because it was not written as an extendable class * uses mqtt.js as basis
* @extends MQTT.js
*/ */
class MQTTClient { class MQTTClient extends EventEmitter {
/** /**
* constructor - Description * constructor - Description
* *
@ -21,6 +21,7 @@ class MQTTClient {
* *
*/ */
constructor(opts = {}) { constructor(opts = {}) {
super(opts)
this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime() this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime()
log = logger({ log = logger({
file: 'src/client.js', file: 'src/client.js',
@ -29,23 +30,25 @@ class MQTTClient {
name: 'mqtt', name: 'mqtt',
id: this.id id: this.id
}) })
this.type = opts.type || 'client'
this.url = opts.url || null this.url = opts.url || null
this.errorTopic = opts.errorTopic || 'error' // a generic error topic this.client = opts.client || false
this.sendTopic = opts.sendTopic || 'uci/send'
// these are the "listen" topics to which the instance will subscribe. By default that will include the instance ID
// default subscription topics can be string of commna delimited or array of strings. If none they by default topic is id // default subscription topics can be string of commna delimited or array of strings. If none they by default topic is id
this.topics = Array.isArray(opts.topics) this.topics = opts.topics ? (Array.isArray(opts.topics) ? opts.topics : opts.topics.split(',')) : []
? opts.topics this.topics.push(this.client ? `${this.sendTopic}/return` : `${this.sendTopic}/receive`)
: opts.topics
? opts.topics.split(',')
: [this.id]
this.topics.push('reply')
this.connectOpts = opts.connect || {} // see options for new mqtt.Client this.connectOpts = opts.connect || {} // see options for new mqtt.Client
this.mqtt = {} // orginal mqtt.js object when created by connect this.mqtt = {} // orginal mqtt.js object when created by connect
// self bindings // self bindings
this._connected = false
this.connect = this.connect.bind(this) this.connect = this.connect.bind(this)
this.push = this.send this.publish = this.publish.bind(this)
this._subscribe = this._subscribe.bind(this)
this.client ? this.send = this._send.bind(this) : this.push = this._push.bind(this)
} }
get connected() { return this._connected }
/** /**
* connect - Description * connect - Description
* *
@ -62,21 +65,38 @@ class MQTTClient {
reject({msg:'ending mqtt connection attempt, no broker', url:this.url, options:this.connectOpts}) reject({msg:'ending mqtt connection attempt, no broker', url:this.url, options:this.connectOpts})
}, timeout) }, timeout)
this.mqtt.once('connect', () => { this.mqtt.once('connect', async () => {
this._log() this._log()
this._listen() this._listen()
this.mqtt.on('reconnect', () => { this.mqtt.on('reconnect', () => {
log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'}) log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'})
this._connected=true
this._subscribe(this.topics) // resubscribe this._subscribe(this.topics) // resubscribe
}) })
this.mqtt.on('error', err => { this.mqtt.on('error', err => {
this.emit('status',{level:'fatal', method:'connect', err: err, msg:'connection error to broker'})
log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'}) log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'})
this._connected=false
})
this.mqtt.on('close', () => {
this.emit('status',{level:'fatal', msg:'mqtt connection to broker was closed'})
log.debug({method:'_log', line:193, msg:'connection to broker was closed'})
this._connected=false
})
this.mqtt.on('offline', () => {
this.emit('status',{level:'fatal',msg:'this client has gone offline from broker'})
log.warn({method:'_log', line:197, msg:'this client has gone offline from broker'})
this._connected=false
}) })
this._subscribe(this.topics) // initial subscriptions
log.debug({method:'connect', line:80, options: this.mqtt.options, msg:`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`}) log.debug({method:'connect', line:80, options: this.mqtt.options, msg:`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`})
this._connected=true
let [err] = await btc(this._subscribe)(this.topics) // initial subscriptions
if (err) reject({error:'unable to subscribe to topics after connection', err:err})
resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`) resolve(`mqtt client connected to broker at ${this.mqtt.options.hostname}:${this.mqtt.options.port}`)
}) })
}) //end promise }) //end promise
@ -87,17 +107,16 @@ class MQTTClient {
if (typeof topic === 'string') topic = topic.split(',') if (typeof topic === 'string') topic = topic.split(',')
this.topics = union(this.topics, topic || []) this.topics = union(this.topics, topic || [])
log.debug({method:'subscribe', line:91, msg:`subscription for topic ${topic} added`}) log.debug({method:'subscribe', line:91, msg:`subscription for topic ${topic} added`})
this._subscribe(topic,options) return this._subscribe(topic,options)
} }
async unsubscribe(topic, options = {}) { async unsubscribe(topic, options = {}) {
if (typeof topic === 'string') topic = topic.split(',') if (typeof topic === 'string') topic = topic.split(',')
this.topics = xor(this.topics, topic || []) this.topics = xor(this.topics, topic || [])
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!topic) reject('no topic supplied') if (!topic) reject('no topic supplied')
if (!this.mqtt.connected) reject('no connection to broker') if (!this._connected) reject('no connection to broker')
this.mqtt.unsubscribe(topic,options, (err, granted) => { this.mqtt.unsubscribe(topic,options, (err, granted) => {
if (err) reject(err) if (err) reject(err)
else resolve(granted) else resolve(granted)
@ -105,48 +124,99 @@ class MQTTClient {
}) })
} }
// sends a uci packet as an mqtt message where cmd: is topic // solely for primary of sending uci packets. Use publish for typical mqtt messaging
// TODO if sending as a client (default) add a _header with ID to packet and set subcription for id and in _listen emit id with payload when it comes back // if topic not passed, nor a severID supplied in the options then it will send to teh topic given by the cmd
// if a "server" then after processing publish back as /ID/theidnumber async _send(ipacket, topic, options) { // packet is required
async send(packet, topic, options) {
log.debug(({msg:'sending a packet', packet:packet,topic:topic,options:options})) if (isPlainObject(topic)) { // if neither then assume none and what is passed is options
if (!this.mqtt.connected) {
log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'})
return Promise.reject('no active connection to broker')
}
if (topic == null || typeof topic !== 'string') { // if neither then assume none and what is passed is options
options = topic || {} options = topic || {}
topic ='' topic = ''
} }
let payload return new Promise(async resolve => {
if (typeof packet !== 'string') { if (!this._connected) {
// packet is an object log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'})
topic = packet.cmd ? packet.cmd.replace(/\./gi,'/') : topic resolve({ error: 'client not connected to a broker, aborting send' })
if (packet.payload) payload = packet.payload.toString()
else {
payload = this._serialize(packet) // payload is entire packet
if (!payload) payload = this._serialize({ err: 'could not serialze packet' })
}
} else payload = packet // packet was just a string so it's the payload
if (topic.length === 0) {
log.warn({msg:'publish aborted, no topic pass nor command in packet', packet:packet, topic:topic})
return {error:'publish aborted, no topic given or no command in packet', packet:packet, topic:topic}
} }
log.debug({method:'send', line:127, msg:'mqtt publishing', payload:payload, topic:topic}) let packet = Object.assign({}, ipacket) // need to avoid mutuation for different consumers using same packet instance
return await this.publish(topic, payload, options)
packet._header = {
id: Math.random()
.toString()
.slice(2), // need this for when multiple sends for different consumers use same packet instanceack
returnTopic: `${this.sendTopic}/return`
}
topic = topic || `${this.sendTopic}/receive`
let payload = this._serialize(packet) // payload is entire packet
if (!payload) {
log.error({method:'send', line:163, msg:'not able to serilaize packet', packet:packet})
resolve({error:'not able to serilaize packet', packet:packet})
}
setTimeout(() => {resolve({ error: 'no response from socket in 10sec' })}, 10000)
this.once(packet._header.id, async reply => {
let res = await this._packetProcess(reply)
if (!res) { // if packetProcess was not promise
res = reply
log.debug({method:'send', line:180, msg:'consumer function was not promise returning further processing may be out of sequence'})
}
resolve(res) // resolves processed packet not returned packet
})
let res = await this._publish(packet,topic,options)
if (res) resolve(res)
}) // end promise
}
async _push(packet={}, topic, options) {
if (!this._connected) {
log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'no active connection to broker'})
return {error:'no active connection to broker'}
}
if (isPlainObject(topic)) { // if neither then assume none and what is passed is options
options = topic || {}
topic = ''
}
topic = topic || packet.cmd ? packet.cmd.replace(/\./gi,'/') : ''
if (!topic) {
log.warn({msg:'push aborted, no topic pass nor command in packet', packet:packet, topic:topic})
return {error:'push aborted, no topic given or no command in packet', packet:packet, topic:topic}
}
packet._header ={id:'pushed'}
log.info({method:'push', line:194, msg:'mqtt publishing a uci pushed packet', packet:packet, topic:topic})
return await this._publish(packet, topic, options)
}
// this is internal UCI publish
async _publish(packet,topic,options) {
if(!topic) return {error: 'no topic to which to publish packet'}
let payload = this._serialize(packet) // payload is entire packet
if (!payload) {
log.error({method:'send', line:163, msg:'not able to serilaize packet', packet:packet})
return {error:'not able to serilaize packet', packet:packet}
}
// all ready publish packets
log.info(({msg:'sending a uci packet vis mqtt', packet:packet,topic:topic,payload:payload, options:options}))
let [err] = await btc(this.publish)(topic,payload,options)
if (err) return {error:err}
} }
publish (...args) { publish (...args) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!this.mqtt.connected) reject('no connection to broker') if (!this._connected) reject('publish failed - no connection to broker')
this.mqtt.publish(...args, (err, result) => { this.mqtt.publish(...args, (err, result) => {
if (err) reject(err) if (err) {
reject(err)
}
else resolve(result) else resolve(result)
}) })
}) })
@ -154,7 +224,7 @@ class MQTTClient {
end (...args) { end (...args) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this._client.end(...args, (err, result) => { this.mqtt.end(...args, (err, result) => {
if (err) reject(err) if (err) reject(err)
else resolve(result) else resolve(result)
}) })
@ -167,7 +237,7 @@ class MQTTClient {
async _subscribe (topic, options={}) { async _subscribe (topic, options={}) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!this.mqtt.connected) reject('no connection to broker') if (!this._connected) reject('no connection to broker during subscription attempt')
if (!topic) reject('no topic supplied') if (!topic) reject('no topic supplied')
this.mqtt.subscribe(topic,options, (err, granted) => { this.mqtt.subscribe(topic,options, (err, granted) => {
if (err) reject(err) if (err) reject(err)
@ -187,45 +257,53 @@ class MQTTClient {
// TODO if client set up reply subscription with message id and then emit that for resolving send // TODO if client set up reply subscription with message id and then emit that for resolving send
// if server then take id and send message back with that appended to topic // if server then take id and send message back with that appended to topic
_listen() { _listen() {
log.info({method:'_listen', line:147, msg:`listening for incoming mqtt packets from broker on topics ${this.topics}`}) log.debug({method:'_listen', line:147, msg:`listening for incoming mqtt packets from broker on topics ${this.topics}`})
this.mqtt.on('message', messageProcess.bind(this)) this.mqtt.on('message', messageProcess.bind(this))
async function messageProcess(topic, payload) { async function messageProcess(topic, payload) {
log.debug({method:'_listen', line:153 ,msg:'incoming mqtt message to process', topic:topic, payload:payload}) log.debug({method:'_listen', line:153 ,msg:'incoming mqtt message to process', topic:topic, payload:payload})
let packet = this._handlePayload(payload) let packet = this._handlePayload(payload)
if (packet) packet.cmd = packet.cmd || topic // if payload had no command so use topic as cmd if (isPlainObject(packet)) packet.cmd = packet.cmd || topic // if payload had no command so use topic as cmd
else packet = { cmd: topic } else packet = { cmd: topic, data:packet } // payload was not json packet, create a packet and emit payload
let res = (await this._packetProcess(packet, topic)) || {} this.emit('payload',{topic:topic, payload:payload}) // emit 'payload' event for use for standard mqtt pub/sub
log.debug({method:'_listen', line:159, msg:'processed packet back to broker', res:res, topics:this.topics}) let header = Object.assign({},packet._header) // save the header in case a packet processor deletes it
if (this.topics.some(topic => isTopicMatch(topic,res.cmd || ''))) { let res = await this._packetProcess(packet)
if (this.topics.some( topic => isTopicMatch(topic,this.errorTopic))) { if (header && this.client) this.emit(header.id,packet)
log.warn({method:'_listen', line:165, msg:'unable to send error, subscried to error topic', subscribed:this.topics, errorTopic:this.errorTopic})
return
}
else { else {
res = {cmd:'error', msg:`unable to send this response, response topic '${res.cmd}'' matches a topic [${this.topics}] - subscribed to by sender`, response:res, subcribed:this.topics} res._header = header
log.warn({method:'_listen', line:168,msg:'sent mqtt error message', error:res}) log.info({method:'_listen', line:159, msg:'processed packet - return only if sent not published', processed:res})
if (res._header) {
if(res._header.returnTopic) this._publish(res,res._header.returnTopic)
} }
} }
// TODO if acting as a "server" then send back the result, if client and ID is matched then emit for send function
// if (res) this.send(res)
} }
} }
_handlePayload(payload) { _handlePayload(payload) {
let [err, packet] = btc(JSON.parse)(payload.toString()) let [err, packet] = btc(JSON.parse)(payload.toString())
if (err) log.debug({method:'_handlePayload', line:201, msg:'payload is not json returning as prop data:<payload>'}) if (err) log.debug({method:'_handlePayload', line:201, msg:'payload is not json returning as prop data:<payload>'})
if (!isPlainObject(packet)) return { data: payload.toString() } if (!isPlainObject(packet)) return payload.toString()
return packet return packet
} }
_validateTopic (topic) {
if (this.topics.some(ctopic => isTopicMatch(ctopic,topic))) {
log.warn({msg:'topic not valid for publishing, matches a subscribed topic or error topic', topic:topic, errorTopic:this.errorTopics, subscribed:this.topics})
return false
}
return true
}
// default packet process just a simple console logger. ignores any cmd: prop // default packet process just a simple console logger. ignores any cmd: prop
_packetProcess(packet, topic) { _packetProcess(packet) {
console.log('=========================') console.log('=========================')
console.log('default consumer processor\npacket from broker on topic:',topic) console.log('received packet to be processed=>',packet)
console.log('replace this by setting new function with .registerPacketProcessor') console.log('replace this by setting new function with .registerPacketProcessor')
console.dir(packet) packet.msg='the default packet processeor processed this packet'
console.log('========================') console.log('========================')
return packet
} }
async _log() { async _log() {
@ -236,14 +314,6 @@ class MQTTClient {
id: this.id id: this.id
}) })
this.mqtt.on('close', () => {
log.debug({method:'_log', line:193, msg:'connection to broker was closed'})
})
this.mqtt.on('offline', () => {
log.warn({method:'_log', line:197, msg:'this client has gone offline from broker'})
})
this.mqtt.on('packetsend', packet => { this.mqtt.on('packetsend', packet => {
log.debug({method:'_log', line:201, packet: packet, msg:'outgoing packet to mqtt broker'}) log.debug({method:'_log', line:201, packet: packet, msg:'outgoing packet to mqtt broker'})
}) })