From 080ac32648053f232546f4b9b36dbad65365e78f Mon Sep 17 00:00:00 2001 From: David Kebler Date: Thu, 18 Jun 2020 15:02:09 -0700 Subject: [PATCH] 0.0.9 refactored the run/queue logic (removed trigger function) todo - need to refactor the abort/stop functions --- examples/runner.js | 21 ++--- package.json | 2 +- src/runner.js | 214 +++++++++++++++++++++++---------------------- src/schedule.js | 4 +- 4 files changed, 123 insertions(+), 118 deletions(-) diff --git a/examples/runner.js b/examples/runner.js index 81b71a4..0262a02 100644 --- a/examples/runner.js +++ b/examples/runner.js @@ -7,7 +7,7 @@ const dt = new Date() const NZONES = 6 const ZONE = { - duration: 5, + duration: 3, settings:{ enabled: true, simultaneous: false, @@ -52,13 +52,10 @@ const ZONE = { schedules: schs, }) - runner.rxSubscribe('countdown','runner',value => { console.log(value)}) - - runner.rxSubscribe('running','runner',schs => { console.log('running schedules:',schs.map(sch=>sch.name).join(','))}) - - runner.rxSubscribe('queue','runner',schs => { console.log('queued schedules:',schs.map(sch=>sch.name).join(','))}) - - runner.rxSubscribe('activeCount','runner',value => { console.log('active schedule count', value)}) + runner.rxSubscribe('countdown','runner',value => { console.log('trigger #', runner._toID,'in', value)}) + runner.rxSubscribe('running','runner',list => { console.log('running schedules:',list)}) + runner.rxSubscribe('queue','runner',list => { console.log('queued schedules:',list)}) + runner.rxSubscribe('queueCount','runner',value => { console.log('queue schedule count', value)}) runner.rxSubscribe('runningCount','runner',value => { console.log('running schedule count', value)}) // setInterval(() => { @@ -69,7 +66,7 @@ const ZONE = { // 60*1000 // ) runner.enabled=true - setTimeout(()=>runner.enabled=false,6000) + // setTimeout(()=>runner.enabled=false,6000) })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) @@ -87,10 +84,10 @@ function stop (zone,activeSch) { async function startAsync (zone,activeSch) { // console.log('async start function') - console.log('-----starting-------',zone.id, activeSch.runID) + console.log('-----running----->>',zone.id, activeSch.runID) // console.log('---waiting-----', zone.duration,'secs duration to complete') const tick = setInterval(()=>{ - console.log('duration tick') + console.log('duration tick',zone.id) }, 1000) const duration = delay(zone.duration*1000) this.once('abort:'+activeSch.runID,()=> { @@ -109,7 +106,7 @@ function startPromise (zone,activeSch) { console.log('starting>>>',zone.id) console.log('---waiting-----', zone.duration,'secs duration to complete') const tick = setInterval(()=>{ - console.log('duration tick') + console.log('duration tick',zone.id) }, 1000) this.once('abort:'+activeSch.runID,()=> { clearTimeout(run) diff --git a/package.json b/package.json index bf1c1f2..ef47a69 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci-utils/scheduler", - "version": "0.0.8", + "version": "0.0.9", "description": "an interval reoccuring style scheduler and runner", "main": "src/index.js", "scripts": { diff --git a/src/runner.js b/src/runner.js index 6227657..d89be9b 100644 --- a/src/runner.js +++ b/src/runner.js @@ -13,21 +13,22 @@ let init = true class UCIScheduleRunner extends RxClass { constructor(opts) { super(opts) - this.name = opts.name || 'queue' + this.name = opts.name || 'Scheule Runner' this.set('state',false) // if runner is actually started this.set('timer',0) // opts.timer != null ? opts.timer : DEFAULT_SETTINGS.timer) // console.log('timer', this.timer, DEFAULT_SETTINGS.timer) this.schedules = [] this._toID = 0 // id of timeout - this._delayed = [] // active schedules that are delayed - this._active = {} // object or lists of all active (running or queued) schedules by timeout id + // this._delayed = [] // active schedules that are delayed + this._running = [] + this._queue = [] // object or lists of all active (running or queued) schedules by timeout id this._disabledSchedules = [] - this.set('running',[]) // list of currently running schedules - this.set('queue',[]) // list of queued names awaiting run + this.set('running','') // list of currently running schedules + this.set('queue','') // list of queued names awaiting run this.set('nextTS',0) this.set('nextDT','') this.set('nextName','') - this.set('activeCount',0) + this.set('queueCount',0) this.set('countdown','') this.set('runningCount',0) // console.log('passed schedule',opts.schedules.map(sch=>sch.id)) @@ -35,13 +36,12 @@ class UCIScheduleRunner extends RxClass { this.addSchedule(sch) }) - this._delayedListener = this.rxSubscribe('activeCount','ready',count => { - if (!count && this._delayed.length) { - const sch=this._delayed.shift() - console.log('a delayed action now proceeding====>', sch.name, sch.toid) - this.startScheduleAction(sch) + this.rxSubscribe('runningCount','ready',count => { + if (!count && this._queue.length) { + setTimeout(()=>this.runFromQueue(),0) } }) + this.rxSubscribe('timer','countdown', (interval) => { // console.log('timer state changed', interval) clearInterval(this.countdownInterval) @@ -79,22 +79,6 @@ class UCIScheduleRunner extends RxClass { return (this.schedules[0]||{}).countdownMS } - // updateActiveNamesList(){ - // console.log('!!!!!!!!!!!!!!active list changed',Object.keys(this._active).length,this._delayed.length) - // let names = '' - // Object.entries(this._active).map(list => { - // names = names + list[1].map(sch=>sch.name).join(',') - // }) - // this.active = {running:names, queue:this._queue.map(sch=>sch.name).join(',')} - // console.log(this.active) - // return this.active - // } - - // init() { - // - // - // } // end init - async start() { console.log('-----------starting scheduler runner-----------------') this.state = true @@ -119,42 +103,7 @@ class UCIScheduleRunner extends RxClass { this.countdown = '' this.nextName = '' this.runnerTimeout('stop') - this.removeActive() - } - - async abortRunning(id) { - console.log('aborting running schedule(s)', id||'') - const running = id ? (this.running.filter(sch=>sch.id===id) ||[]) : this.running - for (let sch of running) { - if (sch.stopAction) await sch.stopAction() - else console.log(sch.name,'no stop action registered can not force active schedule to stop') - } - } - - removeFromQueue(id) { - console.log('removing queued schedule(s)', id||'') - const curr = this.queue.length - if (id) this.queue = this.queue.filter(s=>s.id!==id) - else this.queue = [] - this.activeCount-= curr-this.queue.length - } - - removeFromDelayed(id) { - console.log('removing delayed schedule(s)', id||'') - if (id) this._delayed = this.delayed.filter(s=>s.id!==id) - else this._delayed = [] - } - - async removeActive(id) { - this.removeFromDelayed(id) - await this.abortRunning(id) - this.removeFromQueue(id) - console.log('clearing list activated schedules') - if (id) { - for (let toid in this._active) { - this._active[toid] = this._active[toid].filter(s=>s.id!==id) - } - } else this._active = {} + // this.removeActive() } // main timeout based on the first schedule to trigger @@ -167,7 +116,6 @@ class UCIScheduleRunner extends RxClass { // console.log('current schedules run order\n',this.schedules.map(sch=>`${sch.nextDT} : ${sch.name}`)) this._runnerTimeout = setTimeout(async () => { console.log('**********************timeout triggered for ID', this._toID, '****************************') - this._active[this._toID]=[] let updates = [] this.schedules.forEach(sch =>{ // add first and any others set for ~ same time if (sch.countdownMS <= 1000 ) { // should catch any with same TS @@ -175,38 +123,104 @@ class UCIScheduleRunner extends RxClass { const activeSch = clone(sch) activeSch.toid = this._toID activeSch.runID = `${this._toID}:${sch.id}` - this._active[this._toID].push(activeSch) + this.activeCount++ + if (activeSch.settings.simultaneous) this.runSchedule(sch) + else this.queueSchedule(activeSch) } }) - this._active[this._toID].next = 0 - this.trigger(this._active[this._toID].length,this._toID) + if (!this.runningCount && this._queue.length) this.runFromQueue() + // this._queue[this._toID].next = 0 + // this.trigger(this._queue[this._toID].length,this._toID) updates.forEach(update=>update()) // only need update the ones that were active // await this.update() // updates all _enabled },this._countdownMS) } - trigger (count,toid) { // trigger all schedules actions that have same timestamp as first in queue - if (count < 1 || count > this._active[toid].length) { - // console.log('done: returning') - return - } - // console.log(count,toid,this._active[toid]) - const idx = this._active[toid].length-count - const sch = this._active[toid][idx] - count-- - this.activeCount++ - this.queue = this.queue.concat([sch]) // for rx must overwrite not push - if (sch.simultaneous) this.startScheduleAction(sch) - else if (!this.runningCount) this.startScheduleAction(sch) - // if (idx===0 && !this.runningCount) this.startScheduleAction(sch) - else { - console.log('adding schedule to delay',sch.name,sch.toid) - this._delayed.push(sch) - } - - this.trigger(count,toid) + runSchedule(sch) { + this.startScheduleAction(sch) + this._running.push(sch) + this.runningCount=this._running.length + this.running = this._running.map(s=>s.name).join(',') } + runFromQueue(){ + if (this._queue.length) { + const run = this._queue.shift() + this.queueCount=this._queue.length + this.queue = this._queue.map(s=>s.name).join(',') + this._running.push(run) + this.runningCount=this._running.length + this.running = this._running.map(s=>s.name).join(',') + this.startScheduleAction(run) + } + } + + queueSchedule(sch){ + this._queue.push(sch) + this.queueCount=this._queue.length + this.queue = this._queue.map(s=>s.name).join(',') + } + + + // async abortRunning(id) { + // console.log('aborting running schedule(s)', id||'') + // const running = id ? (this.running.filter(sch=>sch.id===id) ||[]) : this.running + // for (let sch of running) { + // if (sch.stopAction) await sch.stopAction() + // else console.log(sch.name,'no stop action registered can not force active schedule to stop') + // } + // } + + // removeFromQueue(id) { + // console.log('removing queued schedule(s)', id||'') + // const curr = this.queue.length + // if (id) this.queue = this.queue.filter(s=>s.id!==id) + // else this.queue = [] + // this.activeCount-= curr-this.queue.length + // } + // + // async removeFromDelayed(id) { + // console.log('removing delayed schedule(s)', id||'') + // if (id) this._delayed = this._delayed.filter(s=>s.id!==id) + // else this._delayed = [] + // return + // } + // + // async removeActive(id) { + // await this.removeFromDelayed(id) + // await this.abortRunning(id) + // this.removeFromQueue(id) + // console.log('clearing list activated schedule(s)',id||'') + // if (id) { + // for (let toid in this._queue) { + // this._queue[toid] = this._queue[toid].filter(s=>s.id!==id) + // } + // } else this._queue = {} + // } + + // trigger (count,toid) { // trigger all schedules actions that have same timestamp as first in queue + // if (count < 1 || count > this._queue[toid].length) { + // // console.log('done: returning') + // return + // } + // // console.log(count,toid,this._queue[toid]) + // const idx = this._queue[toid].length-count + // const sch = this._queue[toid][idx] + // count-- + // console.log('adding to queue',sch.id) + // this.activeCount++ + // this.queue = this.queue.concat([sch]) // for rx must overwrite not push + // if (sch.simultaneous) setTimeout(() => this.startScheduleAction(sch),0) + // else if(!this.runningCount) setTimeout(() => this.startScheduleAction(sch),0) + // // if (idx===0 && !this.runningCount) this.startScheduleAction(sch) + // else { + // console.log('adding schedule to delay',sch.name,sch.toid) + // this._delayed.push(sch) + // } + // + // this.trigger(count,toid) + // } + addSchedule (sch) { // console.log('####################',sch.id,sch.enabled,'adding###########') this.schedules.push(sch) @@ -291,27 +305,21 @@ class UCIScheduleRunner extends RxClass { return this._disabledSchedules.find(a => a.id === id) } - async startScheduleAction (sch, toid) { - toid = sch.toid || toid - console.log('---- a scheduled run started----',sch.runID) + async startScheduleAction (sch) { + // toid = sch.toid || toid + console.log('---- in RUNNER: scheduled run started----',sch.runID) // TODO check if async or promise - this.runningCount++ - this.running = this.running.concat(this.queue.filter(s=>s.id===sch.id)) - this.queue = this.queue.filter(s=>s.id!==sch.id) sch.startAction(sch.runID) sch.once(sch.runID,removeScheduleFromActiveList.bind(this)) // emitted on abort too function removeScheduleFromActiveList() { - console.log('-----scheduled run is complete or aborted removing from active list ----',sch.runID) - // done remove from active list - this._active[toid]=this._active[toid].filter(s=>s.id!==sch.id) - this.runningCount-- - this.activeCount-- - this.running = this.running.filter(s=>s.id!==sch.id) - if (!this._active[toid].length) { - // none left on that list, remove it - console.log('===========all actions for timeout id', toid, 'are complete===========') - delete this._active[toid] + console.log('-----scheduled run is complete or aborted removing run list ----') + this._running = this._running.filter(s=>s.runID!==sch.runID) + this.running = this._running.map(s=>s.name).join(',') + this.runningCount = this._running.length + + if (!this._queue.length) { + console.log('========== the queue is empty nothing left to run =========') } // console.log('schedule action complete', sch.name,toid, this.runningCount) } diff --git a/src/schedule.js b/src/schedule.js index e6268a1..fdbda4f 100644 --- a/src/schedule.js +++ b/src/schedule.js @@ -103,7 +103,7 @@ class UCISchedule extends RxClass { let intoDayTS = (dt.getSeconds() + (60 * dt.getMinutes()) + (60 * 60 * dt.getHours())) let nowTS = Math.floor(Date.now()/1000) + 1 // last TS and nowTS are within 1 sec so add one let nextTS = nowTS - intoDayTS + baseTS - // console.log(this.name, nextTS-nowTS,'first computed time stamp', nextTS) + // console.log(this.name, nextTS-nowTS,'first computed time stamp', nextTS,nowTS) while (nowTS > nextTS) { // console.log(`now ${nowTS} is beyond next ${nextTS} adding delta ${delta * 60 * 60 } seconds`) nextTS += (delta * 60 * 60 ) // delta in hours, TS in secs @@ -111,7 +111,7 @@ class UCISchedule extends RxClass { // console.log(lastTS,'computed time stamp', nextTS) if(nextTS !== lastTS || lastTS == null || force===true) { // console.log('UPDATED TS>>>>>>>>>>>>>>>>', this.name,hour,minute,delta,nextTS,nextTS-lastTS) - this.nextDT = moment(nextTS*1000).format('ddd, MMM Do h:mm:ss A') + this.nextDT = moment(nextTS*1000).format('ddd, MMM Do h:mm A') this.nextTS = nextTS this._countdown() // console.log(this.nextTS,this.nextDT, this.countdown)