add reset and status of interrupt commands

for interrupts add methods so add socket to each interrupt and also listener common function for 'interrupt' event on all interrupts
master
David Kebler 2019-04-20 16:54:07 -07:00
parent e210c404e0
commit 691ddf2666
2 changed files with 128 additions and 59 deletions

View File

@ -11,59 +11,79 @@ let log = logger({package:'@uci/interrupt', file:'/src/interrupt.js'})
class Interrupt extends Base { class Interrupt extends Base {
constructor(pin, opts = {}) { constructor(pin, opts = {}) {
if (typeof pin !=='number') pin = parseInt(pin) // make sure pin is a number! if (typeof pin !=='number') pin = parseInt(pin) // make sure pin is a number!
opts.conPacket = (opts.resetCmd && !opts.conPacket) ? { cmd: opts.resetCmd, pin: pin } : opts.conPacket // will use either option // opts.conPacket = (opts.resetCmd && !opts.conPacket) ? { cmd: opts.resetCmd, pin: pin } : opts.conPacket // will use either option
log.debug({conPacket: opts.conPacket, msg:'connection packet for consumers'}) // log.debug({conPacket: opts.conPacket, msg:'connection packet for consumers'})
if (opts.path || opts.itrn) {
opts.itrn = opts.itrn || {} // if (opts.path || opts.itrn) {
if (opts.path && typeof opts.path !=='boolean') opts.path = opts.path + ':' + pin // opts.itrn = opts.itrn || {}
if (typeof opts.path ==='boolean') opts.path = '' // if (opts.path && typeof opts.path !=='boolean') opts.path = opts.path + ':' + pin
opts.itrn.path = opts.itrn.path || opts.path || 'interrupt:' + pin // if (typeof opts.path ==='boolean') opts.path = ''
opts.itrn.conPacket = opts.conPacket // opts.itrn.path = opts.itrn.path || opts.path || 'interrupt:' + pin
opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrn#s>n' // opts.itrn.conPacket = opts.conPacket
} // opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrn#s>n'
if (opts.topic || opts.itrm) { // }
opts.itrm = opts.itrm || {} // if (opts.topic || opts.itrm) {
opts.itrm.topics = opts.itrm.topic || opts.topic +'/'+ pin || 'interrupt/' + pin // opts.itrm = opts.itrm || {}
opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrm#s>m' // opts.itrm.topics = opts.itrm.topic || opts.topic +'/'+ pin || 'interrupt/' + pin
} // opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrm#s>m'
// }
//
// if (opts.itrw || opts.wport || opts.wport===0 ) {
// opts.itrw = opts.itrw || {}
// if (opts.wport) opts.wport = opts.wport + +pin
// opts.itrw.port = opts.itrw.port || opts.wport || 9100 + +pin
// opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrw#s>w'
// }
// // default is a tcp socket server at 9000+pin
// // if (opts.itrt || opts.port || opts.port===0 || !opts.sockets ) {
// if (opts.itrt || opts.port || opts.port===0 ) {
// opts.itrt = opts.itrt || {}
// if (opts.port) opts.port = opts.port + +pin
// opts.itrt.port = opts.itrt.port || opts.port || 9000 + +pin
// opts.itrt.conPacket = opts.conPacket
// opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrt#s>t'
// }
if (opts.itrw || opts.wport || opts.wport===0 ) {
opts.itrw = opts.itrw || {}
if (opts.wport) opts.wport = opts.wport + +pin
opts.itrw.port = opts.itrw.port || opts.wport || 9100 + +pin
opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrw#s>w'
}
// default is a tcp socket server at 9000+pin
if (opts.itrt || opts.port || opts.port===0 || !opts.sockets ) {
opts.itrt = opts.itrt || {}
if (opts.port) opts.port = opts.port + +pin
opts.itrt.port = opts.itrt.port || opts.port || 9000 + +pin
opts.itrt.conPacket = opts.conPacket
opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrt#s>t'
}
super(opts) super(opts)
this.id = (opts.id || 'interrupt') + ':' + pin this.id = (opts.id || 'interrupt') + ':' + pin
log.info({ pins: pin, opts: opts }, 'created interrupt with these opts') log.info({ pins: pin, opts: opts }, 'created interrupt with these opts')
this.pin_num = pin this.pin_num = pin
this.resetCmd = opts.resetCmd || 'interrupt.reset'
this.resetInterval = opts.resetInterval * 1000 // sets an interval timeout to check on status and send/emit reset command
this.mock = opts.mock || process.env.MOCK this.mock = opts.mock || process.env.MOCK
this.wait = opts.wait || 0 // debounce is off by default this.wait = opts.wait || 0 // debounce is off by default
// https://github.com/fivdi/onoff#gpiogpio-direction--edge--options // https://github.com/fivdi/onoff#gpiogpio-direction--edge--options
this.edge = opts.edge || 'rising' // falling,both,none=no interrupt this.edge = opts.edge || 'rising' // falling,both,none=no interrupt
// this.pull = opts.pull || 'low' // 'high' // pull down/up (down is default) can't be set here it is done by in DTOs or in RPI in config.txt
// this is only used to monitor the status of the interrupt
this.pull = opts.pull || 'down'
this.ready = false // true is interrupt is ready
this.pin = {} this.pin = {}
this.hook = opts.hook this.hook = opts.hook
this.packet = opts.packet || {} this.packet = opts.packet || {}
this.packet.id = this.id this.packet.id = this.id
this.packet.pin = this.pin_num this.packet.pin = this.pin_num
this.packet.cmd = this.packet.cmd || opts.pushCmd || 'interrupt' this.packet.cmd = this.packet.cmd || opts.cmd || opts.interruptCmd || 'interrupt'
this.packet.count = 0 this.packet.count = 0
this._hookFunc = defaultHook this._hookFunc = defaultHook
this.s = { fire:this.fire.bind(this)} // make fire available via consumer packet send this.commands = {
fire:this.fire.bind(this),
status:this.status.bind(this),
reset: this.reset.bind(this)
}
this.addNamespace('commands', 's') // give access to these commands above if a socket/server is created
} // end constructor } // end constructor
async init() { async init() {
await super.init() await super.init()
this.count = 0
// TODO devel mock versions for testing on other than sbc with gpios
this.pin = new Gpio(this.pin_num, 'in', this.edge, { debounceTimeout:this.wait })
console.log('initial connect reset sent',await this.reset())
DeadJim( (signal,err) => { DeadJim( (signal,err) => {
log.warn({signal:signal, error:err, msg:'Interrupt was killed'}) log.warn({signal:signal, error:err, msg:'Interrupt was killed'})
this.pin.unwatchAll() this.pin.unwatchAll()
@ -71,37 +91,76 @@ class Interrupt extends Base {
}) })
this.pin = new Gpio(this.pin_num, 'in', this.edge, { debounceTimeout:this.wait }) log.debug({msg:'new interrupt pin created and watching', num:this.pin_num, status:await this.status() ? 'ready' : 'not ready', pin:this.pin, edge:this.edge,debounce:this.wait})
log.debug({msg:'new interrupt pin created and watching', pin:this.pin, edge:this.edge,debounce:this.wait}) this.socket.mcp.on('reconnected', () => console.log('test listen reconnected'))
console.log('setting connect listener')
this.consumersListen('reconnected', () => console.log('connected to a socket emit a reset'))
// await this.reset()
// })
this.socket.mcp.emit('reconnected')
if (this.resetTimeout) setInterval(this.reset.bind(this),this.resetTimeout)
this.pin.watch( function (err,value) { this.pin.watch( function (err,value) {
log.debug('sbc interrupt tripped', err, value) log.debug('sbc interrupt tripped, value:', value, 'error:', err)
this._interruptProcess(this.packet,err,value) this.count +=1
this._interruptProcess(value,err)
}.bind(this)) }.bind(this))
} // end init } // end init
// manual firing for testing // manual firing for testing
async fire(packet) { async fire(packet={}) {
console.log('manually firing interrupt for pin', this.pin_num) log.info({msg:`mock manually firing interrupt for pin ${this.pin_num}`})
await this._interruptProcess(this.packet,null,'manual') await this._interruptProcess(1)
packet.status = 'fired' packet.status = 'fired'
packet.ipin = this.pin_num
packet.cmd = 'reply'
return packet return packet
} }
// use hook to do more processing // returns true if pin is ready and waiting to trigger interrupt
async _interruptProcess(packet,err,value) { async status(packet={}) {
console.log('from watch listener', packet.pin, err, value) let status = await this.pin.read()
packet.error = err this.ready = this.pull==='down' ? !status : !!status
packet.state = value packet.pin = this.pin_num
packet.count += 1 packet.cmd = 'reply'
packet.time = new Date().getTime() packet.ready = this.ready
if (this.hook) packet = await this._hookFunc.call(this,packet) return packet
log.debug({ pin:this.pin, packet: packet }, 'packet pushing to all clients')
await this.push(packet)
} }
async reset(packet={}) {
if (!(await this.status()).ready) {
packet.cmd = this.resetCmd
packet.pin =this.pin_num
this.emit(this.resetCmd) // emit locally
await this.send(packet)
await this.push(packet)
log.debug({msg: `interrupt was reset. ready now? ${(await this.status()).ready}`})
return {cmd:'reply', reset:true, ready: (await this.status()).ready}
}
return {cmd:'reply', reset:false, ready:true}
}
// use hook to do more processing
async _interruptProcess(value,err) {
let packet = Object.assign({},this.packet)
packet.error = err
packet.state = value
packet.count = this.count
packet.timeStamp = Date.now()
packet.dateTime = new Date().toString()
if (this.hook) packet = await this._hookFunc.call(this,packet)
log.debug({packet: packet, msg:'interrupt tripped, emit/send/push packet to all connected/listening'})
this.emit('interrupt',packet) // emit locally
await this.send(packet) // will send a packet via client to any socket
this.push(packet) // will push on any socket
// if (this.resetTimeout) await this.reset() // extra insurance that interrupt got reset
}
registerHook(func) { registerHook(func) {
this._hookFunc = func this._hookFunc = func
@ -118,7 +177,7 @@ async function defaultHook(packet) {
// new Promise((resolve) => { // new Promise((resolve) => {
console.log('==========default hook =============') console.log('==========default hook =============')
console.log(`pin ${packet.pin} on sbc gpio bus has thrown an interrupt`) console.log(`pin ${packet.pin} on sbc gpio bus has thrown an interrupt`)
console.log(`pushing to all connected socket client with cmd:${packet.cmd}`) console.log(`emitting/sending/pushing to all connected socket client with cmd:${packet.cmd}`)
console.dir(packet) console.dir(packet)
console.log('replace by a new function with .registerHook(function) to overwrite this') console.log('replace by a new function with .registerHook(function) to overwrite this')
console.log('Must be async/promise returning if anything async happens in your hook') console.log('Must be async/promise returning if anything async happens in your hook')

View File

@ -1,8 +1,10 @@
import Interrupt from './interrupt' import Interrupt from './interrupt'
import logger from '@uci-utils/logger' import logger from '@uci-utils/logger'
let log = {} let log = {}
// will more easily create a group of sbc pin interrupts
class Interrupts { class Interrupts {
constructor(pins, opts = {}) { constructor(pins, opts = {}) {
this.id = this.id || 'interrupts' this.id = this.id || 'interrupts'
@ -33,6 +35,21 @@ class Interrupts {
) )
} }
async listen(fn) {
this.pins.forEach(pin => {
if (fn==='stop') this.interrupt[pin].removeAllListeners('interrupt')
else this.interrupt[pin].on('interrupt', fn.bind(this))
})
}
async addSocket() {
return Promise.all(
this.pins.map(pin => {
return this.interrupt[pin].addSocket(...arguments)
})
)
}
// manual firing for testing // manual firing for testing
async fire(packet) { async fire(packet) {
if (packet.pin) return await this.interrupt[packet.pin].fire(packet) if (packet.pin) return await this.interrupt[packet.pin].fire(packet)
@ -41,19 +58,12 @@ class Interrupts {
} return packet } return packet
} }
registerHook(func) { registerHook(func) {
this.pins.forEach(async pin => { this.pins.forEach(async pin => {
this.interrupt[pin].registerHook(func) this.interrupt[pin].registerHook(func)
}) })
} }
// used for testing
push(packet) {
this.pins.forEach(async pin => {
console.log('=======all push============', pin, packet)
this.interrupt[pin].push(packet)
})
}
} // end Class } // end Class
export default Interrupts export default Interrupts