with keepAlive set (by default) if the socket terminates the consumer will attempt to reconnect indefinitely

tls
David Kebler 2018-07-30 19:07:03 -07:00
parent 36d1cf629f
commit 01c4756e26
5 changed files with 83 additions and 10 deletions

24
examples/client-push.js Normal file
View File

@ -0,0 +1,24 @@
import Consumer from '../src/consumer'
// const client1= new Consumer({name:'example-consumer1' })
const client= new Consumer({path:true, name:'example-consumer'})
// let packet = {name: 'client', cmd:'doit', data:'data sent by client'}
// This is your client handler object waiting on a message to do something
const process = async function (packet) {
console.log('====== packet pushed from server ======')
console.dir(packet)
}
client.registerPacketProcessor(process)
;
(async () => {
client.connect()
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

39
examples/server-push.js Normal file
View File

@ -0,0 +1,39 @@
import { Socket } from '../src'
;
(async () => {
class Test extends Socket {
constructor(opts) {
super(opts)
}
async _packetProcess(packet) {
console.log('packet being processed at socket')
if (packet.cmd) return await this[packet.cmd](packet.data,packet.name)
return {error: 'no command in packet', packet: packet }
}
async doit(data,name) {
return new Promise(resolve => {
let res = {}
console.log('data sent to doit = ', data)
res.status ='success'
res.name = name
res.cmd = 'reply'
res.data = 'this would be response data from socket doit function'
resolve(res)
})
}
}
// let test = new Test()
let test = new Test({path:true})
await test.create()
setInterval( () => { test.push({name:'pushed', status:'some pushed data'}) },10000)
})().catch(err => {
console.error('FATAL: UNABLE TO START SYSTEM!\n',err)
})

View File

@ -1,6 +1,6 @@
{
"name": "@uci/socket",
"version": "0.1.4",
"version": "0.1.6",
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
"main": "src",
"scripts": {
@ -9,8 +9,10 @@
"testlog": "DEBUG=true mocha -r esm --timeout 10000 test/*.test.mjs",
"testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true",
"s": "DEBUG=true node -r esm examples/server",
"sp": "DEBUG=true node -r esm examples/server-push",
"devs": "SOCKETS_DIR=/opt/sockets DEBUG=true ./node_modules/.bin/nodemon -r esm-e mjs examples/server",
"c": "DEBUG=true node -r esm examples/client",
"cp": "DEBUG=true node -r esm examples/client-push",
"devc": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r esm examples/client",
"c2": "node -r esm examples/client2"
},

View File

@ -26,7 +26,7 @@ export default class Consumer extends Socket {
if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
}
this.opts=opts
this.keepAlive = opts.keepAlive ? opts.keepAlive : true
this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true
this._ready = false
this.timeout = opts.timeout || 500
this.wait = opts.wait || 5
@ -66,18 +66,25 @@ export default class Consumer extends Socket {
})
this.on('error', async (err) => {
log.warn({error:err.code},`connect error ${err.code}`)
if (err.code === 'EISCONN') {
return resolve('ready')
}
log.warn(err.code)
setTimeout(() =>{
// log.warn(`retrying connect to ${this.opts}`)
connect()
}
,this.wait*100)
setTimeout( () =>{ connect() },this.wait*100)
})
connect()
this.on('end', async () => {
log.warn('socket (server) terminated unexpectantly')
if (this.keepAlive) {
log.info('keep alive was set, so waiting on server to come online for reconnect')
this.destroy()
this.emit('error', {code:'DISCONNECTED'})
}
})
connect() // initial connect request
}) //end promise

View File

@ -53,7 +53,7 @@ export default class Socket extends Server {
// recover from socket file that was not removed
if (err.code === 'EADDRINUSE') {
if (this.opts.path) { // if TCP socket should already be dead
let [err, res] = log.info(await btc(promisify(fileDelete))(this.opts.path))
let [err, res] = await btc(promisify(fileDelete))(this.opts.path)
if(!err) {
log.info({res:res, socket: this.opts.path}, 'socket already exists.....deleted')
return await this._listen(this.opts)
@ -161,6 +161,7 @@ export default class Socket extends Server {
async push (packet,id) {
packet._header = { id:'pushed'}
log.info({opts:this.opts,packet:packet},'pushing a packet to all connected consumers')
this.clients.forEach(async (client) => {
if (client.writable) {
let [err, ser] = await btc(client.stream.serialize)(packet)