now process topic to cmd prop and vice versa so it is compatible with uci packets

master
David Kebler 2018-05-24 12:31:47 -07:00
parent 8ac552e9a8
commit f08e361d2e
3 changed files with 55 additions and 51 deletions

View File

@ -1,17 +1,19 @@
import Client from '../src/client' import Client from '../src/client'
import btc from 'better-try-catch'
// let mqtt = new Client({id:'example-mqtt-client', url:'tcp://trantor:1883', topics:'test1'}) // let mqtt = new Client({id:'example-mqtt-client', url:'tcp://trantor:1883', topics:'test1'})
let mqtt = new Client({id:'example-mqtt-client', connect:{host:'localhost', port:1883}, topics:'topic1'}) let mqtt = new Client({id:'example-mqtt-client', topics:'lights'})
; ;
(async () => { (async () => {
await mqtt.connect() console.log(await mqtt.connect())
await mqtt.send({cmd:'test', status:'sent to topic1 implicitly'}) await mqtt.send({cmd:'test', status:'sent to topic1 implicitly'})
await mqtt.subscribe('topic2') await mqtt.subscribe('topic2')
await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 explicitly'}) await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 explicitly'})
await mqtt.unsubscribe('topic2') await mqtt.unsubscribe('topic2')
await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 with subscription'}) await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 with subscription'})
// await mqtt.subscribe('lights')
})().catch(err => { })().catch(err => {

View File

@ -1,6 +1,6 @@
{ {
"name": "@uci/mqtt", "name": "@uci/mqtt",
"version": "0.0.5", "version": "0.1.0",
"description": "mqtt client with json payloads and mqtt custom broker", "description": "mqtt client with json payloads and mqtt custom broker",
"main": "src", "main": "src",
"scripts": { "scripts": {
@ -29,7 +29,7 @@
"@uci/logger": "^0.0.3", "@uci/logger": "^0.0.3",
"async-mqtt": "^1.0.1", "async-mqtt": "^1.0.1",
"better-try-catch": "^0.6.2", "better-try-catch": "^0.6.2",
"lodash.isarray": "^4.0.0", "is-plain-object": "^2.0.4",
"lodash.merge": "^4.6.1", "lodash.merge": "^4.6.1",
"lodash.union": "^4.6.0", "lodash.union": "^4.6.0",
"lodash.xor": "^4.5.0" "lodash.xor": "^4.5.0"

View File

@ -4,50 +4,53 @@ import btc from 'better-try-catch'
import union from 'lodash.union' import union from 'lodash.union'
import xor from 'lodash.xor' import xor from 'lodash.xor'
import logger from '@uci/logger' import logger from '@uci/logger'
import isPlainObject from 'is-plain-object'
let log = {} let log = {}
export default class Client { export default class Client {
constructor (opts={}) { constructor (opts={}) {
this.id = opts.id || opts.name || 'mqtt:'+ new Date().getTime() this.id = opts.id || opts.name || 'mqtt:'+ new Date().getTime()
this.url = opts.url this.url = opts.url || ''
// subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs // subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs
this.topics = Array.isArray(opts.topics) ? opts.topics : (opts.topics ? opts.topics.split(',') : [this.id]) this.topics = Array.isArray(opts.topics) ? opts.topics : (opts.topics ? opts.topics.split(',') : [this.id])
this.opts = opts.connect || {} // see options for new mqtt.Client this.opts = opts.connect || {} // see options for new mqtt.Client
// self bindings // self bindings
this.connect = this.connect.bind(this) this.connect = this.connect.bind(this)
this.push = this.send
} }
async connect () { async connect () {
return new Promise( (resolve,reject) => { return new Promise( (resolve,reject) => {
// connect returns connected mqtt client instance so merge it to class
// console.log(this)
let mqtt = connect(this.url,this.opts) let mqtt = connect(this.url,this.opts)
this._extend(mqtt,'subscribe,unsubscribe') this._extend(mqtt,'subscribe,unsubscribe') // merge mqtt client into class extend with given functions
this._log() let timeout = this.opts.connectTimeout || 5000
this._listen() setTimeout(()=>{
reject(`ending mqtt connection attempt, no broker at ${this._client.options.hostname}:${this._client.options.port}`)
this.on('reconnect', () => { },timeout)
log.info('mqtt client reconnected to broker' )
this.subscribe(this.topics)
})
this.once('connect', () => { this.once('connect', () => {
this.subscribe(this.topics) this._log()
log.info('mqtt client connected to broker' ) this._listen()
resolve(`mqtt client connected to broker at ${this._client.options.host}`)
})
this.on('error', err => { this.on('reconnect', () => {
log.fatal({err:err},'connection error to broker' ) log.info('mqtt client reconnected to broker' )
console.log('connection error',err.code) this.subscribe(this.topics)
reject(err) })
this.on('error', err => {
log.fatal({err:err},'connection error to broker' )
console.log('connection error',err.code)
})
this.subscribe(this.topics)
log.info({options:this._client.options},`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`)
resolve(`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`)
}) })
}) //end promise }) //end promise
} }
async subscribe(topic,options={}) { async subscribe(topic,options={}) {
@ -55,6 +58,7 @@ export default class Client {
if (typeof topic==='string') topic = topic.split(',') if (typeof topic==='string') topic = topic.split(',')
this.topics=union(this.topics,topic) this.topics=union(this.topics,topic)
} }
log.info(`subscription for topic ${topic} added`)
this._subscribe(topic,options) this._subscribe(topic,options)
} }
@ -66,23 +70,24 @@ export default class Client {
this._unsubscribe(topic) this._unsubscribe(topic)
} }
async send(packet,topics,options) {
async send(topics,payload,options) {
if (typeof topics !=='string' && !Array.isArray(topics)) { if (typeof topics !=='string' && !Array.isArray(topics)) {
payload = topics options = topics
topics = this.topics topics = this.topics
} else { if (typeof topics ==='string') topics=topics.split(',') }
let payload = {}
if (typeof packet !== 'string') {
payload = this._serialize(packet)
if (!payload) payload = this._serialize({ err: 'could not serialze packet'})
topics = [packet.cmd] || topics // if payload has a cmd use that as topic
} }
if (typeof topics ==='string') topics = topics.split(',') else payload = this._serialize({ data:packet })
// console.log('topics using', topics) let pubs = []
let serial = this._serialize(payload) topics.forEach( async topic => {
if (serial) { log.info(`sending ${payload} to topic ${topic} with options ${options}`)
let pubs = [] pubs.push(this.publish(topic,payload,options))
topics.forEach( async topic => { })
pubs.push(this.publish(topic,serial,options)) return await Promise.all(pubs)
})
return await Promise.all(pubs)
}
} }
registerPacketProcessor (func) { registerPacketProcessor (func) {
@ -91,34 +96,33 @@ export default class Client {
_serialize(json) { _serialize(json) {
let [err,payload] = btc(JSON.stringify)(json) let [err,payload] = btc(JSON.stringify)(json)
if (err) { // await mqtt.unsubscribe('test2') if (err) {
// await mqtt.send({cmd:'test', status:'I\'m good'}) log.warn(`error unable to stringify json:${json}`)
// console.log('+++++++++++++++')
log.warn(`error unable to stringify json:${json} - send aborted`)
return null return null
} }
return payload return payload
} }
_listen () { _listen () {
log.info('listening for incoming packets from broker') log.info(`listening for incoming packets from broker on topics ${this.topics}`)
this.on('message',messageProcess.bind(this)) this.on('message',messageProcess.bind(this))
async function messageProcess (topic,payload) { async function messageProcess (topic,payload) {
log.info('incoming messeage on topic', topic) log.info('incoming message on topic', topic)
let packet = this._handlePayload(payload) let packet = this._handlePayload(payload)
if (packet) await this._packetProcess (packet,topic) if(packet) packet.cmd = packet.cmd || topic // payload had no command that use topic as cmd
else packet = {cmd:topic}
let res = await this._packetProcess(packet,topic) || {}
this.send(res)
} }
} }
_handlePayload (payload) { _handlePayload (payload) {
let [err,packet] = btc(JSON.parse)(payload.toString()) let [err,packet] = btc(JSON.parse)(payload.toString())
if (err) { if (err) {
log.fatal({payload:payload},'Could not parse JSON of payload') log.info('payload not json returning as prop data:<payload>')
// console.log('Could not parse JSON of payload', payload.toString())
return null
} }
if (!isPlainObject(packet)) packet = { data:payload.toString() }
return packet return packet
} }
@ -129,8 +133,6 @@ export default class Client {
console.dir(packet) console.dir(packet)
console.log(packet.status) console.log(packet.status)
console.log('========================') console.log('========================')
// await mqtt.send({cmd:'test', status:'I\'m good'})
// console.log('+++++++++++++++')=')
} }
async _log() { async _log() {