0.3.1 refactor ack processing, fix ackInterval option issue

much improved example that reads in a yaml file of options
nodemon.json set up for yaml as well
master
David Kebler 2020-03-24 14:28:43 -07:00
parent 933c3d843c
commit 0c7bfd11f5
4 changed files with 94 additions and 20 deletions

View File

@ -1,20 +1,87 @@
/* import { readFile as read } from 'fs-read-data'
* Basic i2c bus listening at 1776 (by default when tcp:true) and also named pipe 'i2c-bus' by default in default /tmp directory import onDeath from 'ondeath'
*
*/
import Bus from '../src/bus'
// can use SOCKETS_DIR='' env variable to get a 'i2c-bus.sock' in a particular directory
// e.g. SOCKETS_DIR=/opt/sockets node -r @std/esm i2cbus
const CONCURRENCY = 1
import Bus from '../src/bus.js'
let options = {}
; ;
(async () => { (async () => {
let i2cbus = new Bus({id:'i2c-busx',tcp:true, concurrency:CONCURRENCY}) options = await read('./examples/bus.yaml')
options.id = options.id ||'i2c-bus'
// console.log('----------------switches start options--------\n',options,'\n---------------------------------')
// console.log('----------------switches interrupt options--------\n',options.interrupt,'\n---------------------------------')
let i2cbus = new Bus(options)
if ((process.env.UCI_ENV ||'').includes('dev')) {
i2cbus.on('log',ev => {
switch (ev.level) {
// case 'warning':
// case 'error':
// case 'testing':
case 'ready':
case 'state':
case 'i2c':
case 'fatal':
// case 'info':
console.log(ev.level.toUpperCase(),'\n',ev)
break
}
})
process.once('SIGUSR2', async () => {
shutdown.call(i2cbus)
process.kill(process.pid, 'SIGUSR2')
})
} // dev
onDeath( async () => {
console.log('\nThe I2C Bus, \nHe\'s dead Jim')
shutdown.call(i2cbus)
console.log('shutdown done')
process.exit()
})
console.log('before init')
await i2cbus.init() await i2cbus.init()
console.log('after init',i2cbus.ready.observers,i2cbus.ready.state)
i2cbus.ready.all.subscribe(
(ready) => {
if (!ready) {
console.log(options.name, 'YIKES! some observer is not reporting ready')
let failed = i2cbus.ready.state.filter(obs=> obs[1]===false).map(obs=>[obs[0],i2cbus.ready.getObserverDetails(obs)])
console.log('those that have failed\n',failed)
} else {
console.log(options.name, ': This I2C BUS is ONLINE!!!!')
console.log(i2cbus.ready.state)
}
})
})().catch(err => { })().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err) console.log('FATAL: UNABLE TO START IC2BUS - KILLING FOR OS RESTART !\n',err, options)
process.exitCode = 1
process.kill(process.pid, 'SIGINT')
}) })
async function shutdown (){
let names = Object.keys(this.getSocket())
console.log(names)
for (let name of names) {
console.log(name)
console.log(await this.removeSocket(name))
console.log('after remove', name)
}
this.removeAllListeners()
}

View File

@ -1,4 +1,5 @@
{ {
"ignoreRoot": [".git"], "ignoreRoot": [".git"],
"watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","examples/bus.js"] "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","examples/"],
"ext":"js,json,yaml,yml"
} }

View File

@ -1,6 +1,6 @@
{ {
"name": "@uci/i2c-bus", "name": "@uci/i2c-bus",
"version": "0.3.0", "version": "0.3.1",
"description": "I2c Bus Classes for Communication to I2C bus via socket or direct call", "description": "I2c Bus Classes for Communication to I2C bus via socket or direct call",
"main": "src/bus", "main": "src/bus",
"scripts": { "scripts": {
@ -14,9 +14,10 @@
"relays:pipe:dev": "TRANSPORT=pipe npm run relays:dev", "relays:pipe:dev": "TRANSPORT=pipe npm run relays:dev",
"queue": "BUS_DEVICE=queue npm run client", "queue": "BUS_DEVICE=queue npm run client",
"bus": "node --require esm examples/bus", "bus": "node --require esm examples/bus",
"bus:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --require esm examples/bus", "bus:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --require esm --preserve-symlinks examples/bus",
"bus:debug": "UCI_LOG_LEVEL=debug npm run bus:dev", "bus:debug": "UCI_LOG_LEVEL=debug npm run bus:dev",
"busl": "UCI_ENV=pro node --require esm examples/bus" "busl": "UCI_ENV=pro node --require esm examples/bus",
"yalcu": "./node_modules/.bin/nodemon --watch /home/sysadmin/.yalc/**/*.js --exec /opt/node-global-apps/bin/yalc update"
}, },
"author": "David Kebler", "author": "David Kebler",
"license": "MIT", "license": "MIT",
@ -39,13 +40,15 @@
}, },
"dependencies": { "dependencies": {
"@uci-utils/logger": "^0.0.16", "@uci-utils/logger": "^0.0.16",
"@uci/base": "^0.5.0", "@uci/base": "^0.5.1",
"await-to-js": "^2.1.1", "await-to-js": "^2.1.1",
"p-queue": "^6.3.0", "p-queue": "^6.3.0",
"pify": "^5.0.0" "pify": "^5.0.0"
}, },
"devDependencies": { "devDependencies": {
"esm": "^3.2.25", "esm": "^3.2.25",
"nodemon": "^2.0.2" "fs-read-data": "^1.0.4",
"nodemon": "^2.0.2",
"ondeath": "^1.0.0"
} }
} }

View File

@ -37,15 +37,17 @@ class I2CBus extends Base {
else bus.ack = true else bus.ack = true
return bus return bus
}, },
this.ackInterval = opts.ackInterval || 5 this.ackInterval = opts.ackInterval!=null ? opts.ackInterval : 5
this._queue = new PQueue({concurrency: opts.concurrency || 1}) this._queue = new PQueue({concurrency: opts.concurrency || 1})
this.ready.addObserver('bus:ack') this.ready.addObserver('bus:ack')
} } // constructor
async init() { async init() {
await super.init() await super.init()
let count = 0 let count = 0
this.autoAck(this.ackInterval) const ack = (await this.bus.ack.call(this)).ack
this.emit('bus:ack',ack) // will be picked up by ready observer
this.autoAck(this.ackInterval) // if auto on then will notice when there is issue/connection with i2cbus
this._queue.on('active', () => { this._queue.on('active', () => {
log.debug(`Queue: working on item #${++count}. Size: ${this._queue.size} Pending: ${this._queue.pending}`) log.debug(`Queue: working on item #${++count}. Size: ${this._queue.size} Pending: ${this._queue.pending}`)
}) })
@ -56,7 +58,7 @@ class I2CBus extends Base {
autoAck(interval) { autoAck(interval) {
if (interval>.9) this._autoAck = setInterval(async ()=> { if (interval>.9) this._autoAck = setInterval(async ()=> {
let ack = (await this.bus.ack.call(this)).ack let ack = (await this.bus.ack.call(this)).ack
this.emit('bus:ack',ack) this.emit('bus:ack',ack) // will be picked up by ready observer
},interval*1000) },interval*1000)
else clearInterval(this._autoAck) else clearInterval(this._autoAck)
} }
@ -123,6 +125,7 @@ async function busFunction (name, func, args=[],packet={}) {
} }
} }
}) })
// this.emit('log', {level:'i2c', packet:packet, error:packet.error, msg:'packet going to bus'})
if (!packet.error) { if (!packet.error) {
log.trace({msg:'adding to queue', function:name,arguments:argsV}) log.trace({msg:'adding to queue', function:name,arguments:argsV})
let [err,res] = await to(this._queue.add(() => func.call(this.i2cbus,...argsV))) let [err,res] = await to(this._queue.add(() => func.call(this.i2cbus,...argsV)))