diff --git a/examples/four-in-one.js b/examples/four-in-one.js index 743780e..4e75713 100644 --- a/examples/four-in-one.js +++ b/examples/four-in-one.js @@ -50,9 +50,9 @@ const tcpfuncs = { ; (async () => { - let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,tc2#c>t', id:'four-in-one'}) + let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t,mqttp#c>m,mqtts#s>m,webs#s>w', mqtts:{broker:{host:'test', port: 5555},topic:'blah'}, id:'four-in-one'}) // let fio = new Base({sockets:'uc#c>n,us#s>n', id:'four-in-one'}) - await fio.init() + // await fio.init() fio.s = socketfuncs fio.st = tcpfuncs fio.ct = {reply: packet =>{ @@ -85,24 +85,24 @@ const tcpfuncs = { } } - let packet = {} - console.log('=============sending============') - // packet = {cmd:'echo', data:'some data to echo'} + // let packet = {} + // console.log('=============sending============') + // // packet = {cmd:'echo', data:'some data to echo'} + // // console.log(packet) + // // await fio.send(packet,'uc') + // packet = {cmd:'write:happy', data:'My name is Zoe'} // console.log(packet) - // await fio.send(packet,'uc') - packet = {cmd:'write:happy', data:'My name is Zoe'} - console.log(packet) - //console.log(await fio.send(packet)) - console.log(fio.getPacketByName('uc',await fio.send(packet)).response) - packet = {cmd:'write:sad', data:'data to write'} - console.log(packet) - await fio.send(packet) - packet = {cmd:'write:sad', data:'sent only via tcp'} - console.log(packet) - console.log(fio.getPacketByName('tc2',await fio.sendTCP(packet))) - packet = {cmd:'good:bad'} - console.log(packet) - await fio.send(packet) + // //console.log(await fio.send(packet)) + // console.log(fio.getPacketByName('uc',await fio.send(packet)).response) + // packet = {cmd:'write:sad', data:'data to write'} + // console.log(packet) + // await fio.send(packet) + // packet = {cmd:'write:sad', data:'sent only via tcp'} + // console.log(packet) + // console.log(fio.getPacketByName('tc2',await fio.sendTCP(packet))) + // packet = {cmd:'good:bad'} + // console.log(packet) + // await fio.send(packet) process.kill(process.pid, 'SIGTERM') diff --git a/src/base.js b/src/base.js index bbf624a..f826d62 100644 --- a/src/base.js +++ b/src/base.js @@ -1,6 +1,7 @@ import UCISocket from '@uci/socket' import MQTT from '@uci/mqtt' -// import MQTT from '../../uci-mqtt/src/client' +import WebSocket from '@uci/socket' + import EventEmitter from 'events' import { bindFuncs } from '@uci/utils/src/function' @@ -15,26 +16,33 @@ export default class Base extends EventEmitter { this.id = opts.id || opts.name || 'uci-base:'+ new Date().getTime() log = logger({name:'base',id:this.id}) this.desc = opts.desc // additional details for humans - this.socket={} + this.started = false // flag to know when instance has been initialized + this.socket={} // holds all the various communication sockets this._processors = { _default: processor } this._defaultCmds = commands this._namespaces = namespaces + // sockets: option + // of this form '#>' if(opts.sockets) { opts.sockets.split(/[,|\s]+/).forEach( socketStr => { let socket = {} socketStr.split(/[>#]+/).map(function(prop,index) { socket[SOCKET_INFO_KEYS[index]] = prop }) - if (!opts[socket.name]) opts[socket.name] = {} - if (socket.transport ==='n') opts[socket.name].np = true - opts[socket.name].id = this.id +':'+ socket.name - // console.log(TRANSLATIONS[socket.type]) - this.socket[socket.name] = new UCISocket[TRANSLATE[socket.type]](opts[socket.name]) - // console.log(socket.name, this.socket[socket.name].send) - Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket - this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name) + if (socket.type === 's') this.addSocket(socket.name,socket.transport,opts[socket.name]) + if (socket.type === 'c') this.addConsumer(socket.name,socket.transport,opts[socket.name]) + + // if (!opts[socket.name]) opts[socket.name] = {} + // if (socket.transport ==='n') opts[socket.name].np = true + // opts[socket.name].id = this.id +':'+ socket.name + // // console.log(TRANSLATIONS[socket.type]) + // this.socket[socket.name] = new UCISocket[TRANSLATE[socket.type]](opts[socket.name]) + // // console.log(socket.name, this.socket[socket.name].send) + // Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket + // this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name) + }) - } else log.warn({opts:opts},'no sockets requested for creations -- using only standard emitter') + } else log.warn({opts:opts},'no sockets requested for creation -- using only standard emitter') if (opts.mqtt) { opts.mqtt.topcis = (opts.mqtt.topcis ? opts.mqtt.topics + ',' : '') + opts.id this.socket.mqtt = new MQTT(opts.mqtt) @@ -45,6 +53,8 @@ export default class Base extends EventEmitter { } // end constructor + // Class Methods + async init () { let sockets = [] for(let name of Object.keys(this.socket)){ @@ -57,13 +67,29 @@ export default class Base extends EventEmitter { } } // TODO maybe throw if one fails + this.started = true return Promise.all(sockets).then(() => {return 'ready'}).catch((err) => {return err}) } // init - async end(name) {} // TODO end all or named sockets + async addSocket (name, transport, options) { + // type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe, (web) websocket + console.log('Socket=> ', name, TRANSLATE[transport],options) + } - async addSocket(options) {} //TODO add ability to add another socket at runtime and initialize + async removeSocket (name) { + // type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe, (web) websocket + } + + + async addConsumer (name, transport, options) { + console.log('Consumer=> ', name,TRANSLATE[transport],options) + // type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe + } + + async removeConsumer (name) { + // type: (np) named pipe/ unix socket, (tcp) tcp socket, (mqtt) mqtt subscribe + } async send (name,packet) { if (typeof name !== 'string') { @@ -244,4 +270,4 @@ export default class Base extends EventEmitter { } // end class const SOCKET_INFO_KEYS = ['name','type','transport'] -const TRANSLATE= {n:'Named Pipe',t:'TCP',s:'Socket',c:'Consumer'} +const TRANSLATE= {n:'Named Pipe',t:'TCP',s:'Socket',c:'Consumer', m:'MQTT', w:'Web Socket'} diff --git a/src/processing.mjs.org b/src/processing.mjs.org deleted file mode 100644 index 37523e9..0000000 --- a/src/processing.mjs.org +++ /dev/null @@ -1,55 +0,0 @@ - - -// this._processing refers to this module/hash - -export default { - s:{ // s is for socket/server - _process: async function (packet,sname) { - if (!packet.cmd) return {error: 'no command in packet', packet: packet } - // TODO before hook here - // TODO Add namespce - if (this._default[sname]) if (this._default[sname][packet.cmd]) return await this._default[sname][packet.cmd](packet) - if (this[packet.cmd]) return await this[packet.cmd](packet) - // TODO add socket transport - if (this._default.s[packet.cmd]) return await this._default.s[packet.cmd](packet) - // TODO after hook here - return {error: 'no socket processing function supplied for command', packet: packet } - }, - echo: packet => { - packet.processed = true - packet.cmd = 'reply' - packet.info = 'default socket echo' - return packet - } - }, - - c: { // c is for consumer/client - _process: async function (packet,sname) { - if (packet.error) this._default.c.error(packet) - if (packet.cmd) { - // TODO before hook here. - // TODO Add namespce - if (this._default[sname]) if (this._default[sname][packet.cmd]) return await this._default[sname][packet.cmd](packet) - if (this[packet.cmd]) return await this[packet.cmd](packet) - if (this._default.c[packet.cmd]) return await this._default.c[packet.cmd](packet) - // TODO after hook here - packet = {error:'no consumer processing function supplied for command',packet:packet} - this._default.c.error(packet) - } else { - packet = {error:'no command in packet',packet:packet} - this._default.c.error(packet) - } - }, - error: function (packet) { - console.log('==============Packet ERROR==========') - console.log(packet.error ) - console.dir(packet.packet) - console.log('===========================') - }, - reply: packet => { - console.log('==============Packet returned from socket==========') - console.dir(packet) - console.log('===========================') - } - } -}