using '__all__' for main combination
can save combinations to map
.state hold state at last main combination change
can add observer details that can be passed on later by subscription
shareReply added to observers
added 'changed' operator that emits only on changed state or first emission
modified the map operator to accept async function via switchMap
added methods
 observerNames to get list
 failure gets name of first observer to fail in the state prop
 getValue
 addObserverDetails  (can add/ammend at any time)
master
David Kebler 2020-01-07 15:02:36 -08:00
parent be1cf69996
commit 30f9f273e7
3 changed files with 180 additions and 53 deletions

View File

@ -1,12 +1,16 @@
import { Observable } from 'rxjs' import { Observable } from 'rxjs'
import Ready from '../src/ready' import { Ready, changed, map, tap} from '../src/ready'
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
let emitter = new EventEmitter() let emitter = new EventEmitter()
let verbose = process.env.VERBOSE==='true'
let combo = false
// handler: (r)=> console.log('result:',r) // handler: (r)=> console.log('result:',r)
let process = new Ready({emitter: emitter, verbose:true}) let example = new Ready({emitter: emitter, verbose:verbose})
const tObs = new Observable(subscriber => { const tObs = new Observable(subscriber => {
subscriber.next('on') subscriber.next('on')
@ -14,24 +18,63 @@ const tObs = new Observable(subscriber => {
subscriber.next('enabled') subscriber.next('enabled')
setTimeout(() => { setTimeout(() => {
subscriber.next('F') subscriber.next('F')
subscriber.next('T')
}, 2000) }, 2000)
setTimeout(() => {
subscriber.next('T')
}, 3000)
}) })
const tPromise = new Promise(function(resolve) { const tPromise = new Promise(function(resolve) {
setTimeout(()=>resolve('yes'),1500) setTimeout(()=>resolve('yes'),1000)
}) })
process.addObserver('e') example.addObserver('e',{details:{desc:'this is the e observer which comes from an emitter'}})
process.addObserver('ec',(ev)=>ev.test) example.addObserver('ce',{condition: (ev)=>ev.ready, event:'ec'})
process.addObserver('pe',emitter) example.addObserver('pe',emitter)
process.addObserver('obs',tObs) example.addObserver('obs',tObs)
process.addObserver('pr',tPromise) example.addObserver('pr',tPromise)
example.addObserver('extnd',example.getObserver('e').pipe(
map(state => {
// console.log('+++++++++++\n','extend before:',state)
if (state) {
let val = Math.floor(Math.random() * Math.floor(10))
// console.log(val)
state = val <10 ? true:false
}
// console.log('extend after:',state,'\n-------------')
return state
})
))
process.subscribe(val => console.log('---------------Ready Changed to :',val)) if (combo) {
// process.subscribe('obs',val => console.log('obs',val)) let combo = example.combineObservers('combo',['e','pr'])
.pipe(
tap(state=>console.log('combo e and pr state:',state)),
changed() // only emit if changed
)
if (combo){
console.log('subscribing to sub combination e, pr')
example.subscribe('combo',val => console.log('combo e, pr is:',val))
}
}
example.subscribe(val => {
console.log('Full Combination>>>>>>>>READY? example:',val)
console.log('failed observer', example.failure, 'details', example.details.get(example.failure))
}) // all
emitter.emit('ec',{test:true}) emitter.emit('ec',{ready:true})
emitter.emit('e','sure') emitter.emit('pe','yup')
emitter.emit('pe') emitter.emit('e')
console.log('===============done initial emitting=================')
setTimeout(async () => {
console.log('============emitting e false===================')
emitter.emit('e',false)
}
,2000)
setTimeout(async () => {
console.log('=================emitting e true================')
emitter.emit('e',true)
}
,4000)
setTimeout(async () => {console.log('timeout done')},6000)

View File

@ -1,16 +1,12 @@
{ {
"name": "@uci-utils/ready", "name": "@uci-utils/ready",
"version": "0.1.2", "version": "0.1.4",
"description": "A Class to Observe the reduced to boolean combined state of a map of observables", "description": "A Class to Observe the reduced to boolean combined state of a map of observables",
"main": "src/ready.js", "main": "src/ready.js",
"scripts": { "scripts": {
"example": "node --r esm examples/example", "example": "node -r esm examples/example",
"example:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/example", "example:dev": "./node_modules/.bin/nodemon -r esm examples/example",
"test": "./node_modules/.bin/mocha -r esm --timeout 30000", "test": "./node_modules/.bin/mocha -r esm --timeout 30000"
"testd": "UCI_ENV=dev ./node_modules/.bin/nodemon --exec './node_modules/.bin/mocha -r esm --timeout 30000' || exit 0",
"testdd": "UCI_LOG_LEVEL='trace' npm run testd",
"testde": "UCI_LOG_LEVEL='warn' npm run testd",
"testl": "UCI_ENV=pro UCI_LOG_PATH=./test/test.log 0 npm run test || exit 0"
}, },
"author": "David Kebler", "author": "David Kebler",
"license": "MIT", "license": "MIT",
@ -28,6 +24,7 @@
"dependencies": { "dependencies": {
"@uci-utils/to-boolean": "^0.1.1", "@uci-utils/to-boolean": "^0.1.1",
"is-observable": "^2.0.0", "is-observable": "^2.0.0",
"is-plain-object": "^3.0.0",
"p-is-promise": "^3.0.0", "p-is-promise": "^3.0.0",
"rxjs": "^6.5.4" "rxjs": "^6.5.4"
}, },

View File

@ -1,7 +1,9 @@
import { from, fromEvent, combineLatest, Subject } from 'rxjs' // new change
import { map, startWith, tap, pairwise, filter } from 'rxjs/operators' import { from, fromEvent, combineLatest, ReplaySubject } from 'rxjs'
import { switchMap, map as simpleMap, startWith, tap, pairwise, filter, shareReplay} from 'rxjs/operators'
import isObservable from 'is-observable' import isObservable from 'is-observable'
import isPromise from 'p-is-promise' import isPromise from 'p-is-promise'
import isPlainObject from 'is-plain-object'
// UCI dependencies // UCI dependencies
import { createBoolean } from '@uci-utils/to-boolean' import { createBoolean } from '@uci-utils/to-boolean'
const toBool = createBoolean({undefined:true}) // make default make null event emission cast to TRUE const toBool = createBoolean({undefined:true}) // make default make null event emission cast to TRUE
@ -13,37 +15,81 @@ class Ready extends Map {
this.emitter = typeof opts.emitter.on ==='function' ? opts.emitter : null this.emitter = typeof opts.emitter.on ==='function' ? opts.emitter : null
this.condition = opts.condition || ( (ev) => toBool(ev) ) this.condition = opts.condition || ( (ev) => toBool(ev) )
this.subscriptions = new Map() this.subscriptions = new Map()
this._state = {} this.combinations = new Map()
this.logger = new Subject() this.details = new Map()
this.combineObservers('__all__') // initialize all combination
this.logger = new ReplaySubject()
this.state = [] // holds last state of all observers
this.log = this.logger.next.bind(this.logger) this.log = this.logger.next.bind(this.logger)
if (opts.verbose) this.logger.subscribe(console.log) if (opts.verbose) this.logger.subscribe(console.log)
this._updateObserversList() // initialize
this.handler = opts.handler || console.log this.handler = opts.handler || console.log
this._first = true // tracks first emission
} }
get state() {return this._state} get observerNames(){return Array.from(this.keys())}
addObserver(name, obs, condition ) { get failure () {
if (!name) return false // name required let ret = null
let failed = this.state.some(obs=> {
ret = obs[0]
return obs[1]===false
})
return !failed ? '__none__' : ret
}
getObserver(name) {
return (this.get(name) || this.combinations.get(name||'__all__'))
}
getValue(name) { // NOT recommended. if there is any issue will return false
let obs = this.getObserver(name)
return new Promise(resolve => {
setTimeout(()=>resolve(false),50)
if (isObservable(obs)){
const sub = obs.subscribe(val => {
resolve(val)
})
sub.unsubscribe()
} else resolve(false)
})
}
addObserverDetails(name,details,overwrite) {
if (this.has(name)) {
if (!isPlainObject(details)) return false
// TODO substitue merge anything for Object.assign
this.details.set(name,overwrite ? details : Object.assign(this.details.get(name)||{},details))
return true
}
return false
}
addObserver(name, obs, opts={} ) {
// validation and defaults, obs can be emitter, osbserver, or promise
if (!name || typeof(name)!=='string') return false // name required
if (isPlainObject(obs)) { opts = obs; obs=null }
if (!(obs || this.emitter)) return false // some observable requried if (!(obs || this.emitter)) return false // some observable requried
if (typeof (obs ||{}).on ==='function') obs = fromEvent(obs,name) // it's an emitter let { condition, event, details } = opts
condition = condition || this.condition
if (typeof (obs ||{}).on ==='function') obs = fromEvent(obs, event || name) // it's an emitter
if (isPromise(obs)) obs = from(obs) // it's a promise if (isPromise(obs)) obs = from(obs) // it's a promise
if (obs && !isObservable(obs) && typeof obs==='function' && arguments.length===2) { if (obs && !isObservable(obs) && typeof obs==='function' && arguments.length===2) {
condition = obs condition = obs
obs = null // obs = null
} }
if (!obs && this.emitter) obs = fromEvent(this.emitter,name) if (!obs && this.emitter) obs = fromEvent(this.emitter,event || name)
if (!obs || !isObservable(obs)) return false if (!obs || !isObservable(obs)) return false
this.set(name, obs let xobs = obs.pipe(
.pipe( startWith(false),
tap(val => this.logger.next(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)), tap(val => this.log(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)),
map(condition||this.condition), map(condition),
tap(val => this.log(`boolean: ${val}`)), // tap(val => this.log(`boolean: ${val}`)),
startWith(false), shareReplay(1)
)
) )
this._updateObserversList() this.set(name, xobs)
return true this.addObserverDetails(name,details,true)
this.combineObservers('__all__') // update total combo
return xobs
} }
removeObserver(names) { removeObserver(names) {
@ -54,7 +100,7 @@ class Ready extends Map {
this.delete(name) this.delete(name)
}) })
} }
this._updateObserversList() this.combineObservers('__all__') // update total combo
} }
subscribe(name, handler) { subscribe(name, handler) {
@ -62,24 +108,65 @@ class Ready extends Map {
handler=name handler=name
name = null name = null
} }
this.subscriptions.set(name||'_primary', (name ? this.get(name):this._state).subscribe(handler||this.handler)) name = name || '__all__'
if (this.subscriptions.get(name)) this.unsubscribe(name)
let obs = this.getObserver(name)
if (!obs) return false
let subs = obs.subscribe(handler||this.handler)
this.subscriptions.set(name,subs)
return subs
} }
unsubscribe(name) { unsubscribe(name) {
this.subscriptions.get(name||'_primary').unsubscribe() if (!this.subscriptions.has(name)) return false
this.subscriptions.get(name||'__all__').unsubscribe()
return true
} }
_updateObserversList() { combineObservers(name,list) {
this._state = combineLatest(Array.from(this.values())).pipe( if (Array.isArray(name)) {list = name; name = null}
tap(states => this.log( Array.from(this.keys()).map((name,index) => [name,states[index]]))), if (name==='__all__') list = Array.from(this.keys()) // get list of all observers
map( states => states.reduce((res,state) => {return res && state},true)), if (!Array.isArray(list)) return false // can't make a combo without a list
pairwise(), if (this.has(name)) return false // can't save a combo with same name as any single oberver
filter( ([p,c]) => (p ^ c)), // only emit on change let observers = list.map(name=>this.get(name)||this.getCombination(name))
map( r => r[1] ) //remove previous if (observers.filter(obs=>!isObservable(obs)).length) return false
let combination = combineLatest(observers).pipe(
tap(states => { if (name==='__all__') this.state = list.map((name,index) => [name,states[index]])}),
map(states => states.reduce((res,state) => {return res && state},true)),
startWith(false),
changed(),
shareReplay(1)
) )
if (name) this.combinations.set(name, combination) // if name passed then save combo in Map
return combination
}
getCombination(name) {
this.combinations.get(name||'__all__')
} }
} // end class } // end class
function changed(toBoolean) {
toBoolean = toBoolean || (val => !!val)
return obs$ => obs$.pipe(
pairwise(),
filter( ([p,c]) => {
const chng = !obs$.__not_first__ || !!(toBoolean(p) ^ toBoolean(c))
obs$.__not_first__=true
return chng}),
map( r => r[1] ), //remove previous
)
}
function map(fn) {
return isAsync(fn) ? switchMap.call(this,fn) : simpleMap.call(this,fn)
}
function isAsync(fn) {
return (isPromise(fn) || fn.constructor.name === 'AsyncFunction')
}
export default Ready export default Ready
export { Ready } export { Ready, changed, map, tap}