refactor for allowing dynamic addition of sockets

add websocket (listener) to socket transports
improved integration of mqtt transport with topic being translated to cmd and vice versa
David Kebler 2018-05-16 11:17:38 -07:00
parent 78bcf3009a
commit 04268d2b9c
3 changed files with 59 additions and 88 deletions

View File

@ -50,9 +50,9 @@ const tcpfuncs = {
(async () => {
let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,tc2#c>t', id:'four-in-one'})
let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,mqttp#c>m,mqtts#s>m,webs#s>w', mqtts:{broker:{host:'test', port: 5555},topic:'blah'}, id:'four-in-one'})
// let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'})
await fio.init()
// await fio.init()
fio.s = socketfuncs = tcpfuncs
fio.ct = {reply: packet =>{
@ -85,24 +85,24 @@ const tcpfuncs = {
let packet = {}
// packet = {cmd:'echo', data:'some data to echo'}
// let packet = {}
// console.log('=============sending============')
// // packet = {cmd:'echo', data:'some data to echo'}
// // console.log(packet)
// // await fio.send(packet,'uc')
// packet = {cmd:'write:happy', data:'My name is Zoe'}
// console.log(packet)
// await fio.send(packet,'uc')
packet = {cmd:'write:happy', data:'My name is Zoe'}
//console.log(await fio.send(packet))
console.log(fio.getPacketByName('uc',await fio.send(packet)).response)
packet = {cmd:'write:sad', data:'data to write'}
await fio.send(packet)
packet = {cmd:'write:sad', data:'sent only via tcp'}
console.log(fio.getPacketByName('tc2',await fio.sendTCP(packet)))
packet = {cmd:'good:bad'}
await fio.send(packet)
// //console.log(await fio.send(packet))
// console.log(fio.getPacketByName('uc',await fio.send(packet)).response)
// packet = {cmd:'write:sad', data:'data to write'}
// console.log(packet)
// await fio.send(packet)
// packet = {cmd:'write:sad', data:'sent only via tcp'}
// console.log(packet)
// console.log(fio.getPacketByName('tc2',await fio.sendTCP(packet)))
// packet = {cmd:'good:bad'}
// console.log(packet)
// await fio.send(packet)
process.kill(, 'SIGTERM')

View File

@ -1,6 +1,7 @@
import UCISocket from '@uci/socket'
import MQTT from '@uci/mqtt'
// import MQTT from '../../uci-mqtt/src/client'
import WebSocket from '@uci/socket'
import EventEmitter from 'events'
import { bindFuncs } from '@uci/utils/src/function'
@ -15,26 +16,33 @@ export default class Base extends EventEmitter { = || || 'uci-base:'+ new Date().getTime()
log = logger({name:'base',})
this.desc = opts.desc // additional details for humans
this.started = false // flag to know when instance has been initialized
this.socket={} // holds all the various communication sockets
this._processors = { _default: processor }
this._defaultCmds = commands
this._namespaces = namespaces
// sockets: option
// of this form '<name>#<c/s>><n/np/t/tcp/m/mqtt/w/web>'
if(opts.sockets) {
opts.sockets.split(/[,|\s]+/).forEach( socketStr => {
let socket = {}
socketStr.split(/[>#]+/).map(function(prop,index) {
socket[SOCKET_INFO_KEYS[index]] = prop
if (!opts[]) opts[] = {}
if (socket.transport ==='n') opts[].np = true
opts[].id = +':'+
// console.log(TRANSLATIONS[socket.type])
this.socket[] = new UCISocket[TRANSLATE[socket.type]](opts[])
// console.log(, this.socket[].send)
Object.assign(this.socket[],socket) // copy socket info props to new socket
this.socket[]._packetProcess = this._packetProcess.bind(this,
if (socket.type === 's') this.addSocket(,socket.transport,opts[])
if (socket.type === 'c') this.addConsumer(,socket.transport,opts[])
// if (!opts[]) opts[] = {}
// if (socket.transport ==='n') opts[].np = true
// opts[].id = +':'+
// // console.log(TRANSLATIONS[socket.type])
// this.socket[] = new UCISocket[TRANSLATE[socket.type]](opts[])
// // console.log(, this.socket[].send)
// Object.assign(this.socket[],socket) // copy socket info props to new socket
// this.socket[]._packetProcess = this._packetProcess.bind(this,
} else log.warn({opts:opts},'no sockets requested for creations -- using only standard emitter')
} else log.warn({opts:opts},'no sockets requested for creation -- using only standard emitter')
if (opts.mqtt) {
opts.mqtt.topcis = (opts.mqtt.topcis ? opts.mqtt.topics + ',' : '') +
this.socket.mqtt = new MQTT(opts.mqtt)
@ -45,6 +53,8 @@ export default class Base extends EventEmitter {
} // end constructor
// Class Methods
async init () {
let sockets = []
for(let name of Object.keys(this.socket)){
@ -57,13 +67,29 @@ export default class Base extends EventEmitter {
// TODO maybe throw if one fails
this.started = true
return Promise.all(sockets).then(() => {return 'ready'}).catch((err) => {return err})
} // init
async end(name) {} // TODO end all or named sockets
async addSocket (name, transport, options) {
// type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe, (web) websocket
console.log('Socket=> ', name, TRANSLATE[transport],options)
async addSocket(options) {} //TODO add ability to add another socket at runtime and initialize
async removeSocket (name) {
// type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe, (web) websocket
async addConsumer (name, transport, options) {
console.log('Consumer=> ', name,TRANSLATE[transport],options)
// type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe
async removeConsumer (name) {
// type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe
async send (name,packet) {
if (typeof name !== 'string') {
@ -244,4 +270,4 @@ export default class Base extends EventEmitter {
} // end class
const SOCKET_INFO_KEYS = ['name','type','transport']
const TRANSLATE= {n:'Named Pipe',t:'TCP',s:'Socket',c:'Consumer'}
const TRANSLATE= {n:'Named Pipe',t:'TCP',s:'Socket',c:'Consumer', m:'MQTT', w:'Web Socket'}

View File

@ -1,55 +0,0 @@
// this._processing refers to this module/hash
export default {
s:{ // s is for socket/server
_process: async function (packet,sname) {
if (!packet.cmd) return {error: 'no command in packet', packet: packet }
// TODO before hook here
// TODO Add namespce
if (this._default[sname]) if (this._default[sname][packet.cmd]) return await this._default[sname][packet.cmd](packet)
if (this[packet.cmd]) return await this[packet.cmd](packet)
// TODO add socket transport
if (this._default.s[packet.cmd]) return await this._default.s[packet.cmd](packet)
// TODO after hook here
return {error: 'no socket processing function supplied for command', packet: packet }
echo: packet => {
packet.processed = true
packet.cmd = 'reply' = 'default socket echo'
return packet
c: { // c is for consumer/client
_process: async function (packet,sname) {
if (packet.error) this._default.c.error(packet)
if (packet.cmd) {
// TODO before hook here.
// TODO Add namespce
if (this._default[sname]) if (this._default[sname][packet.cmd]) return await this._default[sname][packet.cmd](packet)
if (this[packet.cmd]) return await this[packet.cmd](packet)
if (this._default.c[packet.cmd]) return await this._default.c[packet.cmd](packet)
// TODO after hook here
packet = {error:'no consumer processing function supplied for command',packet:packet}
} else {
packet = {error:'no command in packet',packet:packet}
error: function (packet) {
console.log('==============Packet ERROR==========')
console.log(packet.error )
reply: packet => {
console.log('==============Packet returned from socket==========')