diff --git a/.npmignore b/.npmignore index f16fc41..66591e2 100644 --- a/.npmignore +++ b/.npmignore @@ -2,3 +2,7 @@ tests/ test/ *.test.js testing/ +examples/ +yarn.lock +travis.yml +.eslintrc.js diff --git a/examples/client-push.js b/examples/client-push.js index 3125038..80296b6 100644 --- a/examples/client-push.js +++ b/examples/client-push.js @@ -7,8 +7,17 @@ const client= new Consumer({path:true, name:'example-consumer'}) // This is your client handler object waiting on a message to do something const process = async function (packet) { + // return new Promise((resolve) => { + // console.log('====== packet pushed from server ======') + // console.dir(packet) + // // setTimeout(resolve('done'),100) + // resolve('done') + // }) console.log('====== packet pushed from server ======') console.dir(packet) + // setTimeout(resolve('done'),100) + return Promise.resolve('done') + } client.registerPacketProcessor(process) @@ -16,7 +25,7 @@ client.registerPacketProcessor(process) ; (async () => { - client.connect() + console.log(await client.connect()) })().catch(err => { diff --git a/examples/server-push.js b/examples/server-push.js index fcba02c..e0f26e7 100644 --- a/examples/server-push.js +++ b/examples/server-push.js @@ -30,9 +30,12 @@ import { Socket } from '../src' } // let test = new Test() - let test = new Test({path:true}) + let test = new Test({path:true, conPacket:{onconnect:'this is a packet sent to consumer as soon as it connects'}}) await test.create() - setInterval( () => { test.push({name:'pushed', status:'some pushed data'}) },10000) + let count = 0 + setInterval( () => { + count++ + test.push({name:'pushed', count:count, status:'some pushed data'}) },10000) })().catch(err => { console.error('FATAL: UNABLE TO START SYSTEM!\n',err) diff --git a/package.json b/package.json index 29b72f5..af1327e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/socket", - "version": "0.1.6", + "version": "0.1.9", "description": "JSON packet intra(named)/inter(TCP) host communication over socket", "main": "src", "scripts": { diff --git a/src/consumer.js b/src/consumer.js index 1696afd..51c8ae3 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -28,8 +28,8 @@ export default class Consumer extends Socket { this.opts=opts this.keepAlive = 'keepAlive' in opts ? opts.keepAlive : true this._ready = false - this.timeout = opts.timeout || 500 - this.wait = opts.wait || 5 + this.timeout = opts.timeout || 30 + this.wait = opts.wait || 1 this.stream = new JsonStream() // bind to class for other class functions this.connect = this.connect.bind(this) @@ -47,10 +47,13 @@ export default class Consumer extends Socket { super.connect(this.opts) } + let reconnect = {} const timeout = setTimeout(() =>{ - reject({opts:this.opts},`unable to connect in ${this.timeout*10}ms`) + clearTimeout(reconnect) + log.fatal({opts:this.opts},`unable to connect in ${this.timeout}s`) + reject({opts:this.opts},`unable to connect to socket server in ${this.timeout}secs`) } - ,this.timeout*10) + ,this.timeout*1000) this.once('connect', async () => { clearTimeout(timeout) @@ -71,7 +74,8 @@ export default class Consumer extends Socket { return resolve('ready') } - setTimeout( () =>{ connect() },this.wait*100) + reconnect = setTimeout( () =>{ connect() + },this.wait*1000 ) }) @@ -105,14 +109,12 @@ export default class Consumer extends Socket { } let [err, res] = await btc(this.stream.serialize)(packet) if (err) resolve({error:'unable to serialize packet for sending', packet:packet}) - log.info(await this.__write(res)) - // console.log('listerner set', packet._header.id, packet._header.sender.instanceID) + await this.__write(res) this.once(packet._header.id,async function(reply){ - // console.log('reply emitted',reply) let res = await this._packetProcess(reply) if (!res) { // if process was not promise returning like just logged to console res = reply - log.warn('consumer function was not promise returning further processing may be out of sequence') + // log.warn('consumer function was not promise returning further processing may be out of sequence') } resolve(res) }) //end listener @@ -131,7 +133,7 @@ export default class Consumer extends Socket { async __write(packet) { // timeout already set if sockect can't be drained in 10 secs return new Promise(resolve => { - const cb = () => resolve('=====>packet written to consumer side socket stream ') + const cb = () => resolve('packet written to consumer side socket stream ') if (!super.write(packet)) { this.once('drain',cb ) } else { @@ -150,7 +152,7 @@ export default class Consumer extends Socket { // TODO do some extra security here? let res = await this._packetProcess(packet) if (!res) { // if process was not promise returning like just logged to console - log.warn('consumer function was not promise returning') + // log.warn('consumer function was not promise returning') } }) // listen on socket stream diff --git a/src/socket.js b/src/socket.js index 417f137..f6bac98 100644 --- a/src/socket.js +++ b/src/socket.js @@ -97,6 +97,11 @@ export default class Socket extends Server { // TODO add 'close' listener to socket to remove from this.clients log.info('new consumer connecting') log.info(await send(await stream.serialize({'_handshake':true}))) + if (this.opts.conPacket) { + this.opts.conPacket._header = { id:'pushed'} + log.info({conPacket:this.opts.conPacket},'pushing a preset command to just connected consumer') + send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection + } socket.on('data', stream.onData) // TODO need to start error listener for stream so errors can be processed stream.on('message', messageProcess.bind(this,socket))