// local imports
import Rsync from './rsync'
// native imports
import { EventEmitter as Emitter } from 'events'
import { dirname } from 'path'
// third party elements
import merge from 'aggregation/es6'
import { readFile } from 'fs-read-data'
import Conf from 'conf'
import getFiles from 'globby'
import pathExists from 'path-exists'
import to from 'await-to-js'
// uci imports
import logger from '@uci/logger'

let log = {} // declare module wide log to be set during construction

/**
 * Rsync job runner: combines the local Rsync command builder with an
 * EventEmitter and adds persisted configuration, job/option file loading,
 * ssh destination handling and a promise based execute().
 *
 * NOTE(review): methods such as destination(), shell(), unset(), dry(),
 * command() and the callback form of execute() are presumed to come from
 * the local './rsync' module - confirm against that file.
 */
class Sync extends merge(Rsync, Emitter) {
  /**
   * @param {object} [opts]
   * @param {boolean} [opts.cli] stored on the instance as `cli`
   * @param {string} [opts.jobsDir] directory holding job files
   * @param {string} [opts.sshDir] directory holding ssh option files
   * @param {string} [opts.optionsDir] directory holding shared option files
   */
  constructor (opts = {}) {
    super()
    log = logger({ package: '@uci/sync' })
    this.opts = opts
    this.cli = opts.cli
    this.config = new Conf({ projectName: 'sync' })
    // precedence: environment, constructor option, saved config, config file directory
    this.jobsDir =
      process.env.SYNC_JOBS_DIR ||
      opts.jobsDir ||
      this.config.get('jobsDir') ||
      dirname(this.config.path)
    this.sshDir = opts.sshDir || this.config.get('sshDir') || `${this.jobsDir}/ssh`
    this.optionsDir = opts.optionsDir || this.config.get('optionsDir') || `${this.jobsDir}/options`
    // previously logged this.configsDir which is never assigned (always undefined)
    log.debug({ jobsDir: this.jobsDir, msg: 'configuration files directory' })
    this.jobs = []
  }

  /** Persist a configuration value. */
  setConfig (name, val) {
    this.config.set(name, val)
  }

  /** Read a persisted configuration value. */
  getConfig (name) {
    return this.config.get(name)
  }

  /**
   * Change the jobs directory if the given path exists.
   * @param {string} [dir] candidate directory
   * @param {boolean} [save] when truthy also persist it as the default
   * @returns {Promise<boolean>} result of the path existence check
   */
  async setJobsDir (dir = '', save) {
    const exists = await pathExists(dir)
    if (!exists) {
      log.warn(`${dir} path does not exist - Jobs Directory remains ${this.jobsDir}`)
      return exists
    }
    this.jobsDir = dir
    if (save) this.setDefaultJobsDir()
    return exists
  }

  /** Persist the current jobs directory as the default. */
  setDefaultJobsDir () {
    this.config.set('jobsDir', this.jobsDir)
  }

  /**
   * @returns {string} the persisted jobs directory, falling back to the current one
   */
  getDefaultJobsDir () {
    // the original dropped the value; a getter has to return it
    return this.config.get('jobsDir', this.jobsDir)
  }

  /**
   * List job files below a directory.
   * @param {string} [dir] directory to search, defaults to the jobs directory
   * @returns {Promise<string[]>} matching paths as reported by globby
   */
  async listJobFiles (dir) {
    // NOTE(review): '**' already matches every file so the yaml patterns are
    // redundant - confirm whether only *.yaml/*.yml were intended
    return getFiles(['**', '*.yaml', '*.yml'], {
      onlyFiles: true, // option name is camelCase; 'onlyfiles' was not a recognized option
      cwd: dir || this.jobsDir
    })
  }

  /**
   * Read an options file, first relative to the directory for `type`, then
   * using `filePath` as given.
   * @param {string} filePath file name or path
   * @param {string} [type] one of 'job', 'options', 'ssh'
   * @returns {Promise<object>} parsed file content, or on failure an object
   *   carrying `error`, `filePath`, `type`, `dir` and `msg` (it does not throw)
   */
  async readOptionsFile (filePath, type = 'job') {
    const dir = { job: this.jobsDir, options: this.optionsDir, ssh: this.sshDir }
    let [err, res] = await to(readFile(`${dir[type]}/${filePath}`))
    if (err) {
      [err, res] = await to(readFile(filePath))
      if (err) {
        err = {
          filePath: filePath,
          error: err,
          type: type,
          dir: dir[type],
          msg: `unable to read ${filePath} options file`
        }
        log.warn(err)
        return err
      }
    }
    return res
  }

  /** Switch off the dry run flag that loadJob() turns on. */
  live () {
    this.unset('n')
  }

  /**
   * Use `sshu` with the given config file as the remote shell.
   * @param {string} file path of the sshu configuration file
   */
  sshu (file) {
    // the original tested an undeclared `options` so this branch never ran
    if (file && typeof file === 'string') {
      this.shell(`"sshu -c ${file} "`)
    }
  }

  /**
   * Apply ssh settings: prefix the destination with [user@]host and pass the
   * key/port/config flags to shell().
   * @param {object|string} options ssh settings, or a file name to load them from
   * @returns {Promise<Sync>} this instance, also when the options are rejected
   */
  async ssh (options) {
    if (!this.destination()) {
      log.warn({ ssh: options, msg: 'aborting ssh options - destination must be set before processing ssh option' })
      return this
    }
    let sshOpts = options
    if (typeof sshOpts === 'string') {
      // options is a filepath for ssh options
      // (an inner `let options` used to shadow the parameter and threw a ReferenceError)
      sshOpts = await this.readOptionsFile(sshOpts, 'ssh')
      if (sshOpts.error) return this
    }
    if (!sshOpts.host) {
      log.warn({ ssh: sshOpts, msg: 'aborting ssh options, missing host value is required ' })
      return this
    }
    const username = sshOpts.username ? sshOpts.username + '@' : ''
    this.destination(`${username}${sshOpts.host}:${this.destination()}`)

    const cmd = []
    // TODO allow all openssh client options
    if (sshOpts.privateKeyPath) cmd.push(`-i ${sshOpts.privateKeyPath}`)
    if (sshOpts.port) cmd.push(`-p ${sshOpts.port}`)
    // -F takes the config file; the original passed privateKeyPath here
    if (sshOpts.configFile) cmd.push(`-F ${sshOpts.configFile}`)
    // an array is always truthy, so test its length to avoid setting an empty shell
    if (cmd.length) this.shell(cmd.join(' '))
    return this
  }

  /**
   * Set one or more options.
   * @param {string|object|Array<string|object>} [option] an option name, an
   *   object of name/value pairs, or an array mixing both
   * @param {*} [value] value used when `option` is a single name
   * @returns {Sync} this instance
   */
  set (option = {}, value) {
    if (typeof option === 'string') {
      super.set(option, value) // pass through
      return this
    }
    // accept a lone object as well as an array of entries
    const entries = Array.isArray(option) ? option : [option]
    entries.forEach(opt => {
      if (typeof opt === 'string') {
        super.set(opt)
      } else {
        // set every pair; spreading the flattened entries only applied the first one
        Object.entries(opt).forEach(([name, val]) => super.set(name, val))
      }
    })
    return this
  }

  /**
   * Load a job: every key naming a method on this instance is called with its
   * value. An `optionsFile` key loads a shared options file the same way.
   * The job is left in dry run mode; call live() before executing for real.
   * @param {object|string} options job settings or a job file name
   */
  async loadJob (options) {
    if (typeof options === 'string') options = await this.readOptionsFile(options, 'job')
    if (options.error) return
    for (const option in options) {
      if (option === 'optionsFile') {
        const opts = await this.readOptionsFile(options.optionsFile, 'options')
        if (!opts.error) {
          Object.keys(opts).forEach(opt => {
            this.processOption(opt, opts[opt])
          })
        }
      } else {
        this.processOption(option, options[option])
      }
    } // end loop
    this.dry() // dry run by default must .live() .unset('n')
  }

  // executes a method on the instance (might be in prototype chain) which may take a value(s)
  processOption (method, value) {
    if (typeof this[method] === 'function') {
      this[method](value)
    }
  }

  queueJobs () {}

  runJobs () {}

  /**
   * Run the rsync command.
   * @param {object} [opts]
   * @param {boolean} [opts.cli] when truthy mirror output to stdout/stderr
   * @returns {Promise<object>} resolves with `cmd`, `errors`, `status`, `msg`;
   *   rejects with `error`, `code`, `cmd`, `msg` when the run reports an error
   */
  async execute (opts = {}) {
    log.info({ cmd: this.command(), msg: 'running rsync command' })
    const superexecute = super.execute.bind(this)
    return new Promise((resolve, reject) => {
      // start as empty strings so the collected output is not prefixed with "undefined"
      let status = ''
      let errors = ''
      this.rsyncpid = superexecute(
        (err, code, cmd) => {
          if (err) {
            log.fatal({ error: err, code: code, cmd: cmd, msg: 'error during sync' })
            reject({ error: err, code: code, cmd: cmd, msg: 'error during sync' })
            return // do not fall through and report a successful run
          }
          if (errors) log.warn({ errors: errors, cmd: cmd, msg: 'sync ran but with with errors' })
          log.info({ cmd: cmd, status: status, msg: 'sync run' })
          resolve({ cmd: cmd, errors: errors, status: status, msg: 'sync run' })
        },
        data => {
          status += data.toString()
          if (opts.cli) process.stdout.write(data.toString())
        },
        data => {
          errors += data.toString()
          if (opts.cli) process.stderr.write(data.toString())
        }
      )
    })
  }
} // end Class Sync

export default Sync

// function isPlainObject (obj) {
//   return Object.prototype.toString.call(obj) === '[object Object]'
// }
//
// function escapeSpaces (str) {
//   if (typeof str === 'string') {
//     return str.replace(/\b\s/g, '\\ ')
//   } else {
//     return path
//   }
// }