diff --git a/examples/four-in-one.mjs b/examples/four-in-one.mjs index ddb29dc..4ad1181 100644 --- a/examples/four-in-one.mjs +++ b/examples/four-in-one.mjs @@ -2,14 +2,14 @@ import Base from '../src/base' const USOCKET = __dirname + '/sample.sock' -const basefuncs = { +const socketfuncs = { write: function(packet){ - packet.cmd='log' + packet.cmd='reply' packet.response='return of write command' return packet }, write2: function(packet){ - packet.cmd='log' + packet.cmd='reply' packet.response='return of write2 command' return packet } @@ -21,51 +21,52 @@ const basefuncs = { const delay = time => new Promise(res=>setTimeout(()=>res(),time)) // let app = new Base({com:'us,uc,ts,tc', id:'example', path: USOCKET, log:false}) - let app = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'}) + let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'}) - app.amendPacketProcessing(basefuncs) + // app.amendPacketProcessing(basefuncs) - await app.init() + await fio.init() - app.amendPacketProcessing('tc',{ - log: packet => { - console.log('==============Packet returned to TCP consumer==========') + fio.amendNamedProcessing('tc',{ + reply: packet => { + console.log('==============Packet returned to nameed consumer tc==========') console.dir(packet) console.log('===========================') } }) - app.registerPacketProcessor('ts', - async function (packet) { - packet.test = 'this went through custom tcp socket processor' - if (!packet.cmd) return {error: 'no command in packet', packet: packet } - if (this.context) if (this.context[packet.cmd]) return await this.context[packet.cmd].bind(this.context)(packet) - if (this[packet.cmd]) return await this[packet.cmd](packet) - return {error: 'no socket processing function supplied for command', packet: packet } - }) + fio.reply = packet =>{ + console.log('==============Packet Displayed by Generic reply function') + console.dir(packet) + } + let packet = {} console.log('=============sending============') packet = {cmd:'echo', data:'some data to echo'} - await app.send('tc',packet) + console.log(packet) + await fio.send(packet) + + fio.amendSocketProcessing(socketfuncs) - app.registerPacketContext('ts',tcpfuncs) packet = {cmd:'write', data:'data to write'} - await app.send(packet) - await delay(500) - app.amendPacketContext( - {write: function(packet){ - packet.cmd='log' - packet.response='return of AMMEDED write command' - return packet - }} - ) - await delay(500) - packet = {cmd:'write', data:'2ND data to write'} - await app.sendIPC(packet) - packet = {cmd:'write2', data:'data to write'} - await app.sendTCP(packet) - // + console.log(packet) + await fio.send(packet) + // await app.send(packet) + // await delay(500) + // app.amendPacketContext( + // {write: function(packet){ + // packet.cmd='log' + // packet.response='return of AMMEDED write command' + // return packet + // }} + // ) + // await delay(500) + // packet = {cmd:'write', data:'2ND data to write'} + // await app.sendIPC(packet) + // packet = {cmd:'write2', data:'data to write'} + // await app.send(packet) + // // await delay(2000) process.kill(process.pid, 'SIGTERM') diff --git a/examples/package-lock.json b/examples/package-lock.json deleted file mode 100644 index f4addda..0000000 --- a/examples/package-lock.json +++ /dev/null @@ -1,190 +0,0 @@ -{ - "requires": true, - "lockfileVersion": 1, - "dependencies": { - "@uci/socket": { - "version": "0.1.0", - "resolved": "http://trantor:8082/@uci%2fsocket/-/socket-0.1.0.tgz", - "integrity": "sha512-2PoTfjLyYxIEd7zeoJyD0yUyQtICYPJBlmMrMP3U46FLtwWRntVu3fXKa0/gbbQYIwcWd24OK5CNvDeNdoa6cw==", - "requires": { - "better-try-catch": "0.6.2", - "bunyan": "1.8.12", - "death": "1.1.0", - "simple-node-logger": "0.93.33" - } - }, - "balanced-match": { - "version": "1.0.0", - "resolved": "http://trantor:8082/balanced-match/-/balanced-match-1.0.0.tgz", - "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=" - }, - "better-try-catch": { - "version": "0.6.2", - "resolved": "http://trantor:8082/better-try-catch/-/better-try-catch-0.6.2.tgz", - "integrity": "sha512-Ihghrm64n0sMiqZxA959Fism7/eDVgszfaLFpotZGGmDJc0EbpymhpkuA/OuTQ/BKMWZA6I68v3TVcnHBp7hXQ==", - "requires": { - "is-promise": "2.1.0" - } - }, - "brace-expansion": { - "version": "1.1.8", - "resolved": "http://trantor:8082/brace-expansion/-/brace-expansion-1.1.8.tgz", - "integrity": "sha1-wHshHHyVLsH479Uad+8NHTmQopI=", - "requires": { - "balanced-match": "1.0.0", - "concat-map": "0.0.1" - } - }, - "bunyan": { - "version": "1.8.12", - "resolved": "http://trantor:8082/bunyan/-/bunyan-1.8.12.tgz", - "integrity": "sha1-8VDw9nSKvdcq6uhPBEA74u8RN5c=", - "requires": { - "dtrace-provider": "0.8.6", - "moment": "2.20.1", - "mv": "2.1.1", - "safe-json-stringify": "1.0.4" - } - }, - "concat-map": { - "version": "0.0.1", - "resolved": "http://trantor:8082/concat-map/-/concat-map-0.0.1.tgz", - "integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=" - }, - "death": { - "version": "1.1.0", - "resolved": "http://trantor:8082/death/-/death-1.1.0.tgz", - "integrity": "sha1-AaqcQB7dknUFFEcLgmY5DGbGcxg=" - }, - "dtrace-provider": { - "version": "0.8.6", - "resolved": "http://trantor:8082/dtrace-provider/-/dtrace-provider-0.8.6.tgz", - "integrity": "sha1-QooiOv4DQl0s1tY0f99AxmkDVj0=", - "requires": { - "nan": "2.8.0" - } - }, - "glob": { - "version": "6.0.4", - "resolved": "http://trantor:8082/glob/-/glob-6.0.4.tgz", - "integrity": "sha1-DwiGD2oVUSey+t1PnOJLGqtuTSI=", - "requires": { - "inflight": "1.0.6", - "inherits": "2.0.3", - "minimatch": "3.0.4", - "once": "1.4.0", - "path-is-absolute": "1.0.1" - } - }, - "inflight": { - "version": "1.0.6", - "resolved": "http://trantor:8082/inflight/-/inflight-1.0.6.tgz", - "integrity": "sha1-Sb1jMdfQLQwJvJEKEHW6gWW1bfk=", - "requires": { - "once": "1.4.0", - "wrappy": "1.0.2" - } - }, - "inherits": { - "version": "2.0.3", - "resolved": "http://trantor:8082/inherits/-/inherits-2.0.3.tgz", - "integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4=" - }, - "is-promise": { - "version": "2.1.0", - "resolved": "http://trantor:8082/is-promise/-/is-promise-2.1.0.tgz", - "integrity": "sha1-eaKp7OfwlugPNtKy87wWwf9L8/o=" - }, - "lodash": { - "version": "4.17.4", - "resolved": "http://trantor:8082/lodash/-/lodash-4.17.4.tgz", - "integrity": "sha1-eCA6TRwyiuHYbcpkYONptX9AVa4=" - }, - "minimatch": { - "version": "3.0.4", - "resolved": "http://trantor:8082/minimatch/-/minimatch-3.0.4.tgz", - "integrity": "sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==", - "requires": { - "brace-expansion": "1.1.8" - } - }, - "minimist": { - "version": "0.0.8", - "resolved": "http://trantor:8082/minimist/-/minimist-0.0.8.tgz", - "integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0=" - }, - "mkdirp": { - "version": "0.5.1", - "resolved": "http://trantor:8082/mkdirp/-/mkdirp-0.5.1.tgz", - "integrity": "sha1-MAV0OOrGz3+MR2fzhkjWaX11yQM=", - "requires": { - "minimist": "0.0.8" - } - }, - "moment": { - "version": "2.20.1", - "resolved": "http://trantor:8082/moment/-/moment-2.20.1.tgz", - "integrity": "sha512-Yh9y73JRljxW5QxN08Fner68eFLxM5ynNOAw2LbIB1YAGeQzZT8QFSUvkAz609Zf+IHhhaUxqZK8dG3W/+HEvg==" - }, - "mv": { - "version": "2.1.1", - "resolved": "http://trantor:8082/mv/-/mv-2.1.1.tgz", - "integrity": "sha1-rmzg1vbV4KT32JN5jQPB6pVZtqI=", - "requires": { - "mkdirp": "0.5.1", - "ncp": "2.0.0", - "rimraf": "2.4.5" - } - }, - "nan": { - "version": "2.8.0", - "resolved": "http://trantor:8082/nan/-/nan-2.8.0.tgz", - "integrity": "sha1-7XFfP+neArV6XmJS2QqWZ14fCFo=" - }, - "ncp": { - "version": "2.0.0", - "resolved": "http://trantor:8082/ncp/-/ncp-2.0.0.tgz", - "integrity": "sha1-GVoh1sRuNh0vsSgbo4uR6d9727M=" - }, - "once": { - "version": "1.4.0", - "resolved": "http://trantor:8082/once/-/once-1.4.0.tgz", - "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", - "requires": { - "wrappy": "1.0.2" - } - }, - "path-is-absolute": { - "version": "1.0.1", - "resolved": "http://trantor:8082/path-is-absolute/-/path-is-absolute-1.0.1.tgz", - "integrity": "sha1-F0uSaHNVNP+8es5r9TpanhtcX18=" - }, - "rimraf": { - "version": "2.4.5", - "resolved": "http://trantor:8082/rimraf/-/rimraf-2.4.5.tgz", - "integrity": "sha1-7nEM5dk6j9uFb7Xqj/Di11k0sto=", - "requires": { - "glob": "6.0.4" - } - }, - "safe-json-stringify": { - "version": "1.0.4", - "resolved": "http://trantor:8082/safe-json-stringify/-/safe-json-stringify-1.0.4.tgz", - "integrity": "sha1-gaCY9Efku8P/MxKiQ1IbwGDvWRE=" - }, - "simple-node-logger": { - "version": "0.93.33", - "resolved": "http://trantor:8082/simple-node-logger/-/simple-node-logger-0.93.33.tgz", - "integrity": "sha512-ppFuaDeacR1Vu+cP17kwOWQsx5C1vbIRa54qm5WgZBzQ5eBue/GWsDd4sr++ITnWZIoIOvjx5kEm5AhP7IqU+Q==", - "requires": { - "lodash": "4.17.4", - "moment": "2.20.1" - } - }, - "wrappy": { - "version": "1.0.2", - "resolved": "http://trantor:8082/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" - } - } -} diff --git a/package.json b/package.json index 1117022..189ccaf 100644 --- a/package.json +++ b/package.json @@ -4,6 +4,7 @@ "description": "Mutli Level/Transport Message/Event Classes", "main": "src/base", "scripts": { + "deve": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/four-in-one", "testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r @std/esm test/*.test.mjs", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true" diff --git a/readme.md b/readme.md index b69021a..1c09de2 100644 --- a/readme.md +++ b/readme.md @@ -8,3 +8,16 @@ [![codecov](https://img.shields.io/codecov/c/github/uCOMmandIt/uci-pkg-template/master.svg)](https://codecov.io/gh/uCOMmandIt/uci-pkg-template) Base Class extended for all UCI Classes + + +changed sockets options now can add as many as you want of either type and transport +improved socket packet processing call now binds this and name of socket that received packet for processing +single common processor for all packets incoming (either consumer or socket) +default processing splits into socket or consumer +default processing has calls looking for packet cmd functions before returning error packet +1. Added Namespace // TODO +2. A Pariticular Sockect +3. Root of instance +4. The socket type and transport //TODO +5. The socket type +there are helper functions for adding cmd functions to any of these save the root instance we you can add directly. diff --git a/src/base.mjs b/src/base.mjs index c2aeddf..8f815f5 100644 --- a/src/base.mjs +++ b/src/base.mjs @@ -1,157 +1,148 @@ // import { Socket, Consumer } from '@uci/socket' import UCISocket from '../../uci-socket/src' -import packet from './packet.mjs' +import defaultProcessing from './default-processing.mjs' import EventEmitter from 'events' -const USOCKET = __dirname + '/unix.sock' - export default class Base extends EventEmitter { constructor(opts={}) { super() - opts.path = opts.path || USOCKET - this.id = opts.id || opts.name || 'uci:'+ Math.random()*100 + this.id = opts.id || opts.name || 'uci-base:'+ new Date().getTime() this.desc = opts.desc // additional details for humans this.socket={} - opts.sockets.split(/[,|\s]+/).forEach( opts_socket => { + this._default = defaultProcessing + opts.sockets.split(/[,|\s]+/).forEach( socketStr => { let socket = {} - opts_socket.split(/[>#]+/).map(function(prop,index) { + socketStr.split(/[>#]+/).map(function(prop,index) { socket[SOCKET_INFO_KEYS[index]] = prop }) - let args =[] if (!opts[socket.name]) opts[socket.name] = {} - let path = opts[socket.name].path || opts.path - if (socket.transport ==='n') args.push(path) - opts.name = this.id +':'+ socket.name - args.push(opts) - this.socket[socket.name] = new UCISocket[TRANSLATIONS[socket.type]](...args) - Object.assign(this.socket[socket.name],socket) - // this will be removed when moving to emit - this.socket[socket.name].packet = Object.assign({},packet[TRANSLATIONS[socket.type]]) - console.log(socket.name,'=========\n', this.socket[socket.name]) + if (socket.transport ==='n') opts[socket.name].np = true + opts[socket.name].id = this.id +':'+ socket.name + // console.log(TRANSLATIONS[socket.type]) + this.socket[socket.name] = new UCISocket[TRANSLATIONS[socket.type]](opts[socket.name]) + // console.log(socket.name, this.socket[socket.name].send) + Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket + this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name) }) - - // opts.id = this.id +':'+sock - // if (!opts[sock]) opts[sock] = {} - // switch (sock) { - // case - // 'us': - // opts.us.path = opts.us.path || opts.path - // - // this.socket[sock].info = {transport:'np', id:sock, } - // this.socket[sock].packet._process = socketEmit.bind(this.socket[sock],this.emit) - // break - // case 'uc': - // opts.uc.path = opts.uc.path || opts.path - // this.socket[sock] = new Consumer(opts.uc.path,opts) - // this.socket[sock].packet = Object.assign({},packet.consumer) - // break - // case 'ts': - // this.socket[sock]= new Socket(opts.ts, opts) - // this.socket[sock].packet = Object.assign({},packet.socket) - // break - // case 'tc': - // this.socket[sock]= new Consumer(opts.tc,opts) - // this.socket[sock].packet = Object.assign({},packet.consumer) - // } - // }) - - } // end constructor - async init (context) { + async init () { let sockets = [] - context = context || this // additional context is Base instance (or callsite) if none supplied - this.registerPacketContext(context) - for(let type of Object.keys(this.socket)){ - if (type.indexOf('s')!==-1) { - sockets.push(this.socket[type].create()) + for(let name of Object.keys(this.socket)){ + if (this.socket[name].type ==='s') { + sockets.push(this.socket[name].create()) // setTimeout(this.socket[type].create,200) // test for consumer retry for delayed socket creation } else { - sockets.push(this.socket[type].connect()) + sockets.push(this.socket[name].connect()) } } await Promise.all(sockets) - this.on('packet', packet => { - console.log('==========>',packet.socket) - }) } // init - registerPacketContext(type, context) { - if (typeof type === 'string') { - this.socket[type].packet.context = context - } else { - context=type - for(let type of Object.keys(this.socket)){ - this.socket[type].packet.context = context - } - } - } - - amendPacketContext(type, context) { - if (typeof type === 'string') { - Object.assign(this.socket[type].packet.context,context) - } else { - context=type - for(let type of Object.keys(this.socket)){ - Object.assign(this.socket[type].packet.context,context) - } - } - } - - amendPacketProcessing(type,funcs) { - if (typeof type === 'string') { - this.socket[type].packet = Object.assign({},this.socket[type].packet,funcs) - } else { - funcs=type - for(let type of Object.keys(this.socket)){ - this.socket[type].packet = Object.assign(this.socket[type].packet,funcs) - } - } - } - - registerPacketProcessor(type,func) { - if (typeof type === 'string') { - this.socket[type].packet._process = func - } else { - func = type - for(let type of Object.keys(this.socket)){ - this.socket[type].registerPacketProcessor(func) - } - } - } - - async sendTCP(packet) { - await this.socket.tc.send(packet) - } - - async sendIPC(packet) { - await this.socket.uc.send(packet) - } - - async send (transport, packet) { - if (typeof transport === 'string') { - if (this.socket[transport]) await this.socket.uc.send(packet) + async send (name, packet) { + if (typeof name === 'string') { + if (this.socket[name]) await this.socket[name].send(packet) } else { - packet = transport - for(let type of Object.keys(this.socket)){ - if (type.indexOf('c')!==-1) { - await this.socket[type].send(packet) + packet = name + for(let name of Object.keys(this.socket)){ + if (this.socket[name].type ==='c') { + await this.socket[name].send(packet) } } } - this.emit('packet', packet) // intra process, need to set a 'packet' listener to process it + this.emit('packet', packet) // emits on instance for instance use } + // registerPacketProcessor(name, func) { + // if (typeof name === 'string') { + // if (this.socket[name]) await this.socket[name]._packetProcess = func + // } + // else { + // func = name + // for(let name of Object.keys(this.socket)){ + // this.socket[name].send(packet) + // } + // } + // } + // } + + // registerConsumerProcessor(func) { + // this._packetProces = func + // } + // registerSocketProcessor(func) { + // this._packetProces = func + // } + + getSocket(name) {return this.socket[name]} + sendSocket(name) {} + sendTransport(trans) {} + + amendConsumerProcessing(funcs,trans) { + Object.assign(this._default.c[trans],funcs) + } + + amendSocketProcessing(funcs,trans) { + if (trans) { + if (!this._default.s[trans]) this._default.s[trans] ={} + Object.assign(this._default.s[trans],funcs) + } + Object.assign(this._default.s,funcs) + } + + amendNamedProcessing(name,funcs) { + if(!this._default[name]) this._default[name] ={} + Object.assign(this._default[name],funcs) + } + + beforeHook (type,funcs){} + afterHook(type,funcs){} + + addProcessing(name,type) {} + + consumerProcessor(func) { + this._default.c._process = func + } + + socketProcessor(func) { + this._default.s._process = func + } + + packetProcessor(func) { + this._packetProcess = func + } + + /* + * + * Private Methods + * + */ + + _transport(name) {return this.socket[name].transport} + _type(name) {return this.socket[name].type} + + // default packet processor for all sockets + async _packetProcess (sname,packet) { + return this._default[this.socket[sname].type]._process.bind(this)(packet,sname) + } + + } // end class const SOCKET_INFO_KEYS = ['name','type','transport'] const TRANSLATIONS = {t:'TCP',s:'Socket',c:'Consumer',n:'Named Pipe',} -const socketEmit = function (emit,packet) { - packet.socket = this.info - emit('packet', packet) -} + + +// this.on('packet', packet => { +// console.log('incoming packet ==========>',packet) +// app[TRANSLATIONS[packet._socket.type]].bind(app)(packet) +// }) +// +// // console.log('IN EMIT FUNCTION',socket_name,packet) +// // let s = this.socket[socket_name] +// // packet._socket = {name:s.name,type:s.type,transport:s.transport} +// // this.emit('packet', packet) diff --git a/src/default-processing.mjs b/src/default-processing.mjs new file mode 100644 index 0000000..f64158a --- /dev/null +++ b/src/default-processing.mjs @@ -0,0 +1,56 @@ + + +// this._default refers to this module/hash + +export default { + s:{ // s is for socket/server + _process: async function (packet,sname) { + if (!packet.cmd) return {error: 'no command in packet', packet: packet } + // TODO before hook here + // TODO Add namespce + if (this._default[sname]) if (this._default[sname][packet.cmd]) return await this._default[sname][packet.cmd](packet) + if (this[packet.cmd]) return await this[packet.cmd](packet) + // TODO add socket transport + if (this._default.s[packet.cmd]) return await this._default.s[packet.cmd](packet) + // TODO after hook here + return {error: 'no socket processing function supplied for command', packet: packet } + }, + echo: packet => { + packet.processed = true + packet.cmd = 'reply' + packet.info = 'default socket echo' + return packet + } + }, + + c: { // c is for consumer/client + _process: async function (packet,sname) { + if (packet.error) this._default.c.error(packet) + if (packet.cmd) { + // TODO before hook here. + // TODO Add namespce + if (this._default[sname]) if (this._default[sname][packet.cmd]) return await this._default[sname][packet.cmd](packet) + // if (this.app[sname]) if (this.app[sname][packet.cmd]) return await this.app[sname][packet.cmd](packet) + if (this[packet.cmd]) return await this[packet.cmd](packet) + if (this._default.c[packet.cmd]) return await this._default.c[packet.cmd](packet) + // TODO after hook here + packet = {error:'no consumer processing function supplied for command',packet:packet} + this._default.c.error(packet) + } else { + packet = {error:'no command in packet',packet:packet} + this._default.c.error(packet) + } + }, + error: function (packet) { + console.log('==============Packet ERROR==========') + console.log(packet.error ) + console.dir(packet.packet) + console.log('===========================') + }, + reply: packet => { + console.log('==============Packet returned from socket==========') + console.dir(packet) + console.log('===========================') + } + } +} diff --git a/src/packet.mjs b/src/packet.mjs deleted file mode 100644 index b4ee7e7..0000000 --- a/src/packet.mjs +++ /dev/null @@ -1,45 +0,0 @@ - - -export default { - - Socket:{ - _process: async function (packet) { - if (!packet.cmd) return {error: 'no command in packet', packet: packet } - if (this.context) if (this.context[packet.cmd]) return await this.context[packet.cmd].bind(this.context)(packet) - if (this[packet.cmd]) return await this[packet.cmd](packet) - return {error: 'no socket processing function supplied for command', packet: packet } - }, - echo: packet => { - packet.processed = true - packet.cmd = 'log' - packet.info = 'default socket echo' - return packet - } - }, - - Consumer: { - _process: async function (packet) { - if (packet.error) return this.error(packet) - if (packet.cmd) { - if (this.context) if (this.context[packet.cmd]) return await this.context[packet.cmd].bind(this.context)(packet) - if (this[packet.cmd]) return await this[packet.cmd](packet) - packet = {error:'no consumer processing function supplied for command',packet:packet} - return this.error(packet) - } else { - packet = {error:'no command in packet',packet:packet} - return this.error(packet) - } - }, - error: function (packet) { - console.log('==============Packet ERROR==========') - console.log(packet.error ) - console.dir(packet.packet) - console.log('===========================') - }, - log: packet => { - console.log('==============Packet returned from socket==========') - console.dir(packet) - console.log('===========================') - } - } -}