refactor options processing, remove opts.np
parent
283957f641
commit
1f45aec2fa
|
@ -1,19 +1,16 @@
|
|||
import Consumer from '../src/consumer'
|
||||
|
||||
const client1= new Consumer({np:true,name:'example-consumer1' })
|
||||
const client2 = new Consumer({np:true,name:'example-consumer2'})
|
||||
// const client1= new Consumer({name:'example-consumer1' })
|
||||
const client= new Consumer({path:true, name:'example-consumer' })
|
||||
|
||||
let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'}
|
||||
let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
|
||||
let packet = {name: 'client', cmd:'doit', data:'data sent by client'}
|
||||
|
||||
// This is your client handler object waiting on a message to do something
|
||||
const process = function (packet) {
|
||||
console.log(`Packet from ${packet.name} Processed by Socket: ${packet.status}`)
|
||||
}
|
||||
|
||||
client1.registerPacketProcessor(process)
|
||||
//
|
||||
// client2._packetProcess = process
|
||||
client.registerPacketProcessor(process)
|
||||
|
||||
;
|
||||
(async () => {
|
||||
|
@ -21,13 +18,9 @@ client1.registerPacketProcessor(process)
|
|||
|
||||
|
||||
// await Promise.all([client1.connect(),client2.connect()])
|
||||
await client1.connect()
|
||||
console.log('=========\n',await client1.send(packet1))
|
||||
|
||||
// client1.send(packet1)
|
||||
// client2.send(packet2)
|
||||
// client1.end()
|
||||
// client2.end()
|
||||
await client.connect()
|
||||
console.log('=========\n',await client.send(packet))
|
||||
client.end()
|
||||
|
||||
})().catch(err => {
|
||||
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
|
||||
|
|
|
@ -19,8 +19,8 @@ class Client extends Consumer {
|
|||
|
||||
}
|
||||
|
||||
const client1= new Client({path:USOCKET,name:'example-consumer1' })
|
||||
const client2 = new Client({path:USOCKET,name:'example-consumer2'})
|
||||
const client1= new Client({path:true,name:'example-consumer1' })
|
||||
const client2 = new Client({path:true,name:'example-consumer2'})
|
||||
|
||||
let packet1 = {name: 'client1', cmd:'doit', data:'data sent by client1'}
|
||||
let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
|
||||
|
@ -29,7 +29,7 @@ let packet2 = {name: 'client2', cmd:'doit', data:'data sent by client2'}
|
|||
(async () => {
|
||||
|
||||
await Promise.all([client1.connect(),client2.connect()])
|
||||
await Promise.all([client1.send(packet1),client2.send(packet2)])
|
||||
console.log(await Promise.all([client1.send(packet1),client2.send(packet2)]))
|
||||
client1.end()
|
||||
client2.end()
|
||||
})().catch(err => {
|
||||
|
|
|
@ -29,7 +29,8 @@ import { Socket } from '../src'
|
|||
|
||||
}
|
||||
|
||||
let test = new Test({np:true})
|
||||
// let test = new Test()
|
||||
let test = new Test({path:true})
|
||||
await test.create()
|
||||
|
||||
})().catch(err => {
|
||||
|
|
11
package.json
11
package.json
|
@ -5,12 +5,12 @@
|
|||
"main": "src",
|
||||
"scripts": {
|
||||
"testw": "mocha -r @std/esm test/*.test.mjs --watch --recurse --watch-extensions mjs",
|
||||
"test": "mocha -r @std/esm test/*.test.mjs",
|
||||
"testlog": "DEBUG=true mocha -r @std/esm test/*.test.mjs",
|
||||
"test": "mocha -r @std/esm --timeout 10000 test/*.test.mjs",
|
||||
"testlog": "DEBUG=true mocha -r @std/esm --timeout 10000 test/*.test.mjs",
|
||||
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
|
||||
"s": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r @std/esm examples/server",
|
||||
"s": "DEBUG=true node -r @std/esm examples/server",
|
||||
"devs": "SOCKETS_DIR=/opt/sockets DEBUG=true ./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server",
|
||||
"c": "SOCKETS_DIR=/opt/sockets node -r @std/esm examples/client",
|
||||
"c": "DEBUG=true node -r @std/esm examples/client",
|
||||
"devc": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r @std/esm examples/client",
|
||||
"c2": "node -r @std/esm examples/client2"
|
||||
},
|
||||
|
@ -48,6 +48,7 @@
|
|||
"dependencies": {
|
||||
"better-try-catch": "^0.6.2",
|
||||
"clone": "^2.1.1",
|
||||
"death": "^1.1.0"
|
||||
"death": "^1.1.0",
|
||||
"make-dir": "^1.2.0"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import { Socket } from 'net'
|
||||
import path from 'path'
|
||||
import btc from 'better-try-catch'
|
||||
import JsonStream from './json-stream'
|
||||
|
||||
|
@ -15,24 +16,29 @@ const LOG_OPTS = {
|
|||
instance_created:new Date().getTime()
|
||||
}
|
||||
|
||||
const DEFAULT_PIPE = (process.env.SOCKETS_DIR || __dirname) + '/uci-socket.sock'
|
||||
// TODO change default pipe dir for windows and mac os
|
||||
const DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || '/tmp/UCI')
|
||||
const DEFAULT_SOCKET_NAME = 'uci-sock'
|
||||
|
||||
export default class Consumer extends Socket {
|
||||
constructor (opts={}) {
|
||||
super()
|
||||
log = logger.child(LOG_OPTS)
|
||||
this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
|
||||
if (!opts.path && opts.np) opts.path = DEFAULT_PIPE
|
||||
if (!opts.path) {
|
||||
log.warn({opts:opts},'no host supplied using localhost...use named piped instead')
|
||||
opts.host = opts.host || '127.0.0.1'
|
||||
opts.port = opts.port || 8080
|
||||
} else opts.np = true
|
||||
} else {
|
||||
if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
|
||||
if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
|
||||
}
|
||||
this.opts=opts
|
||||
this.keepAlive = opts.keepAlive ? opts.keepAlive : true
|
||||
this._ready = false
|
||||
this.timeout = opts.timeout || 500
|
||||
this.wait = opts.wait || 5
|
||||
this.stream = new JsonStream()
|
||||
log = logger.child(LOG_OPTS)
|
||||
// bind to class for other class functions
|
||||
this.connect = this.connect.bind(this)
|
||||
this.__ready = this.__ready.bind(this)
|
||||
|
@ -41,9 +47,6 @@ export default class Consumer extends Socket {
|
|||
|
||||
async connect () {
|
||||
|
||||
// if (context) this.packet.context = context
|
||||
// else this.packet.context = this
|
||||
|
||||
return new Promise( (resolve,reject) => {
|
||||
|
||||
const connect = () => {
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
import { Server } from 'net'
|
||||
import { unlink as fileDelete } from 'fs'
|
||||
import path from 'path'
|
||||
import mkdir from 'make-dir'
|
||||
import btc from 'better-try-catch'
|
||||
import _ON_DEATH from 'death' //this is intentionally ugly
|
||||
import JSONStream from './json-stream'
|
||||
|
@ -16,23 +18,26 @@ const LOG_OPTS = {
|
|||
id:this.id,
|
||||
instance_created:new Date().getTime()
|
||||
}
|
||||
const DEFAULT_PIPE = (process.env.SOCKETS_DIR || __dirname) + '/uci-socket.sock'
|
||||
// TODO change default pipe dir for windows and mac os
|
||||
const DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || '/tmp/UCI')
|
||||
const DEFAULT_SOCKET_NAME = 'uci-sock'
|
||||
|
||||
export default class Socket extends Server {
|
||||
constructor (opts={}) {
|
||||
super()
|
||||
this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
|
||||
if (!opts.path && opts.np) opts.path = DEFAULT_PIPE
|
||||
if (!opts.path) {
|
||||
opts.host = opts.host || '0.0.0.0'
|
||||
opts.port = opts.port || 8080
|
||||
} else opts.np = true
|
||||
this.opts = opts
|
||||
} else {
|
||||
if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
|
||||
if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
|
||||
}
|
||||
this.opts = opts // for use to recover from selected errors
|
||||
//self bindings
|
||||
this._listen = this._listen.bind(this)
|
||||
this.create = this.create.bind(this)
|
||||
log = logger.child(LOG_OPTS) //create instance logger set LOG_OPTS above
|
||||
|
||||
} // end constructor
|
||||
|
||||
async create () {
|
||||
|
@ -51,12 +56,19 @@ export default class Socket extends Server {
|
|||
this.once('error', async (err) => {
|
||||
// recover from socket file that was not removed
|
||||
if (err.code === 'EADDRINUSE') {
|
||||
if (this.opts.np) { // if TCP socket should already be dead
|
||||
log.info({socket: this.opts.path}, 'already exists...deleting')
|
||||
if (this.opts.path) { // if TCP socket should already be dead
|
||||
log.warn({socket: this.opts.path}, 'socket already exists...deleting')
|
||||
await fileDelete(this.opts.path)
|
||||
return await this._listen(this.opts)
|
||||
}
|
||||
}
|
||||
if (err.code ==='EACCES'){
|
||||
console.log({socket: this.opts.path}, 'directory does not exist...creating')
|
||||
await mkdir(path.dirname(this.opts.path))
|
||||
console.log({socket: this.opts.path}, 'created')
|
||||
log.warn({socket: this.opts.path}, 'directory does not exist...creating')
|
||||
return await this._listen(this.opts)
|
||||
}
|
||||
// otherwise fatally exit
|
||||
log.info(err, 'creating socket')
|
||||
reject(err)
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
import { Socket } from '../../src'
|
||||
|
||||
const USOCKET = __dirname + '/test.sock'
|
||||
|
||||
let socket = new Socket({path:USOCKET,name:'default-unix-socket'})
|
||||
let socket = new Socket({path:true,name:'default-unix-socket'})
|
||||
|
||||
;
|
||||
(async () => {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import { Socket } from '../../src'
|
||||
|
||||
const USOCKET = __dirname + '/test.sock'
|
||||
const USOCKET = 'usocket'
|
||||
|
||||
let socket = new Socket({path:USOCKET,name:'default-unix-socket'})
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ describe('Connects and Processes a payload in a JSON packet via TCP Socket', fun
|
|||
})
|
||||
|
||||
it('with default host and port', async function () {
|
||||
let tcpconsumer_default = new Consumer({name:'tcpconsumer', log:false})
|
||||
let tcpconsumer_default = new Consumer({name:'tcpconsumer'})
|
||||
|
||||
let [err] = await btc(tcpconsumer_default.connect)()
|
||||
if (err) {
|
||||
|
|
|
@ -7,13 +7,10 @@ const expect = chai.expect
|
|||
|
||||
import { Consumer } from '../src'
|
||||
|
||||
const USOCKET = __dirname + '/sockets/test.sock'
|
||||
const SOCKET_FILE = 'usocket-default'
|
||||
|
||||
let consumer = new Consumer({path:USOCKET,name:'unix-consumer'})
|
||||
let consumer2 = new Consumer({path:USOCKET, name:'unix-consumer2'})
|
||||
|
||||
// const delay = time => new Promise(res=>setTimeout(()=>res(),time))
|
||||
let consumer = new Consumer({path:true,name:'unix-consumer'})
|
||||
let consumer2 = new Consumer({path:true, name:'unix-consumer2'})
|
||||
|
||||
let socket = {}
|
||||
|
||||
|
@ -30,7 +27,7 @@ describe('Connects and Processes a payload via Unix Socket using JSON packet wit
|
|||
socket.kill()
|
||||
})
|
||||
|
||||
const TIMES = 5000
|
||||
const TIMES = 3000
|
||||
|
||||
it(`Tests unix socket with default echo JSON packet procssing with ${TIMES} packets sent`, async function () {
|
||||
|
||||
|
|
|
@ -7,10 +7,9 @@ const expect = chai.expect
|
|||
|
||||
import { Consumer } from '../src'
|
||||
|
||||
const USOCKET = __dirname + '/sockets/test.sock'
|
||||
const SOCKET_FILE = 'usocket'
|
||||
|
||||
let consumer = new Consumer({path:USOCKET,name:'unix-consumer'})
|
||||
let consumer = new Consumer({path:SOCKET_FILE,name:'unix-consumer'})
|
||||
|
||||
// const delay = time => new Promise(res=>setTimeout(()=>res(),time))
|
||||
|
||||
|
|
Loading…
Reference in New Issue