From 23ea81c9d961d2112f98729b613228cd93c2df6e Mon Sep 17 00:00:00 2001 From: David Kebler Date: Fri, 13 Sep 2019 19:05:22 -0700 Subject: [PATCH] 0.1.29 add default return command for socket/servers add generic client and ack sockect server examples --- .eslintrc.js | 7 +++-- examples/ack.js | 44 +++++++++++++++++++++++++++ examples/client.js | 74 ++++++++++++++++++++++++++++++++++++++++++++++ nodemon.json | 5 ++-- package.json | 8 +++-- src/base.js | 34 ++++++++++++--------- src/processing.js | 3 +- 7 files changed, 153 insertions(+), 22 deletions(-) create mode 100644 examples/ack.js create mode 100644 examples/client.js diff --git a/.eslintrc.js b/.eslintrc.js index bb23489..5449e39 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -10,8 +10,11 @@ module.exports = { "mocha": true }, "parserOptions": { - "ecmaVersion": 2017 - ,"sourceType": "module" + "ecmaVersion": 2017, + "sourceType": "module", + "ecmaFeatures": { + "experimentalObjectRestSpread": true + } }, "extends": "eslint:recommended", "rules": { diff --git a/examples/ack.js b/examples/ack.js new file mode 100644 index 0000000..f47ea76 --- /dev/null +++ b/examples/ack.js @@ -0,0 +1,44 @@ +/* +simple generic server sending an ack +* +*/ +import Base from '../src/base' + +let opts={id:'simple-ack-server-server'} + +; +(async () => { + + let server = new Base(opts) + let opts2 = Object.assign({},opts) + opts.port = process.env.UCI_PORT + server.addSocket('server:tcp','s','t',opts) + opts2.path = process.env.UCI_PATH || true + server.addSocket('server:named-pipe','s','n',opts2) + + server.on('status',err => { + console.log('STATUS EMITTED\n', err) + }) + + // example return processor using a namespace, will take precidence over default and root namespaces + server.cmds = { ack: (packet) => { + packet.ack = true + packet.cmd='reply' + packet.msg ='ack via custom ack function' + } + } + + server.addNamespace('cmds') // comment this out to process via default ack + + let res = await server.init() + if (res.error) { + console.log('errors during init') + process.kill(process.pid, 'SIGTERM') + } + + console.log('sockets listening for an ack') + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + process.kill(process.pid, 'SIGTERM') +}) diff --git a/examples/client.js b/examples/client.js new file mode 100644 index 0000000..5aa1f4c --- /dev/null +++ b/examples/client.js @@ -0,0 +1,74 @@ +/* +simple generic client sending an ack +* +*/ +import Base from '../src/base' + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let opts={id:'simple-client', useRootNS:true} +// if nothing set will be localhost:8080 +if (process.env.UCI_HOST || process.env.UCI_PORT || !process.env.UCI_PATH ) { + opts.host = process.env.UCI_HOST + opts.port = process.env.UCI_PORT +} else { + opts.path = process.env.UCI_PATH==='true' ? true : process.env.UCI_PATH +} + +(async () => { + + let client = new Base(opts) + + client.on('status',ev => { + switch (ev.level) { + case 'warning': + case 'error': + case 'fatal': + console.log(ev.level.toUpperCase(),'\n',ev) + break + } + }) + + function log (packet) { + if (packet.ack) { + console.log('socket acknowledged') + let {_header, cmd, ...rest} = packet + console.log('reply:',rest) + } + else { + console.log('process at socket did not acknowledge') + console.log(packet._header) + } + console.log('------ END REPLY------') + } + + // example return processor using a namespace, will take precidence over default and root namespaces + client.cmds = { reply: (packet) => { + console.log('------REPLY--via cmds namespace----') + log(packet) } + } + client.addNamespace('cmds','c') // comment this out to process via root namespace + + // alt example return process function making us of root namspace, useRootNS must be set true in options to do this + client.reply = function (packet) { + console.log('------REPLY--via root namespace----') + log(packet) + } + + client.addSocket(`client:${opts.path ? 'named-pipe':'tcp'}`,'c',opts.path?'n':'t',opts) + + let res = await client.init() + if (res.error) { + console.log('errors during init') + process.kill(process.pid, 'SIGTERM') + } + + console.log('connected proceed with ack') + await client.send({cmd:'ack'}) + delay(2000) + process.kill(process.pid, 'SIGTERM') + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + process.kill(process.pid, 'SIGTERM') +}) diff --git a/nodemon.json b/nodemon.json index 2e2a7df..ec691f6 100644 --- a/nodemon.json +++ b/nodemon.json @@ -1,4 +1,5 @@ { - "ignoreRoot": [".git","examples/ws-fio-client"], - "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples/four-in-on.js","examples/four-in-on.js","test/"] + "ignoreRoot": [".git"], + "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples","test/"], + "ignore":["examples/ws-fio-client"] } diff --git a/package.json b/package.json index 8214bc9..13d8ee2 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,15 @@ { "name": "@uci/base", - "version": "0.1.28", + "version": "0.1.29", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { "fiod": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/four-in-one", "fio": "nodemon -r esm examples/four-in-one", + "ack": "node -r esm examples/ack || true", + "ack:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/ack || true", + "client": "node -r esm examples/client || true", + "client:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/client || true", "test": "UCI_ENV=dev nodemon -r esm --preserve-symlinks test/test", "mtestw": "mocha -r esm test/*.test.mjs --watch --recurse ", "mtest": "mocha -r esm test/*.test.mjs" @@ -36,7 +40,7 @@ "@uci-utils/bind-funcs": "^0.2.4", "@uci-utils/logger": "^0.0.15", "@uci/mqtt": "^0.1.13", - "@uci/socket": "^0.2.20", + "@uci/socket": "^0.2.21", "@uci/websocket": "^0.3.9", "await-to-js": "^2.1.1", "p-reflect": "^2.1.0", diff --git a/src/base.js b/src/base.js index ae5fb65..c3e4484 100644 --- a/src/base.js +++ b/src/base.js @@ -65,6 +65,10 @@ class Base extends EventEmitter { log = logger({ name: 'base', id: this.id }) this.desc = opts.desc // additional details for humans this._socket = {} // holds all the various communication sockets + // these two if passed will get applied to all consumer sockets, otherwise socket defaults will be used + this.initTimeout = opts.initTimeout + this.retryWait = opts.retryWait + this.defaultReturnCmd = opts.defaultReturnCmd this._started = false // flag to know when instance has been initialized this._processors = { _default: processor } // _c and _s are the default namespaces @@ -122,7 +126,7 @@ class Base extends EventEmitter { const value = await socket.init() results[socket.name] = value } catch (error) { - this.emit('error',{msg:'socket init error',error:error})// emit an error here, remove socket + this.emit('status',{level:'fatal', msg:'socket init error',error:error})// emit an error here, remove socket let res = await this.removeSocket(socket.name) errors[socket.name]={error:error, remove:res} } @@ -151,7 +155,9 @@ class Base extends EventEmitter { */ async addSocket(name, type = 'c', transport = 'n', options = {}) { log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:147, msg:`adding socket ${name}`}) - options.id = this.id + ':' + name + options.id = options.id || this.id + ':' + name + if (type==='c') options = Object.assign({initTimeout:this.initTimeout, retryWait:this.retryWait},options) + if (type==='s') options = Object.assign({defaultReturnCmd:this.defaultReturnCmd},options) switch (transport) { case 'n': options.path = options.path || true @@ -177,18 +183,20 @@ class Base extends EventEmitter { this._socket[name].transport = transport this._socket[name]._packetProcess = this._packetProcess.bind(this, name) - if (type==='c') { // bubble up events from client sockets - this._socket[name].on('status', ev => { - ev.socketName=name - this.emit('status', ev) - }) + // bubble up events from sockets + this._socket[name].on('status', ev => { + ev.socketName=name + this.emit('status', ev) + }) + + if (type==='c') { this._socket[name].on('pushed', packet => { packet._header.socketName=name this.emit('pushed', packet) }) } - // do this as .then promise then addSocket doesn't need to be async before init + // if instance already started then init this socket now if (this._started) return await this._initSocket(name) else return `socket ${name} added` } @@ -203,10 +211,10 @@ class Base extends EventEmitter { // NOTE: uci consumers have .end renamed as .close to match socket method for convenience let closeError let [err] = await to(this._socket[name].close()) - if (err.code !== 'ERR_SERVER_NOT_RUNNING') { + if(err) if (err.code !== 'ERR_SERVER_NOT_RUNNING') { closeError = {socket:this._socket[name].name, error:err, msg:'socket/consumer closed with errors, but removed'} } - this.emit('warn', {msg:`socket ${name} has been removed`, socket:this._socket[name].opts}) + this.emit('status', {level:'warn', msg:`socket ${name} has been removed`, socket:this._socket[name].opts}) delete this._socket[name] return closeError ? closeError : 'success' } @@ -325,7 +333,7 @@ class Base extends EventEmitter { // add set of functions to class prop/space and then register with this addNamespace(space, type, trans) { - if (type !=='c' || type !=='s') { + if (type !=='c' && type !=='s') { trans = type type = 's' } trans = this._validateTransport(trans) @@ -431,13 +439,11 @@ class Base extends EventEmitter { if (this._socket[socket_name].beforeProcess) { [err,res] = await to(this._socket[socket_name].beforeProcess.call(this,packet)) if (err) { // hook has forced an abort to processing - console.log('before error', packet) packet.error = err return packet } packet = res } - // if (this._socket[socket_name].beforeProcess) packet = await this._socket[socket_name].beforeProcess.call(this,packet) // the processor can be set via the incoming packet // otherwise if one is set on the socket or the default found in processing.js // TODO Try each "available" packet processor in some order if fails try next one before trying the default @@ -465,7 +471,7 @@ class Base extends EventEmitter { return `socket ${name} added and initialzed, ${res}` }) .catch(function(err) { - this.emit('error', {msg:'failed initialization', error:err, socket:socket, code:'SOCKET_INIT'}) + this.emit('status', {level:'fatal', msg:'failed initialization', error:err, socket:socket, code:'SOCKET_INIT'}) return {msg:`socket ${name} failed initialization`, error:err} }.bind(this) ) diff --git a/src/processing.js b/src/processing.js index 92703d1..63e976f 100644 --- a/src/processing.js +++ b/src/processing.js @@ -33,7 +33,6 @@ const _process = { }, c: async function (packet,socket) { - // the the end of life for a consumer packet that has been sent and returned or a packet that was pushed. if (packet.error) packet.cmd='error' if (packet.cmd) { @@ -78,7 +77,7 @@ const defaultCmds ={ ack: async packet => { packet.cmd = 'reply' packet.ack = true - packet.msg = 'this is the base default ack, superceed in your extended class' + packet.msg = 'this is the base default ack, superceed in your instance or extended class' return packet } },