86 lines
2.8 KiB
JavaScript
86 lines
2.8 KiB
JavaScript
import { from, fromEvent, combineLatest, Subject } from 'rxjs'
|
|
import { map, startWith, tap, pairwise, filter } from 'rxjs/operators'
|
|
import isObservable from 'is-observable'
|
|
import isPromise from 'p-is-promise'
|
|
// UCI dependencies
|
|
import { createBoolean } from '@uci-utils/to-boolean'
|
|
// Boolean caster used as the default readiness condition (see Ready#condition).
// The `undefined: true` option is meant to make an emission that carries no
// value (a bare/null event) cast to TRUE instead of FALSE.
const toBool = createBoolean({
  undefined: true,
})
|
|
|
|
/**
 * Ready - a Map of named observables plus one combined "everything is ready"
 * observable derived from them.
 *
 * Each Map entry is `name -> observable`. Observables registered through
 * `addObserver` are mapped to a ready/not-ready value by a condition function
 * and always start as `false`. The combined observable (`state`) emits a
 * boolean only when the overall readiness (all entries truthy) changes.
 */
class Ready extends Map {
  /**
   * @param {Object} [opts] - options; may be omitted entirely
   * @param {Iterable} [opts.observables] - initial entries, handed straight to the Map constructor
   * @param {Object} [opts.emitter] - default event source; only used when it has an `on` method
   * @param {Function} [opts.condition] - default mapping of an emitted value to ready/not ready
   * @param {boolean} [opts.verbose] - when truthy, log messages are printed with console.log
   * @param {Function} [opts.handler] - default subscription handler (console.log when omitted)
   */
  constructor(opts) {
    // Tolerate a missing options object instead of throwing on property access.
    const options = opts || {}
    super(options.observables)
    // TODO support setting registration in options
    const { emitter } = options
    // Guarded lookup: a missing emitter simply means "no default emitter".
    this.emitter = emitter && typeof emitter.on === 'function' ? emitter : null
    this.condition = options.condition || ((ev) => toBool(ev))
    this.subscriptions = new Map()
    this._state = {}
    this.logger = new Subject()
    this.log = this.logger.next.bind(this.logger)
    if (options.verbose) this.logger.subscribe(console.log)
    this._updateObserversList() // initialize
    this.handler = options.handler || console.log
  }

  /** The combined readiness observable built by `_updateObserversList`. */
  get state() {
    return this._state
  }

  /**
   * Register a named readiness source.
   *
   * @param {string} name - key for the entry; also the event name when an emitter is used
   * @param {Object|Promise|Function} [obs] - an observable, a promise, an emitter (has `on`),
   *   or - when called with exactly two arguments - a condition function to apply to the
   *   default emitter's `name` event
   * @param {Function} [condition] - per-observer condition; falls back to `this.condition`
   * @returns {boolean} true when the observer was registered, false when the arguments were unusable
   */
  addObserver(name, obs, condition) {
    if (!name) return false // name required
    if (!(obs || this.emitter)) return false // some observable required
    if (obs && typeof obs.on === 'function') obs = fromEvent(obs, name) // it's an emitter
    if (isPromise(obs)) obs = from(obs) // it's a promise
    if (obs && !isObservable(obs) && typeof obs === 'function' && arguments.length === 2) {
      // two-argument form: the second argument is the condition, not a source
      condition = obs
      obs = null
    }
    if (!obs && this.emitter) obs = fromEvent(this.emitter, name)
    if (!obs || !isObservable(obs)) return false

    const toReady = condition || this.condition
    this.set(
      name,
      obs.pipe(
        tap((val) => {
          // Logging must never break the stream: circular payloads make
          // JSON.stringify throw, so fall back to plain string conversion.
          let shown
          try {
            shown = JSON.stringify(val)
          } catch (err) {
            shown = String(val)
          }
          this.logger.next(`${name} emitted/resolved the value =>${shown}`)
        }),
        map(toReady),
        tap((val) => this.log(`boolean: ${val}`)),
        startWith(false), // every observer counts as "not ready" until it emits
      ),
    )
    this._updateObserversList()
    return true
  }

  /**
   * Remove one observer, several observers, or (with no argument) all of them,
   * then rebuild the combined observable.
   *
   * @param {string|string[]} [names] - name(s) to remove; falsy removes every entry
   */
  removeObserver(names) {
    if (!names) {
      this.clear() // must be invoked; a bare `this.clear` reference removes nothing
    } else {
      const list = Array.isArray(names) ? names : [names]
      list.forEach((name) => {
        this.delete(name)
      })
    }
    this._updateObserversList()
  }

  /**
   * Subscribe to a named observer or, when no name is given, to the combined state.
   * The subscription is stored under the name (or '_primary') so `unsubscribe`
   * can find it; an existing subscription under the same key is released first.
   *
   * @param {string|Function} [name] - observer name, or the handler when used as `subscribe(handler)`
   * @param {Function} [handler] - receives emitted values; defaults to `this.handler`
   * @throws {Error} when `name` does not refer to a subscribable entry
   */
  subscribe(name, handler) {
    if (typeof name === 'function') {
      handler = name
      name = null
    }
    const key = name || '_primary'
    const source = name ? this.get(name) : this._state
    if (!source || typeof source.subscribe !== 'function') {
      throw new Error(`Ready: cannot subscribe, no observer registered as '${name}'`)
    }
    // Release the subscription being replaced, otherwise it can never be
    // unsubscribed and its handler keeps firing.
    const previous = this.subscriptions.get(key)
    if (previous) previous.unsubscribe()
    this.subscriptions.set(key, source.subscribe(handler || this.handler))
  }

  /**
   * Cancel the subscription stored under `name` (or the primary one).
   *
   * @param {string} [name] - subscription key; defaults to '_primary'
   * @returns {boolean} true when a subscription was found and cancelled, false otherwise
   */
  unsubscribe(name) {
    const key = name || '_primary'
    const subscription = this.subscriptions.get(key)
    if (!subscription) return false
    subscription.unsubscribe()
    this.subscriptions.delete(key)
    return true
  }

  /**
   * Rebuild `_state` from the observables currently in the Map. Existing
   * subscriptions to a previous `_state` are not migrated by this method.
   */
  _updateObserversList() {
    this._state = combineLatest(Array.from(this.values())).pipe(
      tap((states) => this.log(Array.from(this.keys()).map((name, index) => [name, states[index]]))),
      // Coerce to a real boolean so truthy non-boolean condition results compare correctly.
      map((states) => states.every(Boolean)),
      pairwise(),
      filter(([previous, current]) => previous !== current), // only emit on change
      map(([, current]) => current), // remove previous
    )
  }
} // end class
|
|
|
|
export default Ready
|
|
export { Ready }
|