// uci-utils-ready/src/ready.js
//
// 258 lines
// 9.8 KiB
// JavaScript

// new change
import { from, fromEvent, combineLatest, BehaviorSubject } from 'rxjs'
import { switchMap, map as simpleMap, startWith, tap, pairwise, filter, shareReplay} from 'rxjs/operators'
import isObservable from 'is-observable'
import isPromise from 'p-is-promise'
import isPlainObject from 'is-plain-object'
// UCI dependencies
import { createBoolean } from '@uci-utils/to-boolean'
/**
 * Ready - a Map of named boolean "ready" observables plus named combinations of them.
 *
 * Each Map entry is keyed by observer name and holds `{ obs, sub, details }`:
 *   obs     - the boolean observable built by makeObserver()
 *   sub     - the internal keep-alive subscription created by addObserver()
 *   details - optional plain object of descriptive details
 *
 * `this.all` is a combination of every registered observer that emits `true`
 * only when all of them are truthy. It is rebuilt whenever an observer is
 * added or removed.
 */
class Ready extends Map {
  /**
   * @param {Object}   [opts]
   * @param {Iterable} [opts.observables] handed unchanged to the Map constructor
   *                   (presumably [name, {obs, sub}] entries - confirm with callers)
   * @param {Object}   [opts.emitter]   default emitter (must have `on` and `emit`) used by makeObserver
   * @param {*}        [opts.boolean]   passed to createBoolean() from @uci-utils/to-boolean
   * @param {Function} [opts.condition] default value => boolean mapper, defaults to this.toBoolean
   * @param {boolean}  [opts.verbose]   when truthy (or env UCI_READY_VERBOSE === 'true') log messages go to console.log
   * @param {Function} [opts.handler]   default handler used by subscribe()
   */
  constructor(opts = {}) {
    super(opts.observables)
    this.emitter = isEmitter(opts.emitter) ? opts.emitter : null
    const toBool = createBoolean(opts.boolean)
    this.toBoolean = (value) => {
      // can emit plain object with ready prop, or array with first value as ready value or just ready value
      if (isPlainObject(value)) value = (value.ready || value.online || value.active || false)
      if (Array.isArray(value)) value = value[0]
      return toBool(value)
    }
    this.condition = opts.condition || ((ev) => this.toBoolean(ev))
    this._subscriptions = new Map()
    this._combinations = new Map()
    // logger, log and state are created BEFORE the first _combineAll() so the
    // combined pipeline never runs against a half-initialised instance
    this.logger = new BehaviorSubject()
    this.log = this.logger.next.bind(this.logger)
    this.state = [] // holds last emitted state of all combination
    this._all = {} // where all observer is kept
    this._combineAll()
    if (opts.verbose || process.env.UCI_READY_VERBOSE === 'true') this.logger.subscribe(console.log)
    this.handler = opts.handler || ((ready) => { console.log('default handler', ready) })
    this._first = true // tracks first emission
    console.log('@uci-utils/ready package tag 0.1.10')
  }

  /** @returns {string[]} names of all registered observers */
  get observers() { return Array.from(this.keys()) }

  /** @returns {string[]} names of all stored combinations */
  get combinations() { return Array.from(this._combinations.keys()) }

  /** @returns {Observable} the combination of every registered observer */
  get all() { return this._all }

  /**
   * Observers whose last recorded state (see this.state) is falsy.
   * @returns {Array<{name: string, details: *}>|string} the failed list, or the string '__none__' when empty
   */
  get failed() {
    const failed = this.state
      .filter(([, ready]) => !ready)
      .map(([name]) => ({ name, details: this.getObserverDetails(name) }))
    return failed.length ? failed : '__none__'
  }

  /** @returns {*} the details stored for `name`, or undefined */
  getObserverDetails(name) { return (this.get(name) || {}).details }

  /**
   * Store details for an existing observer.
   * @param {string}  name
   * @param {*}       details     non plain-object values are wrapped as {desc: details}
   * @param {boolean} [overwrite] replace instead of merging into the existing details
   * @returns {boolean} false when the observer does not exist or details is null/undefined
   */
  setObserverDetails(name, details, overwrite) {
    if (!this.has(name) || details == null) return false
    const entry = this.get(name)
    const incoming = isPlainObject(details) ? details : { desc: details }
    // TODO substitue merge anything for Object.assign
    entry.details = overwrite ? incoming : Object.assign(entry.details || {}, incoming)
    return true
  }

  /** @returns {Observable|undefined} the observable registered under `name` */
  getObserver(name) {
    return (this.get(name) || {}).obs
  }

  /** @returns {Observable|undefined} the combination stored under `name` */
  getCombination(name) {
    return this._combinations.get(name)
  }

  /**
   * NOT recommended. Resolves with whatever value the observer delivers
   * synchronously on subscription, and with `false` in every other case
   * (unknown name, or nothing delivered synchronously).
   *
   * The subscription is closed immediately, so no later value can arrive;
   * the promise is therefore settled right away and no timer is left pending.
   * @param {string} name
   * @returns {Promise<*>}
   */
  getValue(name) {
    const obs = this.getObserver(name)
    return new Promise(resolve => {
      if (isObservable(obs)) {
        const sub = obs.subscribe(val => resolve(val))
        sub.unsubscribe()
      }
      resolve(false) // ignored when a value was already delivered above
    })
  }

  /**
   * Build a boolean observable from an emitter, promise, observable or event name.
   *
   * @param {Object|Promise|Observable|string|null} obs source; a plain object here is taken as `opts`
   * @param {Object}   [opts]
   * @param {Function} [opts.condition] value => boolean (may be async), defaults to this.condition
   * @param {string}   [opts.event]     event name for emitter sources
   * @param {string}   [opts.name]      used as event name fallback and in log messages
   * @param {boolean}  [opts.reverse]   negate the boolean
   * @returns {Observable|false} observable tagged with `_readyType`, or false when no source could be made
   */
  makeObserver(obs, opts = {}) {
    if (isPlainObject(obs)) { opts = obs; obs = null }
    if (!(obs || this.emitter)) return false // some observable requried
    const { event, name } = opts
    const condition = opts.condition || this.condition
    if (isEmitter(obs) && (event || name)) obs = fromEvent(obs, event || name) // it's an emitter
    if (isPromise(obs)) obs = from(obs) // it's a promise
    // no source (or just an event name): fall back to the instance emitter
    if ((!obs || typeof obs === 'string') && this.emitter && (event || name || obs)) {
      obs = fromEvent(this.emitter, event || name || obs)
    }
    if (!obs || !isObservable(obs)) return false
    const xobs = obs.pipe(
      tap(val => this.log(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)),
      // TODO retain plain object if it has a ready property
      map(condition),
      tap(val => this.log(`boolean: ${val}`)),
      map(val => (opts.reverse ? !val : val)),
      startWith(false), // every observer starts out "not ready"
      shareReplay(1),
    )
    xobs._readyType = true
    return xobs
  }

  /**
   * Register an observer under `name` and rebuild the "all" combination.
   *
   * The replacement is built and validated BEFORE an existing observer is
   * removed, so a failed overwrite leaves the existing observer in place.
   * The caller's `opts` object is not modified.
   *
   * @param {string} name
   * @param {Object|Promise|Observable|string|null} obs see makeObserver; a plain object is taken as `opts`
   * @param {Object}  [opts] makeObserver options plus:
   * @param {boolean} [opts.overwrite] allow replacing an existing observer
   * @param {*}       [opts.details]   stored via setObserverDetails
   * @returns {Observable|false}
   */
  addObserver(name, obs, opts = {}) {
    if (!name || typeof name !== 'string') return false // name required
    if (isPlainObject(obs)) { opts = obs; obs = null }
    if (this.has(name) && !opts.overwrite) return false // don't over write exiting observer
    const settings = Object.assign({}, opts, { name })
    if (!(obs || {})._readyType) obs = this.makeObserver(obs, settings)
    if (!isObservable(obs)) return false
    if (this.has(name)) this._dropObserver(name)
    const sub = obs.subscribe() // internal subscriber
    this.set(name, { obs, sub })
    this.setObserverDetails(name, settings.details)
    this._combineAll()
    return obs
  }

  /**
   * Remove one, several, or (when `names` is falsy) all observers, then
   * rebuild the "all" combination once.
   * @param {string|string[]} [names]
   */
  removeObserver(names) {
    if (!names) names = this.observers // remove all
    if (!Array.isArray(names)) names = [names]
    names.forEach(name => this._dropObserver(name))
    // update total combo
    this._combineAll()
    // TODO update affected combinations
  }

  /** Close the subscriptions of one observer and delete it, without recombining. */
  _dropObserver(name) {
    const sub = (this.get(name) || {}).sub
    if (sub) sub.unsubscribe() // remove attached subscription
    this.unsubscribe(name) // remove any manual subscription
    this.delete(name)
  }

  // TODO have combineObservers call this
  /**
   * Combine observables into one that emits true only when every member is truthy.
   * Entries that are missing or not tagged `_readyType` are skipped.
   *
   * @param {Array}  observers
   * @param {Array}  [list] labels parallel to `observers`, used only for logging
   * @returns {Observable|false} false when no usable observable remains
   */
  makeCombination(observers, list) {
    if (!Array.isArray(observers)) return false
    const labels = Array.isArray(list) ? list : null
    const members = []
    const memberLabels = []
    observers.forEach((obs, index) => {
      if (!obs || !obs._readyType) return
      members.push(obs)
      // keep labels aligned with the observers that survive the filter
      if (labels) memberLabels.push(labels[index])
    })
    if (!members.length) return false
    return combineLatest(members).pipe(
      tap(states => { if (labels) this.log(memberLabels.map((name, index) => [name, states[index]])) }),
      map(states => states.reduce((res, state) => res && state, true)),
      changed(), //filters out emission if it is unchanged from last TODO replace with distinctUntilChanged
      shareReplay(1)
    )
  }

  // TODO add option for whether changed is enabled (default) or not and store with combo for remake
  /**
   * Build and store a named combination.
   *
   * @param {string} name  must not clash with an observer name
   * @param {Array}  list  observer names and/or `_readyType` observables
   * @param {Object} [opts]
   * @param {boolean} [opts.overwrite] allow replacing an existing combination
   * @returns {Observable|false} false when nothing could be combined; in that
   *          case nothing is stored and an existing combination is left untouched
   */
  combineObservers(name, list, opts = {}) {
    if (!name || typeof name !== 'string') return false // name required
    if (this.has(name)) return false // can't save a combo with same name as any single oberver
    if (!Array.isArray(list)) return false // need a list of observers
    const exists = this._combinations.has(name)
    if (exists && !(opts || {}).overwrite) return false
    const observers = list.map(item => ((item || {})._readyType ? item : this.getObserver(item)))
    const combination = this.makeCombination(observers, list)
    if (!combination) return false
    if (exists) this.removeCombination(name)
    this._combinations.set(name, combination)
    return combination
  }

  /**
   * Remove one, several, or (when `names` is falsy) all combinations along
   * with any subscription registered for them through subscribe().
   * @param {string|string[]} [names]
   */
  removeCombination(names) {
    if (!names) names = this.combinations // remove all
    if (!Array.isArray(names)) names = [names]
    names.forEach(name => {
      this.unsubscribe(name) // remove any resigtered subscription
      this._combinations.delete(name)
    })
  }

  /**
   * Subscribe a handler to an observer, a combination, or (default) "all".
   *
   * Only one registered subscription is kept per name; subscribing again
   * replaces it. With `opts.add` the new subscription is additional: it is
   * returned but not registered, and the registered one is left running.
   *
   * @param {string|Function} [name] name, or the handler when called as subscribe(handler, opts)
   * @param {Function} [handler] defaults to the registered handler for `name`, then this.handler
   * @param {Object}   [opts]
   * @param {boolean}  [opts.add]
   * @returns {Subscription|false} false when `name` matches nothing
   */
  subscribe(name, handler, opts = {}) {
    if (typeof name === 'function') {
      opts = handler || {}
      handler = name
      name = null
    }
    opts = opts || {}
    name = name || '__all__'
    const registered = this._subscriptions.get(name)
    handler = handler || (registered || {}).handler || this.handler // will resubscribe exisiting handler
    const obs = name === '__all__' ? this._all : (this.getObserver(name) || this.getCombination(name))
    if (!obs) return false
    if (registered && !opts.add) registered.subs.unsubscribe()
    const subs = obs.subscribe(handler)
    // TODO allow reseting additional subscription on add/remove
    if (!opts.add) this._subscriptions.set(name, { subs, handler })
    return subs
  }

  /**
   * Close and forget the registered subscription for `name` (default "all").
   * @param {string} [name]
   * @returns {boolean} false when nothing was registered
   */
  unsubscribe(name) {
    name = name || '__all__'
    const registered = this._subscriptions.get(name)
    if (!registered) return false
    registered.subs.unsubscribe()
    this._subscriptions.delete(name)
    return true
  }

  /**
   * Rebuild this._all from the currently registered observers and, if a
   * handler is registered for "all", resubscribe it to the new pipeline.
   */
  _combineAll() {
    // snapshot the names so states stay paired with the observers this
    // pipeline was actually built from, even if the Map changes later
    const names = this.observers
    const observers = names.map(name => this.getObserver(name))
    this._all = combineLatest(observers).pipe(
      tap(states => {
        const snapshot = names.map((name, index) => [name, states[index]])
        this.state = snapshot
        this.log(snapshot)
      }),
      map(states => states.reduce((res, state) => res && state, true)),
      changed(), //filters out emission if it is unchanged from last
    )
    if (this._subscriptions.has('__all__')) this.subscribe() // will resubscribe with same handler
  }
} // end class
/**
 * RxJS operator factory: passes a value on only when its boolean
 * interpretation differs from the previous value's. The first value of
 * every subscription is always passed on.
 *
 * The "first value" decision uses the per-subscription `index` that `filter`
 * hands to its predicate, so no state is stored on (or shared through) the
 * source observable and each subscriber gets its own initial emission.
 *
 * @param {Function} [toBoolean] value => boolean, defaults to `val => !!val`
 * @returns {Function} operator function (source observable => observable)
 */
function changed(toBoolean) {
  const asBoolean = toBoolean || (val => !!val)
  return obs$ => obs$.pipe(
    startWith(false), // seed so the first real value has a "previous" to pair with
    pairwise(),
    filter(([previous, current], index) => index === 0 || !!(asBoolean(previous) ^ asBoolean(current))),
    map(pair => pair[1]), //remove previous
  )
}
/**
 * Mapping operator that accepts sync or async functions: functions that
 * isAsync() recognises go through switchMap, everything else through the
 * plain rxjs map (imported as simpleMap).
 *
 * @param {Function} fn projection applied to each value
 * @returns {Function} the operator produced by switchMap or simpleMap
 */
function map(fn) {
  const operator = isAsync(fn) ? switchMap : simpleMap
  return operator.call(this, fn)
}
/**
 * Duck-type check for an event emitter.
 *
 * @param {*} emitter candidate value
 * @returns {boolean} true when the value is truthy and both its `on` and
 *          `emit` properties are functions
 */
function isEmitter(emitter) {
  const required = ['on', 'emit']
  return Boolean(emitter) && required.every(method => typeof emitter[method] === 'function')
}
/**
 * Decide whether `fn` should be treated as asynchronous.
 *
 * @param {*} fn candidate value
 * @returns {boolean} false for anything that is not a function; otherwise
 *          true when isPromise() accepts it or its constructor is named
 *          'AsyncFunction' (i.e. it was declared with `async`)
 */
function isAsync(fn) {
  if (typeof fn === 'function') {
    return (isPromise(fn) || fn.constructor.name === 'AsyncFunction')
  }
  return false
}
export default Ready
export { Ready, changed, map, tap}