diff --git a/examples/example.js b/examples/example.js index 613a087..2d6df39 100644 --- a/examples/example.js +++ b/examples/example.js @@ -1,16 +1,10 @@ -import { Observable } from 'rxjs' -import { Ready, changed, map, tap} from '../src/ready' +import { Ready } from '../src/ready' import { EventEmitter } from 'events' let emitter = new EventEmitter() // let verbose = process.env.VERBOSE==='true' - -let combo = true - -// handler: (r)=> console.log('result:',r) - -const late=3000 +const late=500 let example = new Ready({emitter: emitter}) @@ -27,82 +21,32 @@ let subscribe = ()=>{ if (!late) subscribe() else setTimeout(subscribe,late) -const tObs = new Observable(subscriber => { - subscriber.next('on') - subscriber.next('off') - subscriber.next('enabled') - setTimeout(() => { - subscriber.next('F') - }, 7000) - setTimeout(() => { - subscriber.next('T') - }, 8000) -}) - - -example.addObserver('obs',tObs,{details:{desc:'this is a constructed observable from the Observable Class'}}) - -const tPromise = new Promise(function(resolve) { - setTimeout(()=>{ - console.log('promise observer is resolving') - resolve('yes') - },1000) -}) - -example.addObserver('e',{details:{desc:'this is the e observer which comes from an emitter'}}) -example.addObserver('ce',{condition: (ev)=>ev.ready, event:'ec'}) -example.addObserver('pe',emitter) -example.addObserver('pr',tPromise) -example.addObserver('extnd',example.getObserver('e').pipe( - map(state => { - // console.log('+++++++++++\n','extend before:',state) - if (state) { - let val = Math.floor(Math.random() * Math.floor(10)) - // console.log(val) - state = val <11 ? true:false - } - // console.log('extend after:',state,'\n-------------') - return state - }) -),{details:'an observer extension of the e observer'}) - -if (combo) { - let combo = example.combineObservers('combo',['e','pr']) - .pipe( - tap(state=>console.log('combo e and pr state:',state)), - changed() // only emit if changed - ) - - if (combo){ - console.log('a combination of e and pr was made\n','combinations are not added to \'all\' list') - console.log('subscribing to sub combination e, pr') - example.subscribe('combo',val => console.log('combo e, pr is:',val)) - } -} - -let madeonly = example.makeObserver('madeonly') - -console.log('observer \'madeonly\' was only made and was not added so not in list \n',madeonly) -console.log('list of observers made and added to the all combination\n',example.observerNames) - - - -emitter.emit('ec',{ready:true}) -emitter.emit('pe','yup') -emitter.emit('e') -console.log('===============done initial emitting, pe,e,ec/ce=================') +example.addObserver('a',{details:{desc:'this is the a observer which comes from an emitter'}}) +// .subscribe(ev=>{console.log('a',ev)}) +example.addObserver('b') +// .subscribe(ev=>{console.log('b',ev)}) +example.addObserver('c') +// .subscribe(ev=>{console.log('c',ev)}) +console.log(' observers added>',example.observers) +example.combineObservers('a:b',['a','b']).subscribe(ready=>console.log('ab combo state>',ready)) +console.log('-------------emitting, b true -----------------') +emitter.emit('b',true) +console.log('-------------emitting, c true -----------------') +emitter.emit('c',true) +console.log('===============done emitting =================') setTimeout(async () => { - console.log('============emitting e, pe false===================') - example.setObserverDetails('e',{moreinfo:'an additional property of details added later'}) - emitter.emit('e',false) - emitter.emit('pe',false) -} - ,2000) + console.log('============emitting a true after 1 sec===================') + emitter.emit('a',true) +},1000) setTimeout(async () => { - console.log('=================emitting e true, removing pe which is false ================') - emitter.emit('e',true) - example.removeObserver('pe') -} - ,4000) + console.log('============emitting a false after 2 sec===================') + emitter.emit('a',false) -setTimeout(async () => {console.log('timeout done')},6000) +},2000) +setTimeout(async () => { + console.log('============removing a after 3 sec===================') + example.removeObserver('a') + console.log('observers after removal>',example.observers) +},3000) + +setTimeout(async () => {console.log('timeout done');process.exit()},10000) diff --git a/examples/example2.js b/examples/example2.js new file mode 100644 index 0000000..9956462 --- /dev/null +++ b/examples/example2.js @@ -0,0 +1,117 @@ +import { Observable } from 'rxjs' +import { Ready, readyChanged as changed, map, tap} from '../src/ready' +import { EventEmitter } from 'events' + +let emitter = new EventEmitter() + +// let verbose = process.env.VERBOSE==='true' +let combo = false +const late=3000 + +let example = new Ready({emitter: emitter}) + +console.log('bound change function for use in customized observer', changed) + +console.log('Boolean TEST: {online:\'yes\'}',example.toBoolean({online:'yes'})) +console.log('Boolean TEST: \'yes\'',example.toBoolean('yes')) + +let subscribe = ()=>{ + console.log('subscribing at',late, 'ms') + example.subscribe(ready => { + console.log(`-----------Subscriber at ${late} ms--------------?`,ready) + console.log('the failed observer:', example.failure, ',details:', example.getObserverDetails(example.failure)) + console.log('the total state', example.state) + console.log('---------------------------------------') + }) +} + +if (!late) subscribe() +else setTimeout(subscribe,late) + +const tObs = new Observable(subscriber => { + subscriber.next('on') + subscriber.next('off') + subscriber.next('enabled') + setTimeout(() => { + subscriber.next('F') + }, 7000) + setTimeout(() => { + subscriber.next('T') + }, 8000) +}) + + +example.addObserver('obs',tObs,{details:{desc:'this is a constructed observable from the Observable Class'}}) + +const tPromise = new Promise(function(resolve) { + setTimeout(()=>{ + console.log('promise observer is resolving') + resolve('yes') + },1000) +}) + +example.addObserver('e',{details:{desc:'this is the e observer which comes from an emitter'}}) +// example.addObserver('ce',{condition: (ev)=>ev.ready, event:'ec'}) +example.addObserver('ce',{event:'ec'}) +example.addObserver('pe',emitter) +example.addObserver('pr',tPromise) +example.addObserver('extnd',example.getObserver('e').pipe( + map(state => { + // console.log('+++++++++++\n','extend before:',state) + if (state) { + let val = Math.floor(Math.random() * Math.floor(10)) + // console.log(val) + state = val <11 ? true:false + } + // console.log('extend after:',state,'\n-------------') + return state + }) +),{details:'an observer extension of the e observer'}) + +let madeonly = example.makeObserver({event:'madeonly'}) +madeonly ? console.log('observer \'madeonly\' was only made and was not added so won\'t appear in \'all\' list') + : console.log('made observer failed') +console.log('list of observers made and added to the all combination\n',example.observers) + +if (combo) { + let combo = example.combineObservers('combo',['e','pr']) + .pipe( + tap(state=>console.log('++++++++++++++ tap log of extened combo e and pr state:+++++++++++++++',state)), + ) + + if (combo){ + example.subscribe('e', ready => console.log('e',ready)) + example.subscribe('e', ready => console.log('pr',ready)) + console.log('a combination name \'combo\' made of e and pr was created') + console.log('now subscribing to combination e, pr') + example.subscribe('combo', val => console.log('saved combo subscription state:',val)) + combo.subscribe(val => console.log('extened combo subscription state:',val)) + } +} + +console.log('all current combinations',example.combinations) + +console.log('------------------starting to emit values--------------------------') + + +// emitter.emit('ec',{ready:true}) +emitter.emit('ec',true) +emitter.emit('pe','yup') +emitter.emit('e') +console.log('===============done initial emitting, pe,e,ec/ce=================') +setTimeout(async () => { + console.log('============emitting e, pe false===================') + example.setObserverDetails('e',{moreinfo:'an additional property of details added later'}) + emitter.emit('e',false) + emitter.emit('pe',false) +} + ,2000) +setTimeout(async () => { + console.log('=================emitting e true, removing pe which is false ================') + emitter.emit('e',true) + example.removeObserver('pe') + example.subscribe(ready=>console.log('a second all subscriber', ready),{add:true}) +} + ,4000) + +setTimeout(async () => {console.log('timeout done');process.exit()},10000) diff --git a/package.json b/package.json index f8f7bf3..1228ec2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci-utils/ready", - "version": "0.1.6", + "version": "0.1.7", "description": "A Class to Observe the reduced to boolean combined state of a map of observables", "main": "src/ready.js", "scripts": { diff --git a/src/ready.js b/src/ready.js index 2617e5a..966e58b 100644 --- a/src/ready.js +++ b/src/ready.js @@ -11,21 +11,27 @@ class Ready extends Map { constructor(opts) { super(opts.observables) this.emitter = isEmitter(opts.emitter) ? opts.emitter : null - const toBool = createBoolean(opts.boolean) // e.g. {undefined:true} - this.condition = opts.condition || ( (ev) => toBool(ev) ) - this.subscriptions = new Map() - this.combinations = new Map() - this.details = new Map() - this.combineObservers('__all__') // initialize all combination + const toBool = createBoolean(opts.boolean) + this.toBoolean = (value) => { + if (isPlainObject(value)) value = (value.ready || value.online || value.active || false) + return toBool(value) + } + this.condition = opts.condition || ( (ev) => this.toBoolean(ev) ) + this._subscriptions = new Map() + this._combinations = new Map() + this._all = {} // where all observer is kept + this._combineAll() this.logger = new BehaviorSubject() - this.state = [] // holds last state of all observers + this.state = [] // holds last emitted state of all combination this.log = this.logger.next.bind(this.logger) if (opts.verbose||process.env.UCI_READY_VERBOSE==='true') this.logger.subscribe(console.log) this.handler = opts.handler || ((ready) => {console.log('default handler', ready)}) this._first = true // tracks first emission } - get observerNames(){return Array.from(this.keys())} + get observers(){return Array.from(this.keys())} + get combinations(){return Array.from(this._combinations.keys())} + get all() { return this._all} get failure () { let ret = null @@ -50,11 +56,12 @@ class Ready extends Map { } getObserver(name) { - return ((this.get(name) || {}).obs || this.combinations.get(name||'__all__')) + return (this.get(name) || {}).obs + // return (name==null ||'__all__') ? this._all : ( (this.get(name) || {}).obs || this._combinations.get(name) || false ) } getCombination(name) { - this.combinations.get(name||'__all__') + return this._combinations.get(name) } getValue(name) { // NOT recommended. if there is any issue will return false @@ -71,7 +78,6 @@ class Ready extends Map { } makeObserver(obs, opts={}) { - // validation and defaults, obs can be emitter, osbserver, or promise if (isPlainObject(obs)) { opts = obs; obs=null } if (!(obs || this.emitter)) return false // some observable requried @@ -79,14 +85,15 @@ class Ready extends Map { condition = condition || this.condition if (isEmitter(obs) && (event || name)) obs = fromEvent(obs, event || name ) // it's an emitter if (isPromise(obs)) obs = from(obs) // it's a promise - if (obs && !isObservable(obs) && typeof obs==='function' && arguments.length===2) condition = obs - if (!obs && this.emitter && (event || name)) obs = fromEvent(this.emitter,event || name) + if (!obs && this.emitter && (event || name)) obs = fromEvent(this.emitter,(event || name)) if (!obs || !isObservable(obs)) return false let xobs = obs.pipe( tap(val => this.log(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)), + // TODO retain plain object if it has a ready property map(condition), tap(val => this.log(`boolean: ${val}`)), startWith(false), + // shareReplay({refCount:true, bufferSize:1}), shareReplay(1), ) xobs._readyType = true @@ -96,86 +103,111 @@ class Ready extends Map { addObserver(name, obs, opts={} ) { // validation and defaults, obs can be emitter, osbserver, or promise if (!name || typeof(name)!=='string') return false // name required + if (isPlainObject(obs)) { opts = obs; obs=null } + if (this.has(name) && !opts.overwrite) return false // don't over write exiting observer + else this.removeObserver(name) opts.name = name if (!(obs || {})._readyType) obs = this.makeObserver(obs, opts) if (!isObservable(obs)) return false - let { details } = opts - let sub = obs.subscribe() + let sub = obs.subscribe() // internal subscriber + // let sub = null this.set(name, {obs:obs, sub:sub}) - this.setObserverDetails(name,details) - this.combineObservers('__all__',opts) // update total combo - if (this.subscriptions.has('__all__')) this.subscribe() // will resubscribe + this.setObserverDetails(name,opts.details) + this._combineAll() return obs } + // will remove combination as well + removeObserver(names) { + if (!names) names = this.observers // remove all + if (!Array.isArray(names)) names = [names] + names.forEach(name => { + const sub = (this.get(name)||{}).sub + if (sub) sub.unsubscribe() // remove attached subscription + this.unsubscribe(name) // remove any manual subscription + this.delete(name) + }) + // update total combo + this._combineAll() + // TODO update affected combinations + } + // TODO add option for whether changed is enabled (default) or not and store with combo for remake combineObservers(name,list,opts={}) { - if (Array.isArray(name)) {list = name; name = null} - name = name || '__all__' - if (name==='__all__') list = Array.from(this.keys()) // get list of all observers - if (!Array.isArray(list)) return false // can't make a combo without a list + if (!name || typeof(name)!=='string') return false // name required if (this.has(name)) return false // can't save a combo with same name as any single oberver - let observers = list.map(name=>this.getObserver(name)) // will get combo if exists - if (observers.filter(obs=>!isObservable(obs)).length) return false + if (!Array.isArray(list)) return false // need a list of observers + if (this._combinations.has(name)) { + if (opts.overwrite) this.removeCombination(name) + else return false + } + let observers = list.map(name=>{return name._readyType ? name : this.getObserver(name)}) + observers = observers.filter(obs=>obs._readyType) + if (!observers.length) return false let combination = combineLatest(observers).pipe( - tap(states => { if (name==='__all__') this.state = list.map((name,index) => [name,states[index]])}), tap(states => { this.log(list.map((name,index) => [name,states[index]]))}), map(states => states.reduce((res,state) => {return res && state},true)), changed(), //filters out emission if it is unchanged from last - startWith(false), shareReplay(1) ) - this.combinations.set(name, combination) // if name passed then save combo in Map + this._combinations.set(name, combination) // if name passed then save combo in Map return combination } - // will remove combination as well - removeObserver(names) { - if (!names) names = this.observerNames - else { - if (!Array.isArray(names)) names = [names] - console.log('names to remove', names) - names.forEach(name => { - const sub = (this.get(name)||{}).sub - if (sub) sub.unsubscribe() // remove attached subscription - this.unsubscribe(name) // remove any manual subscription - this.delete(name) || this.combinations.delete(name) - }) - } - console.log(this.observerNames) - this.combineObservers('__all__') // update total combo - this.subscribe() // resubscribe to changed combo + removeCombination(names){ + if (!names) names = this.combinations // remove all + if (!Array.isArray(names)) names = [names] + names.forEach(name => { + // const sub = (this._combinations.get(name)||{}).sub + // if (sub) sub.unsubscribe() // remove attached subscription + this.unsubscribe(name) // remove any resigtered subscription + this._combinations.delete(name) + }) } - subscribe(name, handler, save) { + subscribe(name, handler, opts={}) { // only one subscription at a time per observer or combination from this method if (typeof name ==='function') { + opts=handler||{} handler=name name = null } - name = name || '__all__' - handler = handler || (this.subscriptions.get(name)||{}).handler || this.handler - let obs = this.getObserver(name) // will attempt to get combo if no simple observer, all if name is null + name = name ||'__all__' + handler = handler || (this._subscriptions.get(name)||{}).handler || this.handler // will resubscribe exisiting handler + let obs = name==='__all__' ? this._all : (this.getObserver(name)||this.getCombination(name)) if (!obs) return false - if (this.subscriptions.has(name)) this.subscriptions.get(name).subs.unsubscribe() + if (this._subscriptions.has(name)) this._subscriptions.get(name).subs.unsubscribe() let subs = obs.subscribe(handler) - if (save) this.subscriptions.set(name,{subs:subs,handler:handler}) + // TODO allow reseting additional subscription on add/remove + if (!opts.add) this._subscriptions.set(name, {subs:subs,handler:handler}) return subs } unsubscribe(name) { name = name ||'__all__' - if (!this.subscriptions.has(name)) return false - this.subscriptions.get(name).subs.unsubscribe() - this.subscriptions.delete(name) + if (!this._subscriptions.has(name)) return false + this._subscriptions.get(name).subs.unsubscribe() + this._subscriptions.delete(name) return true } + _combineAll() { + let observers = this.observers.map(name=>this.getObserver(name)) + this._all = combineLatest(observers).pipe( + tap(states => { this.state = this.observers.map((name,index) => [name,states[index]]) }), + tap(states => { this.log(this.observers.map((name,index) => [name,states[index]])) }), + map(states => states.reduce((res,state) => {return res && state},true)), + changed(), //filters out emission if it is unchanged from last + ) + if (this._subscriptions.has('__all__')) this.subscribe() // will resubscribe with same handler + } + } // end class function changed(toBoolean) { toBoolean = toBoolean || (val => !!val) return obs$ => obs$.pipe( + startWith(false), pairwise(), filter( ([p,c]) => { const chng = !obs$.__not_first__ || !!(toBoolean(p) ^ toBoolean(c))