From e9fce89436f4017194151e5297fe8316719146ef Mon Sep 17 00:00:00 2001 From: David Kebler Date: Wed, 17 Jun 2020 12:02:53 -0700 Subject: [PATCH] 0.0.8 move to using rxclass results in refactor of properties etc. refactor timer/countdown enabled - start/stop added methods to cancel/abort changed name of timeout to runnerTimeout schedule TS updates now reactive on storing list add enabled schedule method to call when schedule emits change in enabled refactor of start/stop action methods refactored TS update for a schedule many changes not all listed --- examples/runner.js | 177 ++++++++++--------- examples/schedule.js | 57 +++++-- nodemon.json | 2 +- package.json | 4 +- src/runner.js | 393 +++++++++++++++++++++++++++++-------------- src/schedule.js | 144 +++++++++++----- 6 files changed, 511 insertions(+), 266 deletions(-) diff --git a/examples/runner.js b/examples/runner.js index 0c344b1..81b71a4 100644 --- a/examples/runner.js +++ b/examples/runner.js @@ -3,15 +3,106 @@ import Runner from '../src/runner.js' import delay from 'delay' -const HOUR = 0 -const MINUTE = 0 -const DELTA = 5 -const DURATION = 4 -const DEV = true // if true delta and duration are seconds -const NZONES = 1 +const dt = new Date() +const NZONES = 6 +const ZONE = { + duration: 5, + settings:{ + enabled: true, + simultaneous: false, + timing: { + hour: dt.getHours(), + minute: dt.getMinutes()+(dt.getSeconds()+5)/60, + delta: 12 + } + } +} + +; +(async () => { + + let zones = [] + let schs = [] + for (let zone=0; zone < NZONES; zone++) { + let opts = Object.assign({},ZONE) + opts.name = 'sch-zone-'+(zone+1) + opts.id= 'zone-'+(zone+1), + zones[zone] = opts + let min = opts.settings.timing.minute + // console.log(zones[zone].id,opts.min) + let last = ((zone+1) % 3 ? 0 : 1) + // console.log(opts.id, 'last', last, min) + // console.log('zone:',opts) + schs[zone] = new Schedule (opts) + opts.settings.timing.minute = min + last * zone * .1 + // schs[zone].rxSubscribe('nextTS','TS',function (val) { + // console.log(schs[zone].name, 'subscription: nextTS was updated', val) + // }) + // console.log('register action !!!!!!!!!!!!!!!!!!!!!') + // console.log(schs[zone].name,zones[zone]) + schs[zone].registerStartAction(startAsync,zones[zone]) + schs[zone].registerStopAction(stop,zones[zone]) + // console.log('register action done!!!!!!!!!!!!!!!!!!!!!') + } + + const runner = new Runner ({ + name: 'example', + // one: true, + schedules: schs, + }) + + runner.rxSubscribe('countdown','runner',value => { console.log(value)}) + + runner.rxSubscribe('running','runner',schs => { console.log('running schedules:',schs.map(sch=>sch.name).join(','))}) + + runner.rxSubscribe('queue','runner',schs => { console.log('queued schedules:',schs.map(sch=>sch.name).join(','))}) + + runner.rxSubscribe('activeCount','runner',value => { console.log('active schedule count', value)}) + runner.rxSubscribe('runningCount','runner',value => { console.log('running schedule count', value)}) + + // setInterval(() => { + // console.log('resetting mintue') + // runner.nextSchedule.minute+=1 + // // console.log(runner.nextSchedule.get('settings')) + // }, + // 60*1000 + // ) + runner.enabled=true + setTimeout(()=>runner.enabled=false,6000) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +// process.kill(process.pid, 'SIGTERM') +}) + + +// ACTIONS // MUST be promise returning and NOT an arrow function +function stop (zone,activeSch) { + console.log('stop action: aborting run for action id',activeSch.runID, 'zone',zone.id) + this.emit('abort:'+activeSch.runID) +} + +async function startAsync (zone,activeSch) { + // console.log('async start function') + console.log('-----starting-------',zone.id, activeSch.runID) + // console.log('---waiting-----', zone.duration,'secs duration to complete') + const tick = setInterval(()=>{ + console.log('duration tick') + }, 1000) + const duration = delay(zone.duration*1000) + this.once('abort:'+activeSch.runID,()=> { + console.log('aborting run>>>', activeSch.runID) + duration.clear() + }) + await duration + clearInterval(tick) + this.removeAllListeners('abort:'+activeSch.runID) + console.log('stopping', activeSch.runID) +} + function startPromise (zone,activeSch) { console.log('promise return start function', this.evName) return new Promise(resolve => { @@ -32,77 +123,3 @@ function startPromise (zone,activeSch) { }, zone.duration*1000) }) } - -function stop (zone,activeSch) { - console.log('aborting run for action id',activeSch.runID, 'zone',zone.id) - this.emit('abort:'+activeSch.runID) -} - -async function startAsync (zone,activeSch) { - // console.log(arguments) - console.log('async start function') - console.log('-----starting-------',zone.id, activeSch.runID) - console.log('---waiting-----', zone.duration,'secs duration to complete') - const tick = setInterval(()=>{ - console.log('duration tick') - }, 1000) - const duration = delay(zone.duration*1000) - this.once('abort:'+activeSch.runID,()=> { - console.log('aborting run>>>', activeSch.runID) - duration.clear() - }) - await duration - clearInterval(tick) - this.removeAllListeners('abort:'+activeSch.runID) - console.log('stopping', activeSch.runID) -} - - -(async () => { - - let zones = [] - let schs = [] - for (let zone=0; zone < NZONES; zone++) { - let opts = {dev:DEV, simultaneous:(zone % 2), name:'sch-zone-'+(zone+1),id:'zone-'+(zone+1) ,hour:HOUR,minute:MINUTE,delta:DELTA, duration:DURATION} - zones[zone] = opts - console.log(zones[zone].id,'>hr:min:delta:duration:simultaneous=', - zones[zone].hour, - zones[zone].minute, - zones[zone].delta, - zones[zone].duration, - zones[zone].simultaneous) - - schs[zone] = new Schedule (opts) - schs[zone].on('update',val => { - console.log('just updated schedule') - console.dir(val) - }) - schs[zone].update() - - console.log('register action !!!!!!!!!!!!!!!!!!!!!') - schs[zone].registerStartAction(startAsync,zones[zone]) - schs[zone].registerStopAction(stop,zones[zone]) - console.log('register action done!!!!!!!!!!!!!!!!!!!!!') - - } - - const runner = new Runner ({ - name: 'irrigation', - // one: true, - schedules: schs, - }) - - runner.start() - const cd = setInterval(()=>{ - console.log('runner, next schedule trigger in', runner.countdown) - },1000) - setTimeout(() => { - runner.stop(true) - clearInterval(cd)}, - 15*1000 - ) - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -// process.kill(process.pid, 'SIGTERM') -}) diff --git a/examples/schedule.js b/examples/schedule.js index 6468ae4..f8f5c95 100644 --- a/examples/schedule.js +++ b/examples/schedule.js @@ -1,6 +1,6 @@ import Schedule from '../src/schedule.js' - - +// +// function demoStartAction (duration) { return new Promise(resolve => { console.log('starting schedule',this.name) @@ -20,33 +20,56 @@ function demoStartAction (duration) { } function demoStopAction () { - console.log('forced stop action for', this.name) + console.log('!!!!!forced stop action for', this.name) this.emit('abort') } +const DEMO = { + duration: 5, + schedule:{ + enabled: true, + simultaneous: false, + timing: { + hour: 6, + minute: 0, + delta: 12 + } + } +} + const sch = new Schedule ({ name: 'demo', - dev:true, - hour: 2, - minute: 0, - delta: 6, + settings : DEMO.schedule }) -sch.on('update',val => { - console.dir(val) +sch.rxSubscribe('nextTS','TS',function (val) { + console.log('subscription: nextTS was updated', val, sch.nextDT) }) -sch.update() -const DURATION = 5 -sch.registerStartAction(demoStartAction,DURATION) +console.log(sch.get('settings')) + +sch.on('updated', function (x) { + console.log('via emiiter: updated', x) +}) + +// sch.update(true) +console.log('changed minutes to 33') +sch.settings.timing.minute=33 + +// console.log(sch.get('settings')) + + +sch.registerStartAction(demoStartAction,DEMO.duration) sch.registerStopAction(demoStopAction) -sch.startAction() +sch.startAction() // start now -setTimeout(()=>{ +// show aborting +setTimeout(()=>{ // start again later + console.log('-------now show aborting a schedule run-----') sch.startAction() -},DURATION*1000) +},DEMO.duration*1000) setTimeout(()=>{ - sch.stopAction() // abort the run -},(DURATION*2-2)*1000) + sch.stopAction() // abort later run early +},(DEMO.duration*2-2)*1000) diff --git a/nodemon.json b/nodemon.json index 72863e2..8bfadd3 100644 --- a/nodemon.json +++ b/nodemon.json @@ -1,4 +1,4 @@ { "ignoreRoot": [".git"], - "watch": ["node_modules/@uci/","src/","examples/","index.js"] + "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","examples/","index.js"] } diff --git a/package.json b/package.json index dbda4f7..bf1c1f2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci-utils/scheduler", - "version": "0.0.7", + "version": "0.0.8", "description": "an interval reoccuring style scheduler and runner", "main": "src/index.js", "scripts": { @@ -10,7 +10,7 @@ "author": "", "license": "ISC", "dependencies": { - "await-to-js": "^2.1.1", + "@uci-utils/rx-class": "^0.1.2", "clone": "^2.1.2", "moment": "^2.26.0", "moment-duration-format": "^2.3.2" diff --git a/src/runner.js b/src/runner.js index 91e3672..6227657 100644 --- a/src/runner.js +++ b/src/runner.js @@ -1,101 +1,188 @@ import clone from 'clone' -import { EventEmitter } from 'events' +import RxClass from '@uci-utils/rx-class' + +const DEFAULT_SETTINGS = { + enabled: false, + timer: true, // will update countdown and other dynamic values every minute or second +} + +let update_all=false +let init = true // Queue Multiple Schedulers -class UCIScheduleRunner extends EventEmitter { +class UCIScheduleRunner extends RxClass { constructor(opts) { super(opts) this.name = opts.name || 'queue' + this.set('state',false) // if runner is actually started + this.set('timer',0) // opts.timer != null ? opts.timer : DEFAULT_SETTINGS.timer) + // console.log('timer', this.timer, DEFAULT_SETTINGS.timer) this.schedules = [] - if (opts.schedules) opts.schedules.forEach(sch => this.addSchedule(sch)) - this.one = opts.one this._toID = 0 // id of timeout - this._delayed = [] - this._activeCount = 0 - this._active = {} // object or lists of active schedules by timeout id - this._pausedSchedules = [] - this.init() - } - - get countdown() { - return (this.schedules[0]||{}).countdown - } - - get names(){ - return this.schedules.map(sch => { - return {name: sch.name, countdown: sch.countdown, next: sch.nextDT, simultaneous:sch.simultaneous} + this._delayed = [] // active schedules that are delayed + this._active = {} // object or lists of all active (running or queued) schedules by timeout id + this._disabledSchedules = [] + this.set('running',[]) // list of currently running schedules + this.set('queue',[]) // list of queued names awaiting run + this.set('nextTS',0) + this.set('nextDT','') + this.set('nextName','') + this.set('activeCount',0) + this.set('countdown','') + this.set('runningCount',0) + // console.log('passed schedule',opts.schedules.map(sch=>sch.id)) + if (opts.schedules) opts.schedules.forEach(sch => { + this.addSchedule(sch) }) - } - get nextName(){ - return (this.schedules[0]||{}).name - } - - get nextTS() { - return (this.schedules[0]||{}).nextTS - } - - get nextDT(){ - return (this.schedules[0]||{}).nextDT - } - - - init() { - - this._delayedListener = this.on('active', active => { - if (!active && this._delayed.length) { + this._delayedListener = this.rxSubscribe('activeCount','ready',count => { + if (!count && this._delayed.length) { const sch=this._delayed.shift() console.log('a delayed action now proceeding====>', sch.name, sch.toid) this.startScheduleAction(sch) } }) - } + this.rxSubscribe('timer','countdown', (interval) => { + // console.log('timer state changed', interval) + clearInterval(this.countdownInterval) + if (interval) this.countdownInterval = setInterval(this._countdown.bind(this),interval*1000) + else this.countdown = '' + }) - // main timeout based on the first schedule to trigger - setTimeout () { - const next = this.schedules[0] || {} - this._toID++ - console.log('+++++++++++++++++++++ next timeout',this._toID, 'in', Math.round(next.countdownMS/1000),'++++++++++++++++++++++++++') - this._timeout = setTimeout(() => { - console.log('**********************timeout triggered', this._toID, '********************************') - // let dur = 0 - // don't mutate original queue, make copy for active list - const queue = clone(this.schedules) - this._active[this._toID]=[] - queue.forEach(sch =>{ // add first and any others set for same time - // dur += sch.durationMS - // console.log('check if active', sch.name, sch.countdownMS, dur) - if (sch.countdownMS <= 1000) this._active[this._toID].push(sch) - if (sch.one) this.removeSchedule(sch.id) - }) - // this._active[this._toID].next = 0 - this.trigger(this._active[this._toID].length,this._toID) - this.emit('active',this._activeCount) - if (!this.one) this.start() - },next.countdownMS) - } - - stop (now) { - console.log('---------------------stopping runner------------------------') - clearTimeout(this._timeout) - if (now) { - console.log('clearing all delayed start schedules', this._delayed.map(sch=>sch.name)) - this._delayed = [] - console.log('stopping all in progress schedules') - for (let list in this._active) { - this._active[list].forEach(sch=>{ - console.log('active schedule',sch.name) - if (sch.stopAction) sch.stopAction() - else console.log(sch.name,'no stop action registered can not force active schedule to stop')} - ) + this.set('enabled', + opts.enabled != null ? opts.enabled: DEFAULT_SETTINGS.enabled, + { + subscribe:{ + name:'start', + handler: (start) => { + console.log('enabled', start) + if (start) this.start() + else if (!init) this.stop() + }} } + ) + init=false + + } // end construtor + + _countdown(now) { + const first = this.schedules[0] + // console.log('first', first.name) + if (!first) return + this.countdown = now ? first._countdown() : first.countdown + // console.log(this.countdown) + return this.countdown + } + + get _countdownMS() { + return (this.schedules[0]||{}).countdownMS + } + + // updateActiveNamesList(){ + // console.log('!!!!!!!!!!!!!!active list changed',Object.keys(this._active).length,this._delayed.length) + // let names = '' + // Object.entries(this._active).map(list => { + // names = names + list[1].map(sch=>sch.name).join(',') + // }) + // this.active = {running:names, queue:this._queue.map(sch=>sch.name).join(',')} + // console.log(this.active) + // return this.active + // } + + // init() { + // + // + // } // end init + + async start() { + console.log('-----------starting scheduler runner-----------------') + this.state = true + await this.update() + this._getEnabledSchedule().forEach(sch=> sch.timer=1.1) // fire off timers + this.timer = 1.1 + + this.rxSubscribe('nextTS','resetTO', () => { + // console.log('^^^^runner nextTS changed, resetting runner timeout') + this._countdown(true) + this.runnerTimeout() + }) + } + + stop (now=true) { + console.log('--------------stopping scheduler runner-------------------') + this.state = false + this.rxUnsubscribe('nextTS','resetTO') + this.timer = 0 + this._getEnabledSchedule().forEach(sch=> sch.timer=0) + this.nextDT = '' + this.countdown = '' + this.nextName = '' + this.runnerTimeout('stop') + this.removeActive() + } + + async abortRunning(id) { + console.log('aborting running schedule(s)', id||'') + const running = id ? (this.running.filter(sch=>sch.id===id) ||[]) : this.running + for (let sch of running) { + if (sch.stopAction) await sch.stopAction() + else console.log(sch.name,'no stop action registered can not force active schedule to stop') } } - start() { - this.update() - this.setTimeout() + removeFromQueue(id) { + console.log('removing queued schedule(s)', id||'') + const curr = this.queue.length + if (id) this.queue = this.queue.filter(s=>s.id!==id) + else this.queue = [] + this.activeCount-= curr-this.queue.length + } + + removeFromDelayed(id) { + console.log('removing delayed schedule(s)', id||'') + if (id) this._delayed = this.delayed.filter(s=>s.id!==id) + else this._delayed = [] + } + + async removeActive(id) { + this.removeFromDelayed(id) + await this.abortRunning(id) + this.removeFromQueue(id) + console.log('clearing list activated schedules') + if (id) { + for (let toid in this._active) { + this._active[toid] = this._active[toid].filter(s=>s.id!==id) + } + } else this._active = {} + } + + // main timeout based on the first schedule to trigger + runnerTimeout (stop) { + clearTimeout(this._runnerTimeout) + if (stop) {console.log('stopped, timeout cleared');return} // pass anything to stop + if (!this.nextSchedule) return + this._toID++ + console.log(this._toID, '+++++ next schedule trigger in', this.countdown,this.nextName,'++++') + // console.log('current schedules run order\n',this.schedules.map(sch=>`${sch.nextDT} : ${sch.name}`)) + this._runnerTimeout = setTimeout(async () => { + console.log('**********************timeout triggered for ID', this._toID, '****************************') + this._active[this._toID]=[] + let updates = [] + this.schedules.forEach(sch =>{ // add first and any others set for ~ same time + if (sch.countdownMS <= 1000 ) { // should catch any with same TS + updates.push(sch.update.bind(sch)) + const activeSch = clone(sch) + activeSch.toid = this._toID + activeSch.runID = `${this._toID}:${sch.id}` + this._active[this._toID].push(activeSch) + } + }) + this._active[this._toID].next = 0 + this.trigger(this._active[this._toID].length,this._toID) + updates.forEach(update=>update()) // only need update the ones that were active + // await this.update() // updates all _enabled + },this._countdownMS) } trigger (count,toid) { // trigger all schedules actions that have same timestamp as first in queue @@ -106,25 +193,40 @@ class UCIScheduleRunner extends EventEmitter { // console.log(count,toid,this._active[toid]) const idx = this._active[toid].length-count const sch = this._active[toid][idx] - sch.toid = toid - sch.runID = `${toid}:${sch.id}` count-- + this.activeCount++ + this.queue = this.queue.concat([sch]) // for rx must overwrite not push if (sch.simultaneous) this.startScheduleAction(sch) - else if (!this._activeCount) this.startScheduleAction(sch) - // if (idx===0 && !this._activeCount) this.startScheduleAction(sch) + else if (!this.runningCount) this.startScheduleAction(sch) + // if (idx===0 && !this.runningCount) this.startScheduleAction(sch) else { console.log('adding schedule to delay',sch.name,sch.toid) this._delayed.push(sch) - console.log(this._delayed.length) } + this.trigger(count,toid) } - // schedule instance addSchedule (sch) { + // console.log('####################',sch.id,sch.enabled,'adding###########') + this.schedules.push(sch) + this.enableSchedule(sch.id,sch.enabled) if (getBaseClass(sch) === 'UCISchedule') { - this.schedules.push(sch) - this.sort() + sch.rxSubscribe('settings.enabled','runner',function (sch) { + // console.log('runner: schedule enabled change',value) + this.enableSchedule(sch) + }.bind(this,sch)) + // console.log(sch.id,'enabled subscriptions',sch.$get('_rx_.props.settings.enabled.subs')) + sch.rxSubscribe('nextTS','runner', function () { + // console.log('current runner timestamp', this.nextTS) + if (!update_all && this.state) { + // console.log('RUNNER:',sch.id,'has updated its time stamp', sch.nextTS, sch.nextDT) + this.sort() + } + // console.log('updated runner schedules',this.nextTS,this.nextDT,this.nextName) + }.bind(this) + ) + // sch.update(true) } else { console.log('ERROR: passed schedule was not a instance of UCISchedule') @@ -135,74 +237,117 @@ class UCIScheduleRunner extends EventEmitter { // schedule id removeSchedule (id) { this.schedules = this.schedules.filter(a => a.id !== id) + this._disabledSchedules = this._disabledSchedules.filter(a => a.id !== id) + } + + // enables dissables the schdule in the runner + enableSchedule(sch) { + if (typeof sch ==='string') sch = this.getSchedule(sch) + if (!(sch || {}).id) return + // console.log(sch.id,'<<<<---------------enable/disable schedue--------->>>>',sch.enabled) + // console.log('enabled schedules', this.schedules.map(sch=>sch.id)) + // console.log('disabled schedules', this._disabledSchedules.map(sch=>sch.id)) + // if (state == null) state = true + if (sch.enabled) { + if (this._disabledSchedules.find(a => a.id === sch.id)) { + this.schedules.push(sch) + // remove from disabled list + this._disabledSchedules = this._disabledSchedules.filter(a => a.id !== sch.id) + } + } else { + if(this.schedules.find(a => a.id === sch.id)) { + this._disabledSchedules.push(sch) + this.schedules = this.schedules.filter(a => a.id !== sch.id) + } + } + // if (this._running) this.sort() + this.sort() + // console.log('after') + // console.log('enabled schedules', this.schedules.map(sch=>sch.id)) + // console.log('disabled schedules', this._disabledSchedules.map(sch=>sch.id)) } getSchedule (id) { - return this.schedules.find(a => a.id === id) + if (!id) return this.schedules.concat(this._disabledSchedules) + return this._getEnabledSchedule(id) || this._getDisabledSchedule(id) } + get nextSchedule () { + return this.schedules[0] + } + + // force an update updateSchedule (id) { this.getSchedule(id).update() } + _getEnabledSchedule (id) { + if (!id) return this.schedules + return this.schedules.find(a => a.id === id) + } + + _getDisabledSchedule (id) { + if (!id) return this._disabledSchedules + return this._disabledSchedules.find(a => a.id === id) + } + async startScheduleAction (sch, toid) { toid = sch.toid || toid - console.log('----',sch.runID,'scheduled run started-----') + console.log('---- a scheduled run started----',sch.runID) // TODO check if async or promise - this._activeCount++ - this.emit('active',this._activeCount) // runner has active schedules + this.runningCount++ + this.running = this.running.concat(this.queue.filter(s=>s.id===sch.id)) + this.queue = this.queue.filter(s=>s.id!==sch.id) sch.startAction(sch.runID) sch.once(sch.runID,removeScheduleFromActiveList.bind(this)) // emitted on abort too function removeScheduleFromActiveList() { - console.log('-----',sch.runID,'run complete or aborted removing from active list ----') - this._activeCount-- + console.log('-----scheduled run is complete or aborted removing from active list ----',sch.runID) // done remove from active list this._active[toid]=this._active[toid].filter(s=>s.id!==sch.id) + this.runningCount-- + this.activeCount-- + this.running = this.running.filter(s=>s.id!==sch.id) if (!this._active[toid].length) { // none left on that list, remove it - console.log('===========actions for timeout', toid, 'are complete===========') + console.log('===========all actions for timeout id', toid, 'are complete===========') delete this._active[toid] } - // console.log('schedule action complete', sch.name,toid, this._activeCount) - this.emit('active',this._activeCount) // runner has active schedules ?? + // console.log('schedule action complete', sch.name,toid, this.runningCount) } } - disableSchedule(id) { - this.enableSchedule(id,'false') - } - - enableSchedule(id,state) { - if (state == null) state = true - if (state) { - const sch = this._pausedSchedules.find(a => a.id === id) - if (sch) { - this.addSchedule(sch) - sch.enabled = true - sch.emit('enabled',true) - } - } else { - const sch = this.schedules.find(a => a.id === id) - if (sch) { - this._pausedSchedules.push(sch) - this.removeSchedule(id) - sch.enabled = false - sch.emit('enabled',false) - } + // this will force update all schedules enabled or disabled + async update () { + update_all = true + // console.log('updating all',this.schedules.length, update_all) + for(let sch of this._getEnabledSchedule()) { + await sch.update(true) + // console.log('done updating', sch.id) } + console.log('updated all schedules') + update_all = false + await this.sort() } - update () { - for(let sch of this.schedules) { - // console.log('updating', sch.name) - sch.update() - } - this.sort() - } - - sort () { + async sort (force) { + const oldFirst = (this.nextSchedule || {}).id + const curTS = this.nextTS || 0 + // console.log('current first',oldFirst) + if (this.nextSchedule == null) return + // console.log('Before Sorting>>>>>>>>>>>', this.schedules.map(sch => { return { name: sch.name, ts: sch.nextTS }}) ) this.schedules.sort((a,b) => a.nextTS - b.nextTS) + // console.log('sorted', this.schedules.map(sch => { return { name: sch.name, ts: sch.nextTS }}) ) + const first = this.nextSchedule || {} + // console.log('apply sort',first.id,oldFirst,curTS,first.nextTS) + if (first && first.nextTS) { + if (oldFirst.id !== first.id || first.nextTS > curTS || force) { + // console.log('!!!!!!!!!!sort resulted a change of nextTS', first.nextTS, first.name) + this.nextName = first.name + this.nextDT = first.nextDT + this.nextTS = first.nextTS + } + } } } // end class diff --git a/src/schedule.js b/src/schedule.js index 6e387ed..e6268a1 100644 --- a/src/schedule.js +++ b/src/schedule.js @@ -1,66 +1,131 @@ // import to from 'await-to-js' -import { EventEmitter } from 'events' +import RxClass from '@uci-utils/rx-class' import moment from 'moment' import 'moment-duration-format' +// import merge from 'deepmerge' // Single Device Scheduler User instance for each unique action -// see Schedulers to create a group with "collision protection" +const DEFAULT_SETTINGS = { + timing:{ + hour:6, + minute:0, + delta:12 + }, + enabled:true, + simultaneous:false +} -class UCISchedule extends EventEmitter { - constructor(opts) { +class UCISchedule extends RxClass { + constructor(opts={}) { + // console.log('schedule options\n', opts) super(opts) - this.name = opts.name || 'test' + // console.log('schedule options\n', opts) + this.name = opts.name || 'a periodic schedule' this.id = opts.id || this.name.replace(/ /g, '_') this.desc = opts.desc - this.hour = opts.hour || 0 - this.dev = opts.dev - this.minute = opts.minute || 0 - this.delta = opts.delta || 6 // time to next trigger in hours - // computed values - this.nextTS = 0 // the next trigger time in seconds - this.enabled = opts.enabled==null ? true : opts.enabled - this.active = false - this.simultaneous = opts.simultaneous // if true = run even if there are other active schedules - // this.lastActiveTS + // this.dev = opts.dev == null ? false : opts.dev + // ,DEFAULT_SETTINGS + this.set('settings',Object.assign({},DEFAULT_SETTINGS,opts.settings)) + this.set('nextTS',0) // the next trigger time in seconds + this.set('nextDT','') // the next trigger time in seconds + this.set('active',false) + this.set('countdown','') + // console.log(this.name, 'done setup') // Actions MUST be promise returning or async this._startAction = opts.startAction // || defaultStartAction.bind(this) // single (or array) of function(s) this._stopAction = opts.stopAction // || defaultStopAction.bind(this) // single (or array) of function(s) - this.update() + this._countdown = this._countdown.bind(this) + + Object.keys(this.settings.timing).forEach(prop=>{ + this.rxSubscribe(['settings.timing',prop], 'update',this.update.bind(this)) + // console.log('added subs', prop, this._rxGetObj(['settings.timing',prop]).subs) + }) + + this.rxSubscribe('settings.enabled','state', (value) => { + // console.log('schedule enabled change', ev) + this.timer=1.1 // any value but 1 or 60 will trigger + if (value) this.update() + }) + + this.set('timer',1) + this.rxSubscribe('timer','countdown', (value) => { + // console.log('timer interval', value, this.settings.enabled) + clearInterval(this.countdownInterval) + if (this.settings.enabled && value ) this.countdownInterval = setInterval(this._countdown.bind(this),value*1000) + else this.countdown = '' + }) } - get countdown() { - return moment.duration(parseInt((this.nextTS - Math.floor(Date.now()/1000))),'seconds').format('hh:mm:ss') + _countdown(unit) { + const secs = this.nextTS - Math.floor(Date.now()/1000) + unit = unit ? unit : secs/60 < 3 ? 'seconds' : 'minutes' + this.timer = unit === 'seconds' ? 1 : 60 + const format = unit === 'seconds'? 'mm:ss': 'dd hh:mm' + this.countdown = `${moment.duration( + parseInt( secs / (unit === 'seconds' ? 1 : 60) ) + ,unit + ).format(format)}\u00a0\u00a0(${format})` + // console.log(secs, this.timer, this.id,this.countdown) + return this.countdown } get countdownMS() { return (this.nextTS*1000 - Date.now()) } + // make simple access for settings + get hour () { return this.settings.timing.hour } + set hour (hour) { + this.settings.timing.hour= hour + } - get nextDT() { return moment(this.nextTS*1000).format('ddd, MMM Do h:mm A') } + get minute () { return this.settings.timing.minute } + set minute (minute) { + this.settings.timing.minute= minute + } - update() { // all TS in seconds - // console.log('updating',this.hour,this.minute,this.delta, this.dev) - let baseTS = this.hour*3600+this.minute*60 + get delta () { return this.settings.timing.delta } + set delta (delta) { + this.settings.timing.delta= delta + } + + get enabled () { return this.settings.enabled } + set enabled (enable) { + if (enable == null) enable=true // enabled by default + this.settings.enabled=enable + } + + async update(force) { // all TS in seconds + const { hour, minute, delta } = this.get('settings.timing') + const lastTS = this.nextTS + // console.log('>>>>>>>>>>>>>',this.name, 'updating',hour,minute,delta,lastTS) + let baseTS = hour*3600+minute*60 let dt = new Date() let intoDayTS = (dt.getSeconds() + (60 * dt.getMinutes()) + (60 * 60 * dt.getHours())) - let nowTS = Math.floor(Date.now()/1000) - this.nextTS = nowTS - intoDayTS + baseTS - if (!this.dev) { - while (nowTS > this.nextTS) { - // console.log(`now ${nowTS} is beyond next ${this.nextTS} adding delta ${this.delta} hours`) - this.nextTS += this.delta * 3600 - } - } else this.nextTS = nowTS + this.delta - // console.log(baseTS,intoDayTS,nowTS, nowTS-intoDayTS,this.nextTS) - this.emit('update',{id:this.id, ts:this.nextTS,dt:this.nextDT,countdown:this.countdown,simultaneous:this.simultaneous}) + let nowTS = Math.floor(Date.now()/1000) + 1 // last TS and nowTS are within 1 sec so add one + let nextTS = nowTS - intoDayTS + baseTS + // console.log(this.name, nextTS-nowTS,'first computed time stamp', nextTS) + while (nowTS > nextTS) { + // console.log(`now ${nowTS} is beyond next ${nextTS} adding delta ${delta * 60 * 60 } seconds`) + nextTS += (delta * 60 * 60 ) // delta in hours, TS in secs + } + // console.log(lastTS,'computed time stamp', nextTS) + if(nextTS !== lastTS || lastTS == null || force===true) { + // console.log('UPDATED TS>>>>>>>>>>>>>>>>', this.name,hour,minute,delta,nextTS,nextTS-lastTS) + this.nextDT = moment(nextTS*1000).format('ddd, MMM Do h:mm:ss A') + this.nextTS = nextTS + this._countdown() + // console.log(this.nextTS,this.nextDT, this.countdown) + this.emit('updated',{id:this.id,nextTS:nextTS,nextDT:this.nextDT, timing:this.get('settings.timing')}) + } } // MUST! be async or promise TODO make that check registerStartAction(func) { if (typeof func!=='function') throw new Error('start action must be a function and be async or promise returning') + // console.log(arguments) const args = [...arguments] args.shift() - console.log('registering start action with these arguments to be passed\n',args) + // console.log(this.name,this.id,'registering start action with these arguments to be passed\n',args) this._startAction = func.bind(this,...args) } @@ -68,29 +133,24 @@ class UCISchedule extends EventEmitter { if (typeof func!=='function') throw new Error('stop action must be a function') const args = [...arguments] args.shift() - console.log('registering stop action with these arguments to be passed\n',args) + // console.log('registering stop action with these arguments to be passed\n',args) this._stopAction = func.bind(this,...args) } async startAction () { - if (this._stopAction) { + if (this._startAction) { this.active = true - this.emit('active',true) - // console.log('passed run id name, prop run id',ev, this.runID) - // console.log(this) - await this._startAction(this) + await this._startAction(this) // this is active schedule copy this.active = false - this.emit('active',false) this.emit(this.runID) // runner listens for this ID of active schedule to know when it's done } else console.log('no registered start action, register a start action!') } async stopAction () { if (this._stopAction) { - await this._stopAction(this) + await this._stopAction(this) // this is active schedule copy this.emit(this.runID) this.active = false - this.emit('active',false) } else console.log('no registered stop action, unable to abort, action will complete normally') }