added in test, passing. Added in bunyan logger
This commit is contained in:
parent
ae72d3af02
commit
2b14508d6f
10 changed files with 200 additions and 106 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1,3 +1,4 @@
|
|||
/node_modules/
|
||||
/coverage/
|
||||
/syncd/
|
||||
*.log
|
||||
|
|
1
examples/.gitignore
vendored
Normal file
1
examples/.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
*.sock
|
|
@ -2,16 +2,16 @@ import Consumer from '../src/consumer'
|
|||
|
||||
const USOCKET = __dirname + '/sample.sock'
|
||||
|
||||
const socket1 = new Consumer(USOCKET)
|
||||
const socket2 = new Consumer(USOCKET)
|
||||
const client1= new Consumer(USOCKET, {log:true,name:'example-consumer1' })
|
||||
const client2 = new Consumer(USOCKET, {log:true,name:'example-consumer2'})
|
||||
|
||||
let packet1 = {name: 'socket1', cmd:'doit', data:'data sent by socket1'}
|
||||
let packet2 = {name: 'socket2', cmd:'doit', data:'data sent by socket2'}
|
||||
let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'}
|
||||
let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
|
||||
|
||||
// This is your socket handler waiting on a message to do something
|
||||
// This is your client handler object waiting on a message to do something
|
||||
let app = {
|
||||
processIt: function processPacket (packet) {
|
||||
console.log('incoming packet from socket to process')
|
||||
console.log('your custom processing of incoming packet')
|
||||
console.dir(packet)
|
||||
},
|
||||
ucpp: 'processIt'
|
||||
|
@ -20,12 +20,8 @@ let app = {
|
|||
;
|
||||
(async () => {
|
||||
|
||||
await socket1.connect()
|
||||
await socket2.connect()
|
||||
await socket1.listen(app)
|
||||
await socket2.listen(app)
|
||||
socket1.send(packet1)
|
||||
socket2.send(packet2)
|
||||
await Promise.all([client1.connect(app),client2.connect(app)])
|
||||
await Promise.all([client1.send(packet1),client2.send(packet2)])
|
||||
|
||||
})().catch(err => {
|
||||
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
|
||||
|
|
|
@ -2,20 +2,19 @@ import { Socket } from '../src'
|
|||
|
||||
const USOCKET = __dirname + '/sample.sock'
|
||||
|
||||
console.log(USOCKET)
|
||||
;
|
||||
(async () => {
|
||||
|
||||
class Test {
|
||||
constructor(opts={}) {
|
||||
this.socket = new Socket(USOCKET)
|
||||
this.uspp = opts.uspp || 'processPacket'
|
||||
constructor() {
|
||||
this.socket = new Socket(USOCKET,{name:'example-socket'})
|
||||
}
|
||||
|
||||
async processPacket(packet) {
|
||||
console.log('packet being processed')
|
||||
console.dir(packet)
|
||||
return await this[packet.cmd](packet.data,packet.name)
|
||||
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
|
||||
return {error: 'no command in packet', packet: packet }
|
||||
}
|
||||
|
||||
async doit(data,name) {
|
||||
|
@ -27,10 +26,9 @@ console.log(USOCKET)
|
|||
return(res)
|
||||
}
|
||||
|
||||
init() { this.socket.create(this)}
|
||||
async init() { return this.socket.create(this)}
|
||||
}
|
||||
|
||||
|
||||
let test = new Test()
|
||||
await test.init()
|
||||
|
||||
|
|
|
@ -7,9 +7,9 @@
|
|||
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
|
||||
"test": "mocha -r @std/esm test/*.test.mjs",
|
||||
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
|
||||
"s": "node -r @std/esm examples/server",
|
||||
"s": "node -r @std/esm examples/server | ./node_modules/.bin/bunyan",
|
||||
"devs": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server",
|
||||
"c": "node -r @std/esm examples/client"
|
||||
"c": "node -r @std/esm examples/client | ./node_modules/.bin/bunyan -o short"
|
||||
},
|
||||
"author": "David Kebler",
|
||||
"license": "MIT",
|
||||
|
@ -40,6 +40,7 @@
|
|||
},
|
||||
"dependencies": {
|
||||
"better-try-catch": "^0.6.2",
|
||||
"bunyan": "^1.8.12",
|
||||
"death": "^1.1.0",
|
||||
"simple-node-logger": "^0.93.33"
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import { Socket } from 'net'
|
||||
import btc from 'better-try-catch'
|
||||
import bunyan from 'bunyan'
|
||||
|
||||
export default class Consumer extends Socket {
|
||||
constructor (path, opts={}) {
|
||||
|
@ -7,6 +8,14 @@ export default class Consumer extends Socket {
|
|||
this.path = path
|
||||
this.keepAlive = opts.keepAlive ? opts.keepAlive : true
|
||||
this._ready = false
|
||||
this.timeout = opts.timeout || 1000
|
||||
this.wait = opts.wait || 30
|
||||
this.log_file=opts.log_file || './log.log'
|
||||
this.log_opts = {streams:[]}
|
||||
this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer'
|
||||
this.log_opts.streams.push({level: 'info',path: this.log_file })
|
||||
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
|
||||
this.log = bunyan.createLogger(this.log_opts)
|
||||
}
|
||||
|
||||
ready() {return this._ready}
|
||||
|
@ -14,38 +23,42 @@ export default class Consumer extends Socket {
|
|||
async connect (app) {
|
||||
|
||||
await this.listen(app)
|
||||
console.log('consumer: listening')
|
||||
this.log.info('listening')
|
||||
|
||||
await super.connect({ path: this.path })
|
||||
console.log(`consumer: connected to ${this.path}`)
|
||||
this.setKeepAlive(this.keepAlive)
|
||||
return new Promise( (resolve,reject) => {
|
||||
|
||||
this.on('error', (error) => {
|
||||
'client socket error \n ', error.code
|
||||
this.on('error', (err) => {
|
||||
reject(err)
|
||||
})
|
||||
|
||||
super.connect({ path: this.path }, async () => {
|
||||
this.log.info({path:this.path},'connecting')
|
||||
this.setKeepAlive(this.keepAlive)
|
||||
let [err, res] = await btc(isReady).bind(this)(this.ready.bind(this), this.wait, this.timeout)
|
||||
if (err) reject(err)
|
||||
this.log.info('handshake done, connected')
|
||||
resolve(res)
|
||||
})
|
||||
})
|
||||
|
||||
return await isReady(this.ready.bind(this))
|
||||
|
||||
}
|
||||
|
||||
async send(packet) {
|
||||
let [err, strbuf] = btc(JSON.stringify)(packet)
|
||||
if (!err) {
|
||||
// await promisify(this.write)(strbuf)
|
||||
console.log('attempting to send')
|
||||
// console.log(await this.write(strbuf))
|
||||
let res = await new Promise((resolve, reject) => { //returning promise
|
||||
this.log.info({packet:packet},'attempting to send packet to socket')
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
this.write(strbuf, (err) => {
|
||||
if (err) reject(err)
|
||||
else resolve('complete')
|
||||
})
|
||||
})
|
||||
console.log('send is', res)
|
||||
}
|
||||
else { console.log(`bad packet JSON syntax \n ${packet} \n${err}`)}
|
||||
else { this.log.info({packet:packet}, 'bad packet JSON syntax')}
|
||||
}
|
||||
|
||||
async listen (app) {
|
||||
async listen (app={}) {
|
||||
this.on('data', async (buf) => {
|
||||
let [err, packet] = btc(JSON.parse)(buf.toString())
|
||||
if (!err) {
|
||||
|
@ -57,28 +70,28 @@ export default class Consumer extends Socket {
|
|||
if (!app[app.ucpp]) {
|
||||
app.ucpp = 'processPacket'
|
||||
app.processPacket = async (packet) => {
|
||||
console.log('incoming packet from socket')
|
||||
console.dir(packet)
|
||||
this.log.info({packet:packet},'incoming packet from socket')
|
||||
return packet }
|
||||
}
|
||||
app[app.ucpp](packet) // process the packet
|
||||
}
|
||||
else { console.log(`bad packet JSON syntax \n ${buf.toString()}`)}
|
||||
else { this.log.info({buf: buf.toString()},'bad packet JSON syntax')}
|
||||
})
|
||||
}
|
||||
|
||||
} // end class
|
||||
|
||||
// wait for handshake from socket
|
||||
function isReady(ready) {
|
||||
function isReady(ready, wait=30, timeout=1000) {
|
||||
let log = this.log
|
||||
let time = 0
|
||||
return new Promise(function (resolve, reject) {
|
||||
return new Promise((resolve, reject) => {
|
||||
(function waitReady(){
|
||||
if (time > 3000) return reject('timeout')
|
||||
if (time > timeout) return reject('timeout trying to connect')
|
||||
if (ready()) return resolve('ready')
|
||||
console.log('waiting for 30ms')
|
||||
log.info('waiting for 30ms for handshake')
|
||||
time += 30
|
||||
setTimeout(waitReady, 30)
|
||||
setTimeout(waitReady, wait)
|
||||
})()
|
||||
})
|
||||
}
|
||||
|
|
134
src/socket.mjs
134
src/socket.mjs
|
@ -1,92 +1,102 @@
|
|||
import { Server } from 'net'
|
||||
import { unlink as fileDelete } from 'fs'
|
||||
import btc from 'better-try-catch'
|
||||
import Logger from 'simple-node-logger'
|
||||
import ON_DEATH from 'death' //this is intentionally ugly
|
||||
|
||||
let logger = {
|
||||
logFilePath:'logfile.log',
|
||||
timestampFormat:'YYYY-MM-DD HH:mm:ss.SSS'
|
||||
}
|
||||
import bunyan from 'bunyan'
|
||||
|
||||
export default class Socket extends Server {
|
||||
constructor (path, opts={}) {
|
||||
super()
|
||||
this.path = path
|
||||
if (opts.log) {
|
||||
this.log = opts.log
|
||||
this.logger = Logger.createSimpleLogger(opts.logger ? opts.logger: logger)
|
||||
}
|
||||
this.log_file=opts.log_file || './log.log'
|
||||
this.log_opts = {streams:[]}
|
||||
this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer'
|
||||
this.log_opts.streams.push({level: 'info',path: this.log_file })
|
||||
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
|
||||
this.log = bunyan.createLogger(this.log_opts)
|
||||
|
||||
} // end constructor
|
||||
|
||||
|
||||
async create ( app ) {
|
||||
async create (app={}) {
|
||||
|
||||
this.on('error', async (err) => {
|
||||
// recover from socket file that was not removed
|
||||
if (err.code === 'EADDRINUSE') {
|
||||
console.log(`socket path ${this.path} already exists...deleting`)
|
||||
await fileDelete(this.path)
|
||||
await this.listen(this.path)
|
||||
return Promise.resolve(err.code)
|
||||
}
|
||||
// otherwise fatally exit
|
||||
console.log('error creating socket: ',err.code)
|
||||
return Promise.reject(err.code)
|
||||
})
|
||||
return new Promise( (resolve,reject) => {
|
||||
|
||||
//
|
||||
this.once('listening', async () => {
|
||||
console.log(`socket created at ${this.path}`)
|
||||
ON_DEATH( async () => {
|
||||
this.log.info('\nhe\'s dead jim')
|
||||
await this.destroy()
|
||||
|
||||
// this gets called for each client connection and is unique to each
|
||||
this.once('connection', (socket) => {
|
||||
console.log('server: new consumer connected')
|
||||
})
|
||||
|
||||
socket.on('data', async (buf) => {
|
||||
let [err, packet] = btc(JSON.parse)(buf.toString())
|
||||
if (!err) {
|
||||
if (this.log) this.logger.info(`data packet received to socket \n ${packet}`)
|
||||
process.once('SIGUSR2', async () => {
|
||||
await this.destroy
|
||||
process.kill(process.pid, 'SIGUSR2')
|
||||
})
|
||||
|
||||
// set default packet processing
|
||||
app.uspp = app.uspp || 'processPacket'
|
||||
if (!app[app.uspp]) {
|
||||
app.uspp = 'processPacket'
|
||||
app.processPacket = async (packet) => {
|
||||
packet.res='echoed'
|
||||
return packet }
|
||||
this.on('error', async (err) => {
|
||||
// recover from socket file that was not removed
|
||||
if (err.code === 'EADDRINUSE') {
|
||||
this.log.info({socket: this.path}, 'already exists...deleting')
|
||||
await fileDelete(this.path)
|
||||
return await this.listen(this.path, app)
|
||||
}
|
||||
// otherwise fatally exit
|
||||
this.log.info(err, 'creating socket')
|
||||
reject(err)
|
||||
})
|
||||
|
||||
this.listen(this.path, async (err, res) => {
|
||||
|
||||
if (err) reject(err)
|
||||
// this gets called for each client connection and is unique to each
|
||||
this.on('connection', (socket) => {
|
||||
this.log.info('server: new consumer connecting')
|
||||
|
||||
socket.on('data', async (buf) => {
|
||||
let [err, packet] = btc(JSON.parse)(buf.toString())
|
||||
if (!err) {
|
||||
if (this.log) this.logger.info(`data packet received to socket \n ${packet}`)
|
||||
this.log.info({packet:packet},'Server: packet received to socket')
|
||||
|
||||
// set default packet processing
|
||||
app.uspp = app.uspp || 'processPacket'
|
||||
if (!app[app.uspp]) {
|
||||
app.uspp = 'processPacket'
|
||||
app.processPacket = async (packet) => {
|
||||
packet.res='echoed'
|
||||
this.log.info({packet:packet},'packet being sent to consumer')
|
||||
return packet }
|
||||
}
|
||||
|
||||
socket.write(JSON.stringify(await app[app.uspp].bind(app)(packet)))
|
||||
}
|
||||
else {
|
||||
this.log.info(`bad packet JSON syntax \n ${buf.toString()}`)
|
||||
let error = {
|
||||
error: 'bad packet JSON syntax sent',
|
||||
packet: buf.toString()
|
||||
}
|
||||
socket.write(JSON.stringify(error))
|
||||
}
|
||||
|
||||
socket.write(JSON.stringify(await app[app.uspp].bind(app)(packet)))
|
||||
}
|
||||
else { console.log(`bad packet JSON syntax \n ${buf.toString()}`)}
|
||||
|
||||
}) // end incoming data listerner
|
||||
socket.write('{"ready":true}')
|
||||
}) // end connected consumer
|
||||
}) // end socket listening listener
|
||||
|
||||
// start it
|
||||
await this.listen(this.path)
|
||||
|
||||
// if socket is terminated then shutdown gracefully
|
||||
ON_DEATH( async () => {
|
||||
await this.destroy()
|
||||
})
|
||||
|
||||
process.once('SIGUSR2', async () => {
|
||||
await this.destroy
|
||||
process.kill(process.pid, 'SIGUSR2')
|
||||
}) // end incoming data listerner
|
||||
this.log.info('Server: sending handshake to consumer')
|
||||
socket.write('{"ready":true}')
|
||||
}) // end connected consumer
|
||||
|
||||
this.log.info({socket: this.path},'socket created')
|
||||
resolve(res)
|
||||
})
|
||||
})
|
||||
|
||||
} // end create
|
||||
|
||||
|
||||
async destroy () {
|
||||
|
||||
console.log('\nclosing down socket')
|
||||
this.log.info('closing down socket')
|
||||
await this.close()
|
||||
console.log('\n all connections closed....exiting')
|
||||
this.log.info('all connections closed....exiting')
|
||||
process.exit()
|
||||
|
||||
} // end destroy
|
||||
|
|
1
test/.gitignore
vendored
Normal file
1
test/.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
*.sock
|
24
test/socket.mjs
Normal file
24
test/socket.mjs
Normal file
|
@ -0,0 +1,24 @@
|
|||
import { Socket } from '../src'
|
||||
|
||||
const USOCKET = __dirname + '/sample.sock'
|
||||
|
||||
let socket = new Socket(USOCKET)
|
||||
|
||||
const app = {
|
||||
uspp: 'sprocessPacket',
|
||||
sprocessPacket: async function (packet) {
|
||||
packet.processed = true
|
||||
console.log('server: packet processed')
|
||||
console.dir(packet)
|
||||
return packet
|
||||
}
|
||||
}
|
||||
|
||||
;
|
||||
(async () => {
|
||||
|
||||
await socket.create(app)
|
||||
|
||||
})().catch(err => {
|
||||
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
|
||||
})
|
49
test/socket.test.mjs
Normal file
49
test/socket.test.mjs
Normal file
|
@ -0,0 +1,49 @@
|
|||
import { spawn } from 'child_process'
|
||||
import chai from 'chai'
|
||||
import chaiAsPromised from 'chai-as-promised'
|
||||
chai.use(chaiAsPromised)
|
||||
const expect = chai.expect
|
||||
|
||||
import { Consumer } from '../src'
|
||||
|
||||
const USOCKET = __dirname + '/sample.sock'
|
||||
|
||||
let consumer = new Consumer(USOCKET)
|
||||
|
||||
const app = {
|
||||
ucpp: 'cprocessPacket',
|
||||
cprocessPacket: async function (packet) {
|
||||
if (packet.processed) consumer.emit(packet.cmd,packet.payload)
|
||||
}
|
||||
}
|
||||
|
||||
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
|
||||
|
||||
;
|
||||
(async () => {
|
||||
|
||||
let socket ={}
|
||||
|
||||
before(async function(){
|
||||
socket = spawn('node',['-r', '@std/esm', './test/socket'])
|
||||
await delay(500) // wait for socket to get going
|
||||
})
|
||||
|
||||
after(async function(){
|
||||
socket.kill()
|
||||
})
|
||||
|
||||
it('Connects and Processes some payload', async function () {
|
||||
|
||||
console.log('connection is ',await consumer.connect(app))
|
||||
consumer.on('test1', function(payload){
|
||||
expect(payload).to.equal('payload1')
|
||||
})
|
||||
let packet = {id: 'test consumer', cmd:'test1', payload:'payload1'}
|
||||
consumer.send(packet)
|
||||
})
|
||||
|
||||
|
||||
})().catch(err => {
|
||||
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
|
||||
})
|
Loading…
Reference in a new issue