added option for tcp socket so now module is just uci-socket. No authentification nor https at this point

tls
David Kebler 2018-01-16 12:43:36 -08:00
parent 2b14508d6f
commit f0c6e96d02
3 changed files with 93 additions and 33 deletions

View File

@ -5,12 +5,22 @@ import bunyan from 'bunyan'
export default class Consumer extends Socket { export default class Consumer extends Socket {
constructor (path, opts={}) { constructor (path, opts={}) {
super() super()
this.path = path // set or tcp socket
if (typeof(path)!=='string') {
opts = path
this.host = '127.0.0.1'
this.port = opts.port || 8080
} else {
if (opts.tcp) {
this.host = path
this.port = opts.port || 8080
} else this.path = path
}
this.keepAlive = opts.keepAlive ? opts.keepAlive : true this.keepAlive = opts.keepAlive ? opts.keepAlive : true
this._ready = false this._ready = false
this.timeout = opts.timeout || 1000 this.timeout = opts.timeout || 1000
this.wait = opts.wait || 30 this.wait = opts.wait || 30
this.log_file=opts.log_file || './log.log' this.log_file=opts.log_file || './socket.log'
this.log_opts = {streams:[]} this.log_opts = {streams:[]}
this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer'
this.log_opts.streams.push({level: 'info',path: this.log_file }) this.log_opts.streams.push({level: 'info',path: this.log_file })
@ -21,17 +31,17 @@ export default class Consumer extends Socket {
ready() {return this._ready} ready() {return this._ready}
async connect (app) { async connect (app) {
await this.listen(app) await this.listen(app)
this.log.info('listening') this.log.info('listening')
return new Promise( (resolve,reject) => { return new Promise( (resolve,reject) => {
this.on('error', (err) => { this.on('error', (err) => {
console.log(err.code)
reject(err) reject(err)
}) })
super.connect({ path: this.path }, async () => { super.connect({ port:this.port, host:this.host, path: this.path }, async () => {
this.log.info({path:this.path},'connecting') this.log.info({path:this.path},'connecting')
this.setKeepAlive(this.keepAlive) this.setKeepAlive(this.keepAlive)
let [err, res] = await btc(isReady).bind(this)(this.ready.bind(this), this.wait, this.timeout) let [err, res] = await btc(isReady).bind(this)(this.ready.bind(this), this.wait, this.timeout)
@ -73,7 +83,7 @@ export default class Consumer extends Socket {
this.log.info({packet:packet},'incoming packet from socket') this.log.info({packet:packet},'incoming packet from socket')
return packet } return packet }
} }
app[app.ucpp](packet) // process the packet await app[app.ucpp](packet) // process the packet
} }
else { this.log.info({buf: buf.toString()},'bad packet JSON syntax')} else { this.log.info({buf: buf.toString()},'bad packet JSON syntax')}
}) })

View File

@ -7,8 +7,16 @@ import bunyan from 'bunyan'
export default class Socket extends Server { export default class Socket extends Server {
constructor (path, opts={}) { constructor (path, opts={}) {
super() super()
this.path = path // set or tcp socket
this.log_file=opts.log_file || './log.log' if (typeof(path)!=='string') {
opts = path
this.listen_opts = { host: '127.0.0.1', port: opts.port || 8080}
} else {
if (opts.tcp) {
this.listen_opts = { host: path, port: opts.port || 8080}
} else this.listen_opts = { path: path }
}
this.log_file=opts.log_file || './socket.log'
this.log_opts = {streams:[]} this.log_opts = {streams:[]}
this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer' this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer'
this.log_opts.streams.push({level: 'info',path: this.log_file }) this.log_opts.streams.push({level: 'info',path: this.log_file })
@ -45,7 +53,7 @@ export default class Socket extends Server {
reject(err) reject(err)
}) })
this.listen(this.path, async (err, res) => { this.listen(this.listen_opts, async (err, res) => {
if (err) reject(err) if (err) reject(err)
// this gets called for each client connection and is unique to each // this gets called for each client connection and is unique to each
@ -55,7 +63,7 @@ export default class Socket extends Server {
socket.on('data', async (buf) => { socket.on('data', async (buf) => {
let [err, packet] = btc(JSON.parse)(buf.toString()) let [err, packet] = btc(JSON.parse)(buf.toString())
if (!err) { if (!err) {
if (this.log) this.logger.info(`data packet received to socket \n ${packet}`) if (this.log) this.log.info(`data packet received to socket \n ${packet}`)
this.log.info({packet:packet},'Server: packet received to socket') this.log.info({packet:packet},'Server: packet received to socket')
// set default packet processing // set default packet processing

View File

@ -1,6 +1,7 @@
import { spawn } from 'child_process' import { spawn } from 'child_process'
import chai from 'chai' import chai from 'chai'
import chaiAsPromised from 'chai-as-promised' import chaiAsPromised from 'chai-as-promised'
import btc from 'better-try-catch'
chai.use(chaiAsPromised) chai.use(chaiAsPromised)
const expect = chai.expect const expect = chai.expect
@ -8,42 +9,83 @@ import { Consumer } from '../src'
const USOCKET = __dirname + '/sample.sock' const USOCKET = __dirname + '/sample.sock'
let consumer = new Consumer(USOCKET) let uconsumer = new Consumer(USOCKET, {name:'test-uconsumer'})
let tcpconsumer = new Consumer('localhost',{port: 8081, tcp:true, name:'test-tcpconsumer'})
const app = {
ucpp: 'cprocessPacket',
cprocessPacket: async function (packet) {
if (packet.processed) consumer.emit(packet.cmd,packet.payload)
}
}
const delay = time => new Promise(res=>setTimeout(()=>res(),time)) const delay = time => new Promise(res=>setTimeout(()=>res(),time))
; let usocket = {}
(async () => { let tcpsocket = {}
let socket ={} describe('Connects and Processes a payload in a JSON packet', function(){
before(async function(){ before(async function(){
socket = spawn('node',['-r', '@std/esm', './test/socket']) usocket = spawn('node',['-r', '@std/esm', './test/usocket'])
await delay(500) // wait for socket to get going usocket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
tcpsocket = spawn('node',['-r', '@std/esm', './test/tcpsocket'])
tcpsocket.stdout.on('data', function(buf) {
console.log('[Socket]', String(buf))
})
await delay(500) // wait for sockets to get going
}) })
after(async function(){ after(async function(){
socket.kill() usocket.kill()
tcpsocket.kill()
}) })
it('Connects and Processes some payload', async function () { it('via unix socket', async function () {
console.log('connection is ',await consumer.connect(app)) return new Promise(async function (resolve, reject) {
consumer.on('test1', function(payload){
expect(payload).to.equal('payload1')
})
let packet = {id: 'test consumer', cmd:'test1', payload:'payload1'}
consumer.send(packet)
})
const app = {
processPacket: function (packet) {
try {
expect(packet.payload).to.equal('processed unix payload')
resolve()
}
catch(error) {
reject(error)
}
}
}
let [err] = await btc(uconsumer.connect.bind(uconsumer))(app)
if (err) reject(err)
let packet = {payload:'unix payload'}
uconsumer.send(packet)
}) //end promise
}) // end unix socket test
it('via tcp socket', async function () {
return new Promise(async function (resolve, reject) {
const app = {
processPacket: function (packet) {
try {
expect(packet.payload).to.equal('processed tcp payload')
resolve()
}
catch(error) {
reject(error)
}
}
}
let [err] = await btc(tcpconsumer.connect.bind(tcpconsumer))(app)
if (err) reject(err)
let packet = {payload:'tcp payload'}
tcpconsumer.send(packet)
}) //end promise
}) // end unix socket test
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
}) })