first commit of scheduler/queue working

This commit is contained in:
David Kebler 2020-05-28 15:26:30 -07:00
parent 861cfdabc0
commit 7a69d7ad07
9 changed files with 444 additions and 0 deletions

33
.eslintrc.js Normal file
View file

@ -0,0 +1,33 @@
module.exports = {
"ecmaFeatures": {
"modules": true,
"spread" : true,
"restParams" : true
},
"env": {
"es6": true,
"node": true,
"mocha": true
},
"parserOptions": {
"ecmaVersion": 2017
,"sourceType": "module"
},
"extends": "eslint:recommended",
"rules": {
"indent": [
"error",
2
],
"no-console": 0,
"semi": ["error", "never"],
"linebreak-style": [
"error",
"unix"
],
"quotes": [
"error",
"single"
]
}
}

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/node_modules/
/archive/

8
.npmignore Normal file
View file

@ -0,0 +1,8 @@
tests/
test/
*.test.js
testing/
examples/
*.lock
.eslintrc.js
.travis.yml

36
examples/queue.js Normal file
View file

@ -0,0 +1,36 @@
import Schedule from '../src/schedule.js'
import Queue from '../src/queue.js'
const HOUR = 0
const MINUTE = 0
const DELTA = 6
const DURATION = 2
const DEV = true // if true delta and duration are seconds
const NZONES = 4
let zones = []
for (let zone=0; zone < NZONES; zone++) {
let opts = {dev:DEV, simulanteous:(zone % 2), name:'zone-'+(zone+1),hour:HOUR,minute:MINUTE,delta:DELTA, duration:DURATION}
zones[zone] = new Schedule (opts)
// console.log(zones[zone].name,'>hr:min:delta:duration:simultaneous=',
// zones[zone].hour,
// zones[zone].minute,
// zones[zone].delta,
// zones[zone].duration,
// console.dir(val)
// zones[zone].simultanecous)
// zones[zone].on('update',val => {
// console.log('just updated schedule')
// console.dir(val)
// })
zones[zone].update()
}
const queue = new Queue ({
name: 'irrigation',
// one: true,
schedules: zones,
})
queue.start()
setTimeout(() => {queue.kill()},10*1000)

25
examples/schedule.js Normal file
View file

@ -0,0 +1,25 @@
import Schedule from '../src/schedule.js'
// import Queue from 'src/queue.js'
const zone1 = new Schedule ({
name: 'zone1',
hour: 2,
minute: 0,
delta: 6,
duration: 1
})
console.log('zone1>hr:min:delta:duration=', zone1.hour,zone1.minute,zone1.delta,zone1.duration)
zone1.on('update',val => {
console.dir(val)
})
zone1.update()
zone1.minute=30
zone1.hour=6
zone1.delta=12
setTimeout(()=>{
zone1.update()
process.exit()
},6000)

4
nodemon.json Normal file
View file

@ -0,0 +1,4 @@
{
"ignoreRoot": [".git"],
"watch": ["node_modules/@uci/","src/","examples/","index.js"]
}

25
package.json Normal file
View file

@ -0,0 +1,25 @@
{
"name": "scheduler",
"version": "0.0.1",
"description": "an irrigation style scheduler interfacing with Home Assistant",
"type": "module",
"scripts": {
"start": "node ./example/scheduler.js",
"dev": "UCI_ENV=dev nodemon -r esm ./",
"example:sch": "nodemon examples/schedule.js",
"example:que": "nodemon examples/queue.js",
"dev:debug": "UCI_ENV=dev UCI_LOG_LEVEL=debug nodemon -r esm ./"
},
"author": "",
"license": "ISC",
"dependencies": {
"await-to-js": "^2.1.1",
"clone": "^2.1.2",
"esm": "^3.2.25",
"moment": "^2.26.0",
"moment-duration-format": "^2.3.2"
},
"devDependencies": {
"nodemon": "^1.19.1"
}
}

199
src/queue.js Normal file
View file

@ -0,0 +1,199 @@
import clone from 'clone'
import { EventEmitter } from 'events'
// Queue Multiple Schedulers
class Queue extends EventEmitter {
constructor(opts) {
super(opts)
this.name = opts.name || 'queue'
this.queue = []
if (opts.schedules) opts.schedules.forEach(sch => this.add(sch))
this.one = opts.one
this._active = {} // list of active schedules organized by timeout
this._toID = 0 // id of timeout
this._delayed = []
this._activeCount = 0
this._active = {} // object or lists of active schedules by timeout id
this.init()
}
get countdown() {
return (this.queue[0]||{}).countdown
}
get names(){
return this.queue.map(sch => {
return {name: sch.name, countdown: sch.countdown, next: sch.nextDT, simultaneous:sch.simultaneous}
})
}
get nextName(){
return (this.queue[0]||{}).name
}
init() {
this.on('active', active=>{
if (!active && this._delayed.length) {
const sch=this._delayed.shift()
// console.log('a delayed action now proceeding====>', sch.name, sch.toid)
start.call(this,sch)
}
})
}
setTimeout () {
// clearTimeout(this._timeout)
const next = this.queue[0] || {}
this._toID++
console.log('+++++++++++++++++++++ next timeout',this._toID, 'in', Math.round(next.countdownMS/1000),'++++++++++++++++++++++++++')
this._timeout = setTimeout(() => {
console.log('**********************timeout triggered', this._toID, '********************************')
let dur = 0
// don't mutate original queue, make copy for active list
const queue = clone(this.queue)
this._active[this._toID]=[]
queue.forEach(sch =>{ // add first and any others set for same time
dur += sch.durationMS
// console.log('check if active', sch.name, sch.countdownMS, dur)
if (sch.countdownMS <= dur) this._active[this._toID].push(sch)
if (sch.one) this.removeSchedule(sch.id)
})
// this._active[this._toID].next = 0
this.trigger(this._active[this._toID].length,this._toID)
this.emit('active',this._activeCount)
if (!this.one) this.start()
},next.countdownMS)
}
stop () {
console.log('---------------------stopping queue, active schedules will complete--------------------------')
console.log(this._timeout)
clearTimeout(this._timeout)
}
kill () {
console.log('---------------------stopping queue, active schedules will be terminated as well--------------------')
clearTimeout(this._timeout)
// console.log(this._active)
for (let list in this._active) {
this._active[list].forEach(sch=>{
clearTimeout(sch._stopTimeout)
sch.stopAction()}
)
}
}
start() {
this.update()
this.setTimeout()
}
trigger (count,toid) { // trigger all schedules actions that have same timestamp as first in queue
if (count < 1 || count > this._active[toid].length) {
// console.log('done: returning')
return
}
// console.log(count,toid,this._active[toid])
const idx = this._active[toid].length-count
const sch = this._active[toid][idx]
sch.toid = toid
// let next = this._active[toid].next
// console.log(idx,sch)
count--
if (idx===0 && !this._activeCount) start.call(this,sch)
if (sch.simultaneous) start.call(this,sch)
else {
// console.log('adding schedule to delay',sch.name,sch.toid)
this._delayed.push(sch)
// console.log(this._delayed.length)
}
this.trigger(count,toid)
}
get nextTS() {
return (this.queue[0]||{}).nextTS
}
get nextDT(){
return (this.queue[0]||{}).nextDT
}
add (sch) {
if (getBaseClass(sch) === 'UCISchedule') {
this.queue.push(sch)
this.sort()
}
else {
console.log('ERROR: passed schedule was not a instance of UCISchedule')
console.log(sch.name)
}
}
removeSchedule (id) {
this.queue = this.queue.filter(a => a.id !== id)
}
getSchedule (id) {
return this.queue.find(a => a.id === id)
}
updateSchedule (id) {
this.getSchedule(id).update()
}
update () {
for(let sch of this.queue) {
// console.log('updating', sch.name)
sch.update()
}
this.sort()
}
sort () {
this.queue.sort((a,b) => a.nextTS - b.nextTS)
}
}
export default Queue
export {Queue}
function start (sch, toid) {
toid = sch.toid || toid
console.log('-----timeoutid',toid, 'start action-----')
sch.startAction()
this._activeCount++
this.emit('active',1)
sch._stopTimeout = setTimeout(() =>{ // stop action timeout
console.log('---timeoutid',toid, 'stop action----')
sch.stopAction()
this._activeCount--
// done remove from active list
this._active[toid]=this._active[toid].filter(s=>s.id!==sch.id)
if (!this._active[toid].length) {
// none left on that list, remove it
console.log('===========actions for timeout', toid, 'are complete===========')
delete this._active[toid]
}
// console.log('schedule action complete', sch.name,toid, this._activeCount)
this.emit('active',this._activeCount)
},sch.durationMS)
}
function getBaseClass(targetClass){
if(targetClass instanceof Object){
let baseClass = targetClass
while (baseClass){
const newBaseClass = Object.getPrototypeOf(baseClass)
if(newBaseClass && newBaseClass !== Object && newBaseClass.name){
baseClass = newBaseClass
}else{
break
}
}
return baseClass.constructor.name
}
}

112
src/schedule.js Normal file
View file

@ -0,0 +1,112 @@
// import to from 'await-to-js'
import { EventEmitter } from 'events'
import moment from 'moment'
import 'moment-duration-format'
// Single Device Scheduler User instance for each unique action
// see Schedulers to create a group with "collision protection"
class UCISchedule extends EventEmitter {
constructor(opts) {
super(opts)
this.name = opts.name || 'test_schedule'
this.id = opts.id || this.name
this.hour = opts.hour || 0
this.dev = opts.dev || process.env.UCI_ENV==='dev'
this.minute = opts.minute || 0
this.delta = opts.delta || 6 // time to next trigger in hours
this.duration = opts.duration || 10 // in minutes
// computed values
this.nextTS = 0 // the next trigger time in seconds
this.enabled = opts.enabled || true
// this.active = false // if schedule is currently active
// this.on('active',active=>this.active=active)
this.simultaneous = opts.simulanteous // if true delay this scheduled event until current scheduled event completes
// this.lastActiveTS
this._startAction = opts.startAction || defaultStartAction.bind(this) // single (or array) of function(s)
this._stopAction = opts.stopAction || defaultStopAction.bind(this) // single (or array) of function(s)
}
// async readyWait () {
// return new Promise( resolve => {
// if (!this.active) {
// resolve()
// return}
// this.once('active',active=>{if (!active) resolve()})
// })
// }
get countdown() {
return moment.duration(parseInt((this.nextTS - Math.floor(Date.now()/1000))),'seconds').format('hh:mm:ss')
}
// get countdownActive() {
// if (this.active) return moment.duration(parseInt((this.duration*60 + this.nextTS - Math.floor(Date.now()/1000))),'seconds').format('hh:mm:ss')
//
// }
get countdownMS() {
return (this.nextTS*1000 - Date.now())
}
get durationMS() {
return (this.duration*(this.dev ? 1 :60)*1000)
}
get nextDT() { return moment(this.nextTS*1000).format('ddd, MMM Do h:mm A') }
// get activeDuration() {
// return moment.duration((Math.floor(Date.now()/1000) - this.nextTS),'seconds').format('hh:mm:ss')
// }
// get activeRemaining() {
// return moment.duration((this.nextTS - Math.floor(Date.now()/1000) + this.duration * 60),'seconds').format('hh:mm:ss')
// }
update() { // all TS in seconds
let baseTS = this.hour*3600+this.minute*60
let dt = new Date()
let intoDayTS = (dt.getSeconds() + (60 * dt.getMinutes()) + (60 * 60 * dt.getHours()))
let nowTS = Math.floor(Date.now()/1000)
this.nextTS = nowTS - intoDayTS + baseTS
if (!this.dev) {
while (nowTS > this.nextTS) {
// console.log(`now ${nowTS} is beyond next ${this.nextTS} adding delta ${this.delta} hours`)
this.nextTS += this.delta * 3600
}
} else this.nextTS = nowTS + this.delta
// console.log(baseTS,intoDayTS,nowTS, nowTS-intoDayTS,this.nextTS)
this.emit('update',{id:this.id, ts:this.nextTS,dt:this.nextDT,countdown:this.countdown,simultaneous:this.simultaneous})
}
registerStartAction(func) {
if (!Array.isArray(func)) func = [func]
this.startAction = func
}
registerStopAction(func) {
if (!Array.isArray(func)) func = [func]
this.stopAction = func
}
async startAction (data) {
this._startAction(data)
}
async stopAction (data) {
this._stopAction(data)
}
}
function defaultStartAction () {
console.log('start action for', this.name,moment(Date.now()).format('ddd, MMM Do h:mm:ss A'))
}
function defaultStopAction () {
console.log('stop action for', this.name,moment(Date.now()).format('ddd, MMM Do h:mm:ss A'))
}
export default UCISchedule
export {UCISchedule as Schedule}