From 30f9f273e74d1f1eb39dbe5e64e9d7889a4143c4 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Tue, 7 Jan 2020 15:02:36 -0800 Subject: [PATCH] 0.1.4 using '__all__' for main combination can save combinations to map .state hold state at last main combination change can add observer details that can be passed on later by subscription shareReply added to observers added 'changed' operator that emits only on changed state or first emission modified the map operator to accept async function via switchMap added methods observerNames to get list failure gets name of first observer to fail in the state prop getValue addObserverDetails (can add/ammend at any time) --- examples/example.js | 71 ++++++++++++++++----- package.json | 13 ++-- src/ready.js | 149 +++++++++++++++++++++++++++++++++++--------- 3 files changed, 180 insertions(+), 53 deletions(-) diff --git a/examples/example.js b/examples/example.js index a885c61..e476eb6 100644 --- a/examples/example.js +++ b/examples/example.js @@ -1,12 +1,16 @@ import { Observable } from 'rxjs' -import Ready from '../src/ready' +import { Ready, changed, map, tap} from '../src/ready' import { EventEmitter } from 'events' let emitter = new EventEmitter() +let verbose = process.env.VERBOSE==='true' + +let combo = false + // handler: (r)=> console.log('result:',r) -let process = new Ready({emitter: emitter, verbose:true}) +let example = new Ready({emitter: emitter, verbose:verbose}) const tObs = new Observable(subscriber => { subscriber.next('on') @@ -14,24 +18,63 @@ const tObs = new Observable(subscriber => { subscriber.next('enabled') setTimeout(() => { subscriber.next('F') - subscriber.next('T') }, 2000) + setTimeout(() => { + subscriber.next('T') + }, 3000) }) const tPromise = new Promise(function(resolve) { - setTimeout(()=>resolve('yes'),1500) + setTimeout(()=>resolve('yes'),1000) }) -process.addObserver('e') -process.addObserver('ec',(ev)=>ev.test) -process.addObserver('pe',emitter) -process.addObserver('obs',tObs) -process.addObserver('pr',tPromise) +example.addObserver('e',{details:{desc:'this is the e observer which comes from an emitter'}}) +example.addObserver('ce',{condition: (ev)=>ev.ready, event:'ec'}) +example.addObserver('pe',emitter) +example.addObserver('obs',tObs) +example.addObserver('pr',tPromise) +example.addObserver('extnd',example.getObserver('e').pipe( + map(state => { + // console.log('+++++++++++\n','extend before:',state) + if (state) { + let val = Math.floor(Math.random() * Math.floor(10)) + // console.log(val) + state = val <10 ? true:false + } + // console.log('extend after:',state,'\n-------------') + return state + }) +)) -process.subscribe(val => console.log('---------------Ready Changed to :',val)) -// process.subscribe('obs',val => console.log('obs',val)) +if (combo) { + let combo = example.combineObservers('combo',['e','pr']) + .pipe( + tap(state=>console.log('combo e and pr state:',state)), + changed() // only emit if changed + ) + if (combo){ + console.log('subscribing to sub combination e, pr') + example.subscribe('combo',val => console.log('combo e, pr is:',val)) + } +} +example.subscribe(val => { + console.log('Full Combination>>>>>>>>READY? example:',val) + console.log('failed observer', example.failure, 'details', example.details.get(example.failure)) +}) // all -emitter.emit('ec',{test:true}) -emitter.emit('e','sure') -emitter.emit('pe') +emitter.emit('ec',{ready:true}) +emitter.emit('pe','yup') +emitter.emit('e') +console.log('===============done initial emitting=================') +setTimeout(async () => { + console.log('============emitting e false===================') + emitter.emit('e',false) +} + ,2000) +setTimeout(async () => { + console.log('=================emitting e true================') + emitter.emit('e',true) +} + ,4000) +setTimeout(async () => {console.log('timeout done')},6000) diff --git a/package.json b/package.json index 804f7f2..88ba4a3 100644 --- a/package.json +++ b/package.json @@ -1,16 +1,12 @@ { "name": "@uci-utils/ready", - "version": "0.1.2", + "version": "0.1.4", "description": "A Class to Observe the reduced to boolean combined state of a map of observables", "main": "src/ready.js", "scripts": { - "example": "node --r esm examples/example", - "example:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/example", - "test": "./node_modules/.bin/mocha -r esm --timeout 30000", - "testd": "UCI_ENV=dev ./node_modules/.bin/nodemon --exec './node_modules/.bin/mocha -r esm --timeout 30000' || exit 0", - "testdd": "UCI_LOG_LEVEL='trace' npm run testd", - "testde": "UCI_LOG_LEVEL='warn' npm run testd", - "testl": "UCI_ENV=pro UCI_LOG_PATH=./test/test.log 0 npm run test || exit 0" + "example": "node -r esm examples/example", + "example:dev": "./node_modules/.bin/nodemon -r esm examples/example", + "test": "./node_modules/.bin/mocha -r esm --timeout 30000" }, "author": "David Kebler", "license": "MIT", @@ -28,6 +24,7 @@ "dependencies": { "@uci-utils/to-boolean": "^0.1.1", "is-observable": "^2.0.0", + "is-plain-object": "^3.0.0", "p-is-promise": "^3.0.0", "rxjs": "^6.5.4" }, diff --git a/src/ready.js b/src/ready.js index 54f286c..5562ef7 100644 --- a/src/ready.js +++ b/src/ready.js @@ -1,7 +1,9 @@ -import { from, fromEvent, combineLatest, Subject } from 'rxjs' -import { map, startWith, tap, pairwise, filter } from 'rxjs/operators' +// new change +import { from, fromEvent, combineLatest, ReplaySubject } from 'rxjs' +import { switchMap, map as simpleMap, startWith, tap, pairwise, filter, shareReplay} from 'rxjs/operators' import isObservable from 'is-observable' import isPromise from 'p-is-promise' +import isPlainObject from 'is-plain-object' // UCI dependencies import { createBoolean } from '@uci-utils/to-boolean' const toBool = createBoolean({undefined:true}) // make default make null event emission cast to TRUE @@ -13,37 +15,81 @@ class Ready extends Map { this.emitter = typeof opts.emitter.on ==='function' ? opts.emitter : null this.condition = opts.condition || ( (ev) => toBool(ev) ) this.subscriptions = new Map() - this._state = {} - this.logger = new Subject() + this.combinations = new Map() + this.details = new Map() + this.combineObservers('__all__') // initialize all combination + this.logger = new ReplaySubject() + this.state = [] // holds last state of all observers this.log = this.logger.next.bind(this.logger) if (opts.verbose) this.logger.subscribe(console.log) - this._updateObserversList() // initialize this.handler = opts.handler || console.log + this._first = true // tracks first emission } - get state() {return this._state} + get observerNames(){return Array.from(this.keys())} - addObserver(name, obs, condition ) { - if (!name) return false // name required + get failure () { + let ret = null + let failed = this.state.some(obs=> { + ret = obs[0] + return obs[1]===false + }) + return !failed ? '__none__' : ret + } + + getObserver(name) { + return (this.get(name) || this.combinations.get(name||'__all__')) + } + + getValue(name) { // NOT recommended. if there is any issue will return false + let obs = this.getObserver(name) + return new Promise(resolve => { + setTimeout(()=>resolve(false),50) + if (isObservable(obs)){ + const sub = obs.subscribe(val => { + resolve(val) + }) + sub.unsubscribe() + } else resolve(false) + }) + } + + addObserverDetails(name,details,overwrite) { + if (this.has(name)) { + if (!isPlainObject(details)) return false + // TODO substitue merge anything for Object.assign + this.details.set(name,overwrite ? details : Object.assign(this.details.get(name)||{},details)) + return true + } + return false + } + + addObserver(name, obs, opts={} ) { + // validation and defaults, obs can be emitter, osbserver, or promise + if (!name || typeof(name)!=='string') return false // name required + if (isPlainObject(obs)) { opts = obs; obs=null } if (!(obs || this.emitter)) return false // some observable requried - if (typeof (obs ||{}).on ==='function') obs = fromEvent(obs,name) // it's an emitter + let { condition, event, details } = opts + condition = condition || this.condition + if (typeof (obs ||{}).on ==='function') obs = fromEvent(obs, event || name) // it's an emitter if (isPromise(obs)) obs = from(obs) // it's a promise if (obs && !isObservable(obs) && typeof obs==='function' && arguments.length===2) { condition = obs - obs = null + // obs = null } - if (!obs && this.emitter) obs = fromEvent(this.emitter,name) + if (!obs && this.emitter) obs = fromEvent(this.emitter,event || name) if (!obs || !isObservable(obs)) return false - this.set(name, obs - .pipe( - tap(val => this.logger.next(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)), - map(condition||this.condition), - tap(val => this.log(`boolean: ${val}`)), - startWith(false), - ) + let xobs = obs.pipe( + startWith(false), + tap(val => this.log(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)), + map(condition), + // tap(val => this.log(`boolean: ${val}`)), + shareReplay(1) ) - this._updateObserversList() - return true + this.set(name, xobs) + this.addObserverDetails(name,details,true) + this.combineObservers('__all__') // update total combo + return xobs } removeObserver(names) { @@ -54,7 +100,7 @@ class Ready extends Map { this.delete(name) }) } - this._updateObserversList() + this.combineObservers('__all__') // update total combo } subscribe(name, handler) { @@ -62,24 +108,65 @@ class Ready extends Map { handler=name name = null } - this.subscriptions.set(name||'_primary', (name ? this.get(name):this._state).subscribe(handler||this.handler)) + name = name || '__all__' + if (this.subscriptions.get(name)) this.unsubscribe(name) + let obs = this.getObserver(name) + if (!obs) return false + let subs = obs.subscribe(handler||this.handler) + this.subscriptions.set(name,subs) + return subs } unsubscribe(name) { - this.subscriptions.get(name||'_primary').unsubscribe() + if (!this.subscriptions.has(name)) return false + this.subscriptions.get(name||'__all__').unsubscribe() + return true } - _updateObserversList() { - this._state = combineLatest(Array.from(this.values())).pipe( - tap(states => this.log( Array.from(this.keys()).map((name,index) => [name,states[index]]))), - map( states => states.reduce((res,state) => {return res && state},true)), - pairwise(), - filter( ([p,c]) => (p ^ c)), // only emit on change - map( r => r[1] ) //remove previous + combineObservers(name,list) { + if (Array.isArray(name)) {list = name; name = null} + if (name==='__all__') list = Array.from(this.keys()) // get list of all observers + if (!Array.isArray(list)) return false // can't make a combo without a list + if (this.has(name)) return false // can't save a combo with same name as any single oberver + let observers = list.map(name=>this.get(name)||this.getCombination(name)) + if (observers.filter(obs=>!isObservable(obs)).length) return false + let combination = combineLatest(observers).pipe( + tap(states => { if (name==='__all__') this.state = list.map((name,index) => [name,states[index]])}), + map(states => states.reduce((res,state) => {return res && state},true)), + startWith(false), + changed(), + shareReplay(1) ) + if (name) this.combinations.set(name, combination) // if name passed then save combo in Map + return combination + } + + getCombination(name) { + this.combinations.get(name||'__all__') } } // end class + +function changed(toBoolean) { + toBoolean = toBoolean || (val => !!val) + return obs$ => obs$.pipe( + pairwise(), + filter( ([p,c]) => { + const chng = !obs$.__not_first__ || !!(toBoolean(p) ^ toBoolean(c)) + obs$.__not_first__=true + return chng}), + map( r => r[1] ), //remove previous + ) +} + +function map(fn) { + return isAsync(fn) ? switchMap.call(this,fn) : simpleMap.call(this,fn) +} + +function isAsync(fn) { + return (isPromise(fn) || fn.constructor.name === 'AsyncFunction') +} + export default Ready -export { Ready } +export { Ready, changed, map, tap}