// local imports import Rsync from './rsync.js' // native imports import { EventEmitter } from 'events' import { dirname, normalize } from 'path' // third party elements import merge from '@uci-utils/class-merge' import loadYaml from 'load-yaml-file' import Conf from 'conf' import debounce from 'debounce-fn' import pathExists from 'path-exists' import { to } from 'await-to-js' import yaml from 'js-yaml' // uci imports import logger from '@uci-utils/logger' import Watcher from '@uci-utils/watcher' // import Watcher from '../../watcher/src/watcher' let log = {} // declare module wide log to be set during construction // class Sync extends merge(Rsync, Emitter) { class Sync extends EventEmitter { #config // #debounce #watcher #cwd #settings #sources #excludeFiles #exclude constructor(opts = {}) { super() log = logger({ package:'@uci/sync'}) this.#settings = Object.assign({},opts) // need deep clone? // this.#debounce = opts.debounce || null this.#settings.syncHandler = opts.syncHandler ? opts.syncHandler.bind(this) : defaultSyncHandler.bind(this) // TODO if opts include source and destination then call loadJob with them console.log('settings',this.#settings) this.#config = new Conf( { configName:this.#settings.configName, cwd:this.#settings.configDir, fileExtension: 'yaml', serialize: yaml.safeDump, deserialize: yaml.safeLoad } ) this.#config.set('test',"a test") console.dir(this.#config.store) this.#settings.jobsDir = process.env.SYNC_JOBS_DIR || this.#settings.jobsDir || this.#config.get('jobsDir') || dirname(this.#config.path) this.#settings.sshDir = opts.sshDir || this.#config.get('sshDir') || `${this.jobsDir}/ssh` this.#settings.optionsDir = this.#settings.optionsDir || this.#config.get('optionsDir') || `${this.jobsDir}/options` log.debug({jobsDir:this.jobsDir, sshDir:this.sshDir, optionsDir:this.optionsDir, msg:'configuration file directories'}) this.#watcher = new Watcher() } // getters and setters get watching() {return this.#watcher.watching } // watcher active? get watcher() { return this.#watcher} // in case one need set a listerner for other purposes get defaultJobsDir() { return this.#config.get('jobsDir', this.jobsDir) } set defaultJobsDir(jobsDir) { if (jobsDir==null) this.#config.delete('jobsDir') else this.#config.set('jobsDir', jobsDir) } setConfig(name,val) { this.#config.set(name,val)} getConfig(name) { return this.#config.get(name)} async setJobsDir(dir='',save) { let res = await pathExists(dir) if(res){ this.#settings.jobsDir=dir if(save) this.setDefaultJobsDir(dir) return res } else { log.warn(`${dir} path does not exist - Jobs Directory remains ${this.jobsDir}`) return res } } // internal private methods async #readOptionsFile(filePath,type='job') { let dir = {job:this.jobsDir,options:this.optionsDir,ssh:this.sshDir} let [err,res] = await to(readFile(`${dir[type]}/${filePath}`)) if (err) { [err,res] = await to(readFile(filePath)) if (err) { err = {filePath:normalize(`${this.#cwd}/${dir[type]}/${filePath}`), error:err, type:type, msg:`unable to read ${filePath} options file`} log.warn(err) return err } } return res } async #loadJob (options) { if (typeof options ==='string') options = await this.readOptionsFile(options,'job') if (!options.error) { for (const option in options) { if(option === 'optionsFile') { let opts = await this.readOptionsFile(options.optionsFile,'options') if (!opts.error) { for (const opt in Object.keys(opts)) { await this.processOption(opt,opts[opt]) } } } else { await this.processOption(option,options[option]) } } // end loop this.dry() // dry run by default must .live() .unset('n') let success = {options:options, cwd:this.#cwd, command:this.command(), msg: 'job options processed sucessfully'} log.info(success) return success } throw options // options is error } async runJob(options) { let res = await this.loadJob(options) this.live() await this.execute(this.opts) return res } // async listJobFiles(dir) { // return await getFiles(['**','*.yaml','*.yml'],{ onlyfiles:true, cwd:dir || this.jobsDir}) // } // executes a method on the instance (might be in prototype chain) which may take a value(s) async processOption (method, value) { log.debug({method:method, value:value, msg:`processing option ${method} with value ${value}`}) if (typeof(this[method]) === 'function') { await this[method](value) } } // methods ==================== live() { this.unset('n') return this } runOpts(options) { this.opts = Object.assign(this.opts,options) return this } set (option=[],value) { if (typeof option === 'string') super.set(option,value) // pass through else { option.forEach( opt => { typeof opt==='string' ? super.set(opt) : super.set(...Object.entries(opt).flat()) }) } return this } async watch(cmd) { // TODO make into switch ? log.debug(`watch command ${cmd}`) let opts = {source:this.#sources, excludeFrom:this.#excludeFiles, ignored:this.#exclude } if (isPlainObject(cmd) || cmd==null || (typeof cmd==='boolean' && cmd) || cmd==='init' || cmd==='add') { // if (cmd.wait) this.debounce(cmd) if ( cmd.init || cmd==='init') await this.#watcher.init(opts) return this } if (cmd==='remove') { this.#watcher.removeListener('changed', this.syncHandler ) this.#watcher.remove() return } if (cmd ==='on'|| cmd==='start') { this.#watcher.on('changed', this.syncHandler) this.#watcher.start(opts) return } if (cmd==='off' || cmd==='stop' ) { this.#watcher.removeListener('changed', this.syncHandler) this.#watcher.stop() return } if (cmd==='pause' ) { this.#watcher.removeListener('changed', this.syncHandler) return } if (cmd==='resume') { this.#watcher.on('changed', this.syncHandler) return } } // set debounce(opts) { // if (opts==null) this.#debounce=null // this.#debounce = opts // return this // } sshu (file) { if (file && (typeof options==='string')) { this.shell(`sshu -c ${file}`) } } async ssh (options) { if (!this.destination()) { log.warn({ssh:options, msg:'aborting ssh options - destination must be set before processing ssh option'}) return this } if (typeof options==='string') { // options is a filepath for ssh options let options = await this.readOptionsFile(options,'ssh') if (options.error) return this } if (options.host) { let username = options.username ? options.username+'@' : '' this.destination(`${username}${options.host}:${this.destination()}`) } else { log.warn({ssh:options, msg:'aborting ssh options, missing host value is required '}) return this } let cmd = [] // TODO allow all openssh client options // check if no leading / on path, then check KEYS_DIR if not then add ~/.ssh if (options.privateKeyPath) cmd.push(`-i ${options.privateKeyPath}`) if (options.port) cmd.push(`-p ${options.port}`) if (options.configFile) cmd.push(`-F ${options.configFile}`) if (options.quiet) cmd.push('-q') if (options.additional) cmd.push(options.additional) if (cmd.length !== 0) this.shell(`ssh ${cmd.join(' ')}`) return this } async execute(opts={}) { opts = Object.assign(this.opts,opts) log.info({cmd:this.command(), opts:opts, msg:'running rsync command'}) const superexecute = super.execute.bind(this) return new Promise((resolve, reject) => { let status='' let errors='' this.rsyncpid = superexecute( function(err, code, cmd) { if (err) { log.fatal({error:err, code:code, cmd:cmd, msg:'error during sync'}) reject ({error:err, code:code, cmd:cmd, msg:'error during sync'}) } if (errors) log.warn({errors: errors, cmd:cmd, msg:'sync ran but with with errors'}) log.info({cmd:cmd, status:status, msg:'sync run'}) if (!errors) this.emit('sync',status) resolve({cmd:cmd, errors:errors, status:status, msg:'sync run'}) }.bind(this), function(data) { status += data.toString() if (opts.cli) process.stdout.write(data.toString()) }, function(data) { errors += data.toString() if (opts.cli) process.stderr.write(data.toString()) } ) }) } }// end Class Sync export default Sync // default handler 'changed' event handler with optional debounce wrapper function defaultSyncHandler(change={}) { log.debug({file:change.file, type:change.type, msg:`file ${change.file} was ${change.type}`}) // if (this.#debounce==null) this.execute(this.opts) // else debounce(this.execute.bind(this),this.#debounce)(this.opts) } function isPlainObject (obj) { return Object.prototype.toString.call(obj) === '[object Object]' } // function escapeSpaces (str) { // if (typeof str === 'string') { // return str.replace(/\b\s/g, '\\ ') // } else { // return path // } // }