0.0.9 refactored the run/queue logic (removed trigger function)

todo - need to refactor the abort/stop functions
master
David Kebler 2020-06-18 15:02:09 -07:00
parent e9fce89436
commit 080ac32648
4 changed files with 123 additions and 118 deletions

View File

@ -7,7 +7,7 @@ const dt = new Date()
const NZONES = 6 const NZONES = 6
const ZONE = { const ZONE = {
duration: 5, duration: 3,
settings:{ settings:{
enabled: true, enabled: true,
simultaneous: false, simultaneous: false,
@ -52,13 +52,10 @@ const ZONE = {
schedules: schs, schedules: schs,
}) })
runner.rxSubscribe('countdown','runner',value => { console.log(value)}) runner.rxSubscribe('countdown','runner',value => { console.log('trigger #', runner._toID,'in', value)})
runner.rxSubscribe('running','runner',list => { console.log('running schedules:',list)})
runner.rxSubscribe('running','runner',schs => { console.log('running schedules:',schs.map(sch=>sch.name).join(','))}) runner.rxSubscribe('queue','runner',list => { console.log('queued schedules:',list)})
runner.rxSubscribe('queueCount','runner',value => { console.log('queue schedule count', value)})
runner.rxSubscribe('queue','runner',schs => { console.log('queued schedules:',schs.map(sch=>sch.name).join(','))})
runner.rxSubscribe('activeCount','runner',value => { console.log('active schedule count', value)})
runner.rxSubscribe('runningCount','runner',value => { console.log('running schedule count', value)}) runner.rxSubscribe('runningCount','runner',value => { console.log('running schedule count', value)})
// setInterval(() => { // setInterval(() => {
@ -69,7 +66,7 @@ const ZONE = {
// 60*1000 // 60*1000
// ) // )
runner.enabled=true runner.enabled=true
setTimeout(()=>runner.enabled=false,6000) // setTimeout(()=>runner.enabled=false,6000)
})().catch(err => { })().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err) console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
@ -87,10 +84,10 @@ function stop (zone,activeSch) {
async function startAsync (zone,activeSch) { async function startAsync (zone,activeSch) {
// console.log('async start function') // console.log('async start function')
console.log('-----starting-------',zone.id, activeSch.runID) console.log('-----running----->>',zone.id, activeSch.runID)
// console.log('---waiting-----', zone.duration,'secs duration to complete') // console.log('---waiting-----', zone.duration,'secs duration to complete')
const tick = setInterval(()=>{ const tick = setInterval(()=>{
console.log('duration tick') console.log('duration tick',zone.id)
}, 1000) }, 1000)
const duration = delay(zone.duration*1000) const duration = delay(zone.duration*1000)
this.once('abort:'+activeSch.runID,()=> { this.once('abort:'+activeSch.runID,()=> {
@ -109,7 +106,7 @@ function startPromise (zone,activeSch) {
console.log('starting>>>',zone.id) console.log('starting>>>',zone.id)
console.log('---waiting-----', zone.duration,'secs duration to complete') console.log('---waiting-----', zone.duration,'secs duration to complete')
const tick = setInterval(()=>{ const tick = setInterval(()=>{
console.log('duration tick') console.log('duration tick',zone.id)
}, 1000) }, 1000)
this.once('abort:'+activeSch.runID,()=> { this.once('abort:'+activeSch.runID,()=> {
clearTimeout(run) clearTimeout(run)

View File

@ -1,6 +1,6 @@
{ {
"name": "@uci-utils/scheduler", "name": "@uci-utils/scheduler",
"version": "0.0.8", "version": "0.0.9",
"description": "an interval reoccuring style scheduler and runner", "description": "an interval reoccuring style scheduler and runner",
"main": "src/index.js", "main": "src/index.js",
"scripts": { "scripts": {

View File

@ -13,21 +13,22 @@ let init = true
class UCIScheduleRunner extends RxClass { class UCIScheduleRunner extends RxClass {
constructor(opts) { constructor(opts) {
super(opts) super(opts)
this.name = opts.name || 'queue' this.name = opts.name || 'Scheule Runner'
this.set('state',false) // if runner is actually started this.set('state',false) // if runner is actually started
this.set('timer',0) // opts.timer != null ? opts.timer : DEFAULT_SETTINGS.timer) this.set('timer',0) // opts.timer != null ? opts.timer : DEFAULT_SETTINGS.timer)
// console.log('timer', this.timer, DEFAULT_SETTINGS.timer) // console.log('timer', this.timer, DEFAULT_SETTINGS.timer)
this.schedules = [] this.schedules = []
this._toID = 0 // id of timeout this._toID = 0 // id of timeout
this._delayed = [] // active schedules that are delayed // this._delayed = [] // active schedules that are delayed
this._active = {} // object or lists of all active (running or queued) schedules by timeout id this._running = []
this._queue = [] // object or lists of all active (running or queued) schedules by timeout id
this._disabledSchedules = [] this._disabledSchedules = []
this.set('running',[]) // list of currently running schedules this.set('running','') // list of currently running schedules
this.set('queue',[]) // list of queued names awaiting run this.set('queue','') // list of queued names awaiting run
this.set('nextTS',0) this.set('nextTS',0)
this.set('nextDT','') this.set('nextDT','')
this.set('nextName','') this.set('nextName','')
this.set('activeCount',0) this.set('queueCount',0)
this.set('countdown','') this.set('countdown','')
this.set('runningCount',0) this.set('runningCount',0)
// console.log('passed schedule',opts.schedules.map(sch=>sch.id)) // console.log('passed schedule',opts.schedules.map(sch=>sch.id))
@ -35,13 +36,12 @@ class UCIScheduleRunner extends RxClass {
this.addSchedule(sch) this.addSchedule(sch)
}) })
this._delayedListener = this.rxSubscribe('activeCount','ready',count => { this.rxSubscribe('runningCount','ready',count => {
if (!count && this._delayed.length) { if (!count && this._queue.length) {
const sch=this._delayed.shift() setTimeout(()=>this.runFromQueue(),0)
console.log('a delayed action now proceeding====>', sch.name, sch.toid)
this.startScheduleAction(sch)
} }
}) })
this.rxSubscribe('timer','countdown', (interval) => { this.rxSubscribe('timer','countdown', (interval) => {
// console.log('timer state changed', interval) // console.log('timer state changed', interval)
clearInterval(this.countdownInterval) clearInterval(this.countdownInterval)
@ -79,22 +79,6 @@ class UCIScheduleRunner extends RxClass {
return (this.schedules[0]||{}).countdownMS return (this.schedules[0]||{}).countdownMS
} }
// updateActiveNamesList(){
// console.log('!!!!!!!!!!!!!!active list changed',Object.keys(this._active).length,this._delayed.length)
// let names = ''
// Object.entries(this._active).map(list => {
// names = names + list[1].map(sch=>sch.name).join(',')
// })
// this.active = {running:names, queue:this._queue.map(sch=>sch.name).join(',')}
// console.log(this.active)
// return this.active
// }
// init() {
//
//
// } // end init
async start() { async start() {
console.log('-----------starting scheduler runner-----------------') console.log('-----------starting scheduler runner-----------------')
this.state = true this.state = true
@ -119,42 +103,7 @@ class UCIScheduleRunner extends RxClass {
this.countdown = '' this.countdown = ''
this.nextName = '' this.nextName = ''
this.runnerTimeout('stop') this.runnerTimeout('stop')
this.removeActive() // this.removeActive()
}
async abortRunning(id) {
console.log('aborting running schedule(s)', id||'')
const running = id ? (this.running.filter(sch=>sch.id===id) ||[]) : this.running
for (let sch of running) {
if (sch.stopAction) await sch.stopAction()
else console.log(sch.name,'no stop action registered can not force active schedule to stop')
}
}
removeFromQueue(id) {
console.log('removing queued schedule(s)', id||'')
const curr = this.queue.length
if (id) this.queue = this.queue.filter(s=>s.id!==id)
else this.queue = []
this.activeCount-= curr-this.queue.length
}
removeFromDelayed(id) {
console.log('removing delayed schedule(s)', id||'')
if (id) this._delayed = this.delayed.filter(s=>s.id!==id)
else this._delayed = []
}
async removeActive(id) {
this.removeFromDelayed(id)
await this.abortRunning(id)
this.removeFromQueue(id)
console.log('clearing list activated schedules')
if (id) {
for (let toid in this._active) {
this._active[toid] = this._active[toid].filter(s=>s.id!==id)
}
} else this._active = {}
} }
// main timeout based on the first schedule to trigger // main timeout based on the first schedule to trigger
@ -167,7 +116,6 @@ class UCIScheduleRunner extends RxClass {
// console.log('current schedules run order\n',this.schedules.map(sch=>`${sch.nextDT} : ${sch.name}`)) // console.log('current schedules run order\n',this.schedules.map(sch=>`${sch.nextDT} : ${sch.name}`))
this._runnerTimeout = setTimeout(async () => { this._runnerTimeout = setTimeout(async () => {
console.log('**********************timeout triggered for ID', this._toID, '****************************') console.log('**********************timeout triggered for ID', this._toID, '****************************')
this._active[this._toID]=[]
let updates = [] let updates = []
this.schedules.forEach(sch =>{ // add first and any others set for ~ same time this.schedules.forEach(sch =>{ // add first and any others set for ~ same time
if (sch.countdownMS <= 1000 ) { // should catch any with same TS if (sch.countdownMS <= 1000 ) { // should catch any with same TS
@ -175,38 +123,104 @@ class UCIScheduleRunner extends RxClass {
const activeSch = clone(sch) const activeSch = clone(sch)
activeSch.toid = this._toID activeSch.toid = this._toID
activeSch.runID = `${this._toID}:${sch.id}` activeSch.runID = `${this._toID}:${sch.id}`
this._active[this._toID].push(activeSch) this.activeCount++
if (activeSch.settings.simultaneous) this.runSchedule(sch)
else this.queueSchedule(activeSch)
} }
}) })
this._active[this._toID].next = 0 if (!this.runningCount && this._queue.length) this.runFromQueue()
this.trigger(this._active[this._toID].length,this._toID) // this._queue[this._toID].next = 0
// this.trigger(this._queue[this._toID].length,this._toID)
updates.forEach(update=>update()) // only need update the ones that were active updates.forEach(update=>update()) // only need update the ones that were active
// await this.update() // updates all _enabled // await this.update() // updates all _enabled
},this._countdownMS) },this._countdownMS)
} }
trigger (count,toid) { // trigger all schedules actions that have same timestamp as first in queue runSchedule(sch) {
if (count < 1 || count > this._active[toid].length) { this.startScheduleAction(sch)
// console.log('done: returning') this._running.push(sch)
return this.runningCount=this._running.length
} this.running = this._running.map(s=>s.name).join(',')
// console.log(count,toid,this._active[toid])
const idx = this._active[toid].length-count
const sch = this._active[toid][idx]
count--
this.activeCount++
this.queue = this.queue.concat([sch]) // for rx must overwrite not push
if (sch.simultaneous) this.startScheduleAction(sch)
else if (!this.runningCount) this.startScheduleAction(sch)
// if (idx===0 && !this.runningCount) this.startScheduleAction(sch)
else {
console.log('adding schedule to delay',sch.name,sch.toid)
this._delayed.push(sch)
}
this.trigger(count,toid)
} }
runFromQueue(){
if (this._queue.length) {
const run = this._queue.shift()
this.queueCount=this._queue.length
this.queue = this._queue.map(s=>s.name).join(',')
this._running.push(run)
this.runningCount=this._running.length
this.running = this._running.map(s=>s.name).join(',')
this.startScheduleAction(run)
}
}
queueSchedule(sch){
this._queue.push(sch)
this.queueCount=this._queue.length
this.queue = this._queue.map(s=>s.name).join(',')
}
// async abortRunning(id) {
// console.log('aborting running schedule(s)', id||'')
// const running = id ? (this.running.filter(sch=>sch.id===id) ||[]) : this.running
// for (let sch of running) {
// if (sch.stopAction) await sch.stopAction()
// else console.log(sch.name,'no stop action registered can not force active schedule to stop')
// }
// }
// removeFromQueue(id) {
// console.log('removing queued schedule(s)', id||'')
// const curr = this.queue.length
// if (id) this.queue = this.queue.filter(s=>s.id!==id)
// else this.queue = []
// this.activeCount-= curr-this.queue.length
// }
//
// async removeFromDelayed(id) {
// console.log('removing delayed schedule(s)', id||'')
// if (id) this._delayed = this._delayed.filter(s=>s.id!==id)
// else this._delayed = []
// return
// }
//
// async removeActive(id) {
// await this.removeFromDelayed(id)
// await this.abortRunning(id)
// this.removeFromQueue(id)
// console.log('clearing list activated schedule(s)',id||'')
// if (id) {
// for (let toid in this._queue) {
// this._queue[toid] = this._queue[toid].filter(s=>s.id!==id)
// }
// } else this._queue = {}
// }
// trigger (count,toid) { // trigger all schedules actions that have same timestamp as first in queue
// if (count < 1 || count > this._queue[toid].length) {
// // console.log('done: returning')
// return
// }
// // console.log(count,toid,this._queue[toid])
// const idx = this._queue[toid].length-count
// const sch = this._queue[toid][idx]
// count--
// console.log('adding to queue',sch.id)
// this.activeCount++
// this.queue = this.queue.concat([sch]) // for rx must overwrite not push
// if (sch.simultaneous) setTimeout(() => this.startScheduleAction(sch),0)
// else if(!this.runningCount) setTimeout(() => this.startScheduleAction(sch),0)
// // if (idx===0 && !this.runningCount) this.startScheduleAction(sch)
// else {
// console.log('adding schedule to delay',sch.name,sch.toid)
// this._delayed.push(sch)
// }
//
// this.trigger(count,toid)
// }
addSchedule (sch) { addSchedule (sch) {
// console.log('####################',sch.id,sch.enabled,'adding###########') // console.log('####################',sch.id,sch.enabled,'adding###########')
this.schedules.push(sch) this.schedules.push(sch)
@ -291,27 +305,21 @@ class UCIScheduleRunner extends RxClass {
return this._disabledSchedules.find(a => a.id === id) return this._disabledSchedules.find(a => a.id === id)
} }
async startScheduleAction (sch, toid) { async startScheduleAction (sch) {
toid = sch.toid || toid // toid = sch.toid || toid
console.log('---- a scheduled run started----',sch.runID) console.log('---- in RUNNER: scheduled run started----',sch.runID)
// TODO check if async or promise // TODO check if async or promise
this.runningCount++
this.running = this.running.concat(this.queue.filter(s=>s.id===sch.id))
this.queue = this.queue.filter(s=>s.id!==sch.id)
sch.startAction(sch.runID) sch.startAction(sch.runID)
sch.once(sch.runID,removeScheduleFromActiveList.bind(this)) // emitted on abort too sch.once(sch.runID,removeScheduleFromActiveList.bind(this)) // emitted on abort too
function removeScheduleFromActiveList() { function removeScheduleFromActiveList() {
console.log('-----scheduled run is complete or aborted removing from active list ----',sch.runID) console.log('-----scheduled run is complete or aborted removing run list ----')
// done remove from active list this._running = this._running.filter(s=>s.runID!==sch.runID)
this._active[toid]=this._active[toid].filter(s=>s.id!==sch.id) this.running = this._running.map(s=>s.name).join(',')
this.runningCount-- this.runningCount = this._running.length
this.activeCount--
this.running = this.running.filter(s=>s.id!==sch.id) if (!this._queue.length) {
if (!this._active[toid].length) { console.log('========== the queue is empty nothing left to run =========')
// none left on that list, remove it
console.log('===========all actions for timeout id', toid, 'are complete===========')
delete this._active[toid]
} }
// console.log('schedule action complete', sch.name,toid, this.runningCount) // console.log('schedule action complete', sch.name,toid, this.runningCount)
} }

View File

@ -103,7 +103,7 @@ class UCISchedule extends RxClass {
let intoDayTS = (dt.getSeconds() + (60 * dt.getMinutes()) + (60 * 60 * dt.getHours())) let intoDayTS = (dt.getSeconds() + (60 * dt.getMinutes()) + (60 * 60 * dt.getHours()))
let nowTS = Math.floor(Date.now()/1000) + 1 // last TS and nowTS are within 1 sec so add one let nowTS = Math.floor(Date.now()/1000) + 1 // last TS and nowTS are within 1 sec so add one
let nextTS = nowTS - intoDayTS + baseTS let nextTS = nowTS - intoDayTS + baseTS
// console.log(this.name, nextTS-nowTS,'first computed time stamp', nextTS) // console.log(this.name, nextTS-nowTS,'first computed time stamp', nextTS,nowTS)
while (nowTS > nextTS) { while (nowTS > nextTS) {
// console.log(`now ${nowTS} is beyond next ${nextTS} adding delta ${delta * 60 * 60 } seconds`) // console.log(`now ${nowTS} is beyond next ${nextTS} adding delta ${delta * 60 * 60 } seconds`)
nextTS += (delta * 60 * 60 ) // delta in hours, TS in secs nextTS += (delta * 60 * 60 ) // delta in hours, TS in secs
@ -111,7 +111,7 @@ class UCISchedule extends RxClass {
// console.log(lastTS,'computed time stamp', nextTS) // console.log(lastTS,'computed time stamp', nextTS)
if(nextTS !== lastTS || lastTS == null || force===true) { if(nextTS !== lastTS || lastTS == null || force===true) {
// console.log('UPDATED TS>>>>>>>>>>>>>>>>', this.name,hour,minute,delta,nextTS,nextTS-lastTS) // console.log('UPDATED TS>>>>>>>>>>>>>>>>', this.name,hour,minute,delta,nextTS,nextTS-lastTS)
this.nextDT = moment(nextTS*1000).format('ddd, MMM Do h:mm:ss A') this.nextDT = moment(nextTS*1000).format('ddd, MMM Do h:mm A')
this.nextTS = nextTS this.nextTS = nextTS
this._countdown() this._countdown()
// console.log(this.nextTS,this.nextDT, this.countdown) // console.log(this.nextTS,this.nextDT, this.countdown)