From 691ddf26666a487eda7fa7e74f37142e80a86362 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sat, 20 Apr 2019 16:54:07 -0700 Subject: [PATCH] add reset and status of interrupt commands for interrupts add methods so add socket to each interrupt and also listener common function for 'interrupt' event on all interrupts --- src/interrupt.js | 159 +++++++++++++++++++++++++++++++--------------- src/interrupts.js | 28 +++++--- 2 files changed, 128 insertions(+), 59 deletions(-) diff --git a/src/interrupt.js b/src/interrupt.js index f0b09ab..414d31e 100644 --- a/src/interrupt.js +++ b/src/interrupt.js @@ -11,59 +11,79 @@ let log = logger({package:'@uci/interrupt', file:'/src/interrupt.js'}) class Interrupt extends Base { constructor(pin, opts = {}) { if (typeof pin !=='number') pin = parseInt(pin) // make sure pin is a number! - opts.conPacket = (opts.resetCmd && !opts.conPacket) ? { cmd: opts.resetCmd, pin: pin } : opts.conPacket // will use either option - log.debug({conPacket: opts.conPacket, msg:'connection packet for consumers'}) - if (opts.path || opts.itrn) { - opts.itrn = opts.itrn || {} - if (opts.path && typeof opts.path !=='boolean') opts.path = opts.path + ':' + pin - if (typeof opts.path ==='boolean') opts.path = '' - opts.itrn.path = opts.itrn.path || opts.path || 'interrupt:' + pin - opts.itrn.conPacket = opts.conPacket - opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrn#s>n' - } - if (opts.topic || opts.itrm) { - opts.itrm = opts.itrm || {} - opts.itrm.topics = opts.itrm.topic || opts.topic +'/'+ pin || 'interrupt/' + pin - opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrm#s>m' - } + // opts.conPacket = (opts.resetCmd && !opts.conPacket) ? { cmd: opts.resetCmd, pin: pin } : opts.conPacket // will use either option + // log.debug({conPacket: opts.conPacket, msg:'connection packet for consumers'}) + + // if (opts.path || opts.itrn) { + // opts.itrn = opts.itrn || {} + // if (opts.path && typeof opts.path !=='boolean') opts.path = opts.path + ':' + pin + // if (typeof opts.path ==='boolean') opts.path = '' + // opts.itrn.path = opts.itrn.path || opts.path || 'interrupt:' + pin + // opts.itrn.conPacket = opts.conPacket + // opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrn#s>n' + // } + // if (opts.topic || opts.itrm) { + // opts.itrm = opts.itrm || {} + // opts.itrm.topics = opts.itrm.topic || opts.topic +'/'+ pin || 'interrupt/' + pin + // opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrm#s>m' + // } + // + // if (opts.itrw || opts.wport || opts.wport===0 ) { + // opts.itrw = opts.itrw || {} + // if (opts.wport) opts.wport = opts.wport + +pin + // opts.itrw.port = opts.itrw.port || opts.wport || 9100 + +pin + // opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrw#s>w' + // } + // // default is a tcp socket server at 9000+pin + // // if (opts.itrt || opts.port || opts.port===0 || !opts.sockets ) { + // if (opts.itrt || opts.port || opts.port===0 ) { + // opts.itrt = opts.itrt || {} + // if (opts.port) opts.port = opts.port + +pin + // opts.itrt.port = opts.itrt.port || opts.port || 9000 + +pin + // opts.itrt.conPacket = opts.conPacket + // opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrt#s>t' + // } - if (opts.itrw || opts.wport || opts.wport===0 ) { - opts.itrw = opts.itrw || {} - if (opts.wport) opts.wport = opts.wport + +pin - opts.itrw.port = opts.itrw.port || opts.wport || 9100 + +pin - opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrw#s>w' - } - // default is a tcp socket server at 9000+pin - if (opts.itrt || opts.port || opts.port===0 || !opts.sockets ) { - opts.itrt = opts.itrt || {} - if (opts.port) opts.port = opts.port + +pin - opts.itrt.port = opts.itrt.port || opts.port || 9000 + +pin - opts.itrt.conPacket = opts.conPacket - opts.sockets = (opts.sockets ? opts.sockets + ',' : '') + 'itrt#s>t' - } super(opts) this.id = (opts.id || 'interrupt') + ':' + pin log.info({ pins: pin, opts: opts }, 'created interrupt with these opts') this.pin_num = pin + this.resetCmd = opts.resetCmd || 'interrupt.reset' + this.resetInterval = opts.resetInterval * 1000 // sets an interval timeout to check on status and send/emit reset command this.mock = opts.mock || process.env.MOCK this.wait = opts.wait || 0 // debounce is off by default // https://github.com/fivdi/onoff#gpiogpio-direction--edge--options this.edge = opts.edge || 'rising' // falling,both,none=no interrupt - // this.pull = opts.pull || 'low' // 'high' + // pull down/up (down is default) can't be set here it is done by in DTOs or in RPI in config.txt + // this is only used to monitor the status of the interrupt + this.pull = opts.pull || 'down' + this.ready = false // true is interrupt is ready this.pin = {} this.hook = opts.hook this.packet = opts.packet || {} this.packet.id = this.id this.packet.pin = this.pin_num - this.packet.cmd = this.packet.cmd || opts.pushCmd || 'interrupt' + this.packet.cmd = this.packet.cmd || opts.cmd || opts.interruptCmd || 'interrupt' this.packet.count = 0 this._hookFunc = defaultHook - this.s = { fire:this.fire.bind(this)} // make fire available via consumer packet send + this.commands = { + fire:this.fire.bind(this), + status:this.status.bind(this), + reset: this.reset.bind(this) + } + this.addNamespace('commands', 's') // give access to these commands above if a socket/server is created } // end constructor async init() { await super.init() + this.count = 0 + + // TODO devel mock versions for testing on other than sbc with gpios + this.pin = new Gpio(this.pin_num, 'in', this.edge, { debounceTimeout:this.wait }) + + console.log('initial connect reset sent',await this.reset()) + DeadJim( (signal,err) => { log.warn({signal:signal, error:err, msg:'Interrupt was killed'}) this.pin.unwatchAll() @@ -71,37 +91,76 @@ class Interrupt extends Base { }) - this.pin = new Gpio(this.pin_num, 'in', this.edge, { debounceTimeout:this.wait }) + log.debug({msg:'new interrupt pin created and watching', num:this.pin_num, status:await this.status() ? 'ready' : 'not ready', pin:this.pin, edge:this.edge,debounce:this.wait}) - log.debug({msg:'new interrupt pin created and watching', pin:this.pin, edge:this.edge,debounce:this.wait}) + this.socket.mcp.on('reconnected', () => console.log('test listen reconnected')) + + console.log('setting connect listener') + this.consumersListen('reconnected', () => console.log('connected to a socket emit a reset')) + // await this.reset() + // }) + + this.socket.mcp.emit('reconnected') + + if (this.resetTimeout) setInterval(this.reset.bind(this),this.resetTimeout) this.pin.watch( function (err,value) { - log.debug('sbc interrupt tripped', err, value) - this._interruptProcess(this.packet,err,value) + log.debug('sbc interrupt tripped, value:', value, 'error:', err) + this.count +=1 + this._interruptProcess(value,err) }.bind(this)) } // end init // manual firing for testing - async fire(packet) { - console.log('manually firing interrupt for pin', this.pin_num) - await this._interruptProcess(this.packet,null,'manual') + async fire(packet={}) { + log.info({msg:`mock manually firing interrupt for pin ${this.pin_num}`}) + await this._interruptProcess(1) packet.status = 'fired' + packet.ipin = this.pin_num + packet.cmd = 'reply' return packet } - // use hook to do more processing - async _interruptProcess(packet,err,value) { - console.log('from watch listener', packet.pin, err, value) - packet.error = err - packet.state = value - packet.count += 1 - packet.time = new Date().getTime() - if (this.hook) packet = await this._hookFunc.call(this,packet) - log.debug({ pin:this.pin, packet: packet }, 'packet pushing to all clients') - await this.push(packet) + // returns true if pin is ready and waiting to trigger interrupt + async status(packet={}) { + let status = await this.pin.read() + this.ready = this.pull==='down' ? !status : !!status + packet.pin = this.pin_num + packet.cmd = 'reply' + packet.ready = this.ready + return packet } + async reset(packet={}) { + if (!(await this.status()).ready) { + packet.cmd = this.resetCmd + packet.pin =this.pin_num + this.emit(this.resetCmd) // emit locally + await this.send(packet) + await this.push(packet) + log.debug({msg: `interrupt was reset. ready now? ${(await this.status()).ready}`}) + return {cmd:'reply', reset:true, ready: (await this.status()).ready} + } + return {cmd:'reply', reset:false, ready:true} + } + + + // use hook to do more processing + async _interruptProcess(value,err) { + let packet = Object.assign({},this.packet) + packet.error = err + packet.state = value + packet.count = this.count + packet.timeStamp = Date.now() + packet.dateTime = new Date().toString() + if (this.hook) packet = await this._hookFunc.call(this,packet) + log.debug({packet: packet, msg:'interrupt tripped, emit/send/push packet to all connected/listening'}) + this.emit('interrupt',packet) // emit locally + await this.send(packet) // will send a packet via client to any socket + this.push(packet) // will push on any socket + // if (this.resetTimeout) await this.reset() // extra insurance that interrupt got reset + } registerHook(func) { this._hookFunc = func @@ -118,7 +177,7 @@ async function defaultHook(packet) { // new Promise((resolve) => { console.log('==========default hook =============') console.log(`pin ${packet.pin} on sbc gpio bus has thrown an interrupt`) - console.log(`pushing to all connected socket client with cmd:${packet.cmd}`) + console.log(`emitting/sending/pushing to all connected socket client with cmd:${packet.cmd}`) console.dir(packet) console.log('replace by a new function with .registerHook(function) to overwrite this') console.log('Must be async/promise returning if anything async happens in your hook') diff --git a/src/interrupts.js b/src/interrupts.js index 7ca636c..e126ba1 100644 --- a/src/interrupts.js +++ b/src/interrupts.js @@ -1,8 +1,10 @@ import Interrupt from './interrupt' - import logger from '@uci-utils/logger' let log = {} + +// will more easily create a group of sbc pin interrupts + class Interrupts { constructor(pins, opts = {}) { this.id = this.id || 'interrupts' @@ -33,6 +35,21 @@ class Interrupts { ) } + async listen(fn) { + this.pins.forEach(pin => { + if (fn==='stop') this.interrupt[pin].removeAllListeners('interrupt') + else this.interrupt[pin].on('interrupt', fn.bind(this)) + }) + } + + async addSocket() { + return Promise.all( + this.pins.map(pin => { + return this.interrupt[pin].addSocket(...arguments) + }) + ) + } + // manual firing for testing async fire(packet) { if (packet.pin) return await this.interrupt[packet.pin].fire(packet) @@ -41,19 +58,12 @@ class Interrupts { } return packet } - registerHook(func) { this.pins.forEach(async pin => { this.interrupt[pin].registerHook(func) }) } - // used for testing - push(packet) { - this.pins.forEach(async pin => { - console.log('=======all push============', pin, packet) - this.interrupt[pin].push(packet) - }) - } + } // end Class export default Interrupts