moved log options to top

refactor passed int opts  (path no longer passed separately)
removed .packet property moving process to root property
and thus _packetProcess is private method for packet processing
re-export in index both Socket and Consumer as named and as props of default
refactor test and examples accordingly
tls
David Kebler 2018-02-03 13:33:25 -08:00
parent bbc29317ee
commit 751491ee00
14 changed files with 141 additions and 121 deletions

View File

@ -2,8 +2,8 @@ import Consumer from '../src/consumer'
const USOCKET = __dirname + '/sample.sock' const USOCKET = __dirname + '/sample.sock'
const client1= new Consumer(USOCKET, {log:false,name:'example-consumer1' }) const client1= new Consumer({path:USOCKET,name:'example-consumer1' })
const client2 = new Consumer(USOCKET, {log:false,name:'example-consumer2'}) const client2 = new Consumer({path:USOCKET,name:'example-consumer2'})
let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'} let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'}
let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'} let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
@ -15,13 +15,16 @@ const process = function (packet) {
client1.registerPacketProcessor(process) client1.registerPacketProcessor(process)
client2.packet._process = process client2._packetProcess = process
; ;
(async () => { (async () => {
await Promise.all([client1.connect(),client2.connect()]) await Promise.all([client1.connect(),client2.connect()])
await Promise.all([client1.send(packet1),client2.send(packet2)]) client1.send(packet1)
client2.send(packet2)
client1.end()
client2.end()
})().catch(err => { })().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err) console.error('FATAL: UNABLE TO START SYSTEM!\n',err)

View File

@ -6,14 +6,21 @@ const USOCKET = __dirname + '/sample.sock'
class Client extends Consumer { class Client extends Consumer {
constructor(path,opts) { constructor(path,opts) {
super(path,opts) super(path,opts)
this.packet._process = async function (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
}
}
} }
const client1= new Client(USOCKET, {log:false,name:'example-consumer1' }) async _packetProcess (packet) {
const client2 = new Client(USOCKET, {log:false,name:'example-consumer2'}) this[packet.cmd](packet)
}
async reply (packet) {
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
console.log(`Socket replied with data: ${packet.data}`)
}
}
const client1= new Client({path:USOCKET,name:'example-consumer1' })
const client2 = new Client({path:USOCKET,name:'example-consumer2'})
let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'} let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'}
let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'} let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
@ -23,7 +30,8 @@ let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
await Promise.all([client1.connect(),client2.connect()]) await Promise.all([client1.connect(),client2.connect()])
await Promise.all([client1.send(packet1),client2.send(packet2)]) await Promise.all([client1.send(packet1),client2.send(packet2)])
client1.end()
client2.end()
})().catch(err => { })().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err) console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
}) })

View File

@ -6,11 +6,11 @@ const USOCKET = __dirname + '/sample.sock'
(async () => { (async () => {
class Test extends Socket { class Test extends Socket {
constructor(path,opts) { constructor(opts) {
super(path,opts) super(opts)
} }
async _process(packet) { async _packetProcess(packet) {
console.log('packet being processed') console.log('packet being processed')
console.dir(packet) console.dir(packet)
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name) if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
@ -22,14 +22,14 @@ const USOCKET = __dirname + '/sample.sock'
console.log('data:', data) console.log('data:', data)
res.status ='success' res.status ='success'
res.name = name res.name = name
res.data = 'this would be response from device' res.cmd = 'reply'
res.data = 'this might be response data from another process'
return(res) return(res)
} }
} }
let test = new Test(USOCKET) let test = new Test({path:USOCKET})
test.packet = test
await test.create() await test.create()
})().catch(err => { })().catch(err => {

View File

@ -1,37 +1,36 @@
import { Socket } from 'net' import { Socket } from 'net'
import btc from 'better-try-catch' import btc from 'better-try-catch'
// import bunyan from 'bunyan'
import logger from '../../uci-logger/src/logger'
let log = {}
import JsonStream from './json-stream' import JsonStream from './json-stream'
export default class Consumer extends Socket { import logger from '../../uci-logger/src/logger'
constructor (path={}, opts={}) { let log = {}
super() const LOG_OPTS = {
this.id = opts.id || opts.name || 'consumer:'+ Math.random()*100 repo:'uci-socket',
if (typeof(path)!=='string') { npm:'@uci/socket',
if (arguments.length === 2) { file:'src/consumer.mjs',
opts.host = path.host || opts.host class:'Consumer',
opts.port = path.port || opts.port id:this.id,
} else opts=path instance_created:new Date().getTime()
this.host = opts.host || '127.0.0.1' // TODO log a warning about host on same machine
this.port = opts.port || 8080
} else {
this.path = path
} }
const DEFAULT_PIPE = __dirname + '/unix.sock'
export default class Consumer extends Socket {
constructor (opts={}) {
super()
this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
if (!opts.path && opts.np) opts.path = DEFAULT_PIPE
if (!opts.path) {
opts.host = opts.host || '127.0.0.1'
opts.port = opts.port || 8080
} else opts.np = true
this.opts=opts
this.keepAlive = opts.keepAlive ? opts.keepAlive : true this.keepAlive = opts.keepAlive ? opts.keepAlive : true
this._ready = false this._ready = false
this.timeout = opts.timeout || 500 this.timeout = opts.timeout || 500
this.wait = opts.wait || 5 this.wait = opts.wait || 5
this.stream = new JsonStream() this.stream = new JsonStream()
this.packet = { log = logger.child(LOG_OPTS)
_process: async (packet) => {
console.log('default consumer processor -- packet from socket')
console.dir(packet)
}
}
log = logger.child({repo:'uci-socket',npm:'@uci/socket',file:'src/socket.mjs',class:'Socket', id:this.id, created:new Date().getTime()})
// bind to class for other class functions // bind to class for other class functions
this.connect = this.connect.bind(this) this.connect = this.connect.bind(this)
this.__ready = this.__ready.bind(this) this.__ready = this.__ready.bind(this)
@ -45,20 +44,20 @@ export default class Consumer extends Socket {
return new Promise( (resolve,reject) => { return new Promise( (resolve,reject) => {
const connect = () => { const connect = () => {
if (this.host ==='127.0.0.1') log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead') if (this.opts.host ==='127.0.0.1') log.warn('tcp consumer on same machine as host, use named Pipe(Unix) Socket Instead')
log.info(`attempting to connect ${this.id} to ${this.path?this.path:this.host+':'+this.port}`) log.info({opts:this.opts},`attempting to connect ${this.id} to socket`)
super.connect({ port:this.port, host:this.host, path: this.path }) super.connect(this.opts)
} }
const timeout = setTimeout(() =>{ const timeout = setTimeout(() =>{
reject(`unable to connect in ${this.timeout*10}ms to ${this.path?this.path:this.host+':'+this.port}`) reject({opts:this.opts},`unable to connect in ${this.timeout*10}ms`)
} }
,this.timeout*10) ,this.timeout*10)
this.once('connect', async () => { this.once('connect', async () => {
clearTimeout(timeout) clearTimeout(timeout)
this._listen() this._listen()
log.info({path:this.path, host:this.host, post:this.port },`connected ${this.path?this.path:this.host+':'+this.port} waiting for socket ready handshake`) log.info({opts:this.opts},'connected waiting for socket ready handshake')
this.setKeepAlive(this.keepAlive) this.setKeepAlive(this.keepAlive)
let [err, res] = await btc(isReady).bind(this)(this.__ready, this.wait, this.timeout) let [err, res] = await btc(isReady).bind(this)(this.__ready, this.wait, this.timeout)
if (err) reject(err) if (err) reject(err)
@ -74,10 +73,10 @@ export default class Consumer extends Socket {
} }
log.warn(err.code) log.warn(err.code)
setTimeout(() =>{ setTimeout(() =>{
log.warn(`retrying connect to ${this.path?this.path:this.host+':'+this.port}`) // log.warn(`retrying connect to ${this.opts}`)
connect() connect()
} }
,this.wait*10) ,this.wait*100)
}) })
connect() connect()
@ -88,7 +87,9 @@ export default class Consumer extends Socket {
async send(packet) { async send(packet) {
await this.write(this.stream.serialize(packet)) await this.write(this.stream.serialize(packet))
// TODO handle error here? and/or await response if required before allowing more sending // TODO handle possible error
// TODO await response if required by setting id to packet
// then set a flag (and promise) that is resovled in the listener
} }
// TODO register alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks) // TODO register alt stream processor (emit 'message' with JSON, serialize function, onData method for raw socket chucks)
@ -96,9 +97,11 @@ export default class Consumer extends Socket {
// TODO register authenciation function (set up default) // TODO register authenciation function (set up default)
registerPacketProcessor (func) { registerPacketProcessor (func) {
this.packet._process = func this._packetProcess = func
} }
// PRIVATE METHODS
__ready() {return this._ready} __ready() {return this._ready}
async _listen () { async _listen () {
@ -109,13 +112,22 @@ export default class Consumer extends Socket {
if (packet.ready) { if (packet.ready) {
this._ready = true this._ready = true
return } return }
await this.packet._process(packet) await this._packetProcess(packet)
} }
} }
// default packet process just a simple console logger
_packetProcess (packet) {
console.log('default consumer processor -- packet from socket')
console.dir(packet)
}
} // end class } // end class
// wait for handshake packet from socket
// HELP FUNCTIONS
// wait until a passed ready function returns true
function isReady(ready, wait=30, timeout=1000) { function isReady(ready, wait=30, timeout=1000) {
let time = 0 let time = 0
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {

View File

@ -1,2 +1,6 @@
export { default as Socket } from './socket' import Socket from './socket'
export { default as Consumer } from './consumer' import Consumer from './consumer'
export {Socket as Socket}
export { Consumer as Consumer }
export default { Socket, Consumer }

View File

@ -1,45 +1,46 @@
import { Server } from 'net' import { Server } from 'net'
import { unlink as fileDelete } from 'fs' import { unlink as fileDelete } from 'fs'
import btc from 'better-try-catch' import btc from 'better-try-catch'
import ON_DEATH from 'death' //this is intentionally ugly import _ON_DEATH from 'death' //this is intentionally ugly
import JSONStream from './json-stream' import JSONStream from './json-stream'
import logger from '../../uci-logger/src/logger' import logger from '../../uci-logger/src/logger'
let log = {} let log = {}
const LOG_OPTS = {
repo:'uci-socket',
npm:'@uci/socket',
file:'src/socket.mjs',
class:'Socket',
id:this.id,
instance_created:new Date().getTime()
}
const DEFAULT_PIPE = __dirname + '/unix.sock'
export default class Socket extends Server { export default class Socket extends Server {
constructor (path,opts={}) { constructor (opts={}) {
super() super()
this.id = opts.id || opts.name || 'socket:'+ new Date().getTime() this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
if (typeof(path)!=='string') { if (!opts.path && opts.np) opts.path = DEFAULT_PIPE
if (arguments.length === 2) { if (!opts.path) {
opts.host = path.host || opts.host opts.host = opts.host || '0.0.0.0'
opts.port = path.port || opts.port opts.port = opts.port || 8080
} else opts=path } else opts.np = true
this.listen_opts = { host: opts.host || '0.0.0.0', port: opts.port || 8080} this.opts = opts
} else this.listen_opts = { path: path } //self bindings
this.packet = { // default packet processing - simple echo server
_process: (packet) => {
packet.res='echoed'
return packet }
}
//self binding
this._listen = this._listen.bind(this) this._listen = this._listen.bind(this)
this.create = this.create.bind(this) this.create = this.create.bind(this)
log = logger.child({repo:'uci-socket',npm:'@uci/socket',file:'src/socket.mjs',class:'Socket', id:this.id, created:new Date().getTime()}) log = logger.child(LOG_OPTS) //create instance logger set LOG_OPTS above
} // end constructor } // end constructor
async create () { async create () {
return new Promise( async (resolve,reject) => { return new Promise( async (resolve,reject) => {
// couple ways to kill socket process when needed
ON_DEATH( async () => { _ON_DEATH( async () => {
log.info('\nhe\'s dead jim') log.info('\nhe\'s dead jim')
await this._destroy() await this._destroy()
}) })
process.once('SIGUSR2', async () => { process.once('SIGUSR2', async () => {
await this._destroy await this._destroy
process.kill(process.pid, 'SIGUSR2') process.kill(process.pid, 'SIGUSR2')
@ -48,11 +49,10 @@ export default class Socket extends Server {
this.on('error', async (err) => { this.on('error', async (err) => {
// recover from socket file that was not removed // recover from socket file that was not removed
if (err.code === 'EADDRINUSE') { if (err.code === 'EADDRINUSE') {
let path = this.listen_opts.path if (this.opts.np) { // if TCP socket should already be dead
if (path) { // if TCP socket should already be dead log.info({socket: this.opts.path}, 'already exists...deleting')
log.info({socket: path}, 'already exists...deleting') await fileDelete(this.opts.path)
await fileDelete(path) return await this._listen(this.opts)
return await this._listen(this.listen_opts)
} }
} }
// otherwise fatally exit // otherwise fatally exit
@ -60,40 +60,37 @@ export default class Socket extends Server {
reject(err) reject(err)
}) })
let [err, res] = await btc(this._listen)(this.listen_opts) let [err, res] = await btc(this._listen)(this.opts)
if (err) reject(err) if (err) reject(err)
resolve(res) resolve(res)
}) // end promise }) // end promise
} // end create } // end create
registerPacketProcessor (func) {
this._packetProcess = func
}
async _listen (opts) { async _listen (opts) {
super.listen(opts, async (err, res) => { super.listen(opts, async (err, res) => {
if (err) return err if (err) return err
// this gets called for each client connection and is unique to each // this gets called for each client connection and is unique to each
this.on('connection', (socket) => { this.on('connection', (socket) => {
const stream = new JSONStream() const stream = new JSONStream()
log.info('new consumer connecting sending handshake') log.info('new consumer connecting sending handshake')
socket.write(stream.serialize({ready:true})) socket.write(stream.serialize({ready:true}))
socket.on('data', stream.onData) socket.on('data', stream.onData)
stream.on('message', messageProcess.bind(this)) stream.on('message', messageProcess.bind(this))
async function messageProcess (packet) { async function messageProcess (packet) {
socket.write(stream.serialize(await this.packet._process(packet))) socket.write(stream.serialize(await this._packetProcess(packet)))
} }
}) // end connecttion consumer
}) // end connected consumer log.info({opts: this.opts},'socket created')
log.info({socket: this.listen_opts},'socket created')
return res return res
}) // end listen callback }) // end super listen callback
} } // end listen
async _destroy () { async _destroy () {
log.info('closing down socket') log.info('closing down socket')
@ -102,8 +99,10 @@ export default class Socket extends Server {
process.exit() process.exit()
} }
registerPacketProcessor (func) { // default packet process, just a simple echo
this.packet._process = func _packetProcess (packet) {
packet.res='echoed'
return packet
} }
} // end class } // end class

View File

@ -1,4 +1,4 @@
export default async function (packet) { export default async function (packet) {
packet.payload = this.port +':'+packet.payload packet.payload = this.opts.port +':'+packet.payload
return packet return packet
} }

View File

@ -4,7 +4,6 @@ import process from './tcp-process'
let socket = new Socket({port:9080, name:'tcp socket 9080'}) let socket = new Socket({port:9080, name:'tcp socket 9080'})
socket.registerPacketProcessor(process) socket.registerPacketProcessor(process)
socket.packet.port = socket.listen_opts.port
; ;
(async () => { (async () => {

View File

@ -1,10 +1,9 @@
import { Socket } from '../../src' import { Socket } from '../../src'
import process from './tcp-process' import process from './tcp-process'
let socket = new Socket({name:'tcp socket', log:false}) let socket = new Socket({name:'tcp socket'})
socket.registerPacketProcessor(process) socket.registerPacketProcessor(process)
socket.packet.port = socket.listen_opts.port
; ;
(async () => { (async () => {

View File

@ -2,9 +2,9 @@ import { Socket } from '../../src'
const USOCKET = __dirname + '/test.sock' const USOCKET = __dirname + '/test.sock'
let socket = new Socket(USOCKET,{name:'unix socket'}) let socket = new Socket({path:USOCKET,name:'default-unix-socket'})
socket.packet.test = 'at socket => ' socket.test = 'at socket => '
socket.registerPacketProcessor(async function (packet) { socket.registerPacketProcessor(async function (packet) {
packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload packet.payload = 'overwrite default processor from instance '+ this.test + packet.payload

View File

@ -2,7 +2,7 @@ import { Socket } from '../../src'
const USOCKET = __dirname + '/test.sock' const USOCKET = __dirname + '/test.sock'
let socket = new Socket(USOCKET,{name:'unix socket', log:false}) let socket = new Socket({path:USOCKET,name:'default-unix-socket'})
; ;
(async () => { (async () => {

View File

@ -7,8 +7,6 @@ const expect = chai.expect
import { Consumer } from '../src' import { Consumer } from '../src'
const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let tcpsocket_default = {} let tcpsocket_default = {}
let tcpsocket_9080 = {} let tcpsocket_9080 = {}
@ -44,7 +42,7 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
process.kill(process.pid, 'SIGTERM') process.kill(process.pid, 'SIGTERM')
} }
tcpconsumer_default.packet._process = function (packet) { tcpconsumer_default._packetProcess = function (packet) {
try { try {
expect(packet.payload).to.equal('8080:tcp payload') expect(packet.payload).to.equal('8080:tcp payload')
resolve() resolve()

View File

@ -10,10 +10,10 @@ import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock' const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket-default-overwrite' const SOCKET_FILE = 'usocket-default-overwrite'
let consumer = new Consumer(USOCKET, {name:'unix-consumer'}) let consumer = new Consumer({path:USOCKET,name:'unix-consumer'})
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) let consumer2 = new Consumer({path:USOCKET, name:'unix-consumer2'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time)) // const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {} let socket = {}
@ -24,8 +24,6 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
socket.stdout.on('data', function(buf) { socket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf)) console.log('[Socket]', String(buf))
}) })
await delay(500) // wait for sockets to get going
}) })
after(async function(){ after(async function(){
@ -34,13 +32,13 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
it('Tests JSON packet procssing, 10 packets', async function () { it('Tests JSON packet procssing, 10 packets', async function () {
consumer.packet.times = 0 consumer.times = 0
return new Promise(async function (resolve, reject) { return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900) setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.packet._process = function (packet) { consumer._packetProcess = function (packet) {
this.times++ this.times++
if (this.times!==11) return if (this.times!==11) return
@ -67,13 +65,13 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
it('unix socket with two consumers alternating packets, 10 packets each', async function () { it('unix socket with two consumers alternating packets, 10 packets each', async function () {
consumer.packet.times = 0 consumer.times = 0
return new Promise(async function (resolve, reject) { return new Promise(async function (resolve, reject) {
setTimeout(() =>{ reject('10 packets not received in time')},1900) setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.packet._process = function (packet) { consumer._packetProcess = function (packet) {
this.times++ this.times++
// console.log(this.times,packet.payload) // console.log(this.times,packet.payload)
if (this.times!==11) return if (this.times!==11) return
@ -87,7 +85,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
} }
} }
consumer2.packet._process = function (packet) { consumer2._packetProcess = function (packet) {
return packet return packet
} }

View File

@ -10,10 +10,10 @@ import { Consumer } from '../src'
const USOCKET = __dirname + '/sockets/test.sock' const USOCKET = __dirname + '/sockets/test.sock'
const SOCKET_FILE = 'usocket-default' const SOCKET_FILE = 'usocket-default'
let consumer = new Consumer(USOCKET, {name:'unix-consumer', log:false}) let consumer = new Consumer({path:USOCKET,name:'unix-consumer'})
let consumer2 = new Consumer(USOCKET, {name:'unix-consumer2'}) let consumer2 = new Consumer({path:USOCKET, name:'unix-consumer2'})
const delay = time => new Promise(res=>setTimeout(()=>res(),time)) // const delay = time => new Promise(res=>setTimeout(()=>res(),time))
let socket = {} let socket = {}
@ -32,7 +32,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
it('Tests unix socket with default echo JSON packet procssing, 10 packets with conect via connect', async function () { it('Tests unix socket with default echo JSON packet procssing, 10 packets with conect via connect', async function () {
consumer.packet.times = 0 consumer.times = 0
return new Promise(async function (resolve, reject) { return new Promise(async function (resolve, reject) {
@ -44,7 +44,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
setTimeout(() =>{ reject('10 packets not received in time')},1900) setTimeout(() =>{ reject('10 packets not received in time')},1900)
consumer.packet._process = function (packet) { consumer._packetProcess = function (packet) {
this.times++ this.times++
if (this.times!==11) return if (this.times!==11) return
@ -69,8 +69,8 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
it('unix socket with two consumers alternating packets, 10 packets each with local and added context', async function () { it('unix socket with two consumers alternating packets, 10 packets each with local and added context', async function () {
consumer.packet.times = 0 consumer.times = 0
consumer.packet.test = ':local' consumer.test = ':local'
let [err] = await btc(consumer2.connect)() let [err] = await btc(consumer2.connect)()
if (err) { if (err) {
@ -98,7 +98,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
} }
}) })
consumer2.packet._process = function (packet) { consumer2._packetProcess = function (packet) {
return packet return packet
} }