From 87e94b765b6ae965820f89ba0406e0c1cee27cc7 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Thu, 16 Jan 2020 22:12:30 -0800 Subject: [PATCH] 0.3.12 support for multiple conPackets as an array move to using _listening to store state as .listening already used by ws emit 'socket' for socket listening state --- examples/server.js | 86 ---------------------------------------------- package.json | 2 +- src/socket.js | 35 ++++++++++++++----- 3 files changed, 28 insertions(+), 95 deletions(-) delete mode 100644 examples/server.js diff --git a/examples/server.js b/examples/server.js deleted file mode 100644 index 384b075..0000000 --- a/examples/server.js +++ /dev/null @@ -1,86 +0,0 @@ -import Socket from '../src' - -const ANONYMOUS = true -const PORT = 8090 - -function packetProcess (packet) { - - const cmd = { - ack: function(packet){ - return new Promise( async (resolve) => { - packet.ack = true - packet.sender= packet.sender || (packet._header ? packet._header.sender.name : 'unknown') - packet.msg=`this is acknlowdgement that ${packet.sender} ack was received` - this.push(packet) // push to all active socket servers - return resolve(packet) // this goes back to sender - }) - }, - switch:{ - status: function(packet){ - return new Promise( async (resolve) => { - packet.cmd='switch/status' - packet.state=this.switches[packet.id-1] - packet.sender= packet.sender || (packet._header ? packet._header.sender.name : 'unknown') - this.push(packet) // push to all active socket servers - let res = { response:'status pushed on to all clients'} - return resolve(res) // this goes back to sender - }) - }, - on: function(packet){ - return new Promise( async (resolve) => { - packet.cmd='switch/status' - packet.state='on' - this.switches[packet.id-1] = 'on' - packet.sender= packet.sender || (packet._header ? packet._header.sender.name : 'unknown') - this.push(packet) // push to all active socket servers - let res = { response:'status change - pushed on to all clients', id:packet.id} - return resolve(res) // this goes back to sender - }) - }, - off: function(packet){ - return new Promise( async (resolve) => { - packet.cmd='switch/status' - packet.state='off' - this.switches[packet.id-1] = 'off' - packet.sender= packet.sender || (packet._header ? packet._header.sender.name : 'unknown') - this.push(packet) // push to all active socket servers - let res = { response:'status change - pushed off to all clients'} - return resolve(res) // this goes back to sender - }) - }, - toggle: function(packet){ - return new Promise( async (resolve) => { - this.switches[packet.id-1] = this.switches[packet.id-1]==='on' ? 'off' : 'on' - packet.state = this.switches[packet.id-1] - packet.cmd ='switch/status' - packet.sender= packet.sender || (packet._header ? packet._header.sender.name : 'unknown') - this.push(packet) // push to all active socket servers - let res = { response:`status change - pushed toggled state of ${packet.state} to all clients`} - return resolve(res) // this goes back to sender - }) - } - } - } - // console.log('this in packet processor',packet.cmd) - let cmdProps = packet.cmd.split('/') - let func = cmd[cmdProps[0]] - // console.log(func) - if (cmdProps[1]) func = func[cmdProps[1]] - // console.log(func) - if (typeof func === 'function') return func.call(this,packet) - else { - packet.error='no function for command' - return packet - } -} - - -// let test = new Test() -let test = new Socket({ port: PORT, allowAnonymous:ANONYMOUS }) -test.switches = ['off','off','off','off'] // database to hold switch states -test.registerPacketProcessor(packetProcess) -;(async () => { - console.log(await test.create()) -})().catch(err => { - console.error('FATAL: UNABLE TO START SYSTEM!\n', err) -}) diff --git a/package.json b/package.json index 117c516..0602cd6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/websocket", - "version": "0.3.11", + "version": "0.3.12", "description": "JSON packet host websocket server", "main": "src", "scripts": { diff --git a/src/socket.js b/src/socket.js index 0181dd7..6365bb2 100644 --- a/src/socket.js +++ b/src/socket.js @@ -33,10 +33,12 @@ class Socket extends Server { this.pingInterval = opts.pingInterval === false ? opts.pingInterval : (opts.pingInterval * 1000 || 5000) this.nextconsumerID = 0 // incrementer for default initial consumer ID this.consumers = new Map() + this.conPackets = opts.conPackets || opts.conPacket this.errors = [] this.errorCount = 0 this.create = this.create.bind(this) this._destroy = this._destroy.bind(this) + this._listening=false this.authenticateconsumer = this.authenticateconsumer.bind(this) this._authenticate = this._authenticate.bind(this) this.close = promisify(this.close).bind(this) @@ -49,7 +51,9 @@ class Socket extends Server { }) } // end constructor - get active() { return this.listening } + get active() { + return this._listening + } /** * create - Description @@ -57,7 +61,7 @@ class Socket extends Server { * @returns {type} Description */ async create() { - // return Promise.resolve('what') + this.emit('socket',{state:'creating', msg:'creating socket for consumers to connect'}) return new Promise((resolve, reject) => { _ON_DEATH(async () => { log.error({method:'create', line:51, msg:'\nhe\'s dead jim'}) @@ -81,11 +85,17 @@ class Socket extends Server { this.on('error', err => { this.errorCount +=1 // log errors here this.errors.push(err) - if(this.errorCount>2) this.emit('log', {level:'warn',msg:'something bad maybe going on, 3 errors', errors:this.errors}) + if(this.errorCount>2) { + this.emit('log', {level:'warn',msg:'something bad maybe going on, 3 errors', errors:this.errors}) + this.emit('socket',{state:'error', msg:'2 to 5 socket errors', errors:this.errors}) + } if(this.errorCount>5) { let errors = {level:'fatal',msg:'something fatal is going on, 6 errors', errors:this.errors} log.fatal(errors) - this.listening=false + this._listening=false + this.close(() => { + this.emit('socket',{state:'offline', msg:'too many socket errors no longer listening for consumers to connect'}) + }) this.emit('log', {active:this.active}) this.emit('log', errors) } @@ -93,7 +103,10 @@ class Socket extends Server { this.wss = new WSS({server:this}) this.wss.on('error', err => {this.emit('error', err)}) // bubble up errors this.wss.on('connection', this._connectionHandler.bind(this)) + this._listening=true + let msg = `socket ready and listening ${typeof this.address() ==='string' ? `at ${this.address()}` : `on port ${this.address().port}`}` this.emit('log',{active:this.active}) + this.emit('socket',{state:'listening', msg:msg}) resolve(`websocket ready and listening at ${this.address().address}:${this.address().port}`) }) super.listen(this.opts) @@ -251,10 +264,16 @@ class Socket extends Server { } } - if (this.opts.conPacket) { - this.opts.conPacket._header = { id: 'pushed' } - log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'}) - this._send(consumer,this.opts.conPacket) // send a packet command on to consumer on connection + if (this.conPackets) { + this.conPackets = Array.isArray(this.conPackets) ? this.conPackets : [this.conPackets] + log.debug({method:'_listen', line:171, conPackets: this.conPackets, msg:'pushing a preset packets to just connected consumer'}) + this.conPackets.forEach(packet => { + if (packet) { + packet._header = {type:'on connection packet', id: 'pushed'} + // console.log('new consumer',consumer.name,'pushing connection packet \n',packet) + this._send(consumer,packet) // send a packet command on to consumer on connection + } + }) } this.emit('connection:consumer',{state:'connected', name:consumer.name, id:consumer.id})