replaced stream processing with more robust processing using buffer length using new JsonStream class
parent
9d6a562cda
commit
1fe0d71e0e
|
@ -730,11 +730,6 @@
|
||||||
"integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=",
|
"integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=",
|
||||||
"dev": true
|
"dev": true
|
||||||
},
|
},
|
||||||
"delimiter-stream": {
|
|
||||||
"version": "1.0.1",
|
|
||||||
"resolved": "http://trantor:8082/delimiter-stream/-/delimiter-stream-1.0.1.tgz",
|
|
||||||
"integrity": "sha1-XyUUUJNQcjq5lE2shvzYJ39TFBg="
|
|
||||||
},
|
|
||||||
"diff": {
|
"diff": {
|
||||||
"version": "3.3.1",
|
"version": "3.3.1",
|
||||||
"resolved": "http://trantor:8082/diff/-/diff-3.3.1.tgz",
|
"resolved": "http://trantor:8082/diff/-/diff-3.3.1.tgz",
|
||||||
|
@ -2899,7 +2894,8 @@
|
||||||
"lodash": {
|
"lodash": {
|
||||||
"version": "4.17.4",
|
"version": "4.17.4",
|
||||||
"resolved": "http://trantor:8082/lodash/-/lodash-4.17.4.tgz",
|
"resolved": "http://trantor:8082/lodash/-/lodash-4.17.4.tgz",
|
||||||
"integrity": "sha1-eCA6TRwyiuHYbcpkYONptX9AVa4="
|
"integrity": "sha1-eCA6TRwyiuHYbcpkYONptX9AVa4=",
|
||||||
|
"dev": true
|
||||||
},
|
},
|
||||||
"longest": {
|
"longest": {
|
||||||
"version": "1.0.1",
|
"version": "1.0.1",
|
||||||
|
@ -3044,7 +3040,8 @@
|
||||||
"moment": {
|
"moment": {
|
||||||
"version": "2.20.1",
|
"version": "2.20.1",
|
||||||
"resolved": "http://trantor:8082/moment/-/moment-2.20.1.tgz",
|
"resolved": "http://trantor:8082/moment/-/moment-2.20.1.tgz",
|
||||||
"integrity": "sha512-Yh9y73JRljxW5QxN08Fner68eFLxM5ynNOAw2LbIB1YAGeQzZT8QFSUvkAz609Zf+IHhhaUxqZK8dG3W/+HEvg=="
|
"integrity": "sha512-Yh9y73JRljxW5QxN08Fner68eFLxM5ynNOAw2LbIB1YAGeQzZT8QFSUvkAz609Zf+IHhhaUxqZK8dG3W/+HEvg==",
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"ms": {
|
"ms": {
|
||||||
"version": "2.0.0",
|
"version": "2.0.0",
|
||||||
|
@ -3728,15 +3725,6 @@
|
||||||
"integrity": "sha1-tf3AjxKH6hF4Yo5BXiUTK3NkbG0=",
|
"integrity": "sha1-tf3AjxKH6hF4Yo5BXiUTK3NkbG0=",
|
||||||
"dev": true
|
"dev": true
|
||||||
},
|
},
|
||||||
"simple-node-logger": {
|
|
||||||
"version": "0.93.33",
|
|
||||||
"resolved": "http://trantor:8082/simple-node-logger/-/simple-node-logger-0.93.33.tgz",
|
|
||||||
"integrity": "sha512-ppFuaDeacR1Vu+cP17kwOWQsx5C1vbIRa54qm5WgZBzQ5eBue/GWsDd4sr++ITnWZIoIOvjx5kEm5AhP7IqU+Q==",
|
|
||||||
"requires": {
|
|
||||||
"lodash": "4.17.4",
|
|
||||||
"moment": "2.20.1"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"slice-ansi": {
|
"slice-ansi": {
|
||||||
"version": "0.0.4",
|
"version": "0.0.4",
|
||||||
"resolved": "http://trantor:8082/slice-ansi/-/slice-ansi-0.0.4.tgz",
|
"resolved": "http://trantor:8082/slice-ansi/-/slice-ansi-0.0.4.tgz",
|
||||||
|
|
|
@ -42,8 +42,6 @@
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"better-try-catch": "^0.6.2",
|
"better-try-catch": "^0.6.2",
|
||||||
"bunyan": "^1.8.12",
|
"bunyan": "^1.8.12",
|
||||||
"death": "^1.1.0",
|
"death": "^1.1.0"
|
||||||
"delimiter-stream": "^1.0.1",
|
|
||||||
"simple-node-logger": "^0.93.33"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1 +1,4 @@
|
||||||
## Various Communication Protocol/Transport Testing
|
## Extensons of Nodejs net 'Socket` and `Server` classes
|
||||||
|
|
||||||
|
Sockets now pass JSON objects (message packets)
|
||||||
|
Allows extension or passing of custom packet processors (e.g. MQTT) at each end of the socket.
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
import { Socket } from 'net'
|
import { Socket } from 'net'
|
||||||
import btc from 'better-try-catch'
|
import btc from 'better-try-catch'
|
||||||
import bunyan from 'bunyan'
|
import bunyan from 'bunyan'
|
||||||
import Stream from 'delimiter-stream'
|
// import Stream from 'delimiter-stream'
|
||||||
|
import JsonStream from './json'
|
||||||
|
|
||||||
export default class Consumer extends Socket {
|
export default class Consumer extends Socket {
|
||||||
constructor (path, opts={}) {
|
constructor (path, opts={}) {
|
||||||
|
@ -15,24 +16,40 @@ export default class Consumer extends Socket {
|
||||||
this._pp = opts.packetProcessor || 'processPacket'
|
this._pp = opts.packetProcessor || 'processPacket'
|
||||||
this.keepAlive = opts.keepAlive ? opts.keepAlive : true
|
this.keepAlive = opts.keepAlive ? opts.keepAlive : true
|
||||||
this._ready = false
|
this._ready = false
|
||||||
this.timeout = opts.timeout || 1000
|
this.timeout = opts.timeout || 500
|
||||||
this.wait = opts.wait || 20
|
this.wait = opts.wait || 5
|
||||||
// logging
|
// logging
|
||||||
this.log_file=opts.log_file || './socket.log'
|
this.log_file=opts.log_file || './socket.log'
|
||||||
this.log_opts = {streams:[]}
|
this.log_opts = {streams:[]}
|
||||||
this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer'
|
this.log_opts.name = opts.name ? opts.name : 'uci-socket-consumer'
|
||||||
this.log_opts.streams.push({level: 'info',path: this.log_file })
|
// this.log_opts.streams.push({level: 'info',path: this.log_file })
|
||||||
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
|
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
|
||||||
this.log = bunyan.createLogger(this.log_opts)
|
this.log = bunyan.createLogger(this.log_opts)
|
||||||
// bind to class for other class functions
|
// bind to class for other class functions
|
||||||
this.connect = this.connect.bind(this)
|
this.connect = this.connect.bind(this)
|
||||||
|
this.ready = this.ready.bind(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
ready() {return this._ready}
|
ready() {return this._ready}
|
||||||
|
|
||||||
async connect (app) {
|
async connect (app={}) {
|
||||||
if (app) Object.assign(this, app)
|
|
||||||
this.listen()
|
this.stream = new JsonStream()
|
||||||
|
|
||||||
|
// first set the packet process
|
||||||
|
this._pp = app.pp || this._pp
|
||||||
|
this.pp = this.pp || this._pp
|
||||||
|
if (Object.keys(app).length === 0) app = this
|
||||||
|
// set a default processor if none provided
|
||||||
|
if (!app[this.pp]) {
|
||||||
|
this.pp = 'processPacket' // reset in case alt function is missing
|
||||||
|
app.processPacket = async (packet) => {
|
||||||
|
console.log('packet from socket')
|
||||||
|
console.dir(packet)
|
||||||
|
return packet }
|
||||||
|
}
|
||||||
|
|
||||||
|
this.listen(app)
|
||||||
this.log.info('listening')
|
this.log.info('listening')
|
||||||
|
|
||||||
return new Promise( (resolve,reject) => {
|
return new Promise( (resolve,reject) => {
|
||||||
|
@ -47,73 +64,34 @@ export default class Consumer extends Socket {
|
||||||
super.connect({ port:this.port, host:this.host, path: this.path }, async () => {
|
super.connect({ port:this.port, host:this.host, path: this.path }, async () => {
|
||||||
this.log.info({path:this.path},'connecting')
|
this.log.info({path:this.path},'connecting')
|
||||||
this.setKeepAlive(this.keepAlive)
|
this.setKeepAlive(this.keepAlive)
|
||||||
let [err, res] = await btc(isReady).bind(this)(this.ready.bind(this), this.wait, this.timeout)
|
let [err, res] = await btc(isReady).bind(this)(this.ready, this.wait, this.timeout)
|
||||||
if (err) reject(err)
|
if (err) reject(err)
|
||||||
this.log.info('handshake done, connected')
|
this.log.info('handshake done, connected')
|
||||||
resolve(res)
|
resolve(res)
|
||||||
})
|
})
|
||||||
// catch (err){
|
|
||||||
// console.log('===============',err)
|
|
||||||
// resolve('ready')
|
|
||||||
// }
|
|
||||||
// if (err) {
|
|
||||||
// console.log('===============',err)
|
|
||||||
// if (err.code === 'EISCONN') resolve('ready')
|
|
||||||
// else reject(err)
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
|
||||||
}) //end promise
|
}) //end promise
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async send(packet) {
|
async send(packet) {
|
||||||
let [err, strbuf] = btc(JSON.stringify)(packet)
|
await this.write(this.stream.serialize(packet))
|
||||||
if (!err) {
|
// throw new Error('Cannot send connection not ready')
|
||||||
this.log.info({packet:packet},'attempting to send packet to socket')
|
|
||||||
strbuf += '\n'
|
|
||||||
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
super.write(strbuf, (err) => {
|
|
||||||
if (err) reject(err)
|
|
||||||
else resolve('complete')
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
else { this.log.info({packet:packet}, 'bad packet JSON syntax')}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async listen () {
|
async listen (app) {
|
||||||
|
|
||||||
let packet = new Stream()
|
this.on('data', this.stream.onData)
|
||||||
|
|
||||||
this.on('data', (chunk) => {
|
this.stream.on('message', messageProcess.bind(this))
|
||||||
packet.write(chunk)
|
|
||||||
})
|
|
||||||
|
|
||||||
packet.on('data', (strJSON) => {
|
async function messageProcess (packet) {
|
||||||
let [err, packet] = btc(JSON.parse)(strJSON)
|
// console.log('incoming packet from server', packet)
|
||||||
if (!err) {
|
|
||||||
if (packet.ready) {
|
if (packet.ready) {
|
||||||
this._ready = true
|
this._ready = true
|
||||||
return }
|
return }
|
||||||
|
await app[app.pp].bind(app)(packet)
|
||||||
// set packet processing
|
|
||||||
this.pp = this.pp || this._pp
|
|
||||||
|
|
||||||
// if no processor provided use this console logger one
|
|
||||||
if (!this[this.pp]) {
|
|
||||||
this.pp = 'processPacket'
|
|
||||||
this.processPacket = async (packet) => {
|
|
||||||
this.log.info({packet:packet},'process with default logger')
|
|
||||||
console.log('packet from socket')
|
|
||||||
console.dir(packet)
|
|
||||||
return packet }
|
|
||||||
}
|
}
|
||||||
this[this.pp].bind(this)(packet) // process the packet
|
|
||||||
}
|
|
||||||
else { this.log.info({strJSON: strJSON},'bad packet JSON syntax')}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // end class
|
} // end class
|
||||||
|
@ -124,10 +102,10 @@ function isReady(ready, wait=30, timeout=1000) {
|
||||||
let time = 0
|
let time = 0
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
(function waitReady(){
|
(function waitReady(){
|
||||||
if (time > timeout) return reject('timeout trying to connect')
|
if (time > timeout) return reject(`timeout trying to connect after ${timeout}ms`)
|
||||||
if (ready()) return resolve('ready')
|
if (ready()) return resolve('ready')
|
||||||
log.info('waiting for 30ms for handshake')
|
log.info(`waiting ${wait}ms for handshake`)
|
||||||
time += 30
|
time += wait
|
||||||
setTimeout(waitReady, wait)
|
setTimeout(waitReady, wait)
|
||||||
})()
|
})()
|
||||||
})
|
})
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
import {StringDecoder} from 'string_decoder'
|
||||||
|
import EventEmitter from 'events'
|
||||||
|
|
||||||
|
const decoder = new StringDecoder()
|
||||||
|
|
||||||
|
export default class JsonStream extends EventEmitter{
|
||||||
|
constructor(opts={}){
|
||||||
|
super()
|
||||||
|
this._contentLength = null
|
||||||
|
this._buffer = ''
|
||||||
|
this._delimeter = opts.delimiter || '#'
|
||||||
|
this.onData = this.onData.bind(this)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
onData (data) {
|
||||||
|
data = decoder.write(data)
|
||||||
|
try {
|
||||||
|
this._handleData(data)
|
||||||
|
} catch (e) {
|
||||||
|
this.emit('error', { error: e })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
serialize(message) {
|
||||||
|
var messageData = JSON.stringify(message)
|
||||||
|
var length = Buffer.byteLength(messageData, 'utf8')
|
||||||
|
var data = length + this._delimeter + messageData
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
_handleData (data) {
|
||||||
|
this._buffer += data
|
||||||
|
if (this._contentLength == null) {
|
||||||
|
var i = this._buffer.indexOf(this._delimeter)
|
||||||
|
//Check if the buffer has a this._opts.delimeter or "#", if not, the end of the buffer string might be in the middle of a content length string
|
||||||
|
if (i !== -1) {
|
||||||
|
var rawContentLength = this._buffer.substring(0, i)
|
||||||
|
this._contentLength = parseInt(rawContentLength)
|
||||||
|
if (isNaN(this._contentLength)) {
|
||||||
|
this._contentLength = null
|
||||||
|
this._buffer = ''
|
||||||
|
var err = new Error('Invalid content length supplied ('+rawContentLength+') in: '+this._buffer)
|
||||||
|
err.code = 'E_INVALID_CONTENT_LENGTH'
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
this._buffer = this._buffer.substring(i+1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (this._contentLength != null) {
|
||||||
|
var length = Buffer.byteLength(this._buffer, 'utf8')
|
||||||
|
if (length == this._contentLength) {
|
||||||
|
this._handleMessage(this._buffer)
|
||||||
|
} else if (length > this._contentLength) {
|
||||||
|
var message = this._buffer.substring(0, this._contentLength)
|
||||||
|
var rest = this._buffer.substring(this._contentLength)
|
||||||
|
this._handleMessage(message)
|
||||||
|
this.onData(rest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_handleMessage (data) {
|
||||||
|
this._contentLength = null
|
||||||
|
this._buffer = ''
|
||||||
|
var message
|
||||||
|
try {
|
||||||
|
message = JSON.parse(data)
|
||||||
|
} catch (e) {
|
||||||
|
var err = new Error('Could not parse JSON: '+e.message+'\nRequest data: '+data)
|
||||||
|
err.code = 'E_INVALID_JSON'
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
message = message || {}
|
||||||
|
this.emit('message', message)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -3,7 +3,8 @@ import { unlink as fileDelete } from 'fs'
|
||||||
import btc from 'better-try-catch'
|
import btc from 'better-try-catch'
|
||||||
import ON_DEATH from 'death' //this is intentionally ugly
|
import ON_DEATH from 'death' //this is intentionally ugly
|
||||||
import bunyan from 'bunyan'
|
import bunyan from 'bunyan'
|
||||||
import Stream from 'delimiter-stream'
|
// import Stream from 'delimiter-stream'
|
||||||
|
import JsonStream from './json'
|
||||||
|
|
||||||
export default class Socket extends Server {
|
export default class Socket extends Server {
|
||||||
constructor (path, opts={}) {
|
constructor (path, opts={}) {
|
||||||
|
@ -17,15 +18,27 @@ export default class Socket extends Server {
|
||||||
// logging
|
// logging
|
||||||
this.log_file=opts.log_file || './socket.log'
|
this.log_file=opts.log_file || './socket.log'
|
||||||
this.log_opts = {streams:[]}
|
this.log_opts = {streams:[]}
|
||||||
this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer'
|
this.log_opts.name = opts.name ? opts.name : 'uci-socket'
|
||||||
this.log_opts.streams.push({level: 'info',path: this.log_file })
|
// this.log_opts.streams.push({level: 'info',path: this.log_file })
|
||||||
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
|
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
|
||||||
this.log = bunyan.createLogger(this.log_opts)
|
this.log = bunyan.createLogger(this.log_opts)
|
||||||
|
//binding
|
||||||
|
|
||||||
} // end constructor
|
} // end constructor
|
||||||
|
|
||||||
|
async create (app={}) {
|
||||||
|
|
||||||
async create () {
|
// first set the packet process
|
||||||
|
this._pp = app.pp || this._pp
|
||||||
|
this.pp = this.pp || this._pp
|
||||||
|
if (Object.keys(app).length === 0) app = this
|
||||||
|
// set a default processor if none provided
|
||||||
|
if (!app[this.pp]) {
|
||||||
|
this.pp = 'processPacket' // reset in case alt function is missing
|
||||||
|
app.processPacket = async (packet) => {
|
||||||
|
packet.res='echoed'
|
||||||
|
return packet }
|
||||||
|
}
|
||||||
|
|
||||||
return new Promise( async (resolve,reject) => {
|
return new Promise( async (resolve,reject) => {
|
||||||
|
|
||||||
|
@ -47,7 +60,7 @@ export default class Socket extends Server {
|
||||||
if (path) { // if TCP socket should already be dead
|
if (path) { // if TCP socket should already be dead
|
||||||
this.log.info({socket: path}, 'already exists...deleting')
|
this.log.info({socket: path}, 'already exists...deleting')
|
||||||
await fileDelete(path)
|
await fileDelete(path)
|
||||||
return await this.listen.bind(this)(this.listen_opts)
|
return await this.listen.bind(this)(this.listen_opts,app)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// otherwise fatally exit
|
// otherwise fatally exit
|
||||||
|
@ -55,15 +68,14 @@ export default class Socket extends Server {
|
||||||
reject(err)
|
reject(err)
|
||||||
})
|
})
|
||||||
|
|
||||||
let [err, res] = await btc(this.listen.bind(this))(this.listen_opts)
|
let [err, res] = await btc(this.listen.bind(this))(this.listen_opts,app)
|
||||||
if (err) reject(err)
|
if (err) reject(err)
|
||||||
resolve(res)
|
resolve(res)
|
||||||
|
|
||||||
}) // end promise
|
}) // end promise
|
||||||
|
|
||||||
} // end create
|
} // end create
|
||||||
|
|
||||||
async listen (opts) {
|
async listen (opts,app) {
|
||||||
|
|
||||||
super.listen(opts, async (err, res) => {
|
super.listen(opts, async (err, res) => {
|
||||||
|
|
||||||
|
@ -71,45 +83,19 @@ export default class Socket extends Server {
|
||||||
// this gets called for each client connection and is unique to each
|
// this gets called for each client connection and is unique to each
|
||||||
this.on('connection', (socket) => {
|
this.on('connection', (socket) => {
|
||||||
|
|
||||||
this.log.info('server: new consumer connecting')
|
const stream = new JsonStream()
|
||||||
let packet = new Stream()
|
|
||||||
|
|
||||||
socket.on('data', async (chunk) => {
|
this.log.info('new consumer connecting sending handshake')
|
||||||
packet.write(chunk)
|
|
||||||
|
socket.write(stream.serialize({ready:true}))
|
||||||
|
|
||||||
|
socket.on('data', stream.onData)
|
||||||
|
|
||||||
|
stream.on('message', async function (packet) {
|
||||||
|
// console.log('incoming packet from consumer',packet)
|
||||||
|
socket.write(stream.serialize(await app[app.pp].bind(app)(packet)))
|
||||||
})
|
})
|
||||||
|
|
||||||
// when a complete JSON packet arrives process the packet
|
|
||||||
packet.on('data', async (strJSON) => {
|
|
||||||
|
|
||||||
let [err, packet] = btc(JSON.parse)(strJSON)
|
|
||||||
if (!err) {
|
|
||||||
this.log.info({packet:packet},'Server: packet received to socket')
|
|
||||||
|
|
||||||
// set default packet processing
|
|
||||||
|
|
||||||
this.pp = this.pp || this._pp
|
|
||||||
// console.log('==========',app.spp,'====',this.spp)
|
|
||||||
|
|
||||||
if (!this[this.pp]) {
|
|
||||||
this.processPacket = async (packet) => {
|
|
||||||
packet.res='echoed'
|
|
||||||
this.log.info({packet:packet},'packet being sent to consumer')
|
|
||||||
return packet }
|
|
||||||
}
|
|
||||||
socket.write(JSON.stringify(await this[this.pp].bind(this)(packet))+'\n' )
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
this.log.info(`bad packet JSON syntax \n ${strJSON}`)
|
|
||||||
let error = {
|
|
||||||
error: 'bad packet JSON syntax sent',
|
|
||||||
packet: strJSON
|
|
||||||
}
|
|
||||||
socket.write(JSON.stringify(error))
|
|
||||||
}
|
|
||||||
|
|
||||||
}) // end incoming string stream listerner
|
|
||||||
this.log.info('Server: sending handshake to consumer')
|
|
||||||
socket.write('{"ready":true}\n')
|
|
||||||
}) // end connected consumer
|
}) // end connected consumer
|
||||||
|
|
||||||
this.log.info({socket: this.listen_opts},'socket created')
|
this.log.info({socket: this.listen_opts},'socket created')
|
||||||
|
@ -118,6 +104,7 @@ export default class Socket extends Server {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async destroy () {
|
async destroy () {
|
||||||
|
|
||||||
this.log.info('closing down socket')
|
this.log.info('closing down socket')
|
||||||
|
|
|
@ -61,7 +61,6 @@ describe('Connects and Processes a payload in a JSON packet', function(){
|
||||||
if (this.times<10) return
|
if (this.times<10) return
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// expect(packet.payload).to.equal('unix payload')
|
|
||||||
expect(packet.payload).to.equal('unix payload')
|
expect(packet.payload).to.equal('unix payload')
|
||||||
resolve()
|
resolve()
|
||||||
}
|
}
|
||||||
|
@ -92,11 +91,11 @@ describe('Connects and Processes a payload in a JSON packet', function(){
|
||||||
|
|
||||||
uconsumer.processPacket = function (packet) {
|
uconsumer.processPacket = function (packet) {
|
||||||
this.times++
|
this.times++
|
||||||
|
// console.log(this.times,packet.payload)
|
||||||
if (this.times<10) return
|
if (this.times<10) return
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// expect(packet.payload).to.equal('unix payload')
|
expect(packet.payload).to.equal('consumer 1 unix payload')
|
||||||
expect(packet.payload).to.equal('unix payload')
|
|
||||||
resolve()
|
resolve()
|
||||||
}
|
}
|
||||||
catch(error) {
|
catch(error) {
|
||||||
|
@ -105,16 +104,17 @@ describe('Connects and Processes a payload in a JSON packet', function(){
|
||||||
}
|
}
|
||||||
|
|
||||||
uconsumer2.processPacket = function (packet) {
|
uconsumer2.processPacket = function (packet) {
|
||||||
|
// console.log('processor2', packet.payload)
|
||||||
return packet
|
return packet
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
let [err] = await btc(uconsumer2.connect)()
|
let [err] = await btc(uconsumer2.connect)()
|
||||||
console.log('connect error', err)
|
if (err) reject(err)
|
||||||
let packet = {payload:'unix payload'}
|
let packet1 = {payload:'consumer 1 unix payload'}
|
||||||
|
let packet2 = {payload:'consumer2 unix payload'}
|
||||||
for (var i = 0; i < 11; i++) {
|
for (var i = 0; i < 11; i++) {
|
||||||
uconsumer.send(packet)
|
uconsumer.send(packet1)
|
||||||
uconsumer2.send(packet)
|
uconsumer2.send(packet2)
|
||||||
}
|
}
|
||||||
|
|
||||||
}) //end promise
|
}) //end promise
|
||||||
|
@ -169,4 +169,4 @@ describe('Connects and Processes a payload in a JSON packet', function(){
|
||||||
}) // end tcp socket 2 test
|
}) // end tcp socket 2 test
|
||||||
|
|
||||||
|
|
||||||
})
|
}) // end describe
|
||||||
|
|
Loading…
Reference in New Issue