fix tcp options handling error

add id property
use _ for private methods
tls
David Kebler 2018-01-31 11:30:32 -08:00
parent 3beb53d2a3
commit 40d9565bf3
3 changed files with 38 additions and 36 deletions

View File

@ -7,18 +7,16 @@ import JsonStream from './json-stream'
export default class Consumer extends Socket { export default class Consumer extends Socket {
constructor (path={}, opts={}) { constructor (path={}, opts={}) {
super() super()
// set or tcp socket this.id = opts.id || opts.name || 'consumer:'+ Math.random()*100
if (typeof(path)!=='string') { if (typeof(path)!=='string') {
opts = path if (arguments.length === 2) {
if (!path.host || !path.port) opts = path opts.host = path.host || opts.host
this.host = path.host || '127.0.0.1' // TODO log a warning about host on same machine opts.port = path.port || opts.port
this.port = path.port || 8080 } else opts=path
} else this.path = path this.host = opts.host || '127.0.0.1' // TODO log a warning about host on same machine
this.packet = { this.port = opts.port || 8080
_process: async (packet) => { } else {
console.log('default processor -- packet from socket') this.path = path
console.dir(packet)
return packet }
} }
this.keepAlive = opts.keepAlive ? opts.keepAlive : true this.keepAlive = opts.keepAlive ? opts.keepAlive : true
this._ready = false this._ready = false
@ -26,14 +24,15 @@ export default class Consumer extends Socket {
this.wait = opts.wait || 5 this.wait = opts.wait || 5
this.stream = new JsonStream() this.stream = new JsonStream()
this.packet = { this.packet = {
_process: (packet) => { _process: async (packet) => {
packet.res='echoed' console.log('default consumer processor -- packet from socket')
return packet } console.dir(packet)
}
} }
// logging // logging
this.log_file=opts.log_file || './socket.log' this.log_file=opts.log_file || './socket.log'
this.log_opts = {streams:[]} this.log_opts = {streams:[]}
this.log_opts.name = opts.name ? opts.name : 'uci-socket-consumer' this.log_opts.name = this.id
// this.log_opts.streams.push({level: 'info',path: this.log_file }) // this.log_opts.streams.push({level: 'info',path: this.log_file })
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout}) if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
this.log = bunyan.createLogger(this.log_opts) this.log = bunyan.createLogger(this.log_opts)
@ -52,18 +51,20 @@ export default class Consumer extends Socket {
return new Promise( (resolve,reject) => { return new Promise( (resolve,reject) => {
const connect = () => { const connect = () => {
if (this.host ==='127.0.0.1') this.log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead')
this.log.info(`attempting to connect ${this.id} to ${this.path?this.path:this.host+':'+this.port}`)
super.connect({ port:this.port, host:this.host, path: this.path }) super.connect({ port:this.port, host:this.host, path: this.path })
} }
const timeout = setTimeout(() =>{ const timeout = setTimeout(() =>{
reject(`unable to connect in ${this.timeout*10}ms`) reject(`unable to connect in ${this.timeout*10}ms to ${this.path?this.path:this.host+':'+this.port}`)
} }
,this.timeout*10) ,this.timeout*10)
this.once('connect', async () => { this.once('connect', async () => {
clearTimeout(timeout) clearTimeout(timeout)
this.listen() this._listen()
this.log.info({path:this.path},'connected waiting for socket ready handshake') this.log.info({path:this.path, host:this.host, post:this.port },`connected ${this.path?this.path:this.host+':'+this.port} waiting for socket ready handshake`)
this.setKeepAlive(this.keepAlive) this.setKeepAlive(this.keepAlive)
let [err, res] = await btc(isReady).bind(this)(this.ready, this.wait, this.timeout) let [err, res] = await btc(isReady).bind(this)(this.ready, this.wait, this.timeout)
if (err) reject(err) if (err) reject(err)
@ -79,7 +80,7 @@ export default class Consumer extends Socket {
} }
this.log.warn(err.code) this.log.warn(err.code)
setTimeout(() =>{ setTimeout(() =>{
this.log.warn('retry connect') this.log.warn(`retrying connect to ${this.path?this.path:this.host+':'+this.port}`)
connect() connect()
} }
,this.wait*10) ,this.wait*10)
@ -91,8 +92,7 @@ export default class Consumer extends Socket {
} }
async listen () { async _listen () {
this.log.info('listening for incoming packets from socket') this.log.info('listening for incoming packets from socket')
this.on('data', this.stream.onData) this.on('data', this.stream.onData)
this.stream.on('message', messageProcess.bind(this)) this.stream.on('message', messageProcess.bind(this))

View File

@ -7,11 +7,15 @@ import bunyan from 'bunyan'
import JsonStream from './json-stream' import JsonStream from './json-stream'
export default class Socket extends Server { export default class Socket extends Server {
constructor (path={},opts={}) { constructor (path,opts={}) {
super() super()
this.id = opts.id || opts.name || 'socket:'+ Math.random()*100
if (typeof(path)!=='string') { if (typeof(path)!=='string') {
if (!path.host || !path.port) opts = path if (arguments.length === 2) {
this.listen_opts = { host: path.host || '0.0.0.0', port: path.port || 8080} opts.host = path.host || opts.host
opts.port = path.port || opts.port
} else opts=path
this.listen_opts = { host: opts.host || '0.0.0.0', port: opts.port || 8080}
} else this.listen_opts = { path: path } } else this.listen_opts = { path: path }
this.packet = { // default packet processing - simple echo server this.packet = { // default packet processing - simple echo server
_process: (packet) => { _process: (packet) => {
@ -21,12 +25,12 @@ export default class Socket extends Server {
// Change to environment based configuration for logger // Change to environment based configuration for logger
this.log_file=opts.log_file || './socket.log' this.log_file=opts.log_file || './socket.log'
this.log_opts = {streams:[]} this.log_opts = {streams:[]}
this.log_opts.name = opts.name ? opts.name : 'uci-socket' this.log_opts.name = this.id
// if (opts.log===1)// this.log_opts.streams.push({level: 'info',path: this.log_file }) // if (opts.log===1)// this.log_opts.streams.push({level: 'info',path: this.log_file })
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout}) if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
this.log = bunyan.createLogger(this.log_opts) this.log = bunyan.createLogger(this.log_opts)
//self binding //self binding
this.listen = this.listen.bind(this) this._listen = this._listen.bind(this)
this.create = this.create.bind(this) this.create = this.create.bind(this)
} // end constructor } // end constructor
@ -37,12 +41,12 @@ export default class Socket extends Server {
ON_DEATH( async () => { ON_DEATH( async () => {
this.log.info('\nhe\'s dead jim') this.log.info('\nhe\'s dead jim')
await this.destroy() await this._destroy()
}) })
process.once('SIGUSR2', async () => { process.once('SIGUSR2', async () => {
await this.destroy await this._destroy
process.kill(process.pid, 'SIGUSR2') process.kill(process.pid, 'SIGUSR2')
}) })
@ -53,7 +57,7 @@ export default class Socket extends Server {
if (path) { // if TCP socket should already be dead if (path) { // if TCP socket should already be dead
this.log.info({socket: path}, 'already exists...deleting') this.log.info({socket: path}, 'already exists...deleting')
await fileDelete(path) await fileDelete(path)
return await this.listen(this.listen_opts) return await this._listen(this.listen_opts)
} }
} }
// otherwise fatally exit // otherwise fatally exit
@ -61,23 +65,23 @@ export default class Socket extends Server {
reject(err) reject(err)
}) })
let [err, res] = await btc(this.listen)(this.listen_opts) let [err, res] = await btc(this._listen)(this.listen_opts)
if (err) reject(err) if (err) reject(err)
resolve(res) resolve(res)
}) // end promise }) // end promise
} // end create } // end create
async listen (opts) { async _listen (opts) {
super.listen(opts, async (err, res) => { super.listen(opts, async (err, res) => {
if (err) return err if (err) return err
// this gets called for each client connection and is unique to each // this gets called for each client connection and is unique to each
this.on('connection', (socket) => { this.on('connection', (socket) => {
const stream = new JsonStream() const stream = new JsonStream()
this.log.info('new consumer connecting sending handshake') this.log.info('new consumer connecting sending handshake')
socket.write(stream.serialize({ready:true})) socket.write(stream.serialize({ready:true}))
@ -90,15 +94,13 @@ export default class Socket extends Server {
} }
}) // end connected consumer }) // end connected consumer
this.log.info({socket: this.listen_opts},'socket created') this.log.info({socket: this.listen_opts},'socket created')
return res return res
}) // end listen callback }) // end listen callback
} }
async destroy () { async _destroy () {
this.log.info('closing down socket') this.log.info('closing down socket')
await this.close() await this.close()
this.log.info('all connections closed....exiting') this.log.info('all connections closed....exiting')

View File

@ -87,7 +87,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
} }
} }
consumer2.processPacket = function (packet) { consumer2.packet._process = function (packet) {
return packet return packet
} }