0.0.6 Fix issue with sync handler used by watcher not calling properly.

new other minor fix, and error handling
master
David Kebler 2019-02-17 14:45:08 -08:00
parent 7cd943422e
commit 98d6365b61
3 changed files with 29 additions and 31 deletions

View File

@ -1,4 +1,4 @@
import Sync from '../src/sync'
import Sync from '@uci-utils/sync'
import onDeath from 'ondeath'
(async () => {

View File

@ -1,10 +1,10 @@
{
"name": "@uci-utils/sync",
"version": "0.0.5",
"version": "0.0.6",
"description": "module to copy, maintain, and launch hardware modules on other machines",
"main": "src/index.js",
"bin": {
"ssh": "./bin/sync.js"
"syncu": "./bin/sync.js"
},
"scripts": {
"sync": "node -r esm ./bin/sync",
@ -33,7 +33,7 @@
"dependencies": {
"@uci-utils/logger": "0.0.13",
"@uci-utils/read-lines": "^0.2.1",
"@uci-utils/watcher": "^0.2.3",
"@uci-utils/watcher": "^0.2.4",
"aggregation": "^1.2.5",
"await-to-js": "^2.1.1",
"conf": "^2.2.0",
@ -41,7 +41,7 @@
"esm": "^3.2.5",
"fs-read-data": "^1.0.4",
"path-exists": "^3.0.0",
"yargs": "^13.1.0"
"yargs": "^13.2.0"
},
"devDependencies": {
"chai": "^4.2.0",

View File

@ -2,7 +2,7 @@
import Rsync from './rsync'
// native imports
import { EventEmitter as Emitter } from 'events'
import { dirname } from 'path'
import { dirname, normalize } from 'path'
// third party elements
import merge from 'aggregation/es6'
import { readFile } from 'fs-read-data'
@ -22,7 +22,7 @@ class Sync extends merge(Rsync, Emitter) {
log = logger({ package:'@uci/sync'})
this.opts = opts
this._debounce = opts.debounce || null
this.syncHandler = opts.syncHandler || (()=>{})
this.syncHandler = opts.syncHandler ? opts.syncHandler.bind(this) : defaultSyncHandler.bind(this)
// TODO if opts include source and destination then call loadJob with them
this.config = new Conf({projectName:'sync'})
this.jobsDir = process.env.SYNC_JOBS_DIR || opts.jobsDir || this.config.get('jobsDir') || dirname(this.config.path)
@ -60,9 +60,10 @@ class Sync extends merge(Rsync, Emitter) {
// job and options processing
async runJob(options) {
await this.loadJob(options)
let res = await this.loadJob(options)
this.live()
await this.execute(this.opts)
return res
}
async loadJob (options) {
@ -72,9 +73,9 @@ class Sync extends merge(Rsync, Emitter) {
if(option === 'optionsFile') {
let opts = await this.readOptionsFile(options.optionsFile,'options')
if (!opts.error) {
Object.keys(opts).forEach( async opt => {
for (const opt in Object.keys(opts)) {
await this.processOption(opt,opts[opt])
})
}
}
}
else {
@ -82,10 +83,10 @@ class Sync extends merge(Rsync, Emitter) {
}
} // end loop
this.dry() // dry run by default must .live() .unset('n')
let success = {options:options, cwd:__dirname,msg: 'job options processed sucessfully'}
let success = {options:options, cwd:this._cwd, command:this.command(), msg: 'job options processed sucessfully'}
log.info(success)
return success
} return new Error(options) // options is error
} throw options // options is error
}
@ -99,7 +100,7 @@ class Sync extends merge(Rsync, Emitter) {
if (err) {
[err,res] = await to(readFile(filePath))
if (err) {
err = {filePath:filePath, cwd:__dirname, error:err, type:type, dir:dir[type], msg:`unable to read ${filePath} options file`}
err = {filePath:normalize(`${this._cwd}/${dir[type]}/${filePath}`), error:err, type:type, msg:`unable to read ${filePath} options file`}
log.warn(err)
return err
}
@ -119,10 +120,12 @@ class Sync extends merge(Rsync, Emitter) {
live() {
this.unset('n')
return this
}
runOpts(options) {
this.opts = Object.assign(this.opts,options)
return this
}
set (option=[],value) {
@ -138,14 +141,12 @@ class Sync extends merge(Rsync, Emitter) {
async watch(cmd) {
// TODO make into switch ?
log.debug(`watch command ${cmd}`)
let opts = {source:this._sources, excludeFrom:this._excludeFiles, ignored:this._exclude }
if (isPlainObject(cmd) || cmd==null || (typeof cmd==='boolean' && cmd) || cmd==='init' || cmd==='add') {
if (cmd.wait) this.debounce(cmd)
let opts = {source:this._sources, excludeFrom:this._excludeFiles, ignored:this._exclude }
await this._watcher.init(opts)
this.syncHandler = syncHandler.call(this,log)
return
if ( cmd.init || cmd==='init') await this._watcher.init(opts)
return this
}
if (cmd==='remove') {
@ -156,7 +157,7 @@ class Sync extends merge(Rsync, Emitter) {
if (cmd ==='on'|| cmd==='start') {
this._watcher.on('changed', this.syncHandler)
this._watcher.start()
this._watcher.start(opts)
return
}
@ -175,22 +176,12 @@ class Sync extends merge(Rsync, Emitter) {
this._watcher.on('changed', this.syncHandler)
return
}
// 'changed' event handler with optional debounce wrapper
function syncHandler(log) {
function sync (change) {
log.info({file:change.file, type:change.type, msg:`file ${change.file} was ${change.type}`})
this.execute(this.opts)
}
log.debug(`in sync make Handler, ${this._debounce}, ${sync}`)
if (this._debounce==null) return sync.bind(this)
return debounce(sync.bind(this),{wait:this.opts.debounce})
}
}
debounce(opts) {
if (opts==null) this._debounce=null
this._debounce = opts
return this
}
sshu (file) {
@ -231,6 +222,7 @@ class Sync extends merge(Rsync, Emitter) {
}
async execute(opts={}) {
opts = Object.assign(this.opts,opts)
log.info({cmd:this.command(), opts:opts, msg:'running rsync command'})
const superexecute = super.execute.bind(this)
return new Promise((resolve, reject) => {
@ -261,11 +253,17 @@ class Sync extends merge(Rsync, Emitter) {
export default Sync
// default handler 'changed' event handler with optional debounce wrapper
function defaultSyncHandler(change={}) {
log.debug({file:change.file, type:change.type, msg:`file ${change.file} was ${change.type}`})
if (this._debounce==null) this.execute(this.opts)
else debounce(this.execute.bind(this),this._debounce)(this.opts)
}
function isPlainObject (obj) {
return Object.prototype.toString.call(obj) === '[object Object]'
}
//
// function escapeSpaces (str) {
// if (typeof str === 'string') {
// return str.replace(/\b\s/g, '\\ ')