// uci-utils-scheduler/src/runner.js
//
// 375 lines
// 13 KiB
// JavaScript

import clone from 'clone'
import RxClass from '@uci-utils/rx-class'
// Fallback values for runner options the caller leaves unset.
const DEFAULT_SETTINGS = {
enabled: false, // used for the runner's `enabled` property when opts.enabled is null/undefined
timer: true, // will update countdown and other dynamic values every minute or second
}
// Module-wide flag: true while the runner's update() is refreshing every enabled schedule,
// so the per-schedule nextTS subscription does not re-sort on each individual change.
let update_all=false
// Module-wide flag: true until the first runner constructor has finished; while true the
// `enabled` handler will not call stop() for a falsy value.
let init = true
// Queue Multiple Schedulers
/**
 * Runs a collection of schedules off a single timeout.
 *
 * Enabled schedules live in `this.schedules` and are ordered by `nextTS` in
 * sort(); one timeout is armed for the first of them. When it fires, every
 * schedule whose `countdownMS` is within one second is cloned into an
 * "active" list for that timeout and its action is either started at once or
 * held back until nothing else is running.
 *
 * NOTE(review): `set`, `rxSubscribe` and `rxUnsubscribe` are inherited from
 * RxClass, which is not visible in this file; comments below only describe
 * how this class calls them.
 */
class UCIScheduleRunner extends RxClass {
  /**
   * @param {Object} [opts] passed through to RxClass
   * @param {string} [opts.name='queue'] name of this runner
   * @param {boolean} [opts.enabled] initial `enabled` value, falls back to DEFAULT_SETTINGS.enabled
   * @param {Array} [opts.schedules] schedules handed to addSchedule() one by one
   */
  constructor(opts = {}) {
    super(opts)
    this.name = opts.name || 'queue'
    this.set('state', false) // true once start() has run, false again after stop()
    this.set('timer', 0) // countdown refresh interval in seconds; 0 turns the refresh off
    this.schedules = [] // enabled schedules; sort() orders them by nextTS
    this._toID = 0 // id of the most recently armed runner timeout
    this._delayed = [] // triggered schedules waiting for running ones to finish
    this._active = {} // timeout id -> list of triggered (running or queued) schedule clones
    this._disabledSchedules = []
    this.set('running', []) // list of currently running schedules
    this.set('queue', []) // list of triggered schedules awaiting run
    this.set('nextTS', 0)
    this.set('nextDT', '')
    this.set('nextName', '')
    this.set('activeCount', 0)
    this.set('countdown', '')
    this.set('runningCount', 0)
    if (opts.schedules) opts.schedules.forEach(sch => this.addSchedule(sch))

    // A delayed schedule may start once nothing is running. This watches
    // runningCount rather than activeCount: a delayed schedule is itself
    // counted in activeCount (see trigger), so in the normal flow activeCount
    // does not reach zero while one is still waiting.
    this._delayedListener = this.rxSubscribe('runningCount', 'ready', count => {
      if (!count && this._delayed.length) {
        const sch = this._delayed.shift()
        console.log('a delayed action now proceeding====>', sch.name, sch.toid)
        this.startScheduleAction(sch)
      }
    })

    // (Re)start or clear the periodic countdown refresh whenever `timer` changes.
    this.rxSubscribe('timer', 'countdown', interval => {
      clearInterval(this.countdownInterval)
      if (interval) this.countdownInterval = setInterval(this._countdown.bind(this), interval * 1000)
      else this.countdown = ''
    })

    this.set('enabled',
      opts.enabled != null ? opts.enabled : DEFAULT_SETTINGS.enabled,
      {
        subscribe: {
          name: 'start',
          handler: (start) => {
            console.log('enabled', start)
            if (start) this.start()
            // NOTE(review): `init` is module-level, so only the first runner
            // constructed skips stop() here for a falsy initial value.
            else if (!init) this.stop()
          }
        }
      }
    )
    init = false
  } // end constructor

  /**
   * Copies the first schedule's countdown into `this.countdown`.
   * @param {*} [now] when truthy calls the schedule's `_countdown()`, otherwise reads its `countdown` property
   * @returns {*} the new countdown value, or undefined when there is no schedule
   */
  _countdown(now) {
    const first = this.schedules[0]
    if (!first) return
    this.countdown = now ? first._countdown() : first.countdown
    return this.countdown
  }

  /** `countdownMS` of the first schedule, or undefined when there is none. */
  get _countdownMS() {
    return (this.schedules[0] || {}).countdownMS
  }

  /**
   * Marks the runner as started, updates all enabled schedules, turns on the
   * schedule and runner timers, and re-arms the runner timeout whenever
   * `nextTS` changes.
   */
  async start() {
    console.log('-----------starting scheduler runner-----------------')
    this.state = true
    await this.update()
    this._getEnabledSchedule().forEach(sch => { sch.timer = 1.1 }) // fire off timers
    this.timer = 1.1
    this.rxSubscribe('nextTS', 'resetTO', () => {
      this._countdown(true)
      this.runnerTimeout()
    })
  }

  /**
   * Marks the runner as stopped: drops the nextTS subscription, turns off the
   * timers, blanks the "next" fields, clears the runner timeout and removes
   * all active schedules.
   * @param {boolean} [now=true] accepted for compatibility; not used by this method
   */
  stop (now = true) {
    console.log('--------------stopping scheduler runner-------------------')
    this.state = false
    this.rxUnsubscribe('nextTS', 'resetTO')
    this.timer = 0
    this._getEnabledSchedule().forEach(sch => { sch.timer = 0 })
    this.nextDT = ''
    this.countdown = ''
    this.nextName = ''
    this.runnerTimeout('stop')
    this.removeActive()
  }

  /**
   * Calls `stopAction()` on running schedules, one after another.
   * @param {string} [id] restrict to running schedules with this id; all when omitted
   */
  async abortRunning(id) {
    console.log('aborting running schedule(s)', id || '')
    const targets = id ? this.running.filter(sch => sch.id === id) : this.running
    for (const sch of targets) {
      if (sch.stopAction) await sch.stopAction()
      else console.log(sch.name, 'no stop action registered can not force active schedule to stop')
    }
  }

  /**
   * Removes schedules from the queue and lowers `activeCount` by the number removed.
   * @param {string} [id] restrict to this schedule id; empties the queue when omitted
   */
  removeFromQueue(id) {
    console.log('removing queued schedule(s)', id || '')
    const before = this.queue.length
    if (id) this.queue = this.queue.filter(s => s.id !== id)
    else this.queue = []
    this.activeCount -= before - this.queue.length
  }

  /**
   * Removes schedules from the delayed list.
   * @param {string} [id] restrict to this schedule id; empties the list when omitted
   */
  removeFromDelayed(id) {
    console.log('removing delayed schedule(s)', id || '')
    // the list lives in `_delayed`; there is no `delayed` property on the runner
    if (id) this._delayed = this._delayed.filter(s => s.id !== id)
    else this._delayed = []
  }

  /**
   * Removes schedules from every active structure: delayed list first (so an
   * aborted run cannot hand over to them), then running, queue and the
   * per-timeout active lists.
   * @param {string} [id] restrict to this schedule id; everything when omitted
   */
  async removeActive(id) {
    this.removeFromDelayed(id)
    await this.abortRunning(id)
    this.removeFromQueue(id)
    console.log('clearing list activated schedules')
    if (id) {
      for (const toid in this._active) {
        this._active[toid] = this._active[toid].filter(s => s.id !== id)
      }
    } else this._active = {}
  }

  /**
   * Main timeout, based on the first schedule to trigger. Always clears the
   * previous timeout; arms a new one for `_countdownMS` unless asked to stop
   * or there is no schedule.
   * @param {*} [stop] pass anything truthy to only clear the timeout
   */
  runnerTimeout (stop) {
    clearTimeout(this._runnerTimeout)
    if (stop) { console.log('stopped, timeout cleared'); return }
    if (!this.nextSchedule) return
    this._toID++
    // keep the id this timeout was armed with for use inside the callback
    const toid = this._toID
    console.log(toid, '+++++ next schedule trigger in', this.countdown, this.nextName, '++++')
    this._runnerTimeout = setTimeout(async () => {
      console.log('**********************timeout triggered for ID', toid, '****************************')
      const triggered = []
      const updates = []
      this._active[toid] = triggered
      this.schedules.forEach(sch => { // add first and any others set for ~ same time
        if (sch.countdownMS <= 1000) { // should catch any with same TS
          updates.push(sch.update.bind(sch))
          const activeSch = clone(sch)
          activeSch.toid = toid
          activeSch.runID = `${toid}:${sch.id}`
          triggered.push(activeSch)
        }
      })
      triggered.next = 0
      this.trigger(triggered.length, toid)
      updates.forEach(update => update()) // only need update the ones that were active
    }, this._countdownMS)
  }

  /**
   * Queues the last `count` schedules of the active list for `toid`, in list
   * order. Each one is started immediately when it is `simultaneous` or when
   * nothing is running; otherwise it is pushed onto the delayed list.
   * @param {number} count how many entries, counted from the end of the list, are still to process
   * @param {number} toid timeout id whose active list is processed
   */
  trigger (count, toid) {
    const list = this._active[toid]
    // also stops when the list has been removed in the meantime
    if (!list || count < 1 || count > list.length) return
    const sch = list[list.length - count]
    this.activeCount++
    this.queue = this.queue.concat([sch]) // for rx must overwrite not push
    if (sch.simultaneous || !this.runningCount) this.startScheduleAction(sch)
    else {
      console.log('adding schedule to delay', sch.name, sch.toid)
      this._delayed.push(sch)
    }
    this.trigger(count - 1, toid)
  }

  /**
   * Adds a schedule to the runner and files it as enabled or disabled. When
   * getBaseClass() reports 'UCISchedule' the runner also subscribes to the
   * schedule's `settings.enabled` and `nextTS`; otherwise an error is logged
   * (the schedule is still kept in the list).
   * @param {Object} sch schedule to add
   */
  addSchedule (sch) {
    this.schedules.push(sch)
    this.enableSchedule(sch.id)
    if (getBaseClass(sch) === 'UCISchedule') {
      sch.rxSubscribe('settings.enabled', 'runner', () => {
        this.enableSchedule(sch)
      })
      sch.rxSubscribe('nextTS', 'runner', () => {
        // individual changes are ignored while update() refreshes everything
        if (!update_all && this.state) this.sort()
      })
    } else {
      console.log('ERROR: passed schedule was not a instance of UCISchedule')
      console.log(sch.name)
    }
  }

  /**
   * Drops a schedule from both the enabled and the disabled list.
   * @param {string} id schedule id
   */
  removeSchedule (id) {
    this.schedules = this.schedules.filter(a => a.id !== id)
    this._disabledSchedules = this._disabledSchedules.filter(a => a.id !== id)
  }

  /**
   * Moves a schedule between the enabled and disabled lists according to its
   * own `enabled` property, then re-sorts. Does nothing when the schedule
   * cannot be resolved or has no id.
   * @param {Object|string} sch schedule, or the id of a schedule known to the runner
   */
  enableSchedule(sch) {
    if (typeof sch === 'string') sch = this.getSchedule(sch)
    if (!(sch || {}).id) return
    if (sch.enabled) {
      if (this._disabledSchedules.find(a => a.id === sch.id)) {
        this.schedules.push(sch)
        // remove from disabled list
        this._disabledSchedules = this._disabledSchedules.filter(a => a.id !== sch.id)
      }
    } else {
      if (this.schedules.find(a => a.id === sch.id)) {
        this._disabledSchedules.push(sch)
        this.schedules = this.schedules.filter(a => a.id !== sch.id)
      }
    }
    this.sort()
  }

  /**
   * @param {string} [id] schedule id
   * @returns {Object|Array|undefined} the matching schedule (enabled list is
   *   searched first), or every schedule, enabled then disabled, when no id is given
   */
  getSchedule (id) {
    if (!id) return this.schedules.concat(this._disabledSchedules)
    return this._getEnabledSchedule(id) || this._getDisabledSchedule(id)
  }

  /** First entry of the enabled list, or undefined when it is empty. */
  get nextSchedule () {
    return this.schedules[0]
  }

  /**
   * Force an update of one schedule.
   * @param {string} id schedule id
   */
  updateSchedule (id) {
    this.getSchedule(id).update()
  }

  /**
   * @param {string} [id] schedule id
   * @returns {Object|Array|undefined} matching enabled schedule, or the whole enabled list when no id is given
   */
  _getEnabledSchedule (id) {
    if (!id) return this.schedules
    return this.schedules.find(a => a.id === id)
  }

  /**
   * @param {string} [id] schedule id
   * @returns {Object|Array|undefined} matching disabled schedule, or the whole disabled list when no id is given
   */
  _getDisabledSchedule (id) {
    if (!id) return this._disabledSchedules
    return this._disabledSchedules.find(a => a.id === id)
  }

  /**
   * Moves a triggered schedule from the queue to the running list, starts its
   * action and registers a one-time listener on its run id that undoes the
   * bookkeeping when the run ends.
   * @param {Object} sch triggered schedule clone (carries `toid` and `runID`)
   * @param {number} [toid] timeout id to use when `sch.toid` is not set
   */
  async startScheduleAction (sch, toid) {
    const listID = sch.toid || toid
    console.log('---- a scheduled run started----', sch.runID)
    // TODO check if async or promise
    this.runningCount++
    this.running = this.running.concat(this.queue.filter(s => s.id === sch.id))
    this.queue = this.queue.filter(s => s.id !== sch.id)
    sch.startAction(sch.runID)
    // per the original note this event is emitted on abort too
    sch.once(sch.runID, () => {
      console.log('-----scheduled run is complete or aborted removing from active list ----', sch.runID)
      // The list may already be gone: removeActive() resets `_active` even
      // for runs that have no stopAction and therefore finish later.
      const list = this._active[listID]
      if (list) {
        this._active[listID] = list.filter(s => s.id !== sch.id)
        if (!this._active[listID].length) {
          // none left on that list, remove it
          console.log('===========all actions for timeout id', listID, 'are complete===========')
          delete this._active[listID]
        }
      }
      this.running = this.running.filter(s => s.id !== sch.id)
      this.activeCount--
      // last on purpose: the runningCount subscription may start a delayed
      // schedule, and by then this run's bookkeeping is finished
      this.runningCount--
    })
  }

  /**
   * Forces an update of every enabled schedule, one at a time, then sorts.
   * `update_all` suppresses per-schedule sorting meanwhile and is always
   * reset, even when a schedule's update() throws.
   */
  async update () {
    update_all = true
    try {
      for (const sch of this._getEnabledSchedule()) {
        await sch.update(true)
      }
      console.log('updated all schedules')
    } finally {
      update_all = false
    }
    await this.sort()
  }

  /**
   * Orders the enabled schedules by `nextTS` and, when the first schedule or
   * its timestamp differs from what the runner currently holds, copies its
   * name, date-time and timestamp onto the runner.
   * @param {*} [force] when truthy the runner fields are rewritten even if nothing changed
   */
  async sort (force) {
    const previous = this.nextSchedule
    if (previous == null) return
    const previousID = previous.id
    const curTS = this.nextTS || 0
    this.schedules.sort((a, b) => a.nextTS - b.nextTS)
    const first = this.nextSchedule
    if (!first.nextTS) return
    if (previousID !== first.id || first.nextTS !== curTS || force) {
      this.nextName = first.name
      this.nextDT = first.nextDT
      this.nextTS = first.nextTS
    }
  }
} // end class
// The runner class is the default export and is also available under the named alias `Runner`.
export default UCIScheduleRunner
export { UCIScheduleRunner as Runner }
/**
 * Walks up the prototype chain of `targetClass` for as long as the next
 * prototype is truthy, is not the `Object` constructor and has a truthy
 * `name`, then reports the constructor name of the object it stopped on.
 *
 * For an ordinary instance, whose prototypes carry no `name` property, the
 * walk stops immediately and the result is the instance's own class name.
 *
 * @param {*} targetClass value to inspect
 * @returns {string|undefined} constructor name, or undefined when
 *   `targetClass` is not an instance of Object
 */
function getBaseClass(targetClass) {
  if (!(targetClass instanceof Object)) return undefined
  let current = targetClass
  for (;;) {
    const parent = Object.getPrototypeOf(current)
    const canClimb = parent && parent !== Object && parent.name
    if (!canClimb) break
    current = parent
  }
  return current.constructor.name
}