0.5.0 bumping in anticipation of 3-2020 deployment of light code

working fine, no bugs
cleaned code
refactored socketsInit and how it handles the ready object, delays 100ms to avoid pushing/send issue when socket is closed abruptly
refactored the observer names to add a suffix be more clear like <listening> for sockets  and <inbound> and <outbound> for consumers
add better examples  two consumers and socket example and duplex example
This commit is contained in:
David Kebler 2020-03-15 15:15:43 -07:00
parent b7a20230bb
commit 31747cc373
8 changed files with 363 additions and 35 deletions

3
.gitignore vendored
View file

@ -2,3 +2,6 @@
/coverage/
/syncd/
yarn.lock
/.yalc/
/yalc.lock
/yarn-error.log

79
examples/consumer.js Normal file
View file

@ -0,0 +1,79 @@
/*
simple generic consumer sending an ack
*
*/
import Base from '../src/base'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let opts={name:'example-consumer', id:'example-consumer'}
let consumer = new Base(opts)
;
(async () => {
//
consumer.on('log',ev => {
switch (ev.level) {
// case 'warning':
// case 'error':
case 'testing':
// case 'fatal':
console.log(ev.level.toUpperCase(),'\n',ev)
break
}
})
// example return processor using a namespace, will take precidence over default and root namespaces
consumer.cmds = {
reply: (packet) => {
// console.log('------REPLY--via cmds namespace----')
// console.log(packet)
},
error: (packet) => {
console.log('------Error--via cmds namespace----')
console.log(packet)
}
}
consumer.addNamespace('cmds','c') // comment this out to process via root namespace
consumer.registerSocket('to-example-socket','c','n',{name:'example-consumer', path:'named-pipe'})
consumer.ready.addObserver('example-socket:process')
consumer.on('example-socket:process', (ready,packet) => {
console.log('process state from socket', ready)
})
let res = await consumer.init()
if (res.error) {
console.log('errors during init')
process.kill(process.pid, 'SIGTERM')
}
consumer.ready.subscribe(async ready=>{
console.log('consumer can be ready when socket process pushes it is ready', ready)
})
process.once('SIGINT', function() {
console.log('Caught interrupt signal')
consumer.removeAllListeners()
process.exit()
})
process.once('SIGUSR2', async () => {
consumer.removeAllListeners()
process.exit()
})
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
consumer.removeAllListeners()
process.kill(process.pid, 'SIGTERM')
})

82
examples/consumer2.js Normal file
View file

@ -0,0 +1,82 @@
/*
simple generic consumer sending an ack
*
*/
import Base from '../src/base'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let opts={name:'example-consumer-2', id:'example-consumer-2'}
let consumer = new Base(opts)
;
(async () => {
//
consumer.on('log',ev => {
switch (ev.level) {
// case 'trace':
// case 'debug':
case 'info':
case 'warning':
case 'error':
case 'fatal':
case 'testing':
console.log(ev.level.toUpperCase(),'\n',ev)
break
}
})
// example return processor using a namespace, will take precidence over default and root namespaces
consumer.cmds = {
reply: (packet) => {
// console.log('------REPLY--via cmds namespace----')
// console.log(packet)
},
error: (packet) => {
console.log('------Error--via cmds namespace----')
console.log(packet)
}
}
consumer.addNamespace('cmds','c') // comment this out to process via root namespace
consumer.registerSocket('to-example-socket-2','c','n',{name:'example-consumer-2', path:'named-pipe'})
consumer.ready.addObserver('example-socket:process')
consumer.on('example-socket:process', (ready,packet) => {
console.log('process state from socket', ready)
})
let res = await consumer.init()
if (res.error) {
console.log('errors during init')
process.kill(process.pid, 'SIGTERM')
}
consumer.ready.subscribe(async ready=>{
console.log('consumer 2 can be ready when socket process pushes it is ready', ready)
})
process.once('SIGINT', function() {
console.log('Caught interrupt signal')
consumer.removeAllListeners()
process.exit()
})
process.once('SIGUSR2', async () => {
consumer.removeAllListeners()
process.exit()
})
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
consumer.removeAllListeners()
process.kill(process.pid, 'SIGTERM')
})

90
examples/duplex.js Normal file
View file

@ -0,0 +1,90 @@
/*
simple generic consumer sending an ack
*
*/
import Base from '../src/base'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
const NAME = 'example-duplex'
let opts={name:NAME+process.env.NAME, id:NAME+process.env.NAME}
let duplex = new Base(opts)
;
(async () => {
//
duplex.on('log',ev => {
switch (ev.level) {
// case 'warning':
// case 'error':
case 'testing':
// case 'fatal':
console.log(ev.level.toUpperCase(),'\n',ev)
break
}
})
// example return processor using a namespace, will take precidence over default and root namespaces
duplex.cmds = {
reply: (packet) => {
// console.log('------REPLY--via cmds namespace----')
// console.log(packet)
},
error: (packet) => {
console.log('------Error--via cmds namespace----')
console.log(packet)
}
}
duplex.addNamespace('cmds','c') // comment this out to process via root namespace
duplex.registerSocket(opts.name+'c','c','n',{name:opts.name, path:'pipe-'+process.env.DUPLEX_NAME})
duplex.registerSocket(opts.name+'s','s','n',{ path:'pipe-'+process.env.NAME, duplex:NAME+process.env.DUPLEX_NAME})
duplex.ready.addObserver(opts.name+':another-state')
duplex.on(opts.name+':process', (ready, packet) => {
console.log(opts.name,'process state changed', ready)
})
duplex.ready.subscribe(async ready=>{
console.log(opts.name,'overall ready state change', ready)
})
duplex.ready.makeObserver(NAME+process.env.DUPLEX_NAME+':process')
.subscribe(ready=>console.log(NAME+process.env.DUPLEX_NAME+':process','was observed as',ready))
let res = await duplex.init()
if (res.error) {
console.log('errors during init')
process.kill(process.pid, 'SIGTERM')
}
setTimeout(()=>{
duplex.emit(opts.name+':another-state',true)
console.log(opts.name,'setting test observer to true')
},2000)
process.once('SIGINT', function() {
console.log('Caught interrupt signal')
duplex.removeAllListeners()
process.exit()
})
process.once('SIGUSR2', async () => {
duplex.removeAllListeners()
process.exit()
})
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
duplex.removeAllListeners()
process.kill(process.pid, 'SIGTERM')
})

73
examples/socket.js Normal file
View file

@ -0,0 +1,73 @@
/*
simple generic client sending an ack
*
*/
import Base from '../src/base'
let opts={name:'example-socket', id:'id-example-socket'}
// if nothing set will be localhost:8080
let socket = new Base(opts)
;
(async () => {
socket.on('log',ev => {
switch (ev.level) {
// case 'trace':
// case 'debug':
// case 'info':
case 'warning':
case 'error':
case 'fatal':
case 'testing':
console.log(ev.level.toUpperCase(),'\n',ev)
break
}
})
socket.registerSocket('socket-name','s','n',{ path:'named-pipe'})
socket.ready.addObserver('test-observer')
let res = await socket.init()
if (res.error) {
console.log('errors during init')
process.kill(process.pid, 'SIGTERM')
}
socket.on('example-consumer:process', (ready,packet) => {
console.log('process state event from consumer', ready, packet.name)
})
socket.on('example-consumer-2:process', (ready,packet) => {
console.log('process state event from consumer', ready, packet.name)
})
socket.ready.subscribe(ready=>{
console.log('the socket process is ready?', ready)
})
setTimeout(()=>{
socket.emit('test-observer',true)
console.log('socket test-observer set to true')
},10000)
process.once('SIGINT', function() {
console.log('Caught interrupt signal')
socket.removeAllListeners()
process.exit()
})
process.once('SIGUSR2', async () => {
socket.removeAllListeners()
process.exit()
})
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
socket.removeAllListeners()
process.kill(process.pid, 'SIGTERM')
})

View file

@ -1,5 +1,5 @@
{
"ignoreRoot": [".git"],
"watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples","test/"],
"watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","index.js","examples","test/","examples/"],
"ignore":["examples/ws-fio-client"]
}

View file

@ -1,6 +1,6 @@
{
"name": "@uci/base",
"version": "0.1.51",
"version": "0.5.0",
"description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes",
"main": "src/base",
"scripts": {
@ -39,18 +39,18 @@
"devDependencies": {
"chai": "^4.2.0",
"esm": "^3.2.25",
"mocha": "^7.0.1",
"mocha": "^7.1.0",
"nodemon": "^2.0.2"
},
"dependencies": {
"@uci-utils/bind-funcs": "^0.2.4",
"@uci-utils/logger": "^0.0.16",
"@uci-utils/ready": "^0.1.9",
"@uci/mqtt": "^0.1.13",
"@uci/socket": "^0.2.31",
"@uci/websocket": "^0.3.13",
"@uci-utils/ready": "^0.3.0",
"@uci/mqtt": "^0.3.0",
"@uci/socket": "^0.3.0",
"@uci/websocket": "^0.4.0",
"await-to-js": "^2.1.1",
"is-plain-object": "^3.0.0",
"merge-anything": "^2.4.4"
"merge-anything": "^3.0.3"
}
}

View file

@ -18,7 +18,7 @@ let log = {} // declare module wide log to be set during construction
// Community dependencies
import to from 'await-to-js'
import isPlainObject from 'is-plain-object'
import merge from 'merge-anything'
import { merge } from 'merge-anything'
// Nodejs dependencies
import EventEmitter from 'events'
@ -26,7 +26,7 @@ import EventEmitter from 'events'
import { cmdProcessor, defaultCmds, namespaces } from './processing'
// Constants
const SOCKET_INFO_KEYS = ['name', 'type', 'transport']
// const SOCKET_INFO_KEYS = ['name', 'type', 'transport']
const TRANSLATE = {
n: 'Named Pipe',
t: 'TCP',
@ -99,7 +99,6 @@ class Base extends EventEmitter {
sockets = Array.isArray(sockets) ? sockets:[sockets]
sockets.forEach(socket => this.registerSocket(socket))
}
console.log('base.js @uci/base package tag 0.1.47')
} // end constructor
@ -125,20 +124,26 @@ class Base extends EventEmitter {
// TODO ready needs to allow multiple all subscribers that get rebuilt on add/remove
const res = await this.socketsInit(sockets)
// console.log('all observer', this.ready.all)
// update ready packet and push/send that changed packet
this.ready.all.subscribe(async ready => {
this._readyPacket.ready= ready
delete (this._readyPacket.failure)
delete (this._readyPacket._header)
if (!ready) { // make a list of the failures to send
// await new Promise(res=>setTimeout(()=>res(),1000))
this._readyPacket.failures = this.ready.failed
} else delete this._readyPacket.failures
this.emit('log',{level:'testing', msg:`${this.name} has an updated state broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`})
// setTimeout(async () => {
// console.log('ready send', await this.send(this._readyPacket)) // to any socket that this instance is connected to
// console.log('ready push',await this.push(this._readyPacket)) // to any remote consumer connected to an instance socket
// },100)
// this.emit('log',{level:'testing', msg:`${this.name} has an updated state broadcasting: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`})
let packet = Object.assign({},this._readyPacket)
// console.log('ready packet to broadcast',packet)
// console.log('broadcasting ready',ready,this.ready.state)
this.getSocketsFilter({type:'s'})
.forEach(socket=>this.getSocket(socket).conPackets[0]=packet)
setTimeout(async () => {
this.send(packet) // to any socket that this instance is connected to
this.push(packet) // to any remote consumer connected to an instance socket
// console.log('ready send:', await this.send(packet)) // to any socket that this instance is connected to
// console.log('ready push:',await this.push(packet)) // to any remote consumer connected to an instance socket
},100)
})
return res
}
@ -268,7 +273,8 @@ class Base extends EventEmitter {
if (type==='c') {
// when consumer has sucessfully connected to a socket
this.ready.addObserver(`${name}:consumer`,this._socket[name],{event:'connection:socket',condition:ev=>ev.state==='connected'})
this._socket[name].obsName = `${name}:${options.path ? options.path : `${options.host}:${options.port}`}<outbound>`
this.ready.addObserver(this._socket[name].obsName,this._socket[name],{event:'connection:socket',condition:ev=>ev.state==='connected'})
// set up listner for any pushed packets and emit locally
this._socket[name].on('pushed', packet => {
packet._header.socketName=name
@ -278,16 +284,10 @@ class Base extends EventEmitter {
if (type==='s') {
// when socket is listnening
this.ready.addObserver(`${name}:socket`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' })
// TODO refactor as regular listners so will know which consumer and can push only to it
this.consumerConnected(this._socket[name],{
subscribe: newConsumer => { if (newConsumer) {
this.emit('log',{level:'testing', msg:`${this.name} has new consumer connecting pushing: event>state = ${this._readyPacket.event}>${this._readyPacket.ready}`})
this.push(this._readyPacket,{socket:name})
}
}
})
this.ready.addObserver(`${name}:socket<listening>`,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' })
// initially set conPackets, ready packets is ALWAYS the first
this._socket[name].conPackets.unshift(this._readyPacket)
if (options.duplex) this.consumerConnected(this._socket[name],{consumer:options.duplex, add:true})
}
return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket
}
@ -319,7 +319,7 @@ class Base extends EventEmitter {
else return this._socket
}
// returns array of names of sockets that pass filter
getSocketsFilter({type,trans, active}) {
getSocketsFilter({type, trans, active}={}) {
if (trans) trans = this._validateTransport(trans)
let filtered = []
Object.keys(this._socket).forEach(name => {
@ -357,6 +357,7 @@ class Base extends EventEmitter {
* @returns {type} Description
*/
async send(name, packet) {
if (!packet || !Object.keys(packet).length) return Promise.resolve('no packet to push - aborted')
if (typeof name !== 'string') {
packet = name
let sends = []
@ -385,6 +386,7 @@ class Base extends EventEmitter {
}
async push(packet,opts={}) {
if (!packet || !Object.keys(packet).length) return Promise.resolve('no packet to push - aborted')
let sockets = this.getSocketsFilter({type:'s'})
if (!sockets.length) return Promise.resolve('no sockets on which to push')
opts.sockets = opts.sockets ? opts.sockets : (opts.socket ? [opts.socket] : [])
@ -399,7 +401,7 @@ class Base extends EventEmitter {
for (let socket of sockets) {
let hookedPacket = {}
hookedPacket = socket.beforeSend ? await socket.beforeSend.call(this,Object.assign({},packet),true) : packet
log.debug({msg:'hooked packet to push', name:socket.name, packet:hookedPacket, method:'push', line:243})
// console.log({msg:'hooked packet to push', name:socket.name, packet:hookedPacket, method:'push', line:243})
broadcast.push(socket.push.bind(socket,hookedPacket,opts))
}
return Promise.all(
@ -549,8 +551,8 @@ class Base extends EventEmitter {
name = name || consumer
add = add && consumer
const options = {event:'connection:consumer',condition:conditionHandler}
const obs = add ? this.ready.addObserver(`${name}:consumer:inbound`,socket,options) : this.ready.makeObserver(socket,options)
let oname = `${name}:consumer>${socket.name}:socket<inbound>`
const obs = add ? this.ready.addObserver(oname,socket,options) : this.ready.makeObserver(socket,options)
if (typeof subscribe ==='function') return obs.subscribe(subscribe)
return obs
} // end consumerConnected
@ -612,7 +614,7 @@ class Base extends EventEmitter {
if (!isPlainObject(res)) packet.processResult ? packet.processResult[name]=res : packet.processResult = {[name]:res}
else {
let method = (packet.processMethod || {})[name] || packet.processMethod
// TODO could support other specialized methods
// TODO could support other specialized merge methods
if (method === 'merge') {
packet = merge(packet,res)
}
@ -663,7 +665,6 @@ class Base extends EventEmitter {
return trans
}
_transport(name) {
return this._socket[name].transport
} //getter for socket transport