parent
ddbc8e7aac
commit
c8071ed66e
|
@ -0,0 +1,33 @@
|
||||||
|
module.exports = {
|
||||||
|
"ecmaFeatures": {
|
||||||
|
"modules": true,
|
||||||
|
"spread" : true,
|
||||||
|
"restParams" : true
|
||||||
|
},
|
||||||
|
"env": {
|
||||||
|
"es6": true,
|
||||||
|
"node": true,
|
||||||
|
"mocha": true
|
||||||
|
},
|
||||||
|
"parserOptions": {
|
||||||
|
"ecmaVersion": 2017,
|
||||||
|
"sourceType": "module"
|
||||||
|
},
|
||||||
|
"extends": "eslint:recommended",
|
||||||
|
"rules": {
|
||||||
|
"indent": [
|
||||||
|
"error",
|
||||||
|
2
|
||||||
|
],
|
||||||
|
"no-console": 0,
|
||||||
|
"semi": ["error", "never"],
|
||||||
|
"linebreak-style": [
|
||||||
|
"error",
|
||||||
|
"unix"
|
||||||
|
],
|
||||||
|
"quotes": [
|
||||||
|
"error",
|
||||||
|
"single"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
import Client from '../src/client'
|
||||||
|
|
||||||
|
// let mqtt = new Client({id:'example-mqtt-client', url:'tcp://trantor:1883', topics:'test1'})
|
||||||
|
let mqtt = new Client({id:'example-mqtt-client', connect:{host:'localhost', port:1883}, topics:'test1'})
|
||||||
|
|
||||||
|
;
|
||||||
|
(async () => {
|
||||||
|
|
||||||
|
await mqtt.connect()
|
||||||
|
await mqtt.send({cmd:'test', status:'I\'m good'})
|
||||||
|
await mqtt.subscribe('test2',{pub:true})
|
||||||
|
await mqtt.send({cmd:'test', status:'I\'m good'})
|
||||||
|
await mqtt.unsubscribe('test2')
|
||||||
|
await mqtt.send({cmd:'test', status:'I\'m good'})
|
||||||
|
|
||||||
|
|
||||||
|
})().catch(err => {
|
||||||
|
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
|
||||||
|
})
|
8
index.js
8
index.js
|
@ -1,8 +0,0 @@
|
||||||
let opts = {
|
|
||||||
dirname: __dirname + '/lib',
|
|
||||||
// http://stackoverflow.com/questions/2078915/a-regular-expression-to-exclude-a-word-string
|
|
||||||
filter: /^(?!index)([^\.].*)\.js?$/,
|
|
||||||
recursive: false,
|
|
||||||
merge: true // remove or comment to have each file in /lib be a prop/key in library...see node-require-all
|
|
||||||
}
|
|
||||||
module.exports = require('require-all')(opts);
|
|
34
package.json
34
package.json
|
@ -1,14 +1,14 @@
|
||||||
{
|
{
|
||||||
"name": "@uci/changeme",
|
"name": "@uci/mqtt",
|
||||||
"version": "0.0.1",
|
"version": "0.0.1",
|
||||||
"description": "A template for a starting a uci package",
|
"description": "mqtt client with json payloads and mqtt custom broker",
|
||||||
"main": "index.js",
|
"main": "src/index.js",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"test": "./node_modules/.bin/mocha --reporter list --watch --timeout 30000",
|
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
|
||||||
"testd": "DEBUG='1:*' ./node_modules/.bin/mocha --reporter list --watch --timeout 30000",
|
"test": "mocha -r @std/esm test/*.test.mjs",
|
||||||
"testd2": "DEBUG='1:*,2:*' ./node_modules/.bin/mocha --reporter list --watch --timeout 30000",
|
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
|
||||||
"testd3": "DEBUG='1:*,2:*,3:*' ./node_modules/.bin/mocha --reporter list --watch --timeout 30000",
|
"example": "node -r @std/esm example/example",
|
||||||
"testibc": "istanbul cover ./node_modules/.bin/_mocha test/ --report lcovonly -- -R spec --recursive && codecov || true"
|
"dev": "./node_modules/.bin/nodemon -r @std/esm -e mjs example/example"
|
||||||
},
|
},
|
||||||
"author": "David Kebler",
|
"author": "David Kebler",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
|
@ -26,12 +26,22 @@
|
||||||
},
|
},
|
||||||
"homepage": "https://github.com/uCOMmandIt/uci-changeme#readme",
|
"homepage": "https://github.com/uCOMmandIt/uci-changeme#readme",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"@uci/logger": "0.0.1",
|
||||||
|
"async-mqtt": "^1.0.1",
|
||||||
|
"better-try-catch": "^0.6.2",
|
||||||
|
"lodash.isarray": "^4.0.0",
|
||||||
|
"lodash.merge": "^4.6.1",
|
||||||
|
"lodash.union": "^4.6.0",
|
||||||
|
"lodash.xor": "^4.5.0"
|
||||||
},
|
},
|
||||||
|
"@std/esm": "cjs",
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"chai": "^3.5.0",
|
"@std/esm": "^0.18.0",
|
||||||
"codecov": "^1.0.1",
|
"nodemon": "^1.14.3",
|
||||||
"debug": "^2.6.8",
|
"chai": "^4.1.2",
|
||||||
|
"chai-as-promised": "^7.1.1",
|
||||||
|
"codecov": "^3.0.0",
|
||||||
"istanbul": "^0.4.5",
|
"istanbul": "^0.4.5",
|
||||||
"mocha": "^3.x"
|
"mocha": "^4.0.1"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,22 +0,0 @@
|
||||||
'use strict'
|
|
||||||
|
|
||||||
// **********************************
|
|
||||||
|
|
||||||
const
|
|
||||||
Something = require('apackagemodule')
|
|
||||||
|
|
||||||
class aClass extends Something {
|
|
||||||
constructor(opts = {}) {
|
|
||||||
super()
|
|
||||||
this.id = opts.id // must be unique within a bus
|
|
||||||
this.desc = opts.desc
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
hello(what) { return 'Hello ' + what }
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = {
|
|
||||||
aClass
|
|
||||||
}
|
|
|
@ -0,0 +1,184 @@
|
||||||
|
import { connect } from 'async-mqtt'
|
||||||
|
import merge from 'lodash.merge'
|
||||||
|
import btc from 'better-try-catch'
|
||||||
|
import isArray from 'lodash.isarray'
|
||||||
|
import union from 'lodash.union'
|
||||||
|
import xor from 'lodash.xor'
|
||||||
|
import logger from '@uci/logger'
|
||||||
|
|
||||||
|
let log = {}
|
||||||
|
|
||||||
|
|
||||||
|
export default class Client {
|
||||||
|
constructor (opts={}) {
|
||||||
|
this.id = opts.id || opts.name || 'mqtt:'+ new Date().getTime()
|
||||||
|
this.url = opts.url
|
||||||
|
// subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs
|
||||||
|
this.topics = isArray(opts.topics) ? opts.topic : (opts.topics.split(',') || ['default'])
|
||||||
|
this.opts = opts.connect || {} // see options for new mqtt.Client
|
||||||
|
// self bindings
|
||||||
|
this.connect = this.connect.bind(this)
|
||||||
|
}
|
||||||
|
|
||||||
|
async connect () {
|
||||||
|
|
||||||
|
return new Promise( (resolve,reject) => {
|
||||||
|
|
||||||
|
// connect returns connected mqtt client instance so merge it to class
|
||||||
|
// console.log(this)
|
||||||
|
let mqtt = connect(this.url,this.opts)
|
||||||
|
this._extend(mqtt,'subscribe,unsubscribe')
|
||||||
|
this._log()
|
||||||
|
this._listen()
|
||||||
|
|
||||||
|
this.on('reconnect', () => {
|
||||||
|
log.info('mqtt client reconnected to broker' )
|
||||||
|
this.subscribe(this.topics)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.once('connect', () => {
|
||||||
|
this.subscribe(this.topics)
|
||||||
|
log.info('mqtt client connected to broker' )
|
||||||
|
resolve(`mqtt client connected to broker at ${this._client.options.host}`)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.on('error', err => {
|
||||||
|
log.fatal({err:err},'connection error to broker' )
|
||||||
|
console.log('connection error',err.code)
|
||||||
|
reject(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
}) //end promise
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
async subscribe(topic,options={}) {
|
||||||
|
if (options.pub) {
|
||||||
|
if (typeof topic==='string') topic = topic.split(',')
|
||||||
|
this.topics=union(this.topics,topic)
|
||||||
|
}
|
||||||
|
this._subscribe(topic,options)
|
||||||
|
}
|
||||||
|
|
||||||
|
async unsubscribe(topic,options={}) {
|
||||||
|
if (!options.pub) {
|
||||||
|
if (typeof topic==='string') topic = topic.split(',')
|
||||||
|
this.topics=xor(this.topics,topic)
|
||||||
|
}
|
||||||
|
this._unsubscribe(topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async send(topics,payload,options) {
|
||||||
|
if (typeof topics !=='string'|| !isArray(topics)) {
|
||||||
|
payload = topics
|
||||||
|
topics = this.topics
|
||||||
|
}
|
||||||
|
if (typeof topics ==='string') topics = topics.split(',')
|
||||||
|
let serial = this._serialize(payload)
|
||||||
|
if (serial) {
|
||||||
|
let pubs = []
|
||||||
|
topics.forEach( async topic => {
|
||||||
|
pubs.push(this.publish(topic,serial,options))
|
||||||
|
})
|
||||||
|
return await Promise.all(pubs)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
registerPacketProcessor (func) {
|
||||||
|
this._packetProcess = func
|
||||||
|
}
|
||||||
|
|
||||||
|
_serialize(json) {
|
||||||
|
let [err,payload] = btc(JSON.stringify)(json)
|
||||||
|
if (err) { // await mqtt.unsubscribe('test2')
|
||||||
|
// await mqtt.send({cmd:'test', status:'I\'m good'})
|
||||||
|
// console.log('+++++++++++++++')
|
||||||
|
log.warn(`error unable to stringify json:${json} - send aborted`)
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
return payload
|
||||||
|
}
|
||||||
|
|
||||||
|
_listen () {
|
||||||
|
log.info('listening for incoming packets from broker')
|
||||||
|
this.on('message',messageProcess.bind(this))
|
||||||
|
|
||||||
|
async function messageProcess (topic,payload) {
|
||||||
|
// console.log('incoming messeage on topic', topic)
|
||||||
|
let packet = this._handlePayload(payload)
|
||||||
|
if (packet) await this._packetProcess (packet,topic)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_handlePayload (payload) {
|
||||||
|
let [err,packet] = btc(JSON.parse)(payload.toString())
|
||||||
|
if (err) {
|
||||||
|
log.fatal({payload:payload},'Could not parse JSON of payload')
|
||||||
|
console.log('Could not parse JSON of payload', payload.toString())
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
return packet
|
||||||
|
}
|
||||||
|
|
||||||
|
// default packet process just a simple console logger. ignores any cmd: prop
|
||||||
|
_packetProcess (packet,topic) {
|
||||||
|
console.log('=========================')
|
||||||
|
console.log('default consumer processor\npacket from broker on topic:',topic)
|
||||||
|
console.dir(packet)
|
||||||
|
console.log(packet.status)
|
||||||
|
console.log('========================')
|
||||||
|
// await mqtt.send({cmd:'test', status:'I\'m good'})
|
||||||
|
// console.log('+++++++++++++++')=')
|
||||||
|
}
|
||||||
|
|
||||||
|
async _log() {
|
||||||
|
|
||||||
|
const LOG_OPTS = {
|
||||||
|
repo:'uci-mqtt',
|
||||||
|
npm:'@uci/mqtt',
|
||||||
|
file:'src/client.mjs',
|
||||||
|
class:'Client',
|
||||||
|
id:this.id,
|
||||||
|
instance_created:new Date().getTime(),
|
||||||
|
mqtt: this._client.options // await mqtt.unsubscribe('test2')
|
||||||
|
// await mqtt.send({cmd:'test', status:'I\'m good'})
|
||||||
|
// console.log('+++++++++++++++')
|
||||||
|
}
|
||||||
|
|
||||||
|
log = logger.child(LOG_OPTS)
|
||||||
|
|
||||||
|
this.on('close', () => {
|
||||||
|
log.info('connection to broker was closed')
|
||||||
|
})
|
||||||
|
|
||||||
|
this.on('offline', () => {
|
||||||
|
log.info('this client has gone offline from broker')
|
||||||
|
})
|
||||||
|
|
||||||
|
this.on('packetsend', packet => {
|
||||||
|
log.info({packet:packet},'outgoing packet to mqtt broker')
|
||||||
|
})
|
||||||
|
|
||||||
|
this.on('packetreceive', packet => {
|
||||||
|
log.info({packet:packet},'incoming packet from mqtt broker')
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
_extend (obj, funcs) {
|
||||||
|
let temp = {}
|
||||||
|
funcs = funcs.split(',')
|
||||||
|
funcs.forEach(func => {
|
||||||
|
temp[func] = this[func]
|
||||||
|
})
|
||||||
|
merge(this,obj)
|
||||||
|
funcs.forEach(func => {
|
||||||
|
this['_'+func]=this[func]
|
||||||
|
this[func] = temp[func]
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
} // end mqtt client class
|
|
@ -0,0 +1,6 @@
|
||||||
|
import Client from './client'
|
||||||
|
// import Broker from './broker'
|
||||||
|
|
||||||
|
export { Client as Client }
|
||||||
|
// export { Broker as Broker }
|
||||||
|
export default Client
|
Loading…
Reference in New Issue