// new change
import { from, fromEvent, combineLatest, BehaviorSubject } from 'rxjs'
import { switchMap, map as simpleMap, startWith, tap, pairwise, filter, shareReplay, distinctUntilChanged } from 'rxjs/operators'
import isObservable from 'is-observable'
import isPromise from 'p-is-promise'
import isPlainObject from 'is-plain-object'
// UCI dependencies
import { createBoolean } from '@uci-utils/to-boolean'

/**
 * Registry of named "ready" observables.
 *
 * Extends Map: each entry maps an observer name to `{ obs, sub, details }`
 * where `obs` is the boolean observable, `sub` the internal keep-alive
 * subscription and `details` optional descriptive data.
 *
 * Besides the individual observers the instance maintains:
 *  - `_all`: the combination (logical AND) of every registered observer,
 *  - `_combinations`: named combinations of a subset of observers,
 *  - `_subscriptions`: at most one registered handler subscription per name.
 */
class Ready extends Map {
  /**
   * @param {object} [opts]
   * @param {Iterable} [opts.observables] passed straight to the Map constructor
   *        (presumably `[name, {obs, sub}]` entries - not validated here)
   * @param {object} [opts.emitter] default emitter (needs `on` and `emit`) used by makeObserver
   * @param {*} [opts.boolean] options handed to `createBoolean`
   * @param {function} [opts.condition] default function turning an emitted value into a ready state
   * @param {boolean} [opts.verbose] print every log entry with console.log
   * @param {function} [opts.handler] default subscription handler
   */
  constructor(opts = {}) {
    super(opts.observables)
    this.emitter = isEmitter(opts.emitter) ? opts.emitter : null
    const toBool = createBoolean(opts.boolean)
    this.toBoolean = (value) => {
      // can emit plain object with ready prop, or array with first value as ready value or just ready value
      if (isPlainObject(value)) value = (value.ready || value.online || value.active || false)
      if (Array.isArray(value)) value = value[0]
      return toBool(value)
    }
    this.condition = opts.condition || ((ev) => this.toBoolean(ev))
    this._subscriptions = new Map()
    this._combinations = new Map()
    this.logger = new BehaviorSubject()
    this.log = this.logger.next.bind(this.logger)
    this.state = [] // holds last emitted state of all combination
    // `process` does not exist in every runtime, so guard the environment lookup
    const verboseEnv = typeof process !== 'undefined' && !!process.env && process.env.UCI_READY_VERBOSE === 'true'
    if (opts.verbose || verboseEnv) this.logger.subscribe(console.log)
    this.handler = opts.handler || ((ready) => { console.log('default handler', ready) })
    this._first = true // tracks first emission (not read anywhere in this file)
    this._all = {} // where all observer is kept
    this._combineAll()
  }

  /** @returns {string[]} names of all registered observers */
  get observers() { return Array.from(this.keys()) }

  /** @returns {string[]} names of all saved combinations */
  get combinations() { return Array.from(this._combinations.keys()) }

  /** @returns {object} the observable combining every registered observer */
  get all() { return this._all }

  /**
   * Observers whose last recorded state (see `this.state`) was falsy.
   * @returns {Array<{name: string, details: *}>|string} list of failed observers or '__none__'
   */
  get failed() {
    const failed = this.state
      .filter(([, ready]) => !ready)
      .map(([name]) => ({ name, details: this.getObserverDetails(name) }))
    return failed.length ? failed : '__none__'
  }

  /** @returns {*} the details stored for observer `name`, undefined when unknown */
  getObserverDetails(name) { return (this.get(name) || {}).details }

  /**
   * Store descriptive details on a registered observer.
   * Non plain-object details are wrapped as `{desc: details}`.
   * @param {string} name
   * @param {*} details
   * @param {boolean} [overwrite] replace existing details instead of merging into them
   * @returns {boolean} true when details were stored
   */
  setObserverDetails(name, details, overwrite) {
    if (!this.has(name) || details == null) return false
    if (!isPlainObject(details)) details = { desc: details }
    const entry = this.get(name)
    // TODO substitue merge anything for Object.assign
    entry.details = overwrite ? details : Object.assign(entry.details || {}, details)
    return true
  }

  /** @returns {object|undefined} the observable registered under `name` */
  getObserver(name) { return (this.get(name) || {}).obs }

  /** @returns {object|undefined} the combination saved under `name` */
  getCombination(name) { return this._combinations.get(name) }

  /**
   * NOT recommended. if there is any issue will return false.
   * Subscribes and immediately unsubscribes, so only a synchronously
   * delivered value can be captured; otherwise false is resolved after 100ms.
   * @param {string} name observer name
   * @returns {Promise<*>}
   */
  getValue(name) {
    const obs = this.getObserver(name)
    return new Promise(resolve => {
      if (!isObservable(obs)) { resolve(false); return }
      const timer = setTimeout(() => resolve(false), 100)
      const sub = obs.subscribe(val => {
        clearTimeout(timer) // do not leave the fallback timer pending
        this.log(`in value subscriber ${name} ${val}`)
        resolve(val)
      })
      sub.unsubscribe()
    })
  }

  /**
   * Build a ready observable from an emitter, promise, observable or event name.
   * The result starts with false, maps every emission through the condition
   * (optionally inverted with `opts.reverse`) and replays its last value.
   * @param {*} obs emitter, promise, observable, event name string, or the opts object itself
   * @param {object} [opts]
   * @param {function} [opts.condition] overrides the instance condition
   * @param {string} [opts.event] event to listen for (falls back to `opts.name`)
   * @param {string} [opts.name]
   * @param {boolean} [opts.reverse] invert the computed state
   * @returns {object|boolean} the observable (flagged with `_readyType`) or false
   */
  makeObserver(obs, opts = {}) {
    // validation and defaults, obs can be emitter, osbserver, or promise
    if (isPlainObject(obs)) { opts = obs; obs = null }
    if (!(obs || this.emitter)) return false // some observable requried
    const { event, name } = opts
    const condition = opts.condition || this.condition
    if (isEmitter(obs) && (event || name)) obs = fromEvent(obs, event || name) // it's an emitter
    if (isPromise(obs)) obs = from(obs) // it's a promise
    if ((!obs || typeof obs === 'string') && this.emitter && (event || name || obs)) obs = fromEvent(this.emitter, (event || name || obs))
    if (!obs || !isObservable(obs)) return false
    const xobs = obs.pipe(
      // logging must never error the stream, hence the guarded stringify
      tap(val => this.log(`${name} emitted/resolved the value =>${safeStringify(val)}`)),
      // TODO retain plain object if it has a ready property
      map(condition),
      tap(val => this.log(`boolean: ${val}`)),
      map(val => (opts.reverse ? !val : val)),
      startWith(false),
      shareReplay(1),
    )
    xobs._readyType = true
    return xobs
  }

  /**
   * Create (if needed) and register an observer under `name`, then rebuild
   * the all-observers combination.
   * @param {string} name required
   * @param {*} obs anything makeObserver accepts, or an already made ready observable
   * @param {object} [opts] makeObserver options plus `overwrite` and `details`
   * @returns {object|boolean} the registered observable or false
   */
  addObserver(name, obs, opts = {}) {
    // validation and defaults, obs can be emitter, osbserver, or promise
    if (!name || typeof (name) !== 'string') return false // name required
    if (isPlainObject(obs)) { opts = obs; obs = null }
    if (this.has(name) && !opts.overwrite) return false // don't over write exiting observer
    this.removeObserver(name)
    // work on a copy so the caller's options object is not modified
    opts = Object.assign({}, opts, { name: name })
    if (!(obs || {})._readyType) obs = this.makeObserver(obs, opts)
    if (!isObservable(obs)) return false
    const sub = obs.subscribe() // internal subscriber
    this.set(name, { obs: obs, sub: sub })
    this.setObserverDetails(name, opts.details)
    this._combineAll()
    return obs
  }

  /**
   * Remove one, several or (when called without argument) all observers,
   * including their internal and registered subscriptions.
   * @param {string|string[]} [names]
   */
  removeObserver(names) {
    if (!names) names = this.observers // remove all
    if (!Array.isArray(names)) names = [names]
    names.forEach(name => {
      const sub = (this.get(name) || {}).sub
      if (sub) sub.unsubscribe() // remove attached subscription
      this.unsubscribe(name) // remove any manual subscription
      this.delete(name)
    })
    // update total combo
    this._combineAll()
    // TODO update affected combinations
  }

  /**
   * Combine ready observables into one that is truthy only when all are.
   * Entries that are not ready observables are dropped together with their name.
   * @param {Array} observers candidate observables
   * @param {Array} [list] names parallel to `observers`, only used for logging
   * @returns {object|boolean} the combination (flagged with `_readyType`) or false when nothing valid was passed
   */
  makeCombination(observers, list) {
    const valid = []
    const names = []
    observers.forEach((obs, index) => {
      if (!(obs || {})._readyType) return
      valid.push(obs)
      // keep names aligned with the observers that survive the filter
      if (list) names.push(list[index])
    })
    if (!valid.length) return false
    const combination = combineLatest(valid).pipe(
      tap(states => { if (list) this.log(names.map((name, index) => [name, states[index]])) }),
      map(states => states.reduce((res, state) => { return res && state }, true)),
      changed(), // filters out emission if it is unchanged from last
      shareReplay(1)
    )
    combination._readyType = true
    return combination
  }

  // TODO add option for whether changed is enabled (default) or not and store with combo for remake
  /**
   * Build a combination from observer names (or ready observables) and save it under `name`.
   * @param {string} name required, must not collide with an observer name
   * @param {Array} list observer names and/or ready observables
   * @param {object} [opts]
   * @param {boolean} [opts.overwrite] replace an existing combination of the same name
   * @returns {object|boolean} the combination or false (nothing is stored on failure)
   */
  combineObservers(name, list, opts = {}) {
    if (!name || typeof (name) !== 'string') return false // name required
    if (this.has(name)) return false // can't save a combo with same name as any single oberver
    if (!Array.isArray(list)) return false // need a list of observers
    if (this._combinations.has(name)) {
      if (opts.overwrite) this.removeCombination(name)
      else return false
    }
    const observers = list.map(item => ((item || {})._readyType ? item : this.getObserver(item)))
    const combination = this.makeCombination(observers, list)
    if (!combination) return false // never store a failed combination
    this._combinations.set(name, combination)
    return combination
  }

  /**
   * Remove one, several or (without argument) all saved combinations
   * together with their registered subscriptions.
   * @param {string|string[]} [names]
   */
  removeCombination(names) {
    if (!names) names = this.combinations // remove all
    if (!Array.isArray(names)) names = [names]
    names.forEach(name => {
      this.unsubscribe(name) // remove any resigtered subscription
      this._combinations.delete(name)
    })
  }

  /**
   * Subscribe a handler to an observer, a combination or (default) all observers.
   * Only one registered subscription is kept per name; subscribing again
   * replaces it. With `opts.add` an additional, unregistered subscription is
   * returned and the registered one is left untouched.
   * May also be called as `subscribe(handler, opts)`.
   * @param {string|function} [name] defaults to '__all__'
   * @param {function} [handler] defaults to the previously registered handler, then to `this.handler`
   * @param {object} [opts]
   * @param {boolean} [opts.add] create an additional subscription
   * @returns {object|boolean} the subscription or false when `name` is unknown
   */
  subscribe(name, handler, opts = {}) {
    if (typeof name === 'function') {
      opts = handler || {}
      handler = name
      name = null
    }
    name = name || '__all__'
    const registered = this._subscriptions.get(name)
    handler = handler || (registered || {}).handler || this.handler // will resubscribe exisiting handler
    const obs = name === '__all__' ? this._all : (this.getObserver(name) || this.getCombination(name))
    if (!obs) return false
    // TODO allow reseting additional subscription on add/remove
    if (opts.add) return obs.subscribe(handler)
    if (registered) registered.subs.unsubscribe()
    const subs = obs.subscribe(handler)
    this._subscriptions.set(name, { subs: subs, handler: handler })
    return subs
  }

  /**
   * Drop the registered subscription for `name` (default '__all__').
   * @param {string} [name]
   * @returns {boolean} false when nothing was registered under that name
   */
  unsubscribe(name) {
    name = name || '__all__'
    if (!this._subscriptions.has(name)) return false
    this._subscriptions.get(name).subs.unsubscribe()
    this._subscriptions.delete(name)
    return true
  }

  /**
   * Rebuild `_all` from the currently registered observers and, when a
   * handler is registered for '__all__', resubscribe it to the new observable.
   */
  _combineAll() {
    // capture the names now so they stay aligned with the combined observables
    const names = this.observers
    const observers = names.map(name => this.getObserver(name))
    this._all = combineLatest(observers).pipe(
      tap(states => {
        this.state = names.map((name, index) => [name, states[index]])
        this.log(this.state)
      }),
      map(states => states.reduce((res, state) => { return res && state }, true)),
      changed(), // filters out emission if it is unchanged from last
    )
    if (this._subscriptions.has('__all__')) this.subscribe() // will resubscribe with same handler
  }
} // end class

/**
 * Operator factory: lets the first value through and afterwards only values
 * whose boolean interpretation differs from the last one passed on.
 * State is tracked per subscription.
 * @param {function} [toBoolean] converts a value to a boolean, defaults to `!!val`
 * @returns {function} an operator usable in `pipe`
 */
function changed(toBoolean) {
  const toBool = toBoolean || (val => !!val)
  return obs$ => obs$.pipe(
    distinctUntilChanged((prev, curr) => !!toBool(prev) === !!toBool(curr))
  )
}

/**
 * Mapping operator that uses switchMap for async functions and the plain
 * rxjs map otherwise.
 * @param {function} fn
 */
function map(fn) {
  return isAsync(fn) ? switchMap(fn) : simpleMap(fn)
}

/** @returns {boolean} true when `emitter` has both an `on` and an `emit` function */
function isEmitter(emitter) {
  if (!emitter) return false
  return ['on', 'emit'].every(fn => typeof emitter[fn] === 'function')
}

/** @returns {boolean} true for async functions (or functions that pass isPromise) */
function isAsync(fn) {
  if (typeof fn !== 'function') return false
  return (isPromise(fn) || fn.constructor.name === 'AsyncFunction')
}

// JSON.stringify throws on e.g. cyclic structures; logging falls back to a placeholder
function safeStringify(value) {
  try {
    return JSON.stringify(value)
  } catch (err) {
    return '[unserializable value]'
  }
}

export default Ready
export { Ready, changed, map, tap }