add boolean option to allow pass through of toBoolean module options
refactor main map to hold observer, details and corresponding 'hidden' subscription together
unsubscribes from that subscription when removing observer
new getter/setter for observer details, can also pass as option when adding observer
master
David Kebler 2020-01-14 13:26:40 -08:00
parent 30f9f273e7
commit 63566d624f
4 changed files with 111 additions and 77 deletions

View File

@ -4,13 +4,28 @@ import { EventEmitter } from 'events'
let emitter = new EventEmitter()
let verbose = process.env.VERBOSE==='true'
// let verbose = process.env.VERBOSE==='true'
let combo = false
// handler: (r)=> console.log('result:',r)
let example = new Ready({emitter: emitter, verbose:verbose})
const late=3000
let example = new Ready({emitter: emitter})
let subscribe = ()=>{
console.log('subscribing at',late, 'ms')
example.subscribe(ready => {
console.log(`-----------Subscriber at ${late} ms--------------?`,ready)
console.log('the failed observer:', example.failure, ',details:', example.getObserverDetails(example.failure))
console.log('the total state', example.state)
console.log('---------------------------------------')
})
}
if (!late) subscribe()
else setTimeout(subscribe,late)
const tObs = new Observable(subscriber => {
subscriber.next('on')
@ -18,20 +33,25 @@ const tObs = new Observable(subscriber => {
subscriber.next('enabled')
setTimeout(() => {
subscriber.next('F')
}, 2000)
}, 7000)
setTimeout(() => {
subscriber.next('T')
}, 3000)
}, 8000)
})
example.addObserver('obs',tObs,{details:{desc:'this is a constructed observable from the Observable Class'}})
const tPromise = new Promise(function(resolve) {
setTimeout(()=>resolve('yes'),1000)
setTimeout(()=>{
console.log('promise observer is resolving')
resolve('yes')
},1000)
})
example.addObserver('e',{details:{desc:'this is the e observer which comes from an emitter'}})
example.addObserver('ce',{condition: (ev)=>ev.ready, event:'ec'})
example.addObserver('pe',emitter)
example.addObserver('obs',tObs)
example.addObserver('pr',tPromise)
example.addObserver('extnd',example.getObserver('e').pipe(
map(state => {
@ -39,12 +59,12 @@ example.addObserver('extnd',example.getObserver('e').pipe(
if (state) {
let val = Math.floor(Math.random() * Math.floor(10))
// console.log(val)
state = val <10 ? true:false
state = val <11 ? true:false
}
// console.log('extend after:',state,'\n-------------')
return state
})
))
),{details:'an observer extension of the e observer'})
if (combo) {
let combo = example.combineObservers('combo',['e','pr'])
@ -58,23 +78,23 @@ if (combo) {
example.subscribe('combo',val => console.log('combo e, pr is:',val))
}
}
example.subscribe(val => {
console.log('Full Combination>>>>>>>>READY? example:',val)
console.log('failed observer', example.failure, 'details', example.details.get(example.failure))
}) // all
emitter.emit('ec',{ready:true})
emitter.emit('pe','yup')
emitter.emit('e')
console.log('===============done initial emitting=================')
console.log('===============done initial emitting, pe,e,ec/ce=================')
setTimeout(async () => {
console.log('============emitting e false===================')
console.log('============emitting e, pe false===================')
example.setObserverDetails('e',{moreinfo:'an additional property of details added later'})
emitter.emit('e',false)
emitter.emit('pe',false)
}
,2000)
setTimeout(async () => {
console.log('=================emitting e true================')
console.log('=================emitting e true, removing pe which is false ================')
emitter.emit('e',true)
example.removeObserver('pe')
}
,4000)
setTimeout(async () => {console.log('timeout done')},6000)

View File

@ -1,6 +1,6 @@
{
"name": "@uci-utils/ready",
"version": "0.1.4",
"version": "0.1.5",
"description": "A Class to Observe the reduced to boolean combined state of a map of observables",
"main": "src/ready.js",
"scripts": {

View File

@ -1,28 +1,27 @@
// new change
import { from, fromEvent, combineLatest, ReplaySubject } from 'rxjs'
import { from, fromEvent, combineLatest, BehaviorSubject } from 'rxjs'
import { switchMap, map as simpleMap, startWith, tap, pairwise, filter, shareReplay} from 'rxjs/operators'
import isObservable from 'is-observable'
import isPromise from 'p-is-promise'
import isPlainObject from 'is-plain-object'
// UCI dependencies
import { createBoolean } from '@uci-utils/to-boolean'
const toBool = createBoolean({undefined:true}) // make default make null event emission cast to TRUE
class Ready extends Map {
constructor(opts) {
super(opts.observables)
// TODO support setting registration in options
this.emitter = typeof opts.emitter.on ==='function' ? opts.emitter : null
const toBool = createBoolean(opts.boolean) // e.g. {undefined:true}
this.condition = opts.condition || ( (ev) => toBool(ev) )
this.subscriptions = new Map()
this.combinations = new Map()
this.details = new Map()
this.combineObservers('__all__') // initialize all combination
this.logger = new ReplaySubject()
this.logger = new BehaviorSubject()
this.state = [] // holds last state of all observers
this.log = this.logger.next.bind(this.logger)
if (opts.verbose) this.logger.subscribe(console.log)
this.handler = opts.handler || console.log
if (opts.verbose||process.env.UCI_READY_VERBOSE==='true') this.logger.subscribe(console.log)
this.handler = opts.handler || ((ready) => {console.log('default handler', ready)})
this._first = true // tracks first emission
}
@ -37,8 +36,25 @@ class Ready extends Map {
return !failed ? '__none__' : ret
}
getObserverDetails(name) { return (this.get(name)||{}).details}
setObserverDetails(name,details,overwrite) {
if (this.has(name)) {
if (details==null) return false
if (!isPlainObject(details)) details= {desc:details}
// TODO substitue merge anything for Object.assign
this.get(name).details = overwrite ? details : Object.assign(this.get(name).details || {},details)
return true
}
return false
}
getObserver(name) {
return (this.get(name) || this.combinations.get(name||'__all__'))
return ((this.get(name) || {}).obs || this.combinations.get(name||'__all__'))
}
getCombination(name) {
this.combinations.get(name||'__all__')
}
getValue(name) { // NOT recommended. if there is any issue will return false
@ -54,16 +70,6 @@ class Ready extends Map {
})
}
addObserverDetails(name,details,overwrite) {
if (this.has(name)) {
if (!isPlainObject(details)) return false
// TODO substitue merge anything for Object.assign
this.details.set(name,overwrite ? details : Object.assign(this.details.get(name)||{},details))
return true
}
return false
}
addObserver(name, obs, opts={} ) {
// validation and defaults, obs can be emitter, osbserver, or promise
if (!name || typeof(name)!=='string') return false // name required
@ -73,78 +79,87 @@ class Ready extends Map {
condition = condition || this.condition
if (typeof (obs ||{}).on ==='function') obs = fromEvent(obs, event || name) // it's an emitter
if (isPromise(obs)) obs = from(obs) // it's a promise
if (obs && !isObservable(obs) && typeof obs==='function' && arguments.length===2) {
condition = obs
// obs = null
}
if (obs && !isObservable(obs) && typeof obs==='function' && arguments.length===2) condition = obs
if (!obs && this.emitter) obs = fromEvent(this.emitter,event || name)
if (!obs || !isObservable(obs)) return false
let xobs = obs.pipe(
startWith(false),
tap(val => this.log(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)),
map(condition),
// tap(val => this.log(`boolean: ${val}`)),
shareReplay(1)
tap(val => this.log(`boolean: ${val}`)),
startWith(false),
shareReplay(1),
// multicast(new Subject())
)
this.set(name, xobs)
this.addObserverDetails(name,details,true)
let sub = xobs.subscribe()
this.set(name, {obs:xobs, sub:sub})
this.setObserverDetails(name,details)
this.combineObservers('__all__') // update total combo
if (this.subscriptions.has('__all__')) this.subscribe() // will resubscribe
return xobs
}
combineObservers(name,list) {
if (Array.isArray(name)) {list = name; name = null}
name = name || '__all__'
if (name==='__all__') list = Array.from(this.keys()) // get list of all observers
if (!Array.isArray(list)) return false // can't make a combo without a list
if (this.has(name)) return false // can't save a combo with same name as any single oberver
let observers = list.map(name=>this.getObserver(name)) // will get combo if exists
if (observers.filter(obs=>!isObservable(obs)).length) return false
let combination = combineLatest(observers).pipe(
tap(states => { if (name==='__all__') this.state = list.map((name,index) => [name,states[index]])}),
tap(states => { this.log(list.map((name,index) => [name,states[index]]))}),
map(states => states.reduce((res,state) => {return res && state},true)),
changed(), //filters out emission if it is unchanged from last
startWith(false),
shareReplay(1)
)
this.combinations.set(name, combination) // if name passed then save combo in Map
return combination
}
// will remove combination as well
removeObserver(names) {
if (!names) this.clear
if (!names) names = this.observerNames
else {
if (!Array.isArray(names)) names = [names]
console.log('names to remove', names)
names.forEach(name => {
this.delete(name)
const sub = (this.get(name)||{}).sub
if (sub) sub.unsubscribe() // remove attached subscription
this.unsubscribe(name) // remove any manual subscription
this.delete(name) || this.combinations.delete(name)
})
}
console.log(this.observerNames)
this.combineObservers('__all__') // update total combo
this.subscribe() // resubscribe to changed combo
}
subscribe(name, handler) {
// only one subscription at a time per observer or combination from this method
if (typeof name ==='function') {
handler=name
name = null
}
name = name || '__all__'
if (this.subscriptions.get(name)) this.unsubscribe(name)
let obs = this.getObserver(name)
handler = handler || (this.subscriptions.get(name)||{}).handler || this.handler
let obs = this.getObserver(name) // will attempt to get combo if no simple observer, all if name is null
if (!obs) return false
let subs = obs.subscribe(handler||this.handler)
this.subscriptions.set(name,subs)
if (this.subscriptions.has(name)) this.subscriptions.get(name).subs.unsubscribe()
let subs = obs.subscribe(handler)
this.subscriptions.set(name,{subs:subs,handler:handler})
return subs
}
unsubscribe(name) {
name = name ||'__all__'
if (!this.subscriptions.has(name)) return false
this.subscriptions.get(name||'__all__').unsubscribe()
this.subscriptions.get(name).subs.unsubscribe()
this.subscriptions.delete(name)
return true
}
combineObservers(name,list) {
if (Array.isArray(name)) {list = name; name = null}
if (name==='__all__') list = Array.from(this.keys()) // get list of all observers
if (!Array.isArray(list)) return false // can't make a combo without a list
if (this.has(name)) return false // can't save a combo with same name as any single oberver
let observers = list.map(name=>this.get(name)||this.getCombination(name))
if (observers.filter(obs=>!isObservable(obs)).length) return false
let combination = combineLatest(observers).pipe(
tap(states => { if (name==='__all__') this.state = list.map((name,index) => [name,states[index]])}),
map(states => states.reduce((res,state) => {return res && state},true)),
startWith(false),
changed(),
shareReplay(1)
)
if (name) this.combinations.set(name, combination) // if name passed then save combo in Map
return combination
}
getCombination(name) {
this.combinations.get(name||'__all__')
}
} // end class

View File

@ -1,15 +1,14 @@
import { expect } from 'chai'
import Ready from '../src/ready'
describe('', function () {
let ready = new Ready()
it('Should include custom types', function () {
expect(u.isBuffer(Buffer.from('this is a test'))).to.equal(true)
describe('Ready Testing', function () {
it('Should worth with four kinds of observers', function () {
// expect(u.isBuffer(Buffer.from('this is a test'))).to.equal(true)
})
it('Should load typechecker', function () {
expect(u.isPlainObject([1])).to.equal(false)
})
})