modify toBoolean to accept a packet with ready/online/active properties
make available via instance
refector to keep the 'all' combination separate from combineObserver
add 'all' getter
add combinations getter
add private method _combineAll
master
David Kebler 2020-01-18 22:05:42 -08:00
parent 2497321c66
commit 333d565894
4 changed files with 230 additions and 137 deletions

View File

@ -1,16 +1,10 @@
import { Observable } from 'rxjs'
import { Ready, changed, map, tap} from '../src/ready'
import { Ready } from '../src/ready'
import { EventEmitter } from 'events'
let emitter = new EventEmitter()
// let verbose = process.env.VERBOSE==='true'
let combo = true
// handler: (r)=> console.log('result:',r)
const late=3000
const late=500
let example = new Ready({emitter: emitter})
@ -27,82 +21,32 @@ let subscribe = ()=>{
if (!late) subscribe()
else setTimeout(subscribe,late)
const tObs = new Observable(subscriber => {
subscriber.next('on')
subscriber.next('off')
subscriber.next('enabled')
setTimeout(() => {
subscriber.next('F')
}, 7000)
setTimeout(() => {
subscriber.next('T')
}, 8000)
})
example.addObserver('obs',tObs,{details:{desc:'this is a constructed observable from the Observable Class'}})
const tPromise = new Promise(function(resolve) {
setTimeout(()=>{
console.log('promise observer is resolving')
resolve('yes')
example.addObserver('a',{details:{desc:'this is the a observer which comes from an emitter'}})
// .subscribe(ev=>{console.log('a',ev)})
example.addObserver('b')
// .subscribe(ev=>{console.log('b',ev)})
example.addObserver('c')
// .subscribe(ev=>{console.log('c',ev)})
console.log(' observers added>',example.observers)
example.combineObservers('a:b',['a','b']).subscribe(ready=>console.log('ab combo state>',ready))
console.log('-------------emitting, b true -----------------')
emitter.emit('b',true)
console.log('-------------emitting, c true -----------------')
emitter.emit('c',true)
console.log('===============done emitting =================')
setTimeout(async () => {
console.log('============emitting a true after 1 sec===================')
emitter.emit('a',true)
},1000)
})
example.addObserver('e',{details:{desc:'this is the e observer which comes from an emitter'}})
example.addObserver('ce',{condition: (ev)=>ev.ready, event:'ec'})
example.addObserver('pe',emitter)
example.addObserver('pr',tPromise)
example.addObserver('extnd',example.getObserver('e').pipe(
map(state => {
// console.log('+++++++++++\n','extend before:',state)
if (state) {
let val = Math.floor(Math.random() * Math.floor(10))
// console.log(val)
state = val <11 ? true:false
}
// console.log('extend after:',state,'\n-------------')
return state
})
),{details:'an observer extension of the e observer'})
if (combo) {
let combo = example.combineObservers('combo',['e','pr'])
.pipe(
tap(state=>console.log('combo e and pr state:',state)),
changed() // only emit if changed
)
if (combo){
console.log('a combination of e and pr was made\n','combinations are not added to \'all\' list')
console.log('subscribing to sub combination e, pr')
example.subscribe('combo',val => console.log('combo e, pr is:',val))
}
}
let madeonly = example.makeObserver('madeonly')
console.log('observer \'madeonly\' was only made and was not added so not in list \n',madeonly)
console.log('list of observers made and added to the all combination\n',example.observerNames)
emitter.emit('ec',{ready:true})
emitter.emit('pe','yup')
emitter.emit('e')
console.log('===============done initial emitting, pe,e,ec/ce=================')
setTimeout(async () => {
console.log('============emitting e, pe false===================')
example.setObserverDetails('e',{moreinfo:'an additional property of details added later'})
emitter.emit('e',false)
emitter.emit('pe',false)
}
,2000)
setTimeout(async () => {
console.log('=================emitting e true, removing pe which is false ================')
emitter.emit('e',true)
example.removeObserver('pe')
}
,4000)
console.log('============emitting a false after 2 sec===================')
emitter.emit('a',false)
setTimeout(async () => {console.log('timeout done')},6000)
},2000)
setTimeout(async () => {
console.log('============removing a after 3 sec===================')
example.removeObserver('a')
console.log('observers after removal>',example.observers)
},3000)
setTimeout(async () => {console.log('timeout done');process.exit()},10000)

117
examples/example2.js Normal file
View File

@ -0,0 +1,117 @@
import { Observable } from 'rxjs'
import { Ready, readyChanged as changed, map, tap} from '../src/ready'
import { EventEmitter } from 'events'
let emitter = new EventEmitter()
// let verbose = process.env.VERBOSE==='true'
let combo = false
const late=3000
let example = new Ready({emitter: emitter})
console.log('bound change function for use in customized observer', changed)
console.log('Boolean TEST: {online:\'yes\'}',example.toBoolean({online:'yes'}))
console.log('Boolean TEST: \'yes\'',example.toBoolean('yes'))
let subscribe = ()=>{
console.log('subscribing at',late, 'ms')
example.subscribe(ready => {
console.log(`-----------Subscriber at ${late} ms--------------?`,ready)
console.log('the failed observer:', example.failure, ',details:', example.getObserverDetails(example.failure))
console.log('the total state', example.state)
console.log('---------------------------------------')
})
}
if (!late) subscribe()
else setTimeout(subscribe,late)
const tObs = new Observable(subscriber => {
subscriber.next('on')
subscriber.next('off')
subscriber.next('enabled')
setTimeout(() => {
subscriber.next('F')
}, 7000)
setTimeout(() => {
subscriber.next('T')
}, 8000)
})
example.addObserver('obs',tObs,{details:{desc:'this is a constructed observable from the Observable Class'}})
const tPromise = new Promise(function(resolve) {
setTimeout(()=>{
console.log('promise observer is resolving')
resolve('yes')
},1000)
})
example.addObserver('e',{details:{desc:'this is the e observer which comes from an emitter'}})
// example.addObserver('ce',{condition: (ev)=>ev.ready, event:'ec'})
example.addObserver('ce',{event:'ec'})
example.addObserver('pe',emitter)
example.addObserver('pr',tPromise)
example.addObserver('extnd',example.getObserver('e').pipe(
map(state => {
// console.log('+++++++++++\n','extend before:',state)
if (state) {
let val = Math.floor(Math.random() * Math.floor(10))
// console.log(val)
state = val <11 ? true:false
}
// console.log('extend after:',state,'\n-------------')
return state
})
),{details:'an observer extension of the e observer'})
let madeonly = example.makeObserver({event:'madeonly'})
madeonly ? console.log('observer \'madeonly\' was only made and was not added so won\'t appear in \'all\' list')
: console.log('made observer failed')
console.log('list of observers made and added to the all combination\n',example.observers)
if (combo) {
let combo = example.combineObservers('combo',['e','pr'])
.pipe(
tap(state=>console.log('++++++++++++++ tap log of extened combo e and pr state:+++++++++++++++',state)),
)
if (combo){
example.subscribe('e', ready => console.log('e',ready))
example.subscribe('e', ready => console.log('pr',ready))
console.log('a combination name \'combo\' made of e and pr was created')
console.log('now subscribing to combination e, pr')
example.subscribe('combo', val => console.log('saved combo subscription state:',val))
combo.subscribe(val => console.log('extened combo subscription state:',val))
}
}
console.log('all current combinations',example.combinations)
console.log('------------------starting to emit values--------------------------')
// emitter.emit('ec',{ready:true})
emitter.emit('ec',true)
emitter.emit('pe','yup')
emitter.emit('e')
console.log('===============done initial emitting, pe,e,ec/ce=================')
setTimeout(async () => {
console.log('============emitting e, pe false===================')
example.setObserverDetails('e',{moreinfo:'an additional property of details added later'})
emitter.emit('e',false)
emitter.emit('pe',false)
}
,2000)
setTimeout(async () => {
console.log('=================emitting e true, removing pe which is false ================')
emitter.emit('e',true)
example.removeObserver('pe')
example.subscribe(ready=>console.log('a second all subscriber', ready),{add:true})
}
,4000)
setTimeout(async () => {console.log('timeout done');process.exit()},10000)

View File

@ -1,6 +1,6 @@
{
"name": "@uci-utils/ready",
"version": "0.1.6",
"version": "0.1.7",
"description": "A Class to Observe the reduced to boolean combined state of a map of observables",
"main": "src/ready.js",
"scripts": {

View File

@ -11,21 +11,27 @@ class Ready extends Map {
constructor(opts) {
super(opts.observables)
this.emitter = isEmitter(opts.emitter) ? opts.emitter : null
const toBool = createBoolean(opts.boolean) // e.g. {undefined:true}
this.condition = opts.condition || ( (ev) => toBool(ev) )
this.subscriptions = new Map()
this.combinations = new Map()
this.details = new Map()
this.combineObservers('__all__') // initialize all combination
const toBool = createBoolean(opts.boolean)
this.toBoolean = (value) => {
if (isPlainObject(value)) value = (value.ready || value.online || value.active || false)
return toBool(value)
}
this.condition = opts.condition || ( (ev) => this.toBoolean(ev) )
this._subscriptions = new Map()
this._combinations = new Map()
this._all = {} // where all observer is kept
this._combineAll()
this.logger = new BehaviorSubject()
this.state = [] // holds last state of all observers
this.state = [] // holds last emitted state of all combination
this.log = this.logger.next.bind(this.logger)
if (opts.verbose||process.env.UCI_READY_VERBOSE==='true') this.logger.subscribe(console.log)
this.handler = opts.handler || ((ready) => {console.log('default handler', ready)})
this._first = true // tracks first emission
}
get observerNames(){return Array.from(this.keys())}
get observers(){return Array.from(this.keys())}
get combinations(){return Array.from(this._combinations.keys())}
get all() { return this._all}
get failure () {
let ret = null
@ -50,11 +56,12 @@ class Ready extends Map {
}
getObserver(name) {
return ((this.get(name) || {}).obs || this.combinations.get(name||'__all__'))
return (this.get(name) || {}).obs
// return (name==null ||'__all__') ? this._all : ( (this.get(name) || {}).obs || this._combinations.get(name) || false )
}
getCombination(name) {
this.combinations.get(name||'__all__')
return this._combinations.get(name)
}
getValue(name) { // NOT recommended. if there is any issue will return false
@ -71,7 +78,6 @@ class Ready extends Map {
}
makeObserver(obs, opts={}) {
// validation and defaults, obs can be emitter, osbserver, or promise
if (isPlainObject(obs)) { opts = obs; obs=null }
if (!(obs || this.emitter)) return false // some observable requried
@ -79,14 +85,15 @@ class Ready extends Map {
condition = condition || this.condition
if (isEmitter(obs) && (event || name)) obs = fromEvent(obs, event || name ) // it's an emitter
if (isPromise(obs)) obs = from(obs) // it's a promise
if (obs && !isObservable(obs) && typeof obs==='function' && arguments.length===2) condition = obs
if (!obs && this.emitter && (event || name)) obs = fromEvent(this.emitter,event || name)
if (!obs && this.emitter && (event || name)) obs = fromEvent(this.emitter,(event || name))
if (!obs || !isObservable(obs)) return false
let xobs = obs.pipe(
tap(val => this.log(`${name} emitted/resolved the value =>${JSON.stringify(val)}`)),
// TODO retain plain object if it has a ready property
map(condition),
tap(val => this.log(`boolean: ${val}`)),
startWith(false),
// shareReplay({refCount:true, bufferSize:1}),
shareReplay(1),
)
xobs._readyType = true
@ -96,86 +103,111 @@ class Ready extends Map {
addObserver(name, obs, opts={} ) {
// validation and defaults, obs can be emitter, osbserver, or promise
if (!name || typeof(name)!=='string') return false // name required
if (isPlainObject(obs)) { opts = obs; obs=null }
if (this.has(name) && !opts.overwrite) return false // don't over write exiting observer
else this.removeObserver(name)
opts.name = name
if (!(obs || {})._readyType) obs = this.makeObserver(obs, opts)
if (!isObservable(obs)) return false
let { details } = opts
let sub = obs.subscribe()
let sub = obs.subscribe() // internal subscriber
// let sub = null
this.set(name, {obs:obs, sub:sub})
this.setObserverDetails(name,details)
this.combineObservers('__all__',opts) // update total combo
if (this.subscriptions.has('__all__')) this.subscribe() // will resubscribe
this.setObserverDetails(name,opts.details)
this._combineAll()
return obs
}
// TODO add option for whether changed is enabled (default) or not and store with combo for remake
combineObservers(name,list,opts={}) {
if (Array.isArray(name)) {list = name; name = null}
name = name || '__all__'
if (name==='__all__') list = Array.from(this.keys()) // get list of all observers
if (!Array.isArray(list)) return false // can't make a combo without a list
if (this.has(name)) return false // can't save a combo with same name as any single oberver
let observers = list.map(name=>this.getObserver(name)) // will get combo if exists
if (observers.filter(obs=>!isObservable(obs)).length) return false
let combination = combineLatest(observers).pipe(
tap(states => { if (name==='__all__') this.state = list.map((name,index) => [name,states[index]])}),
tap(states => { this.log(list.map((name,index) => [name,states[index]]))}),
map(states => states.reduce((res,state) => {return res && state},true)),
changed(), //filters out emission if it is unchanged from last
startWith(false),
shareReplay(1)
)
this.combinations.set(name, combination) // if name passed then save combo in Map
return combination
}
// will remove combination as well
removeObserver(names) {
if (!names) names = this.observerNames
else {
if (!names) names = this.observers // remove all
if (!Array.isArray(names)) names = [names]
console.log('names to remove', names)
names.forEach(name => {
const sub = (this.get(name)||{}).sub
if (sub) sub.unsubscribe() // remove attached subscription
this.unsubscribe(name) // remove any manual subscription
this.delete(name) || this.combinations.delete(name)
this.delete(name)
})
}
console.log(this.observerNames)
this.combineObservers('__all__') // update total combo
this.subscribe() // resubscribe to changed combo
// update total combo
this._combineAll()
// TODO update affected combinations
}
subscribe(name, handler, save) {
// TODO add option for whether changed is enabled (default) or not and store with combo for remake
combineObservers(name,list,opts={}) {
if (!name || typeof(name)!=='string') return false // name required
if (this.has(name)) return false // can't save a combo with same name as any single oberver
if (!Array.isArray(list)) return false // need a list of observers
if (this._combinations.has(name)) {
if (opts.overwrite) this.removeCombination(name)
else return false
}
let observers = list.map(name=>{return name._readyType ? name : this.getObserver(name)})
observers = observers.filter(obs=>obs._readyType)
if (!observers.length) return false
let combination = combineLatest(observers).pipe(
tap(states => { this.log(list.map((name,index) => [name,states[index]]))}),
map(states => states.reduce((res,state) => {return res && state},true)),
changed(), //filters out emission if it is unchanged from last
shareReplay(1)
)
this._combinations.set(name, combination) // if name passed then save combo in Map
return combination
}
removeCombination(names){
if (!names) names = this.combinations // remove all
if (!Array.isArray(names)) names = [names]
names.forEach(name => {
// const sub = (this._combinations.get(name)||{}).sub
// if (sub) sub.unsubscribe() // remove attached subscription
this.unsubscribe(name) // remove any resigtered subscription
this._combinations.delete(name)
})
}
subscribe(name, handler, opts={}) {
// only one subscription at a time per observer or combination from this method
if (typeof name ==='function') {
opts=handler||{}
handler=name
name = null
}
name = name ||'__all__'
handler = handler || (this.subscriptions.get(name)||{}).handler || this.handler
let obs = this.getObserver(name) // will attempt to get combo if no simple observer, all if name is null
handler = handler || (this._subscriptions.get(name)||{}).handler || this.handler // will resubscribe exisiting handler
let obs = name==='__all__' ? this._all : (this.getObserver(name)||this.getCombination(name))
if (!obs) return false
if (this.subscriptions.has(name)) this.subscriptions.get(name).subs.unsubscribe()
if (this._subscriptions.has(name)) this._subscriptions.get(name).subs.unsubscribe()
let subs = obs.subscribe(handler)
if (save) this.subscriptions.set(name,{subs:subs,handler:handler})
// TODO allow reseting additional subscription on add/remove
if (!opts.add) this._subscriptions.set(name, {subs:subs,handler:handler})
return subs
}
unsubscribe(name) {
name = name ||'__all__'
if (!this.subscriptions.has(name)) return false
this.subscriptions.get(name).subs.unsubscribe()
this.subscriptions.delete(name)
if (!this._subscriptions.has(name)) return false
this._subscriptions.get(name).subs.unsubscribe()
this._subscriptions.delete(name)
return true
}
_combineAll() {
let observers = this.observers.map(name=>this.getObserver(name))
this._all = combineLatest(observers).pipe(
tap(states => { this.state = this.observers.map((name,index) => [name,states[index]]) }),
tap(states => { this.log(this.observers.map((name,index) => [name,states[index]])) }),
map(states => states.reduce((res,state) => {return res && state},true)),
changed(), //filters out emission if it is unchanged from last
)
if (this._subscriptions.has('__all__')) this.subscribe() // will resubscribe with same handler
}
} // end class
function changed(toBoolean) {
toBoolean = toBoolean || (val => !!val)
return obs$ => obs$.pipe(
startWith(false),
pairwise(),
filter( ([p,c]) => {
const chng = !obs$.__not_first__ || !!(toBoolean(p) ^ toBoolean(c))