From 06dd1b97c09cac3b132b6f35b5b33608bd2b39f8 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Wed, 11 Sep 2019 18:34:54 -0700 Subject: [PATCH] 0.1.22 refactor bus functions so they are generated from a common function. use the uci/base processing and namespaces add a queue for bus function calls rework all the examples --- .gitignore | 1 + .npmignore | 1 + examples/bus.js | 5 +- examples/client.js | 10 ++- examples/queue.js | 24 ++++++ examples/relays.js | 28 +++++-- examples/scan.js | 43 +++++++--- nodemon.json | 4 + package.json | 8 +- src/bus.js | 198 +++++++++++++++++++++------------------------ 10 files changed, 199 insertions(+), 123 deletions(-) create mode 100644 examples/queue.js create mode 100644 nodemon.json diff --git a/.gitignore b/.gitignore index ba05da9..aab1692 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /node_modules/ *.lock +/archive/ diff --git a/.npmignore b/.npmignore index 528897c..d276448 100644 --- a/.npmignore +++ b/.npmignore @@ -1,3 +1,4 @@ +archive/ tests/ test/ *.test.js diff --git a/examples/bus.js b/examples/bus.js index 79967d7..1156efd 100644 --- a/examples/bus.js +++ b/examples/bus.js @@ -5,10 +5,13 @@ import Bus from '../src/bus' // can use SOCKETS_DIR='' env variable to get a 'i2c-bus.sock' in a particular directory // e.g. SOCKETS_DIR=/opt/sockets node -r @std/esm i2cbus + +const CONCURRENCY = 1 + ; (async () => { - let i2cbus = new Bus({id:'i2c-busx',tcp:true}) + let i2cbus = new Bus({id:'i2c-busx',tcp:true, concurrency:CONCURRENCY}) await i2cbus.init() diff --git a/examples/client.js b/examples/client.js index 523f3b1..a42a24d 100644 --- a/examples/client.js +++ b/examples/client.js @@ -7,7 +7,6 @@ const DEVICE = (process.env.BUS_DEVICE || 'scan') const TRANSPORT = (process.env.TRANSPORT || 'tcp') import Base from '@uci/base' -// import {test, reply } from './relays' const HOST = (process.env.BUS_HOST || 'sbc') const PORT = (process.env.BUS_PORT || 1776) @@ -17,11 +16,18 @@ let options = {id:'i2c-client', useRootNS:true} ; (async () => { // import not supported by eslint, but esm does so ignore parsing error - let {test,reply} = await import(`./${DEVICE}`) + let {test,reply,error} = await import(`./${DEVICE}`) let client = new Base(options) if (TRANSPORT==='tcp') client.addSocket('tcp','c','t',{host:HOST, port:1776}) else client.addSocket('np','c','n',{path:'i2c-bus'}) client.reply = reply // add reply processor + client.error = error + client.on('pushed', packet => { + if (packet.error) { + console.log('=======pushed error received=======\n', packet.error) + console.log('==================') + } + }) await client.init() await test.call(client,ADDRESS) process.kill(process.pid, 'SIGTERM') diff --git a/examples/queue.js b/examples/queue.js new file mode 100644 index 0000000..0be38b5 --- /dev/null +++ b/examples/queue.js @@ -0,0 +1,24 @@ +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +export async function error (packet) { + // if (packet.error) { + console.log('======== error during socket server processing') + console.log(packet.error) + console.log('==========') + // } +} + +export async function reply (packet) { + console.log('reply done with', packet.count) +} + +export async function test (address) { + console.log('============= Queue Stress Test ============') + let packet = {cmd:'write', args:{address:address,cmd: 9, byte:1} } + for (let i = 0; i < 100 ; i++) { + packet = {cmd:'scan', count:i} + console.log('queued:', packet.count) + this.send({cmd:'scan', count:i}) + } + await delay(10000) +} diff --git a/examples/relays.js b/examples/relays.js index ac02284..d02c3d9 100644 --- a/examples/relays.js +++ b/examples/relays.js @@ -1,16 +1,32 @@ const delay = time => new Promise(res=>setTimeout(()=>res(),time)) +export async function error (packet) { + console.log('======== error during socket/server processing') + if(packet.msg) console.log(packet.msg) + if(packet.error) console.log(packet.error) + if(packet._header.request) { + console.log('sent request was') + console.dir(packet._header.request) + } + console.log('==========') +} + export function reply (packet) { let req = packet._header.request - console.log(`response from relays for ${req.cmd}:`,req.args, `was ${packet.response}`) + if (req.cmd !=='busFuncs') console.log(`response from relays for ${req.cmd}:`,req.args, `was ${packet.response}`) + else { + console.log('==========Available Bus Functions=====') + console.dir(packet.functions) + console.log('===============') + } } export async function test (address) { let packet console.log('=============sending packets for i2c mcp23008 relay device ============') - console.log('setting ioddir') - packet = {cmd:'write', args:{address:address,cmd: 0, byte:0} } + console.log('setting all pins to outputs') + packet = {cmd:'write', args:{address:address,cmd: 0} } console.dir(packet) await this.send(packet) packet = {cmd:'read', args:{address:address ,cmd:0} } @@ -23,11 +39,11 @@ export async function test (address) { packet = {cmd:'write', args:{address:address,cmd: 9, byte:byte} } console.log(`==== relay ${i+1} on with byte: ${byte} ===`) console.dir(packet) - await this.send(packet) + this.send(packet) packet = {cmd:'read', args:{address:address ,cmd:9} } console.dir(packet) - await this.send(packet) - await delay(1000) + this.send(packet) + // await delay(1000) } console.log('========= done each relay, clear (off) this ============') packet = {cmd:'write', args:{address:address,cmd: 9, byte:0} } diff --git a/examples/scan.js b/examples/scan.js index 6a773bc..599e841 100644 --- a/examples/scan.js +++ b/examples/scan.js @@ -1,16 +1,41 @@ -export function reply (packet) { +export async function error (packet) { + // if (packet.error) { + console.log('======== error during socket server processing') + console.log(packet.error) + console.log('==========') + // } +} - let addresses = (radix=16) => { return packet.response.map(device => { - return device.toString(radix)}) } - // console.log(packet) - console.log('==== device decimal addreses on i2cbus ===\n',addresses(10)) - console.log('==== device hex addreses on i2cbus ===\n',addresses()) +export async function reply (packet) { + let req = packet._header.request + console.log('==== reply packet received ======') + console.log(req) + if (packet.response) console.log(packet.response) + if (req.cmd ==='busFuncs') { + console.log('==========Available Bus Functions=====') + console.dir(packet.functions) + } } export async function test () { - console.log('=============sending scan request ============') - let packet = {cmd:'scan'} - console.dir(packet) + console.log('=============Bus Capabilities ============') + let packet = {cmd:'methods'} await this.send(packet) + console.log('=============UCI Available Functions ============') + packet = {cmd:'busFuncs'} + await this.send(packet) + console.log('=============Device Address Scan Request ============') + packet = {cmd:'scan'} + let addresses = (await this.send(packet)).response + console.log('addresses to check',addresses) + console.log('=============Device ID scan request ============') + if (addresses.length === 0) console.log('no devices operational on the bus') + else { + for (var address of addresses) { + console.log(`getting device ID for address ${address}/${address.toString(16)}`) + let packet = {cmd:'deviceId', args:{address:address}} + await this.send(packet) + } + } } diff --git a/nodemon.json b/nodemon.json new file mode 100644 index 0000000..c995aad --- /dev/null +++ b/nodemon.json @@ -0,0 +1,4 @@ +{ + "ignoreRoot": [".git"], + "watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","examples/bus.js"] +} diff --git a/package.json b/package.json index f543785..5e568c0 100644 --- a/package.json +++ b/package.json @@ -1,13 +1,18 @@ { "name": "@uci/i2c-bus", - "version": "0.1.21", + "version": "0.1.22", "description": "I2c Bus Classes for Communication to I2C bus via socket or direct call", "main": "src/bus", "scripts": { "client": "node --require esm examples/client || true", + "client:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --watch examples --require esm --preserve-symlinks examples/client || true", "client:pipe": "TRANSPORT=pipe npm run client", + "client:pipe:dev": "TRANSPORT=pipe npm run client:dev", "relays": "BUS_DEVICE=relays npm run client", + "relays:dev": "BUS_DEVICE=relays npm run client:dev", "relays:pipe": "TRANSPORT=pipe npm run relays", + "relays:pipe:dev": "TRANSPORT=pipe npm run relays:dev", + "queue": "BUS_DEVICE=queue npm run client", "bus": "node --require esm examples/bus", "bus:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --require esm examples/bus", "bus:debug": "UCI_LOG_LEVEL=debug npm run bus:dev", @@ -36,6 +41,7 @@ "@uci-utils/logger": "^0.0.15", "@uci/base": "^0.1.27", "better-try-catch": "^0.6.2", + "p-queue": "^6.1.1", "pify": "^4.0.1" }, "devDependencies": { diff --git a/src/bus.js b/src/bus.js index f628f6f..b54bfa7 100644 --- a/src/bus.js +++ b/src/bus.js @@ -1,6 +1,8 @@ -import i2c from 'i2c-bus' import pify from 'pify' -import btc from 'better-try-catch' +// import btc from 'better-try-catch' +import to from 'await-to-js' +import i2c from 'i2c-bus' +import PQueue from 'p-queue' import Base from '@uci/base' import logger from '@uci-utils/logger' @@ -27,121 +29,109 @@ class I2CBus extends Base { log.debug({ opts: opts }, 'created bus with these opts') this.busnum = opts.busnum || 1 this.i2cbus = i2c.open(this.busnum, () => {}) - this.bus = bus_funcs - // this.init = this.init.bind(this) + this._funcs = bus_funcs + this.bus = {} + this.addBusFuncs(this._funcs) + this.addNamespace('bus') + this.bus.busFuncs = async (packet) => { + packet.functions = this.busFuncs + packet.cmd='reply' + return packet + } + this._queue = new PQueue({concurrency: opts.concurrency || 1}) } async init() { await super.init() + let count = 0 + this._queue.on('active', () => { + log.debug(`Queue: working on item #${++count}. Size: ${this._queue.size} Pending: ${this._queue.pending}`) + }) } - // TODO use the replacement method instead of replacing _packetPorcess - // or refactor bus_funcs and add to a namespace so default processing can be used - // which will allow adding more command to the this module - // can use a packet Hook to do validation. - async _packetProcess(sname, packet) { - if (!packet.cmd) return { error: 'no cmd: key in packet', packet: packet } - if (this.bus[packet.cmd]) { - let checked = validateArgs(packet) // handle with before hook - if (checked.error) return checked.error - let [err, res] = await btc(this.bus[packet.cmd].bind(this))(packet.args) - if (err) return { error: err.msg, packet: packet } - packet.response = res || '' - packet.cmd = 'reply' - return packet - } else { - return { - error: 'no i2c bus function available for packet command', - packet: packet - } + + get busFuncs() { return this._funcs} + + addBusFuncs(funcs) { + for (let name in funcs) { + let func = funcs[name] + this.addBusFunc(name,func) } } + + addBusFunc(name,func) { + this.bus[name] = busFunction.bind(this, func.name || name , func.args || []) + this._funcs[name]=func + } + } // end of Bus Packet Class export default I2CBus -const validateArgs = function(packet) { - let missing = [] - const ne = arg => { - if (packet.args[arg] === undefined) missing.push(arg) - } - if (packet.cmd === 'scan' || packet.cmd === 'close') return {} - ne('address') - switch (packet.cmd) { - case 'readRaw': - case 'writeRaw': - ne('length') - ne('buffer') +const validateArg = function (arg,value) { + let valid = false + switch (arg) { + case 'address': + valid = Number.isInteger(value) && value >=0 && value <= 119 break - case 'read': - case 'read2': - case 'write': - case 'write2': - ne('cmd') + case 'length': + case 'cmd': + valid = Number.isInteger(value) + break + case 'byte': + valid = Number.isInteger(value) && value >= 0 && value <= 255 + break + case 'word': + valid = Number.isInteger(value) && value >= 0 && value <= 65535 + break + case 'bit': + valid = Number.isInteger(value) && (value === 0 || value === 1) + break + case 'buffer': + valid = Buffer.isBuffer(value) } - switch (packet.cmd) { - case 'write': - case 'write2': - case 'send': - ne('byte') - } - if (missing.length > 0) { - return { - error: `following bus arguments are missing ${missing}`, - packet: packet - } - } - return {} + return valid } -const bus_funcs = { - scan: function() { - return pify(this.i2cbus.scan).bind(this.i2cbus)() - }, - close: function() { - return pify(this.i2cbus.close).bind(this.i2cbus)() - }, - readRaw: function(args) { - return pify(this.i2cbus.i2cRead).bind(this.i2cbus)( - args.address, - args.length, - args.buffer - ) - }, - writeRaw: function(args) { - return pify(this.i2cbus.i2cWrite).bind(this.i2cbus)( - args.address, - args.length, - args.buffer - ) - }, - read: function(args) { - log.debug({msg:'read i2c bus', method:'read', line:118, args:args}) - return pify(this.i2cbus.readByte).bind(this.i2cbus)(args.address, args.cmd) - }, - write: function(args) { - log.debug({msg:'write i2c bus', method:'write', line:122, args:args}) - return pify(this.i2cbus.writeByte.bind(this.i2cbus))( - args.address, - args.cmd, - args.byte - ) - }, - read2: function(args) { - return pify(this.i2cbus.readWord.bind(this.i2cbus))(args.address, args.cmd) - }, - write2: function(args) { - return pify(this.i2cbus.writeWord.bind(this.i2cbus))( - args.address, - args.cmd, - args.byte - ) - }, - receive: function(args) { - // console.log('receivebyte', address) - return pify(this.i2cbus.receiveByte.bind(this.i2cbus))(args.address) - }, - send: function(args) { - // console.log('sendbyte', address,byte) - return pify(this.i2cbus.sendByte.bind(this.i2cbus))(args.address, args.byte) +async function busFunction (func,args,packet) { + let argsV = [] + args.some(arg => { + if (packet.args) { + let argv = packet.args[arg] + if (argv != null) { + if(validateArg(arg,argv)) argsV.push(argv) + else { + packet.error = `argument ${arg} has an invalide value ${argv} for function ${func}` + return true + } + } else { + packet.error = `missing argument, ${arg}, for function ${func}` + return true + } + } + }) + if (!packet.error) { + log.trace({msg:'adding to queue', function:func,arguments:argsV}) + let busfunc = pify(this.i2cbus[func].bind(this.i2cbus,...argsV)) + let [err,res] = await to(this._queue.add(busfunc)) + if (err) packet.error = `error during call to ${func} with arguments ${JSON.stringify(packet.args)}: ${err}` + else packet.response = res } -} //end i2c functions + packet.cmd = 'reply' + return packet +} + +// default function set +const bus_funcs = { + scan: {}, + close: {}, + methods: {name:'i2cFuncs'}, + deviceId: {args:['address']}, + readRaw: {name:'i2cRead', args:['address','length','buffer']}, + writeRaw: {name:'i2cWrite', args:['address','length','buffer']}, + read: {name:'readByte', args:['address','cmd']}, + write: {name:'writeByte', args:['address','cmd','byte']}, + read2: {name:'readWord', args:['address','cmd']}, + write2: {name:'writeWord', args:['address','cmd','word']}, + receive: {name:'receiveByte', args:['address']}, + send: {name:'sendByte', args:['address','byte']} +}