import { Server } from 'net'
import { unlink as fileDelete } from 'fs'
import { promisify } from 'util'
import path from 'path'
import mkdir from 'make-dir'
import btc from 'better-try-catch'
import _ON_DEATH from 'death' //this is intentionally ugly
import JSONStream from './json-stream'
import clone from 'clone'

// import logger from '../../uci-logger/src/logger'
import logger from '@uci/logger'
// NOTE: module-level logger, reassigned by every Socket constructor (the most
// recently constructed instance decides the logger metadata).
let log = {}

// fs.unlink is callback based; this promise form is what can actually be awaited
const removeFile = promisify(fileDelete)

// TODO change default pipe dir for windows and mac os
const DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || '/tmp/UCI')
const DEFAULT_SOCKET_NAME = 'uci-sock'

/**
 * A net.Server that exchanges JSON packets with its consumers.
 *
 * Each connecting consumer is sent a `{_handshake: true}` packet. Every message
 * subsequently decoded from that consumer is passed to the packet processor
 * (see registerPacketProcessor) and the processor's result is written back.
 */
export default class Socket extends Server {
  /**
   * @param {Object} [opts] listen options, kept on `this.opts` and handed to net.Server#listen
   * @param {string} [opts.id] instance id (falls back to `opts.name`, then to a timestamp based id)
   * @param {string} [opts.name] alternative source for the instance id
   * @param {string|boolean} [opts.path] named pipe / unix socket path.
   *   `true` selects the default directory and socket name; a bare file name is
   *   placed in the default directory. When falsy a TCP socket is used instead.
   * @param {string} [opts.host] TCP host, default '0.0.0.0' (only set when no path is given)
   * @param {number} [opts.port] TCP port, default 8080 (only set when no path is given)
   */
  constructor (opts={}) {
    super()
    this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
    if (!opts.path) {
      opts.host = opts.host || '0.0.0.0'
      opts.port = opts.port || 8080
    } else {
      if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
      if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
    }
    this.opts = opts // for use to recover from selected errors
    //self bindings
    this._listen = this._listen.bind(this)
    this.create = this.create.bind(this)
    log = logger({file:'src/socket.js',class:'Socket',name:'socket',id:this.id})
  } // end constructor

  /**
   * Installs the process shutdown hooks and starts listening with `this.opts`.
   *
   * When the first listen attempt fails with an error that can be repaired
   * (see _recover) the repair is made and listening is attempted exactly once
   * more. Any other failure, or a failure of the second attempt, rejects.
   *
   * @returns {Promise<undefined>} resolves once the server is listening
   */
  async create () {
    // couple ways to kill socket process when needed
    _ON_DEATH( async () => {
      log.info('\nhe\'s dead jim')
      await this._destroy()
    })
    process.once('SIGUSR2', () => {
      // _destroy() is not used here because it exits the process, which would
      // prevent the signal from being re-raised below. Just stop the server.
      log.info('closing down socket')
      this.close()
      process.kill(process.pid, 'SIGUSR2')
    })

    try {
      return await this._listen(this.opts)
    } catch (err) {
      const recovered = await this._recover(err)
      if (!recovered) {
        // otherwise fatally exit
        log.info(err, 'creating socket')
        throw err
      }
    }

    // the cause of the first failure was repaired, a second failure is fatal
    try {
      return await this._listen(this.opts)
    } catch (err) {
      log.info(err, 'creating socket')
      throw err
    }
  } // end create

  /**
   * Replaces the packet processor used for incoming consumer packets.
   * @param {Function} func called with a clone of each incoming packet; its
   *   (awaited) return value is used as the reply packet
   */
  registerPacketProcessor (func) {
    this._packetProcess = func
  }

  /**
   * Tries to repair the cause of a failed listen attempt on a path socket.
   *
   * EADDRINUSE: the socket file is deleted (this code treats it as a leftover
   * that was not removed). EACCES: the socket's directory is created (this code
   * treats the error as a missing directory).
   * Nothing is attempted for TCP sockets, there is no file or directory to fix.
   *
   * @param {Error} err error emitted by the failed listen attempt
   * @returns {Promise<boolean>} true when a repair was made and listening should be retried
   */
  async _recover (err) {
    const socketPath = this.opts.path
    if (!socketPath) return false // if TCP socket should already be dead

    if (err.code === 'EADDRINUSE') {
      // recover from socket file that was not removed
      log.warn({socket: socketPath}, 'socket already exists...deleting')
      await removeFile(socketPath)
      return true
    }

    if (err.code === 'EACCES') {
      log.warn({socket: socketPath}, 'directory does not exist...creating')
      await mkdir(path.dirname(socketPath))
      log.info({socket: socketPath}, 'created')
      return true
    }

    return false
  }

  /**
   * Starts listening and settles once the attempt has succeeded or failed.
   *
   * The one-shot 'listening'/'error' listeners are removed again as soon as
   * either fires so that a failed attempt leaves nothing behind that could
   * fire during a retry.
   *
   * @param {Object} opts options for net.Server#listen
   * @returns {Promise<undefined>} resolves when listening, rejects with the listen error
   */
  async _listen (opts) {
    return new Promise((resolve, reject) => {
      const onError = (err) => {
        this.removeListener('listening', onListening)
        reject(err)
      }
      const onListening = () => {
        this.removeListener('error', onError)
        this._attachHandlers()
        log.info({opts: this.opts},'socket created')
        resolve()
      }
      this.once('error', onError)
      this.once('listening', onListening)
      try {
        super.listen(opts)
      } catch (err) {
        // listen() can also throw synchronously; do not leave listeners behind
        this.removeListener('error', onError)
        this.removeListener('listening', onListening)
        reject(err)
      }
    })
  } // end listen

  /**
   * Attaches the long lived server listeners. Safe to call repeatedly, the
   * listeners are only ever attached once per instance.
   */
  _attachHandlers () {
    if (this._handlersAttached) return
    this._handlersAttached = true
    // this gets called for each client connection and is unique to each
    this.on('connection', (socket) => this._handleConnection(socket))
    // without a listener a later server 'error' event would be thrown
    this.on('error', (err) => log.warn(err, 'socket server error'))
  }

  /**
   * Sets up a newly connected consumer: sends the handshake packet, feeds the
   * socket's data into a JSONStream and answers every decoded message.
   *
   * @param {Object} socket the consumer connection passed to the 'connection' event
   */
  async _handleConnection (socket) {
    const write = this._write.bind(socket)
    const stream = new JSONStream()

    const messageProcess = async (packet) => {
      try {
        //console.log(' incoming packet on socket side',packet)
        let res = await this._packetProcess(clone(packet))
        // a processor that returns nothing (or a non object) gets an error reply
        // instead of throwing while the reply header is being built
        if (res === null || typeof res !== 'object' || Object.keys(res).length === 0) {
          res = { error: 'socket packet command function likely did not return a promise', packet:packet}
        }
        // make sure return packet has header with id in case it was removed in
        // processing; fall back to an empty header when the request had none
        res._header = clone(packet._header,false) || {}
        delete packet._header // remove before adding to response header as request
        res._header.request = clone(packet,false)
        res._header.responder = {name:this.name,instanceID:this.id}
        res._header.socket = this._connectionKey
        if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply'
        let [err, ser] = await btc(stream.serialize)(res)
        if (err) ser = await stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:res._header.id}})
        // console.log('serialized ready for write',ser)
        log.info(await write(ser))
      } catch (err) {
        // event listeners can not propagate a rejection, so report it here
        log.warn(err, 'unable to process incoming packet')
      }
    }

    try {
      log.info('new consumer connecting sending handshake')
      write(await stream.serialize({'_handshake':true}))
      socket.on('data', stream.onData)
      // TODO need to start error listener for stream so errors can be processed
      stream.on('message', messageProcess)
    } catch (err) {
      log.warn(err, 'unable to set up consumer connection')
    }
  } // end connection consumer

  /**
   * Stops the server and exits the process.
   * net.Server#close() does not return a promise, so this does not wait for
   * consumers that are still connected before exiting.
   */
  async _destroy () {
    log.info('closing down socket')
    this.close()
    log.info('all connections closed....exiting')
    process.exit()
  }

  /**
   * Default packet process, just a simple echo.
   * @param {Object} packet incoming packet
   * @returns {Promise<Object>} the same packet
   */
  async _packetProcess (packet) {
    return packet
  }

  /**
   * Writes a serialized packet to a consumer socket.
   * Must have a consumer socket bound as `this` to use (see _handleConnection).
   * When the socket reports back pressure the promise resolves on 'drain',
   * otherwise on the next tick.
   *
   * @param {string|Buffer} packet serialized packet (presumably as produced by JSONStream#serialize)
   * @returns {Promise<string>} resolves with a fixed confirmation message
   */
  async _write (packet) {
    // timeout already set if sockect can't be drained in 10 secs
    return new Promise(resolve => {
      const cb = () => resolve('packet written to socket stream')
      if (!this.write(packet)) {
        this.once('drain',cb )
      } else {
        process.nextTick(cb)
      }
    })
  }
} // end class