0.2.23 Rework interrupts - remove listen methods and automatically send and emit on any one individual interrupt/reset emit

alos bubble up connection events from individual interrupts
Improved/updated examples , remote example can connect to single or mutliple interrupt example
master
David Kebler 2019-12-23 14:15:50 -08:00
parent 01d2ebd3a1
commit 216bd6b98d
9 changed files with 180 additions and 165 deletions

View File

@ -10,7 +10,7 @@ module.exports = {
"mocha": true
},
"parserOptions": {
"ecmaVersion": 2017,
"ecmaVersion": 2017, // 10=2019
"sourceType": "module"
},
"extends": "eslint:recommended",

View File

@ -1,47 +0,0 @@
import Base from '@uci/base'
// const HOST = 'localhost'
const HOST = process.env.HOST || 'localhost'
const PORT = process.env.PORT // default 8080
let processor = new Base({ id:'remote-interrupt-processor', useRootNS:true})
processor.interrupt = async function (packet) {
return new Promise((resolve) => {
console.log('interrupt occured')
console.dir(packet)
resolve({status: 'processed'})
})
}
processor.reply = async function (packet) {
return new Promise((resolve) => {
console.log('reply from interrupt for request', packet._header.request)
console.dir(packet)
resolve({status: 'processed'})
})
}
processor.on('status', ev => {
console.log(`STATUS: **${ev.level}** ${ev.msg}`)}
)
processor.on('consumer-connection', ev => {
console.log(`client ${ev.name} was ${ev.state}`)
})
processor.on('reconnected', client => {
console.log('client reconnected:', client)
})
;
(async () => {
// processor.addSocket('inter','c','t', {host:HOST, port:PORT})
processor.addSocket('inter','c','t',{host:HOST, port:PORT})
await processor.init()
console.log('====sending fire command to interrupt===')
await processor.send({cmd: 'fire'})
// process.kill(process.pid, 'SIGTERM')
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View File

@ -2,44 +2,44 @@ import Interrupts from '../src/interrupts'
const PINS = [4]
let interrupts = new Interrupts(PINS,{id:'multi-interrupt-example', resetInterval:1, resetEnabled:false, 4:{name:'mybutton'} })
const EDGE = process.env.EDGE || 'both'
const REMOTE_HOST = process.env.REMOTE_HOST || 'sbc'
const REMOTE_PORT = process.env.REMOTE_PORT || 9000
const PORT = process.env.PORT || 9004
let hook = function (packet)
{
packet.cmd = 'interrupt.find'
return packet
}
let interrupts = new Interrupts(PINS,{id:'multi-interrupt-example', resetInterval:1, resetEnabled:false, edge:EDGE, 4:{name:'mybutton'} })
// let hook = function (packet)
// {
// packet.cmd = 'interrupt.find'
// return packet
// }
// interrupts.registerHook(hook)
interrupts.on('status', ev => {
console.log(`STATUS:'--${ev.level}--" ${ev.msg}`)}
interrupts.on('log', ev => {
console.log(`LOG:'--${ev.level}--" ${ev.msg}`)}
)
interrupts.on('consumer-connection', ev => {
console.log(`client ${ev.name} was ${ev.state}`)
interrupts.on('connection:socket', ev => {
// console.dir(ev)
console.log(`connected to remote socket ${ev.socketName}: ${ev.state}`)
})
interrupts.listen(function (packet) {
console.log(`============== ${this.id}=========`)
console.log(`emitted packet from interrupt ${packet.id}, pin:${packet.pin}`)
console.dir(packet)
this.push(packet)
console.log('------------------------')
})
interrupts.listenReset(function (packet) {
console.log(`============== ${this.id}=========`)
console.log('an interrupt reset request emitted')
console.dir(packet)
console.log('------------------------')
})
//
// interrupts.listenReset(function (packet) {
// console.log(`============== ${this.id}=========`)
// console.log('an interrupt reset request emitted')
// console.dir(packet)
// console.log('------------------------')
// })
;
(async () => {
interrupts.addSocket('server','s','t')
interrupts.registerSocket('inter','c','t',{host:REMOTE_HOST, port:REMOTE_PORT})
interrupts.registerSocket('inters','s','t',{port:PORT})
await interrupts.init()
// interrupts.fire()
await interrupts.socketsInit()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

61
examples/remote.js Normal file
View File

@ -0,0 +1,61 @@
// a remove interface for an interrrupt
// starts a socket on 9000 by default to receive interrupt events
// start a consumer on 9004 to send commends to gpio interrupt
import Base from '@uci/base'
const INTER_HOST = process.env.INTER_HOST || 'sbc'
const INTER_PORT = process.env.INTER_PORT || 9004
const PORT = process.env.PORT || 9000
let processor = new Base({ id:'remote-interrupt-processor', useRootNS:true})
processor.interrupt = async function (packet) {
return new Promise(async (resolve) => {
console.log('=============interrupt occured =======================')
delete packet._header
console.dir({pin:packet.pin,state:packet.state})
let status = await this.send({cmd:'status'})
// console.log('check status', status)
console.log('confirmed state', packet.state, (packet.state=== status[packet.pin] || packet.state===status.state) )
console.log('======================================')
resolve({processed:true})
// console.log('confirm state', (await this.send({cmd:'status'})).state===packet.state)
})
}
processor.reply = async function (packet) {
return new Promise((resolve) => {
// console.log('reply processor from interrupt socket for request', packet._header.request)
// console.dir(packet)
resolve(packet)
})
}
processor.on('connection:socket', async ev => {
console.log(`============connection event to remote socket========== ${ev.socketName}: ${ev.state}`)
if (ev.state ==='connected') {
console.log('============ test by sending fire,status,reset commands ==========')
console.log(await processor.send({cmd:'reset'}))
console.log(await processor.send({cmd:'status'}))
console.log(await processor.send({cmd:'fire'}))
}
})
processor.on('connection:consumer', ev => {
console.log(`==consumer connection===:',${ev.name}:${ev.state}`)
if (ev.state ==='connected') {
console.log('====================ready to receive and process a remote interrupt==================')
}
})
;
(async () => {
processor.addSocket('inter','c','t',{host:INTER_HOST, port:INTER_PORT,initTimeout:60})
processor.addSocket('inters','s','t',{port:PORT})
await processor.socketsInit()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View File

@ -4,11 +4,16 @@ const delay = time => new Promise(res=>setTimeout(()=>res(),time))
const IPIN = process.env.IPIN || 4
let interrupt = new Interrupt(IPIN,{id:'test-interrupt', wait:0, edge:'rising', resetInterval:1, reset:true})
const EDGE = process.env.EDGE || 'both'
const REMOTE_HOST = process.env.REMOTE_HOST || 'sbc'
const REMOTE_PORT = process.env.REMOTE_PORT || 9000
const PORT = process.env.PORT || 9000 + IPIN
let interrupt = new Interrupt(IPIN,{id:'test-interrupt', wait:0, edge:EDGE, resetInterval:1, reset:true})
interrupt.on('interrupt', packet => {
console.log('event: interrupt fired for',interrupt.pin_num)
console.log('count:', packet.count, 'state:',packet.state, 'cmd:',packet.cmd, 'data:',packet.data)
console.log('count:', packet.count, ', state:',packet.state, ', cmd:',packet.cmd, ', data:',packet.data)
})
interrupt.on('interrupt.reset', packet => {
@ -19,19 +24,16 @@ interrupt.on('interrupt.reset', packet => {
;
(async () => {
await interrupt.init()
console.log('interrupt ready and waiting')
console.log('manual fire of interrupt with default hook')
interrupt.fire()
console.log('manual fire of interrupt via after changing hook')
interrupt.registerHook((packet) => {
packet.data='some hook added data'
console.log('custom hook data prop added:', packet.data)
packet.data='a custom property <data> added to packet via hook'
// console.log('custom hook data prop added:', packet.data)
return packet
})
interrupt.fire()
await interrupt.init()
interrupt.addSocket('inter','c','t',{host:REMOTE_HOST, port:REMOTE_PORT})
interrupt.addSocket('inters','s','t',{port:PORT})
console.log(await interrupt.socketsInit())
console.log('interrupt ready and waiting')
})().catch(err => {

View File

@ -1,9 +1,8 @@
import Base from '@uci/base'
// const HOST = 'localhost'
const HOST = process.env.HOST || 'sbc'
const PORT = process.env.PORT
let processor = new Base({sockets:'inter#c>t', inter:{host:HOST, port:PORT}, id:'interrupt-processor', useRootNS:true})
const PORT = process.env.PORT || 9000
let processor = new Base({sockets:'inter#s>t', inter:{port:PORT}, id:'interrupt-processor', useRootNS:true})
processor.interrupt = async function (packet) {
return new Promise((resolve) => {
@ -26,11 +25,7 @@ processor.reply = async function (packet) {
(async () => {
await processor.init()
console.log('====sending fire command to interrupt===')
await processor.send({cmd: 'fire'})
console.log('====sending fire command to interrupt===')
await processor.send({cmd: 'fire'})
// process.kill(process.pid, 'SIGTERM')
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View File

@ -1,18 +1,18 @@
{
"name": "@uci/interrupt",
"main": "src",
"version": "0.2.22",
"version": "0.2.23",
"description": "a class for adding interrupt processesing for gpio pins on Raspberry Pi and Similar SBCs",
"scripts": {
"single": "node -r esm examples/single",
"single:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/single",
"single:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/single",
"single:debug": "UCI_LOG_LEVEL=debug npm run single:dev",
"multi": "node -r esm examples/multi",
"multi:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/multi",
"multi:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/multi",
"multi:debug": "UCI_LOG_LEVEL=debug npm run multi:dev",
"client": "node -r esm examples/client",
"client:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/client",
"client:debug": "UCI_LOG_LEVEL=debug npm run client:dev"
"remote": "node -r esm examples/remote",
"remote:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/remote",
"remote:debug": "UCI_LOG_LEVEL=debug npm run remote:dev"
},
"author": "David Kebler",
"license": "MIT",

View File

@ -9,6 +9,7 @@ let log = logger({package:'@uci/interrupt', file:'/src/interrupt.js'})
// for specific port number pass itrt.port,itrn.path,itrm.topic,itrw.port which override it if present
// conPacket is for connecting consumers. This will send this conPacket command on connect, which may needed to initilize something on related hardware
class Interrupt extends Base {
constructor(pin, opts = {}) {
pin = Number(pin) // make sure pin is a number!
super(opts)
@ -25,7 +26,7 @@ class Interrupt extends Base {
// pull down/up (down is default) can't be set here it is done by in DTOs or in RPI in config.txt
// this is only used to monitor the status of the interrupt
this.pull = opts.pull || 'down'
this.ready = this.edge === 'both' ? true : false // true is interrupt is ready
this.__ready = this.edge === 'both' ? true : false // true is interrupt is ready
this.pin = {}
this.count = 0
this.packet = opts.packet || {}
@ -42,21 +43,18 @@ class Interrupt extends Base {
} // end constructor
async init() {
await super.init()
this.count = 0
// TODO devel mock versions for testing on other than sbc with gpios
this.pin = new Gpio(this.pin_num, 'in', this.edge, { debounceTimeout:this.wait })
if (this.resetEnabled) log.debug({msg:'initial connect interrupt reset packet sent', ready:await this.reset(), method:'init', line:53})
if (this.resetInterval && this.resetEnabled) setInterval(this.reset.bind(this),this.resetInterval)
// if (this.resetEnabled) log.debug({msg:'initial connect interrupt reset packet sent', ready:await this.reset(), method:'init', line:53})
// if (this.resetInterval && this.resetEnabled) setInterval(this.reset.bind(this),this.resetInterval)
DeadJim( (signal,err) => {
log.warn({signal:signal, method:'init', line:56, error:err, msg:'Interrupt process was killed, remove watchers, unexport'})
this.pin.unwatchAll()
this.pin.unexport() // kill the kernel entry
})
this.pin.watch( function (err,value) {
@ -65,10 +63,7 @@ class Interrupt extends Base {
this._interruptProcess(value,err)
}.bind(this))
this.on('reconnected', this.reset.bind(this))
this.on('connected', this.reset.bind(this))
log.debug({msg:'new interrupt pin created and watching', method:'init', line: 62, pin_num:this.pin_num, state:await this.status(), ready:this.ready, edge:this.edge,debounce:this.wait})
log.info({msg:'new interrupt pin created and watching', method:'init', line: 62, pin_num:this.pin_num, state:await this.status(), ready:this.__ready, edge:this.edge,debounce:this.wait})
} // end init
@ -86,11 +81,11 @@ class Interrupt extends Base {
// returns true if pin is ready and waiting to trigger interrupt
async status(packet) {
let status = await this.pin.read()
if (this.edge !=='both') this.ready = this.pull==='down' ? !status : !!status // ready is always true for 'both'
if (this.edge !=='both') this.__ready = this.pull==='down' ? !status : !!status // ready is always true for 'both'
if (packet) {
packet.pin = this.pin_num
packet.state = status
if (this.edge !=='both') packet.ready = this.ready
if (this.edge !=='both') packet.ready = this.__ready
packet.cmd = 'reply'
return packet
}
@ -100,20 +95,19 @@ class Interrupt extends Base {
async reset(packet) {
let res = {}
if (this.edge !=='both' && this.resetEnabled) {
if (!this.ready) {
if (!this.__ready) {
let reset = Object.assign({},this.packet)
reset.cmd = this.resetCmd
this.emit(this.resetCmd,reset) // emit locally
await this.send(reset)
await this.push(reset)
await this.status()
log.error({msg: `interrupt was forced reset. ready now? ${this.ready}`})
res = {cmd:'reply', msg:`attempted interrupt reset ${this.ready? 'succeeded' : 'failed'}`, reset:true, ready:this.ready}
log.error({msg: `interrupt was forced reset. ready now? ${this.__ready}`})
res = {cmd:'reply', msg:`attempted interrupt reset ${this.__ready? 'succeeded' : 'failed'}`, reset:true, ready:this.__ready}
}
else res = {cmd:'reply', reset:false, ready:true, msg:'interrupt was ready, no action taken'}
} else res = {cmd:'reply', reset:false, ready:true, msg:'reset NA or disabled'}
if (packet) return Object.assign(packet,res)
return this.ready
return this.__ready
}
// use hook to do more processing
@ -127,12 +121,16 @@ class Interrupt extends Base {
packet.timeStamp = Date.now()
packet.dateTime = new Date().toString()
if (this._hookFunc) packet = await this._hookFunc.call(this,packet)
log.debug({packet: packet, msg:'interrupt tripped, emit/send/push packet to all connected/listening'})
log.debug({packet: packet, msg:'interrupt tripped, emit/send packet to all connected/listening'})
this.emit('interrupt',packet) // emit locally
this.send(packet) // will send a packet via client to any socket
this.push(packet) // will push on any socket
this.send(packet) // will send packet via client to any connected socket
}
// replace default processor function arguments are value of pin and any error
registerProcessor(func) {
this._interruptProcess = func
}
// hook into default interrupt processor for custom processing including packet modification
registerHook(func) {
if (func) this._hookFunc = func
else this._hookFunc=defaultHook
@ -147,7 +145,7 @@ export default Interrupt
async function defaultHook(packet) {
// return a promise or use await if anything async happens in hook
// new Promise((resolve) => {
console.log('==========default hook =============')
console.log('==========example hook fucntion =============')
console.log(`pin ${packet.pin} on sbc gpio bus has thrown an interrupt`)
console.log('can change anything in the packet in this hook')
console.log('to replace this use .registerHook(function)')

View File

@ -7,14 +7,20 @@ let log = {}
// will more easily create a group of sbc pin interrupts
class Interrupts extends Base {
constructor(pins, opts = {}) {
super(opts)
this.id = this.id || 'interrupts'
this.pins = pins.map(pin => Number(pin)) // make sure actual numbers are passed
this._interrupts = new Map()
this._s = { fire:this.fire.bind(this)} // make fire available via consumer packet send
this.resetCmd = opts.resetCmd || 'interrupt.reset'
log = logger({ name: 'interrupts', id: this.id, package:'@uci/interrupt', file:'src/interrupts.js'})
this.commands = {
fire:makefunc.bind(this,'fire'),
status:makefunc.bind(this,'status'),
reset:makefunc.bind(this,'reset'),
}
this.addNamespace('commands', 's') // give access to these commands above if a socket/server is created
let pinopts = {}
pins.forEach(pin => {
// remove per pin opts and store
@ -27,22 +33,38 @@ class Interrupts extends Base {
log.debug({ opts: pinopts[pin], method:'constructor', line:25, msg:`pin options for pin ${pin}`})
this._interrupts.set(pin, new Interrupt(pin, pinopts[pin]))
// bubble up events from single interrupts to common
const EVENTS = ['status','consumer-connection']
// bubble up events from single interrupts
const EVENTS=['log','connection','connection:consumer', 'connection:socket'] // that should emit up from pin base
EVENTS.forEach(event => {
this.interrupt(pin).on(event, data => {
data.interrupt = { msg:'emitted event from single interrupt', pin:pin, id:pinopts[pin].id }
this.emit(event,data)
this.interrupt(pin).on(event, obj => {
if (Object.prototype.toString.call(obj) !== '[object Object]') {
let data=obj
obj = {}
obj.data = data
}
obj.pin = pin
obj.id = pinopts[pin].id
this.emit(event,obj)
})
})
})
}
interrupt(pin) { return this._interrupts.get(Number(pin)) }
this._interrupts.forEach( inter => {
inter.on('interrupt', packet => {
this.send(packet) // send via common consumer
this.emit('interrupt',packet)
})
inter.on(this.resetCmd, packet => {
this.send(packet) // send via common consumer
this.emit(this.resetCmd,packet)
})
})
} // end constructor
interrupt(pin) { return this._interrupts.get(Number(pin)) } // get a handle to single interrupt
async init() {
let res = await super.init()
if (res.errors) return Promise.reject(res.errors)
return Promise.all(
Array.from(this._interrupts).map(inter => {
return inter[1].init()
@ -50,46 +72,17 @@ class Interrupts extends Base {
)
}
// combine all interrupt emits to one handler
async listen(fn) {
this._interrupts.forEach( inter => {
if (fn==='stop') inter.removeAllListeners(inter.packet.cmd)
else inter.on(inter.packet.cmd, fn.bind(this))
})
}
async listenReset(fn) {
this._interrupts.forEach( inter => {
if (fn==='stop') inter.removeAllListeners(inter.resetCmd)
else inter.on(inter.resetCmd, fn.bind(this))
})
}
// only adds consumer sockets to each interrupt to same socket/server
// alternatively use listen handler and single socket
async addInterSocket(name,type) {
if (type !=='s') {
return Promise.all(
Array.from(this._interrupts).map(inter => {
return inter[1].addSocket(...arguments)
})
)
this._interrupts.forEach( inter => {
inter.registerSocket(...arguments)
})
}
}
// manual firing of all pins for testing
async fire(packet={}) {
if (!packet.pin || packet.pin==='all') {
for (let inter of this._interrupts.entries()) {
packet[inter[0]] = await inter[1].fire({})
}
packet.cmd='reply'
return packet
}
let pin = isNaN(Number(packet)) ? packet.pin : packet
if (this._interrupts.has(Number(pin))) return await this.interrupt(packet.pin).fire(packet)
}
registerHook(func) {
this._interrupts.forEach(inter => {
inter.registerHook(func)
@ -99,3 +92,16 @@ class Interrupts extends Base {
} // end Class
export default Interrupts
async function makefunc(fn, packet) {
if (!packet.pin || packet.pin==='all') {
for (let inter of this._interrupts.entries()) {
packet[inter[0]] = await inter[1][fn]()
}
packet.cmd='reply'
return packet
}
let pin = isNaN(Number(packet)) ? packet.pin : packet
if (this._interrupts.has(Number(pin))) return await this.interrupt(packet.pin)[fn](packet)
}