diff --git a/.eslintrc.js b/.eslintrc.js new file mode 100644 index 0000000..2bed546 --- /dev/null +++ b/.eslintrc.js @@ -0,0 +1,33 @@ +module.exports = { + "ecmaFeatures": { + "modules": true, + "spread" : true, + "restParams" : true + }, + "env": { + "es6": true, + "node": true, + "mocha": true + }, + "parserOptions": { + "ecmaVersion": 2017, + "sourceType": "module" + }, + "extends": "eslint:recommended", + "rules": { + "indent": [ + "error", + 2 + ], + "no-console": 0, + "semi": ["error", "never"], + "linebreak-style": [ + "error", + "unix" + ], + "quotes": [ + "error", + "single" + ] + } +} diff --git a/example/example.mjs b/example/example.mjs new file mode 100644 index 0000000..4791682 --- /dev/null +++ b/example/example.mjs @@ -0,0 +1,19 @@ +import Client from '../src/client' + +// let mqtt = new Client({id:'example-mqtt-client', url:'tcp://trantor:1883', topics:'test1'}) +let mqtt = new Client({id:'example-mqtt-client', connect:{host:'localhost', port:1883}, topics:'test1'}) + +; +(async () => { + + await mqtt.connect() + await mqtt.send({cmd:'test', status:'I\'m good'}) + await mqtt.subscribe('test2',{pub:true}) + await mqtt.send({cmd:'test', status:'I\'m good'}) + await mqtt.unsubscribe('test2') + await mqtt.send({cmd:'test', status:'I\'m good'}) + + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/index.js b/index.js deleted file mode 100644 index 90ae21c..0000000 --- a/index.js +++ /dev/null @@ -1,8 +0,0 @@ -let opts = { - dirname: __dirname + '/lib', - // http://stackoverflow.com/questions/2078915/a-regular-expression-to-exclude-a-word-string - filter: /^(?!index)([^\.].*)\.js?$/, - recursive: false, - merge: true // remove or comment to have each file in /lib be a prop/key in library...see node-require-all -} -module.exports = require('require-all')(opts); diff --git a/package.json b/package.json index 9b8e62a..43492c2 100644 --- a/package.json +++ b/package.json @@ -1,14 +1,14 @@ { - "name": "@uci/changeme", + "name": "@uci/mqtt", "version": "0.0.1", - "description": "A template for a starting a uci package", - "main": "index.js", + "description": "mqtt client with json payloads and mqtt custom broker", + "main": "src/index.js", "scripts": { - "test": "./node_modules/.bin/mocha --reporter list --watch --timeout 30000", - "testd": "DEBUG='1:*' ./node_modules/.bin/mocha --reporter list --watch --timeout 30000", - "testd2": "DEBUG='1:*,2:*' ./node_modules/.bin/mocha --reporter list --watch --timeout 30000", - "testd3": "DEBUG='1:*,2:*,3:*' ./node_modules/.bin/mocha --reporter list --watch --timeout 30000", - "testibc": "istanbul cover ./node_modules/.bin/_mocha test/ --report lcovonly -- -R spec --recursive && codecov || true" + "testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs", + "test": "mocha -r @std/esm test/*.test.mjs", + "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true", + "example": "node -r @std/esm example/example", + "dev": "./node_modules/.bin/nodemon -r @std/esm -e mjs example/example" }, "author": "David Kebler", "license": "MIT", @@ -26,12 +26,22 @@ }, "homepage": "https://github.com/uCOMmandIt/uci-changeme#readme", "dependencies": { + "@uci/logger": "0.0.1", + "async-mqtt": "^1.0.1", + "better-try-catch": "^0.6.2", + "lodash.isarray": "^4.0.0", + "lodash.merge": "^4.6.1", + "lodash.union": "^4.6.0", + "lodash.xor": "^4.5.0" }, + "@std/esm": "cjs", "devDependencies": { - "chai": "^3.5.0", - "codecov": "^1.0.1", - "debug": "^2.6.8", + "@std/esm": "^0.18.0", + "nodemon": "^1.14.3", + "chai": "^4.1.2", + "chai-as-promised": "^7.1.1", + "codecov": "^3.0.0", "istanbul": "^0.4.5", - "mocha": "^3.x" + "mocha": "^4.0.1" } } diff --git a/src/amodule.js b/src/amodule.js deleted file mode 100644 index ab85667..0000000 --- a/src/amodule.js +++ /dev/null @@ -1,22 +0,0 @@ -'use strict' - -// ********************************** - -const - Something = require('apackagemodule') - -class aClass extends Something { - constructor(opts = {}) { - super() - this.id = opts.id // must be unique within a bus - this.desc = opts.desc - - } - - hello(what) { return 'Hello ' + what } - -} - -module.exports = { - aClass -} diff --git a/src/broker.mjs b/src/broker.mjs new file mode 100644 index 0000000..e69de29 diff --git a/src/client.mjs b/src/client.mjs new file mode 100644 index 0000000..213dcb7 --- /dev/null +++ b/src/client.mjs @@ -0,0 +1,184 @@ +import { connect } from 'async-mqtt' +import merge from 'lodash.merge' +import btc from 'better-try-catch' +import isArray from 'lodash.isarray' +import union from 'lodash.union' +import xor from 'lodash.xor' +import logger from '@uci/logger' + +let log = {} + + +export default class Client { + constructor (opts={}) { + this.id = opts.id || opts.name || 'mqtt:'+ new Date().getTime() + this.url = opts.url + // subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs + this.topics = isArray(opts.topics) ? opts.topic : (opts.topics.split(',') || ['default']) + this.opts = opts.connect || {} // see options for new mqtt.Client + // self bindings + this.connect = this.connect.bind(this) + } + + async connect () { + + return new Promise( (resolve,reject) => { + + // connect returns connected mqtt client instance so merge it to class + // console.log(this) + let mqtt = connect(this.url,this.opts) + this._extend(mqtt,'subscribe,unsubscribe') + this._log() + this._listen() + + this.on('reconnect', () => { + log.info('mqtt client reconnected to broker' ) + this.subscribe(this.topics) + }) + + this.once('connect', () => { + this.subscribe(this.topics) + log.info('mqtt client connected to broker' ) + resolve(`mqtt client connected to broker at ${this._client.options.host}`) + }) + + this.on('error', err => { + log.fatal({err:err},'connection error to broker' ) + console.log('connection error',err.code) + reject(err) + }) + + }) //end promise + + } + + async subscribe(topic,options={}) { + if (options.pub) { + if (typeof topic==='string') topic = topic.split(',') + this.topics=union(this.topics,topic) + } + this._subscribe(topic,options) + } + + async unsubscribe(topic,options={}) { + if (!options.pub) { + if (typeof topic==='string') topic = topic.split(',') + this.topics=xor(this.topics,topic) + } + this._unsubscribe(topic) + } + + + async send(topics,payload,options) { + if (typeof topics !=='string'|| !isArray(topics)) { + payload = topics + topics = this.topics + } + if (typeof topics ==='string') topics = topics.split(',') + let serial = this._serialize(payload) + if (serial) { + let pubs = [] + topics.forEach( async topic => { + pubs.push(this.publish(topic,serial,options)) + }) + return await Promise.all(pubs) + + } + } + + registerPacketProcessor (func) { + this._packetProcess = func + } + + _serialize(json) { + let [err,payload] = btc(JSON.stringify)(json) + if (err) { // await mqtt.unsubscribe('test2') + // await mqtt.send({cmd:'test', status:'I\'m good'}) + // console.log('+++++++++++++++') + log.warn(`error unable to stringify json:${json} - send aborted`) + return null + } + return payload + } + + _listen () { + log.info('listening for incoming packets from broker') + this.on('message',messageProcess.bind(this)) + + async function messageProcess (topic,payload) { + // console.log('incoming messeage on topic', topic) + let packet = this._handlePayload(payload) + if (packet) await this._packetProcess (packet,topic) + + } + } + + _handlePayload (payload) { + let [err,packet] = btc(JSON.parse)(payload.toString()) + if (err) { + log.fatal({payload:payload},'Could not parse JSON of payload') + console.log('Could not parse JSON of payload', payload.toString()) + return null + } + return packet + } + + // default packet process just a simple console logger. ignores any cmd: prop + _packetProcess (packet,topic) { + console.log('=========================') + console.log('default consumer processor\npacket from broker on topic:',topic) + console.dir(packet) + console.log(packet.status) + console.log('========================') + // await mqtt.send({cmd:'test', status:'I\'m good'}) + // console.log('+++++++++++++++')=') + } + + async _log() { + + const LOG_OPTS = { + repo:'uci-mqtt', + npm:'@uci/mqtt', + file:'src/client.mjs', + class:'Client', + id:this.id, + instance_created:new Date().getTime(), + mqtt: this._client.options // await mqtt.unsubscribe('test2') + // await mqtt.send({cmd:'test', status:'I\'m good'}) + // console.log('+++++++++++++++') + } + + log = logger.child(LOG_OPTS) + + this.on('close', () => { + log.info('connection to broker was closed') + }) + + this.on('offline', () => { + log.info('this client has gone offline from broker') + }) + + this.on('packetsend', packet => { + log.info({packet:packet},'outgoing packet to mqtt broker') + }) + + this.on('packetreceive', packet => { + log.info({packet:packet},'incoming packet from mqtt broker') + }) + + } + + _extend (obj, funcs) { + let temp = {} + funcs = funcs.split(',') + funcs.forEach(func => { + temp[func] = this[func] + }) + merge(this,obj) + funcs.forEach(func => { + this['_'+func]=this[func] + this[func] = temp[func] + }) + } + +} // end mqtt client class diff --git a/src/index.mjs b/src/index.mjs new file mode 100644 index 0000000..8ef5b60 --- /dev/null +++ b/src/index.mjs @@ -0,0 +1,6 @@ +import Client from './client' +// import Broker from './broker' + +export { Client as Client } +// export { Broker as Broker } +export default Client