From 751491ee00241c3e490d48b41ed53865abc2e187 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Sat, 3 Feb 2018 13:33:25 -0800 Subject: [PATCH] moved log options to top refactor passed int opts (path no longer passed separately) removed .packet property moving process to root property and thus _packetProcess is private method for packet processing re-export in index both Socket and Consumer as named and as props of default refactor test and examples accordingly --- examples/client.mjs | 11 +-- examples/client2.mjs | 20 ++++-- examples/server.mjs | 12 ++-- src/consumer.mjs | 80 ++++++++++++--------- src/index.mjs | 8 ++- src/socket.mjs | 81 +++++++++++----------- test/sockets/tcp-process.mjs | 2 +- test/sockets/tcpsocket-9080.mjs | 1 - test/sockets/tcpsocket-default.mjs | 3 +- test/sockets/usocket-default-overwrite.mjs | 4 +- test/sockets/usocket-default.mjs | 2 +- test/tcp.test.mjs | 4 +- test/usocket-default-overwrite.test.mjs | 18 +++-- test/usocket-default.test.mjs | 16 ++--- 14 files changed, 141 insertions(+), 121 deletions(-) diff --git a/examples/client.mjs b/examples/client.mjs index 61ca779..a612566 100644 --- a/examples/client.mjs +++ b/examples/client.mjs @@ -2,8 +2,8 @@ import Consumer from '../src/consumer' const USOCKET = __dirname + '/sample.sock' -const client1= new Consumer(USOCKET, {log:false,name:'example-consumer1' }) -const client2 = new Consumer(USOCKET, {log:false,name:'example-consumer2'}) +const client1= new Consumer({path:USOCKET,name:'example-consumer1' }) +const client2 = new Consumer({path:USOCKET,name:'example-consumer2'}) let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'} let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'} @@ -15,13 +15,16 @@ const process = function (packet) { client1.registerPacketProcessor(process) -client2.packet._process = process +client2._packetProcess = process ; (async () => { await Promise.all([client1.connect(),client2.connect()]) - await Promise.all([client1.send(packet1),client2.send(packet2)]) + client1.send(packet1) + client2.send(packet2) + client1.end() + client2.end() })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/examples/client2.mjs b/examples/client2.mjs index 8e846f4..291e3e6 100644 --- a/examples/client2.mjs +++ b/examples/client2.mjs @@ -6,14 +6,21 @@ const USOCKET = __dirname + '/sample.sock' class Client extends Consumer { constructor(path,opts) { super(path,opts) - this.packet._process = async function (packet) { - console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`) - } } + + async _packetProcess (packet) { + this[packet.cmd](packet) + } + + async reply (packet) { + console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`) + console.log(`Socket replied with data: ${packet.data}`) + } + } -const client1= new Client(USOCKET, {log:false,name:'example-consumer1' }) -const client2 = new Client(USOCKET, {log:false,name:'example-consumer2'}) +const client1= new Client({path:USOCKET,name:'example-consumer1' }) +const client2 = new Client({path:USOCKET,name:'example-consumer2'}) let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'} let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'} @@ -23,7 +30,8 @@ let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'} await Promise.all([client1.connect(),client2.connect()]) await Promise.all([client1.send(packet1),client2.send(packet2)]) - + client1.end() + client2.end() })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) }) diff --git a/examples/server.mjs b/examples/server.mjs index b15f114..3e2907a 100644 --- a/examples/server.mjs +++ b/examples/server.mjs @@ -6,11 +6,11 @@ const USOCKET = __dirname + '/sample.sock' (async () => { class Test extends Socket { - constructor(path,opts) { - super(path,opts) + constructor(opts) { + super(opts) } - async _process(packet) { + async _packetProcess(packet) { console.log('packet being processed') console.dir(packet) if (packet.cmd) return await this[packet.cmd](packet.data,packet.name) @@ -22,14 +22,14 @@ const USOCKET = __dirname + '/sample.sock' console.log('data:', data) res.status ='success' res.name = name - res.data = 'this would be response from device' + res.cmd = 'reply' + res.data = 'this might be response data from another process' return(res) } } - let test = new Test(USOCKET) - test.packet = test + let test = new Test({path:USOCKET}) await test.create() })().catch(err => { diff --git a/src/consumer.mjs b/src/consumer.mjs index 01499ba..e57b838 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -1,37 +1,36 @@ import { Socket } from 'net' import btc from 'better-try-catch' -// import bunyan from 'bunyan' -import logger from '../../uci-logger/src/logger' -let log = {} - import JsonStream from './json-stream' +import logger from '../../uci-logger/src/logger' +let log = {} +const LOG_OPTS = { + repo:'uci-socket', + npm:'@uci/socket', + file:'src/consumer.mjs', + class:'Consumer', + id:this.id, + instance_created:new Date().getTime() +} + +const DEFAULT_PIPE = __dirname + '/unix.sock' + export default class Consumer extends Socket { - constructor (path={}, opts={}) { + constructor (opts={}) { super() - this.id = opts.id || opts.name || 'consumer:'+ Math.random()*100 - if (typeof(path)!=='string') { - if (arguments.length === 2) { - opts.host = path.host || opts.host - opts.port = path.port || opts.port - } else opts=path - this.host = opts.host || '127.0.0.1' // TODO log a warning about host on same machine - this.port = opts.port || 8080 - } else { - this.path = path - } + this.id = opts.id || opts.name || 'socket:'+ new Date().getTime() + if (!opts.path && opts.np) opts.path = DEFAULT_PIPE + if (!opts.path) { + opts.host = opts.host || '127.0.0.1' + opts.port = opts.port || 8080 + } else opts.np = true + this.opts=opts this.keepAlive = opts.keepAlive ? opts.keepAlive : true this._ready = false this.timeout = opts.timeout || 500 this.wait = opts.wait || 5 this.stream = new JsonStream() - this.packet = { - _process: async (packet) => { - console.log('default consumer processor -- packet from socket') - console.dir(packet) - } - } - log = logger.child({repo:'uci-socket',npm:'@uci/socket',file:'src/socket.mjs',class:'Socket', id:this.id, created:new Date().getTime()}) + log = logger.child(LOG_OPTS) // bind to class for other class functions this.connect = this.connect.bind(this) this.__ready = this.__ready.bind(this) @@ -45,20 +44,20 @@ export default class Consumer extends Socket { return new Promise( (resolve,reject) => { const connect = () => { - if (this.host ==='127.0.0.1') log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead') - log.info(`attempting to connect ${this.id} to ${this.path?this.path:this.host+':'+this.port}`) - super.connect({ port:this.port, host:this.host, path: this.path }) + if (this.opts.host ==='127.0.0.1') log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead') + log.info({opts:this.opts},`attempting to connect ${this.id} to socket`) + super.connect(this.opts) } const timeout = setTimeout(() =>{ - reject(`unable to connect in ${this.timeout*10}ms to ${this.path?this.path:this.host+':'+this.port}`) + reject({opts:this.opts},`unable to connect in ${this.timeout*10}ms`) } ,this.timeout*10) this.once('connect', async () => { clearTimeout(timeout) this._listen() - log.info({path:this.path, host:this.host, post:this.port },`connected ${this.path?this.path:this.host+':'+this.port} waiting for socket ready handshake`) + log.info({opts:this.opts},'connected waiting for socket ready handshake') this.setKeepAlive(this.keepAlive) let [err, res] = await btc(isReady).bind(this)(this.__ready, this.wait, this.timeout) if (err) reject(err) @@ -74,10 +73,10 @@ export default class Consumer extends Socket { } log.warn(err.code) setTimeout(() =>{ - log.warn(`retrying connect to ${this.path?this.path:this.host+':'+this.port}`) + // log.warn(`retrying connect to ${this.opts}`) connect() } - ,this.wait*10) + ,this.wait*100) }) connect() @@ -88,7 +87,9 @@ export default class Consumer extends Socket { async send(packet) { await this.write(this.stream.serialize(packet)) - // TODO handle error here? and/or await response if required before allowing more sending + // TODO handle possible error + // TODO await response if required by setting id to packet + // then set a flag (and promise) that is resovled in the listener } // TODO register alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) @@ -96,9 +97,11 @@ export default class Consumer extends Socket { // TODO register authenciation function (set up default) registerPacketProcessor (func) { - this.packet._process = func + this._packetProcess = func } + // PRIVATE METHODS + __ready() {return this._ready} async _listen () { @@ -109,13 +112,22 @@ export default class Consumer extends Socket { if (packet.ready) { this._ready = true return } - await this.packet._process(packet) + await this._packetProcess(packet) } } +// default packet process just a simple console logger + _packetProcess (packet) { + console.log('default consumer processor -- packet from socket') + console.dir(packet) + } + + } // end class -// wait for handshake packet from socket + +// HELP FUNCTIONS +// wait until a passed ready function returns true function isReady(ready, wait=30, timeout=1000) { let time = 0 return new Promise((resolve, reject) => { diff --git a/src/index.mjs b/src/index.mjs index 55b503b..3def1f5 100644 --- a/src/index.mjs +++ b/src/index.mjs @@ -1,2 +1,6 @@ -export { default as Socket } from './socket' -export { default as Consumer } from './consumer' +import Socket from './socket' +import Consumer from './consumer' + +export {Socket as Socket} +export { Consumer as Consumer } +export default { Socket, Consumer } diff --git a/src/socket.mjs b/src/socket.mjs index b3c32fb..6454d64 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -1,45 +1,46 @@ import { Server } from 'net' import { unlink as fileDelete } from 'fs' import btc from 'better-try-catch' -import ON_DEATH from 'death' //this is intentionally ugly +import _ON_DEATH from 'death' //this is intentionally ugly import JSONStream from './json-stream' import logger from '../../uci-logger/src/logger' let log = {} +const LOG_OPTS = { + repo:'uci-socket', + npm:'@uci/socket', + file:'src/socket.mjs', + class:'Socket', + id:this.id, + instance_created:new Date().getTime() +} +const DEFAULT_PIPE = __dirname + '/unix.sock' export default class Socket extends Server { - constructor (path,opts={}) { + constructor (opts={}) { super() this.id = opts.id || opts.name || 'socket:'+ new Date().getTime() - if (typeof(path)!=='string') { - if (arguments.length === 2) { - opts.host = path.host || opts.host - opts.port = path.port || opts.port - } else opts=path - this.listen_opts = { host: opts.host || '0.0.0.0', port: opts.port || 8080} - } else this.listen_opts = { path: path } - this.packet = { // default packet processing - simple echo server - _process: (packet) => { - packet.res='echoed' - return packet } - } - //self binding + if (!opts.path && opts.np) opts.path = DEFAULT_PIPE + if (!opts.path) { + opts.host = opts.host || '0.0.0.0' + opts.port = opts.port || 8080 + } else opts.np = true + this.opts = opts + //self bindings this._listen = this._listen.bind(this) this.create = this.create.bind(this) - log = logger.child({repo:'uci-socket',npm:'@uci/socket',file:'src/socket.mjs',class:'Socket', id:this.id, created:new Date().getTime()}) + log = logger.child(LOG_OPTS) //create instance logger set LOG_OPTS above } // end constructor async create () { return new Promise( async (resolve,reject) => { - - ON_DEATH( async () => { + // couple ways to kill socket process when needed + _ON_DEATH( async () => { log.info('\nhe\'s dead jim') await this._destroy() - }) - process.once('SIGUSR2', async () => { await this._destroy process.kill(process.pid, 'SIGUSR2') @@ -48,11 +49,10 @@ export default class Socket extends Server { this.on('error', async (err) => { // recover from socket file that was not removed if (err.code === 'EADDRINUSE') { - let path = this.listen_opts.path - if (path) { // if TCP socket should already be dead - log.info({socket: path}, 'already exists...deleting') - await fileDelete(path) - return await this._listen(this.listen_opts) + if (this.opts.np) { // if TCP socket should already be dead + log.info({socket: this.opts.path}, 'already exists...deleting') + await fileDelete(this.opts.path) + return await this._listen(this.opts) } } // otherwise fatally exit @@ -60,40 +60,37 @@ export default class Socket extends Server { reject(err) }) - let [err, res] = await btc(this._listen)(this.listen_opts) + let [err, res] = await btc(this._listen)(this.opts) if (err) reject(err) resolve(res) }) // end promise } // end create + + registerPacketProcessor (func) { + this._packetProcess = func + } + async _listen (opts) { - super.listen(opts, async (err, res) => { - if (err) return err - // this gets called for each client connection and is unique to each this.on('connection', (socket) => { - const stream = new JSONStream() log.info('new consumer connecting sending handshake') - socket.write(stream.serialize({ready:true})) - socket.on('data', stream.onData) - stream.on('message', messageProcess.bind(this)) async function messageProcess (packet) { - socket.write(stream.serialize(await this.packet._process(packet))) + socket.write(stream.serialize(await this._packetProcess(packet))) } - - }) // end connected consumer - log.info({socket: this.listen_opts},'socket created') + }) // end connecttion consumer + log.info({opts: this.opts},'socket created') return res - }) // end listen callback + }) // end super listen callback - } + } // end listen async _destroy () { log.info('closing down socket') @@ -102,8 +99,10 @@ export default class Socket extends Server { process.exit() } - registerPacketProcessor (func) { - this.packet._process = func + // default packet process, just a simple echo + _packetProcess (packet) { + packet.res='echoed' + return packet } } // end class diff --git a/test/sockets/tcp-process.mjs b/test/sockets/tcp-process.mjs index f40b0fd..5eeb46a 100644 --- a/test/sockets/tcp-process.mjs +++ b/test/sockets/tcp-process.mjs @@ -1,4 +1,4 @@ export default async function (packet) { - packet.payload = this.port +':'+packet.payload + packet.payload = this.opts.port +':'+packet.payload return packet } diff --git a/test/sockets/tcpsocket-9080.mjs b/test/sockets/tcpsocket-9080.mjs index 7eac50f..af5b0a5 100644 --- a/test/sockets/tcpsocket-9080.mjs +++ b/test/sockets/tcpsocket-9080.mjs @@ -4,7 +4,6 @@ import process from './tcp-process' let socket = new Socket({port:9080, name:'tcp socket 9080'}) socket.registerPacketProcessor(process) -socket.packet.port = socket.listen_opts.port ; (async () => { diff --git a/test/sockets/tcpsocket-default.mjs b/test/sockets/tcpsocket-default.mjs index b7869f9..86ef932 100644 --- a/test/sockets/tcpsocket-default.mjs +++ b/test/sockets/tcpsocket-default.mjs @@ -1,10 +1,9 @@ import { Socket } from '../../src' import process from './tcp-process' -let socket = new Socket({name:'tcp socket', log:false}) +let socket = new Socket({name:'tcp socket'}) socket.registerPacketProcessor(process) -socket.packet.port = socket.listen_opts.port ; (async () => { diff --git a/test/sockets/usocket-default-overwrite.mjs b/test/sockets/usocket-default-overwrite.mjs index 2746705..47bf915 100644 --- a/test/sockets/usocket-default-overwrite.mjs +++ b/test/sockets/usocket-default-overwrite.mjs @@ -2,9 +2,9 @@ import { Socket } from '../../src' const USOCKET = __dirname + '/test.sock' -let socket = new Socket(USOCKET,{name:'unix socket'}) +let socket = new Socket({path:USOCKET,name:'default-unix-socket'}) -socket.packet.test = 'at socket => ' +socket.test = 'at socket => ' socket.registerPacketProcessor(async function (packet) { packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload diff --git a/test/sockets/usocket-default.mjs b/test/sockets/usocket-default.mjs index ef39570..795e801 100644 --- a/test/sockets/usocket-default.mjs +++ b/test/sockets/usocket-default.mjs @@ -2,7 +2,7 @@ import { Socket } from '../../src' const USOCKET = __dirname + '/test.sock' -let socket = new Socket(USOCKET,{name:'unix socket', log:false}) +let socket = new Socket({path:USOCKET,name:'default-unix-socket'}) ; (async () => { diff --git a/test/tcp.test.mjs b/test/tcp.test.mjs index 0fd13cc..f9f8ee6 100644 --- a/test/tcp.test.mjs +++ b/test/tcp.test.mjs @@ -7,8 +7,6 @@ const expect = chai.expect import { Consumer } from '../src' -const delay = time => new Promise(res=>setTimeout(()=>res(),time)) - let tcpsocket_default = {} let tcpsocket_9080 = {} @@ -44,7 +42,7 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun process.kill(process.pid, 'SIGTERM') } - tcpconsumer_default.packet._process = function (packet) { + tcpconsumer_default._packetProcess = function (packet) { try { expect(packet.payload).to.equal('8080:tcp payload') resolve() diff --git a/test/usocket-default-overwrite.test.mjs b/test/usocket-default-overwrite.test.mjs index 2eb3deb..62d6073 100644 --- a/test/usocket-default-overwrite.test.mjs +++ b/test/usocket-default-overwrite.test.mjs @@ -10,10 +10,10 @@ import { Consumer } from '../src' const USOCKET = __dirname + '/sockets/test.sock' const SOCKET_FILE = 'usocket-default-overwrite' -let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) -let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) +let consumer = new Consumer({path:USOCKET,name:'unix-consumer'}) +let consumer2 = new Consumer({path:USOCKET, name:'unix-consumer2'}) -const delay = time => new Promise(res=>setTimeout(()=>res(),time)) +// const delay = time => new Promise(res=>setTimeout(()=>res(),time)) let socket = {} @@ -24,8 +24,6 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit socket.stdout.on('data', function(buf) { console.log('[Socket]', String(buf)) }) - - await delay(500) // wait for sockets to get going }) after(async function(){ @@ -34,13 +32,13 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit it('Tests JSON packet procssing, 10 packets', async function () { - consumer.packet.times = 0 + consumer.times = 0 return new Promise(async function (resolve, reject) { setTimeout(() =>{ reject('10 packets not received in time')},1900) - consumer.packet._process = function (packet) { + consumer._packetProcess = function (packet) { this.times++ if (this.times!==11) return @@ -67,13 +65,13 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit it('unix socket with two consumers alternating packets, 10 packets each', async function () { - consumer.packet.times = 0 + consumer.times = 0 return new Promise(async function (resolve, reject) { setTimeout(() =>{ reject('10 packets not received in time')},1900) - consumer.packet._process = function (packet) { + consumer._packetProcess = function (packet) { this.times++ // console.log(this.times,packet.payload) if (this.times!==11) return @@ -87,7 +85,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit } } - consumer2.packet._process = function (packet) { + consumer2._packetProcess = function (packet) { return packet } diff --git a/test/usocket-default.test.mjs b/test/usocket-default.test.mjs index bef290e..cda83b1 100644 --- a/test/usocket-default.test.mjs +++ b/test/usocket-default.test.mjs @@ -10,10 +10,10 @@ import { Consumer } from '../src' const USOCKET = __dirname + '/sockets/test.sock' const SOCKET_FILE = 'usocket-default' -let consumer = new Consumer(USOCKET, {name:'unix-consumer', log:false}) -let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) +let consumer = new Consumer({path:USOCKET,name:'unix-consumer'}) +let consumer2 = new Consumer({path:USOCKET, name:'unix-consumer2'}) -const delay = time => new Promise(res=>setTimeout(()=>res(),time)) +// const delay = time => new Promise(res=>setTimeout(()=>res(),time)) let socket = {} @@ -32,7 +32,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit it('Tests unix socket with default echo JSON packet procssing, 10 packets with conect via connect', async function () { - consumer.packet.times = 0 + consumer.times = 0 return new Promise(async function (resolve, reject) { @@ -44,7 +44,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit setTimeout(() =>{ reject('10 packets not received in time')},1900) - consumer.packet._process = function (packet) { + consumer._packetProcess = function (packet) { this.times++ if (this.times!==11) return @@ -69,8 +69,8 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit it('unix socket with two consumers alternating packets, 10 packets each with local and added context', async function () { - consumer.packet.times = 0 - consumer.packet.test = ':local' + consumer.times = 0 + consumer.test = ':local' let [err] = await btc(consumer2.connect)() if (err) { @@ -98,7 +98,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit } }) - consumer2.packet._process = function (packet) { + consumer2._packetProcess = function (packet) { return packet }