From a6d928bf5629a7b3453eb06d2a01db2bb0c8b599 Mon Sep 17 00:00:00 2001 From: David Kebler Date: Tue, 14 Jan 2020 09:20:44 -0800 Subject: [PATCH] 0.1.38 processing.js added default ready command for socket removed arrow functions from socket cmd functions must Always be regular functions for setting context base.js better validation and defaults for options to registerSocket add by default a ready observer for when socket starts listening --- package.json | 2 +- src/base.js | 64 ++++++++++++++++++++++++++--------------------- src/processing.js | 16 ++++++++---- 3 files changed, 47 insertions(+), 35 deletions(-) diff --git a/package.json b/package.json index e28d5f9..996d674 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/base", - "version": "0.1.37", + "version": "0.1.38", "description": "Multi type and transport JSON packet communication base class. Used in UCI extended classes", "main": "src/base", "scripts": { diff --git a/src/base.js b/src/base.js index 15a476c..ab10d15 100644 --- a/src/base.js +++ b/src/base.js @@ -62,7 +62,8 @@ const TRANSLATE = { class Base extends EventEmitter { constructor(opts = {}) { super() - this.id = opts.id || opts.name || 'uci-base:' + new Date().getTime() + this.name = opts.name || opts.appName || 'a base class instance' + this.id = opts.id || 'uci-base:' + new Date().getTime() log = logger({ name: 'base', id: this.id }) this.desc = opts.desc // additional details for humans this._socket = {} // holds all the various communication sockets @@ -71,37 +72,31 @@ class Base extends EventEmitter { this.retryWait = opts.retryWait this.defaultReturnCmd = opts.defaultReturnCmd this._cmdProcessors = { _default: cmdProcessor } + this.ready = new Ready({emitter: this}) // _c and _s are the default namespaces this._namespaces =Object.assign({},namespaces) this._c = Object.assign({},defaultCmds.c) this._s = Object.assign({},defaultCmds.s) + // make available a method that will bind a whole object tree of functions + // Note: functions called via a command namespace are called by base connext by default + // if called directlly/locally they should be bound to base context if desired + this.bindFuncs = bindFuncs if (opts.useRootNS) { // add root of instance to checking for command functions - not recommended! this._namespaces.s.splice(-1, 0, null) this._namespaces.c.splice(-1, 0, null) } - // method that will bind a whole object tree of functions - this.bindFuncs = bindFuncs - // predefined sockets: - // comma delimited list of this form '#>' - this._socket = {} + // this.bindFuncs = bindFuncs // make available a method that will bind a whole object tree of functions + this._socket = {} // where all sockets are stored + // at creation defined sockets: + if (opts.port) this.registerSocket(`${opts.name||'base'}${opts.port&&opts.path ? ':t':''}`,'s','t',{port:opts.port}) + if (opts.path) this.registerSocket(`${opts.name||'base'}${opts.port&&opts.path ? ':n':''}`,'s','n',{path: opts.path}) if (opts.sockets) { - opts.sockets.split(/[,|\s]+/).forEach(socketStr => { - let socket = {} - socketStr.split(/[>#]+/).map(function(prop, index) { - socket[SOCKET_INFO_KEYS[index]] = prop - }) - this.registerSocket( - socket.name, - socket.type, - socket.transport, - opts[socket.name] - ) - }) + let sockets = opts.sockets + sockets = Array.isArray(sockets) ? sockets:[sockets] + sockets.forEach(socket => this.registerSocket(socket)) } - this.ready = new Ready({emitter: this, verbose:process.env.UCI_READY_VERBOSE}) - } // end constructor /* @@ -187,16 +182,23 @@ class Base extends EventEmitter { * @returns {function} if called before base initialzation it can be ignored as all added sockets will be initialized. After through it be called to initialize that socket */ registerSocket(name, type = 'c', transport = 'n', options = {}) { + // console.log('=========================================REGISTER=========',name) + if (isPlainObject(name)) ({name, type = 'c', transport = 'n', options = {}} = name) + if (typeof name !=='string') return null transport = this._validateTransport(transport) - log.debug({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:147, msg:`adding socket ${name}`}) - options.id = options.id || this.id + ':' + name - if (type==='c') options = Object.assign({initTimeout:this.initTimeout, retryWait:this.retryWait},options) - if (type==='s') options = Object.assign({defaultReturnCmd:this.defaultReturnCmd},options) + // console.log({ socketName: name, type: type, tranport: transport, options: options, method:'addSocket', line:198, msg:`adding socket ${name}`}) + options.id = options.id || name + options.name = options.name || name + // TODO add a 'd' type for duplex which creates an 's' first and waits on connect to make a 'c' + if (type==='c') options = Object.assign({initTimeout:this.initTimeout, retryWait:this.retryWait},options) // outbound + if (type==='s') options = Object.assign({defaultReturnCmd:this.defaultReturnCmd},options) // inbound + // TODO get rid of hard coded transports and use registered transports (t and n being default) switch (transport) { case 'n': options.path = options.path || true // falls through case 't': + // console.log('==========socket options==========\n',name,type,transport,options) this._socket[name] = new Socket[TRANSLATE[type]](options) break case 'm': @@ -232,16 +234,20 @@ class Base extends EventEmitter { this.emit(event,obj) }) }) - if (type==='c') { - - this.ready.addObserver(name,this.getSocket(name),{event:'connection:socket',condition:ev=>{return ev.state==='connected'}}) + this.ready.addObserver(name,this._socket[name],{event:'connection:socket',condition:ev=>{return ev.state==='connected'}}) this._socket[name].on('pushed', packet => { packet._header.socketName=name this.emit('pushed', packet) }) } + + if (type==='s') { + // this._socket[name].on('socket',ev=>console.log(ev)) + this.ready.addObserver(name,this._socket[name],{ event:'socket', condition: ev => (ev || {}).state ==='listening' }) + } + return this.getSocketInit(name) // returns the init function (e.g. connect or create) for the socket } } @@ -634,12 +640,12 @@ class Base extends EventEmitter { packet.cmd, this._namespaces[this._type(socket) + this._transport(socket)] ) - if (cmd_func) return await cmd_func.bind(this)(packet) // todo try .call + if (cmd_func) return await cmd_func.call(this,packet) // todo try .call cmd_func = this._getCmdFuncNamespace( packet.cmd, this._namespaces[this._type(socket)] ) - if (cmd_func) return await cmd_func.bind(this)(packet) + if (cmd_func) return await cmd_func.call(this,packet) return 'failed' } diff --git a/src/processing.js b/src/processing.js index e08b2c0..1fc5d9e 100644 --- a/src/processing.js +++ b/src/processing.js @@ -10,7 +10,7 @@ let log = logger({ package: 'base',file:'processing.js'}) // common processor, will call based on type s or c the ones below const cmdProcessor = async function (packet,socket) { - let [err,res] = await to(_process[this.getSocket(socket).type].bind(this)(packet,socket)) + let [err,res] = await to(_process[this.getSocket(socket).type].call(this,packet,socket)) if (err) { let error = {cmd:'error', error:err, packet:packet, socket:socket, function:'processor', line: 15, msg:`'unhandled error in packet command function ${packet.cmd}`} log.error(error) @@ -21,8 +21,6 @@ const cmdProcessor = async function (packet,socket) { return res } -export { cmdProcessor, defaultCmds, namespaces } - // default processors for socket/server and consumer/client const _process = { s: async function (packet,socket) { @@ -64,17 +62,23 @@ const namespaces = { const defaultCmds ={ s:{ - echo: async packet => { + echo: function (packet) { packet.processed = true packet.msg = 'default socket echo' return packet }, // add sedning along an ack to any consumers and or pushing to other sockets on this device - ack: async packet => { + ack: async function (packet) { packet.cmd = 'reply' packet.ack = true packet.msg = 'this is the base default ack, superceed in your instance or extended class' return packet + }, + ready: async function (packet) { + console.log('=========================READY RECEIVED AND EMITTED ======================================\n',packet) + packet.msg = 'ready state was emitted on receiving process' + this.emit(packet.event || packet.name || packet.id, !!(packet.ready || packet.value || packet.state)) + return packet } }, c:{ @@ -88,3 +92,5 @@ const defaultCmds ={ } } } + +export { cmdProcessor, defaultCmds, namespaces }