uci-socket/src/consumer.mjs

113 lines
3.3 KiB
JavaScript

import { Socket } from 'net'
import btc from 'better-try-catch'
import bunyan from 'bunyan'
// import Stream from 'delimiter-stream'
import JsonStream from './json'
export default class Consumer extends Socket {
constructor (path, opts={}) {
super()
// set or tcp socket
if (typeof(path)!=='string') {
opts = path
this.host = opts.host || '127.0.0.1'
this.port = opts.port || 8080
} else this.path = path
this._pp = opts.packetProcessor || 'processPacket'
this.keepAlive = opts.keepAlive ? opts.keepAlive : true
this._ready = false
this.timeout = opts.timeout || 500
this.wait = opts.wait || 5
this.stream = new JsonStream()
// logging
this.log_file=opts.log_file || './socket.log'
this.log_opts = {streams:[]}
this.log_opts.name = opts.name ? opts.name : 'uci-socket-consumer'
// this.log_opts.streams.push({level: 'info',path: this.log_file })
if (opts.log) this.log_opts.streams.push({level: 'info',stream: process.stdout})
this.log = bunyan.createLogger(this.log_opts)
// bind to class for other class functions
this.connect = this.connect.bind(this)
this.ready = this.ready.bind(this)
}
ready() {return this._ready}
async connect (app={}) {
// first set the packet process
this.pp = this.pp || this._pp
if (Object.keys(app).length === 0) app = this
else app.pp = app.pp || this._pp
// set a default processor if none provided
if (!app[this.pp]) {
this.pp = 'processPacket' // reset in case alt function is missing
app.processPacket = async (packet) => {
console.log('packet from socket')
console.dir(packet)
return packet }
}
this.listen(app)
this.log.info('listening')
return new Promise( (resolve,reject) => {
this.on('error', async (err) => {
if (err.code === 'EISCONN') {
return resolve('ready')
}
return reject(err)
})
super.connect({ port:this.port, host:this.host, path: this.path }, async () => {
this.log.info({path:this.path},'connecting')
this.setKeepAlive(this.keepAlive)
let [err, res] = await btc(isReady).bind(this)(this.ready, this.wait, this.timeout)
if (err) reject(err)
this.log.info('handshake done, connected')
resolve(res)
})
}) //end promise
}
async send(packet) {
await this.write(this.stream.serialize(packet))
// throw new Error('Cannot send connection not ready')
}
async listen (app) {
this.on('data', this.stream.onData)
this.stream.on('message', messageProcess.bind(this))
async function messageProcess (packet) {
// console.log('incoming packet from server', packet)
if (packet.ready) {
this._ready = true
return }
// console.log('consumer processor',app[app.pp])
await app[app.pp].bind(app)(packet)
}
}
} // end class
// wait for handshake packet from socket
function isReady(ready, wait=30, timeout=1000) {
let log = this.log
let time = 0
return new Promise((resolve, reject) => {
(function waitReady(){
if (time > timeout) return reject(`timeout trying to connect after ${timeout}ms`)
if (ready()) return resolve('ready')
log.info(`waiting ${wait}ms for handshake`)
time += wait
setTimeout(waitReady, wait)
})()
})
}