changed sockets options now can add as many as you want of either type and transport

improved socket packet processing call now binds this and name of socket that received packet for processing
single common processor for all packets incoming (either consumer or socket)
default processing splits into socket or consumer
default processing has calls looking for packet cmd functions before returning error packet
1. Added Namespace // TODO
2. A Pariticular Sockect
3. Root of instance
4. The socket type and transport //TODO
5. The socket type
there are helper functions for adding cmd functions to any of these save the root instance we you can add directly.
This commit is contained in:
David Kebler 2018-02-04 14:18:21 -08:00
parent 8c61ec854d
commit e5059368a9
7 changed files with 214 additions and 387 deletions

View file

@ -2,14 +2,14 @@ import Base from '../src/base'
const USOCKET = __dirname + '/sample.sock'
const basefuncs = {
const socketfuncs = {
write: function(packet){
packet.cmd='log'
packet.cmd='reply'
packet.response='return of write command'
return packet
},
write2: function(packet){
packet.cmd='log'
packet.cmd='reply'
packet.response='return of write2 command'
return packet
}
@ -21,51 +21,52 @@ const basefuncs = {
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
// let app = new Base({com:'us,uc,ts,tc', id:'example', path: USOCKET, log:false})
let app = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'})
let fio = new Base({sockets:'uc#c>n,us#s>n,tc#c>t,ts#s>t', id:'four-in-one'})
app.amendPacketProcessing(basefuncs)
// app.amendPacketProcessing(basefuncs)
await app.init()
await fio.init()
app.amendPacketProcessing('tc',{
log: packet => {
console.log('==============Packet returned to TCP consumer==========')
fio.amendNamedProcessing('tc',{
reply: packet => {
console.log('==============Packet returned to nameed consumer tc==========')
console.dir(packet)
console.log('===========================')
}
})
app.registerPacketProcessor('ts',
async function (packet) {
packet.test = 'this went through custom tcp socket processor'
if (!packet.cmd) return {error: 'no command in packet', packet: packet }
if (this.context) if (this.context[packet.cmd]) return await this.context[packet.cmd].bind(this.context)(packet)
if (this[packet.cmd]) return await this[packet.cmd](packet)
return {error: 'no socket processing function supplied for command', packet: packet }
})
fio.reply = packet =>{
console.log('==============Packet Displayed by Generic reply function')
console.dir(packet)
}
let packet = {}
console.log('=============sending============')
packet = {cmd:'echo', data:'some data to echo'}
await app.send('tc',packet)
console.log(packet)
await fio.send(packet)
fio.amendSocketProcessing(socketfuncs)
app.registerPacketContext('ts',tcpfuncs)
packet = {cmd:'write', data:'data to write'}
await app.send(packet)
await delay(500)
app.amendPacketContext(
{write: function(packet){
packet.cmd='log'
packet.response='return of AMMEDED write command'
return packet
}}
)
await delay(500)
packet = {cmd:'write', data:'2ND data to write'}
await app.sendIPC(packet)
packet = {cmd:'write2', data:'data to write'}
await app.sendTCP(packet)
//
console.log(packet)
await fio.send(packet)
// await app.send(packet)
// await delay(500)
// app.amendPacketContext(
// {write: function(packet){
// packet.cmd='log'
// packet.response='return of AMMEDED write command'
// return packet
// }}
// )
// await delay(500)
// packet = {cmd:'write', data:'2ND data to write'}
// await app.sendIPC(packet)
// packet = {cmd:'write2', data:'data to write'}
// await app.send(packet)
// //
await delay(2000)
process.kill(process.pid, 'SIGTERM')

View file

@ -1,190 +0,0 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"@uci/socket": {
"version": "0.1.0",
"resolved": "http://trantor:8082/@uci%2fsocket/-/socket-0.1.0.tgz",
"integrity": "sha512-2PoTfjLyYxIEd7zeoJyD0yUyQtICYPJBlmMrMP3U46FLtwWRntVu3fXKa0/gbbQYIwcWd24OK5CNvDeNdoa6cw==",
"requires": {
"better-try-catch": "0.6.2",
"bunyan": "1.8.12",
"death": "1.1.0",
"simple-node-logger": "0.93.33"
}
},
"balanced-match": {
"version": "1.0.0",
"resolved": "http://trantor:8082/balanced-match/-/balanced-match-1.0.0.tgz",
"integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c="
},
"better-try-catch": {
"version": "0.6.2",
"resolved": "http://trantor:8082/better-try-catch/-/better-try-catch-0.6.2.tgz",
"integrity": "sha512-Ihghrm64n0sMiqZxA959Fism7/eDVgszfaLFpotZGGmDJc0EbpymhpkuA/OuTQ/BKMWZA6I68v3TVcnHBp7hXQ==",
"requires": {
"is-promise": "2.1.0"
}
},
"brace-expansion": {
"version": "1.1.8",
"resolved": "http://trantor:8082/brace-expansion/-/brace-expansion-1.1.8.tgz",
"integrity": "sha1-wHshHHyVLsH479Uad+8NHTmQopI=",
"requires": {
"balanced-match": "1.0.0",
"concat-map": "0.0.1"
}
},
"bunyan": {
"version": "1.8.12",
"resolved": "http://trantor:8082/bunyan/-/bunyan-1.8.12.tgz",
"integrity": "sha1-8VDw9nSKvdcq6uhPBEA74u8RN5c=",
"requires": {
"dtrace-provider": "0.8.6",
"moment": "2.20.1",
"mv": "2.1.1",
"safe-json-stringify": "1.0.4"
}
},
"concat-map": {
"version": "0.0.1",
"resolved": "http://trantor:8082/concat-map/-/concat-map-0.0.1.tgz",
"integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s="
},
"death": {
"version": "1.1.0",
"resolved": "http://trantor:8082/death/-/death-1.1.0.tgz",
"integrity": "sha1-AaqcQB7dknUFFEcLgmY5DGbGcxg="
},
"dtrace-provider": {
"version": "0.8.6",
"resolved": "http://trantor:8082/dtrace-provider/-/dtrace-provider-0.8.6.tgz",
"integrity": "sha1-QooiOv4DQl0s1tY0f99AxmkDVj0=",
"requires": {
"nan": "2.8.0"
}
},
"glob": {
"version": "6.0.4",
"resolved": "http://trantor:8082/glob/-/glob-6.0.4.tgz",
"integrity": "sha1-DwiGD2oVUSey+t1PnOJLGqtuTSI=",
"requires": {
"inflight": "1.0.6",
"inherits": "2.0.3",
"minimatch": "3.0.4",
"once": "1.4.0",
"path-is-absolute": "1.0.1"
}
},
"inflight": {
"version": "1.0.6",
"resolved": "http://trantor:8082/inflight/-/inflight-1.0.6.tgz",
"integrity": "sha1-Sb1jMdfQLQwJvJEKEHW6gWW1bfk=",
"requires": {
"once": "1.4.0",
"wrappy": "1.0.2"
}
},
"inherits": {
"version": "2.0.3",
"resolved": "http://trantor:8082/inherits/-/inherits-2.0.3.tgz",
"integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4="
},
"is-promise": {
"version": "2.1.0",
"resolved": "http://trantor:8082/is-promise/-/is-promise-2.1.0.tgz",
"integrity": "sha1-eaKp7OfwlugPNtKy87wWwf9L8/o="
},
"lodash": {
"version": "4.17.4",
"resolved": "http://trantor:8082/lodash/-/lodash-4.17.4.tgz",
"integrity": "sha1-eCA6TRwyiuHYbcpkYONptX9AVa4="
},
"minimatch": {
"version": "3.0.4",
"resolved": "http://trantor:8082/minimatch/-/minimatch-3.0.4.tgz",
"integrity": "sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==",
"requires": {
"brace-expansion": "1.1.8"
}
},
"minimist": {
"version": "0.0.8",
"resolved": "http://trantor:8082/minimist/-/minimist-0.0.8.tgz",
"integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0="
},
"mkdirp": {
"version": "0.5.1",
"resolved": "http://trantor:8082/mkdirp/-/mkdirp-0.5.1.tgz",
"integrity": "sha1-MAV0OOrGz3+MR2fzhkjWaX11yQM=",
"requires": {
"minimist": "0.0.8"
}
},
"moment": {
"version": "2.20.1",
"resolved": "http://trantor:8082/moment/-/moment-2.20.1.tgz",
"integrity": "sha512-Yh9y73JRljxW5QxN08Fner68eFLxM5ynNOAw2LbIB1YAGeQzZT8QFSUvkAz609Zf+IHhhaUxqZK8dG3W/+HEvg=="
},
"mv": {
"version": "2.1.1",
"resolved": "http://trantor:8082/mv/-/mv-2.1.1.tgz",
"integrity": "sha1-rmzg1vbV4KT32JN5jQPB6pVZtqI=",
"requires": {
"mkdirp": "0.5.1",
"ncp": "2.0.0",
"rimraf": "2.4.5"
}
},
"nan": {
"version": "2.8.0",
"resolved": "http://trantor:8082/nan/-/nan-2.8.0.tgz",
"integrity": "sha1-7XFfP+neArV6XmJS2QqWZ14fCFo="
},
"ncp": {
"version": "2.0.0",
"resolved": "http://trantor:8082/ncp/-/ncp-2.0.0.tgz",
"integrity": "sha1-GVoh1sRuNh0vsSgbo4uR6d9727M="
},
"once": {
"version": "1.4.0",
"resolved": "http://trantor:8082/once/-/once-1.4.0.tgz",
"integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=",
"requires": {
"wrappy": "1.0.2"
}
},
"path-is-absolute": {
"version": "1.0.1",
"resolved": "http://trantor:8082/path-is-absolute/-/path-is-absolute-1.0.1.tgz",
"integrity": "sha1-F0uSaHNVNP+8es5r9TpanhtcX18="
},
"rimraf": {
"version": "2.4.5",
"resolved": "http://trantor:8082/rimraf/-/rimraf-2.4.5.tgz",
"integrity": "sha1-7nEM5dk6j9uFb7Xqj/Di11k0sto=",
"requires": {
"glob": "6.0.4"
}
},
"safe-json-stringify": {
"version": "1.0.4",
"resolved": "http://trantor:8082/safe-json-stringify/-/safe-json-stringify-1.0.4.tgz",
"integrity": "sha1-gaCY9Efku8P/MxKiQ1IbwGDvWRE="
},
"simple-node-logger": {
"version": "0.93.33",
"resolved": "http://trantor:8082/simple-node-logger/-/simple-node-logger-0.93.33.tgz",
"integrity": "sha512-ppFuaDeacR1Vu+cP17kwOWQsx5C1vbIRa54qm5WgZBzQ5eBue/GWsDd4sr++ITnWZIoIOvjx5kEm5AhP7IqU+Q==",
"requires": {
"lodash": "4.17.4",
"moment": "2.20.1"
}
},
"wrappy": {
"version": "1.0.2",
"resolved": "http://trantor:8082/wrappy/-/wrappy-1.0.2.tgz",
"integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8="
}
}
}

View file

@ -4,6 +4,7 @@
"description": "Mutli Level/Transport Message/Event Classes",
"main": "src/base",
"scripts": {
"deve": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/four-in-one",
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
"test": "mocha -r @std/esm test/*.test.mjs",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true"

View file

@ -8,3 +8,16 @@
[![codecov](https://img.shields.io/codecov/c/github/uCOMmandIt/uci-pkg-template/master.svg)](https://codecov.io/gh/uCOMmandIt/uci-pkg-template)
Base Class extended for all UCI Classes
changed sockets options now can add as many as you want of either type and transport
improved socket packet processing call now binds this and name of socket that received packet for processing
single common processor for all packets incoming (either consumer or socket)
default processing splits into socket or consumer
default processing has calls looking for packet cmd functions before returning error packet
1. Added Namespace // TODO
2. A Pariticular Sockect
3. Root of instance
4. The socket type and transport //TODO
5. The socket type
there are helper functions for adding cmd functions to any of these save the root instance we you can add directly.

View file

@ -1,157 +1,148 @@
// import { Socket, Consumer } from '@uci/socket'
import UCISocket from '../../uci-socket/src'
import packet from './packet.mjs'
import defaultProcessing from './default-processing.mjs'
import EventEmitter from 'events'
const USOCKET = __dirname + '/unix.sock'
export default class Base extends EventEmitter {
constructor(opts={}) {
super()
opts.path = opts.path || USOCKET
this.id = opts.id || opts.name || 'uci:'+ Math.random()*100
this.id = opts.id || opts.name || 'uci-base:'+ new Date().getTime()
this.desc = opts.desc // additional details for humans
this.socket={}
opts.sockets.split(/[,|\s]+/).forEach( opts_socket => {
this._default = defaultProcessing
opts.sockets.split(/[,|\s]+/).forEach( socketStr => {
let socket = {}
opts_socket.split(/[>#]+/).map(function(prop,index) {
socketStr.split(/[>#]+/).map(function(prop,index) {
socket[SOCKET_INFO_KEYS[index]] = prop
})
let args =[]
if (!opts[socket.name]) opts[socket.name] = {}
let path = opts[socket.name].path || opts.path
if (socket.transport ==='n') args.push(path)
opts.name = this.id +':'+ socket.name
args.push(opts)
this.socket[socket.name] = new UCISocket[TRANSLATIONS[socket.type]](...args)
Object.assign(this.socket[socket.name],socket)
// this will be removed when moving to emit
this.socket[socket.name].packet = Object.assign({},packet[TRANSLATIONS[socket.type]])
console.log(socket.name,'=========\n', this.socket[socket.name])
if (socket.transport ==='n') opts[socket.name].np = true
opts[socket.name].id = this.id +':'+ socket.name
// console.log(TRANSLATIONS[socket.type])
this.socket[socket.name] = new UCISocket[TRANSLATIONS[socket.type]](opts[socket.name])
// console.log(socket.name, this.socket[socket.name].send)
Object.assign(this.socket[socket.name],socket) // copy socket info props to new socket
this.socket[socket.name]._packetProcess = this._packetProcess.bind(this,socket.name)
})
// opts.id = this.id +':'+sock
// if (!opts[sock]) opts[sock] = {}
// switch (sock) {
// case
// 'us':
// opts.us.path = opts.us.path || opts.path
//
// this.socket[sock].info = {transport:'np', id:sock, }
// this.socket[sock].packet._process = socketEmit.bind(this.socket[sock],this.emit)
// break
// case 'uc':
// opts.uc.path = opts.uc.path || opts.path
// this.socket[sock] = new Consumer(opts.uc.path,opts)
// this.socket[sock].packet = Object.assign({},packet.consumer)
// break
// case 'ts':
// this.socket[sock]= new Socket(opts.ts, opts)
// this.socket[sock].packet = Object.assign({},packet.socket)
// break
// case 'tc':
// this.socket[sock]= new Consumer(opts.tc,opts)
// this.socket[sock].packet = Object.assign({},packet.consumer)
// }
// })
} // end constructor
async init (context) {
async init () {
let sockets = []
context = context || this // additional context is Base instance (or callsite) if none supplied
this.registerPacketContext(context)
for(let type of Object.keys(this.socket)){
if (type.indexOf('s')!==-1) {
sockets.push(this.socket[type].create())
for(let name of Object.keys(this.socket)){
if (this.socket[name].type ==='s') {
sockets.push(this.socket[name].create())
// setTimeout(this.socket[type].create,200) // test for consumer retry for delayed socket creation
}
else {
sockets.push(this.socket[type].connect())
sockets.push(this.socket[name].connect())
}
}
await Promise.all(sockets)
this.on('packet', packet => {
console.log('==========>',packet.socket)
})
} // init
registerPacketContext(type, context) {
if (typeof type === 'string') {
this.socket[type].packet.context = context
} else {
context=type
for(let type of Object.keys(this.socket)){
this.socket[type].packet.context = context
}
}
}
amendPacketContext(type, context) {
if (typeof type === 'string') {
Object.assign(this.socket[type].packet.context,context)
} else {
context=type
for(let type of Object.keys(this.socket)){
Object.assign(this.socket[type].packet.context,context)
}
}
}
amendPacketProcessing(type,funcs) {
if (typeof type === 'string') {
this.socket[type].packet = Object.assign({},this.socket[type].packet,funcs)
} else {
funcs=type
for(let type of Object.keys(this.socket)){
this.socket[type].packet = Object.assign(this.socket[type].packet,funcs)
}
}
}
registerPacketProcessor(type,func) {
if (typeof type === 'string') {
this.socket[type].packet._process = func
} else {
func = type
for(let type of Object.keys(this.socket)){
this.socket[type].registerPacketProcessor(func)
}
}
}
async sendTCP(packet) {
await this.socket.tc.send(packet)
}
async sendIPC(packet) {
await this.socket.uc.send(packet)
}
async send (transport, packet) {
if (typeof transport === 'string') {
if (this.socket[transport]) await this.socket.uc.send(packet)
async send (name, packet) {
if (typeof name === 'string') {
if (this.socket[name]) await this.socket[name].send(packet)
}
else {
packet = transport
for(let type of Object.keys(this.socket)){
if (type.indexOf('c')!==-1) {
await this.socket[type].send(packet)
packet = name
for(let name of Object.keys(this.socket)){
if (this.socket[name].type ==='c') {
await this.socket[name].send(packet)
}
}
}
this.emit('packet', packet) // intra process, need to set a 'packet' listener to process it
this.emit('packet', packet) // emits on instance for instance use
}
// registerPacketProcessor(name, func) {
// if (typeof name === 'string') {
// if (this.socket[name]) await this.socket[name]._packetProcess = func
// }
// else {
// func = name
// for(let name of Object.keys(this.socket)){
// this.socket[name].send(packet)
// }
// }
// }
// }
// registerConsumerProcessor(func) {
// this._packetProces = func
// }
// registerSocketProcessor(func) {
// this._packetProces = func
// }
getSocket(name) {return this.socket[name]}
sendSocket(name) {}
sendTransport(trans) {}
amendConsumerProcessing(funcs,trans) {
Object.assign(this._default.c[trans],funcs)
}
amendSocketProcessing(funcs,trans) {
if (trans) {
if (!this._default.s[trans]) this._default.s[trans] ={}
Object.assign(this._default.s[trans],funcs)
}
Object.assign(this._default.s,funcs)
}
amendNamedProcessing(name,funcs) {
if(!this._default[name]) this._default[name] ={}
Object.assign(this._default[name],funcs)
}
beforeHook (type,funcs){}
afterHook(type,funcs){}
addProcessing(name,type) {}
consumerProcessor(func) {
this._default.c._process = func
}
socketProcessor(func) {
this._default.s._process = func
}
packetProcessor(func) {
this._packetProcess = func
}
/*
*
* Private Methods
*
*/
_transport(name) {return this.socket[name].transport}
_type(name) {return this.socket[name].type}
// default packet processor for all sockets
async _packetProcess (sname,packet) {
return this._default[this.socket[sname].type]._process.bind(this)(packet,sname)
}
} // end class
const SOCKET_INFO_KEYS = ['name','type','transport']
const TRANSLATIONS = {t:'TCP',s:'Socket',c:'Consumer',n:'Named Pipe',}
const socketEmit = function (emit,packet) {
packet.socket = this.info
emit('packet', packet)
}
// this.on('packet', packet => {
// console.log('incoming packet ==========>',packet)
// app[TRANSLATIONS[packet._socket.type]].bind(app)(packet)
// })
//
// // console.log('IN EMIT FUNCTION',socket_name,packet)
// // let s = this.socket[socket_name]
// // packet._socket = {name:s.name,type:s.type,transport:s.transport}
// // this.emit('packet', packet)

View file

@ -0,0 +1,56 @@
// this._default refers to this module/hash
export default {
s:{ // s is for socket/server
_process: async function (packet,sname) {
if (!packet.cmd) return {error: 'no command in packet', packet: packet }
// TODO before hook here
// TODO Add namespce
if (this._default[sname]) if (this._default[sname][packet.cmd]) return await this._default[sname][packet.cmd](packet)
if (this[packet.cmd]) return await this[packet.cmd](packet)
// TODO add socket transport
if (this._default.s[packet.cmd]) return await this._default.s[packet.cmd](packet)
// TODO after hook here
return {error: 'no socket processing function supplied for command', packet: packet }
},
echo: packet => {
packet.processed = true
packet.cmd = 'reply'
packet.info = 'default socket echo'
return packet
}
},
c: { // c is for consumer/client
_process: async function (packet,sname) {
if (packet.error) this._default.c.error(packet)
if (packet.cmd) {
// TODO before hook here.
// TODO Add namespce
if (this._default[sname]) if (this._default[sname][packet.cmd]) return await this._default[sname][packet.cmd](packet)
// if (this.app[sname]) if (this.app[sname][packet.cmd]) return await this.app[sname][packet.cmd](packet)
if (this[packet.cmd]) return await this[packet.cmd](packet)
if (this._default.c[packet.cmd]) return await this._default.c[packet.cmd](packet)
// TODO after hook here
packet = {error:'no consumer processing function supplied for command',packet:packet}
this._default.c.error(packet)
} else {
packet = {error:'no command in packet',packet:packet}
this._default.c.error(packet)
}
},
error: function (packet) {
console.log('==============Packet ERROR==========')
console.log(packet.error )
console.dir(packet.packet)
console.log('===========================')
},
reply: packet => {
console.log('==============Packet returned from socket==========')
console.dir(packet)
console.log('===========================')
}
}
}

View file

@ -1,45 +0,0 @@
export default {
Socket:{
_process: async function (packet) {
if (!packet.cmd) return {error: 'no command in packet', packet: packet }
if (this.context) if (this.context[packet.cmd]) return await this.context[packet.cmd].bind(this.context)(packet)
if (this[packet.cmd]) return await this[packet.cmd](packet)
return {error: 'no socket processing function supplied for command', packet: packet }
},
echo: packet => {
packet.processed = true
packet.cmd = 'log'
packet.info = 'default socket echo'
return packet
}
},
Consumer: {
_process: async function (packet) {
if (packet.error) return this.error(packet)
if (packet.cmd) {
if (this.context) if (this.context[packet.cmd]) return await this.context[packet.cmd].bind(this.context)(packet)
if (this[packet.cmd]) return await this[packet.cmd](packet)
packet = {error:'no consumer processing function supplied for command',packet:packet}
return this.error(packet)
} else {
packet = {error:'no command in packet',packet:packet}
return this.error(packet)
}
},
error: function (packet) {
console.log('==============Packet ERROR==========')
console.log(packet.error )
console.dir(packet.packet)
console.log('===========================')
},
log: packet => {
console.log('==============Packet returned from socket==========')
console.dir(packet)
console.log('===========================')
}
}
}