Compare commits

...

4 Commits

Author SHA1 Message Date
David Kebler 09da08b548 0.3.3 update deps 2020-07-26 15:32:48 -07:00
David Kebler b856710a3b 0.3.2 catch no consumer on send 2020-07-12 18:28:44 -07:00
David Kebler 9528e71abb 0.3.1 added 100ms delay to connection packet push to avoid issue where connection packet is not sent/received 2020-03-24 14:13:50 -07:00
David Kebler 70fee6fc7c 0.3.0 bumping to .3 with upcoming 3-2020 deployment of lighting system.
no apparent bugs
2020-03-15 15:25:46 -07:00
3 changed files with 28 additions and 24 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@uci/socket",
"version": "0.2.33",
"version": "0.3.3",
"description": "JSON packet intra(named)/inter(TCP) host communication over socket",
"main": "src",
"scripts": {
@ -43,16 +43,16 @@
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"esm": "^3.2.25",
"mocha": "^6.2.2",
"nodemon": "^2.0.0"
"mocha": "^8.0.1",
"nodemon": "^2.0.4"
},
"dependencies": {
"@uci-utils/logger": "^0.0.16",
"@uci-utils/logger": "^0.0.18",
"better-try-catch": "^0.6.2",
"clone": "^2.1.2",
"death": "^1.1.0",
"delay": "^4.3.0",
"make-dir": "^3.0.0",
"delay": "^4.4.0",
"make-dir": "^3.1.0",
"p-reflect": "^2.1.0"
}
}

View File

@ -70,7 +70,6 @@ class SocketConsumer extends Socket {
this._connection = 'offline'
this._first = true // first connection or not
this._pingTimeout // value sent from socket upon connect
console.log('consumer.js @uci/socket package tag 0.2.33')
}
get connected() { return this._connected}
@ -190,7 +189,6 @@ class SocketConsumer extends Socket {
if (err)
resolve({error: 'unable to serialize packet for sending',packet: packet})
if (this.active && this.writable) {
console.log('writable can write')
let res2 = await this.__write(res)
if (res2.error) resolve(res2)
// if no write error then wait for send response
@ -295,7 +293,6 @@ async function handshake (packet) {
this.notify('handshake')
const authPacket = Object.assign(this._authenticate() || {}, {_authenticate:true, data:this._data})
// console.log('----------------authentication packet---------------',authPacket)
console.log('authenticate writable',this.writable)
let res ={}
if (!this.writable) res.error ='socket stream not writable'
else res = await this._authenticateSend(authPacket)

View File

@ -66,7 +66,7 @@ export default function socketClass(Server) {
this.pingInterval = opts.pingInterval === false ? 0 : (opts.pingInterval * 1000 || 5000)
this.consumers = new Map() // track consumers (i.e. clients) TODO use a Map
this.nextConsumerID = 0 // incrementer for default initial consumer ID
this.conPackets = opts.conPackets || opts.conPacket
this.conPackets = opts.conPackets || [opts.conPacket]
this.opts = opts // for use to recover from selected errors
this.errorCount = 0
//self bindings
@ -80,7 +80,6 @@ export default function socketClass(Server) {
class: 'Socket',
id: this.id
})
console.log('socket-class: @uci/socket package tag 0.2.33')
} // end constructor
@ -113,7 +112,7 @@ export default function socketClass(Server) {
log.info({method:'create', line:99, res: res, socket: this.opts.path, msg:'socket already exists.....deleted'})
// try again
this.removeAllListeners('listening')
return await this.create()
resolve(await this.create())
}
log.error({method:'create', line:102, err: err, msg:'error deleting socket. Can not establish a socket'})
}
@ -123,7 +122,7 @@ export default function socketClass(Server) {
await mkdir(path.dirname(this.opts.path))
log.debug({method:'create', line:109, socket: this.opts.path, msg:'directory created'})
this.removeAllListeners('listening')
return await this.create()
resolve(await this.create())
}
// otherwise fatally exit
log.error({method:'create', line:113, err:err, opts:this.opts, msg:`error creating socket server ${this.name}`})
@ -156,7 +155,6 @@ export default function socketClass(Server) {
let obj = {method:'create', line:54, msg:msg}
log.info(obj)
this.on('connection', this._connectionHandler.bind(this))
this.emit('log:',)
resolve(msg)
})
@ -251,7 +249,7 @@ export default function socketClass(Server) {
if (opts.consumer) opts.consumers = [opts.consumer]
consumers = Array.from(this.consumers).filter(([sid,consumer]) =>
opts.consumers.some(con=> {
console.log(consumer.sid,consumer.data,con)
// console.log('filtering consumers', consumer.sid,consumer.data,con)
return (
con === ( (consumer.data ||{}).name || (consumer.data ||{}).id ) ||
con.sid=== sid ||
@ -412,18 +410,16 @@ export default function socketClass(Server) {
// all's set enable main incoming message processor
stream.on('message', messageProcess.bind(this, consumer))
if (this.conPackets) {
this.conPackets = Array.isArray(this.conPackets) ? this.conPackets : [this.conPackets]
log.debug({method:'_listen', line:171, conPacket: this.opts.conPacket, msg:'pushing a preset command to just connected consumer'})
// FIXME, don't use forEach and use promise map.
this.conPackets.forEach(packet => {
if (packet) {
if (this.conPackets.length) {
setTimeout(async () => {
for (let packet of this.conPackets) {
packet._header = {type:'on connection packet', id: 'pushed'}
this._send(consumer,packet) // send a packet command on to consumer on connection
await this._send(consumer,packet) // send a packet command on to consumer on connection
}
})
},100)
}
this.emit('log',{level:'info', msg:'a consumer connected and authenticated', name:consumer.name, id:consumer.id})
this.emit('connection:consumer',{state:'connected', msg:`consumer ${(consumer.data ||{}).name} connected and authenticated to socket ${this.id}`,
name:(consumer.data ||{}).name ||(consumer.data ||{}).id || consumer.sid,
@ -435,7 +431,7 @@ export default function socketClass(Server) {
// that's it. Connection is active
async function messageProcess(consumer, packet) {
log.debug({method:'_listen', line:179, packet: packet, consumer:consumer.data, msg:'incoming packet on socket side'})
this.emit('log',{level:'mcp', packet: packet, consumer:consumer.data, msg:'incoming packet on socket side'})
let res = (await this._packetProcess(clone(packet))) || {}
if (Object.keys(res).length === 0)
res = {
@ -481,12 +477,23 @@ export default function socketClass(Server) {
async _send(consumer, packet) {
log.trace({msg:`sending to consumer:${consumer.sid}:${consumer.data.name}`, consumer:consumer.data, packet:packet})
return new Promise(async (resolve, reject) => {
if (!consumer) {
console.log('no consumer rejecting packet send')
reject('no consumer specified can not send packet')
return
}
if (!consumer.writable) {
console.log('no consumer writeable stream rejecting packet send')
reject('socket stream closed can not send packet')
return
}
let [err,ser] = await btc(consumer.stream.serialize)(packet)
if (err) reject('unable to serialze the packet')
if (!ser) {
// console.log('empty-serialized packet', consumer.name, consumer.socketName)
reject('empty packet rejecting send, nothing to send')
return
}
const cb = () => resolve('packet written to socket stream')
if (!consumer.write(ser)) {
consumer.once('drain', cb)