From 31747cc373b944c27692e9845c9c5024bf529df2 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sun, 15 Mar 2020 15:15:43 -0700 Subject: [PATCH] 0.5.0 bumping in anticipation of 3-2020 deployment of light code working fine, no bugs cleaned code refactored socketsInit and how it handles the ready object, delays 100ms to avoid pushing/send issue when socket is closed abruptly refactored the observer names to add a suffix be more clear like for sockets and and for consumers add better examples two consumers and socket example and duplex example --- .gitignore | 3 ++ examples/consumer.js | 79 +++++++++++++++++++++++++++++++++++++ examples/consumer2.js | 82 +++++++++++++++++++++++++++++++++++++++ examples/duplex.js | 90 +++++++++++++++++++++++++++++++++++++++++++ examples/socket.js | 73 +++++++++++++++++++++++++++++++++++ nodemon.json | 2 +- package.json | 14 +++---- src/base.js | 55 +++++++++++++------------- 8 files changed, 363 insertions(+), 35 deletions(-) create mode 100644 examples/consumer.js create mode 100644 examples/consumer2.js create mode 100644 examples/duplex.js create mode 100644 examples/socket.js diff --git a/.gitignore b/.gitignore index ba63714..06fb011 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,6 @@ /coverage/ /syncd/ yarn.lock +/.yalc/ +/yalc.lock +/yarn-error.log diff --git a/examples/consumer.js b/examples/consumer.js new file mode 100644 index 0000000..38c7ae0 --- /dev/null +++ b/examples/consumer.js @@ -0,0 +1,79 @@ +/* +simple generic consumer sending an ack +* +*/ +import Base from '../src/base' + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let opts={name:'example-consumer', id:'example-consumer'} + +let consumer = new Base(opts) + +; +(async () => { + + + // + consumer.on('log',ev => { + switch (ev.level) { + // case 'warning': + // case 'error': + case 'testing': + // case 'fatal': + console.log(ev.level.toUpperCase(),'\n',ev) + break + } + }) + + // example return processor using a namespace, will take precidence over default and root namespaces + consumer.cmds = { + + reply: (packet) => { + // console.log('------REPLY--via cmds namespace----') + // console.log(packet) + }, + error: (packet) => { + console.log('------Error--via cmds namespace----') + console.log(packet) + } + + } + + consumer.addNamespace('cmds','c') // comment this out to process via root namespace + + consumer.registerSocket('to-example-socket','c','n',{name:'example-consumer', path:'named-pipe'}) + + consumer.ready.addObserver('example-socket:process') + + consumer.on('example-socket:process', (ready,packet) => { + console.log('process state from socket', ready) + }) + + let res = await consumer.init() + if (res.error) { + console.log('errors during init') + process.kill(process.pid, 'SIGTERM') + } + + consumer.ready.subscribe(async ready=>{ + console.log('consumer can be ready when socket process pushes it is ready', ready) + }) + + process.once('SIGINT', function() { + console.log('Caught interrupt signal') + consumer.removeAllListeners() + process.exit() + }) + + process.once('SIGUSR2', async () => { + consumer.removeAllListeners() + process.exit() + }) + + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + consumer.removeAllListeners() + process.kill(process.pid, 'SIGTERM') +}) diff --git a/examples/consumer2.js b/examples/consumer2.js new file mode 100644 index 0000000..06e2503 --- /dev/null +++ b/examples/consumer2.js @@ -0,0 +1,82 @@ +/* +simple generic consumer sending an ack +* +*/ +import Base from '../src/base' + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +let opts={name:'example-consumer-2', id:'example-consumer-2'} + +let consumer = new Base(opts) + +; +(async () => { + + + // + consumer.on('log',ev => { + switch (ev.level) { + // case 'trace': + // case 'debug': + case 'info': + case 'warning': + case 'error': + case 'fatal': + case 'testing': + console.log(ev.level.toUpperCase(),'\n',ev) + break + } + }) + + // example return processor using a namespace, will take precidence over default and root namespaces + consumer.cmds = { + + reply: (packet) => { + // console.log('------REPLY--via cmds namespace----') + // console.log(packet) + }, + error: (packet) => { + console.log('------Error--via cmds namespace----') + console.log(packet) + } + + } + + consumer.addNamespace('cmds','c') // comment this out to process via root namespace + + consumer.registerSocket('to-example-socket-2','c','n',{name:'example-consumer-2', path:'named-pipe'}) + + consumer.ready.addObserver('example-socket:process') + + consumer.on('example-socket:process', (ready,packet) => { + console.log('process state from socket', ready) + }) + + let res = await consumer.init() + if (res.error) { + console.log('errors during init') + process.kill(process.pid, 'SIGTERM') + } + + consumer.ready.subscribe(async ready=>{ + console.log('consumer 2 can be ready when socket process pushes it is ready', ready) + }) + + process.once('SIGINT', function() { + console.log('Caught interrupt signal') + consumer.removeAllListeners() + process.exit() + }) + + process.once('SIGUSR2', async () => { + consumer.removeAllListeners() + process.exit() + }) + + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + consumer.removeAllListeners() + process.kill(process.pid, 'SIGTERM') +}) diff --git a/examples/duplex.js b/examples/duplex.js new file mode 100644 index 0000000..1280c2e --- /dev/null +++ b/examples/duplex.js @@ -0,0 +1,90 @@ +/* +simple generic consumer sending an ack +* +*/ +import Base from '../src/base' + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +const NAME = 'example-duplex' + +let opts={name:NAME+process.env.NAME, id:NAME+process.env.NAME} + +let duplex = new Base(opts) + +; +(async () => { + + + // + duplex.on('log',ev => { + switch (ev.level) { + // case 'warning': + // case 'error': + case 'testing': + // case 'fatal': + console.log(ev.level.toUpperCase(),'\n',ev) + break + } + }) + + // example return processor using a namespace, will take precidence over default and root namespaces + duplex.cmds = { + + reply: (packet) => { + // console.log('------REPLY--via cmds namespace----') + // console.log(packet) + }, + error: (packet) => { + console.log('------Error--via cmds namespace----') + console.log(packet) + } + + } + + duplex.addNamespace('cmds','c') // comment this out to process via root namespace + + duplex.registerSocket(opts.name+'c','c','n',{name:opts.name, path:'pipe-'+process.env.DUPLEX_NAME}) + duplex.registerSocket(opts.name+'s','s','n',{ path:'pipe-'+process.env.NAME, duplex:NAME+process.env.DUPLEX_NAME}) + + duplex.ready.addObserver(opts.name+':another-state') + + duplex.on(opts.name+':process', (ready, packet) => { + console.log(opts.name,'process state changed', ready) + }) + + duplex.ready.subscribe(async ready=>{ + console.log(opts.name,'overall ready state change', ready) + }) + + duplex.ready.makeObserver(NAME+process.env.DUPLEX_NAME+':process') + .subscribe(ready=>console.log(NAME+process.env.DUPLEX_NAME+':process','was observed as',ready)) + + let res = await duplex.init() + if (res.error) { + console.log('errors during init') + process.kill(process.pid, 'SIGTERM') + } + + setTimeout(()=>{ + duplex.emit(opts.name+':another-state',true) + console.log(opts.name,'setting test observer to true') + },2000) + + process.once('SIGINT', function() { + console.log('Caught interrupt signal') + duplex.removeAllListeners() + process.exit() + }) + + process.once('SIGUSR2', async () => { + duplex.removeAllListeners() + process.exit() + }) + + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + duplex.removeAllListeners() + process.kill(process.pid, 'SIGTERM') +}) diff --git a/examples/socket.js b/examples/socket.js new file mode 100644 index 0000000..58c455f --- /dev/null +++ b/examples/socket.js @@ -0,0 +1,73 @@ +/* +simple generic client sending an ack +* +*/ +import Base from '../src/base' + + +let opts={name:'example-socket', id:'id-example-socket'} +// if nothing set will be localhost:8080 + +let socket = new Base(opts) + +; +(async () => { + + socket.on('log',ev => { + switch (ev.level) { + // case 'trace': + // case 'debug': + // case 'info': + case 'warning': + case 'error': + case 'fatal': + case 'testing': + console.log(ev.level.toUpperCase(),'\n',ev) + break + } + }) + + socket.registerSocket('socket-name','s','n',{ path:'named-pipe'}) + + socket.ready.addObserver('test-observer') + + let res = await socket.init() + if (res.error) { + console.log('errors during init') + process.kill(process.pid, 'SIGTERM') + } + + socket.on('example-consumer:process', (ready,packet) => { + console.log('process state event from consumer', ready, packet.name) + }) + + socket.on('example-consumer-2:process', (ready,packet) => { + console.log('process state event from consumer', ready, packet.name) + }) + + socket.ready.subscribe(ready=>{ + console.log('the socket process is ready?', ready) + }) + + setTimeout(()=>{ + socket.emit('test-observer',true) + console.log('socket test-observer set to true') + },10000) + + process.once('SIGINT', function() { + console.log('Caught interrupt signal') + socket.removeAllListeners() + process.exit() + }) + + process.once('SIGUSR2', async () => { + socket.removeAllListeners() + process.exit() + }) + + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + socket.removeAllListeners() + process.kill(process.pid, 'SIGTERM') +}) diff --git a/nodemon.json b/nodemon.json index ec691f6..cc96955 100644 --- a/nodemon.json +++ b/nodemon.json @@ -1,5 +1,5 @@ { "ignoreRoot": [".git"], - "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples","test/"], + "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples","test/","examples/"], "ignore":["examples/ws-fio-client"] } diff --git a/package.json b/package.json index ccdef78..57874a5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/base", - "version": "0.1.51", + "version": "0.5.0", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { @@ -39,18 +39,18 @@ "devDependencies": { "chai": "^4.2.0", "esm": "^3.2.25", - "mocha": "^7.0.1", + "mocha": "^7.1.0", "nodemon": "^2.0.2" }, "dependencies": { "@uci-utils/bind-funcs": "^0.2.4", "@uci-utils/logger": "^0.0.16", - "@uci-utils/ready": "^0.1.9", - "@uci/mqtt": "^0.1.13", - "@uci/socket": "^0.2.31", - "@uci/websocket": "^0.3.13", + "@uci-utils/ready": "^0.3.0", + "@uci/mqtt": "^0.3.0", + "@uci/socket": "^0.3.0", + "@uci/websocket": "^0.4.0", "await-to-js": "^2.1.1", "is-plain-object": "^3.0.0", - "merge-anything": "^2.4.4" + "merge-anything": "^3.0.3" } } diff --git a/src/base.js b/src/base.js index 359d5d1..9b9cfd8 100644 --- a/src/base.js +++ b/src/base.js @@ -18,7 +18,7 @@ let log = {} // declare module wide log to be set during construction // Community dependencies import to from 'await-to-js' import isPlainObject from 'is-plain-object' -import merge from 'merge-anything' +import { merge } from 'merge-anything' // Nodejs dependencies import EventEmitter from 'events' @@ -26,7 +26,7 @@ import EventEmitter from 'events' import { cmdProcessor, defaultCmds, namespaces } from './processing' // Constants -const SOCKET_INFO_KEYS = ['name', 'type', 'transport'] +// const SOCKET_INFO_KEYS = ['name', 'type', 'transport'] const TRANSLATE = { n: 'Named Pipe', t: 'TCP', @@ -99,7 +99,6 @@ class Base extends EventEmitter { sockets = Array.isArray(sockets) ? sockets:[sockets] sockets.forEach(socket => this.registerSocket(socket)) } - console.log('base.js @uci/base package tag 0.1.47') } // end constructor @@ -125,20 +124,26 @@ class Base extends EventEmitter { // TODO ready needs to allow multiple all subscribers that get rebuilt on add/remove const res = await this.socketsInit(sockets) - // console.log('all observer', this.ready.all) // update ready packet and push/send that changed packet this.ready.all.subscribe(async ready => { this._readyPacket.ready= ready - delete (this._readyPacket.failure) + delete (this._readyPacket._header) if (!ready) { // make a list of the failures to send // await new Promise(res=>setTimeout(()=>res(),1000)) this._readyPacket.failures = this.ready.failed } else delete this._readyPacket.failures - this.emit('log',{level:'testing', msg:`${this.name} has an updated state broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`}) - // setTimeout(async () => { - // console.log('ready send', await this.send(this._readyPacket)) // to any socket that this instance is connected to - // console.log('ready push',await this.push(this._readyPacket)) // to any remote consumer connected to an instance socket - // },100) + // this.emit('log',{level:'testing', msg:`${this.name} has an updated state broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`}) + let packet = Object.assign({},this._readyPacket) + // console.log('ready packet to broadcast',packet) + // console.log('broadcasting ready',ready,this.ready.state) + this.getSocketsFilter({type:'s'}) + .forEach(socket=>this.getSocket(socket).conPackets[0]=packet) + setTimeout(async () => { + this.send(packet) // to any socket that this instance is connected to + this.push(packet) // to any remote consumer connected to an instance socket + // console.log('ready send:', await this.send(packet)) // to any socket that this instance is connected to + // console.log('ready push:',await this.push(packet)) // to any remote consumer connected to an instance socket + },100) }) return res } @@ -268,7 +273,8 @@ class Base extends EventEmitter { if (type==='c') { // when consumer has sucessfully connected to a socket - this.ready.addObserver(`${name}:consumer`,this._socket[name],{event:'connection:socket',condition:ev=>ev.state==='connected'}) + this._socket[name].obsName = `${name}:${options.path ? options.path : `${options.host}:${options.port}`}` + this.ready.addObserver(this._socket[name].obsName,this._socket[name],{event:'connection:socket',condition:ev=>ev.state==='connected'}) // set up listner for any pushed packets and emit locally this._socket[name].on('pushed', packet => { packet._header.socketName=name @@ -278,16 +284,10 @@ class Base extends EventEmitter { if (type==='s') { // when socket is listnening - this.ready.addObserver(`${name}:socket`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' }) - - // TODO refactor as regular listners so will know which consumer and can push only to it - this.consumerConnected(this._socket[name],{ - subscribe: newConsumer => { if (newConsumer) { - this.emit('log',{level:'testing', msg:`${this.name} has new consumer connecting pushing: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`}) - this.push(this._readyPacket,{socket:name}) - } - } - }) + this.ready.addObserver(`${name}:socket`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' }) + // initially set conPackets, ready packets is ALWAYS the first + this._socket[name].conPackets.unshift(this._readyPacket) + if (options.duplex) this.consumerConnected(this._socket[name],{consumer:options.duplex, add:true}) } return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket } @@ -319,7 +319,7 @@ class Base extends EventEmitter { else return this._socket } // returns array of names of sockets that pass filter - getSocketsFilter({type,trans, active}) { + getSocketsFilter({type, trans, active}={}) { if (trans) trans = this._validateTransport(trans) let filtered = [] Object.keys(this._socket).forEach(name => { @@ -357,6 +357,7 @@ class Base extends EventEmitter { * @returns {type} Description */ async send(name, packet) { + if (!packet || !Object.keys(packet).length) return Promise.resolve('no packet to push - aborted') if (typeof name !== 'string') { packet = name let sends = [] @@ -385,6 +386,7 @@ class Base extends EventEmitter { } async push(packet,opts={}) { + if (!packet || !Object.keys(packet).length) return Promise.resolve('no packet to push - aborted') let sockets = this.getSocketsFilter({type:'s'}) if (!sockets.length) return Promise.resolve('no sockets on which to push') opts.sockets = opts.sockets ? opts.sockets : (opts.socket ? [opts.socket] : []) @@ -399,7 +401,7 @@ class Base extends EventEmitter { for (let socket of sockets) { let hookedPacket = {} hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet - log.debug({msg:'hooked packet to push', name:socket.name, packet:hookedPacket, method:'push', line:243}) + // console.log({msg:'hooked packet to push', name:socket.name, packet:hookedPacket, method:'push', line:243}) broadcast.push(socket.push.bind(socket,hookedPacket,opts)) } return Promise.all( @@ -549,8 +551,8 @@ class Base extends EventEmitter { name = name || consumer add = add && consumer const options = {event:'connection:consumer',condition:conditionHandler} - - const obs = add ? this.ready.addObserver(`${name}:consumer:inbound`,socket,options) : this.ready.makeObserver(socket,options) + let oname = `${name}:consumer>${socket.name}:socket` + const obs = add ? this.ready.addObserver(oname,socket,options) : this.ready.makeObserver(socket,options) if (typeof subscribe ==='function') return obs.subscribe(subscribe) return obs } // end consumerConnected @@ -612,7 +614,7 @@ class Base extends EventEmitter { if (!isPlainObject(res)) packet.processResult ? packet.processResult[name]=res : packet.processResult = {[name]:res} else { let method = (packet.processMethod || {})[name] || packet.processMethod - // TODO could support other specialized methods + // TODO could support other specialized merge methods if (method === 'merge') { packet = merge(packet,res) } @@ -663,7 +665,6 @@ class Base extends EventEmitter { return trans } - _transport(name) { return this._socket[name].transport } //getter for socket transport