0.1.22 refactor bus functions so they are generated from a common function.

use the uci/base processing and namespaces
add a queue for bus function calls
rework all the examples
master
David Kebler 2019-09-11 18:34:54 -07:00
parent 95a8d20649
commit 06dd1b97c0
10 changed files with 199 additions and 123 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/node_modules/
*.lock
/archive/

View File

@ -1,3 +1,4 @@
archive/
tests/
test/
*.test.js

View File

@ -5,10 +5,13 @@
import Bus from '../src/bus'
// can use SOCKETS_DIR='' env variable to get a 'i2c-bus.sock' in a particular directory
// e.g. SOCKETS_DIR=/opt/sockets node -r @std/esm i2cbus
const CONCURRENCY = 1
;
(async () => {
let i2cbus = new Bus({id:'i2c-busx',tcp:true})
let i2cbus = new Bus({id:'i2c-busx',tcp:true, concurrency:CONCURRENCY})
await i2cbus.init()

View File

@ -7,7 +7,6 @@ const DEVICE = (process.env.BUS_DEVICE || 'scan')
const TRANSPORT = (process.env.TRANSPORT || 'tcp')
import Base from '@uci/base'
// import {test, reply } from './relays'
const HOST = (process.env.BUS_HOST || 'sbc')
const PORT = (process.env.BUS_PORT || 1776)
@ -17,11 +16,18 @@ let options = {id:'i2c-client', useRootNS:true}
;
(async () => {
// import not supported by eslint, but esm does so ignore parsing error
let {test,reply} = await import(`./${DEVICE}`)
let {test,reply,error} = await import(`./${DEVICE}`)
let client = new Base(options)
if (TRANSPORT==='tcp') client.addSocket('tcp','c','t',{host:HOST, port:1776})
else client.addSocket('np','c','n',{path:'i2c-bus'})
client.reply = reply // add reply processor
client.error = error
client.on('pushed', packet => {
if (packet.error) {
console.log('=======pushed error received=======\n', packet.error)
console.log('==================')
}
})
await client.init()
await test.call(client,ADDRESS)
process.kill(process.pid, 'SIGTERM')

24
examples/queue.js Normal file
View File

@ -0,0 +1,24 @@
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
export async function error (packet) {
// if (packet.error) {
console.log('======== error during socket server processing')
console.log(packet.error)
console.log('==========')
// }
}
export async function reply (packet) {
console.log('reply done with', packet.count)
}
export async function test (address) {
console.log('============= Queue Stress Test ============')
let packet = {cmd:'write', args:{address:address,cmd: 9, byte:1} }
for (let i = 0; i < 100 ; i++) {
packet = {cmd:'scan', count:i}
console.log('queued:', packet.count)
this.send({cmd:'scan', count:i})
}
await delay(10000)
}

View File

@ -1,16 +1,32 @@
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
export async function error (packet) {
console.log('======== error during socket/server processing')
if(packet.msg) console.log(packet.msg)
if(packet.error) console.log(packet.error)
if(packet._header.request) {
console.log('sent request was')
console.dir(packet._header.request)
}
console.log('==========')
}
export function reply (packet) {
let req = packet._header.request
console.log(`response from relays for ${req.cmd}:`,req.args, `was ${packet.response}`)
if (req.cmd !=='busFuncs') console.log(`response from relays for ${req.cmd}:`,req.args, `was ${packet.response}`)
else {
console.log('==========Available Bus Functions=====')
console.dir(packet.functions)
console.log('===============')
}
}
export async function test (address) {
let packet
console.log('=============sending packets for i2c mcp23008 relay device ============')
console.log('setting ioddir')
packet = {cmd:'write', args:{address:address,cmd: 0, byte:0} }
console.log('setting all pins to outputs')
packet = {cmd:'write', args:{address:address,cmd: 0} }
console.dir(packet)
await this.send(packet)
packet = {cmd:'read', args:{address:address ,cmd:0} }
@ -23,11 +39,11 @@ export async function test (address) {
packet = {cmd:'write', args:{address:address,cmd: 9, byte:byte} }
console.log(`==== relay ${i+1} on with byte: ${byte} ===`)
console.dir(packet)
await this.send(packet)
this.send(packet)
packet = {cmd:'read', args:{address:address ,cmd:9} }
console.dir(packet)
await this.send(packet)
await delay(1000)
this.send(packet)
// await delay(1000)
}
console.log('========= done each relay, clear (off) this ============')
packet = {cmd:'write', args:{address:address,cmd: 9, byte:0} }

View File

@ -1,16 +1,41 @@
export function reply (packet) {
export async function error (packet) {
// if (packet.error) {
console.log('======== error during socket server processing')
console.log(packet.error)
console.log('==========')
// }
}
let addresses = (radix=16) => { return packet.response.map(device => {
return device.toString(radix)}) }
// console.log(packet)
console.log('==== device decimal addreses on i2cbus ===\n',addresses(10))
console.log('==== device hex addreses on i2cbus ===\n',addresses())
export async function reply (packet) {
let req = packet._header.request
console.log('==== reply packet received ======')
console.log(req)
if (packet.response) console.log(packet.response)
if (req.cmd ==='busFuncs') {
console.log('==========Available Bus Functions=====')
console.dir(packet.functions)
}
}
export async function test () {
console.log('=============sending scan request ============')
let packet = {cmd:'scan'}
console.dir(packet)
console.log('=============Bus Capabilities ============')
let packet = {cmd:'methods'}
await this.send(packet)
console.log('=============UCI Available Functions ============')
packet = {cmd:'busFuncs'}
await this.send(packet)
console.log('=============Device Address Scan Request ============')
packet = {cmd:'scan'}
let addresses = (await this.send(packet)).response
console.log('addresses to check',addresses)
console.log('=============Device ID scan request ============')
if (addresses.length === 0) console.log('no devices operational on the bus')
else {
for (var address of addresses) {
console.log(`getting device ID for address ${address}/${address.toString(16)}`)
let packet = {cmd:'deviceId', args:{address:address}}
await this.send(packet)
}
}
}

4
nodemon.json Normal file
View File

@ -0,0 +1,4 @@
{
"ignoreRoot": [".git"],
"watch": ["node_modules/@uci/","node_modules/@uci-utils/","src/","examples/bus.js"]
}

View File

@ -1,13 +1,18 @@
{
"name": "@uci/i2c-bus",
"version": "0.1.21",
"version": "0.1.22",
"description": "I2c Bus Classes for Communication to I2C bus via socket or direct call",
"main": "src/bus",
"scripts": {
"client": "node --require esm examples/client || true",
"client:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --watch examples --require esm --preserve-symlinks examples/client || true",
"client:pipe": "TRANSPORT=pipe npm run client",
"client:pipe:dev": "TRANSPORT=pipe npm run client:dev",
"relays": "BUS_DEVICE=relays npm run client",
"relays:dev": "BUS_DEVICE=relays npm run client:dev",
"relays:pipe": "TRANSPORT=pipe npm run relays",
"relays:pipe:dev": "TRANSPORT=pipe npm run relays:dev",
"queue": "BUS_DEVICE=queue npm run client",
"bus": "node --require esm examples/bus",
"bus:dev": "UCI_ENV=dev ./node_modules/.bin/nodemon --require esm examples/bus",
"bus:debug": "UCI_LOG_LEVEL=debug npm run bus:dev",
@ -36,6 +41,7 @@
"@uci-utils/logger": "^0.0.15",
"@uci/base": "^0.1.27",
"better-try-catch": "^0.6.2",
"p-queue": "^6.1.1",
"pify": "^4.0.1"
},
"devDependencies": {

View File

@ -1,6 +1,8 @@
import i2c from 'i2c-bus'
import pify from 'pify'
import btc from 'better-try-catch'
// import btc from 'better-try-catch'
import to from 'await-to-js'
import i2c from 'i2c-bus'
import PQueue from 'p-queue'
import Base from '@uci/base'
import logger from '@uci-utils/logger'
@ -27,121 +29,109 @@ class I2CBus extends Base {
log.debug({ opts: opts }, 'created bus with these opts')
this.busnum = opts.busnum || 1
this.i2cbus = i2c.open(this.busnum, () => {})
this.bus = bus_funcs
// this.init = this.init.bind(this)
this._funcs = bus_funcs
this.bus = {}
this.addBusFuncs(this._funcs)
this.addNamespace('bus')
this.bus.busFuncs = async (packet) => {
packet.functions = this.busFuncs
packet.cmd='reply'
return packet
}
this._queue = new PQueue({concurrency: opts.concurrency || 1})
}
async init() {
await super.init()
let count = 0
this._queue.on('active', () => {
log.debug(`Queue: working on item #${++count}. Size: ${this._queue.size} Pending: ${this._queue.pending}`)
})
}
// TODO use the replacement method instead of replacing _packetPorcess
// or refactor bus_funcs and add to a namespace so default processing can be used
// which will allow adding more command to the this module
// can use a packet Hook to do validation.
async _packetProcess(sname, packet) {
if (!packet.cmd) return { error: 'no cmd: key in packet', packet: packet }
if (this.bus[packet.cmd]) {
let checked = validateArgs(packet) // handle with before hook
if (checked.error) return checked.error
let [err, res] = await btc(this.bus[packet.cmd].bind(this))(packet.args)
if (err) return { error: err.msg, packet: packet }
packet.response = res || ''
packet.cmd = 'reply'
return packet
} else {
return {
error: 'no i2c bus function available for packet command',
packet: packet
}
get busFuncs() { return this._funcs}
addBusFuncs(funcs) {
for (let name in funcs) {
let func = funcs[name]
this.addBusFunc(name,func)
}
}
addBusFunc(name,func) {
this.bus[name] = busFunction.bind(this, func.name || name , func.args || [])
this._funcs[name]=func
}
} // end of Bus Packet Class
export default I2CBus
const validateArgs = function(packet) {
let missing = []
const ne = arg => {
if (packet.args[arg] === undefined) missing.push(arg)
}
if (packet.cmd === 'scan' || packet.cmd === 'close') return {}
ne('address')
switch (packet.cmd) {
case 'readRaw':
case 'writeRaw':
ne('length')
ne('buffer')
const validateArg = function (arg,value) {
let valid = false
switch (arg) {
case 'address':
valid = Number.isInteger(value) && value >=0 && value <= 119
break
case 'read':
case 'read2':
case 'write':
case 'write2':
ne('cmd')
case 'length':
case 'cmd':
valid = Number.isInteger(value)
break
case 'byte':
valid = Number.isInteger(value) && value >= 0 && value <= 255
break
case 'word':
valid = Number.isInteger(value) && value >= 0 && value <= 65535
break
case 'bit':
valid = Number.isInteger(value) && (value === 0 || value === 1)
break
case 'buffer':
valid = Buffer.isBuffer(value)
}
switch (packet.cmd) {
case 'write':
case 'write2':
case 'send':
ne('byte')
}
if (missing.length > 0) {
return {
error: `following bus arguments are missing ${missing}`,
packet: packet
}
}
return {}
return valid
}
const bus_funcs = {
scan: function() {
return pify(this.i2cbus.scan).bind(this.i2cbus)()
},
close: function() {
return pify(this.i2cbus.close).bind(this.i2cbus)()
},
readRaw: function(args) {
return pify(this.i2cbus.i2cRead).bind(this.i2cbus)(
args.address,
args.length,
args.buffer
)
},
writeRaw: function(args) {
return pify(this.i2cbus.i2cWrite).bind(this.i2cbus)(
args.address,
args.length,
args.buffer
)
},
read: function(args) {
log.debug({msg:'read i2c bus', method:'read', line:118, args:args})
return pify(this.i2cbus.readByte).bind(this.i2cbus)(args.address, args.cmd)
},
write: function(args) {
log.debug({msg:'write i2c bus', method:'write', line:122, args:args})
return pify(this.i2cbus.writeByte.bind(this.i2cbus))(
args.address,
args.cmd,
args.byte
)
},
read2: function(args) {
return pify(this.i2cbus.readWord.bind(this.i2cbus))(args.address, args.cmd)
},
write2: function(args) {
return pify(this.i2cbus.writeWord.bind(this.i2cbus))(
args.address,
args.cmd,
args.byte
)
},
receive: function(args) {
// console.log('receivebyte', address)
return pify(this.i2cbus.receiveByte.bind(this.i2cbus))(args.address)
},
send: function(args) {
// console.log('sendbyte', address,byte)
return pify(this.i2cbus.sendByte.bind(this.i2cbus))(args.address, args.byte)
async function busFunction (func,args,packet) {
let argsV = []
args.some(arg => {
if (packet.args) {
let argv = packet.args[arg]
if (argv != null) {
if(validateArg(arg,argv)) argsV.push(argv)
else {
packet.error = `argument ${arg} has an invalide value ${argv} for function ${func}`
return true
}
} else {
packet.error = `missing argument, ${arg}, for function ${func}`
return true
}
}
})
if (!packet.error) {
log.trace({msg:'adding to queue', function:func,arguments:argsV})
let busfunc = pify(this.i2cbus[func].bind(this.i2cbus,...argsV))
let [err,res] = await to(this._queue.add(busfunc))
if (err) packet.error = `error during call to ${func} with arguments ${JSON.stringify(packet.args)}: ${err}`
else packet.response = res
}
} //end i2c functions
packet.cmd = 'reply'
return packet
}
// default function set
const bus_funcs = {
scan: {},
close: {},
methods: {name:'i2cFuncs'},
deviceId: {args:['address']},
readRaw: {name:'i2cRead', args:['address','length','buffer']},
writeRaw: {name:'i2cWrite', args:['address','length','buffer']},
read: {name:'readByte', args:['address','cmd']},
write: {name:'writeByte', args:['address','cmd','byte']},
read2: {name:'readWord', args:['address','cmd']},
write2: {name:'writeWord', args:['address','cmd','word']},
receive: {name:'receiveByte', args:['address']},
send: {name:'sendByte', args:['address','byte']}
}