added chunck processing into JSON packets using newline delimiter

fixed left unix socket file bug
improved some defaults handling
added custom packet processing for tcp vs unix
tls
David Kebler 2018-01-18 21:21:06 -08:00
parent f0c6e96d02
commit 57324c616c
8 changed files with 206 additions and 92 deletions

104
package-lock.json generated
View File

@ -1,5 +1,5 @@
{
"name": "@uci/unix-socket",
"name": "@uci/socket",
"version": "0.1.0",
"lockfileVersion": 1,
"requires": true,
@ -273,8 +273,7 @@
"balanced-match": {
"version": "1.0.0",
"resolved": "http://trantor:8082/balanced-match/-/balanced-match-1.0.0.tgz",
"integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=",
"dev": true
"integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c="
},
"bcrypt-pbkdf": {
"version": "1.0.1",
@ -328,7 +327,6 @@
"version": "1.1.8",
"resolved": "http://trantor:8082/brace-expansion/-/brace-expansion-1.1.8.tgz",
"integrity": "sha1-wHshHHyVLsH479Uad+8NHTmQopI=",
"dev": true,
"requires": {
"balanced-match": "1.0.0",
"concat-map": "0.0.1"
@ -351,6 +349,17 @@
"integrity": "sha1-81HTKWnTL6XXpVZxVCY9korjvR8=",
"dev": true
},
"bunyan": {
"version": "1.8.12",
"resolved": "http://trantor:8082/bunyan/-/bunyan-1.8.12.tgz",
"integrity": "sha1-8VDw9nSKvdcq6uhPBEA74u8RN5c=",
"requires": {
"dtrace-provider": "0.8.6",
"moment": "2.20.1",
"mv": "2.1.1",
"safe-json-stringify": "1.0.4"
}
},
"caller-path": {
"version": "0.1.0",
"resolved": "http://trantor:8082/caller-path/-/caller-path-0.1.0.tgz",
@ -556,8 +565,7 @@
"concat-map": {
"version": "0.0.1",
"resolved": "http://trantor:8082/concat-map/-/concat-map-0.0.1.tgz",
"integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=",
"dev": true
"integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s="
},
"concat-stream": {
"version": "1.6.0",
@ -722,6 +730,11 @@
"integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=",
"dev": true
},
"delimiter-stream": {
"version": "1.0.1",
"resolved": "http://trantor:8082/delimiter-stream/-/delimiter-stream-1.0.1.tgz",
"integrity": "sha1-XyUUUJNQcjq5lE2shvzYJ39TFBg="
},
"diff": {
"version": "3.3.1",
"resolved": "http://trantor:8082/diff/-/diff-3.3.1.tgz",
@ -746,6 +759,15 @@
"is-obj": "1.0.1"
}
},
"dtrace-provider": {
"version": "0.8.6",
"resolved": "http://trantor:8082/dtrace-provider/-/dtrace-provider-0.8.6.tgz",
"integrity": "sha1-QooiOv4DQl0s1tY0f99AxmkDVj0=",
"optional": true,
"requires": {
"nan": "2.8.0"
}
},
"duplexer": {
"version": "0.1.1",
"resolved": "http://trantor:8082/duplexer/-/duplexer-0.1.1.tgz",
@ -1208,6 +1230,7 @@
"dev": true,
"optional": true,
"requires": {
"nan": "2.8.0",
"node-pre-gyp": "0.6.39"
},
"dependencies": {
@ -2374,7 +2397,6 @@
"version": "1.0.6",
"resolved": "http://trantor:8082/inflight/-/inflight-1.0.6.tgz",
"integrity": "sha1-Sb1jMdfQLQwJvJEKEHW6gWW1bfk=",
"dev": true,
"requires": {
"once": "1.4.0",
"wrappy": "1.0.2"
@ -2383,8 +2405,7 @@
"inherits": {
"version": "2.0.3",
"resolved": "http://trantor:8082/inherits/-/inherits-2.0.3.tgz",
"integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4=",
"dev": true
"integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4="
},
"ini": {
"version": "1.3.5",
@ -2957,7 +2978,6 @@
"version": "3.0.4",
"resolved": "http://trantor:8082/minimatch/-/minimatch-3.0.4.tgz",
"integrity": "sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==",
"dev": true,
"requires": {
"brace-expansion": "1.1.8"
}
@ -2972,7 +2992,6 @@
"version": "0.5.1",
"resolved": "http://trantor:8082/mkdirp/-/mkdirp-0.5.1.tgz",
"integrity": "sha1-MAV0OOrGz3+MR2fzhkjWaX11yQM=",
"dev": true,
"requires": {
"minimist": "0.0.8"
},
@ -2980,8 +2999,7 @@
"minimist": {
"version": "0.0.8",
"resolved": "http://trantor:8082/minimist/-/minimist-0.0.8.tgz",
"integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0=",
"dev": true
"integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0="
}
}
},
@ -3040,12 +3058,59 @@
"integrity": "sha1-j7+rsKmKJT0xhDMfno3rc3L6xsA=",
"dev": true
},
"mv": {
"version": "2.1.1",
"resolved": "http://trantor:8082/mv/-/mv-2.1.1.tgz",
"integrity": "sha1-rmzg1vbV4KT32JN5jQPB6pVZtqI=",
"optional": true,
"requires": {
"mkdirp": "0.5.1",
"ncp": "2.0.0",
"rimraf": "2.4.5"
},
"dependencies": {
"glob": {
"version": "6.0.4",
"resolved": "http://trantor:8082/glob/-/glob-6.0.4.tgz",
"integrity": "sha1-DwiGD2oVUSey+t1PnOJLGqtuTSI=",
"optional": true,
"requires": {
"inflight": "1.0.6",
"inherits": "2.0.3",
"minimatch": "3.0.4",
"once": "1.4.0",
"path-is-absolute": "1.0.1"
}
},
"rimraf": {
"version": "2.4.5",
"resolved": "http://trantor:8082/rimraf/-/rimraf-2.4.5.tgz",
"integrity": "sha1-7nEM5dk6j9uFb7Xqj/Di11k0sto=",
"optional": true,
"requires": {
"glob": "6.0.4"
}
}
}
},
"nan": {
"version": "2.8.0",
"resolved": "http://trantor:8082/nan/-/nan-2.8.0.tgz",
"integrity": "sha1-7XFfP+neArV6XmJS2QqWZ14fCFo=",
"optional": true
},
"natural-compare": {
"version": "1.4.0",
"resolved": "http://trantor:8082/natural-compare/-/natural-compare-1.4.0.tgz",
"integrity": "sha1-Sr6/7tdUHywnrPspvbvRXI1bpPc=",
"dev": true
},
"ncp": {
"version": "2.0.0",
"resolved": "http://trantor:8082/ncp/-/ncp-2.0.0.tgz",
"integrity": "sha1-GVoh1sRuNh0vsSgbo4uR6d9727M=",
"optional": true
},
"nodemon": {
"version": "1.14.3",
"resolved": "http://trantor:8082/nodemon/-/nodemon-1.14.3.tgz",
@ -3121,7 +3186,6 @@
"version": "1.4.0",
"resolved": "http://trantor:8082/once/-/once-1.4.0.tgz",
"integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=",
"dev": true,
"requires": {
"wrappy": "1.0.2"
}
@ -3209,8 +3273,7 @@
"path-is-absolute": {
"version": "1.0.1",
"resolved": "http://trantor:8082/path-is-absolute/-/path-is-absolute-1.0.1.tgz",
"integrity": "sha1-F0uSaHNVNP+8es5r9TpanhtcX18=",
"dev": true
"integrity": "sha1-F0uSaHNVNP+8es5r9TpanhtcX18="
},
"path-is-inside": {
"version": "1.0.2",
@ -3606,6 +3669,12 @@
"integrity": "sha512-kKvNJn6Mm93gAczWVJg7wH+wGYWNrDHdWvpUmHyEsgCtIwwo3bqPtV4tR5tuPaUhTOo/kvhVwd8XwwOllGYkbg==",
"dev": true
},
"safe-json-stringify": {
"version": "1.0.4",
"resolved": "http://trantor:8082/safe-json-stringify/-/safe-json-stringify-1.0.4.tgz",
"integrity": "sha1-gaCY9Efku8P/MxKiQ1IbwGDvWRE=",
"optional": true
},
"semver": {
"version": "5.4.1",
"resolved": "http://trantor:8082/semver/-/semver-5.4.1.tgz",
@ -4095,8 +4164,7 @@
"wrappy": {
"version": "1.0.2",
"resolved": "http://trantor:8082/wrappy/-/wrappy-1.0.2.tgz",
"integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=",
"dev": true
"integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8="
},
"write": {
"version": "0.2.1",

View File

@ -1,5 +1,5 @@
{
"name": "@uci/unix-socket",
"name": "@uci/socket",
"version": "0.1.0",
"description": "Bare bones intra Host Unix Socket for basic IPC on same machine",
"main": "src",
@ -42,6 +42,7 @@
"better-try-catch": "^0.6.2",
"bunyan": "^1.8.12",
"death": "^1.1.0",
"delimiter-stream": "^1.0.1",
"simple-node-logger": "^0.93.33"
}
}

View File

@ -1,6 +1,7 @@
import { Socket } from 'net'
import btc from 'better-try-catch'
import bunyan from 'bunyan'
import Stream from 'delimiter-stream'
export default class Consumer extends Socket {
constructor (path, opts={}) {
@ -8,14 +9,10 @@ export default class Consumer extends Socket {
// set or tcp socket
if (typeof(path)!=='string') {
opts = path
this.host = '127.0.0.1'
this.host = opts.host || '127.0.0.1'
this.port = opts.port || 8080
} else {
if (opts.tcp) {
this.host = path
this.port = opts.port || 8080
} else this.path = path
}
} else this.path = path
this.keepAlive = opts.keepAlive ? opts.keepAlive : true
this._ready = false
this.timeout = opts.timeout || 1000
@ -57,9 +54,10 @@ export default class Consumer extends Socket {
let [err, strbuf] = btc(JSON.stringify)(packet)
if (!err) {
this.log.info({packet:packet},'attempting to send packet to socket')
strbuf += '\n'
return new Promise((resolve, reject) => {
this.write(strbuf, (err) => {
super.write(strbuf, (err) => {
if (err) reject(err)
else resolve('complete')
})
@ -69,23 +67,31 @@ export default class Consumer extends Socket {
}
async listen (app={}) {
this.on('data', async (buf) => {
let [err, packet] = btc(JSON.parse)(buf.toString())
let packet = new Stream()
this.on('data', async (chunk) => {
packet.write(chunk)
})
packet.on('data', async (strJSON) => {
let [err, packet] = btc(JSON.parse)(strJSON)
if (!err) {
if (packet.ready) {
this._ready = true
return }
// set default packet processing - simple print to console of packet
app.ucpp = app.ucpp || 'processPacket'
if (!app[app.ucpp]) {
app.ucpp = 'processPacket'
app.cpp = app.cpp || 'processPacket'
if (!app[app.cpp]) {
app.cpp = 'processPacket'
app.processPacket = async (packet) => {
this.log.info({packet:packet},'incoming packet from socket')
return packet }
}
await app[app.ucpp](packet) // process the packet
await app[app.cpp](packet) // process the packet
}
else { this.log.info({buf: buf.toString()},'bad packet JSON syntax')}
else { this.log.info({strJSON: strJSON},'bad packet JSON syntax')}
})
}

View File

@ -3,6 +3,7 @@ import { unlink as fileDelete } from 'fs'
import btc from 'better-try-catch'
import ON_DEATH from 'death' //this is intentionally ugly
import bunyan from 'bunyan'
import Stream from 'delimiter-stream'
export default class Socket extends Server {
constructor (path, opts={}) {
@ -10,12 +11,10 @@ export default class Socket extends Server {
// set or tcp socket
if (typeof(path)!=='string') {
opts = path
this.listen_opts = { host: '127.0.0.1', port: opts.port || 8080}
} else {
if (opts.tcp) {
this.listen_opts = { host: path, port: opts.port || 8080}
} else this.listen_opts = { path: path }
}
this.listen_opts = { host: opts.host || '127.0.0.1', port: opts.port || 8080}
} else this.listen_opts = { path: path }
this.spp = opts.spp || 'processPacket'
// logging
this.log_file=opts.log_file || './socket.log'
this.log_opts = {streams:[]}
this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer'
@ -28,7 +27,7 @@ export default class Socket extends Server {
async create (app={}) {
return new Promise( (resolve,reject) => {
return new Promise( async (resolve,reject) => {
ON_DEATH( async () => {
this.log.info('\nhe\'s dead jim')
@ -44,61 +43,82 @@ export default class Socket extends Server {
this.on('error', async (err) => {
// recover from socket file that was not removed
if (err.code === 'EADDRINUSE') {
this.log.info({socket: this.path}, 'already exists...deleting')
await fileDelete(this.path)
return await this.listen(this.path, app)
let path = this.listen_opts.path
if (path) { // if TCP socket should already be dead
this.log.info({socket: path}, 'already exists...deleting')
await fileDelete(path)
return await this.listen.bind(this)(this.listen_opts, app)
}
}
// otherwise fatally exit
this.log.info(err, 'creating socket')
reject(err)
})
this.listen(this.listen_opts, async (err, res) => {
let [err, res] = await btc(this.listen.bind(this))(this.listen_opts,app)
if (err) reject(err)
resolve(res)
if (err) reject(err)
// this gets called for each client connection and is unique to each
this.on('connection', (socket) => {
this.log.info('server: new consumer connecting')
socket.on('data', async (buf) => {
let [err, packet] = btc(JSON.parse)(buf.toString())
if (!err) {
if (this.log) this.log.info(`data packet received to socket \n ${packet}`)
this.log.info({packet:packet},'Server: packet received to socket')
// set default packet processing
app.uspp = app.uspp || 'processPacket'
if (!app[app.uspp]) {
app.uspp = 'processPacket'
app.processPacket = async (packet) => {
packet.res='echoed'
this.log.info({packet:packet},'packet being sent to consumer')
return packet }
}
socket.write(JSON.stringify(await app[app.uspp].bind(app)(packet)))
}
else {
this.log.info(`bad packet JSON syntax \n ${buf.toString()}`)
let error = {
error: 'bad packet JSON syntax sent',
packet: buf.toString()
}
socket.write(JSON.stringify(error))
}
}) // end incoming data listerner
this.log.info('Server: sending handshake to consumer')
socket.write('{"ready":true}')
}) // end connected consumer
this.log.info({socket: this.path},'socket created')
resolve(res)
})
})
}) // end promise
} // end create
async listen (opts,app) {
super.listen(opts, async (err, res) => {
if (err) return err
// this gets called for each client connection and is unique to each
this.on('connection', (socket) => {
this.log.info('server: new consumer connecting')
let packet = new Stream()
socket.on('data', async (chunk) => {
packet.write(chunk)
})
// when a complete JSON packet arrives process the packet
packet.on('data', async (strJSON) => {
let [err, packet] = btc(JSON.parse)(strJSON)
if (!err) {
this.log.info({packet:packet},'Server: packet received to socket')
// set default packet processing
// console.log('==========',app.spp,'====',this.spp)
this.spp = app.spp || this.spp
// console.log('==========',app.spp,'====',this.spp)
if (!app[this.spp]) {
// app.spp = 'processPacket'
app.processPacket = async (packet) => {
packet.res='echoed'
this.log.info({packet:packet},'packet being sent to consumer')
return packet }
}
socket.write(JSON.stringify(await app[this.spp].bind(app)(packet))+'\n' )
}
else {
this.log.info(`bad packet JSON syntax \n ${strJSON}`)
let error = {
error: 'bad packet JSON syntax sent',
packet: strJSON
}
socket.write(JSON.stringify(error))
}
}) // end incoming string stream listerner
this.log.info('Server: sending handshake to consumer')
socket.write('{"ready":true}\n')
}) // end connected consumer
this.log.info({socket: this.listen_opts},'socket created')
return res
}) // end listen callback
}
async destroy () {

1
test/.gitignore vendored
View File

@ -1 +1,2 @@
*.sock
/node_modules/

View File

@ -10,7 +10,7 @@ import { Consumer } from '../src'
const USOCKET = __dirname + '/sample.sock'
let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'})
let tcpconsumer = new Consumer('localhost',{port: 8081, tcp:true, name:'test-tcpconsumer'})
let tcpconsumer = new Consumer({name:'test-tcpconsumer'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time))

20
test/tcpsocket.mjs Normal file
View File

@ -0,0 +1,20 @@
import { Socket } from '../src'
let socket = new Socket({name:'tcp socket'})
const app = {
spp: 'sprocessPacket',
sprocessPacket: async function (packet) {
packet.payload = 'tcp processed '+packet.payload
return packet
}
}
;
(async () => {
await socket.create(app)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View File

@ -2,14 +2,12 @@ import { Socket } from '../src'
const USOCKET = __dirname + '/sample.sock'
let socket = new Socket(USOCKET)
let socket = new Socket(USOCKET,{name:'unix socket'})
const app = {
uspp: 'sprocessPacket',
spp: 'sprocessPacket',
sprocessPacket: async function (packet) {
packet.processed = true
console.log('server: packet processed')
console.dir(packet)
packet.payload = 'processed '+packet.payload
return packet
}
}