From ea8253f4b0b1676589f2e947f407b7ec8aa39ff2 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sun, 20 May 2018 15:44:31 -0700 Subject: [PATCH] added in mqtt as socket type refactored with addSocket and initSocket for individual sockets now can do runtime socket addition after init --- examples/dynamic.js | 43 +++++++++++++ examples/four-in-one.js | 59 +++++++++++------ examples/mqtt.js | 59 ++++++++--------- examples/web.js | 46 +++++++++++++ package.json | 12 ++-- src/base.js | 139 +++++++++++++++++++++++----------------- src/processing.js | 2 + 7 files changed, 247 insertions(+), 113 deletions(-) create mode 100644 examples/dynamic.js create mode 100644 examples/web.js diff --git a/examples/dynamic.js b/examples/dynamic.js new file mode 100644 index 0000000..02a61da --- /dev/null +++ b/examples/dynamic.js @@ -0,0 +1,43 @@ +import Base from '../src/base' + +// const USOCKET = __dirname + '/sample.sock' + +let dy = new Base({id:'dynamic'}) + +; +(async () => { + + + await dy.init() + console.log('started', dy.started) + await Promise.all([dy.addSocket('con','c','t'),dy.addSocket('ser','s','t')]) + + dy.good = { + bad: function(packet){ + return new Promise( async (resolve) => { + let res = {} + res.req = packet + res.cmd='good/ugly' + res.response='The Good The Bad and The Ugly' + return resolve(res) + }) + }, + ugly: function (packet) { + console.log('==============reply from Good Bad command =========') + console.log(packet.response) + console.log('===========================') + } + } + + let packet = {} + // console.log('=============sending============') + packet = {cmd:'good:bad'} + console.log(packet) + await dy.send(packet) + + process.kill(process.pid, 'SIGTERM') + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + process.kill(process.pid, 'SIGTERM') +}) diff --git a/examples/four-in-one.js b/examples/four-in-one.js index 4e75713..bcece1a 100644 --- a/examples/four-in-one.js +++ b/examples/four-in-one.js @@ -50,9 +50,26 @@ const tcpfuncs = { ; (async () => { - let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,mqttp#c>m,mqtts#s>m,webs#s>w', mqtts:{broker:{host:'test', port: 5555},topic:'blah'}, id:'four-in-one'}) + let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,mqtts#s>m,webs#s>w', webs:{ port:8090 }, mqtts:{ topics:'write'}, id:'four-in-one'}) // let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'}) - // await fio.init() + let err = await fio.init() + // console.log(err) + if(!err.find(socket => { + console.log(socket.name) + return socket.name==='mqtts'})) + + { + const mqttProcess = async function (packet,topic) { + console.log('==============mqtt incoming packet/topic processor =========') + console.log(packet, topic, fio.s[topic][packet.cmd]) + if (packet.cmd) console.log(await fio.s[topic][packet.cmd](packet)) + console.log('===========================') + } + + fio.socket.mqtts.registerPacketProcessor(mqttProcess) + } + + fio.s = socketfuncs fio.st = tcpfuncs fio.ct = {reply: packet =>{ @@ -85,26 +102,26 @@ const tcpfuncs = { } } - // let packet = {} - // console.log('=============sending============') - // // packet = {cmd:'echo', data:'some data to echo'} - // // console.log(packet) - // // await fio.send(packet,'uc') - // packet = {cmd:'write:happy', data:'My name is Zoe'} - // console.log(packet) - // //console.log(await fio.send(packet)) - // console.log(fio.getPacketByName('uc',await fio.send(packet)).response) - // packet = {cmd:'write:sad', data:'data to write'} - // console.log(packet) - // await fio.send(packet) - // packet = {cmd:'write:sad', data:'sent only via tcp'} - // console.log(packet) - // console.log(fio.getPacketByName('tc2',await fio.sendTCP(packet))) - // packet = {cmd:'good:bad'} - // console.log(packet) - // await fio.send(packet) + let packet = {} + console.log('=============sending============') + packet = {cmd:'echo', data:'some data to echo'} + console.log(packet) + await fio.send(packet,'uc') + packet = {cmd:'write:happy', data:'My name is Zoe'} + console.log(packet) + console.log(await fio.send(packet)) + console.log(fio.getPacketByName('uc',await fio.send(packet)).response) + packet = {cmd:'write:sad', data:'data to write'} + console.log(packet) + await fio.send(packet) + packet = {cmd:'write:sad', data:'sent only via tcp'} + console.log(packet) + console.log(fio.getPacketByName('tc',await fio.sendTCP(packet))) + packet = {cmd:'good:bad'} + console.log(packet) + await fio.send(packet) - process.kill(process.pid, 'SIGTERM') + // process.kill(process.pid, 'SIGTERM') })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/examples/mqtt.js b/examples/mqtt.js index 9a106e3..1670cb3 100644 --- a/examples/mqtt.js +++ b/examples/mqtt.js @@ -2,42 +2,43 @@ import Base from '../src/base' // const USOCKET = __dirname + '/sample.sock' -const mqttfuncs = { - relays: { - on: async function(packet){ - return new Promise( async (resolve) => { - console.log('ON') - console.dir(packet) - return resolve() - }) - }, - off: async function(packet){ - return new Promise( async (resolve) => { - console.log('OFF') - console.dir(packet) - return resolve() - }) - } +let dy = new Base({id:'dynamic'}) + +dy.switch = { + on: function(packet){ + return new Promise( async (resolve) => { + console.log(`turning switch on for id ${packet.id||packet.data}`) + // call switch on here + let res = {} + res.cmd='switch/status' + res.status='on' + res.id = packet.id + return resolve(res) + }) + }, + off: function(packet){ + return new Promise( async (resolve) => { + console.log(packet) + console.log(`turning switch off for id ${packet.id||packet.data}`) + // call switch on here + let res = {} + res.cmd='switch/status' + res.status='off' + res.id = packet.id + return resolve(res) + }) } } -const process = async function (packet,topic) { - console.log('==============mqtt incoming packet/topic processor =========') - // console.log(packet, topic, this.topics[topic]) - if (packet.cmd) await this.topics[topic][packet.cmd](packet) - else await this.topics[topic](packet) - console.log('===========================') -} - -let mqtt = new Base({ mqtt:{ connect:{host:'localhost', port:1883}, id:'relays'} }) -mqtt.topics = mqttfuncs -mqtt.socket.mqtt.registerPacketProcessor(process.bind(mqtt)) - ; (async () => { - console.log(await mqtt.init()) + await dy.init() + console.log('started', dy.started) + // await Promise.all([dy.addSocket('mqc','c','m'),dy.addSocket('mqs','s','m')]) + await dy.addSocket('mqs','s','m') + dy.socket.mqs.subscribe(['switch/on','switch/off','switch/toggle']) })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/examples/web.js b/examples/web.js new file mode 100644 index 0000000..1670cb3 --- /dev/null +++ b/examples/web.js @@ -0,0 +1,46 @@ +import Base from '../src/base' + +// const USOCKET = __dirname + '/sample.sock' + +let dy = new Base({id:'dynamic'}) + +dy.switch = { + on: function(packet){ + return new Promise( async (resolve) => { + console.log(`turning switch on for id ${packet.id||packet.data}`) + // call switch on here + let res = {} + res.cmd='switch/status' + res.status='on' + res.id = packet.id + return resolve(res) + }) + }, + off: function(packet){ + return new Promise( async (resolve) => { + console.log(packet) + console.log(`turning switch off for id ${packet.id||packet.data}`) + // call switch on here + let res = {} + res.cmd='switch/status' + res.status='off' + res.id = packet.id + return resolve(res) + }) + } +} + + +; +(async () => { + + await dy.init() + console.log('started', dy.started) + // await Promise.all([dy.addSocket('mqc','c','m'),dy.addSocket('mqs','s','m')]) + await dy.addSocket('mqs','s','m') + dy.socket.mqs.subscribe(['switch/on','switch/off','switch/toggle']) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + process.kill(process.pid, 'SIGTERM') +}) diff --git a/package.json b/package.json index 11b5ef0..2877699 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,8 @@ "scripts": { "deve": "./node_modules/.bin/nodemon -r esm examples/four-in-one", "fio": "node -r esm examples/four-in-one", + "dy": "node -r esm examples/dynamic", + "web": "node -r esm examples/web", "mqtt": "node -r esm examples/mqtt", "testw": "mocha -r esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r esm test/*.test.mjs", @@ -31,15 +33,17 @@ "chai": "^4.1.2", "chai-as-promised": "^7.1.1", "codecov": "^3.0.0", - "esm": "^3.0.22", + "esm": "^3.0.36", "istanbul": "^0.4.5", - "mocha": "^5.0.1", + "mocha": "^5.2.0", "nodemon": "^1.14.12" }, "dependencies": { "@uci/logger": "^0.0.3", - "@uci/mqtt": "^0.0.5", + "@uci/mqtt": "0.0.9", "@uci/socket": "^0.1.1", - "@uci/utils": "^0.1.1" + "@uci/utils": "^0.1.1", + "@uci/websocket": "^0.1.8", + "p-settle": "^2.1.0" } } diff --git a/src/base.js b/src/base.js index f826d62..481f5e3 100644 --- a/src/base.js +++ b/src/base.js @@ -1,8 +1,12 @@ -import UCISocket from '@uci/socket' -import MQTT from '@uci/mqtt' -import WebSocket from '@uci/socket' +// communication modules +import Socket from '@uci/socket' +import MQTT from '@uci/mqtt' // requires broker +// import MQTT from '../../uci-mqtt/src/client' // requires broker +import WebSocket from '@uci/websocket' // server only +// TODO import EventEmitter from 'events' +import pSettle from 'p-settle' import { bindFuncs } from '@uci/utils/src/function' import { processor, commands, namespaces } from './processing' @@ -21,35 +25,19 @@ export default class Base extends EventEmitter { this._processors = { _default: processor } this._defaultCmds = commands this._namespaces = namespaces - // sockets: option - // of this form '#>' + this.bindFuncs = bindFuncs + // predefined sockets: + // comma delimited list of this form '#>' + this.socket = {} if(opts.sockets) { opts.sockets.split(/[,|\s]+/).forEach( socketStr => { let socket = {} socketStr.split(/[>#]+/).map(function(prop,index) { socket[SOCKET_INFO_KEYS[index]] = prop }) - if (socket.type === 's') this.addSocket(socket.name,socket.transport,opts[socket.name]) - if (socket.type === 'c') this.addConsumer(socket.name,socket.transport,opts[socket.name]) - - // if (!opts[socket.name]) opts[socket.name] = {} - // if (socket.transport ==='n') opts[socket.name].np = true - // opts[socket.name].id = this.id +':'+ socket.name - // // console.log(TRANSLATIONS[socket.type]) - // this.socket[socket.name] = new UCISocket[TRANSLATE[socket.type]](opts[socket.name]) - // // console.log(socket.name, this.socket[socket.name].send) - // Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket - // this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name) - + this.addSocket(socket.name, socket.type, socket.transport,opts[socket.name]) }) - } else log.warn({opts:opts},'no sockets requested for creation -- using only standard emitter') - if (opts.mqtt) { - opts.mqtt.topcis = (opts.mqtt.topcis ? opts.mqtt.topics + ',' : '') + opts.id - this.socket.mqtt = new MQTT(opts.mqtt) - Object.assign(this.socket.mqtt, {name:'mqtt', type:'c', transport:'t'}) } - this.bindFuncs = bindFuncs - // console.log('base opts', opts) } // end constructor @@ -57,38 +45,74 @@ export default class Base extends EventEmitter { async init () { let sockets = [] + let initSockets = [] for(let name of Object.keys(this.socket)){ - if (this.socket[name].type ==='s') { - sockets.push(this.socket[name].create()) - // setTimeout(this.socket[type].create,200) // test for consumer retry for delayed socket creation - } - else { - sockets.push(this.socket[name].connect()) - } + initSockets.push(this.initSocket(name)) + sockets.push(name) } - // TODO maybe throw if one fails this.started = true - return Promise.all(sockets).then(() => {return 'ready'}).catch((err) => {return err}) + return pSettle(initSockets).then(res=>{ + console.log(res) + let err = [] + res.forEach((p,index) => { + if (p.isRejected) { + err.push({name:sockets[index],err:p.reason}) + } + }) + return err + }) + // TODO if a websocket server was working then push status + // TODO if no mqtt broker then attempt to start one } // init - async addSocket (name, transport, options) { - // type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe, (web) websocket - console.log('Socket=> ', name, TRANSLATE[transport],options) + async addSocket (name, type='c', transport='n', options={}) { + // transport: (n) named pipe/ unix socket, (t) tcp socket, (m) mqtt subscribe, (w) websocket + log.info({socketName:name, type:type, tranport:transport, options:options},`adding socket ${name}`) + options.id = this.id +':'+ name + switch (transport) { + case 'n': + options.path = options.path || true + // falls through + case 't': + this.socket[name] = new Socket[TRANSLATE[type]](options) + break + case 'm': + if (type === 'p') type ='c' + options.connect = options.connect || {} + options.connect.connectTimeout = options.connect.connectTimeout || 5000 + this.socket[name] = new MQTT(options) + break + case 'w': + if (type ==='s') this.socket[name] = new WebSocket(options) + else log.warn({name:name,type:type,transport:transport},'Web socket not created Consumer/Client Web Socket not supported') + } + // console.log(name, '====',this.socket[name]) + this.socket[name].name = name + this.socket[name].type = type + this.socket[name].transport = transport + this.socket[name]._packetProcess = this._packetProcess.bind(this,name) + if (this.started) await this.initSocket(name) + else return `socket ${name} added` + // console.log(name, '====',this.socket[name]) + + } + + async initSocket(name) { + let socket = this.socket[name] + let init = {} + if (this.socket[name].type ==='s' && this.socket[name].transport !=='m') { + init = socket.create + } else { + init = socket.connect + } + log.info(`initializing socket ${name}, ${socket.type}, ${socket.transport}`) + if(this.started) return `socket ${name} added and initialzed, ${await init()}` + else return init() } async removeSocket (name) { - // type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe, (web) websocket - } - - - async addConsumer (name, transport, options) { - console.log('Consumer=> ', name,TRANSLATE[transport],options) - // type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe - } - - async removeConsumer (name) { - // type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe + //TODO } async send (name,packet) { @@ -123,7 +147,7 @@ export default class Base extends EventEmitter { return Promise.all(sends.map(send => {return send(packet)})) } - async sendMQTT(topic, packet) {return this.socket.mqtt.send(topic, packet)} + // async sendMQTT(topic, packet) {return this.socket.mqtt.send(topic, packet)} async sendTCP(packet) {return this.sendTransport(packet,'t')} async sendIPC(packet) {return this.sendTransport(packet,'n')} @@ -158,6 +182,7 @@ export default class Base extends EventEmitter { } Object.assign(this._defaultCmds.c,funcs) } + // use s: and c: keys TODO need to change this addNamedProcessing(name,funcs,type) { if (type){ @@ -170,7 +195,7 @@ export default class Base extends EventEmitter { } // takes and returns a packet - beforeWriteHook (type,funcs){} // TODO before packet send + beforeSendHook (type,funcs){} // TODO before packet send afterReceiveHook(type,funcs){} // TODO after receiv afterProcessHook(type,funcs){} // TODO @@ -191,10 +216,6 @@ export default class Base extends EventEmitter { } } - mqttProcessor(func) { - this.mqtt._packetProcess = func - } - socketNameProcessor(func,socket_name) { socket_name = socket_name || '_default' this._processors[socket_name]._process = func @@ -205,10 +226,9 @@ export default class Base extends EventEmitter { else return this._namespaces[type].unshift(space) } - packetProcessor(func) { - this._packetProcess = func - } - + // registerPacketProcessor(name,func) { + // this._packetProcess = func + // } /* @@ -217,8 +237,8 @@ export default class Base extends EventEmitter { * */ - _transport(name) {return this.socket[name].transport} - _type(name) {return this.socket[name].type} + _transport(name) {return this.socket[name].transport} //getter for socket transport + _type(name) {return this.socket[name].type} //getter for socket type _getTransportNamespaces(socket) { return this._namespace[this._type(socket)+this._transport(socket)] @@ -234,6 +254,7 @@ export default class Base extends EventEmitter { return cmd_func } + // takes command and returns corresponding function in a hash _getCmdFunc (cmd,obj) { // console.log('obj',obj) if (typeof cmd ==='string') { diff --git a/src/processing.js b/src/processing.js index 8f00b33..cc49d5c 100644 --- a/src/processing.js +++ b/src/processing.js @@ -38,6 +38,8 @@ const namespaces = { ct: ['ct'], sn: ['sn'], st: ['st'], + cm: ['cm'], + sm: ['sm'], } /*