diff --git a/.eslintrc.js b/.eslintrc.js index 2bed546..6465c7c 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -10,7 +10,7 @@ module.exports = { "mocha": true }, "parserOptions": { - "ecmaVersion": 2017, + "ecmaVersion": 2017, // 10=2019 "sourceType": "module" }, "extends": "eslint:recommended", diff --git a/examples/client.js b/examples/client.js deleted file mode 100644 index a07c498..0000000 --- a/examples/client.js +++ /dev/null @@ -1,47 +0,0 @@ -import Base from '@uci/base' - -// const HOST = 'localhost' -const HOST = process.env.HOST || 'localhost' -const PORT = process.env.PORT // default 8080 -let processor = new Base({ id:'remote-interrupt-processor', useRootNS:true}) - -processor.interrupt = async function (packet) { - return new Promise((resolve) => { - console.log('interrupt occured') - console.dir(packet) - resolve({status: 'processed'}) - }) -} - -processor.reply = async function (packet) { - return new Promise((resolve) => { - console.log('reply from interrupt for request', packet._header.request) - console.dir(packet) - resolve({status: 'processed'}) - }) -} - -processor.on('status', ev => { - console.log(`STATUS: **${ev.level}** ${ev.msg}`)} -) - -processor.on('consumer-connection', ev => { - console.log(`client ${ev.name} was ${ev.state}`) -}) - -processor.on('reconnected', client => { - console.log('client reconnected:', client) -}) -; -(async () => { - - // processor.addSocket('inter','c','t', {host:HOST, port:PORT}) - processor.addSocket('inter','c','t',{host:HOST, port:PORT}) - await processor.init() - console.log('====sending fire command to interrupt===') - await processor.send({cmd: 'fire'}) - // process.kill(process.pid, 'SIGTERM') - -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) -}) diff --git a/examples/multi.js b/examples/multi.js index 7435159..73c5ba0 100644 --- a/examples/multi.js +++ b/examples/multi.js @@ -2,44 +2,44 @@ import Interrupts from '../src/interrupts' const PINS = [4] -let interrupts = new Interrupts(PINS,{id:'multi-interrupt-example', resetInterval:1, resetEnabled:false, 4:{name:'mybutton'} }) +const EDGE = process.env.EDGE || 'both' +const REMOTE_HOST = process.env.REMOTE_HOST || 'sbc' +const REMOTE_PORT = process.env.REMOTE_PORT || 9000 +const PORT = process.env.PORT || 9004 -let hook = function (packet) -{ - packet.cmd = 'interrupt.find' - return packet -} +let interrupts = new Interrupts(PINS,{id:'multi-interrupt-example', resetInterval:1, resetEnabled:false, edge:EDGE, 4:{name:'mybutton'} }) + +// let hook = function (packet) +// { +// packet.cmd = 'interrupt.find' +// return packet +// } // interrupts.registerHook(hook) -interrupts.on('status', ev => { - console.log(`STATUS:'--${ev.level}--" ${ev.msg}`)} +interrupts.on('log', ev => { + console.log(`LOG:'--${ev.level}--" ${ev.msg}`)} ) -interrupts.on('consumer-connection', ev => { - console.log(`client ${ev.name} was ${ev.state}`) + +interrupts.on('connection:socket', ev => { + // console.dir(ev) + console.log(`connected to remote socket ${ev.socketName}: ${ev.state}`) }) -interrupts.listen(function (packet) { - console.log(`============== ${this.id}=========`) - console.log(`emitted packet from interrupt ${packet.id}, pin:${packet.pin}`) - console.dir(packet) - this.push(packet) - console.log('------------------------') -}) - - -interrupts.listenReset(function (packet) { - console.log(`============== ${this.id}=========`) - console.log('an interrupt reset request emitted') - console.dir(packet) - console.log('------------------------') -}) +// +// interrupts.listenReset(function (packet) { +// console.log(`============== ${this.id}=========`) +// console.log('an interrupt reset request emitted') +// console.dir(packet) +// console.log('------------------------') +// }) ; (async () => { - interrupts.addSocket('server','s','t') + interrupts.registerSocket('inter','c','t',{host:REMOTE_HOST, port:REMOTE_PORT}) + interrupts.registerSocket('inters','s','t',{port:PORT}) await interrupts.init() - // interrupts.fire() + await interrupts.socketsInit() })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/examples/remote.js b/examples/remote.js new file mode 100644 index 0000000..4227cbb --- /dev/null +++ b/examples/remote.js @@ -0,0 +1,61 @@ +// a remove interface for an interrrupt +// starts a socket on 9000 by default to receive interrupt events +// start a consumer on 9004 to send commends to gpio interrupt + +import Base from '@uci/base' + +const INTER_HOST = process.env.INTER_HOST || 'sbc' +const INTER_PORT = process.env.INTER_PORT || 9004 +const PORT = process.env.PORT || 9000 +let processor = new Base({ id:'remote-interrupt-processor', useRootNS:true}) + +processor.interrupt = async function (packet) { + return new Promise(async (resolve) => { + console.log('=============interrupt occured =======================') + delete packet._header + console.dir({pin:packet.pin,state:packet.state}) + let status = await this.send({cmd:'status'}) + // console.log('check status', status) + console.log('confirmed state', packet.state, (packet.state=== status[packet.pin] || packet.state===status.state) ) + console.log('======================================') + resolve({processed:true}) + // console.log('confirm state', (await this.send({cmd:'status'})).state===packet.state) + }) +} + +processor.reply = async function (packet) { + return new Promise((resolve) => { + // console.log('reply processor from interrupt socket for request', packet._header.request) + // console.dir(packet) + resolve(packet) + }) +} + + +processor.on('connection:socket', async ev => { + console.log(`============connection event to remote socket========== ${ev.socketName}: ${ev.state}`) + if (ev.state ==='connected') { + console.log('============ test by sending fire,status,reset commands ==========') + console.log(await processor.send({cmd:'reset'})) + console.log(await processor.send({cmd:'status'})) + console.log(await processor.send({cmd:'fire'})) + } +}) + +processor.on('connection:consumer', ev => { + console.log(`==consumer connection===:',${ev.name}:${ev.state}`) + if (ev.state ==='connected') { + console.log('====================ready to receive and process a remote interrupt==================') + } +}) + +; +(async () => { + + processor.addSocket('inter','c','t',{host:INTER_HOST, port:INTER_PORT,initTimeout:60}) + processor.addSocket('inters','s','t',{port:PORT}) + await processor.socketsInit() + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/examples/single.js b/examples/single.js index 8159ed6..8be6f2a 100644 --- a/examples/single.js +++ b/examples/single.js @@ -4,11 +4,16 @@ const delay = time => new Promise(res=>setTimeout(()=>res(),time)) const IPIN = process.env.IPIN || 4 -let interrupt = new Interrupt(IPIN,{id:'test-interrupt', wait:0, edge:'rising', resetInterval:1, reset:true}) +const EDGE = process.env.EDGE || 'both' +const REMOTE_HOST = process.env.REMOTE_HOST || 'sbc' +const REMOTE_PORT = process.env.REMOTE_PORT || 9000 +const PORT = process.env.PORT || 9000 + IPIN + +let interrupt = new Interrupt(IPIN,{id:'test-interrupt', wait:0, edge:EDGE, resetInterval:1, reset:true}) interrupt.on('interrupt', packet => { console.log('event: interrupt fired for',interrupt.pin_num) - console.log('count:', packet.count, 'state:',packet.state, 'cmd:',packet.cmd, 'data:',packet.data) + console.log('count:', packet.count, ', state:',packet.state, ', cmd:',packet.cmd, ', data:',packet.data) }) interrupt.on('interrupt.reset', packet => { @@ -19,19 +24,16 @@ interrupt.on('interrupt.reset', packet => { ; (async () => { - await interrupt.init() - console.log('interrupt ready and waiting') - console.log('manual fire of interrupt with default hook') - interrupt.fire() - console.log('manual fire of interrupt via after changing hook') - interrupt.registerHook((packet) => { - packet.data='some hook added data' - console.log('custom hook data prop added:', packet.data) + packet.data='a custom property added to packet via hook' + // console.log('custom hook data prop added:', packet.data) return packet }) - - interrupt.fire() + await interrupt.init() + interrupt.addSocket('inter','c','t',{host:REMOTE_HOST, port:REMOTE_PORT}) + interrupt.addSocket('inters','s','t',{port:PORT}) + console.log(await interrupt.socketsInit()) + console.log('interrupt ready and waiting') })().catch(err => { diff --git a/examples/socket.js b/examples/socket.js index 2c2092c..9d0ea1c 100644 --- a/examples/socket.js +++ b/examples/socket.js @@ -1,9 +1,8 @@ import Base from '@uci/base' -// const HOST = 'localhost' -const HOST = process.env.HOST || 'sbc' -const PORT = process.env.PORT -let processor = new Base({sockets:'inter#c>t', inter:{host:HOST, port:PORT}, id:'interrupt-processor', useRootNS:true}) +const PORT = process.env.PORT || 9000 + +let processor = new Base({sockets:'inter#s>t', inter:{port:PORT}, id:'interrupt-processor', useRootNS:true}) processor.interrupt = async function (packet) { return new Promise((resolve) => { @@ -26,11 +25,7 @@ processor.reply = async function (packet) { (async () => { await processor.init() - console.log('====sending fire command to interrupt===') - await processor.send({cmd: 'fire'}) - console.log('====sending fire command to interrupt===') - await processor.send({cmd: 'fire'}) - // process.kill(process.pid, 'SIGTERM') + })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/package.json b/package.json index 8faad15..5c7247b 100644 --- a/package.json +++ b/package.json @@ -1,18 +1,18 @@ { "name": "@uci/interrupt", "main": "src", - "version": "0.2.22", + "version": "0.2.23", "description": "a class for adding interrupt processesing for gpio pins on Raspberry Pi and Similar SBCs", "scripts": { "single": "node -r esm examples/single", - "single:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/single", + "single:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/single", "single:debug": "UCI_LOG_LEVEL=debug npm run single:dev", "multi": "node -r esm examples/multi", - "multi:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/multi", + "multi:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/multi", "multi:debug": "UCI_LOG_LEVEL=debug npm run multi:dev", - "client": "node -r esm examples/client", - "client:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm examples/client", - "client:debug": "UCI_LOG_LEVEL=debug npm run client:dev" + "remote": "node -r esm examples/remote", + "remote:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/remote", + "remote:debug": "UCI_LOG_LEVEL=debug npm run remote:dev" }, "author": "David Kebler", "license": "MIT", diff --git a/src/interrupt.js b/src/interrupt.js index 60c370a..71947a0 100644 --- a/src/interrupt.js +++ b/src/interrupt.js @@ -9,6 +9,7 @@ let log = logger({package:'@uci/interrupt', file:'/src/interrupt.js'}) // for specific port number pass itrt.port,itrn.path,itrm.topic,itrw.port which override it if present // conPacket is for connecting consumers. This will send this conPacket command on connect, which may needed to initilize something on related hardware class Interrupt extends Base { + constructor(pin, opts = {}) { pin = Number(pin) // make sure pin is a number! super(opts) @@ -25,7 +26,7 @@ class Interrupt extends Base { // pull down/up (down is default) can't be set here it is done by in DTOs or in RPI in config.txt // this is only used to monitor the status of the interrupt this.pull = opts.pull || 'down' - this.ready = this.edge === 'both' ? true : false // true is interrupt is ready + this.__ready = this.edge === 'both' ? true : false // true is interrupt is ready this.pin = {} this.count = 0 this.packet = opts.packet || {} @@ -42,21 +43,18 @@ class Interrupt extends Base { } // end constructor async init() { - await super.init() this.count = 0 - // TODO devel mock versions for testing on other than sbc with gpios this.pin = new Gpio(this.pin_num, 'in', this.edge, { debounceTimeout:this.wait }) - if (this.resetEnabled) log.debug({msg:'initial connect interrupt reset packet sent', ready:await this.reset(), method:'init', line:53}) - if (this.resetInterval && this.resetEnabled) setInterval(this.reset.bind(this),this.resetInterval) + // if (this.resetEnabled) log.debug({msg:'initial connect interrupt reset packet sent', ready:await this.reset(), method:'init', line:53}) + // if (this.resetInterval && this.resetEnabled) setInterval(this.reset.bind(this),this.resetInterval) DeadJim( (signal,err) => { log.warn({signal:signal, method:'init', line:56, error:err, msg:'Interrupt process was killed, remove watchers, unexport'}) this.pin.unwatchAll() this.pin.unexport() // kill the kernel entry - }) this.pin.watch( function (err,value) { @@ -65,10 +63,7 @@ class Interrupt extends Base { this._interruptProcess(value,err) }.bind(this)) - this.on('reconnected', this.reset.bind(this)) - this.on('connected', this.reset.bind(this)) - - log.debug({msg:'new interrupt pin created and watching', method:'init', line: 62, pin_num:this.pin_num, state:await this.status(), ready:this.ready, edge:this.edge,debounce:this.wait}) + log.info({msg:'new interrupt pin created and watching', method:'init', line: 62, pin_num:this.pin_num, state:await this.status(), ready:this.__ready, edge:this.edge,debounce:this.wait}) } // end init @@ -86,11 +81,11 @@ class Interrupt extends Base { // returns true if pin is ready and waiting to trigger interrupt async status(packet) { let status = await this.pin.read() - if (this.edge !=='both') this.ready = this.pull==='down' ? !status : !!status // ready is always true for 'both' + if (this.edge !=='both') this.__ready = this.pull==='down' ? !status : !!status // ready is always true for 'both' if (packet) { packet.pin = this.pin_num packet.state = status - if (this.edge !=='both') packet.ready = this.ready + if (this.edge !=='both') packet.ready = this.__ready packet.cmd = 'reply' return packet } @@ -100,20 +95,19 @@ class Interrupt extends Base { async reset(packet) { let res = {} if (this.edge !=='both' && this.resetEnabled) { - if (!this.ready) { + if (!this.__ready) { let reset = Object.assign({},this.packet) reset.cmd = this.resetCmd this.emit(this.resetCmd,reset) // emit locally await this.send(reset) - await this.push(reset) await this.status() - log.error({msg: `interrupt was forced reset. ready now? ${this.ready}`}) - res = {cmd:'reply', msg:`attempted interrupt reset ${this.ready? 'succeeded' : 'failed'}`, reset:true, ready:this.ready} + log.error({msg: `interrupt was forced reset. ready now? ${this.__ready}`}) + res = {cmd:'reply', msg:`attempted interrupt reset ${this.__ready? 'succeeded' : 'failed'}`, reset:true, ready:this.__ready} } else res = {cmd:'reply', reset:false, ready:true, msg:'interrupt was ready, no action taken'} } else res = {cmd:'reply', reset:false, ready:true, msg:'reset NA or disabled'} if (packet) return Object.assign(packet,res) - return this.ready + return this.__ready } // use hook to do more processing @@ -127,12 +121,16 @@ class Interrupt extends Base { packet.timeStamp = Date.now() packet.dateTime = new Date().toString() if (this._hookFunc) packet = await this._hookFunc.call(this,packet) - log.debug({packet: packet, msg:'interrupt tripped, emit/send/push packet to all connected/listening'}) + log.debug({packet: packet, msg:'interrupt tripped, emit/send packet to all connected/listening'}) this.emit('interrupt',packet) // emit locally - this.send(packet) // will send a packet via client to any socket - this.push(packet) // will push on any socket + this.send(packet) // will send packet via client to any connected socket } + // replace default processor function arguments are value of pin and any error + registerProcessor(func) { + this._interruptProcess = func + } + // hook into default interrupt processor for custom processing including packet modification registerHook(func) { if (func) this._hookFunc = func else this._hookFunc=defaultHook @@ -147,7 +145,7 @@ export default Interrupt async function defaultHook(packet) { // return a promise or use await if anything async happens in hook // new Promise((resolve) => { - console.log('==========default hook =============') + console.log('==========example hook fucntion =============') console.log(`pin ${packet.pin} on sbc gpio bus has thrown an interrupt`) console.log('can change anything in the packet in this hook') console.log('to replace this use .registerHook(function)') diff --git a/src/interrupts.js b/src/interrupts.js index 2268829..0041062 100644 --- a/src/interrupts.js +++ b/src/interrupts.js @@ -7,14 +7,20 @@ let log = {} // will more easily create a group of sbc pin interrupts class Interrupts extends Base { + constructor(pins, opts = {}) { super(opts) this.id = this.id || 'interrupts' this.pins = pins.map(pin => Number(pin)) // make sure actual numbers are passed this._interrupts = new Map() - this._s = { fire:this.fire.bind(this)} // make fire available via consumer packet send this.resetCmd = opts.resetCmd || 'interrupt.reset' log = logger({ name: 'interrupts', id: this.id, package:'@uci/interrupt', file:'src/interrupts.js'}) + this.commands = { + fire:makefunc.bind(this,'fire'), + status:makefunc.bind(this,'status'), + reset:makefunc.bind(this,'reset'), + } + this.addNamespace('commands', 's') // give access to these commands above if a socket/server is created let pinopts = {} pins.forEach(pin => { // remove per pin opts and store @@ -27,22 +33,38 @@ class Interrupts extends Base { log.debug({ opts: pinopts[pin], method:'constructor', line:25, msg:`pin options for pin ${pin}`}) this._interrupts.set(pin, new Interrupt(pin, pinopts[pin])) - // bubble up events from single interrupts to common - const EVENTS = ['status','consumer-connection'] + // bubble up events from single interrupts + const EVENTS=['log','connection','connection:consumer', 'connection:socket'] // that should emit up from pin base EVENTS.forEach(event => { - this.interrupt(pin).on(event, data => { - data.interrupt = { msg:'emitted event from single interrupt', pin:pin, id:pinopts[pin].id } - this.emit(event,data) + this.interrupt(pin).on(event, obj => { + if (Object.prototype.toString.call(obj) !== '[object Object]') { + let data=obj + obj = {} + obj.data = data + } + obj.pin = pin + obj.id = pinopts[pin].id + this.emit(event,obj) }) }) }) - } - interrupt(pin) { return this._interrupts.get(Number(pin)) } + this._interrupts.forEach( inter => { + inter.on('interrupt', packet => { + this.send(packet) // send via common consumer + this.emit('interrupt',packet) + }) + inter.on(this.resetCmd, packet => { + this.send(packet) // send via common consumer + this.emit(this.resetCmd,packet) + }) + }) + + } // end constructor + + interrupt(pin) { return this._interrupts.get(Number(pin)) } // get a handle to single interrupt async init() { - let res = await super.init() - if (res.errors) return Promise.reject(res.errors) return Promise.all( Array.from(this._interrupts).map(inter => { return inter[1].init() @@ -50,46 +72,17 @@ class Interrupts extends Base { ) } - // combine all interrupt emits to one handler - async listen(fn) { - this._interrupts.forEach( inter => { - if (fn==='stop') inter.removeAllListeners(inter.packet.cmd) - else inter.on(inter.packet.cmd, fn.bind(this)) - }) - } - - async listenReset(fn) { - this._interrupts.forEach( inter => { - if (fn==='stop') inter.removeAllListeners(inter.resetCmd) - else inter.on(inter.resetCmd, fn.bind(this)) - }) - } // only adds consumer sockets to each interrupt to same socket/server // alternatively use listen handler and single socket async addInterSocket(name,type) { if (type !=='s') { - return Promise.all( - Array.from(this._interrupts).map(inter => { - return inter[1].addSocket(...arguments) - }) - ) + this._interrupts.forEach( inter => { + inter.registerSocket(...arguments) + }) } } - // manual firing of all pins for testing - async fire(packet={}) { - if (!packet.pin || packet.pin==='all') { - for (let inter of this._interrupts.entries()) { - packet[inter[0]] = await inter[1].fire({}) - } - packet.cmd='reply' - return packet - } - let pin = isNaN(Number(packet)) ? packet.pin : packet - if (this._interrupts.has(Number(pin))) return await this.interrupt(packet.pin).fire(packet) - } - registerHook(func) { this._interrupts.forEach(inter => { inter.registerHook(func) @@ -99,3 +92,16 @@ class Interrupts extends Base { } // end Class export default Interrupts + + +async function makefunc(fn, packet) { + if (!packet.pin || packet.pin==='all') { + for (let inter of this._interrupts.entries()) { + packet[inter[0]] = await inter[1][fn]() + } + packet.cmd='reply' + return packet + } + let pin = isNaN(Number(packet)) ? packet.pin : packet + if (this._interrupts.has(Number(pin))) return await this.interrupt(packet.pin)[fn](packet) +}