add mqtt client as option to uci-base with uci-mqtt

master
David Kebler 2018-04-05 15:35:29 -07:00
parent df9a879fef
commit 5a0af4d75e
4 changed files with 63 additions and 2784 deletions

47
examples/mqtt.mjs Normal file
View File

@ -0,0 +1,47 @@
import Base from '../src/base'
// const USOCKET = __dirname + '/sample.sock'
const mqttfuncs = {
relays: {
on: async function(packet){
return new Promise( async (resolve) => {
console.log('ON')
console.dir(packet)
return resolve()
})
},
off: async function(packet){
return new Promise( async (resolve) => {
console.log('OFF')
console.dir(packet)
return resolve()
})
}
}
}
const process = async function (packet,topic) {
console.log('==============mqtt incoming packet/topic processor =========')
// console.log(packet, topic, this.topics[topic])
if (packet.cmd) await this.topics[topic][packet.cmd](packet)
else await this.topics[topic](packet)
console.log('===========================')
}
let mqtt = new Base({ mqtt:{ connect:{host:'localhost', port:1883}, id:'relays'} })
mqtt.topics = mqttfuncs
mqtt.socket.mqtt.registerPacketProcessor(process.bind(mqtt))
;
(async () => {
console.log(await mqtt.init())
// process.kill(process.pid, 'SIGTERM')
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
process.kill(process.pid, 'SIGTERM')
})

2781
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -4,8 +4,9 @@
"description": "Mutli Level/Transport Message/Event Classes", "description": "Mutli Level/Transport Message/Event Classes",
"main": "src/base", "main": "src/base",
"scripts": { "scripts": {
"deve": "SOCKETS_DIR=/opt/sockets ./node_modules/.bin/nodemon -r @std/esm -e mjs examples/four-in-one", "deve": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/four-in-one",
"fio": "SOCKETS_DIR=/opt/sockets node -r @std/esm examples/four-in-one", "fio": "node -r @std/esm examples/four-in-one",
"mqtt": "node -r @std/esm examples/mqtt",
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
"test": "mocha -r @std/esm test/*.test.mjs", "test": "mocha -r @std/esm test/*.test.mjs",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true" "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true"
@ -37,6 +38,7 @@
"nodemon": "^1.14.12" "nodemon": "^1.14.12"
}, },
"dependencies": { "dependencies": {
"@uci/mqtt": "0.0.1",
"@uci/socket": "^0.1.1", "@uci/socket": "^0.1.1",
"@uci/utils": "^0.1.1" "@uci/utils": "^0.1.1"
} }

View File

@ -1,4 +1,5 @@
import UCISocket from '@uci/socket' import UCISocket from '@uci/socket'
import MQTT from '@uci/mqtt'
import EventEmitter from 'events' import EventEmitter from 'events'
import { bindFuncs } from '@uci/utils/src/function' import { bindFuncs } from '@uci/utils/src/function'
@ -40,7 +41,12 @@ export default class Base extends EventEmitter {
Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket
this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name) this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name)
}) })
} else log.warn({opts:opts},'no sockets requested for creations -- on standard emitter') } else log.warn({opts:opts},'no sockets requested for creations -- using only standard emitter')
if (opts.mqtt) {
opts.mqtt.topcis = (opts.mqtt.topcis ? opts.mqtt.topics + ',' : '') + opts.id
this.socket.mqtt = new MQTT(opts.mqtt)
Object.assign(this.socket.mqtt, {name:'mqtt', type:'c', transport:'t'})
}
this.bindFuncs = bindFuncs this.bindFuncs = bindFuncs
// console.log('base opts', opts) // console.log('base opts', opts)
@ -98,6 +104,7 @@ export default class Base extends EventEmitter {
return Promise.all(sends.map(send => {return send(packet)})) return Promise.all(sends.map(send => {return send(packet)}))
} }
async sendMQTT(topic, packet) {return this.socket.mqtt.send(topic, packet)}
async sendTCP(packet) {return this.sendTransport(packet,'t')} async sendTCP(packet) {return this.sendTransport(packet,'t')}
async sendIPC(packet) {return this.sendTransport(packet,'n')} async sendIPC(packet) {return this.sendTransport(packet,'n')}
@ -165,6 +172,10 @@ export default class Base extends EventEmitter {
} }
} }
mqttProcessor(func) {
this.mqtt._packetProcess = func
}
socketNameProcessor(func,socket_name) { socketNameProcessor(func,socket_name) {
socket_name = socket_name || '_default' socket_name = socket_name || '_default'
this._processors[socket_name]._process = func this._processors[socket_name]._process = func