uci-socket/src/consumer.mjs

122 lines
3.4 KiB
JavaScript

import { Socket } from 'net'
import btc from 'better-try-catch'
import bunyan from 'bunyan'
// import Stream from 'delimiter-stream'
import JsonStream from './json'
/**
 * Client ("consumer") side of a UCI socket.
 *
 * Extends net.Socket: incoming 'data' is handed to a JsonStream, and every
 * 'message' the stream emits is either treated as the handshake
 * (`packet.ready`) or passed to the registered packet processor.
 */
export default class Consumer extends Socket {
  /**
   * @param {string|Object} [path={}] a string is stored as `this.path` and later
   *   passed to connect as `path`; an object is used as the options object and
   *   may carry `host` (default '127.0.0.1') and `port` (default 8080).
   * @param {Object} [opts={}] settings used when `path` is a string:
   *   `keepAlive` (default true), `timeout` (default 500), `wait` (default 5),
   *   `log_file` (default './socket.log'), `name` (logger name), `log`
   *   (truthy adds an info-level stdout stream to the logger).
   */
  constructor (path = {}, opts = {}) {
    super()
    if (typeof path !== 'string') {
      // a single object argument carries both the endpoint and the settings
      opts = path
      this.host = path.host || '127.0.0.1' // TODO log a warning about host on same machine
      this.port = path.port || 8080
    } else {
      this.path = path
    }
    // only fall back to true when keepAlive was not supplied, so that an
    // explicit `keepAlive: false` is respected
    this.keepAlive = opts.keepAlive == null ? true : opts.keepAlive
    this._ready = false
    // guards listen() so repeated connect() calls do not stack handlers
    this._listening = false
    this.timeout = opts.timeout || 500
    this.wait = opts.wait || 5
    this.stream = new JsonStream()
    // default processor: marks the packet as echoed and hands it back
    this.packet = {
      _process: (packet) => {
        packet.res = 'echoed'
        return packet
      }
    }
    // logging
    this.log_file = opts.log_file || './socket.log'
    this.log_opts = { streams: [] }
    this.log_opts.name = opts.name ? opts.name : 'uci-socket-consumer'
    // this.log_opts.streams.push({level: 'info',path: this.log_file })
    if (opts.log) this.log_opts.streams.push({ level: 'info', stream: process.stdout })
    this.log = bunyan.createLogger(this.log_opts)
    // bind to class for other class functions
    this.connect = this.connect.bind(this)
    this.ready = this.ready.bind(this)
  }

  /**
   * @returns {boolean} true once a packet with a truthy `ready` field has arrived
   */
  ready () {
    return this._ready
  }

  /**
   * Start listening, open the connection and wait for the handshake packet.
   *
   * @param {Object} [context] stored as `this.packet.context`; defaults to this instance
   * @returns {Promise<string>} resolves with 'ready' after the handshake (or when an
   *   'error' with code EISCONN is emitted); rejects with any other socket
   *   error or with the handshake failure
   */
  async connect (context) {
    this.packet.context = context || this
    this.listen()
    this.log.info('listening')
    return new Promise((resolve, reject) => {
      this.on('error', (err) => {
        // EISCONN is treated as success rather than as a failure
        if (err.code === 'EISCONN') return resolve('ready')
        return reject(err)
      })
      super.connect({ port: this.port, host: this.host, path: this.path }, async () => {
        this.log.info({ path: this.path }, 'connecting')
        this.setKeepAlive(this.keepAlive)
        const [err, res] = await btc(isReady).bind(this)(this.ready, this.wait, this.timeout)
        // stop here on failure: without the return the code below would log a
        // successful handshake and call resolve after the rejection
        if (err) return reject(err)
        this.log.info('handshake done, connected')
        resolve(res)
      })
    }) // end promise
  }

  /**
   * Serialize a packet with the stream and write it to the socket.
   *
   * The returned promise settles as soon as the data has been handed to
   * write(); it does not wait for the data to be flushed.
   *
   * @param {Object} packet
   */
  async send (packet) {
    this.write(this.stream.serialize(packet))
    // handle error here?
  }

  /**
   * Wire socket data into the JSON stream and route decoded messages.
   * Safe to call more than once: handlers are registered a single time.
   */
  async listen () {
    if (this._listening) return
    this._listening = true
    this.on('data', this.stream.onData)
    this.stream.on('message', async (packet) => {
      // the handshake packet only flips the ready flag and is not processed
      if (packet.ready) {
        this._ready = true
        return
      }
      await this.packet._process(packet)
    })
  }

  /**
   * @param {Object} obj stored as `this.packet.context`
   */
  registerPacketContext (obj) {
    this.packet.context = obj
  }

  /**
   * @param {Function} func replaces the default processor; called with each
   *   incoming non-handshake packet
   */
  registerPacketProcessor (func) {
    this.packet._process = func
  }
} // end class
/**
 * Wait for the handshake packet from the socket by polling `ready`.
 *
 * Must be invoked with `this` bound to an object exposing `log.info`.
 * Elapsed time is approximated by summing `wait` on each poll rather than
 * read from a clock.
 *
 * @param {Function} ready returns a truthy value once the handshake has arrived
 * @param {number} [wait=30] milliseconds between polls
 * @param {number} [timeout=1000] give up once the accumulated wait exceeds this
 * @returns {Promise<string>} resolves with 'ready'; rejects with a timeout message string
 */
function isReady (ready, wait = 30, timeout = 1000) {
  const log = this.log
  let elapsed = 0
  return new Promise((resolve, reject) => {
    function poll () {
      // check readiness first: a handshake that arrived during the final
      // wait interval must count as success, not as a timeout
      if (ready()) return resolve('ready')
      if (elapsed > timeout) return reject(`timeout trying to connect after ${timeout}ms`)
      log.info(`waiting ${wait}ms for handshake`)
      elapsed += wait
      setTimeout(poll, wait)
    }
    poll()
  })
}