diff --git a/.gitignore b/.gitignore index 24c853d..39b0099 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /node_modules/ /coverage/ /syncd/ +*.log diff --git a/examples/.gitignore b/examples/.gitignore new file mode 100644 index 0000000..c74d682 --- /dev/null +++ b/examples/.gitignore @@ -0,0 +1 @@ +*.sock diff --git a/examples/client.mjs b/examples/client.mjs index 5539573..443e11b 100644 --- a/examples/client.mjs +++ b/examples/client.mjs @@ -2,16 +2,16 @@ import Consumer from '../src/consumer' const USOCKET = __dirname + '/sample.sock' -const socket1 = new Consumer(USOCKET) -const socket2 = new Consumer(USOCKET) +const client1= new Consumer(USOCKET, {log:true,name:'example-consumer1' }) +const client2 = new Consumer(USOCKET, {log:true,name:'example-consumer2'}) -let packet1 = {name: 'socket1', cmd:'doit', data:'data sent by socket1'} -let packet2 = {name: 'socket2', cmd:'doit', data:'data sent by socket2'} +let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'} +let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'} -// This is your socket handler waiting on a message to do something +// This is your client handler object waiting on a message to do something let app = { processIt: function processPacket (packet) { - console.log('incoming packet from socket to process') + console.log('your custom processing of incoming packet') console.dir(packet) }, ucpp: 'processIt' @@ -20,12 +20,8 @@ let app = { ; (async () => { - await socket1.connect() - await socket2.connect() - await socket1.listen(app) - await socket2.listen(app) - socket1.send(packet1) - socket2.send(packet2) + await Promise.all([client1.connect(app),client2.connect(app)]) + await Promise.all([client1.send(packet1),client2.send(packet2)]) })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/examples/server.mjs b/examples/server.mjs index 1b40775..05768d8 100644 --- a/examples/server.mjs +++ b/examples/server.mjs @@ -2,20 +2,19 @@ import { Socket } from '../src' const USOCKET = __dirname + '/sample.sock' -console.log(USOCKET) ; (async () => { class Test { - constructor(opts={}) { - this.socket = new Socket(USOCKET) - this.uspp = opts.uspp || 'processPacket' + constructor() { + this.socket = new Socket(USOCKET,{name:'example-socket'}) } async processPacket(packet) { console.log('packet being processed') console.dir(packet) - return await this[packet.cmd](packet.data,packet.name) + if (packet.cmd) return await this[packet.cmd](packet.data,packet.name) + return {error: 'no command in packet', packet: packet } } async doit(data,name) { @@ -27,10 +26,9 @@ console.log(USOCKET) return(res) } - init() { this.socket.create(this)} + async init() { return this.socket.create(this)} } - let test = new Test() await test.init() diff --git a/package.json b/package.json index 9adfb76..110e5b8 100644 --- a/package.json +++ b/package.json @@ -7,9 +7,9 @@ "testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs", "test": "mocha -r @std/esm test/*.test.mjs", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true", - "s": "node -r @std/esm examples/server", + "s": "node -r @std/esm examples/server | ./node_modules/.bin/bunyan", "devs": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server", - "c": "node -r @std/esm examples/client" + "c": "node -r @std/esm examples/client | ./node_modules/.bin/bunyan -o short" }, "author": "David Kebler", "license": "MIT", @@ -40,6 +40,7 @@ }, "dependencies": { "better-try-catch": "^0.6.2", + "bunyan": "^1.8.12", "death": "^1.1.0", "simple-node-logger": "^0.93.33" } diff --git a/src/consumer.mjs b/src/consumer.mjs index 4416b00..bba1864 100644 --- a/src/consumer.mjs +++ b/src/consumer.mjs @@ -1,5 +1,6 @@ import { Socket } from 'net' import btc from 'better-try-catch' +import bunyan from 'bunyan' export default class Consumer extends Socket { constructor (path, opts={}) { @@ -7,6 +8,14 @@ export default class Consumer extends Socket { this.path = path this.keepAlive = opts.keepAlive ? opts.keepAlive : true this._ready = false + this.timeout = opts.timeout || 1000 + this.wait = opts.wait || 30 + this.log_file=opts.log_file || './log.log' + this.log_opts = {streams:[]} + this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' + this.log_opts.streams.push({level: 'info',path: this.log_file }) + if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout}) + this.log = bunyan.createLogger(this.log_opts) } ready() {return this._ready} @@ -14,38 +23,42 @@ export default class Consumer extends Socket { async connect (app) { await this.listen(app) - console.log('consumer: listening') + this.log.info('listening') - await super.connect({ path: this.path }) - console.log(`consumer: connected to ${this.path}`) - this.setKeepAlive(this.keepAlive) + return new Promise( (resolve,reject) => { - this.on('error', (error) => { - 'client socket error \n ', error.code + this.on('error', (err) => { + reject(err) + }) + + super.connect({ path: this.path }, async () => { + this.log.info({path:this.path},'connecting') + this.setKeepAlive(this.keepAlive) + let [err, res] = await btc(isReady).bind(this)(this.ready.bind(this), this.wait, this.timeout) + if (err) reject(err) + this.log.info('handshake done, connected') + resolve(res) + }) }) - return await isReady(this.ready.bind(this)) - } async send(packet) { let [err, strbuf] = btc(JSON.stringify)(packet) if (!err) { - // await promisify(this.write)(strbuf) - console.log('attempting to send') - // console.log(await this.write(strbuf)) - let res = await new Promise((resolve, reject) => { //returning promise + this.log.info({packet:packet},'attempting to send packet to socket') + + return new Promise((resolve, reject) => { this.write(strbuf, (err) => { if (err) reject(err) else resolve('complete') }) }) - console.log('send is', res) } - else { console.log(`bad packet JSON syntax \n ${packet} \n${err}`)} + else { this.log.info({packet:packet}, 'bad packet JSON syntax')} } - async listen (app) { + async listen (app={}) { this.on('data', async (buf) => { let [err, packet] = btc(JSON.parse)(buf.toString()) if (!err) { @@ -57,28 +70,28 @@ export default class Consumer extends Socket { if (!app[app.ucpp]) { app.ucpp = 'processPacket' app.processPacket = async (packet) => { - console.log('incoming packet from socket') - console.dir(packet) + this.log.info({packet:packet},'incoming packet from socket') return packet } } app[app.ucpp](packet) // process the packet } - else { console.log(`bad packet JSON syntax \n ${buf.toString()}`)} + else { this.log.info({buf: buf.toString()},'bad packet JSON syntax')} }) } } // end class // wait for handshake from socket -function isReady(ready) { +function isReady(ready, wait=30, timeout=1000) { + let log = this.log let time = 0 - return new Promise(function (resolve, reject) { + return new Promise((resolve, reject) => { (function waitReady(){ - if (time > 3000) return reject('timeout') + if (time > timeout) return reject('timeout trying to connect') if (ready()) return resolve('ready') - console.log('waiting for 30ms') + log.info('waiting for 30ms for handshake') time += 30 - setTimeout(waitReady, 30) + setTimeout(waitReady, wait) })() }) } diff --git a/src/socket.mjs b/src/socket.mjs index 58bc5a4..3427d92 100644 --- a/src/socket.mjs +++ b/src/socket.mjs @@ -1,92 +1,102 @@ import { Server } from 'net' import { unlink as fileDelete } from 'fs' import btc from 'better-try-catch' -import Logger from 'simple-node-logger' import ON_DEATH from 'death' //this is intentionally ugly - -let logger = { - logFilePath:'logfile.log', - timestampFormat:'YYYY-MM-DD HH:mm:ss.SSS' -} +import bunyan from 'bunyan' export default class Socket extends Server { constructor (path, opts={}) { super() this.path = path - if (opts.log) { - this.log = opts.log - this.logger = Logger.createSimpleLogger(opts.logger ? opts.logger: logger) - } + this.log_file=opts.log_file || './log.log' + this.log_opts = {streams:[]} + this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' + this.log_opts.streams.push({level: 'info',path: this.log_file }) + if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout}) + this.log = bunyan.createLogger(this.log_opts) + } // end constructor - async create ( app ) { + async create (app={}) { - this.on('error', async (err) => { - // recover from socket file that was not removed - if (err.code === 'EADDRINUSE') { - console.log(`socket path ${this.path} already exists...deleting`) - await fileDelete(this.path) - await this.listen(this.path) - return Promise.resolve(err.code) - } - // otherwise fatally exit - console.log('error creating socket: ',err.code) - return Promise.reject(err.code) - }) + return new Promise( (resolve,reject) => { - // - this.once('listening', async () => { - console.log(`socket created at ${this.path}`) + ON_DEATH( async () => { + this.log.info('\nhe\'s dead jim') + await this.destroy() - // this gets called for each client connection and is unique to each - this.once('connection', (socket) => { - console.log('server: new consumer connected') + }) - socket.on('data', async (buf) => { - let [err, packet] = btc(JSON.parse)(buf.toString()) - if (!err) { - if (this.log) this.logger.info(`data packet received to socket \n ${packet}`) + process.once('SIGUSR2', async () => { + await this.destroy + process.kill(process.pid, 'SIGUSR2') + }) - // set default packet processing - app.uspp = app.uspp || 'processPacket' - if (!app[app.uspp]) { - app.uspp = 'processPacket' - app.processPacket = async (packet) => { - packet.res='echoed' - return packet } + this.on('error', async (err) => { + // recover from socket file that was not removed + if (err.code === 'EADDRINUSE') { + this.log.info({socket: this.path}, 'already exists...deleting') + await fileDelete(this.path) + return await this.listen(this.path, app) + } + // otherwise fatally exit + this.log.info(err, 'creating socket') + reject(err) + }) + + this.listen(this.path, async (err, res) => { + + if (err) reject(err) + // this gets called for each client connection and is unique to each + this.on('connection', (socket) => { + this.log.info('server: new consumer connecting') + + socket.on('data', async (buf) => { + let [err, packet] = btc(JSON.parse)(buf.toString()) + if (!err) { + if (this.log) this.logger.info(`data packet received to socket \n ${packet}`) + this.log.info({packet:packet},'Server: packet received to socket') + + // set default packet processing + app.uspp = app.uspp || 'processPacket' + if (!app[app.uspp]) { + app.uspp = 'processPacket' + app.processPacket = async (packet) => { + packet.res='echoed' + this.log.info({packet:packet},'packet being sent to consumer') + return packet } + } + + socket.write(JSON.stringify(await app[app.uspp].bind(app)(packet))) + } + else { + this.log.info(`bad packet JSON syntax \n ${buf.toString()}`) + let error = { + error: 'bad packet JSON syntax sent', + packet: buf.toString() + } + socket.write(JSON.stringify(error)) } - socket.write(JSON.stringify(await app[app.uspp].bind(app)(packet))) - } - else { console.log(`bad packet JSON syntax \n ${buf.toString()}`)} - - }) // end incoming data listerner - socket.write('{"ready":true}') - }) // end connected consumer - }) // end socket listening listener - - // start it - await this.listen(this.path) - - // if socket is terminated then shutdown gracefully - ON_DEATH( async () => { - await this.destroy() - }) - - process.once('SIGUSR2', async () => { - await this.destroy - process.kill(process.pid, 'SIGUSR2') + }) // end incoming data listerner + this.log.info('Server: sending handshake to consumer') + socket.write('{"ready":true}') + }) // end connected consumer + this.log.info({socket: this.path},'socket created') + resolve(res) + }) }) } // end create + async destroy () { - console.log('\nclosing down socket') + this.log.info('closing down socket') await this.close() - console.log('\n all connections closed....exiting') + this.log.info('all connections closed....exiting') process.exit() } // end destroy diff --git a/test/.gitignore b/test/.gitignore new file mode 100644 index 0000000..c74d682 --- /dev/null +++ b/test/.gitignore @@ -0,0 +1 @@ +*.sock diff --git a/test/socket.mjs b/test/socket.mjs new file mode 100644 index 0000000..790d6d5 --- /dev/null +++ b/test/socket.mjs @@ -0,0 +1,24 @@ +import { Socket } from '../src' + +const USOCKET = __dirname + '/sample.sock' + +let socket = new Socket(USOCKET) + +const app = { + uspp: 'sprocessPacket', + sprocessPacket: async function (packet) { + packet.processed = true + console.log('server: packet processed') + console.dir(packet) + return packet + } +} + +; +(async () => { + + await socket.create(app) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/test/socket.test.mjs b/test/socket.test.mjs new file mode 100644 index 0000000..bfb6246 --- /dev/null +++ b/test/socket.test.mjs @@ -0,0 +1,49 @@ +import { spawn } from 'child_process' +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +chai.use(chaiAsPromised) +const expect = chai.expect + +import { Consumer } from '../src' + +const USOCKET = __dirname + '/sample.sock' + +let consumer = new Consumer(USOCKET) + +const app = { + ucpp: 'cprocessPacket', + cprocessPacket: async function (packet) { + if (packet.processed) consumer.emit(packet.cmd,packet.payload) + } +} + +const delay = time => new Promise(res=>setTimeout(()=>res(),time)) + +; +(async () => { + + let socket ={} + + before(async function(){ + socket = spawn('node',['-r', '@std/esm', './test/socket']) + await delay(500) // wait for socket to get going + }) + + after(async function(){ + socket.kill() + }) + + it('Connects and Processes some payload', async function () { + + console.log('connection is ',await consumer.connect(app)) + consumer.on('test1', function(payload){ + expect(payload).to.equal('payload1') + }) + let packet = {id: 'test consumer', cmd:'test1', payload:'payload1'} + consumer.send(packet) + }) + + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +})