uci-utils-sync/src/sync.js

266 lines
8.1 KiB
JavaScript

// local imports
import Rsync from './rsync'
// native imports
import { EventEmitter as Emitter } from 'events'
import { dirname } from 'path'
// third party elements
import merge from 'aggregation/es6'
import { readFile } from 'fs-read-data'
import Conf from 'conf'
import debounce from 'debounce-fn'
import pathExists from 'path-exists'
import to from 'await-to-js'
// uci imports
import logger from '@uci-utils/logger'
import Watcher from '@uci-utils/watcher'
let log = {} // declare module wide log to be set during construction
class Sync extends merge(Rsync, Emitter) {
constructor(opts = {}) {
super()
log = logger({ package:'@uci/sync'})
this.opts = opts
this._debounce = opts.debounce || null
this.syncHandler = opts.syncHandler || (()=>{})
// TODO if opts include source and destination then call loadJob with them
this.config = new Conf({projectName:'sync'})
this.jobsDir = process.env.SYNC_JOBS_DIR || opts.jobsDir || this.config.get('jobsDir') || dirname(this.config.path)
this.sshDir = opts.sshDir || this.config.get('sshDir') || `${this.jobsDir}/ssh`
this.optionsDir = opts.optionsDir || this.config.get('optionsDir') || `${this.jobsDir}/options`
log.debug({jobsDir:this.jobsDir, sshDir:this.sshDir, optionsDir:this.optionsDir, msg:'configuration file directories'})
this._watcher = new Watcher()
}
// getters and setters
get watching() {return this._watcher.watching } // watcher active?
get watcher() { return this._watcher} // in case one need set a listerner for other purposes
get defaultJobsDir() { return this.config.get('jobsDir', this.jobsDir) }
set defaultJobsDir(jobsDir) {
if (jobsDir==null) this.config.delete('jobsDir')
else this.config.set('jobsDir', jobsDir)
}
setConfig(name,val) { this.config.set(name,val)}
getConfig(name) { return this.config.get(name)}
async setJobsDir(dir='',save) {
let res = await pathExists(dir)
if(res){
this.jobsDir=dir
if(save) this.setDefaultJobsDir()
return res
}
else {
log.warn(`${dir} path does not exist - Jobs Directory remains ${this.jobsDir}`)
return res
}
}
// job and options processing
async runJob(options) {
await this.loadJob(options)
this.live()
this.execute(options)
}
async loadJob (options) {
if (typeof options ==='string') options = await this.readOptionsFile(options,'job')
if (!options.error) {
for (const option in options) {
if(option === 'optionsFile') {
let opts = await this.readOptionsFile(options.optionsFile,'options')
if (!opts.error) {
Object.keys(opts).forEach( async opt => {
await this.processOption(opt,opts[opt])
})
}
}
else {
await this.processOption(option,options[option])
}
} // end loop
this.dry() // dry run by default must .live() .unset('n')
}
}
// async listJobFiles(dir) {
// return await getFiles(['**','*.yaml','*.yml'],{ onlyfiles:true, cwd:dir || this.jobsDir})
// }
async readOptionsFile(filePath,type='job') {
let dir = {job:this.jobsDir,options:this.optionsDir,ssh:this.sshDir}
let [err,res] = await to(readFile(`${dir[type]}/${filePath}`))
if (err) {
[err,res] = await to(readFile(filePath))
if (err) {
err = {filePath:filePath, error:err, type:type, dir:dir[type], msg:`unable to read ${filePath} options file`}
log.warn(err)
return err
}
}
return res
}
// executes a method on the instance (might be in prototype chain) which may take a value(s)
async processOption (method, value) {
log.debug({method:method, value:value, msg:`processing option ${method} with value ${value}`})
if (typeof(this[method]) === 'function') {
await this[method](value)
}
}
// methods ====================
live() {
this.unset('n')
}
set (option=[],value) {
if (typeof option === 'string') super.set(option,value) // pass through
else {
option.forEach( opt => {
typeof opt==='string' ? super.set(opt) : super.set(...Object.entries(opt).flat())
})
}
return this
}
async watch(cmd) {
// TODO make into switch ?
log.debug(`watch command ${cmd}`)
if (isPlainObject(cmd) || cmd==null || (typeof cmd==='boolean' && cmd) || cmd==='init' || cmd==='add') {
if (cmd.wait) this.debounce(cmd)
let opts = {source:this._sources, excludeFrom:this._excludeFiles, ignored:this._exclude }
await this._watcher.init(opts)
this.syncHandler = syncHandler.call(this,log)
return
}
if (cmd==='remove') {
this._watcher.removeListener('changed', this.syncHandler )
this._watcher.remove()
return
}
if (cmd ==='on'|| cmd==='start') {
this._watcher.on('changed', this.syncHandler)
this._watcher.start()
return
}
if (cmd==='off' || cmd==='stop' ) {
this._watcher.removeListener('changed', this.syncHandler)
this._watcher.stop()
return
}
if (cmd==='pause' ) {
this._watcher.removeListener('changed', this.syncHandler)
return
}
if (cmd==='resume') {
this._watcher.on('changed', this.syncHandler)
return
}
// 'changed' event handler with optional debounce wrapper
function syncHandler(log) {
function sync (change) {
log.info({file:change.file, type:change.type, msg:`file ${change.file} was ${change.type}`})
this.execute()
}
log.debug(`in sync make Handler, ${this._debounce}, ${sync}`)
if (this._debounce==null) return sync.bind(this)
return debounce(sync.bind(this),{wait:this.opts.debounce})
}
}
debounce(opts) {
if (opts==null) this._debounce=null
this._debounce = opts
}
sshu (file) {
if (file && (typeof options==='string')) {
this.shell(`sshu -c ${file}`)
}
}
async ssh (options) {
if (!this.destination()) {
log.warn({ssh:options, msg:'aborting ssh options - destination must be set before processing ssh option'})
return this
}
if (typeof options==='string') { // options is a filepath for ssh options
let options = await this.readOptionsFile(options,'ssh')
if (options.error) return this
}
if (options.host)
{
let username = options.username ? options.username+'@' : ''
this.destination(`${username}${options.host}:${this.destination()}`)
}
else {
log.warn({ssh:options, msg:'aborting ssh options, missing host value is required '})
return this
}
let cmd = []
// TODO allow all openssh client options
if (options.privateKeyPath) cmd.push(`-i ${options.privateKeyPath}`)
if (options.port) cmd.push(`-p ${options.port}`)
if (options.configFile) cmd.push(`-F ${options.privateKeyPath}`)
if (cmd) this.shell(cmd.join(' '))
return this
}
async execute(opts={}) {
// log.info({cmd:this.command(), msg:'running rsync command'})
const superexecute = super.execute.bind(this)
return new Promise((resolve, reject) => {
let status
let errors
this.rsyncpid = superexecute(
function(err, code, cmd) {
if (err) {
log.fatal({error:err, code:code, cmd:cmd, msg:'error during sync'})
reject ({error:err, code:code, cmd:cmd, msg:'error during sync'})
}
if (errors) log.warn({errors: errors, cmd:cmd, msg:'sync ran but with with errors'})
log.debug({cmd:cmd, status:status, msg:'sync run'})
resolve({cmd:cmd, errors:errors, status:status, msg:'sync run'})
}, function(data) {
status += data.toString()
if (opts.cli) process.stdout.write(data.toString())
},
function(data) {
errors += data.toString()
if (opts.cli) process.stderr.write(data.toString())
}
)
})
}
}// end Class Sync
export default Sync
function isPlainObject (obj) {
return Object.prototype.toString.call(obj) === '[object Object]'
}
//
// function escapeSpaces (str) {
// if (typeof str === 'string') {
// return str.replace(/\b\s/g, '\\ ')
// } else {
// return path
// }
// }