diff --git a/examples/example.js b/examples/example.js index a885c61..e476eb6 100644 --- a/examples/example.js +++ b/examples/example.js @@ -1,12 +1,16 @@ import { Observable } from 'rxjs' -import Ready from '../src/ready' +import { Ready, changed, map, tap} from '../src/ready' import { EventEmitter } from 'events' let emitter = new EventEmitter() +let verbose = process.env.VERBOSE==='true' + +let combo = false + // handler: (r)=> console.log('result:',r) -let process = new Ready({emitter: emitter, verbose:true}) +let example = new Ready({emitter: emitter, verbose:verbose}) const tObs = new Observable(subscriber => { subscriber.next('on') @@ -14,24 +18,63 @@ const tObs = new Observable(subscriber => { subscriber.next('enabled') setTimeout(() => { subscriber.next('F') - subscriber.next('T') }, 2000) + setTimeout(() => { + subscriber.next('T') + }, 3000) }) const tPromise = new Promise(function(resolve) { - setTimeout(()=>resolve('yes'),1500) + setTimeout(()=>resolve('yes'),1000) }) -process.addObserver('e') -process.addObserver('ec',(ev)=>ev.test) -process.addObserver('pe',emitter) -process.addObserver('obs',tObs) -process.addObserver('pr',tPromise) +example.addObserver('e',{details:{desc:'this is the e observer which comes from an emitter'}}) +example.addObserver('ce',{condition: (ev)=>ev.ready, event:'ec'}) +example.addObserver('pe',emitter) +example.addObserver('obs',tObs) +example.addObserver('pr',tPromise) +example.addObserver('extnd',example.getObserver('e').pipe( + map(state => { + // console.log('+++++++++++\n','extend before:',state) + if (state) { + let val = Math.floor(Math.random() * Math.floor(10)) + // console.log(val) + state = val <10 ? true:false + } + // console.log('extend after:',state,'\n-------------') + return state + }) +)) -process.subscribe(val => console.log('---------------Ready Changed to :',val)) -// process.subscribe('obs',val => console.log('obs',val)) +if (combo) { + let combo = example.combineObservers('combo',['e','pr']) + .pipe( + tap(state=>console.log('combo e and pr state:',state)), + changed() // only emit if changed + ) + if (combo){ + console.log('subscribing to sub combination e, pr') + example.subscribe('combo',val => console.log('combo e, pr is:',val)) + } +} +example.subscribe(val => { + console.log('Full Combination>>>>>>>>READY? example:',val) + console.log('failed observer', example.failure, 'details', example.details.get(example.failure)) +}) // all -emitter.emit('ec',{test:true}) -emitter.emit('e','sure') -emitter.emit('pe') +emitter.emit('ec',{ready:true}) +emitter.emit('pe','yup') +emitter.emit('e') +console.log('===============done initial emitting=================') +setTimeout(async () => { + console.log('============emitting e false===================') + emitter.emit('e',false) +} + ,2000) +setTimeout(async () => { + console.log('=================emitting e true================') + emitter.emit('e',true) +} + ,4000) +setTimeout(async () => {console.log('timeout done')},6000) diff --git a/package.json b/package.json index 804f7f2..88ba4a3 100644 --- a/package.json +++ b/package.json @@ -1,16 +1,12 @@ { "name": "@uci-utils/ready", - "version": "0.1.2", + "version": "0.1.4", "description": "A Class to Observe the reduced to boolean combined state of a map of observables", "main": "src/ready.js", "scripts": { - "example": "node --r esm examples/example", - "example:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/example", - "test": "./node_modules/.bin/mocha -r esm --timeout 30000", - "testd": "UCI_ENV=dev ./node_modules/.bin/nodemon --exec './node_modules/.bin/mocha -r esm --timeout 30000' || exit 0", - "testdd": "UCI_LOG_LEVEL='trace' npm run testd", - "testde": "UCI_LOG_LEVEL='warn' npm run testd", - "testl": "UCI_ENV=pro UCI_LOG_PATH=./test/test.log 0 npm run test || exit 0" + "example": "node -r esm examples/example", + "example:dev": "./node_modules/.bin/nodemon -r esm examples/example", + "test": "./node_modules/.bin/mocha -r esm --timeout 30000" }, "author": "David Kebler", "license": "MIT", @@ -28,6 +24,7 @@ "dependencies": { "@uci-utils/to-boolean": "^0.1.1", "is-observable": "^2.0.0", + "is-plain-object": "^3.0.0", "p-is-promise": "^3.0.0", "rxjs": "^6.5.4" }, diff --git a/src/ready.js b/src/ready.js index 54f286c..5562ef7 100644 --- a/src/ready.js +++ b/src/ready.js @@ -1,7 +1,9 @@ -import { from, fromEvent, combineLatest, Subject } from 'rxjs' -import { map, startWith, tap, pairwise, filter } from 'rxjs/operators' +// new change +import { from, fromEvent, combineLatest, ReplaySubject } from 'rxjs' +import { switchMap, map as simpleMap, startWith, tap, pairwise, filter, shareReplay} from 'rxjs/operators' import isObservable from 'is-observable' import isPromise from 'p-is-promise' +import isPlainObject from 'is-plain-object' // UCI dependencies import { createBoolean } from '@uci-utils/to-boolean' const toBool = createBoolean({undefined:true}) // make default make null event emission cast to TRUE @@ -13,37 +15,81 @@ class Ready extends Map { this.emitter = typeof opts.emitter.on ==='function' ? opts.emitter : null this.condition = opts.condition || ( (ev) => toBool(ev) ) this.subscriptions = new Map() - this._state = {} - this.logger = new Subject() + this.combinations = new Map() + this.details = new Map() + this.combineObservers('__all__') // initialize all combination + this.logger = new ReplaySubject() + this.state = [] // holds last state of all observers this.log = this.logger.next.bind(this.logger) if (opts.verbose) this.logger.subscribe(console.log) - this._updateObserversList() // initialize this.handler = opts.handler || console.log + this._first = true // tracks first emission } - get state() {return this._state} + get observerNames(){return Array.from(this.keys())} - addObserver(name, obs, condition ) { - if (!name) return false // name required + get failure () { + let ret = null + let failed = this.state.some(obs=> { + ret = obs[0] + return obs[1]===false + }) + return !failed ? '__none__' : ret + } + + getObserver(name) { + return (this.get(name) || this.combinations.get(name||'__all__')) + } + + getValue(name) { // NOT recommended. if there is any issue will return false + let obs = this.getObserver(name) + return new Promise(resolve => { + setTimeout(()=>resolve(false),50) + if (isObservable(obs)){ + const sub = obs.subscribe(val => { + resolve(val) + }) + sub.unsubscribe() + } else resolve(false) + }) + } + + addObserverDetails(name,details,overwrite) { + if (this.has(name)) { + if (!isPlainObject(details)) return false + // TODO substitue merge anything for Object.assign + this.details.set(name,overwrite ? details : Object.assign(this.details.get(name)||{},details)) + return true + } + return false + } + + addObserver(name, obs, opts={} ) { + // validation and defaults, obs can be emitter, osbserver, or promise + if (!name || typeof(name)!=='string') return false // name required + if (isPlainObject(obs)) { opts = obs; obs=null } if (!(obs || this.emitter)) return false // some observable requried - if (typeof (obs ||{}).on ==='function') obs = fromEvent(obs,name) // it's an emitter + let { condition, event, details } = opts + condition = condition || this.condition + if (typeof (obs ||{}).on ==='function') obs = fromEvent(obs, event || name) // it's an emitter if (isPromise(obs)) obs = from(obs) // it's a promise if (obs && !isObservable(obs) && typeof obs==='function' && arguments.length===2) { condition = obs - obs = null + // obs = null } - if (!obs && this.emitter) obs = fromEvent(this.emitter,name) + if (!obs && this.emitter) obs = fromEvent(this.emitter,event || name) if (!obs || !isObservable(obs)) return false - this.set(name, obs - .pipe( - tap(val => this.logger.next(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)), - map(condition||this.condition), - tap(val => this.log(`boolean: ${val}`)), - startWith(false), - ) + let xobs = obs.pipe( + startWith(false), + tap(val => this.log(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)), + map(condition), + // tap(val => this.log(`boolean: ${val}`)), + shareReplay(1) ) - this._updateObserversList() - return true + this.set(name, xobs) + this.addObserverDetails(name,details,true) + this.combineObservers('__all__') // update total combo + return xobs } removeObserver(names) { @@ -54,7 +100,7 @@ class Ready extends Map { this.delete(name) }) } - this._updateObserversList() + this.combineObservers('__all__') // update total combo } subscribe(name, handler) { @@ -62,24 +108,65 @@ class Ready extends Map { handler=name name = null } - this.subscriptions.set(name||'_primary', (name ? this.get(name):this._state).subscribe(handler||this.handler)) + name = name || '__all__' + if (this.subscriptions.get(name)) this.unsubscribe(name) + let obs = this.getObserver(name) + if (!obs) return false + let subs = obs.subscribe(handler||this.handler) + this.subscriptions.set(name,subs) + return subs } unsubscribe(name) { - this.subscriptions.get(name||'_primary').unsubscribe() + if (!this.subscriptions.has(name)) return false + this.subscriptions.get(name||'__all__').unsubscribe() + return true } - _updateObserversList() { - this._state = combineLatest(Array.from(this.values())).pipe( - tap(states => this.log( Array.from(this.keys()).map((name,index) => [name,states[index]]))), - map( states => states.reduce((res,state) => {return res && state},true)), - pairwise(), - filter( ([p,c]) => (p ^ c)), // only emit on change - map( r => r[1] ) //remove previous + combineObservers(name,list) { + if (Array.isArray(name)) {list = name; name = null} + if (name==='__all__') list = Array.from(this.keys()) // get list of all observers + if (!Array.isArray(list)) return false // can't make a combo without a list + if (this.has(name)) return false // can't save a combo with same name as any single oberver + let observers = list.map(name=>this.get(name)||this.getCombination(name)) + if (observers.filter(obs=>!isObservable(obs)).length) return false + let combination = combineLatest(observers).pipe( + tap(states => { if (name==='__all__') this.state = list.map((name,index) => [name,states[index]])}), + map(states => states.reduce((res,state) => {return res && state},true)), + startWith(false), + changed(), + shareReplay(1) ) + if (name) this.combinations.set(name, combination) // if name passed then save combo in Map + return combination + } + + getCombination(name) { + this.combinations.get(name||'__all__') } } // end class + +function changed(toBoolean) { + toBoolean = toBoolean || (val => !!val) + return obs$ => obs$.pipe( + pairwise(), + filter( ([p,c]) => { + const chng = !obs$.__not_first__ || !!(toBoolean(p) ^ toBoolean(c)) + obs$.__not_first__=true + return chng}), + map( r => r[1] ), //remove previous + ) +} + +function map(fn) { + return isAsync(fn) ? switchMap.call(this,fn) : simpleMap.call(this,fn) +} + +function isAsync(fn) { + return (isPromise(fn) || fn.constructor.name === 'AsyncFunction') +} + export default Ready -export { Ready } +export { Ready, changed, map, tap}