246 lines
9.1 KiB
JavaScript
246 lines
9.1 KiB
JavaScript
// new change
|
|
import { from, fromEvent, combineLatest, BehaviorSubject } from 'rxjs'
|
|
import { switchMap, map as simpleMap, startWith, tap, pairwise, filter, shareReplay} from 'rxjs/operators'
|
|
import isObservable from 'is-observable'
|
|
import isPromise from 'p-is-promise'
|
|
import isPlainObject from 'is-plain-object'
|
|
// UCI dependencies
|
|
import { createBoolean } from '@uci-utils/to-boolean'
|
|
|
|
/**
 * Registry of named "ready" observables and named combinations of them.
 *
 * The instance is itself a Map of `name => { obs, sub, details }` where `obs`
 * is the boolean observable, `sub` the internal keep-alive subscription and
 * `details` optional descriptive data. `this._all` is the combination of every
 * registered observer and is rebuilt whenever an observer is added or removed.
 */
class Ready extends Map {
  /**
   * @param {Object} [opts]
   * @param {Iterable} [opts.observables] handed straight to the Map constructor
   *   (presumably entries shaped like the `{ obs, sub }` records addObserver stores — verify against callers)
   * @param {Object} [opts.emitter] default emitter (must have `on` and `emit`) used when an observer is given only as an event name
   * @param {*} [opts.boolean] passed to createBoolean to build the value => boolean converter
   * @param {Function} [opts.condition] default function turning an emitted value into a ready state
   * @param {boolean} [opts.verbose] print every logger emission with console.log (also enabled by UCI_READY_VERBOSE=true)
   * @param {Function} [opts.handler] default handler used by subscribe()
   */
  constructor(opts={}) {
    super(opts.observables)
    this.emitter = isEmitter(opts.emitter) ? opts.emitter : null
    const toBool = createBoolean(opts.boolean)
    this.toBoolean = (value) => {
      // can emit plain object with ready prop, or array with first value as ready value or just ready value
      if (isPlainObject(value)) value = (value.ready || value.online || value.active || false)
      if (Array.isArray(value)) value = value[0]
      return toBool(value)
    }
    this.condition = opts.condition || ((ev) => this.toBoolean(ev))
    this._subscriptions = new Map() // registered (one per name) subscriptions made through subscribe()
    this._combinations = new Map() // named combinations
    this._all = {} // where all observer is kept
    // the pipeline built here is lazy, so it may safely reference this.log/this.state set below
    this._combineAll()
    this.logger = new BehaviorSubject()
    this.state = [] // holds last emitted state of all combination
    this.log = this.logger.next.bind(this.logger)
    if (opts.verbose||process.env.UCI_READY_VERBOSE==='true') this.logger.subscribe(console.log)
    this.handler = opts.handler || ((ready) => {console.log('default handler', ready)})
    this._first = true // tracks first emission
  }

  /** Names of all registered observers. */
  get observers() { return Array.from(this.keys()) }

  /** Names of all registered combinations. */
  get combinations() { return Array.from(this._combinations.keys()) }

  /** The observable combining every registered observer. */
  get all() { return this._all }

  /**
   * Observers whose last recorded state (from the "all" combination) is falsy.
   * @returns {Array<{name: string, details: *}>|string} list of failed observers, or the string '__none__' when there are none
   */
  get failed() {
    const failed = this.state
      .filter(([, ready]) => !ready)
      .map(([name]) => ({ name, details: this.getObserverDetails(name) }))
    return failed.length ? failed : '__none__'
  }

  /** @returns {*} the details stored for observer `name`, or undefined */
  getObserverDetails(name) { return (this.get(name)||{}).details }

  /**
   * Store descriptive details on an existing observer.
   * @param {string} name observer name
   * @param {*} details plain object, or any other value which is wrapped as `{ desc: value }`
   * @param {boolean} [overwrite] replace existing details instead of merging into them
   * @returns {boolean} true when details were stored; false for an unknown observer or null/undefined details
   */
  setObserverDetails(name, details, overwrite) {
    if (!this.has(name)) return false
    if (details==null) return false
    if (!isPlainObject(details)) details = { desc: details }
    const entry = this.get(name)
    // TODO substitute merge-anything for Object.assign
    entry.details = overwrite ? details : Object.assign(entry.details || {}, details)
    return true
  }

  /** @returns {Observable|undefined} the observable registered under `name` */
  getObserver(name) {
    return (this.get(name) || {}).obs
  }

  /** @returns {Observable|undefined} the combination registered under `name` */
  getCombination(name) {
    return this._combinations.get(name)
  }

  /**
   * Read the value an observer emits synchronously on subscription.
   * NOT recommended: any problem simply yields false.
   * @param {string} name observer name
   * @returns {Promise<*>} the emitted value, or false when `name` is not an
   *   observable or nothing was emitted synchronously (resolved after 100ms)
   */
  getValue(name) {
    const obs = this.getObserver(name)
    return new Promise(resolve => {
      if (!isObservable(obs)) {
        resolve(false)
        return
      }
      // fallback for an observable that does not emit synchronously
      const timer = setTimeout(() => resolve(false), 100)
      const sub = obs.subscribe(val => {
        clearTimeout(timer) // don't leave a pending timer once we have a value
        this.log(`value of ${name} =>${JSON.stringify(val)}`)
        resolve(val)
      })
      // only the synchronous (replayed) emission is of interest
      sub.unsubscribe()
    })
  }

  /**
   * Build a boolean "ready" observable.
   *
   * `obs` may be an emitter (needs opts.event or opts.name), a promise, an
   * observable, or an event-name string / nothing at all when the instance has
   * a default emitter. A plain object passed as `obs` is treated as `opts`.
   * Every emitted value is logged, mapped through the condition, optionally
   * inverted (opts.reverse), and the stream starts with `false` and replays
   * its latest value to late subscribers.
   *
   * @param {*} obs source (see above)
   * @param {Object} [opts]
   * @param {Function} [opts.condition] value => ready state; defaults to the instance condition
   * @param {string} [opts.event] event name to listen for
   * @param {string} [opts.name] observer name; used as event name when opts.event is missing
   * @param {boolean} [opts.reverse] invert the computed ready state
   * @returns {Observable|false} the observable (flagged with `_readyType`), or false when none could be made
   */
  makeObserver(obs, opts={}) {
    if (isPlainObject(obs)) { opts = obs; obs = null }
    if (!(obs || this.emitter)) return false // some observable required
    const { event, name } = opts
    const condition = opts.condition || this.condition
    if (isEmitter(obs) && (event || name)) obs = fromEvent(obs, event || name) // it's an emitter
    if (isPromise(obs)) obs = from(obs) // it's a promise
    // nothing usable (or just an event name) was passed: listen on the default emitter
    if ((!obs || typeof obs==='string') && this.emitter && (event || name || obs)) obs = fromEvent(this.emitter, (event || name || obs))
    if (!obs || !isObservable(obs)) return false
    const xobs = obs.pipe(
      tap(val => this.log(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)),
      // TODO retain plain object if it has a ready property
      map(condition),
      tap(val => this.log(`boolean: ${val}`)),
      map(val => (opts.reverse ? !val : val)),
      startWith(false),
      shareReplay(1),
    )
    xobs._readyType = true
    return xobs
  }

  /**
   * Register an observer under `name` and rebuild the "all" combination.
   * @param {string} name required observer name
   * @param {*} [obs] anything makeObserver accepts, or an observable already flagged `_readyType`; a plain object is treated as `opts`
   * @param {Object} [opts] makeObserver options plus:
   * @param {boolean} [opts.overwrite] replace an existing observer of the same name
   * @param {*} [opts.details] details stored via setObserverDetails
   * @returns {Observable|false} the registered observable, or false on invalid input / existing name without overwrite
   */
  addObserver(name, obs, opts={}) {
    if (!name || typeof(name)!=='string') return false // name required
    if (isPlainObject(obs)) { opts = obs; obs = null }
    if (this.has(name) && !opts.overwrite) return false // don't overwrite existing observer
    this.removeObserver(name)
    // work on a copy so the caller's options object is not modified
    opts = Object.assign({}, opts, { name })
    if (!(obs || {})._readyType) obs = this.makeObserver(obs, opts)
    if (!isObservable(obs)) return false
    const sub = obs.subscribe() // internal subscriber
    this.set(name, { obs, sub })
    this.setObserverDetails(name, opts.details)
    this._combineAll()
    return obs
  }

  /**
   * Remove one, several or (when `names` is falsy) all observers, together
   * with their internal subscription and any subscription registered for them
   * through subscribe(). Named combinations are NOT updated.
   * @param {string|string[]} [names]
   */
  removeObserver(names) {
    if (!names) names = this.observers // remove all
    if (!Array.isArray(names)) names = [names]
    names.forEach(name => {
      const sub = (this.get(name)||{}).sub
      if (sub) sub.unsubscribe() // remove attached subscription
      this.unsubscribe(name) // remove any manual subscription
      this.delete(name)
    })
    // update total combo
    this._combineAll()
    // TODO update affected combinations
  }

  /**
   * Combine ready observables into one that is truthy only when all are.
   * Entries not flagged `_readyType` are dropped.
   * TODO have combineObservers call this
   * @param {Array} observers candidate observables
   * @param {Array} [list] names logged next to each state
   * @returns {Observable|false} the combination (flagged `_readyType`), or false when no valid observer remains
   */
  makeCombination(observers, list) {
    observers = observers.filter(obs => (obs || {})._readyType)
    if (!observers.length) return false
    const combination = combineLatest(observers).pipe(
      tap(states => { if (list) this.log(list.map((name, index) => [name, states[index]])) }),
      map(states => states.reduce((res, state) => res && state, true)),
      changed(), // filters out emission if it is unchanged from last TODO replace with distinctUntilChanged
      shareReplay(1)
    )
    combination._readyType = true
    return combination
  }

  /**
   * Create and store a named combination of observers.
   * TODO add option for whether changed is enabled (default) or not and store with combo for remake
   * @param {string} name combination name; must not clash with an observer name
   * @param {Array} list observer names and/or `_readyType` observables
   * @param {Object} [opts]
   * @param {boolean} [opts.overwrite] replace an existing combination of the same name
   * @returns {Observable|false} the combination, or false when the arguments are invalid,
   *   the name is taken, or no valid observer could be resolved (nothing is stored in that case)
   */
  combineObservers(name, list, opts={}) {
    if (!name || typeof(name)!=='string') return false // name required
    if (this.has(name)) return false // can't save a combo with same name as any single observer
    if (!Array.isArray(list)) return false // need a list of observers
    const exists = this._combinations.has(name)
    if (exists && !opts.overwrite) return false
    const observers = list.map(item => ((item || {})._readyType ? item : this.getObserver(item)))
    const combination = this.makeCombination(observers, list)
    // never store an invalid combination, and keep the old one if the replacement is invalid
    if (!combination) return false
    if (exists) this.removeCombination(name)
    this._combinations.set(name, combination)
    return combination
  }

  /**
   * Remove one, several or (when `names` is falsy) all combinations and any
   * subscription registered for them through subscribe().
   * @param {string|string[]} [names]
   */
  removeCombination(names) {
    if (!names) names = this.combinations // remove all
    if (!Array.isArray(names)) names = [names]
    names.forEach(name => {
      this.unsubscribe(name) // remove any registered subscription
      this._combinations.delete(name)
    })
  }

  /**
   * Subscribe a handler to an observer, a combination, or (default) the "all"
   * combination. Only one subscription per name is tracked: a new call replaces
   * the tracked one unless `opts.add` is set, in which case the extra
   * subscription is returned untracked and the tracked one is left running.
   * May also be called as subscribe(handler, opts).
   * @param {string} [name] observer/combination name, defaults to '__all__'
   * @param {Function} [handler] defaults to the handler already tracked for `name`, then the instance default
   * @param {Object} [opts]
   * @param {boolean} [opts.add] create an additional, untracked subscription
   * @returns {Subscription|false} the new subscription, or false when `name` is unknown
   */
  subscribe(name, handler, opts={}) {
    if (typeof name ==='function') {
      opts = handler || {}
      handler = name
      name = null
    }
    name = name || '__all__'
    const registered = this._subscriptions.get(name)
    handler = handler || (registered || {}).handler || this.handler // will resubscribe existing handler
    const obs = name==='__all__' ? this._all : (this.getObserver(name) || this.getCombination(name))
    if (!obs) return false
    // an additional subscription must not cancel the tracked one
    if (registered && !opts.add) registered.subs.unsubscribe()
    const subs = obs.subscribe(handler)
    // TODO allow resetting additional subscription on add/remove
    if (!opts.add) this._subscriptions.set(name, { subs, handler })
    return subs
  }

  /**
   * Cancel and forget the subscription tracked for `name` (default '__all__').
   * @param {string} [name]
   * @returns {boolean} false when nothing was tracked under that name
   */
  unsubscribe(name) {
    name = name || '__all__'
    if (!this._subscriptions.has(name)) return false
    this._subscriptions.get(name).subs.unsubscribe()
    this._subscriptions.delete(name)
    return true
  }

  /**
   * Rebuild `this._all` from the currently registered observers. Each emission
   * records the per-observer states in `this.state` and logs them. A tracked
   * '__all__' subscription is re-attached to the new observable.
   */
  _combineAll() {
    const observers = this.observers.map(name => this.getObserver(name))
    this._all = combineLatest(observers).pipe(
      tap(states => {
        const snapshot = this.observers.map((name, index) => [name, states[index]])
        this.state = snapshot
        this.log(snapshot)
      }),
      map(states => states.reduce((res, state) => res && state, true)),
      changed(), // filters out emission if it is unchanged from last
    )
    if (this._subscriptions.has('__all__')) this.subscribe() // will resubscribe with same handler
  }
} // end class
|
|
|
|
|
|
/**
 * Operator factory: lets the first emission of each subscription through, and
 * afterwards only emissions whose boolean state differs from the previous one.
 *
 * The "first emission" test uses the per-subscription index that `filter`
 * passes to its predicate, so nothing is stored on the source observable and
 * every new subscriber receives its own initial value.
 *
 * @param {Function} [toBoolean] converts an emitted value to its boolean state (default `!!val`)
 * @returns {Function} operator function (source observable => filtered observable)
 */
function changed(toBoolean) {
  toBoolean = toBoolean || (val => !!val)
  return obs$ => obs$.pipe(
    startWith(false), // seed so the first real value already has a "previous"
    pairwise(),
    filter(([previous, current], index) =>
      index === 0 || !!(toBoolean(previous) ^ toBoolean(current))),
    map(pair => pair[1]), // remove previous
  )
}
|
|
|
|
/**
 * Mapping operator that accepts sync or async functions: an async `fn` is
 * applied with switchMap, anything else with the plain rxjs map.
 * @param {Function} fn projection applied to each emitted value
 * @returns {Function} the rxjs operator
 */
function map(fn) {
  const operator = isAsync(fn) ? switchMap : simpleMap
  return operator.call(this, fn)
}
|
|
|
|
/**
 * Duck-type check for an event emitter.
 * @param {*} emitter candidate value
 * @returns {boolean} true when `emitter` is truthy and has both `on` and `emit` functions
 */
function isEmitter(emitter) {
  if (!emitter) return false
  const required = ['on', 'emit']
  return required.every(method => typeof emitter[method] === 'function')
}
|
|
|
|
/**
 * Tell whether `fn` is a function that should be treated as asynchronous.
 * @param {*} fn candidate value
 * @returns {boolean} false for non-functions; otherwise true when isPromise
 *   accepts the function or it was declared with `async`
 */
function isAsync(fn) {
  const callable = typeof fn === 'function'
  return callable && (isPromise(fn) || fn.constructor.name === 'AsyncFunction')
}
|
|
|
|
export default Ready
|
|
export { Ready, changed, map, tap}
|