diff --git a/examples/multi.js b/examples/multi.js index 7fa12d1..1e61da6 100644 --- a/examples/multi.js +++ b/examples/multi.js @@ -1,33 +1,87 @@ -import Interrupts from '../src/interrupts' +import { readFile as read } from 'fs-read-data' +import onDeath from 'ondeath' -const PINS = [9,10] +import { Interrupts } from '../src' -const EDGE = process.env.EDGE || 'both' -const REMOTE_HOST = process.env.REMOTE_HOST || 'sbc' -const REMOTE_PORT = process.env.REMOTE_PORT || 9001 +let options = {} -let interrupts = new Interrupts(PINS, { id:'multi-interrupt', cmd:'interrupt.find', resetCmd:'interrupt.reset', resetInterval:0, edge:EDGE }) - -interrupts.amendConsumerCommands( {reply:() => {}} ) - -interrupts.on('log', ev => { - if (ev.level !== 'debug' && ev.level !== 'trace') console.log(`LOG:'--${ev.level}-- ${ev.msg}`) - // console.log(`LOG:'--${ev.level}-- ${ev.msg}`) -}) - -interrupts.on('connection:socket', ev => { - console.log(`remote socket connection event ${ev.socketName}: ${ev.state}`) -}) - -; + ; (async () => { - interrupts.registerSocket('interrupt','c','t',{host:REMOTE_HOST, port:REMOTE_PORT, name:'interrupt', data:{pins:PINS, type:'interrupt'}}) - // interrupts.registerSocket('listen','s','t',{port:1777}) + options = await read('./examples/multi.yaml') + + options.id = options.id || options.name || 'switches' + + console.log('----------------interrupt start options--------\n',options,'\n---------------------------------') + + let interrupts = new Interrupts(options.pins,options) + + // Uncomment to listen to local events + // interrupts.on('interrupt', packet => { + // console.log(packet.pin, 'has tripped \n interupt has been pushed \n but could take extra action here as well') + // }) + // // + // interrupts.on('ready:24', ready => { + // console.log(ready, 'pin 24, ready: is emitted every time status is checked') + // }) + + if ((process.env.UCI_ENV || '').includes('dev')) { + interrupts.on('log',ev => { + switch (ev.level) { + // case 'warning': + // case 'error': + // case 'info': + // case 'testing': + case 'interrupt': + case 'fatal': + console.log(ev.level.toUpperCase(),'\n',ev) + break + } + }) + + process.once('SIGUSR2', async () => { + shutdown.call(interrupts) + process.kill(process.pid, 'SIGUSR2') + }) + } else { + + onDeath( async () => { + console.log('\ninterrupt plugin device, \nHe\'s dead Jim') + shutdown.call(interrupts) + console.log('shutdown done') + }) + + } + await interrupts.init() - await interrupts.socketsInit() + + console.log('after init',interrupts.ready.observers,interrupts.ready.state) + + interrupts.ready.all.subscribe( + (ready) => { + if (!ready) { + console.log(options.name, 'YIKES! some observer is still not reporting ready') + let failed = interrupts.ready.state.filter(obs=> obs[1]===false).map(obs=>[obs[0],interrupts.ready.getObserverDetails(obs)]) + console.log('those that have failed\n',failed) + // notifiy here + } else { + console.log(options.name, ': This GPIO Interrupt Hardware is ONLINE!!!!') + interrupts.send({cmd:'interrupt.find', stuff:'some info'}) + } + }) })().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) - // process.kill(process.pid, 'SIGTERM') + console.log('FATAL: UNABLE TO START SYSTEM!\n',err) + process.kill(process.pid, 'SIGTERM') }) + +async function shutdown (){ + let names = Object.keys(this.getSocket()) + console.log(names) + for (let name of names) { + console.log(name) + console.log(await this.removeSocket(name)) + console.log('after remove', name) + } + this.removeAllListeners() +} diff --git a/examples/single.js b/examples/single.js index 8be6f2a..760dfae 100644 --- a/examples/single.js +++ b/examples/single.js @@ -5,8 +5,8 @@ const delay = time => new Promise(res=>setTimeout(()=>res(),time)) const IPIN = process.env.IPIN || 4 const EDGE = process.env.EDGE || 'both' -const REMOTE_HOST = process.env.REMOTE_HOST || 'sbc' -const REMOTE_PORT = process.env.REMOTE_PORT || 9000 +// const REMOTE_HOST = process.env.REMOTE_HOST || 'sbc' +// const REMOTE_PORT = process.env.REMOTE_PORT || 9000 const PORT = process.env.PORT || 9000 + IPIN let interrupt = new Interrupt(IPIN,{id:'test-interrupt', wait:0, edge:EDGE, resetInterval:1, reset:true}) @@ -30,7 +30,7 @@ interrupt.on('interrupt.reset', packet => { return packet }) await interrupt.init() - interrupt.addSocket('inter','c','t',{host:REMOTE_HOST, port:REMOTE_PORT}) + // interrupt.addSocket('inter','c','t',{host:REMOTE_HOST, port:REMOTE_PORT}) interrupt.addSocket('inters','s','t',{port:PORT}) console.log(await interrupt.socketsInit()) console.log('interrupt ready and waiting') diff --git a/package.json b/package.json index 0f36b1e..82f6420 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@uci/interrupt", "main": "src", - "version": "0.3.0", + "version": "0.3.1", "description": "a class for adding interrupt processesing for gpio pins on Raspberry Pi and Similar SBCs", "scripts": { "single": "node -r esm examples/single", @@ -12,7 +12,8 @@ "multi:debug": "UCI_LOG_LEVEL=debug npm run multi:dev", "remote": "node -r esm examples/remote", "remote:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/remote", - "remote:debug": "UCI_LOG_LEVEL=debug npm run remote:dev" + "remote:debug": "UCI_LOG_LEVEL=debug npm run remote:dev", + "yalcu": "./node_modules/.bin/nodemon --watch /home/sysadmin/.yalc/**/*.js --exec /opt/node-global-apps/bin/yalc update" }, "author": "David Kebler", "license": "MIT", @@ -33,14 +34,16 @@ "homepage": "https://github.com/uCOMmandIt/uci-interrrupt#readme", "dependencies": { "@uci-utils/logger": "^0.0.16", - "@uci/base": "^0.5.0", + "@uci/base": "^0.5.1", "death": "^1.1.0", "onoff": "^5.0.1" }, "devDependencies": { "chai": "^4.2.0", "esm": "^3.2.25", - "mocha": "^7.1.0", - "nodemon": "^2.0.2" + "fs-read-data": "^1.0.4", + "mocha": "^7.1.1", + "nodemon": "^2.0.2", + "ondeath": "^1.0.0" } } diff --git a/src/interrupt.js b/src/interrupt.js index b4f204c..181137e 100644 --- a/src/interrupt.js +++ b/src/interrupt.js @@ -1,155 +1,180 @@ import Base from '@uci/base' +import EventEmitter from 'events' import DeadJim from 'death' import { Gpio } from 'onoff' import logger from '@uci-utils/logger' let log = logger({package:'@uci/interrupt', file:'/src/interrupt.js'}) -// a pin makes a socket (server/listner) for each pin to which a consumer can be connected -// if opts .port/.path/.topic/.wport are base number/name to which pin number is added/appended (default being 9000,9100,'interrupt') -// for specific port number pass itrt.port,itrn.path,itrm.topic,itrw.port which override it if present -// conPacket is for connecting consumers. This will send this conPacket command on connect, which may needed to initilize something on related hardware -class Interrupt extends Base { - constructor(pin, opts = {}) { - pin = Number(pin) // make sure pin is a number! - super(opts) - this.id = opts.name || (opts.id || 'interrupt') + ':' + pin - log.debug({ pins: pin, opts: opts, method:'constructor', line:16, msg:'created interrupt with these opts'}) - this.pin_num = pin - this.resetCmd = opts.resetCmd || 'interrupt.reset' - this.resetInterval = opts.resetInterval - this.mock = opts.mock || process.env.MOCK - this.wait = opts.wait || 0 // debounce is off by default - // https://github.com/fivdi/onoff#gpiogpio-direction--edge--options - this.edge = opts.edge || 'rising' // falling,both,none=no interrupt - // pull down/up (down is default) can't be set here - // it is done by in DTOs or in RPI in config.txt or via an external pullup/down resistor - // this setting is only needed to monitor the state of the ready interrupt and should match what is set in hardware - this.pull = opts.pull || 'down' - this.pin = {} - this.count = 0 - this.packet = opts.packet || {} - this.packet.id = this.id - this.packet.pin = this.pin_num - this.packet.cmd = this.packet.cmd || opts.cmd || opts.interruptCmd || 'interrupt' - this.packet.count = this.count - this.amendConsumerCommands({reset:this.reset.bind(this)}) - this.amendSocketCommands({ - reset:this.reset.bind(this), - status:this.status.bind(this), - fire:this.fire.bind(this), - intervalReset:this.intervalReset.bind(this) - }) - } // end constructor +function ExtendsInterrupt (pin,opts= {}) { - async init() { - - this.count = 0 - // TODO devel mock versions for testing on other than sbc with gpios - this.pin = new Gpio(this.pin_num, 'in', this.edge, { debounceTimeout:this.wait }) - - DeadJim( (signal,err) => { - log.warn({signal:signal, method:'init', line:56, error:err, msg:'Interrupt process was killed, remove watchers, unexport'}) - clearInterval(this._intervalReset) - this.pin.unwatchAll() - this.pin.unexport() // kill the kernel entry - }) - - this.pin.watch( function (err,value) { - log.debug('interrupt tripped, value:', value, 'error:', err) - this.count +=1 - this._interruptProcess(value,err) - }.bind(this)) - - await this.intervalReset(this.resetInterval) - - this.emit('log',{level:'info', msg:`new interrupt pin ${this.pin_num} created and watching`, state:await this.status(), edge:this.edge,debounce:this.wait}) - - super.init() - - } // end init - - // manual firing for testing - async fire(packet={}) { - log.debug({method:'fire', line:82, msg:`mock manually firing interrupt for pin ${this.pin_num}`}) - await this._interruptProcess(1) - packet.status = 'fired' - packet.pin = this.pin_num - return packet + if (!new.target) { + throw new Error('Uncaught TypeError: Class constructor Interrupt cannot be invoked without \'new\'') } - // returns true if pin is ready and waiting to trigger interrupt - async status(packet) { - let state = await this.pin.read() - let ready = this.pull==='down' ? !state : !!state // ready is always true for 'both' - if (packet) { + if (isNaN(pin)) { + throw new Error('supplied pin argument is not a number') + } + + const SuperClass = opts.multiple ? EventEmitter : Base + + // this always emits local events but will also send if any consumers are created/online + class Interrupt extends SuperClass { + constructor(pin, opts = {}) { + pin = Number(pin) // make sure pin is a number! + super(opts) + this.id = opts.name || (opts.id || 'interrupt') + ':' + pin + log.debug({ pins: pin, opts: opts, method:'constructor', line:16, msg:'created interrupt with these opts'}) + this.pin_num = pin + this.resetCmd = opts.resetCmd || 'interrupt.reset' + this.resetInterval = opts.resetInterval + this.mock = opts.mock || process.env.MOCK + this.wait = opts.wait || 0 // debounce is off by default + // https://github.com/fivdi/onoff#gpiogpio-direction--edge--options + this.edge = opts.edge || 'rising' // falling,both,none=no interrupt + // pull down/up (down is default) can't be set here + // it is done by in DTOs or in RPI in config.txt or via an external pullup/down resistor + // this setting is only needed to monitor the state of the ready interrupt and should match what is set in hardware + this.pull = opts.pull || 'down' + this.pin = {} + this.count = 0 + this.packet = opts.packet || {} + this.packet.id = this.id + this.packet.pin = this.pin_num + this.packet.cmd = this.packet.cmd || opts.cmd || opts.interruptCmd || 'interrupt' + this.packet.count = this.count + this.multiple = opts.multiple // multiple means that this is one of multiple interrupt pins thus it only emits, no direct socket communication + if (opts.multiple) { + this.send = ()=>{} // if not using uci-base then ignore the send and push methods + this.push = ()=>{} + } + else { + this.amendConsumerCommands({reset:this.reset.bind(this)}) + this.amendSocketCommands({ + reset:this.reset.bind(this), + status:this.status.bind(this), + fire:this.fire.bind(this), + intervalReset:this.intervalReset.bind(this) + }) + } + + } // end construtor + + async init() { + + this.count = 0 + // TODO devel mock versions for testing on other than sbc with gpios + this.pin = new Gpio(this.pin_num, 'in', this.edge, { debounceTimeout:this.wait }) + + DeadJim( (signal,err) => { + log.warn({signal:signal, method:'init', line:56, error:err, msg:'Interrupt process was killed, remove watchers, unexport'}) + clearInterval(this._intervalReset) + this.pin.unwatchAll() + this.pin.unexport() // kill the kernel entry + }) + + this.pin.watch( function (err,value) { + log.debug('interrupt tripped, value:', value, 'error:', err) + this.count +=1 + this._interruptProcess(value,err) + }.bind(this)) + + await this.intervalReset(this.resetInterval) + const state = await this.status() + + this.emit('log',{level:'interrupt', msg:`new interrupt pin ${this.pin_num} created and watching`, state:state, edge:this.edge,debounce:this.wait}) + + if (super.init) super.init() // only call if superclass has init (i.e. uci base) + + } // end init + + // manual firing for testing + async fire(packet={}) { + log.debug({method:'fire', line:82, msg:`mock manually firing interrupt for pin ${this.pin_num}`}) + await this._interruptProcess(1) + packet.status = 'fired' packet.pin = this.pin_num - packet.state = state - if (this.edge !=='both') packet.ready = ready return packet } - return this.edge ==='both' ? state : ready - } - async intervalReset(packet) { - let interval = typeof packet === 'number'? packet : (packet || {}).interval - if (!interval || interval<=0) { - clearInterval(this._intervalReset) - this._intervalReset = null - } - else { - this._intervalReset = setInterval(this.reset.bind(this), interval*1000) - } - return this._intervalReset ? true : false - } - - async reset(packet={}) { - let res = {} - this.emit('log',{level:'info', msg:`interrupt reset request for pin ${this.pin_num}`}) - if (this.edge ==='both') res = {level:'info', reset:false, ready:true, msg:'interrupt triggered on rising and falling, no reset action needed'} - else { - if(!await this.status()) { - delete packet._header - this.emit('reset',packet) // emit locally - packet.cmd = this.resetCmd - let state = await this.status() - res = {level:state?'debug':'error', msg:`attempted interrupt reset of pin ${this.pin_num} ${state ? 'succeeded' : 'failed'}`, reset:true, ready:state} + // returns true if pin is ready and waiting to trigger interrupt + async status(packet) { + let state = await this.pin.read() + let ready = this.edge==='both' ? true : this.pull==='down' ? !state : !!state // ready is always true for 'both' + this.emit('ready',ready) + if (packet) { + packet.pin = this.pin_num + packet.state = state + if (this.edge !=='both') packet.ready = ready + return packet } - else res = {level:'info', reset:false, ready:true, msg:`pin ${this.pin_num} interrupt was ready, no action taken`} + return ready } - this.emit('log',res) - return res - } - // use hook to do more processing - async _interruptProcess(value,err) { - let packet = Object.assign({},this.packet) - packet.id = this.id - packet.pin = this.pin_num - packet.error = err - packet.state = value // state of gpio pin - packet.count = this.count - packet.timeStamp = Date.now() - packet.dateTime = new Date().toString() - if (this._hookFunc) packet = await this._hookFunc.call(this,packet) - this.emit('log',{packet: packet, msg:`interrupt tripped for pin ${this.pin_num}, emit/send packet to all listening`}) - this.emit('interrupt',packet) // emit locally - this.send(packet) // no need to await reply - } + async intervalReset(packet) { + let interval = typeof packet === 'number'? packet : (packet || {}).interval + if (!interval || interval<=0) { + clearInterval(this._intervalReset) + this._intervalReset = null + } + else { + this._intervalReset = setInterval(this.reset.bind(this), interval*1000) + } + return this._intervalReset ? true : false + } - // replace default processor function arguments are value of pin and any error - registerProcessor(func) { - this._interruptProcess = func - } - // hook into default interrupt processor for custom processing including packet modification - registerHook(func) { - if (func) this._hookFunc = func - else this._hookFunc=defaultHook - } + // remote reset + async reset(packet={}) { + let res = {} + this.emit('log',{level:'info', msg:`interrupt reset request for pin ${this.pin_num}`}) + if (this.edge ==='both') res = {level:'info', reset:false, ready:true, msg:'interrupt triggered on rising and falling, no reset action needed'} + else { + if(!await this.status()) { + delete packet._header + this.emit('reset',packet) // emit locally, then instance can listen and take custome action (e.g. send reset to mcp) + packet.cmd = this.resetCmd + let state = await this.status() + res = {level:state?'debug':'error', msg:`attempted interrupt reset of pin ${this.pin_num} ${state ? 'succeeded' : 'failed'}`, reset:true, ready:state} + } + else res = {level:'info', reset:false, ready:true, msg:`pin ${this.pin_num} interrupt was ready, no action taken`} + } + this.emit('log',res) + return res + } + // creates an interrupt event packet and emits/sends it + // use hook to do more processing + async _interruptProcess(value,err) { + let packet = Object.assign({},this.packet) + packet.id = this.id + packet.pin = this.pin_num + packet.error = err + packet.state = value // state of gpio pin + packet.count = this.count + packet.timeStamp = Date.now() + packet.dateTime = new Date().toString() + if (this._hookFunc) packet = await this._hookFunc.call(this,packet) + this.emit('log',{level:'interrupt', packet: packet, msg:`interrupt tripped for pin ${this.pin_num}`}) + this.emit('interrupt',packet) // emit locally + this.push(packet) // no need to await reply, push to any connected consumer by default + } -} // end Class + // replace default processor function arguments are value of pin and any error + registerProcessor(func) { + this._interruptProcess = func + } + // hook into default interrupt processor for custom processing including packet modification + registerHook(func) { + if (func) this._hookFunc = func + else this._hookFunc=defaultHook + } -export default Interrupt + } // end Class + + return new Interrupt(pin,opts) +} + +export default ExtendsInterrupt +export { ExtendsInterrupt as Interrupt } //default hook diff --git a/src/interrupts.js b/src/interrupts.js index b738b84..19ae180 100644 --- a/src/interrupts.js +++ b/src/interrupts.js @@ -26,6 +26,7 @@ class Interrupts extends Base { // remove per pin opts and store pinopts[pin] = Object.assign({}, opts[pin]) delete opts[pin] + pinopts[pin].multiple=true // each pin will only extend a simple emitter, uci-base }) pins.forEach(pin => { pinopts[pin] = Object.assign({}, opts, pinopts[pin]) @@ -33,35 +34,28 @@ class Interrupts extends Base { log.debug({ opts: pinopts[pin], method:'constructor', line:25, msg:`pin options for pin ${pin}`}) this._interrupts.set(pin, new Interrupt(pin, pinopts[pin])) - // TODO add socket and add methods to namespace if set - // bubble up events from single interrupts - const EVENTS=['log','connection:consumer', 'connection:socket'] // that should emit up from pin base + const EVENTS=['log','interrupt','reset', 'ready'] // that should emit up from pin base EVENTS.forEach(event => { this.interrupt(pin).on(event, obj => { - if (Object.prototype.toString.call(obj) !== '[object Object]') { - let data=obj - obj = {} - obj.data = data + if (event==='ready') this.emit('ready:'+pin,obj) + else { + if (Object.prototype.toString.call(obj) !== '[object Object]') { // is plain object + let data=obj + obj = {} + obj.data = data + } + obj.pin = pin + obj.id = pinopts[pin].id + this.emit(event,obj) // will emit a pin specific event } - obj.pin = pin - obj.id = pinopts[pin].id - this.emit(event,obj) }) }) - }) - this._interrupts.forEach( inter => { - inter.on('interrupt', packet => { - this.emit('interrupt',packet) - }) - inter.on('reset', packet => { - this.emit('reset',packet) - }) - // overwrite single interrupt send - inter.send = async (packet) => { - return await this.send(packet) // send via common consumer - } + }) // end pins construct + + this.on('interrupt', async ev=>{ + await this.push(ev) // push interrupt to any connected consumers }) } // end constructor @@ -69,7 +63,7 @@ class Interrupts extends Base { interrupt(pin) { return this._interrupts.get(Number(pin)) } // get a handle to single interrupt async init() { - super.init() + await super.init() return Promise.all( Array.from(this._interrupts).map(inter => { return inter[1].init() @@ -77,17 +71,7 @@ class Interrupts extends Base { ) } - // only adds consumer sockets to each interrupt to same socket/server - // alternatively use listen handler and single socket - async addInterSocket(name,type) { - if (type !=='s') { - this._interrupts.forEach( inter => { - inter.registerSocket(...arguments) - }) - } - } - - registerHook(func) { + registerHook(func) { // same hook for each pin this._interrupts.forEach(inter => { inter.registerHook(func) }) @@ -97,6 +81,7 @@ class Interrupts extends Base { export default Interrupts +// combines indiviudal pin function calls into single call and combines returns async function makefunc(fn, packet={}) { if (!packet.pin || packet.pin==='all') { let rpacket = {pins:[]}