refactor send/write so send is async and will wait for reponse from socket.

write now makes sure to drain before allowing another packet write
refactored tests with new await send returning processed packet.
tls
David Kebler 2018-02-12 14:41:06 -08:00
parent d3b979b1fd
commit 28768c8007
11 changed files with 230 additions and 266 deletions

View File

@ -1,9 +1,7 @@
import Consumer from '../src/consumer'
const USOCKET = __dirname + '/sample.sock'
const client1= new Consumer({path:USOCKET,name:'example-consumer1' })
const client2 = new Consumer({path:USOCKET,name:'example-consumer2'})
const client1= new Consumer({np:true,name:'example-consumer1' })
const client2 = new Consumer({np:true,name:'example-consumer2'})
let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'}
let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
@ -13,18 +11,23 @@ const process = function (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
}
client1.registerPacketProcessor(process)
client2._packetProcess = process
// client1.registerPacketProcessor(process)
//
// client2._packetProcess = process
;
(async () => {
await Promise.all([client1.connect(),client2.connect()])
client1.send(packet1)
client2.send(packet2)
client1.end()
client2.end()
// await Promise.all([client1.connect(),client2.connect()])
await client1.connect()
console.log('=========\n',await client1.send(packet1))
// client1.send(packet1)
// client2.send(packet2)
// client1.end()
// client2.end()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View File

@ -1,8 +1,6 @@
import { Socket } from '../src'
const USOCKET = __dirname + '/sample.sock'
;
;
(async () => {
class Test extends Socket {
@ -18,18 +16,21 @@ const USOCKET = __dirname + '/sample.sock'
}
async doit(data,name) {
return new Promise(resolve => {
let res = {}
console.log('data:', data)
res.status ='success'
res.name = name
res.cmd = 'reply'
res.data = 'this might be response data from another process'
return(res)
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
}
let test = new Test({path:USOCKET})
let test = new Test({np:true})
await test.create()
})().catch(err => {

View File

@ -1,17 +1,18 @@
{
"name": "@uci/socket",
"version": "0.1.0",
"description": "Bare bones intra Host Unix Socket for basic IPC on same machine",
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
"main": "src",
"scripts": {
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
"test": "mocha -r @std/esm test/*.test.mjs",
"testlog": "mocha -r @std/esm test/*.test.mjs | ./node_modules/.bin/bunyan",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
"s": "node -r @std/esm examples/server | ./node_modules/.bin/bunyan",
"devs": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server",
"c": "node -r @std/esm examples/client | ./node_modules/.bin/bunyan -o short",
"c2": "node -r @std/esm examples/client2 | ./node_modules/.bin/bunyan -o short"
"s": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r @std/esm examples/server",
"devs": "SOCKETS_DIR=/opt/sockets DEBUG=true ./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server",
"c": "SOCKETS_DIR=/opt/sockets node -r @std/esm examples/client",
"devc": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r @std/esm examples/client",
"c2": "node -r @std/esm examples/client2"
},
"author": "David Kebler",
"license": "MIT",
@ -21,9 +22,14 @@
},
"keywords": [
"node.js",
"i2c",
"rpi",
"raspberrypi"
"socket",
"net",
"JSON",
"packet",
"serialize",
"named pipe",
"unix socket",
"TCP"
],
"bugs": {
"url": "https://github.com/uCOMmandIt/message/issues"
@ -36,11 +42,11 @@
"chai-as-promised": "^7.1.1",
"codecov": "^3.0.0",
"istanbul": "^0.4.5",
"mocha": "^4.0.1"
"mocha": "^4.0.1",
"nodemon": "^1.14.12"
},
"dependencies": {
"better-try-catch": "^0.6.2",
"bunyan": "^1.8.12",
"death": "^1.1.0"
}
}

View File

@ -1,6 +1,7 @@
import { Socket } from 'net'
import btc from 'better-try-catch'
import JsonStream from './json-stream'
import {promisify} from 'util'
import logger from '../../uci-logger/src/logger'
let log = {}
@ -34,6 +35,7 @@ export default class Consumer extends Socket {
// bind to class for other class functions
this.connect = this.connect.bind(this)
this.__ready = this.__ready.bind(this)
// this._write = this._write.bind(this)
}
async connect () {
@ -85,15 +87,30 @@ export default class Consumer extends Socket {
}
async send(packet) {
await this.write(await this.stream.serialize(packet))
// TODO handle possible error
// TODO await response if required by setting id to packet
// then set a flag (and promise) that is resovled in the listener
return new Promise( async (resolve) => {
setTimeout(() => {resolve({error:'no response from socket in 10sec'})},10000)
packet._id = Math.random().toString().slice(2)
// console.log('sending to socket', packet.id)
let [err, res] = await btc(this.stream.serialize)(packet)
if (err) resolve({error:'unable to serialize packet for sending', packet:packet})
log.info(await this.__write(res))
this.once(packet._id,async function(reply){
// console.log('reply emitted',reply)
this.removeAllListeners(reply.id)
delete reply._id
let res = await this._packetProcess(reply)
if (!res) { // if process was not promise returning like just logged to console
res = reply
log.warn('consumer function was not promise returning - resolving unprocessed')
}
resolve(res)
}) //end listener
})
}
// TODO register alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks)
// TODO register authenciation function (set up default)
registerPacketProcessor (func) {
@ -102,6 +119,19 @@ export default class Consumer extends Socket {
// PRIVATE METHODS
async __write(packet) {
// timeout already set if sockect can't be drained in 10 secs
return new Promise(resolve => {
const cb = () => resolve('packet written to socket stream')
if (!super.write(packet)) {
this.once('drain',cb )
} else {
process.nextTick(cb)
}
})
}
__ready() {return this._ready}
async _listen () {
@ -112,17 +142,19 @@ export default class Consumer extends Socket {
if (packet._handshake) {
this._ready = true
return }
await this._packetProcess(packet)
// TODO send back ack with consumer ID and authorization and wait
// when authorized drop through here to emit
// console.log('incoming packet',packet)
this.emit(packet._id, packet)
}
}
// default packet process just a simple console logger
// default packet process just a simple console logger. ignores any cmd: prop
_packetProcess (packet) {
console.log('default consumer processor -- packet from socket')
console.log('default consumer processor -- log packet from socket to console')
console.dir(packet)
}
} // end class

View File

@ -13,10 +13,12 @@ export default class JsonStream extends EventEmitter{
this._buffer = ''
this._delimeter = opts.delimiter || '#'
this.onData = this.onData.bind(this)
this.serialize = this.serialize.bind(this)
}
onData (data) {
// console.log('a chunk arrived', data)
data = decoder.write(data)
try {
this._handleData(data)
@ -33,7 +35,7 @@ export default class JsonStream extends EventEmitter{
if (err2) reject(err2)
let data = length + this._delimeter + messageData
// console.log('serialized',data)
return resolve(data)
resolve(data)
})
}

View File

@ -46,7 +46,7 @@ export default class Socket extends Server {
process.kill(process.pid, 'SIGUSR2')
})
this.on('error', async (err) => {
this.once('error', async (err) => {
// recover from socket file that was not removed
if (err.code === 'EADDRINUSE') {
if (this.opts.np) { // if TCP socket should already be dead
@ -77,18 +77,20 @@ export default class Socket extends Server {
if (err) return err
// this gets called for each client connection and is unique to each
this.on('connection', async (socket) => {
let write = this._write.bind(socket)
const stream = new JSONStream()
log.info('new consumer connecting sending handshake')
socket.write(await stream.serialize({'_handshake':true}))
write(await stream.serialize({'_handshake':true}))
socket.on('data', stream.onData)
// TODO need to start error listener for stream so errors can be processed
stream.on('message', messageProcess.bind(this))
async function messageProcess (packet) {
// console.log('before processing',packet)
let processed = await this._packetProcess(packet)
if (!processed) processed = { error: 'packet command function likely did not return a promise', packet:packet}
if (!processed) processed = { error: 'socket packet command function likely did not return a promise', packet:packet}
processed._id = packet._id //make sure return packet has its ID
// console.log('after processing',processed)
socket.write(await stream.serialize(processed))
write(await stream.serialize(processed))
}
}) // end connecttion consumer
log.info({opts: this.opts},'socket created')
@ -106,8 +108,25 @@ export default class Socket extends Server {
// default packet process, just a simple echo
_packetProcess (packet) {
packet.res='echoed'
return packet
return new Promise(resolve => {
packet.response='this packet was echoed by default'
resolve(packet)
})
}
// must have a consumer socket bound to use
async _write(packet) {
// timeout already set if sockect can't be drained in 10 secs
return new Promise(resolve => {
const cb = () => resolve('packet written to socket stream')
if (!this.write(packet)) {
this.once('drain',cb )
} else {
process.nextTick(cb)
}
})
}
} // end class

View File

@ -7,8 +7,10 @@ let socket = new Socket({path:USOCKET,name:'default-unix-socket'})
socket.test = 'at socket => '
socket.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload
return packet
resolve(packet)
})
})
;

View File

@ -33,29 +33,21 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
it('with default host and port', async function () {
let tcpconsumer_default = new Consumer({name:'tcpconsumer', log:false})
return new Promise(async function (resolve, reject) {
let [err] = await btc(tcpconsumer_default.connect)()
if (err) {
console.log('unable to connect to socket to start test', tcpconsumer_default.port)
process.kill(process.pid, 'SIGTERM')
}
tcpconsumer_default._packetProcess = function (packet) {
try {
expect(packet.payload).to.equal('8080:tcp payload')
resolve()
}
catch(error) {
reject(error)
}
}
tcpconsumer_default.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.payload = packet.payload +':local'
resolve(packet)})
})
let packet = {payload:'tcp payload'}
tcpconsumer_default.send(packet)
}) //end promise
let res = await tcpconsumer_default.send(packet)
expect(res.payload).to.equal('8080:tcp payload:local')
}) // end tcp socket test
@ -63,8 +55,6 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
let tcpconsumer_9080 = new Consumer({port:9080, name:'tcp-consumer-9080'})
return new Promise(async function (resolve, reject) {
let [err] = await btc(tcpconsumer_9080.connect)()
if (err) {
@ -72,20 +62,15 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
process.kill(process.pid, 'SIGTERM')
}
tcpconsumer_9080.registerPacketProcessor(function (packet) {
try {
expect(packet.payload).to.equal('9080:tcp payload')
resolve()
}
catch(error) {
reject(error)
}
tcpconsumer_9080.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.payload = packet.payload +':local'
resolve(packet)})
})
let packet = {payload:'tcp payload'}
tcpconsumer_9080.send(packet)
}) //end promise
let res = await tcpconsumer_9080.send(packet)
expect(res.payload).to.equal('9080:tcp payload:local')
}) // end tcp socket 2 test

View File

@ -1,107 +0,0 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket-default-overwrite'
let consumer = new Consumer({path:USOCKET,name:'unix-consumer'})
let consumer2 = new Consumer({path:USOCKET, name:'unix-consumer2'})
// const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {}
describe('Connects and Processes a payload via Unix Socket using JSON packet with alt default processor', function(){
before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
})
after(async function(){
socket.kill()
})
it('Tests JSON packet procssing, 10 packets', async function () {
consumer.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer._packetProcess = function (packet) {
this.times++
if (this.times!==11) return
try {
expect(packet.payload).to.equal('overwrite default processor from instance at socket => unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(consumer.connect)()
if (err) reject(err)
let packet = {payload:'unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet)
}
}) //end promise
}) // end unix socket test
it('unix socket with two consumers alternating packets, 10 packets each', async function () {
consumer.times = 0
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer._packetProcess = function (packet) {
this.times++
// console.log(this.times,packet.payload)
if (this.times!==11) return
try {
expect(packet.payload).to.equal('overwrite default processor from instance at socket => consumer 1 unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
consumer2._packetProcess = function (packet) {
return packet
}
let [err] = await btc(consumer2.connect)()
if (err) reject(err)
let packet1 = {payload:'consumer 1 unix payload'}
let packet2 = {payload:'consumer2 unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet1)
consumer2.send(packet2)
}
}) //end promise
}) // end unix socket test
}) // end describe

View File

@ -30,11 +30,9 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
socket.kill()
})
it('Tests unix socket with default echo JSON packet procssing, 10 packets with conect via connect', async function () {
const TIMES = 500
consumer.times = 0
return new Promise(async function (resolve, reject) {
it(`Tests unix socket with default echo JSON packet procssing with ${TIMES} packets sent`, async function () {
let [err] = await btc(consumer.connect)()
if (err) {
@ -42,35 +40,26 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
process.kill(process.pid, 'SIGTERM')
}
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer._packetProcess = function (packet) {
this.times++
if (this.times!==11) return
consumer.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.times += 1
if (packet.times === TIMES) packet.payload = 'local1:'+packet.payload
resolve(packet)})
})
try {
expect(packet.payload).to.equal('unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
for (var i = 0; i < 11; i++) {
let packet = {payload:'unix payload'}
consumer.send(packet)
let packet = {payload:'payload', times:0}
for (let i = 1; i <= TIMES; i++) {
packet = await consumer.send(packet)
}
}) //end promise
expect(packet.payload+':'+packet.times).to.equal('local1:payload:'+TIMES)
}) // end unix socket test
it('unix socket with two consumers alternating packets, 10 packets each with local and added context', async function () {
it(`unix socket with two consumers alternating packets, ${TIMES} packets each and local processing`, async function () {
consumer.times = 0
consumer.test = ':local'
let [err] = await btc(consumer2.connect)()
if (err) {
@ -78,41 +67,21 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
process.kill(process.pid, 'SIGTERM')
}
return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.registerPacketProcessor(function (packet) {
this.times++
// console.log(this.times,packet.payload)
if (this.times!==11) return
packet.payload = packet.payload + this.test
try {
expect(packet.payload).to.equal('consumer 1 unix payload:local')
resolve()
}
catch(error) {
reject(error)
}
consumer2.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.times += 1
if (packet.times === TIMES) packet.payload = 'local2:'+packet.payload
resolve(packet)})
})
consumer2._packetProcess = function (packet) {
return packet
let packet = {consumer:1, payload:'payload', times:-1}
for (let i = 0; i < TIMES; i++) {
packet = await consumer.send(packet)
if (packet.times === TIMES) packet.times = 1
packet = await consumer2.send(packet)
}
let packet1 = {payload:'consumer 1 unix payload'}
let packet2 = {payload:'consumer2 unix payload'}
for (var i = 0; i < 11; i++) {
consumer.send(packet1)
consumer2.send(packet2)
}
}) //end promise
expect(packet.payload+':'+packet.times).to.equal('local2:local1:payload:'+TIMES)
}) // end unix socket test
}) // end describe

52
test/usocket.test.mjs Normal file
View File

@ -0,0 +1,52 @@
import { spawn } from 'child_process'
import chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised)
const expect = chai.expect
import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket'
let consumer = new Consumer({path:USOCKET,name:'unix-consumer'})
// const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {}
describe('Connects and Processes a payload via Unix Socket using JSON packet with alt processor', function(){
before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/sockets/'+SOCKET_FILE])
socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
})
after(async function(){
socket.kill()
})
it('Tests alternate JSON packet procssing at socket and consumer', async function () {
let [err] = await btc(consumer.connect)()
if (err) {
console.log('unable to connect to socket to start test', consumer.path)
process.kill(process.pid, 'SIGTERM')
}
consumer.registerPacketProcessor(async function (packet) {
return new Promise((resolve) => {
packet.payload = 'local:'+packet.payload
resolve(packet)})
})
let packet = { payload:'payload'}
let res = await consumer.send(packet)
expect(res.payload).to.equal('local:overwrite default processor from instance at socket => payload')
}) // end unix socket test
}) // end describe