From 57324c616c220a828218027121ccfd104725e62f Mon Sep 17 00:00:00 2001 From: David Kebler Date: Thu, 18 Jan 2018 21:21:06 -0800 Subject: [PATCH] added chunck processing into JSON packets using newline delimiter fixed left unix socket file bug improved some defaults handling added custom packet processing for tcp vs unix --- package-lock.json | 104 +++++++++++++++++++++----- package.json | 3 +- src/consumer.mjs | 36 +++++---- src/socket.mjs | 124 ++++++++++++++++++------------- test/.gitignore | 1 + test/socket.test.mjs | 2 +- test/tcpsocket.mjs | 20 +++++ test/{socket.mjs => usocket.mjs} | 8 +- 8 files changed, 206 insertions(+), 92 deletions(-) create mode 100644 test/tcpsocket.mjs rename test/{socket.mjs => usocket.mjs} (64%) diff --git a/package-lock.json b/package-lock.json index 84eac5e..a1d0313 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,5 +1,5 @@ { - "name": "@uci/unix-socket", + "name": "@uci/socket", "version": "0.1.0", "lockfileVersion": 1, "requires": true, @@ -273,8 +273,7 @@ "balanced-match": { "version": "1.0.0", "resolved": "http://trantor:8082/balanced-match/-/balanced-match-1.0.0.tgz", - "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=", - "dev": true + "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=" }, "bcrypt-pbkdf": { "version": "1.0.1", @@ -328,7 +327,6 @@ "version": "1.1.8", "resolved": "http://trantor:8082/brace-expansion/-/brace-expansion-1.1.8.tgz", "integrity": "sha1-wHshHHyVLsH479Uad+8NHTmQopI=", - "dev": true, "requires": { "balanced-match": "1.0.0", "concat-map": "0.0.1" @@ -351,6 +349,17 @@ "integrity": "sha1-81HTKWnTL6XXpVZxVCY9korjvR8=", "dev": true }, + "bunyan": { + "version": "1.8.12", + "resolved": "http://trantor:8082/bunyan/-/bunyan-1.8.12.tgz", + "integrity": "sha1-8VDw9nSKvdcq6uhPBEA74u8RN5c=", + "requires": { + "dtrace-provider": "0.8.6", + "moment": "2.20.1", + "mv": "2.1.1", + "safe-json-stringify": "1.0.4" + } + }, "caller-path": { "version": "0.1.0", "resolved": "http://trantor:8082/caller-path/-/caller-path-0.1.0.tgz", @@ -556,8 +565,7 @@ "concat-map": { "version": "0.0.1", "resolved": "http://trantor:8082/concat-map/-/concat-map-0.0.1.tgz", - "integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=", - "dev": true + "integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=" }, "concat-stream": { "version": "1.6.0", @@ -722,6 +730,11 @@ "integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=", "dev": true }, + "delimiter-stream": { + "version": "1.0.1", + "resolved": "http://trantor:8082/delimiter-stream/-/delimiter-stream-1.0.1.tgz", + "integrity": "sha1-XyUUUJNQcjq5lE2shvzYJ39TFBg=" + }, "diff": { "version": "3.3.1", "resolved": "http://trantor:8082/diff/-/diff-3.3.1.tgz", @@ -746,6 +759,15 @@ "is-obj": "1.0.1" } }, + "dtrace-provider": { + "version": "0.8.6", + "resolved": "http://trantor:8082/dtrace-provider/-/dtrace-provider-0.8.6.tgz", + "integrity": "sha1-QooiOv4DQl0s1tY0f99AxmkDVj0=", + "optional": true, + "requires": { + "nan": "2.8.0" + } + }, "duplexer": { "version": "0.1.1", "resolved": "http://trantor:8082/duplexer/-/duplexer-0.1.1.tgz", @@ -1208,6 +1230,7 @@ "dev": true, "optional": true, "requires": { + "nan": "2.8.0", "node-pre-gyp": "0.6.39" }, "dependencies": { @@ -2374,7 +2397,6 @@ "version": "1.0.6", "resolved": "http://trantor:8082/inflight/-/inflight-1.0.6.tgz", "integrity": "sha1-Sb1jMdfQLQwJvJEKEHW6gWW1bfk=", - "dev": true, "requires": { "once": "1.4.0", "wrappy": "1.0.2" @@ -2383,8 +2405,7 @@ "inherits": { "version": "2.0.3", "resolved": "http://trantor:8082/inherits/-/inherits-2.0.3.tgz", - "integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4=", - "dev": true + "integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4=" }, "ini": { "version": "1.3.5", @@ -2957,7 +2978,6 @@ "version": "3.0.4", "resolved": "http://trantor:8082/minimatch/-/minimatch-3.0.4.tgz", "integrity": "sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==", - "dev": true, "requires": { "brace-expansion": "1.1.8" } @@ -2972,7 +2992,6 @@ "version": "0.5.1", "resolved": "http://trantor:8082/mkdirp/-/mkdirp-0.5.1.tgz", "integrity": "sha1-MAV0OOrGz3+MR2fzhkjWaX11yQM=", - "dev": true, "requires": { "minimist": "0.0.8" }, @@ -2980,8 +2999,7 @@ "minimist": { "version": "0.0.8", "resolved": "http://trantor:8082/minimist/-/minimist-0.0.8.tgz", - "integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0=", - "dev": true + "integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0=" } } }, @@ -3040,12 +3058,59 @@ "integrity": "sha1-j7+rsKmKJT0xhDMfno3rc3L6xsA=", "dev": true }, + "mv": { + "version": "2.1.1", + "resolved": "http://trantor:8082/mv/-/mv-2.1.1.tgz", + "integrity": "sha1-rmzg1vbV4KT32JN5jQPB6pVZtqI=", + "optional": true, + "requires": { + "mkdirp": "0.5.1", + "ncp": "2.0.0", + "rimraf": "2.4.5" + }, + "dependencies": { + "glob": { + "version": "6.0.4", + "resolved": "http://trantor:8082/glob/-/glob-6.0.4.tgz", + "integrity": "sha1-DwiGD2oVUSey+t1PnOJLGqtuTSI=", + "optional": true, + "requires": { + "inflight": "1.0.6", + "inherits": "2.0.3", + "minimatch": "3.0.4", + "once": "1.4.0", + "path-is-absolute": "1.0.1" + } + }, + "rimraf": { + "version": "2.4.5", + "resolved": "http://trantor:8082/rimraf/-/rimraf-2.4.5.tgz", + "integrity": "sha1-7nEM5dk6j9uFb7Xqj/Di11k0sto=", + "optional": true, + "requires": { + "glob": "6.0.4" + } + } + } + }, + "nan": { + "version": "2.8.0", + "resolved": "http://trantor:8082/nan/-/nan-2.8.0.tgz", + "integrity": "sha1-7XFfP+neArV6XmJS2QqWZ14fCFo=", + "optional": true + }, "natural-compare": { "version": "1.4.0", "resolved": "http://trantor:8082/natural-compare/-/natural-compare-1.4.0.tgz", "integrity": "sha1-Sr6/7tdUHywnrPspvbvRXI1bpPc=", "dev": true }, + "ncp": { + "version": "2.0.0", + "resolved": "http://trantor:8082/ncp/-/ncp-2.0.0.tgz", + "integrity": "sha1-GVoh1sRuNh0vsSgbo4uR6d9727M=", + "optional": true + }, "nodemon": { "version": "1.14.3", "resolved": "http://trantor:8082/nodemon/-/nodemon-1.14.3.tgz", @@ -3121,7 +3186,6 @@ "version": "1.4.0", "resolved": "http://trantor:8082/once/-/once-1.4.0.tgz", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", - "dev": true, "requires": { "wrappy": "1.0.2" } @@ -3209,8 +3273,7 @@ "path-is-absolute": { "version": "1.0.1", "resolved": "http://trantor:8082/path-is-absolute/-/path-is-absolute-1.0.1.tgz", - "integrity": "sha1-F0uSaHNVNP+8es5r9TpanhtcX18=", - "dev": true + "integrity": "sha1-F0uSaHNVNP+8es5r9TpanhtcX18=" }, "path-is-inside": { "version": "1.0.2", @@ -3606,6 +3669,12 @@ "integrity": "sha512-kKvNJn6Mm93gAczWVJg7wH+wGYWNrDHdWvpUmHyEsgCtIwwo3bqPtV4tR5tuPaUhTOo/kvhVwd8XwwOllGYkbg==", "dev": true }, + "safe-json-stringify": { + "version": "1.0.4", + "resolved": "http://trantor:8082/safe-json-stringify/-/safe-json-stringify-1.0.4.tgz", + "integrity": "sha1-gaCY9Efku8P/MxKiQ1IbwGDvWRE=", + "optional": true + }, "semver": { "version": "5.4.1", "resolved": "http://trantor:8082/semver/-/semver-5.4.1.tgz", @@ -4095,8 +4164,7 @@ "wrappy": { "version": "1.0.2", "resolved": "http://trantor:8082/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", - "dev": true + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" }, "write": { "version": "0.2.1", diff --git a/package.json b/package.json index 110e5b8..52cf92b 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "name": "@uci/unix-socket", + "name": "@uci/socket", "version": "0.1.0", "description": "Bare bones intra Host Unix Socket for basic IPC on same machine", "main": "src", @@ -42,6 +42,7 @@ "better-try-catch": "^0.6.2", "bunyan": "^1.8.12", "death": "^1.1.0", + "delimiter-stream": "^1.0.1", "simple-node-logger": "^0.93.33" } } diff --git a/src/consumer.mjs b/src/consumer.mjs index 09a3544..c256285 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -1,6 +1,7 @@ import { Socket } from 'net' import btc from 'better-try-catch' import bunyan from 'bunyan' +import Stream from 'delimiter-stream' export default class Consumer extends Socket { constructor (path, opts={}) { @@ -8,14 +9,10 @@ export default class Consumer extends Socket { // set or tcp socket if (typeof(path)!=='string') { opts = path - this.host = '127.0.0.1' + this.host = opts.host || '127.0.0.1' this.port = opts.port || 8080 - } else { - if (opts.tcp) { - this.host = path - this.port = opts.port || 8080 - } else this.path = path - } + } else this.path = path + this.keepAlive = opts.keepAlive ? opts.keepAlive : true this._ready = false this.timeout = opts.timeout || 1000 @@ -57,9 +54,10 @@ export default class Consumer extends Socket { let [err, strbuf] = btc(JSON.stringify)(packet) if (!err) { this.log.info({packet:packet},'attempting to send packet to socket') + strbuf += '\n' return new Promise((resolve, reject) => { - this.write(strbuf, (err) => { + super.write(strbuf, (err) => { if (err) reject(err) else resolve('complete') }) @@ -69,23 +67,31 @@ export default class Consumer extends Socket { } async listen (app={}) { - this.on('data', async (buf) => { - let [err, packet] = btc(JSON.parse)(buf.toString()) + + let packet = new Stream() + + this.on('data', async (chunk) => { + packet.write(chunk) + }) + + + packet.on('data', async (strJSON) => { + let [err, packet] = btc(JSON.parse)(strJSON) if (!err) { if (packet.ready) { this._ready = true return } // set default packet processing - simple print to console of packet - app.ucpp = app.ucpp || 'processPacket' - if (!app[app.ucpp]) { - app.ucpp = 'processPacket' + app.cpp = app.cpp || 'processPacket' + if (!app[app.cpp]) { + app.cpp = 'processPacket' app.processPacket = async (packet) => { this.log.info({packet:packet},'incoming packet from socket') return packet } } - await app[app.ucpp](packet) // process the packet + await app[app.cpp](packet) // process the packet } - else { this.log.info({buf: buf.toString()},'bad packet JSON syntax')} + else { this.log.info({strJSON: strJSON},'bad packet JSON syntax')} }) } diff --git a/src/socket.mjs b/src/socket.mjs index 4b12dee..88c9eef 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -3,6 +3,7 @@ import { unlink as fileDelete } from 'fs' import btc from 'better-try-catch' import ON_DEATH from 'death' //this is intentionally ugly import bunyan from 'bunyan' +import Stream from 'delimiter-stream' export default class Socket extends Server { constructor (path, opts={}) { @@ -10,12 +11,10 @@ export default class Socket extends Server { // set or tcp socket if (typeof(path)!=='string') { opts = path - this.listen_opts = { host: '127.0.0.1', port: opts.port || 8080} - } else { - if (opts.tcp) { - this.listen_opts = { host: path, port: opts.port || 8080} - } else this.listen_opts = { path: path } - } + this.listen_opts = { host: opts.host || '127.0.0.1', port: opts.port || 8080} + } else this.listen_opts = { path: path } + this.spp = opts.spp || 'processPacket' + // logging this.log_file=opts.log_file || './socket.log' this.log_opts = {streams:[]} this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' @@ -28,7 +27,7 @@ export default class Socket extends Server { async create (app={}) { - return new Promise( (resolve,reject) => { + return new Promise( async (resolve,reject) => { ON_DEATH( async () => { this.log.info('\nhe\'s dead jim') @@ -44,61 +43,82 @@ export default class Socket extends Server { this.on('error', async (err) => { // recover from socket file that was not removed if (err.code === 'EADDRINUSE') { - this.log.info({socket: this.path}, 'already exists...deleting') - await fileDelete(this.path) - return await this.listen(this.path, app) + let path = this.listen_opts.path + if (path) { // if TCP socket should already be dead + this.log.info({socket: path}, 'already exists...deleting') + await fileDelete(path) + return await this.listen.bind(this)(this.listen_opts, app) + } } // otherwise fatally exit this.log.info(err, 'creating socket') reject(err) }) - this.listen(this.listen_opts, async (err, res) => { + let [err, res] = await btc(this.listen.bind(this))(this.listen_opts,app) + if (err) reject(err) + resolve(res) - if (err) reject(err) - // this gets called for each client connection and is unique to each - this.on('connection', (socket) => { - this.log.info('server: new consumer connecting') - - socket.on('data', async (buf) => { - let [err, packet] = btc(JSON.parse)(buf.toString()) - if (!err) { - if (this.log) this.log.info(`data packet received to socket \n ${packet}`) - this.log.info({packet:packet},'Server: packet received to socket') - - // set default packet processing - app.uspp = app.uspp || 'processPacket' - if (!app[app.uspp]) { - app.uspp = 'processPacket' - app.processPacket = async (packet) => { - packet.res='echoed' - this.log.info({packet:packet},'packet being sent to consumer') - return packet } - } - - socket.write(JSON.stringify(await app[app.uspp].bind(app)(packet))) - } - else { - this.log.info(`bad packet JSON syntax \n ${buf.toString()}`) - let error = { - error: 'bad packet JSON syntax sent', - packet: buf.toString() - } - socket.write(JSON.stringify(error)) - } - - }) // end incoming data listerner - this.log.info('Server: sending handshake to consumer') - socket.write('{"ready":true}') - }) // end connected consumer - - this.log.info({socket: this.path},'socket created') - resolve(res) - }) - }) + }) // end promise } // end create + async listen (opts,app) { + + super.listen(opts, async (err, res) => { + + if (err) return err + // this gets called for each client connection and is unique to each + this.on('connection', (socket) => { + + this.log.info('server: new consumer connecting') + let packet = new Stream() + + socket.on('data', async (chunk) => { + packet.write(chunk) + }) + + // when a complete JSON packet arrives process the packet + packet.on('data', async (strJSON) => { + + let [err, packet] = btc(JSON.parse)(strJSON) + if (!err) { + this.log.info({packet:packet},'Server: packet received to socket') + + // set default packet processing + // console.log('==========',app.spp,'====',this.spp) + this.spp = app.spp || this.spp + // console.log('==========',app.spp,'====',this.spp) + + if (!app[this.spp]) { + // app.spp = 'processPacket' + app.processPacket = async (packet) => { + packet.res='echoed' + this.log.info({packet:packet},'packet being sent to consumer') + return packet } + } + socket.write(JSON.stringify(await app[this.spp].bind(app)(packet))+'\n' ) + } + else { + this.log.info(`bad packet JSON syntax \n ${strJSON}`) + let error = { + error: 'bad packet JSON syntax sent', + packet: strJSON + } + socket.write(JSON.stringify(error)) + } + + }) // end incoming string stream listerner + this.log.info('Server: sending handshake to consumer') + socket.write('{"ready":true}\n') + }) // end connected consumer + + this.log.info({socket: this.listen_opts},'socket created') + return res + }) // end listen callback + + } + async destroy () { diff --git a/test/.gitignore b/test/.gitignore index c74d682..b049ffa 100644 --- a/test/.gitignore +++ b/test/.gitignore @@ -1 +1,2 @@ *.sock +/node_modules/ diff --git a/test/socket.test.mjs b/test/socket.test.mjs index 668cc22..d7389da 100644 --- a/test/socket.test.mjs +++ b/test/socket.test.mjs @@ -10,7 +10,7 @@ import { Consumer } from '../src' const USOCKET = __dirname + '/sample.sock' let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'}) -let tcpconsumer = new Consumer('localhost',{port: 8081, tcp:true, name:'test-tcpconsumer'}) +let tcpconsumer = new Consumer({name:'test-tcpconsumer'}) const delay = time => new Promise(res=>setTimeout(()=>res(),time)) diff --git a/test/tcpsocket.mjs b/test/tcpsocket.mjs new file mode 100644 index 0000000..9eb6f14 --- /dev/null +++ b/test/tcpsocket.mjs @@ -0,0 +1,20 @@ +import { Socket } from '../src' + +let socket = new Socket({name:'tcp socket'}) + +const app = { + spp: 'sprocessPacket', + sprocessPacket: async function (packet) { + packet.payload = 'tcp processed '+packet.payload + return packet + } +} + +; +(async () => { + + await socket.create(app) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/socket.mjs b/test/usocket.mjs similarity index 64% rename from test/socket.mjs rename to test/usocket.mjs index 790d6d5..f9a2bba 100644 --- a/test/socket.mjs +++ b/test/usocket.mjs @@ -2,14 +2,12 @@ import { Socket } from '../src' const USOCKET = __dirname + '/sample.sock' -let socket = new Socket(USOCKET) +let socket = new Socket(USOCKET,{name:'unix socket'}) const app = { - uspp: 'sprocessPacket', + spp: 'sprocessPacket', sprocessPacket: async function (packet) { - packet.processed = true - console.log('server: packet processed') - console.dir(packet) + packet.payload = 'processed '+packet.payload return packet } }