0.1.29 add default return command for socket/servers

add generic client and ack sockect server examples
master
David Kebler 2019-09-13 19:05:22 -07:00
parent 61df3593b1
commit 23ea81c9d9
7 changed files with 153 additions and 22 deletions

View File

@ -10,8 +10,11 @@ module.exports = {
"mocha": true
},
"parserOptions": {
"ecmaVersion": 2017
,"sourceType": "module"
"ecmaVersion": 2017,
"sourceType": "module",
"ecmaFeatures": {
"experimentalObjectRestSpread": true
}
},
"extends": "eslint:recommended",
"rules": {

44
examples/ack.js Normal file
View File

@ -0,0 +1,44 @@
/*
simple generic server sending an ack
*
*/
import Base from '../src/base'
let opts={id:'simple-ack-server-server'}
;
(async () => {
let server = new Base(opts)
let opts2 = Object.assign({},opts)
opts.port = process.env.UCI_PORT
server.addSocket('server:tcp','s','t',opts)
opts2.path = process.env.UCI_PATH || true
server.addSocket('server:named-pipe','s','n',opts2)
server.on('status',err => {
console.log('STATUS EMITTED\n', err)
})
// example return processor using a namespace, will take precidence over default and root namespaces
server.cmds = { ack: (packet) => {
packet.ack = true
packet.cmd='reply'
packet.msg ='ack via custom ack function'
}
}
server.addNamespace('cmds') // comment this out to process via default ack
let res = await server.init()
if (res.error) {
console.log('errors during init')
process.kill(process.pid, 'SIGTERM')
}
console.log('sockets listening for an ack')
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
process.kill(process.pid, 'SIGTERM')
})

74
examples/client.js Normal file
View File

@ -0,0 +1,74 @@
/*
simple generic client sending an ack
*
*/
import Base from '../src/base'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let opts={id:'simple-client', useRootNS:true}
// if nothing set will be localhost:8080
if (process.env.UCI_HOST || process.env.UCI_PORT || !process.env.UCI_PATH ) {
opts.host = process.env.UCI_HOST
opts.port = process.env.UCI_PORT
} else {
opts.path = process.env.UCI_PATH==='true' ? true : process.env.UCI_PATH
}
(async () => {
let client = new Base(opts)
client.on('status',ev => {
switch (ev.level) {
case 'warning':
case 'error':
case 'fatal':
console.log(ev.level.toUpperCase(),'\n',ev)
break
}
})
function log (packet) {
if (packet.ack) {
console.log('socket acknowledged')
let {_header, cmd, ...rest} = packet
console.log('reply:',rest)
}
else {
console.log('process at socket did not acknowledge')
console.log(packet._header)
}
console.log('------ END REPLY------')
}
// example return processor using a namespace, will take precidence over default and root namespaces
client.cmds = { reply: (packet) => {
console.log('------REPLY--via cmds namespace----')
log(packet) }
}
client.addNamespace('cmds','c') // comment this out to process via root namespace
// alt example return process function making us of root namspace, useRootNS must be set true in options to do this
client.reply = function (packet) {
console.log('------REPLY--via root namespace----')
log(packet)
}
client.addSocket(`client:${opts.path ? 'named-pipe':'tcp'}`,'c',opts.path?'n':'t',opts)
let res = await client.init()
if (res.error) {
console.log('errors during init')
process.kill(process.pid, 'SIGTERM')
}
console.log('connected proceed with ack')
await client.send({cmd:'ack'})
delay(2000)
process.kill(process.pid, 'SIGTERM')
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
process.kill(process.pid, 'SIGTERM')
})

View File

@ -1,4 +1,5 @@
{
"ignoreRoot": [".git","examples/ws-fio-client"],
"watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples/four-in-on.js","examples/four-in-on.js","test/"]
"ignoreRoot": [".git"],
"watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples","test/"],
"ignore":["examples/ws-fio-client"]
}

View File

@ -1,11 +1,15 @@
{
"name": "@uci/base",
"version": "0.1.28",
"version": "0.1.29",
"description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes",
"main": "src/base",
"scripts": {
"fiod": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/four-in-one",
"fio": "nodemon -r esm examples/four-in-one",
"ack": "node -r esm examples/ack || true",
"ack:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/ack || true",
"client": "node -r esm examples/client || true",
"client:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/client || true",
"test": "UCI_ENV=dev nodemon -r esm --preserve-symlinks test/test",
"mtestw": "mocha -r esm test/*.test.mjs --watch --recurse ",
"mtest": "mocha -r esm test/*.test.mjs"
@ -36,7 +40,7 @@
"@uci-utils/bind-funcs": "^0.2.4",
"@uci-utils/logger": "^0.0.15",
"@uci/mqtt": "^0.1.13",
"@uci/socket": "^0.2.20",
"@uci/socket": "^0.2.21",
"@uci/websocket": "^0.3.9",
"await-to-js": "^2.1.1",
"p-reflect": "^2.1.0",

View File

@ -65,6 +65,10 @@ class Base extends EventEmitter {
log = logger({ name: 'base', id: this.id })
this.desc = opts.desc // additional details for humans
this._socket = {} // holds all the various communication sockets
// these two if passed will get applied to all consumer sockets, otherwise socket defaults will be used
this.initTimeout = opts.initTimeout
this.retryWait = opts.retryWait
this.defaultReturnCmd = opts.defaultReturnCmd
this._started = false // flag to know when instance has been initialized
this._processors = { _default: processor }
// _c and _s are the default namespaces
@ -122,7 +126,7 @@ class Base extends EventEmitter {
const value = await socket.init()
results[socket.name] = value
} catch (error) {
this.emit('error',{msg:'socket init error',error:error})// emit an error here, remove socket
this.emit('status',{level:'fatal', msg:'socket init error',error:error})// emit an error here, remove socket
let res = await this.removeSocket(socket.name)
errors[socket.name]={error:error, remove:res}
}
@ -151,7 +155,9 @@ class Base extends EventEmitter {
*/
async addSocket(name, type = 'c', transport = 'n', options = {}) {
log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:147, msg:`adding socket ${name}`})
options.id = this.id + ':' + name
options.id = options.id || this.id + ':' + name
if (type==='c') options = Object.assign({initTimeout:this.initTimeout, retryWait:this.retryWait},options)
if (type==='s') options = Object.assign({defaultReturnCmd:this.defaultReturnCmd},options)
switch (transport) {
case 'n':
options.path = options.path || true
@ -177,18 +183,20 @@ class Base extends EventEmitter {
this._socket[name].transport = transport
this._socket[name]._packetProcess = this._packetProcess.bind(this, name)
if (type==='c') { // bubble up events from client sockets
this._socket[name].on('status', ev => {
ev.socketName=name
this.emit('status', ev)
})
// bubble up events from sockets
this._socket[name].on('status', ev => {
ev.socketName=name
this.emit('status', ev)
})
if (type==='c') {
this._socket[name].on('pushed', packet => {
packet._header.socketName=name
this.emit('pushed', packet)
})
}
// do this as .then promise then addSocket doesn't need to be async before init
// if instance already started then init this socket now
if (this._started) return await this._initSocket(name)
else return `socket ${name} added`
}
@ -203,10 +211,10 @@ class Base extends EventEmitter {
// NOTE: uci consumers have .end renamed as .close to match socket method for convenience
let closeError
let [err] = await to(this._socket[name].close())
if (err.code !== 'ERR_SERVER_NOT_RUNNING') {
if(err) if (err.code !== 'ERR_SERVER_NOT_RUNNING') {
closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'}
}
this.emit('warn', {msg:`socket ${name} has been removed`, socket:this._socket[name].opts})
this.emit('status', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts})
delete this._socket[name]
return closeError ? closeError : 'success'
}
@ -325,7 +333,7 @@ class Base extends EventEmitter {
// add set of functions to class prop/space and then register with this
addNamespace(space, type, trans) {
if (type !=='c' || type !=='s') {
if (type !=='c' && type !=='s') {
trans = type
type = 's' }
trans = this._validateTransport(trans)
@ -431,13 +439,11 @@ class Base extends EventEmitter {
if (this._socket[socket_name].beforeProcess) {
[err,res] = await to(this._socket[socket_name].beforeProcess.call(this,packet))
if (err) { // hook has forced an abort to processing
console.log('before error', packet)
packet.error = err
return packet
}
packet = res
}
// if (this._socket[socket_name].beforeProcess) packet = await this._socket[socket_name].beforeProcess.call(this,packet)
// the processor can be set via the incoming packet
// otherwise if one is set on the socket or the default found in processing.js
// TODO Try each "available" packet processor in some order if fails try next one before trying the default
@ -465,7 +471,7 @@ class Base extends EventEmitter {
return `socket ${name} added and initialzed, ${res}`
})
.catch(function(err) {
this.emit('error', {msg:'failed initialization', error:err, socket:socket, code:'SOCKET_INIT'})
this.emit('status', {level:'fatal', msg:'failed initialization', error:err, socket:socket, code:'SOCKET_INIT'})
return {msg:`socket ${name} failed initialization`, error:err}
}.bind(this)
)

View File

@ -33,7 +33,6 @@ const _process = {
},
c: async function (packet,socket) {
// the the end of life for a consumer packet that has been sent and returned or a packet that was pushed.
if (packet.error) packet.cmd='error'
if (packet.cmd) {
@ -78,7 +77,7 @@ const defaultCmds ={
ack: async packet => {
packet.cmd = 'reply'
packet.ack = true
packet.msg = 'this is the base default ack, superceed in your extended class'
packet.msg = 'this is the base default ack, superceed in your instance or extended class'
return packet
}
},