// uci-socket/src/consumer.mjs

import { Socket } from 'net'
import btc from 'better-try-catch'
import bunyan from 'bunyan'
import Stream from 'delimiter-stream'
/**
 * Client ("consumer") end of a socket that exchanges newline-delimited JSON
 * packets with a peer.
 *
 * The peer is expected to send a packet with a truthy `ready` property as a
 * handshake; `connect()` only resolves once that packet has been received.
 * Every other packet is handed to a packet-processor method looked up by name
 * on the instance.
 */
export default class Consumer extends Socket {
  /**
   * @param {string|object} [path] - path of a named pipe / unix socket, or,
   *   when not a string, the options object itself (TCP mode).
   * @param {object} [opts] - options (ignored when `path` is not a string)
   * @param {string} [opts.host='127.0.0.1'] - TCP host (TCP mode only)
   * @param {number} [opts.port=8080] - TCP port (TCP mode only)
   * @param {string} [opts.packetProcessor='processPacket'] - name of the
   *   instance method that receives incoming packets
   * @param {boolean} [opts.keepAlive=true] - value passed to `setKeepAlive`
   * @param {number} [opts.timeout=1000] - handshake timeout budget in ms
   * @param {number} [opts.wait=20] - handshake polling interval in ms
   * @param {string} [opts.log_file='./socket.log'] - bunyan log file
   * @param {string} [opts.name='uci-unix-socket-consumer'] - logger name
   * @param {boolean} [opts.log] - when truthy, also log to stdout
   */
  constructor (path, opts = {}) {
    super()
    // named pipe (string path) or tcp socket (options passed as first argument)
    if (typeof path !== 'string') {
      // `path || {}` keeps `new Consumer()` from crashing on `opts.host`
      opts = path || {}
      this.host = opts.host || '127.0.0.1'
      this.port = opts.port || 8080
    } else {
      this.path = path
    }
    this._pp = opts.packetProcessor || 'processPacket'
    // only fall back to true when the option is absent, so `false` is honoured
    this.keepAlive = opts.keepAlive == null ? true : opts.keepAlive
    this._ready = false
    this._listening = false
    this.timeout = opts.timeout || 1000
    this.wait = opts.wait || 20
    // logging
    this.log_file = opts.log_file || './socket.log'
    this.log_opts = { streams: [] }
    this.log_opts.name = opts.name ? opts.name : 'uci-unix-socket-consumer'
    this.log_opts.streams.push({ level: 'info', path: this.log_file })
    if (opts.log) this.log_opts.streams.push({ level: 'info', stream: process.stdout })
    this.log = bunyan.createLogger(this.log_opts)
    // bind to class for other class functions
    this.connect = this.connect.bind(this)
  }

  /**
   * @returns {boolean} true once the peer's handshake (`ready`) packet arrived
   */
  ready () {
    return this._ready
  }

  /**
   * Connect to the socket and wait for the peer's handshake packet.
   *
   * @param {object} [app] - properties copied onto this instance before
   *   connecting (e.g. a packet-processor method)
   * @returns {Promise<string>} resolves with 'ready' after the handshake (or
   *   when the socket reports EISCONN); rejects with the socket error or the
   *   handshake timeout reason
   */
  async connect (app) {
    if (app) Object.assign(this, app)
    this.listen()
    this.log.info('listening')
    return new Promise((resolve, reject) => {
      // NOTE: this listener stays attached after the promise settles, so later
      // 'error' events are absorbed here instead of being unhandled.
      this.on('error', (err) => {
        if (err.code === 'EISCONN') return resolve('ready')
        return reject(err)
      })
      super.connect({ port: this.port, host: this.host, path: this.path }, async () => {
        this.log.info({ path: this.path }, 'connecting')
        this.setKeepAlive(this.keepAlive)
        const [err, res] = await btc(isReady).bind(this)(this.ready.bind(this), this.wait, this.timeout)
        // stop here on failure so a failed handshake is not logged as success
        if (err) return reject(err)
        this.log.info('handshake done, connected')
        resolve(res)
      })
    })
  }

  /**
   * Serialize a packet as JSON and write it, newline-terminated, to the socket.
   *
   * @param {*} packet - value to serialize with JSON.stringify
   * @returns {Promise<string|undefined>} resolves with 'complete' once written;
   *   resolves with undefined (after logging) when the packet cannot be
   *   serialized; rejects with the write error
   */
  async send (packet) {
    const [err, strbuf] = btc(JSON.stringify)(packet)
    if (err) {
      this.log.info({ packet: packet }, 'bad packet JSON syntax')
      return
    }
    this.log.info({ packet: packet }, 'attempting to send packet to socket')
    return new Promise((resolve, reject) => {
      super.write(strbuf + '\n', (writeErr) => {
        if (writeErr) reject(writeErr)
        else resolve('complete')
      })
    })
  }

  /**
   * Attach the incoming-data pipeline: socket chunks are fed to a delimiter
   * stream, each delimited message is parsed as JSON and then either marks
   * the handshake as done (`ready` packet) or is passed to the packet
   * processor. Calling it more than once is a no-op.
   */
  async listen () {
    // connect() calls listen() every time; without this guard each reconnect
    // would add another 'data' listener and every packet would be processed
    // once per listener.
    if (this._listening) return
    this._listening = true
    const splitter = new Stream()
    this.on('data', (chunk) => {
      splitter.write(chunk)
    })
    splitter.on('data', (strJSON) => {
      const [err, packet] = btc(JSON.parse)(strJSON)
      if (err) {
        this.log.info({ strJSON: strJSON }, 'bad packet JSON syntax')
        return
      }
      // `packet` may legitimately be null (JSON "null"), hence the guard
      if (packet && packet.ready) {
        this._ready = true
        return
      }
      // set packet processing
      this.pp = this.pp || this._pp
      // if no processor provided use this console logger one
      if (!this[this.pp]) {
        this.pp = 'processPacket'
        this.processPacket = async (pkt) => {
          this.log.info({ packet: pkt }, 'process with default logger')
          console.log('packet from socket')
          console.dir(pkt)
          return pkt
        }
      }
      // process the packet; processors may be async, so make sure a rejection
      // is logged rather than left as an unhandled rejection
      const outcome = this[this.pp](packet)
      if (outcome && typeof outcome.catch === 'function') {
        outcome.catch((procErr) => {
          this.log.error({ err: procErr }, 'packet processor failed')
        })
      }
    })
  }
} // end class
/**
 * Wait for the handshake packet from the socket by polling `ready`.
 *
 * Must be invoked with `this` bound to an object exposing a `log.info` method.
 *
 * @param {function(): boolean} ready - returns true once the handshake arrived
 * @param {number} [wait=30] - polling interval in milliseconds
 * @param {number} [timeout=1000] - budget of accumulated wait time, in
 *   milliseconds, after which polling gives up
 * @returns {Promise<string>} resolves with 'ready'; rejects with the string
 *   'timeout trying to connect' (kept as a string so existing callers that
 *   compare the rejection value keep working)
 */
function isReady (ready, wait = 30, timeout = 1000) {
  const log = this.log
  let time = 0
  return new Promise((resolve, reject) => {
    const poll = () => {
      if (time > timeout) return reject('timeout trying to connect')
      if (ready()) return resolve('ready')
      log.info(`waiting for ${wait}ms for handshake`)
      // account for the interval actually used, not a hard-coded 30ms,
      // so the timeout budget matches the real time spent waiting
      time += wait
      setTimeout(poll, wait)
    }
    poll()
  })
}