streamline class method binding and setting of packet processing function

tls
David Kebler 2018-01-19 20:43:16 -08:00
parent 57324c616c
commit 4b13cef73a
6 changed files with 119 additions and 66 deletions

View File

@ -12,23 +12,26 @@ export default class Consumer extends Socket {
this.host = opts.host || '127.0.0.1' this.host = opts.host || '127.0.0.1'
this.port = opts.port || 8080 this.port = opts.port || 8080
} else this.path = path } else this.path = path
this._pp = opts.packetProcessor || 'processPacket'
this.keepAlive = opts.keepAlive ? opts.keepAlive : true this.keepAlive = opts.keepAlive ? opts.keepAlive : true
this._ready = false this._ready = false
this.timeout = opts.timeout || 1000 this.timeout = opts.timeout || 1000
this.wait = opts.wait || 30 this.wait = opts.wait || 20
// logging
this.log_file=opts.log_file || './socket.log' this.log_file=opts.log_file || './socket.log'
this.log_opts = {streams:[]} this.log_opts = {streams:[]}
this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer'
this.log_opts.streams.push({level: 'info',path: this.log_file }) this.log_opts.streams.push({level: 'info',path: this.log_file })
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout}) if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
this.log = bunyan.createLogger(this.log_opts) this.log = bunyan.createLogger(this.log_opts)
// bind to class for other class functions
this.connect = this.connect.bind(this)
} }
ready() {return this._ready} ready() {return this._ready}
async connect (app) { async connect () {
await this.listen(app) await this.listen()
this.log.info('listening') this.log.info('listening')
return new Promise( (resolve,reject) => { return new Promise( (resolve,reject) => {
@ -66,7 +69,7 @@ export default class Consumer extends Socket {
else { this.log.info({packet:packet}, 'bad packet JSON syntax')} else { this.log.info({packet:packet}, 'bad packet JSON syntax')}
} }
async listen (app={}) { async listen () {
let packet = new Stream() let packet = new Stream()
@ -74,22 +77,26 @@ export default class Consumer extends Socket {
packet.write(chunk) packet.write(chunk)
}) })
packet.on('data', async (strJSON) => { packet.on('data', async (strJSON) => {
let [err, packet] = btc(JSON.parse)(strJSON) let [err, packet] = btc(JSON.parse)(strJSON)
if (!err) { if (!err) {
if (packet.ready) { if (packet.ready) {
this._ready = true this._ready = true
return } return }
// set default packet processing - simple print to console of packet
app.cpp = app.cpp || 'processPacket' // set packet processing
if (!app[app.cpp]) { this.pp = this.pp || this._pp
app.cpp = 'processPacket'
app.processPacket = async (packet) => { // if no processor provided use this console logger one
this.log.info({packet:packet},'incoming packet from socket') if (!this[this.pp]) {
this.pp = 'processPacket'
this.processPacket = async (packet) => {
this.log.info({packet:packet},'process with default logger')
console.log('packet from socket')
console.dir(packet)
return packet } return packet }
} }
await app[app.cpp](packet) // process the packet await this[this.pp].bind(this)(packet) // process the packet
} }
else { this.log.info({strJSON: strJSON},'bad packet JSON syntax')} else { this.log.info({strJSON: strJSON},'bad packet JSON syntax')}
}) })
@ -97,7 +104,7 @@ export default class Consumer extends Socket {
} // end class } // end class
// wait for handshake from socket // wait for handshake packet from socket
function isReady(ready, wait=30, timeout=1000) { function isReady(ready, wait=30, timeout=1000) {
let log = this.log let log = this.log
let time = 0 let time = 0

View File

@ -13,7 +13,7 @@ export default class Socket extends Server {
opts = path opts = path
this.listen_opts = { host: opts.host || '127.0.0.1', port: opts.port || 8080} this.listen_opts = { host: opts.host || '127.0.0.1', port: opts.port || 8080}
} else this.listen_opts = { path: path } } else this.listen_opts = { path: path }
this.spp = opts.spp || 'processPacket' this._pp = opts.packetProcessor || 'processPacket'
// logging // logging
this.log_file=opts.log_file || './socket.log' this.log_file=opts.log_file || './socket.log'
this.log_opts = {streams:[]} this.log_opts = {streams:[]}
@ -25,7 +25,7 @@ export default class Socket extends Server {
} // end constructor } // end constructor
async create (app={}) { async create () {
return new Promise( async (resolve,reject) => { return new Promise( async (resolve,reject) => {
@ -47,7 +47,7 @@ export default class Socket extends Server {
if (path) { // if TCP socket should already be dead if (path) { // if TCP socket should already be dead
this.log.info({socket: path}, 'already exists...deleting') this.log.info({socket: path}, 'already exists...deleting')
await fileDelete(path) await fileDelete(path)
return await this.listen.bind(this)(this.listen_opts, app) return await this.listen.bind(this)(this.listen_opts)
} }
} }
// otherwise fatally exit // otherwise fatally exit
@ -55,7 +55,7 @@ export default class Socket extends Server {
reject(err) reject(err)
}) })
let [err, res] = await btc(this.listen.bind(this))(this.listen_opts,app) let [err, res] = await btc(this.listen.bind(this))(this.listen_opts)
if (err) reject(err) if (err) reject(err)
resolve(res) resolve(res)
@ -63,7 +63,7 @@ export default class Socket extends Server {
} // end create } // end create
async listen (opts,app) { async listen (opts) {
super.listen(opts, async (err, res) => { super.listen(opts, async (err, res) => {
@ -86,18 +86,17 @@ export default class Socket extends Server {
this.log.info({packet:packet},'Server: packet received to socket') this.log.info({packet:packet},'Server: packet received to socket')
// set default packet processing // set default packet processing
// console.log('==========',app.spp,'====',this.spp)
this.spp = app.spp || this.spp this.pp = this.pp || this._pp
// console.log('==========',app.spp,'====',this.spp) // console.log('==========',app.spp,'====',this.spp)
if (!app[this.spp]) { if (!this[this.pp]) {
// app.spp = 'processPacket' this.processPacket = async (packet) => {
app.processPacket = async (packet) => {
packet.res='echoed' packet.res='echoed'
this.log.info({packet:packet},'packet being sent to consumer') this.log.info({packet:packet},'packet being sent to consumer')
return packet } return packet }
} }
socket.write(JSON.stringify(await app[this.spp].bind(app)(packet))+'\n' ) socket.write(JSON.stringify(await this[this.pp].bind(this)(packet))+'\n' )
} }
else { else {
this.log.info(`bad packet JSON syntax \n ${strJSON}`) this.log.info(`bad packet JSON syntax \n ${strJSON}`)
@ -119,7 +118,6 @@ export default class Socket extends Server {
} }
async destroy () { async destroy () {
this.log.info('closing down socket') this.log.info('closing down socket')

View File

@ -11,11 +11,14 @@ const USOCKET = __dirname + '/sample.sock'
let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'}) let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'})
let tcpconsumer = new Consumer({name:'test-tcpconsumer'}) let tcpconsumer = new Consumer({name:'test-tcpconsumer'})
let tcpconsumer2 = new Consumer({port:9080, packetProcessor:'test', name:'test-tcpconsumer-2'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time)) const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let usocket = {} let usocket = {}
let tcpsocket = {} let tcpsocket = {}
let tcpsocket2 = {}
describe('Connects and Processes a payload in a JSON packet', function(){ describe('Connects and Processes a payload in a JSON packet', function(){
@ -30,62 +33,98 @@ describe('Connects and Processes a payload in a JSON packet', function(){
console.log('[Socket]', String(buf)) console.log('[Socket]', String(buf))
}) })
tcpsocket2 = spawn('node',['-r', '@std/esm', './test/tcpsocket2'])
tcpsocket2.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going await delay(500) // wait for sockets to get going
}) })
after(async function(){ after(async function(){
usocket.kill() usocket.kill()
tcpsocket.kill() tcpsocket.kill()
tcpsocket2.kill()
}) })
it('via unix socket', async function () { it('via unix socket with defaults', async function () {
uconsumer.times = 0
return new Promise(async function (resolve, reject) { return new Promise(async function (resolve, reject) {
const app = { setTimeout(() =>{ reject('10 packets not received in time')},1900)
processPacket: function (packet) {
try { uconsumer.processPacket = function (packet) {
expect(packet.payload).to.equal('processed unix payload') this.times++
resolve() if (this.times<10) return
}
catch(error) { try {
reject(error) // expect(packet.payload).to.equal('unix payload')
} expect(packet.payload).to.equal('unix payload')
resolve()
}
catch(error) {
reject(error)
} }
} }
let [err] = await btc(uconsumer.connect.bind(uconsumer))(app) let [err] = await btc(uconsumer.connect)()
if (err) reject(err) if (err) reject(err)
let packet = {payload:'unix payload'} let packet = {payload:'unix payload'}
uconsumer.send(packet) for (var i = 0; i < 11; i++) {
uconsumer.send(packet)
}
}) //end promise }) //end promise
}) // end unix socket test }) // end unix socket test
it('via tcp socket', async function () { it('via tcp socket with defaults', async function () {
return new Promise(async function (resolve, reject) { return new Promise(async function (resolve, reject) {
const app = { tcpconsumer.processPacket = function (packet) {
processPacket: function (packet) { try {
try { expect(packet.payload).to.equal('tcp processed tcp payload')
expect(packet.payload).to.equal('processed tcp payload') resolve()
resolve() }
} catch(error) {
catch(error) { reject(error)
reject(error)
}
} }
} }
let [err] = await btc(tcpconsumer.connect.bind(tcpconsumer))(app) let [err] = await btc(tcpconsumer.connect)()
if (err) reject(err) if (err) reject(err)
let packet = {payload:'tcp payload'} let packet = {payload:'tcp payload'}
tcpconsumer.send(packet) tcpconsumer.send(packet)
}) //end promise }) //end promise
}) // end unix socket test }) // end tcp socket test
it('via alternate port tcp socket with attached custom socket processing and alt named socket processing', async function () {
return new Promise(async function (resolve, reject) {
tcpconsumer2.test = function (packet) {
try {
expect(packet.payload).to.equal('alt tcp processed tcp payload')
resolve()
}
catch(error) {
reject(error)
}
}
let [err] = await btc(tcpconsumer2.connect)()
if (err) reject(err)
let packet = {payload:'tcp payload'}
tcpconsumer2.send(packet)
}) //end promise
}) // end tcp socket 2 test
}) })

View File

@ -2,18 +2,15 @@ import { Socket } from '../src'
let socket = new Socket({name:'tcp socket'}) let socket = new Socket({name:'tcp socket'})
const app = { socket.processPacket = async function (packet) {
spp: 'sprocessPacket', packet.payload = 'tcp processed '+packet.payload
sprocessPacket: async function (packet) { return packet
packet.payload = 'tcp processed '+packet.payload
return packet
}
} }
; ;
(async () => { (async () => {
await socket.create(app) await socket.create()
})().catch(err => { })().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err) console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

20
test/tcpsocket2.mjs Normal file
View File

@ -0,0 +1,20 @@
import { Socket } from '../src'
let socket = new Socket({port:9080, name:'tcp socket 8081'})
// socket.spp = 'saltprocessPacket'
socket.pp ='sprocessPacket'
socket.sprocessPacket = async function (packet) {
packet.payload = 'alt tcp processed '+packet.payload
return packet
}
;
(async () => {
await socket.create()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View File

@ -4,18 +4,10 @@ const USOCKET = __dirname + '/sample.sock'
let socket = new Socket(USOCKET,{name:'unix socket'}) let socket = new Socket(USOCKET,{name:'unix socket'})
const app = {
spp: 'sprocessPacket',
sprocessPacket: async function (packet) {
packet.payload = 'processed '+packet.payload
return packet
}
}
; ;
(async () => { (async () => {
await socket.create(app) await socket.create()
})().catch(err => { })().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err) console.error('FATAL: UNABLE TO START SYSTEM!\n',err)