added in mqtt as socket type

refactored with addSocket and initSocket for individual sockets
now can do runtime socket addition after init
This commit is contained in:
David Kebler 2018-05-20 15:44:31 -07:00
parent 04268d2b9c
commit ea8253f4b0
7 changed files with 247 additions and 113 deletions

43
examples/dynamic.js Normal file
View file

@ -0,0 +1,43 @@
import Base from '../src/base'
// const USOCKET = __dirname + '/sample.sock'
let dy = new Base({id:'dynamic'})
;
(async () => {
await dy.init()
console.log('started', dy.started)
await Promise.all([dy.addSocket('con','c','t'),dy.addSocket('ser','s','t')])
dy.good = {
bad: function(packet){
return new Promise( async (resolve) => {
let res = {}
res.req = packet
res.cmd='good/ugly'
res.response='The Good The Bad and The Ugly'
return resolve(res)
})
},
ugly: function (packet) {
console.log('==============reply from Good Bad command =========')
console.log(packet.response)
console.log('===========================')
}
}
let packet = {}
// console.log('=============sending============')
packet = {cmd:'good:bad'}
console.log(packet)
await dy.send(packet)
process.kill(process.pid, 'SIGTERM')
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
process.kill(process.pid, 'SIGTERM')
})

View file

@ -50,9 +50,26 @@ const tcpfuncs = {
;
(async () => {
let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,mqttp#c>m,mqtts#s>m,webs#s>w', mqtts:{broker:{host:'test', port: 5555},topic:'blah'}, id:'four-in-one'})
let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,mqtts#s>m,webs#s>w', webs:{ port:8090 }, mqtts:{ topics:'write'}, id:'four-in-one'})
// let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'})
// await fio.init()
let err = await fio.init()
// console.log(err)
if(!err.find(socket => {
console.log(socket.name)
return socket.name==='mqtts'}))
{
const mqttProcess = async function (packet,topic) {
console.log('==============mqtt incoming packet/topic processor =========')
console.log(packet, topic, fio.s[topic][packet.cmd])
if (packet.cmd) console.log(await fio.s[topic][packet.cmd](packet))
console.log('===========================')
}
fio.socket.mqtts.registerPacketProcessor(mqttProcess)
}
fio.s = socketfuncs
fio.st = tcpfuncs
fio.ct = {reply: packet =>{
@ -85,26 +102,26 @@ const tcpfuncs = {
}
}
// let packet = {}
// console.log('=============sending============')
// // packet = {cmd:'echo', data:'some data to echo'}
// // console.log(packet)
// // await fio.send(packet,'uc')
// packet = {cmd:'write:happy', data:'My name is Zoe'}
// console.log(packet)
// //console.log(await fio.send(packet))
// console.log(fio.getPacketByName('uc',await fio.send(packet)).response)
// packet = {cmd:'write:sad', data:'data to write'}
// console.log(packet)
// await fio.send(packet)
// packet = {cmd:'write:sad', data:'sent only via tcp'}
// console.log(packet)
// console.log(fio.getPacketByName('tc2',await fio.sendTCP(packet)))
// packet = {cmd:'good:bad'}
// console.log(packet)
// await fio.send(packet)
let packet = {}
console.log('=============sending============')
packet = {cmd:'echo', data:'some data to echo'}
console.log(packet)
await fio.send(packet,'uc')
packet = {cmd:'write:happy', data:'My name is Zoe'}
console.log(packet)
console.log(await fio.send(packet))
console.log(fio.getPacketByName('uc',await fio.send(packet)).response)
packet = {cmd:'write:sad', data:'data to write'}
console.log(packet)
await fio.send(packet)
packet = {cmd:'write:sad', data:'sent only via tcp'}
console.log(packet)
console.log(fio.getPacketByName('tc',await fio.sendTCP(packet)))
packet = {cmd:'good:bad'}
console.log(packet)
await fio.send(packet)
process.kill(process.pid, 'SIGTERM')
// process.kill(process.pid, 'SIGTERM')
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View file

@ -2,42 +2,43 @@ import Base from '../src/base'
// const USOCKET = __dirname + '/sample.sock'
const mqttfuncs = {
relays: {
on: async function(packet){
return new Promise( async (resolve) => {
console.log('ON')
console.dir(packet)
return resolve()
})
},
off: async function(packet){
return new Promise( async (resolve) => {
console.log('OFF')
console.dir(packet)
return resolve()
})
}
let dy = new Base({id:'dynamic'})
dy.switch = {
on: function(packet){
return new Promise( async (resolve) => {
console.log(`turning switch on for id ${packet.id||packet.data}`)
// call switch on here
let res = {}
res.cmd='switch/status'
res.status='on'
res.id = packet.id
return resolve(res)
})
},
off: function(packet){
return new Promise( async (resolve) => {
console.log(packet)
console.log(`turning switch off for id ${packet.id||packet.data}`)
// call switch on here
let res = {}
res.cmd='switch/status'
res.status='off'
res.id = packet.id
return resolve(res)
})
}
}
const process = async function (packet,topic) {
console.log('==============mqtt incoming packet/topic processor =========')
// console.log(packet, topic, this.topics[topic])
if (packet.cmd) await this.topics[topic][packet.cmd](packet)
else await this.topics[topic](packet)
console.log('===========================')
}
let mqtt = new Base({ mqtt:{ connect:{host:'localhost', port:1883}, id:'relays'} })
mqtt.topics = mqttfuncs
mqtt.socket.mqtt.registerPacketProcessor(process.bind(mqtt))
;
(async () => {
console.log(await mqtt.init())
await dy.init()
console.log('started', dy.started)
// await Promise.all([dy.addSocket('mqc','c','m'),dy.addSocket('mqs','s','m')])
await dy.addSocket('mqs','s','m')
dy.socket.mqs.subscribe(['switch/on','switch/off','switch/toggle'])
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

46
examples/web.js Normal file
View file

@ -0,0 +1,46 @@
import Base from '../src/base'
// const USOCKET = __dirname + '/sample.sock'
let dy = new Base({id:'dynamic'})
dy.switch = {
on: function(packet){
return new Promise( async (resolve) => {
console.log(`turning switch on for id ${packet.id||packet.data}`)
// call switch on here
let res = {}
res.cmd='switch/status'
res.status='on'
res.id = packet.id
return resolve(res)
})
},
off: function(packet){
return new Promise( async (resolve) => {
console.log(packet)
console.log(`turning switch off for id ${packet.id||packet.data}`)
// call switch on here
let res = {}
res.cmd='switch/status'
res.status='off'
res.id = packet.id
return resolve(res)
})
}
}
;
(async () => {
await dy.init()
console.log('started', dy.started)
// await Promise.all([dy.addSocket('mqc','c','m'),dy.addSocket('mqs','s','m')])
await dy.addSocket('mqs','s','m')
dy.socket.mqs.subscribe(['switch/on','switch/off','switch/toggle'])
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
process.kill(process.pid, 'SIGTERM')
})

View file

@ -6,6 +6,8 @@
"scripts": {
"deve": "./node_modules/.bin/nodemon -r esm examples/four-in-one",
"fio": "node -r esm examples/four-in-one",
"dy": "node -r esm examples/dynamic",
"web": "node -r esm examples/web",
"mqtt": "node -r esm examples/mqtt",
"testw": "mocha -r esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
"test": "mocha -r esm test/*.test.mjs",
@ -31,15 +33,17 @@
"chai": "^4.1.2",
"chai-as-promised": "^7.1.1",
"codecov": "^3.0.0",
"esm": "^3.0.22",
"esm": "^3.0.36",
"istanbul": "^0.4.5",
"mocha": "^5.0.1",
"mocha": "^5.2.0",
"nodemon": "^1.14.12"
},
"dependencies": {
"@uci/logger": "^0.0.3",
"@uci/mqtt": "^0.0.5",
"@uci/mqtt": "0.0.9",
"@uci/socket": "^0.1.1",
"@uci/utils": "^0.1.1"
"@uci/utils": "^0.1.1",
"@uci/websocket": "^0.1.8",
"p-settle": "^2.1.0"
}
}

View file

@ -1,8 +1,12 @@
import UCISocket from '@uci/socket'
import MQTT from '@uci/mqtt'
import WebSocket from '@uci/socket'
// communication modules
import Socket from '@uci/socket'
import MQTT from '@uci/mqtt' // requires broker
// import MQTT from '../../uci-mqtt/src/client' // requires broker
import WebSocket from '@uci/websocket' // server only
// TODO
import EventEmitter from 'events'
import pSettle from 'p-settle'
import { bindFuncs } from '@uci/utils/src/function'
import { processor, commands, namespaces } from './processing'
@ -21,35 +25,19 @@ export default class Base extends EventEmitter {
this._processors = { _default: processor }
this._defaultCmds = commands
this._namespaces = namespaces
// sockets: option
// of this form '<name>#<c/s>><n/np/t/tcp/m/mqtt/w/web>'
this.bindFuncs = bindFuncs
// predefined sockets:
// comma delimited list of this form '<name>#<c/p/s>><n=np/t=tcp/m=mqtt/w=web>'
this.socket = {}
if(opts.sockets) {
opts.sockets.split(/[,|\s]+/).forEach( socketStr => {
let socket = {}
socketStr.split(/[>#]+/).map(function(prop,index) {
socket[SOCKET_INFO_KEYS[index]] = prop
})
if (socket.type === 's') this.addSocket(socket.name,socket.transport,opts[socket.name])
if (socket.type === 'c') this.addConsumer(socket.name,socket.transport,opts[socket.name])
// if (!opts[socket.name]) opts[socket.name] = {}
// if (socket.transport ==='n') opts[socket.name].np = true
// opts[socket.name].id = this.id +':'+ socket.name
// // console.log(TRANSLATIONS[socket.type])
// this.socket[socket.name] = new UCISocket[TRANSLATE[socket.type]](opts[socket.name])
// // console.log(socket.name, this.socket[socket.name].send)
// Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket
// this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name)
this.addSocket(socket.name, socket.type, socket.transport,opts[socket.name])
})
} else log.warn({opts:opts},'no sockets requested for creation -- using only standard emitter')
if (opts.mqtt) {
opts.mqtt.topcis = (opts.mqtt.topcis ? opts.mqtt.topics + ',' : '') + opts.id
this.socket.mqtt = new MQTT(opts.mqtt)
Object.assign(this.socket.mqtt, {name:'mqtt', type:'c', transport:'t'})
}
this.bindFuncs = bindFuncs
// console.log('base opts', opts)
} // end constructor
@ -57,38 +45,74 @@ export default class Base extends EventEmitter {
async init () {
let sockets = []
let initSockets = []
for(let name of Object.keys(this.socket)){
if (this.socket[name].type ==='s') {
sockets.push(this.socket[name].create())
// setTimeout(this.socket[type].create,200) // test for consumer retry for delayed socket creation
}
else {
sockets.push(this.socket[name].connect())
}
initSockets.push(this.initSocket(name))
sockets.push(name)
}
// TODO maybe throw if one fails
this.started = true
return Promise.all(sockets).then(() => {return 'ready'}).catch((err) => {return err})
return pSettle(initSockets).then(res=>{
console.log(res)
let err = []
res.forEach((p,index) => {
if (p.isRejected) {
err.push({name:sockets[index],err:p.reason})
}
})
return err
})
// TODO if a websocket server was working then push status
// TODO if no mqtt broker then attempt to start one
} // init
async addSocket (name, transport, options) {
// type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe, (web) websocket
console.log('Socket=> ', name, TRANSLATE[transport],options)
async addSocket (name, type='c', transport='n', options={}) {
// transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket
log.info({socketName:name, type:type, tranport:transport, options:options},`adding socket ${name}`)
options.id = this.id +':'+ name
switch (transport) {
case 'n':
options.path = options.path || true
// falls through
case 't':
this.socket[name] = new Socket[TRANSLATE[type]](options)
break
case 'm':
if (type === 'p') type ='c'
options.connect = options.connect || {}
options.connect.connectTimeout = options.connect.connectTimeout || 5000
this.socket[name] = new MQTT(options)
break
case 'w':
if (type ==='s') this.socket[name] = new WebSocket(options)
else log.warn({name:name,type:type,transport:transport},'Web socket not created Consumer/Client Web Socket not supported')
}
// console.log(name, '====',this.socket[name])
this.socket[name].name = name
this.socket[name].type = type
this.socket[name].transport = transport
this.socket[name]._packetProcess = this._packetProcess.bind(this,name)
if (this.started) await this.initSocket(name)
else return `socket ${name} added`
// console.log(name, '====',this.socket[name])
}
async initSocket(name) {
let socket = this.socket[name]
let init = {}
if (this.socket[name].type ==='s' && this.socket[name].transport !=='m') {
init = socket.create
} else {
init = socket.connect
}
log.info(`initializing socket ${name}, ${socket.type}, ${socket.transport}`)
if(this.started) return `socket ${name} added and initialzed, ${await init()}`
else return init()
}
async removeSocket (name) {
// type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe, (web) websocket
}
async addConsumer (name, transport, options) {
console.log('Consumer=> ', name,TRANSLATE[transport],options)
// type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe
}
async removeConsumer (name) {
// type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe
//TODO
}
async send (name,packet) {
@ -123,7 +147,7 @@ export default class Base extends EventEmitter {
return Promise.all(sends.map(send => {return send(packet)}))
}
async sendMQTT(topic, packet) {return this.socket.mqtt.send(topic, packet)}
// async sendMQTT(topic, packet) {return this.socket.mqtt.send(topic, packet)}
async sendTCP(packet) {return this.sendTransport(packet,'t')}
async sendIPC(packet) {return this.sendTransport(packet,'n')}
@ -158,6 +182,7 @@ export default class Base extends EventEmitter {
}
Object.assign(this._defaultCmds.c,funcs)
}
// use s: and c: keys TODO need to change this
addNamedProcessing(name,funcs,type) {
if (type){
@ -170,7 +195,7 @@ export default class Base extends EventEmitter {
}
// takes and returns a packet
beforeWriteHook (type,funcs){} // TODO before packet send
beforeSendHook (type,funcs){} // TODO before packet send
afterReceiveHook(type,funcs){} // TODO after receiv
afterProcessHook(type,funcs){} // TODO
@ -191,10 +216,6 @@ export default class Base extends EventEmitter {
}
}
mqttProcessor(func) {
this.mqtt._packetProcess = func
}
socketNameProcessor(func,socket_name) {
socket_name = socket_name || '_default'
this._processors[socket_name]._process = func
@ -205,10 +226,9 @@ export default class Base extends EventEmitter {
else return this._namespaces[type].unshift(space)
}
packetProcessor(func) {
this._packetProcess = func
}
// registerPacketProcessor(name,func) {
// this._packetProcess = func
// }
/*
@ -217,8 +237,8 @@ export default class Base extends EventEmitter {
*
*/
_transport(name) {return this.socket[name].transport}
_type(name) {return this.socket[name].type}
_transport(name) {return this.socket[name].transport} //getter for socket transport
_type(name) {return this.socket[name].type} //getter for socket type
_getTransportNamespaces(socket) {
return this._namespace[this._type(socket)+this._transport(socket)]
@ -234,6 +254,7 @@ export default class Base extends EventEmitter {
return cmd_func
}
// takes command and returns corresponding function in a hash
_getCmdFunc (cmd,obj) {
// console.log('obj',obj)
if (typeof cmd ==='string') {

View file

@ -38,6 +38,8 @@ const namespaces = {
ct: ['ct'],
sn: ['sn'],
st: ['st'],
cm: ['cm'],
sm: ['sm'],
}
/*