diff --git a/package-lock.json b/package-lock.json index a1d0313..824a9df 100644 --- a/package-lock.json +++ b/package-lock.json @@ -730,11 +730,6 @@ "integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=", "dev": true }, - "delimiter-stream": { - "version": "1.0.1", - "resolved": "http://trantor:8082/delimiter-stream/-/delimiter-stream-1.0.1.tgz", - "integrity": "sha1-XyUUUJNQcjq5lE2shvzYJ39TFBg=" - }, "diff": { "version": "3.3.1", "resolved": "http://trantor:8082/diff/-/diff-3.3.1.tgz", @@ -2899,7 +2894,8 @@ "lodash": { "version": "4.17.4", "resolved": "http://trantor:8082/lodash/-/lodash-4.17.4.tgz", - "integrity": "sha1-eCA6TRwyiuHYbcpkYONptX9AVa4=" + "integrity": "sha1-eCA6TRwyiuHYbcpkYONptX9AVa4=", + "dev": true }, "longest": { "version": "1.0.1", @@ -3044,7 +3040,8 @@ "moment": { "version": "2.20.1", "resolved": "http://trantor:8082/moment/-/moment-2.20.1.tgz", - "integrity": "sha512-Yh9y73JRljxW5QxN08Fner68eFLxM5ynNOAw2LbIB1YAGeQzZT8QFSUvkAz609Zf+IHhhaUxqZK8dG3W/+HEvg==" + "integrity": "sha512-Yh9y73JRljxW5QxN08Fner68eFLxM5ynNOAw2LbIB1YAGeQzZT8QFSUvkAz609Zf+IHhhaUxqZK8dG3W/+HEvg==", + "optional": true }, "ms": { "version": "2.0.0", @@ -3728,15 +3725,6 @@ "integrity": "sha1-tf3AjxKH6hF4Yo5BXiUTK3NkbG0=", "dev": true }, - "simple-node-logger": { - "version": "0.93.33", - "resolved": "http://trantor:8082/simple-node-logger/-/simple-node-logger-0.93.33.tgz", - "integrity": "sha512-ppFuaDeacR1Vu+cP17kwOWQsx5C1vbIRa54qm5WgZBzQ5eBue/GWsDd4sr++ITnWZIoIOvjx5kEm5AhP7IqU+Q==", - "requires": { - "lodash": "4.17.4", - "moment": "2.20.1" - } - }, "slice-ansi": { "version": "0.0.4", "resolved": "http://trantor:8082/slice-ansi/-/slice-ansi-0.0.4.tgz", diff --git a/package.json b/package.json index 0bd4b25..7d56c28 100644 --- a/package.json +++ b/package.json @@ -42,8 +42,6 @@ "dependencies": { "better-try-catch": "^0.6.2", "bunyan": "^1.8.12", - "death": "^1.1.0", - "delimiter-stream": "^1.0.1", - "simple-node-logger": "^0.93.33" + "death": "^1.1.0" } } diff --git a/readme.md b/readme.md index 98b2ce9..3db9d26 100644 --- a/readme.md +++ b/readme.md @@ -1 +1,4 @@ -## Various Communication Protocol/Transport Testing +## Extensons of Nodejs net 'Socket` and `Server` classes + +Sockets now pass JSON objects (message packets) +Allows extension or passing of custom packet processors (e.g. MQTT) at each end of the socket. diff --git a/src/consumer.mjs b/src/consumer.mjs index a2d965b..aff6d6c 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -1,7 +1,8 @@ import { Socket } from 'net' import btc from 'better-try-catch' import bunyan from 'bunyan' -import Stream from 'delimiter-stream' +// import Stream from 'delimiter-stream' +import JsonStream from './json' export default class Consumer extends Socket { constructor (path, opts={}) { @@ -15,24 +16,40 @@ export default class Consumer extends Socket { this._pp = opts.packetProcessor || 'processPacket' this.keepAlive = opts.keepAlive ? opts.keepAlive : true this._ready = false - this.timeout = opts.timeout || 1000 - this.wait = opts.wait || 20 + this.timeout = opts.timeout || 500 + this.wait = opts.wait || 5 // logging this.log_file=opts.log_file || './socket.log' this.log_opts = {streams:[]} - this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' - this.log_opts.streams.push({level: 'info',path: this.log_file }) + this.log_opts.name = opts.name ? opts.name : 'uci-socket-consumer' + // this.log_opts.streams.push({level: 'info',path: this.log_file }) if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout}) this.log = bunyan.createLogger(this.log_opts) // bind to class for other class functions this.connect = this.connect.bind(this) + this.ready = this.ready.bind(this) } ready() {return this._ready} - async connect (app) { - if (app) Object.assign(this, app) - this.listen() + async connect (app={}) { + + this.stream = new JsonStream() + + // first set the packet process + this._pp = app.pp || this._pp + this.pp = this.pp || this._pp + if (Object.keys(app).length === 0) app = this + // set a default processor if none provided + if (!app[this.pp]) { + this.pp = 'processPacket' // reset in case alt function is missing + app.processPacket = async (packet) => { + console.log('packet from socket') + console.dir(packet) + return packet } + } + + this.listen(app) this.log.info('listening') return new Promise( (resolve,reject) => { @@ -47,73 +64,34 @@ export default class Consumer extends Socket { super.connect({ port:this.port, host:this.host, path: this.path }, async () => { this.log.info({path:this.path},'connecting') this.setKeepAlive(this.keepAlive) - let [err, res] = await btc(isReady).bind(this)(this.ready.bind(this), this.wait, this.timeout) + let [err, res] = await btc(isReady).bind(this)(this.ready, this.wait, this.timeout) if (err) reject(err) this.log.info('handshake done, connected') resolve(res) }) - // catch (err){ - // console.log('===============',err) - // resolve('ready') - // } - // if (err) { - // console.log('===============',err) - // if (err.code === 'EISCONN') resolve('ready') - // else reject(err) - // } - }) //end promise } async send(packet) { - let [err, strbuf] = btc(JSON.stringify)(packet) - if (!err) { - this.log.info({packet:packet},'attempting to send packet to socket') - strbuf += '\n' - - return new Promise((resolve, reject) => { - super.write(strbuf, (err) => { - if (err) reject(err) - else resolve('complete') - }) - }) - } - else { this.log.info({packet:packet}, 'bad packet JSON syntax')} + await this.write(this.stream.serialize(packet)) + // throw new Error('Cannot send connection not ready') } - async listen () { + async listen (app) { - let packet = new Stream() + this.on('data', this.stream.onData) - this.on('data', (chunk) => { - packet.write(chunk) - }) + this.stream.on('message', messageProcess.bind(this)) - packet.on('data', (strJSON) => { - let [err, packet] = btc(JSON.parse)(strJSON) - if (!err) { - if (packet.ready) { - this._ready = true - return } - - // set packet processing - this.pp = this.pp || this._pp - - // if no processor provided use this console logger one - if (!this[this.pp]) { - this.pp = 'processPacket' - this.processPacket = async (packet) => { - this.log.info({packet:packet},'process with default logger') - console.log('packet from socket') - console.dir(packet) - return packet } - } - this[this.pp].bind(this)(packet) // process the packet - } - else { this.log.info({strJSON: strJSON},'bad packet JSON syntax')} - }) + async function messageProcess (packet) { + // console.log('incoming packet from server', packet) + if (packet.ready) { + this._ready = true + return } + await app[app.pp].bind(app)(packet) + } } } // end class @@ -124,10 +102,10 @@ function isReady(ready, wait=30, timeout=1000) { let time = 0 return new Promise((resolve, reject) => { (function waitReady(){ - if (time > timeout) return reject('timeout trying to connect') + if (time > timeout) return reject(`timeout trying to connect after ${timeout}ms`) if (ready()) return resolve('ready') - log.info('waiting for 30ms for handshake') - time += 30 + log.info(`waiting ${wait}ms for handshake`) + time += wait setTimeout(waitReady, wait) })() }) diff --git a/src/json.mjs b/src/json.mjs new file mode 100644 index 0000000..46516c9 --- /dev/null +++ b/src/json.mjs @@ -0,0 +1,78 @@ +import {StringDecoder} from 'string_decoder' +import EventEmitter from 'events' + +const decoder = new StringDecoder() + +export default class JsonStream extends EventEmitter{ + constructor(opts={}){ + super() + this._contentLength = null + this._buffer = '' + this._delimeter = opts.delimiter || '#' + this.onData = this.onData.bind(this) + } + + + onData (data) { + data = decoder.write(data) + try { + this._handleData(data) + } catch (e) { + this.emit('error', { error: e }) + } + } + + serialize(message) { + var messageData = JSON.stringify(message) + var length = Buffer.byteLength(messageData, 'utf8') + var data = length + this._delimeter + messageData + return data + } + + _handleData (data) { + this._buffer += data + if (this._contentLength == null) { + var i = this._buffer.indexOf(this._delimeter) + //Check if the buffer has a this._opts.delimeter or "#", if not, the end of the buffer string might be in the middle of a content length string + if (i !== -1) { + var rawContentLength = this._buffer.substring(0, i) + this._contentLength = parseInt(rawContentLength) + if (isNaN(this._contentLength)) { + this._contentLength = null + this._buffer = '' + var err = new Error('Invalid content length supplied ('+rawContentLength+') in: '+this._buffer) + err.code = 'E_INVALID_CONTENT_LENGTH' + throw err + } + this._buffer = this._buffer.substring(i+1) + } + } + if (this._contentLength != null) { + var length = Buffer.byteLength(this._buffer, 'utf8') + if (length == this._contentLength) { + this._handleMessage(this._buffer) + } else if (length > this._contentLength) { + var message = this._buffer.substring(0, this._contentLength) + var rest = this._buffer.substring(this._contentLength) + this._handleMessage(message) + this.onData(rest) + } + } + } + + _handleMessage (data) { + this._contentLength = null + this._buffer = '' + var message + try { + message = JSON.parse(data) + } catch (e) { + var err = new Error('Could not parse JSON: '+e.message+'\nRequest data: '+data) + err.code = 'E_INVALID_JSON' + throw err + } + message = message || {} + this.emit('message', message) + } + +} diff --git a/src/socket.mjs b/src/socket.mjs index e444df4..b1e9b64 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -3,7 +3,8 @@ import { unlink as fileDelete } from 'fs' import btc from 'better-try-catch' import ON_DEATH from 'death' //this is intentionally ugly import bunyan from 'bunyan' -import Stream from 'delimiter-stream' +// import Stream from 'delimiter-stream' +import JsonStream from './json' export default class Socket extends Server { constructor (path, opts={}) { @@ -17,15 +18,27 @@ export default class Socket extends Server { // logging this.log_file=opts.log_file || './socket.log' this.log_opts = {streams:[]} - this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' - this.log_opts.streams.push({level: 'info',path: this.log_file }) + this.log_opts.name = opts.name ? opts.name : 'uci-socket' + // this.log_opts.streams.push({level: 'info',path: this.log_file }) if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout}) this.log = bunyan.createLogger(this.log_opts) + //binding } // end constructor + async create (app={}) { - async create () { +// first set the packet process + this._pp = app.pp || this._pp + this.pp = this.pp || this._pp + if (Object.keys(app).length === 0) app = this +// set a default processor if none provided + if (!app[this.pp]) { + this.pp = 'processPacket' // reset in case alt function is missing + app.processPacket = async (packet) => { + packet.res='echoed' + return packet } + } return new Promise( async (resolve,reject) => { @@ -47,7 +60,7 @@ export default class Socket extends Server { if (path) { // if TCP socket should already be dead this.log.info({socket: path}, 'already exists...deleting') await fileDelete(path) - return await this.listen.bind(this)(this.listen_opts) + return await this.listen.bind(this)(this.listen_opts,app) } } // otherwise fatally exit @@ -55,15 +68,14 @@ export default class Socket extends Server { reject(err) }) - let [err, res] = await btc(this.listen.bind(this))(this.listen_opts) + let [err, res] = await btc(this.listen.bind(this))(this.listen_opts,app) if (err) reject(err) resolve(res) }) // end promise - } // end create - async listen (opts) { + async listen (opts,app) { super.listen(opts, async (err, res) => { @@ -71,45 +83,19 @@ export default class Socket extends Server { // this gets called for each client connection and is unique to each this.on('connection', (socket) => { - this.log.info('server: new consumer connecting') - let packet = new Stream() + const stream = new JsonStream() - socket.on('data', async (chunk) => { - packet.write(chunk) + this.log.info('new consumer connecting sending handshake') + + socket.write(stream.serialize({ready:true})) + + socket.on('data', stream.onData) + + stream.on('message', async function (packet) { + // console.log('incoming packet from consumer',packet) + socket.write(stream.serialize(await app[app.pp].bind(app)(packet))) }) - // when a complete JSON packet arrives process the packet - packet.on('data', async (strJSON) => { - - let [err, packet] = btc(JSON.parse)(strJSON) - if (!err) { - this.log.info({packet:packet},'Server: packet received to socket') - - // set default packet processing - - this.pp = this.pp || this._pp - // console.log('==========',app.spp,'====',this.spp) - - if (!this[this.pp]) { - this.processPacket = async (packet) => { - packet.res='echoed' - this.log.info({packet:packet},'packet being sent to consumer') - return packet } - } - socket.write(JSON.stringify(await this[this.pp].bind(this)(packet))+'\n' ) - } - else { - this.log.info(`bad packet JSON syntax \n ${strJSON}`) - let error = { - error: 'bad packet JSON syntax sent', - packet: strJSON - } - socket.write(JSON.stringify(error)) - } - - }) // end incoming string stream listerner - this.log.info('Server: sending handshake to consumer') - socket.write('{"ready":true}\n') }) // end connected consumer this.log.info({socket: this.listen_opts},'socket created') @@ -118,6 +104,7 @@ export default class Socket extends Server { } + async destroy () { this.log.info('closing down socket') diff --git a/test/socket.test.mjs b/test/socket.test.mjs index 670cc39..93550a8 100644 --- a/test/socket.test.mjs +++ b/test/socket.test.mjs @@ -61,7 +61,6 @@ describe('Connects and Processes a payload in a JSON packet', function(){ if (this.times<10) return try { - // expect(packet.payload).to.equal('unix payload') expect(packet.payload).to.equal('unix payload') resolve() } @@ -92,11 +91,11 @@ describe('Connects and Processes a payload in a JSON packet', function(){ uconsumer.processPacket = function (packet) { this.times++ + // console.log(this.times,packet.payload) if (this.times<10) return try { - // expect(packet.payload).to.equal('unix payload') - expect(packet.payload).to.equal('unix payload') + expect(packet.payload).to.equal('consumer 1 unix payload') resolve() } catch(error) { @@ -105,16 +104,17 @@ describe('Connects and Processes a payload in a JSON packet', function(){ } uconsumer2.processPacket = function (packet) { + // console.log('processor2', packet.payload) return packet } - let [err] = await btc(uconsumer2.connect)() - console.log('connect error', err) - let packet = {payload:'unix payload'} + if (err) reject(err) + let packet1 = {payload:'consumer 1 unix payload'} + let packet2 = {payload:'consumer2 unix payload'} for (var i = 0; i < 11; i++) { - uconsumer.send(packet) - uconsumer2.send(packet) + uconsumer.send(packet1) + uconsumer2.send(packet2) } }) //end promise @@ -169,4 +169,4 @@ describe('Connects and Processes a payload in a JSON packet', function(){ }) // end tcp socket 2 test -}) +}) // end describe