diff --git a/examples/client-push.js b/examples/client-push.js new file mode 100644 index 0000000..3125038 --- /dev/null +++ b/examples/client-push.js @@ -0,0 +1,24 @@ +import Consumer from '../src/consumer' + +// const client1= new Consumer({name:'example-consumer1' }) +const client= new Consumer({path:true, name:'example-consumer'}) + +// let packet = {name: 'client', cmd:'doit', data:'data sent by client'} + +// This is your client handler object waiting on a message to do something +const process = async function (packet) { + console.log('====== packet pushed from server ======') + console.dir(packet) +} + +client.registerPacketProcessor(process) + +; +(async () => { + + client.connect() + + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/examples/server-push.js b/examples/server-push.js new file mode 100644 index 0000000..fcba02c --- /dev/null +++ b/examples/server-push.js @@ -0,0 +1,39 @@ +import { Socket } from '../src' + + ; +(async () => { + + class Test extends Socket { + constructor(opts) { + super(opts) + } + + async _packetProcess(packet) { + console.log('packet being processed at socket') + if (packet.cmd) return await this[packet.cmd](packet.data,packet.name) + return {error: 'no command in packet', packet: packet } + } + + async doit(data,name) { + return new Promise(resolve => { + let res = {} + console.log('data sent to doit = ', data) + res.status ='success' + res.name = name + res.cmd = 'reply' + res.data = 'this would be response data from socket doit function' + resolve(res) + }) + + } + + } + + // let test = new Test() + let test = new Test({path:true}) + await test.create() + setInterval( () => { test.push({name:'pushed', status:'some pushed data'}) },10000) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/package.json b/package.json index d8db821..29b72f5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.1.4", + "version": "0.1.6", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { @@ -9,8 +9,10 @@ "testlog": "DEBUG=true mocha -r esm --timeout 10000 test/*.test.mjs", "testci": "istanbul cover ./node_modules/.bin/_mocha --report lcovonly -- -R spec --recursive && codecov || true", "s": "DEBUG=true node -r esm examples/server", + "sp": "DEBUG=true node -r esm examples/server-push", "devs": "SOCKETS_DIR=/opt/sockets DEBUG=true ./node_modules/.bin/nodemon -r esm-e mjs examples/server", "c": "DEBUG=true node -r esm examples/client", + "cp": "DEBUG=true node -r esm examples/client-push", "devc": "SOCKETS_DIR=/opt/sockets DEBUG=true node -r esm examples/client", "c2": "node -r esm examples/client2" }, diff --git a/src/consumer.js b/src/consumer.js index fc7f31a..1696afd 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -26,7 +26,7 @@ export default class Consumer extends Socket { if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path ) } this.opts=opts - this.keepAlive = opts.keepAlive ? opts.keepAlive : true + this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this._ready = false this.timeout = opts.timeout || 500 this.wait = opts.wait || 5 @@ -66,18 +66,25 @@ export default class Consumer extends Socket { }) this.on('error', async (err) => { + log.warn({error:err.code},`connect error ${err.code}`) if (err.code === 'EISCONN') { return resolve('ready') } - log.warn(err.code) - setTimeout(() =>{ - // log.warn(`retrying connect to ${this.opts}`) - connect() - } - ,this.wait*100) + + setTimeout( () =>{ connect() },this.wait*100) + }) - connect() + this.on('end', async () => { + log.warn('socket (server) terminated unexpectantly') + if (this.keepAlive) { + log.info('keep alive was set, so waiting on server to come online for reconnect') + this.destroy() + this.emit('error', {code:'DISCONNECTED'}) + } + }) + + connect() // initial connect request }) //end promise diff --git a/src/socket.js b/src/socket.js index b4268fa..417f137 100644 --- a/src/socket.js +++ b/src/socket.js @@ -53,7 +53,7 @@ export default class Socket extends Server { // recover from socket file that was not removed if (err.code === 'EADDRINUSE') { if (this.opts.path) { // if TCP socket should already be dead - let [err, res] = log.info(await btc(promisify(fileDelete))(this.opts.path)) + let [err, res] = await btc(promisify(fileDelete))(this.opts.path) if(!err) { log.info({res:res, socket: this.opts.path}, 'socket already exists.....deleted') return await this._listen(this.opts) @@ -161,6 +161,7 @@ export default class Socket extends Server { async push (packet,id) { packet._header = { id:'pushed'} + log.info({opts:this.opts,packet:packet},'pushing a packet to all connected consumers') this.clients.forEach(async (client) => { if (client.writable) { let [err, ser] = await btc(client.stream.serialize)(packet)