// Websocket is a native global for vanilla JS
/* globals WebSocket:true */
import btc from 'better-try-catch'
import EventEmitter from 'eventemitter3'
import autoBind from 'auto-bind'

// WebSocket.readyState value of an open connection
const READY_STATE_OPEN = 1
// how long connect() waits for the 'open' event before giving up
const CONNECT_TIMEOUT_MS = 5000

/**
 * WebSocket consumer that exchanges JSON packets with a socket server.
 *
 * Outgoing packets are stamped with a `_header` carrying a random id;
 * incoming packets are re-emitted on this emitter under their `_header.id`,
 * which is how send() matches a reply to its request.
 */
class Consumer extends EventEmitter {
  /**
   * @param {string} url - address handed to the WebSocket constructor
   * @param {Object} [opts]
   * @param {string} [opts.name='browser'] - sender name placed in packet headers
   * @param {string|string[]} [opts.protocol] - passed through to the WebSocket constructor
   */
  constructor (url, opts = {}) {
    super()
    this.name = opts.name || 'browser'
    this.instanceID = new Date().getTime()
    this.url = url
    this.protocol = opts.protocol
    autoBind(this)
  }

  /**
   * Opens the socket and stores it on `this.socket` once it is open.
   *
   * @returns {Promise<string>} resolves with a status message when the socket opens;
   *   rejects with an Error on a socket error or when no 'open' event arrives
   *   within 5 seconds.
   */
  connect () {
    return new Promise((resolve, reject) => {
      const socket = new WebSocket(this.url, this.protocol)

      // whichever outcome happens first tears down the timer and both listeners,
      // so nothing lingers (or fires late) once the promise has settled
      const cleanup = () => {
        clearTimeout(timer)
        socket.removeEventListener('open', onOpen)
        socket.removeEventListener('error', onError)
      }

      const onOpen = () => {
        cleanup()
        this.socket = socket
        resolve(`socket open to server at : ${this.url}`)
      }

      const onError = () => {
        cleanup()
        reject(new Error('Could not connect to socket server '))
      }

      const timer = setTimeout(() => {
        cleanup()
        // abandon the attempt so a late 'open' cannot leave a stray live connection
        socket.close()
        reject(new Error('Socket did not connect in 5 seconds'))
      }, CONNECT_TIMEOUT_MS)

      socket.addEventListener('open', onOpen)
      socket.addEventListener('error', onError)
    })
  }

  /**
   * Starts handling incoming socket messages. Requires a prior successful connect().
   *
   * Every message is parsed as JSON (or replaced by an `{error}` packet when the
   * socket is not open or the data is not valid JSON), passed to `func` if given,
   * and then emitted under its `_header.id` when it has one. Packets emitted as
   * 'pushed' are run through the registered packet processor.
   *
   * @param {function(Object): void} [func] - optional callback invoked with every packet
   */
  listen (func) {
    const handler = (event) => {
      let packet
      if (this.socket.readyState === READY_STATE_OPEN) {
        const [err, parsed] = btc(JSON.parse)(event.data)
        packet = err ? { error: `Could not parse JSON: ${event.data}` } : parsed
      } else {
        packet = { error: `Connection not Ready, CODE:${this.socket.readyState}` }
      }

      if (func) func(packet)

      // error packets built above (and malformed payloads) carry no header;
      // only packets that actually have an id can be routed to a listener
      const id = packet && packet._header ? packet._header.id : undefined
      if (id !== undefined && id !== null) this.emit(id, packet)
    }

    this.socket.addEventListener('message', handler)

    // note: a rejection from the packet processor is not caught here
    this.on('pushed', async (packet) => {
      // TODO do some extra security here?
      const res = await this._packetProcess(packet)
      if (!res) { // if process was not promise returning like just logged to console
        console.log('warning: consumer process function was not promise returning')
      }
    })
  }

  /**
   * Sends a packet and waits for the reply carrying the same header id.
   *
   * Mutates `packet` by assigning `packet._header` (id, sender, url).
   *
   * @param {Object} packet - JSON-serializable payload
   * @returns {Promise<*>} resolves with the packet processor's result for the reply,
   *   or with the raw reply when the processor returns a falsy value; rejects when
   *   the socket is not open, the packet cannot be stringified, or the processor fails.
   */
  send (packet) {
    return new Promise((resolve, reject) => {
      if (this.socket.readyState !== READY_STATE_OPEN) {
        reject(new Error(`Connection not Ready, CODE:${this.socket.readyState}`))
        return // nothing may be sent or listened for on a socket that is not open
      }

      // fresh id per send so replies can be matched even when the same
      // packet object is reused for several sends
      const id = Math.random().toString().slice(2)
      packet._header = {
        id,
        sender: { name: this.name, instanceID: this.instanceID },
        url: this.url
      }

      const [err, message] = btc(JSON.stringify)(packet)
      if (err) {
        reject(new Error(`Could not JSON stringify: ${packet}`))
        return // there is no message to send
      }

      this.socket.send(message)

      // listen for when packet comes back with unique header id
      this.once(id, async (reply) => {
        try {
          let res = await this._packetProcess(reply)
          if (!res) { // if process was not promise returning like just logged to console
            res = reply
            console.log('consumer function was not promise returning - resolving unprocessed')
          }
          resolve(res)
        } catch (processError) {
          // surface processor failures to the caller instead of leaving send() pending
          reject(processError)
        }
      }) // end reply listener
    })
  }

  /**
   * Replaces the function applied to reply and 'pushed' packets.
   *
   * @param {function(Object): (Promise<*>|*)} func - new packet processor
   */
  registerPacketProcessor (func) {
    this._packetProcess = func
  }

  /**
   * Default packet processor: hands the packet back unchanged.
   *
   * @param {Object} packet
   * @returns {Promise<Object>}
   */
  async _packetProcess (packet) {
    return packet
  }
} // end Consumer Class

export default Consumer