diff --git a/example/example.js b/example/example.js index 66bb1cc..cb891ff 100644 --- a/example/example.js +++ b/example/example.js @@ -1,17 +1,19 @@ import Client from '../src/client' +import btc from 'better-try-catch' // let mqtt = new Client({id:'example-mqtt-client', url:'tcp://trantor:1883', topics:'test1'}) -let mqtt = new Client({id:'example-mqtt-client', connect:{host:'localhost', port:1883}, topics:'topic1'}) +let mqtt = new Client({id:'example-mqtt-client', topics:'lights'}) ; (async () => { - await mqtt.connect() + console.log(await mqtt.connect()) await mqtt.send({cmd:'test', status:'sent to topic1 implicitly'}) await mqtt.subscribe('topic2') await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 explicitly'}) await mqtt.unsubscribe('topic2') await mqtt.send('topic2', {cmd:'test', status:'sent to topic2 with subscription'}) + // await mqtt.subscribe('lights') })().catch(err => { diff --git a/package.json b/package.json index c91314b..785193c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/mqtt", - "version": "0.0.5", + "version": "0.1.0", "description": "mqtt client with json payloads and mqtt custom broker", "main": "src", "scripts": { @@ -29,7 +29,7 @@ "@uci/logger": "^0.0.3", "async-mqtt": "^1.0.1", "better-try-catch": "^0.6.2", - "lodash.isarray": "^4.0.0", + "is-plain-object": "^2.0.4", "lodash.merge": "^4.6.1", "lodash.union": "^4.6.0", "lodash.xor": "^4.5.0" diff --git a/src/client.js b/src/client.js index ac3b9af..adc8419 100644 --- a/src/client.js +++ b/src/client.js @@ -4,50 +4,53 @@ import btc from 'better-try-catch' import union from 'lodash.union' import xor from 'lodash.xor' import logger from '@uci/logger' +import isPlainObject from 'is-plain-object' let log = {} export default class Client { constructor (opts={}) { this.id = opts.id || opts.name || 'mqtt:'+ new Date().getTime() - this.url = opts.url + this.url = opts.url || '' // subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs this.topics = Array.isArray(opts.topics) ? opts.topics : (opts.topics ? opts.topics.split(',') : [this.id]) this.opts = opts.connect || {} // see options for new mqtt.Client // self bindings this.connect = this.connect.bind(this) + this.push = this.send } async connect () { return new Promise( (resolve,reject) => { - // connect returns connected mqtt client instance so merge it to class - // console.log(this) let mqtt = connect(this.url,this.opts) - this._extend(mqtt,'subscribe,unsubscribe') - this._log() - this._listen() - - this.on('reconnect', () => { - log.info('mqtt client reconnected to broker' ) - this.subscribe(this.topics) - }) + this._extend(mqtt,'subscribe,unsubscribe') // merge mqtt client into class extend with given functions + let timeout = this.opts.connectTimeout || 5000 + setTimeout(()=>{ + reject(`ending mqtt connection attempt, no broker at ${this._client.options.hostname}:${this._client.options.port}`) + },timeout) this.once('connect', () => { - this.subscribe(this.topics) - log.info('mqtt client connected to broker' ) - resolve(`mqtt client connected to broker at ${this._client.options.host}`) - }) + this._log() + this._listen() - this.on('error', err => { - log.fatal({err:err},'connection error to broker' ) - console.log('connection error',err.code) - reject(err) + this.on('reconnect', () => { + log.info('mqtt client reconnected to broker' ) + this.subscribe(this.topics) + }) + + this.on('error', err => { + log.fatal({err:err},'connection error to broker' ) + console.log('connection error',err.code) + }) + + this.subscribe(this.topics) + log.info({options:this._client.options},`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`) + resolve(`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`) }) }) //end promise - } async subscribe(topic,options={}) { @@ -55,6 +58,7 @@ export default class Client { if (typeof topic==='string') topic = topic.split(',') this.topics=union(this.topics,topic) } + log.info(`subscription for topic ${topic} added`) this._subscribe(topic,options) } @@ -66,23 +70,24 @@ export default class Client { this._unsubscribe(topic) } - - async send(topics,payload,options) { + async send(packet,topics,options) { if (typeof topics !=='string' && !Array.isArray(topics)) { - payload = topics + options = topics topics = this.topics + } else { if (typeof topics ==='string') topics=topics.split(',') } + let payload = {} + if (typeof packet !== 'string') { + payload = this._serialize(packet) + if (!payload) payload = this._serialize({ err: 'could not serialze packet'}) + topics = [packet.cmd] || topics // if payload has a cmd use that as topic } - if (typeof topics ==='string') topics = topics.split(',') - // console.log('topics using', topics) - let serial = this._serialize(payload) - if (serial) { - let pubs = [] - topics.forEach( async topic => { - pubs.push(this.publish(topic,serial,options)) - }) - return await Promise.all(pubs) - - } + else payload = this._serialize({ data:packet }) + let pubs = [] + topics.forEach( async topic => { + log.info(`sending ${payload} to topic ${topic} with options ${options}`) + pubs.push(this.publish(topic,payload,options)) + }) + return await Promise.all(pubs) } registerPacketProcessor (func) { @@ -91,34 +96,33 @@ export default class Client { _serialize(json) { let [err,payload] = btc(JSON.stringify)(json) - if (err) { // await mqtt.unsubscribe('test2') - // await mqtt.send({cmd:'test', status:'I\'m good'}) - // console.log('+++++++++++++++') - log.warn(`error unable to stringify json:${json} - send aborted`) + if (err) { + log.warn(`error unable to stringify json:${json}`) return null } return payload } _listen () { - log.info('listening for incoming packets from broker') + log.info(`listening for incoming packets from broker on topics ${this.topics}`) this.on('message',messageProcess.bind(this)) async function messageProcess (topic,payload) { - log.info('incoming messeage on topic', topic) + log.info('incoming message on topic', topic) let packet = this._handlePayload(payload) - if (packet) await this._packetProcess (packet,topic) - + if(packet) packet.cmd = packet.cmd || topic // payload had no command that use topic as cmd + else packet = {cmd:topic} + let res = await this._packetProcess(packet,topic) || {} + this.send(res) } } _handlePayload (payload) { let [err,packet] = btc(JSON.parse)(payload.toString()) if (err) { - log.fatal({payload:payload},'Could not parse JSON of payload') - // console.log('Could not parse JSON of payload', payload.toString()) - return null + log.info('payload not json returning as prop data:') } + if (!isPlainObject(packet)) packet = { data:payload.toString() } return packet } @@ -129,8 +133,6 @@ export default class Client { console.dir(packet) console.log(packet.status) console.log('========================') - // await mqtt.send({cmd:'test', status:'I\'m good'}) - // console.log('+++++++++++++++')=') } async _log() {