swtich to pino logging with uci-logger module
parent
40d9565bf3
commit
bbc29317ee
|
@ -28,7 +28,7 @@ const USOCKET = __dirname + '/sample.sock'
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let test = new Test(USOCKET,{path: USOCKET, name:'example-socket'})
|
let test = new Test(USOCKET)
|
||||||
test.packet = test
|
test.packet = test
|
||||||
await test.create()
|
await test.create()
|
||||||
|
|
||||||
|
|
|
@ -32,11 +32,9 @@
|
||||||
"@std/esm": "cjs",
|
"@std/esm": "cjs",
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@std/esm": "^0.18.0",
|
"@std/esm": "^0.18.0",
|
||||||
"nodemon": "^1.14.3",
|
|
||||||
"chai": "^4.1.2",
|
"chai": "^4.1.2",
|
||||||
"chai-as-promised": "^7.1.1",
|
"chai-as-promised": "^7.1.1",
|
||||||
"codecov": "^3.0.0",
|
"codecov": "^3.0.0",
|
||||||
"eslint": "^3.19.0",
|
|
||||||
"istanbul": "^0.4.5",
|
"istanbul": "^0.4.5",
|
||||||
"mocha": "^4.0.1"
|
"mocha": "^4.0.1"
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
import { Socket } from 'net'
|
import { Socket } from 'net'
|
||||||
import btc from 'better-try-catch'
|
import btc from 'better-try-catch'
|
||||||
import bunyan from 'bunyan'
|
// import bunyan from 'bunyan'
|
||||||
// import Stream from 'delimiter-stream'
|
import logger from '../../uci-logger/src/logger'
|
||||||
|
let log = {}
|
||||||
|
|
||||||
import JsonStream from './json-stream'
|
import JsonStream from './json-stream'
|
||||||
|
|
||||||
export default class Consumer extends Socket {
|
export default class Consumer extends Socket {
|
||||||
|
@ -29,20 +31,12 @@ export default class Consumer extends Socket {
|
||||||
console.dir(packet)
|
console.dir(packet)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// logging
|
log = logger.child({repo:'uci-socket',npm:'@uci/socket',file:'src/socket.mjs',class:'Socket', id:this.id, created:new Date().getTime()})
|
||||||
this.log_file=opts.log_file || './socket.log'
|
|
||||||
this.log_opts = {streams:[]}
|
|
||||||
this.log_opts.name = this.id
|
|
||||||
// this.log_opts.streams.push({level: 'info',path: this.log_file })
|
|
||||||
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
|
|
||||||
this.log = bunyan.createLogger(this.log_opts)
|
|
||||||
// bind to class for other class functions
|
// bind to class for other class functions
|
||||||
this.connect = this.connect.bind(this)
|
this.connect = this.connect.bind(this)
|
||||||
this.ready = this.ready.bind(this)
|
this.__ready = this.__ready.bind(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
ready() {return this._ready}
|
|
||||||
|
|
||||||
async connect () {
|
async connect () {
|
||||||
|
|
||||||
// if (context) this.packet.context = context
|
// if (context) this.packet.context = context
|
||||||
|
@ -51,8 +45,8 @@ export default class Consumer extends Socket {
|
||||||
return new Promise( (resolve,reject) => {
|
return new Promise( (resolve,reject) => {
|
||||||
|
|
||||||
const connect = () => {
|
const connect = () => {
|
||||||
if (this.host ==='127.0.0.1') this.log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead')
|
if (this.host ==='127.0.0.1') log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead')
|
||||||
this.log.info(`attempting to connect ${this.id} to ${this.path?this.path:this.host+':'+this.port}`)
|
log.info(`attempting to connect ${this.id} to ${this.path?this.path:this.host+':'+this.port}`)
|
||||||
super.connect({ port:this.port, host:this.host, path: this.path })
|
super.connect({ port:this.port, host:this.host, path: this.path })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,11 +58,11 @@ export default class Consumer extends Socket {
|
||||||
this.once('connect', async () => {
|
this.once('connect', async () => {
|
||||||
clearTimeout(timeout)
|
clearTimeout(timeout)
|
||||||
this._listen()
|
this._listen()
|
||||||
this.log.info({path:this.path, host:this.host, post:this.port },`connected ${this.path?this.path:this.host+':'+this.port} waiting for socket ready handshake`)
|
log.info({path:this.path, host:this.host, post:this.port },`connected ${this.path?this.path:this.host+':'+this.port} waiting for socket ready handshake`)
|
||||||
this.setKeepAlive(this.keepAlive)
|
this.setKeepAlive(this.keepAlive)
|
||||||
let [err, res] = await btc(isReady).bind(this)(this.ready, this.wait, this.timeout)
|
let [err, res] = await btc(isReady).bind(this)(this.__ready, this.wait, this.timeout)
|
||||||
if (err) reject(err)
|
if (err) reject(err)
|
||||||
this.log.info('handshake done, authenticating')
|
log.info('handshake done, authenticating')
|
||||||
// TODO authenticate here by encrypting a payload with private key and sending that.
|
// TODO authenticate here by encrypting a payload with private key and sending that.
|
||||||
// await btc(authenticate)
|
// await btc(authenticate)
|
||||||
resolve(res)
|
resolve(res)
|
||||||
|
@ -78,9 +72,9 @@ export default class Consumer extends Socket {
|
||||||
if (err.code === 'EISCONN') {
|
if (err.code === 'EISCONN') {
|
||||||
return resolve('ready')
|
return resolve('ready')
|
||||||
}
|
}
|
||||||
this.log.warn(err.code)
|
log.warn(err.code)
|
||||||
setTimeout(() =>{
|
setTimeout(() =>{
|
||||||
this.log.warn(`retrying connect to ${this.path?this.path:this.host+':'+this.port}`)
|
log.warn(`retrying connect to ${this.path?this.path:this.host+':'+this.port}`)
|
||||||
connect()
|
connect()
|
||||||
}
|
}
|
||||||
,this.wait*10)
|
,this.wait*10)
|
||||||
|
@ -92,18 +86,6 @@ export default class Consumer extends Socket {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async _listen () {
|
|
||||||
this.log.info('listening for incoming packets from socket')
|
|
||||||
this.on('data', this.stream.onData)
|
|
||||||
this.stream.on('message', messageProcess.bind(this))
|
|
||||||
async function messageProcess (packet) {
|
|
||||||
if (packet.ready) {
|
|
||||||
this._ready = true
|
|
||||||
return }
|
|
||||||
await this.packet._process(packet)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async send(packet) {
|
async send(packet) {
|
||||||
await this.write(this.stream.serialize(packet))
|
await this.write(this.stream.serialize(packet))
|
||||||
// TODO handle error here? and/or await response if required before allowing more sending
|
// TODO handle error here? and/or await response if required before allowing more sending
|
||||||
|
@ -117,11 +99,24 @@ export default class Consumer extends Socket {
|
||||||
this.packet._process = func
|
this.packet._process = func
|
||||||
}
|
}
|
||||||
|
|
||||||
|
__ready() {return this._ready}
|
||||||
|
|
||||||
|
async _listen () {
|
||||||
|
log.info('listening for incoming packets from socket')
|
||||||
|
this.on('data', this.stream.onData)
|
||||||
|
this.stream.on('message', messageProcess.bind(this))
|
||||||
|
async function messageProcess (packet) {
|
||||||
|
if (packet.ready) {
|
||||||
|
this._ready = true
|
||||||
|
return }
|
||||||
|
await this.packet._process(packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // end class
|
} // end class
|
||||||
|
|
||||||
// wait for handshake packet from socket
|
// wait for handshake packet from socket
|
||||||
function isReady(ready, wait=30, timeout=1000) {
|
function isReady(ready, wait=30, timeout=1000) {
|
||||||
let log = this.log
|
|
||||||
let time = 0
|
let time = 0
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
(function waitReady(){
|
(function waitReady(){
|
||||||
|
|
|
@ -2,14 +2,15 @@ import { Server } from 'net'
|
||||||
import { unlink as fileDelete } from 'fs'
|
import { unlink as fileDelete } from 'fs'
|
||||||
import btc from 'better-try-catch'
|
import btc from 'better-try-catch'
|
||||||
import ON_DEATH from 'death' //this is intentionally ugly
|
import ON_DEATH from 'death' //this is intentionally ugly
|
||||||
import bunyan from 'bunyan'
|
import JSONStream from './json-stream'
|
||||||
// import Stream from 'delimiter-stream'
|
|
||||||
import JsonStream from './json-stream'
|
import logger from '../../uci-logger/src/logger'
|
||||||
|
let log = {}
|
||||||
|
|
||||||
export default class Socket extends Server {
|
export default class Socket extends Server {
|
||||||
constructor (path,opts={}) {
|
constructor (path,opts={}) {
|
||||||
super()
|
super()
|
||||||
this.id = opts.id || opts.name || 'socket:'+ Math.random()*100
|
this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
|
||||||
if (typeof(path)!=='string') {
|
if (typeof(path)!=='string') {
|
||||||
if (arguments.length === 2) {
|
if (arguments.length === 2) {
|
||||||
opts.host = path.host || opts.host
|
opts.host = path.host || opts.host
|
||||||
|
@ -22,16 +23,10 @@ export default class Socket extends Server {
|
||||||
packet.res='echoed'
|
packet.res='echoed'
|
||||||
return packet }
|
return packet }
|
||||||
}
|
}
|
||||||
// Change to environment based configuration for logger
|
|
||||||
this.log_file=opts.log_file || './socket.log'
|
|
||||||
this.log_opts = {streams:[]}
|
|
||||||
this.log_opts.name = this.id
|
|
||||||
// if (opts.log===1)// this.log_opts.streams.push({level: 'info',path: this.log_file })
|
|
||||||
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
|
|
||||||
this.log = bunyan.createLogger(this.log_opts)
|
|
||||||
//self binding
|
//self binding
|
||||||
this._listen = this._listen.bind(this)
|
this._listen = this._listen.bind(this)
|
||||||
this.create = this.create.bind(this)
|
this.create = this.create.bind(this)
|
||||||
|
log = logger.child({repo:'uci-socket',npm:'@uci/socket',file:'src/socket.mjs',class:'Socket', id:this.id, created:new Date().getTime()})
|
||||||
|
|
||||||
} // end constructor
|
} // end constructor
|
||||||
|
|
||||||
|
@ -40,7 +35,7 @@ export default class Socket extends Server {
|
||||||
return new Promise( async (resolve,reject) => {
|
return new Promise( async (resolve,reject) => {
|
||||||
|
|
||||||
ON_DEATH( async () => {
|
ON_DEATH( async () => {
|
||||||
this.log.info('\nhe\'s dead jim')
|
log.info('\nhe\'s dead jim')
|
||||||
await this._destroy()
|
await this._destroy()
|
||||||
|
|
||||||
})
|
})
|
||||||
|
@ -55,13 +50,13 @@ export default class Socket extends Server {
|
||||||
if (err.code === 'EADDRINUSE') {
|
if (err.code === 'EADDRINUSE') {
|
||||||
let path = this.listen_opts.path
|
let path = this.listen_opts.path
|
||||||
if (path) { // if TCP socket should already be dead
|
if (path) { // if TCP socket should already be dead
|
||||||
this.log.info({socket: path}, 'already exists...deleting')
|
log.info({socket: path}, 'already exists...deleting')
|
||||||
await fileDelete(path)
|
await fileDelete(path)
|
||||||
return await this._listen(this.listen_opts)
|
return await this._listen(this.listen_opts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// otherwise fatally exit
|
// otherwise fatally exit
|
||||||
this.log.info(err, 'creating socket')
|
log.info(err, 'creating socket')
|
||||||
reject(err)
|
reject(err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -81,8 +76,8 @@ export default class Socket extends Server {
|
||||||
// this gets called for each client connection and is unique to each
|
// this gets called for each client connection and is unique to each
|
||||||
this.on('connection', (socket) => {
|
this.on('connection', (socket) => {
|
||||||
|
|
||||||
const stream = new JsonStream()
|
const stream = new JSONStream()
|
||||||
this.log.info('new consumer connecting sending handshake')
|
log.info('new consumer connecting sending handshake')
|
||||||
|
|
||||||
socket.write(stream.serialize({ready:true}))
|
socket.write(stream.serialize({ready:true}))
|
||||||
|
|
||||||
|
@ -94,16 +89,16 @@ export default class Socket extends Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
}) // end connected consumer
|
}) // end connected consumer
|
||||||
this.log.info({socket: this.listen_opts},'socket created')
|
log.info({socket: this.listen_opts},'socket created')
|
||||||
return res
|
return res
|
||||||
}) // end listen callback
|
}) // end listen callback
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async _destroy () {
|
async _destroy () {
|
||||||
this.log.info('closing down socket')
|
log.info('closing down socket')
|
||||||
await this.close()
|
await this.close()
|
||||||
this.log.info('all connections closed....exiting')
|
log.info('all connections closed....exiting')
|
||||||
process.exit()
|
process.exit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,6 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
|
||||||
socket.stdout.on('data', function(buf) {
|
socket.stdout.on('data', function(buf) {
|
||||||
console.log('[Socket]', String(buf))
|
console.log('[Socket]', String(buf))
|
||||||
})
|
})
|
||||||
// await delay(500) // wait for sockets to get going
|
|
||||||
})
|
})
|
||||||
|
|
||||||
after(async function(){
|
after(async function(){
|
||||||
|
|
Loading…
Reference in New Issue