From 5c57e614e97bb77d288a33ac86721fdebe795cc3 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Tue, 24 Mar 2020 14:50:50 -0700 Subject: [PATCH] 0.3.1 mcp230xxi rework added default consumer socket to interrupt process, named pipe by default added commands.interrupt to consumer namespace so pushed packets from interrupt process will be handled. interrupt process could still 'send' the packet if that is prefered but command would be prefaced with 'interrupt.' Fixed the interrupt:reset ready observer to it will get the interrupt ready state correctly. ready state after reset is sent in outgoing packet so action can be taken if something goes wrong. creates an iterrupt name for observers based on passed opts.interrupt.name or 'interrupt' by default. Improved switches example that uses options read from yaml file. Desgined to work with multi intererupt example and i2cbus example. --- examples/switches.js | 151 +++++++++++++++++++++++++++++++------------ package.json | 16 +++-- src/mcp230xxi.js | 85 ++++++++++++++---------- 3 files changed, 172 insertions(+), 80 deletions(-) diff --git a/examples/switches.js b/examples/switches.js index 34410de..a377c9f 100644 --- a/examples/switches.js +++ b/examples/switches.js @@ -1,64 +1,133 @@ -/* -* -* -*/ -import { MCP230XXi } from '../src' +import { readFile as read } from 'fs-read-data' +import onDeath from 'ondeath' -const HOST = process.env.BUS_HOST -const PATH = process.env.BUS_PATH -const PORT = process.env.BUS_PORT || 1776 -const ADDRESS = process.env.DEVICE_ADDR || 0x26 -const CHIP17 = !!process.env.CHIP17 || true -const LISTEN_PORT = process.env.PORT || 9001 +import { MCP230XXi } from '../src' -let switches = new MCP230XXi([9,10],{id:'switches', chip17:CHIP17, path:PATH, host:HOST, port:PORT, address:ADDRESS, iport:LISTEN_PORT}) +let options = {} -// switches.registerSocket('interrupt','c','t',{host:'switchesd.local', port:1777}) - -; + ; (async () => { - if (process.env.VERBOSE==='true') { + options = await read('./examples/switches.yaml') - switches.on('log', async log => { - if (log.level!=='trace') { - console.log( - 'LOG:',log.level,':', - log.msg, - 'socket:',log.socketName, - ) - if (log.packet) console.dir(log.packet) + options.id = options.id || options.name || 'switches' + + // console.log('----------------switches start options--------\n',options,'\n---------------------------------') + // console.log('----------------switches interrupt options--------\n',options.interrupt,'\n---------------------------------') + + let switches = new MCP230XXi(options) + + // TODO make this part of UCI logger + if ((process.env.UCI_ENV || '').includes('dev')) { + + switches.on('log',ev => { + switch (ev.level) { + // case 'warning': + // case 'error': + // case 'testing': + // case 'ready': + // case 'state': + case 'fatal': + // case 'mcp': + // case 'info': + // case 'debug': + // case 'interrupt': + console.log(ev.level.toUpperCase(),'\n',ev) + break } }) - switches.on('connection:socket', async ev => { - console.log('connection event: outbound > ', ev.state,'to socket',ev.socketName)// if (ev.state ==='connected') switches.duplex = true + process.once('SIGUSR2', async () => { + console.log('shutting down nodemon') + await shutdown.call(switches) + process.kill(process.pid, 'SIGUSR2') }) - switches.on('connection:consumer', async ev => { - // console.log(ev) - console.log('connection event: inbound >', ev.state,'from consumer',ev.name)// if (ev.state ==='connected') switches.duplex = true + } // dev + + else { + onDeath( async () => { + console.log('\nswitches plugin device, \nHe\'s dead Jim') + await shutdown.call(switches) + console.log('shutdown done') }) + } + + // uncomment to see connection events + // switches.on('connection:socket', async ev => { + // console.log('connection event: outbound > ', ev.state,'to socket',ev.socketName)// if (ev.state ==='connected') switches.duplex = true + // }) + + // switches.on('connection:consumer', async ev => { + // console.log('connection event: inbound >', ev.state,'from consumer',ev.name, ev.data)// if (ev.state ==='connected') switches.duplex = true + // }) + + + if (options.interrupt) { + console.log('interrupt processor registered') + switches.registerInterruptProcessor(iprocessor) // register interrupt processor from below } - console.log('ready observers for switches', switches.ready.observerNames) + switches.amendConsumerCommands( + { + reply: async function (packet) { // silence end point send command to bus + if (!packet.error) { + // console.log('reply processor', packet) + // if (packet._header.path.includes('i2c-bus')) { + // if (packet.args.cmd ===24 || packet.args.cmd===8) console.log('=== interrupt reset at i2c bus===') + // } else console.log('reply back from master socket', packet) + } else console.log('processing error at socket', packet) + } + } + ) - switches.ready.addObserverDetails('bus',{msg:'this is some details related to bus observer'}) + await switches.init() + console.log('after init',switches.ready.observers,switches.ready.state) - switches.ready.subscribe(x=>{ - console.log('=====================switches ready================?',x ? 'YES':'NO') - console.log('what has failed: ',switches.ready.failure,' details:', switches.ready.details.get(switches.ready.failure)||'none') - }) + switches.ready.all.subscribe( + function (ready) { + if (!ready) { + console.log(options.name, 'YIKES! some observer is still not reporting ready') + let failed = this.ready.state.filter(obs=> obs[1]===false).map(obs=>[obs[0],this.ready.getObserverDetails(obs)]) + console.log('those that have failed\n',failed) + // notifiy here + } else { + console.log(options.name, ': This switches Hardware is...ONLINE!!!!') + // console.log('state\n,',this.ready.state) + // here is where to continue with startup /circuits reset + } + }.bind(switches)) - // this.ready.subscribe('interrupt:connected',(res)=>console.log('interrupt connected............',res)) - // this.ready.subscribe('mcp',(res)=>console.log('mcp............',res)) - // this.ready.subscribe('interrupt:reset',(res)=>console.log('interrupt reset............',res)) + // setInterval(function () { + // let failed = this.ready.state.filter(obs=> obs[1]===false).map(obs=>[obs[0],this.ready.getObserverDetails(obs)]) + // console.log('those that have failed\n',failed) + // }.bind(switches),5000) - let res = await switches.socketsInit() - if (res.errors) console.log(res) + // pins are outputs by default or all interrupt pulldown if interrupt(s) being used. + // set customized pin configurations here or later via master controller and database })().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n',err) + console.log('FATAL: UNABLE TO START switches - KILLING FOR OS RESTART !\n',err, options) + process.exitCode = 1 + process.kill(process.pid, 'SIGINT') }) + +function iprocessor (packet) { + packet.hardware = options.id // id needs to be the _id of the switches device in the database + packet.cmd = 'switches.triggered' + console.log('interrupt processor, could send on to another controller process') + console.dir(packet) +} + +async function shutdown (){ + let names = Object.keys(this.getSocket()) + console.log(names) + for (let name of names) { + console.log(name) + console.log(await this.removeSocket(name)) + console.log('after remove', name) + } + this.removeAllListeners() +} diff --git a/package.json b/package.json index 5766fad..cfc0df2 100644 --- a/package.json +++ b/package.json @@ -1,17 +1,19 @@ { "name": "@uci/mcp", "main": "src", - "version": "0.3.0", + "version": "0.3.1", "description": "Classes and Helper Functions for using the MCP chip on I2C Bus", "scripts": { "outputs": "./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/outputs", - "switches": "./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/switches", + "switches": "node -r esm -preserve-symlinks examples/switches", + "switches:dev": "UCI_ENV=dev ESM_DISABLE_CACHE=true ./node_modules/.bin/nodemon -r esm --preserve-symlinks examples/switches", "swrl": "UCI_ENV=dev node -r esm examples/mcp-switch-relay", "swr": "node -r esm examples/mcp-switch-relay", "test": "./node_modules/.bin/mocha --reporter list --timeout 30000", "testw": "./node_modules/.bin/mocha --reporter list -- watch --timeout 30000", "sample": "node demo/sample.js", - "inter": "sudo node -r esm examples/interrupt" + "inter": "sudo node -r esm examples/interrupt", + "yalcu": "./node_modules/.bin/nodemon --watch /home/sysadmin/.yalc/**/*.js --exec /opt/node-global-apps/bin/yalc update" }, "author": "David Kebler", "license": "MIT", @@ -31,12 +33,14 @@ "dependencies": { "@uci-utils/byte": "^0.2.3", "@uci-utils/logger": "0.0.16", - "@uci/i2c-device": "^0.3.0" + "@uci/i2c-device": "^0.3.1" }, "devDependencies": { "chai": "^4.2.0", "esm": "^3.2.25", - "mocha": "^7.1.0", - "nodemon": "^2.0.2" + "fs-read-data": "^1.0.4", + "mocha": "^7.1.1", + "nodemon": "^2.0.2", + "ondeath": "^1.0.0" } } diff --git a/src/mcp230xxi.js b/src/mcp230xxi.js index ab1bed6..c501231 100644 --- a/src/mcp230xxi.js +++ b/src/mcp230xxi.js @@ -1,9 +1,6 @@ import {MCP230XX, map, changed, isPlainObject, to, merge} from './mcp230xx' import { byteFormat } from '@uci-utils/byte' -// import Ready from '@uci-utils/ready' -import logger from '@uci-utils/logger' -let log = {} class MCP230XXi extends MCP230XX { constructor(pins, options={}) { @@ -12,14 +9,27 @@ class MCP230XXi extends MCP230XX { if (opts.interrupt) delete opts.interrupt.pins // if .interrupt was passed then .pins must be removed opts.lport = opts.lport || (opts.interrupt || {}).port || opts.iport if (!opts.lport) opts.lpath = opts.lpath || (opts.interrupt || {}).ipath || opts.ipath || 'interrupt' // must has a socket litener for interrupt process + // a socket option of name `interrupt` MUST be supplied or the default below will be used which assume only a named pipe of `interrupt` + const interrupt = { + name: 'interrupt', + type: 'c', + transport: 'n', + options: + { name: 'mcpi', + data: { name: 'mcpi', dep: 'interrupt' }, + initTimeout: 0, + retryWait: 5, + path: 'interrupt' + } + } + // set the required interrupt consumer to the default options if none supplied + if (!opts.sockets) opts.sockets = [ interrupt ] + else if (!opts.sockets.find(socket=>socket.name==='interrupt')) { + opts.sockets.push(interrupt) + } super(opts) + this.pinsCfg = opts.pinsCfg || this.chip17 ? [{port:'A', pins:'all', cfg:'input_interrupt'},{port:'B', pins:'all', cfg:'input_interrupt'}] : [{pins:'all', cfg:'input_interrupt'}] - log = logger({ - file: 'src/mcp230xxi.js', - class: 'MCP230XXi', - name: 'mcp-interrupt', - id: this.id - }) pins.forEach((pin,index) => { this[pin] = opts['i' + pin] || {} @@ -27,28 +37,33 @@ class MCP230XXi extends MCP230XX { }) this.pins = pins this.commands.interrupt = this.bindFuncs(icommands) // add interrupt pin commands to base set in "command.js" + this.addNamespace('commands.interrupt', 'c') // by default allow access to interrupt commands via pushed packet (gpio interrupt) + // they will be available via send as well to socket as interrupt. this._interruptProcess = process.bind(this) // default processor + this.iname = (opts.interrupt || {}).name || 'interrupt' // name of interrupt consumer - this.ready.addObserver(`${opts.interrupt.name}:process`) // will emit on received interrupt process ready packet - let interruptobs = this.ready.combineObservers(`${opts.interrupt.name}`,['mcp:configure',this.getSocket(opts.interrupt.name).obsName,`${opts.interrupt.name}:process`]) + this.ready.addObserver(`${this.iname}:process`) // will emit on received interrupt process ready packet + + // ready to reset interrupt when mcp is ready, the interrupt process is connected and it up + let interruptobs = this.ready.combineObservers(this.iname,['mcp:configure',this.getSocket(this.iname).obsName,`${this.iname}:process`]) this.ready.addObserver('interrupt:reset',interruptobs .pipe( map(async ready => { if (ready) { - let imcpready = await this.resetInterrupt() - if (imcpready){ - let res = await this.send(opts.interrupt.name,{cmd:'status'}) - ready = res.ready || res.pins.reduce((acc,pin)=>acc&&pin.ready,true) - } - + await this.resetInterrupt() + const istatus = await this.send(this.iname,{cmd:'status'}) // get the status from the gpio interrupt + if (istatus.pins) ready = istatus.pins.reduce((acc,pin)=>acc&&(pin.ready!=null?pin.ready :false),true) + else ready = istatus.ready + if (!ready) this.emit('log',{level:'fatal', msg:'gpio interrrupts were not initially reset',status:istatus}) } return ready }) //map )) - let socket = this.getSocket(`${options.name}:${options.interrupt.remote ? 't':'n'}`) // mcp t or n socket was created - this.consumerConnected(socket,{add:true,consumer:options.interrupt.name}) + // get listening socket and watch for incoming connection from interrupt process + let socket = this.getSocket(`${options.name}:${options.interrupt.remote ? 't':'n'}`) || this.getSocket(options.name)// mcp t or n socket was created + this.consumerConnected(socket,{add:true,consumer:this.iname}) } // end constructor @@ -61,12 +76,10 @@ class MCP230XXi extends MCP230XX { async resetInterrupt(pins) { if (!Array.isArray(pins)) pins = pins ? [pins] : this.pins - this._ireset = (await Promise.all(pins.map(async pin => { + return (await Promise.all(pins.map(async pin => { await this.bus.read(this.getPort(pin) !== 'B' ? 0x08 : 0x18) // 0x08 is intcap interrupt capture register return(await this.interruptState(pin)) - }) - )).reduce((res,val) => res && val) - return this._ireset + }))).reduce((res,val) => res && val,true) } getPort(pin) { @@ -112,28 +125,34 @@ const icommands = { }, // finds which mcp pin caused the interrupt find: async function(inter) { + let reply = {} + // this.emit('log',{level:'debug', msg:'incoming interrupt', packet:inter}) let ipin = inter.ipin || inter.pin if (!await this.interruptState(ipin)) { // if it's already reset then this is false trip - let res = await this.commands.pin.status({ pins: 'all', reg: 'intf', port:this.getPort(ipin) }) // read port interrupt status - let status = await this.resetInterrupt(ipin) // now reset + const res = await this.commands.pin.status({ pins: 'all', reg: 'intf', port:this.getPort(ipin) }) // read port interrupt status + const status = await this.resetInterrupt(ipin) // now reset + let istatus = await this.send(this.iname,{cmd:'status'}) // get the status from the gpio interrupt + if (istatus.ready==null && istatus.pins) istatus.ready = !!istatus.pins.find(el => el.pin === ipin).ready let pin = byteFormat(res.port||[], { in: 'ARY', out: 'PLC' })[0] if (!pin) { - log.warn({cmd:'interrupt.find', line:132, inter:inter, msg:'no pin associated with interrupt'}) - return { error: 'no pin associated with interrupt' } + this.emit('log',{level:'warn', cmd:inter.cmd, inter:inter, msg:'no pin associated with interrupt'}) + reply = { cmd:'error', error: 'no pin associated with interrupt' } } - else { // avoid bad interrupt (i.e. no pin caused interrupt) + else { // avoida bad interrupt (i.e. no pin caused interrupt) delete inter.cmd; delete inter._header inter.ipin = ipin inter.pin = pin inter.port = this[ipin].mport - inter.interrupt_ready = status + inter.mpc_interrupt_reset = status + inter.reset = istatus.ready inter.state = (await this.commands.pin.status({pin:pin, port:this.getPort(ipin)})).state inter.closed = inter.state === 'on' ? true : false this._interruptProcess(inter) - // replying with reset command which is also a check - return {cmd:'reset', pin:ipin} + reply = {cmd:'reply', msg:'mcp interrupt pin was found', inter:inter} } - } - } + if (!istatus.ready) this.emit('log',{level:'fatal', msg:'gpio interrrupt was not reset after firing',interrupt:inter}) + } else reply = { cmd:'reply', msg:'interrupt was already reset nothing to find/do' } + return reply + } // end find } // end commands