305 lines
9.3 KiB
JavaScript
Executable File
305 lines
9.3 KiB
JavaScript
Executable File
// local imports
|
|
import Rsync from './rsync.js'
|
|
// native imports
|
|
import { EventEmitter } from 'events'
|
|
import { dirname, normalize } from 'path'
|
|
// third party elements
|
|
import merge from '@uci-utils/class-merge'
|
|
import loadYaml from 'load-yaml-file'
|
|
import Conf from 'conf'
|
|
import debounce from 'debounce-fn'
|
|
import pathExists from 'path-exists'
|
|
import { to } from 'await-to-js'
|
|
import yaml from 'js-yaml'
|
|
// uci imports
|
|
import logger from '@uci-utils/logger'
|
|
import Watcher from '@uci-utils/watcher'
|
|
// import Watcher from '../../watcher/src/watcher'
|
|
let log = {} // declare module wide log to be set during construction
|
|
|
|
// class Sync extends merge(Rsync, Emitter) {
|
|
class Sync extends EventEmitter {
|
|
|
|
#config
|
|
// #debounce
|
|
#watcher
|
|
#cwd
|
|
#settings
|
|
#sources
|
|
#excludeFiles
|
|
#exclude
|
|
|
|
|
|
constructor(opts = {}) {
|
|
super()
|
|
log = logger({ package:'@uci/sync'})
|
|
this.#settings = Object.assign({},opts) // need deep clone?
|
|
// this.#debounce = opts.debounce || null
|
|
this.#settings.syncHandler = opts.syncHandler ? opts.syncHandler.bind(this) : defaultSyncHandler.bind(this)
|
|
// TODO if opts include source and destination then call loadJob with them
|
|
console.log('settings',this.#settings)
|
|
this.#config = new Conf(
|
|
{
|
|
configName:this.#settings.configName,
|
|
cwd:this.#settings.configDir,
|
|
fileExtension: 'yaml',
|
|
serialize: yaml.safeDump,
|
|
deserialize: yaml.safeLoad
|
|
}
|
|
)
|
|
this.#config.set('test',"a test")
|
|
console.dir(this.#config.store)
|
|
this.#settings.jobsDir = process.env.SYNC_JOBS_DIR || this.#settings.jobsDir || this.#config.get('jobsDir') || dirname(this.#config.path)
|
|
this.#settings.sshDir = opts.sshDir || this.#config.get('sshDir') || `${this.jobsDir}/ssh`
|
|
this.#settings.optionsDir = this.#settings.optionsDir || this.#config.get('optionsDir') || `${this.jobsDir}/options`
|
|
log.debug({jobsDir:this.jobsDir, sshDir:this.sshDir, optionsDir:this.optionsDir, msg:'configuration file directories'})
|
|
this.#watcher = new Watcher()
|
|
}
|
|
// getters and setters
|
|
|
|
get watching() {return this.#watcher.watching } // watcher active?
|
|
get watcher() { return this.#watcher} // in case one need set a listerner for other purposes
|
|
|
|
get defaultJobsDir() { return this.#config.get('jobsDir', this.jobsDir) }
|
|
set defaultJobsDir(jobsDir) {
|
|
if (jobsDir==null) this.#config.delete('jobsDir')
|
|
else this.#config.set('jobsDir', jobsDir)
|
|
}
|
|
|
|
setConfig(name,val) { this.#config.set(name,val)}
|
|
getConfig(name) { return this.#config.get(name)}
|
|
|
|
async setJobsDir(dir='',save) {
|
|
let res = await pathExists(dir)
|
|
if(res){
|
|
this.#settings.jobsDir=dir
|
|
if(save) this.setDefaultJobsDir(dir)
|
|
return res
|
|
}
|
|
else {
|
|
log.warn(`${dir} path does not exist - Jobs Directory remains ${this.jobsDir}`)
|
|
return res
|
|
}
|
|
}
|
|
|
|
// internal private methods
|
|
|
|
async #readOptionsFile(filePath,type='job') {
|
|
let dir = {job:this.jobsDir,options:this.optionsDir,ssh:this.sshDir}
|
|
let [err,res] = await to(readFile(`${dir[type]}/${filePath}`))
|
|
if (err) {
|
|
[err,res] = await to(readFile(filePath))
|
|
if (err) {
|
|
err = {filePath:normalize(`${this.#cwd}/${dir[type]}/${filePath}`), error:err, type:type, msg:`unable to read ${filePath} options file`}
|
|
log.warn(err)
|
|
return err
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
async #loadJob (options) {
|
|
if (typeof options ==='string') options = await this.readOptionsFile(options,'job')
|
|
if (!options.error) {
|
|
for (const option in options) {
|
|
if(option === 'optionsFile') {
|
|
let opts = await this.readOptionsFile(options.optionsFile,'options')
|
|
if (!opts.error) {
|
|
for (const opt in Object.keys(opts)) {
|
|
await this.processOption(opt,opts[opt])
|
|
}
|
|
}
|
|
}
|
|
else {
|
|
await this.processOption(option,options[option])
|
|
}
|
|
} // end loop
|
|
this.dry() // dry run by default must .live() .unset('n')
|
|
let success = {options:options, cwd:this.#cwd, command:this.command(), msg: 'job options processed sucessfully'}
|
|
log.info(success)
|
|
return success
|
|
} throw options // options is error
|
|
}
|
|
|
|
|
|
async runJob(options) {
|
|
let res = await this.loadJob(options)
|
|
this.live()
|
|
await this.execute(this.opts)
|
|
return res
|
|
}
|
|
|
|
|
|
// async listJobFiles(dir) {
|
|
// return await getFiles(['**','*.yaml','*.yml'],{ onlyfiles:true, cwd:dir || this.jobsDir})
|
|
// }
|
|
|
|
|
|
|
|
// executes a method on the instance (might be in prototype chain) which may take a value(s)
|
|
async processOption (method, value) {
|
|
log.debug({method:method, value:value, msg:`processing option ${method} with value ${value}`})
|
|
if (typeof(this[method]) === 'function') {
|
|
await this[method](value)
|
|
}
|
|
}
|
|
|
|
// methods ====================
|
|
|
|
live() {
|
|
this.unset('n')
|
|
return this
|
|
}
|
|
|
|
runOpts(options) {
|
|
this.opts = Object.assign(this.opts,options)
|
|
return this
|
|
}
|
|
|
|
set (option=[],value) {
|
|
if (typeof option === 'string') super.set(option,value) // pass through
|
|
else {
|
|
option.forEach( opt => {
|
|
typeof opt==='string' ? super.set(opt) : super.set(...Object.entries(opt).flat())
|
|
})
|
|
}
|
|
return this
|
|
}
|
|
|
|
async watch(cmd) {
|
|
// TODO make into switch ?
|
|
log.debug(`watch command ${cmd}`)
|
|
let opts = {source:this.#sources, excludeFrom:this.#excludeFiles, ignored:this.#exclude }
|
|
|
|
if (isPlainObject(cmd) || cmd==null || (typeof cmd==='boolean' && cmd) || cmd==='init' || cmd==='add') {
|
|
// if (cmd.wait) this.debounce(cmd)
|
|
if ( cmd.init || cmd==='init') await this.#watcher.init(opts)
|
|
return this
|
|
}
|
|
|
|
if (cmd==='remove') {
|
|
this.#watcher.removeListener('changed', this.syncHandler )
|
|
this.#watcher.remove()
|
|
return
|
|
}
|
|
|
|
if (cmd ==='on'|| cmd==='start') {
|
|
this.#watcher.on('changed', this.syncHandler)
|
|
this.#watcher.start(opts)
|
|
return
|
|
}
|
|
|
|
if (cmd==='off' || cmd==='stop' ) {
|
|
this.#watcher.removeListener('changed', this.syncHandler)
|
|
this.#watcher.stop()
|
|
return
|
|
}
|
|
|
|
if (cmd==='pause' ) {
|
|
this.#watcher.removeListener('changed', this.syncHandler)
|
|
return
|
|
}
|
|
|
|
if (cmd==='resume') {
|
|
this.#watcher.on('changed', this.syncHandler)
|
|
return
|
|
}
|
|
}
|
|
|
|
// set debounce(opts) {
|
|
// if (opts==null) this.#debounce=null
|
|
// this.#debounce = opts
|
|
// return this
|
|
// }
|
|
|
|
sshu (file) {
|
|
if (file && (typeof options==='string')) {
|
|
this.shell(`sshu -c ${file}`)
|
|
}
|
|
}
|
|
|
|
async ssh (options) {
|
|
if (!this.destination()) {
|
|
log.warn({ssh:options, msg:'aborting ssh options - destination must be set before processing ssh option'})
|
|
return this
|
|
}
|
|
if (typeof options==='string') { // options is a filepath for ssh options
|
|
let options = await this.readOptionsFile(options,'ssh')
|
|
if (options.error) return this
|
|
}
|
|
|
|
if (options.host)
|
|
{
|
|
let username = options.username ? options.username+'@' : ''
|
|
this.destination(`${username}${options.host}:${this.destination()}`)
|
|
}
|
|
else {
|
|
log.warn({ssh:options, msg:'aborting ssh options, missing host value is required '})
|
|
return this
|
|
}
|
|
let cmd = []
|
|
// TODO allow all openssh client options
|
|
// check if no leading / on path, then check KEYS_DIR if not then add ~/.ssh
|
|
if (options.privateKeyPath) cmd.push(`-i ${options.privateKeyPath}`)
|
|
if (options.port) cmd.push(`-p ${options.port}`)
|
|
if (options.configFile) cmd.push(`-F ${options.configFile}`)
|
|
if (options.quiet) cmd.push('-q')
|
|
if (options.additional) cmd.push(options.additional)
|
|
if (cmd.length !== 0) this.shell(`ssh ${cmd.join(' ')}`)
|
|
|
|
return this
|
|
}
|
|
|
|
async execute(opts={}) {
|
|
opts = Object.assign(this.opts,opts)
|
|
log.info({cmd:this.command(), opts:opts, msg:'running rsync command'})
|
|
const superexecute = super.execute.bind(this)
|
|
return new Promise((resolve, reject) => {
|
|
let status=''
|
|
let errors=''
|
|
this.rsyncpid = superexecute(
|
|
function(err, code, cmd) {
|
|
if (err) {
|
|
log.fatal({error:err, code:code, cmd:cmd, msg:'error during sync'})
|
|
reject ({error:err, code:code, cmd:cmd, msg:'error during sync'})
|
|
}
|
|
if (errors) log.warn({errors: errors, cmd:cmd, msg:'sync ran but with with errors'})
|
|
log.info({cmd:cmd, status:status, msg:'sync run'})
|
|
if (!errors) this.emit('sync',status)
|
|
resolve({cmd:cmd, errors:errors, status:status, msg:'sync run'})
|
|
}.bind(this),
|
|
function(data) {
|
|
status += data.toString()
|
|
if (opts.cli) process.stdout.write(data.toString())
|
|
},
|
|
function(data) {
|
|
errors += data.toString()
|
|
if (opts.cli) process.stderr.write(data.toString())
|
|
}
|
|
)
|
|
})
|
|
}
|
|
|
|
}// end Class Sync
|
|
|
|
export default Sync
|
|
|
|
// default handler 'changed' event handler with optional debounce wrapper
|
|
function defaultSyncHandler(change={}) {
|
|
log.debug({file:change.file, type:change.type, msg:`file ${change.file} was ${change.type}`})
|
|
// if (this.#debounce==null) this.execute(this.opts)
|
|
// else debounce(this.execute.bind(this),this.#debounce)(this.opts)
|
|
}
|
|
|
|
function isPlainObject (obj) {
|
|
return Object.prototype.toString.call(obj) === '[object Object]'
|
|
}
|
|
|
|
// function escapeSpaces (str) {
|
|
// if (typeof str === 'string') {
|
|
// return str.replace(/\b\s/g, '\\ ')
|
|
// } else {
|
|
// return path
|
|
// }
|
|
// }
|