import { Server } from 'net'
import { unlink as fileDelete } from 'fs'
import { promisify } from 'util'
import path from 'path'
import mkdir from 'make-dir'
import btc from 'better-try-catch'
import _ON_DEATH from 'death' //this is intentionally ugly
import JSONStream from './json-stream'
import clone from 'clone'
// import logger from '../../uci-logger/src/logger'
import logger from '@uci/logger'

// NOTE(review): module-level logger is reassigned by every Socket constructor,
// so all instances log through the most recently created instance's logger.
let log = {}

// TODO change default pipe dir for windows and mac os
const DEFAULT_PIPE_DIR = (process.env.SOCKETS_DIR || '/tmp/UCI')
const DEFAULT_SOCKET_NAME = 'uci-sock'

/**
 * JSON packet server built on net.Server.
 *
 * Listens either on a TCP host/port or, when `opts.path` is given, on a named
 * pipe / unix socket.  Every connecting consumer gets its own JSONStream, is
 * sent a `{_handshake:true}` packet, and has each incoming packet passed to the
 * registered packet processor; the processor's result is sent back as the reply.
 */
export default class Socket extends Server {
  /**
   * @param {Object} [opts]
   * @param {string} [opts.id] - instance id (falls back to opts.name, then a timestamp based id)
   * @param {string} [opts.name]
   * @param {string|boolean} [opts.path] - pipe path; `true` selects the default pipe,
   *   a bare file name is placed in the default pipe directory
   * @param {string} [opts.host='0.0.0.0'] - used only when no path is given
   * @param {number} [opts.port=8080] - used only when no path is given
   * @param {boolean} [opts.clientTracking=true] - pass `false` to disable tracking of consumers
   * @param {Object} [opts.conPacket] - packet pushed to every consumer right after it connects
   */
  constructor (opts={}) {
    super()
    this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
    if (!opts.path) {
      opts.host = opts.host || '0.0.0.0'
      opts.port = opts.port || 8080
    } else {
      if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
      if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
    }
    // was `opts.clientTracking || true`, which could never evaluate to false
    this.clientTracking = opts.clientTracking !== false
    this.clients = [] // track consumers (i.e. clients)
    this.opts = opts // for use to recover from selected errors
    //self bindings
    this._listen = this._listen.bind(this)
    this.create = this.create.bind(this)
    this._onConnection = this._onConnection.bind(this) // stable reference so it can be attached exactly once
    log = logger({file:'src/socket.js',class:'Socket',name:'socket',id:this.id})
    // keep a permanent listener so a server 'error' emitted outside of a listen
    // attempt is logged instead of being thrown as an unhandled 'error' event
    this.on('error', (err) => {
      log.warn({err:err},'socket server error')
    })
  } // end constructor

  /**
   * Registers process shutdown hooks and starts listening.
   *
   * Two start-up errors are recovered from, once each, and only for pipe sockets:
   *  - EADDRINUSE: the existing socket file is deleted and listening is retried
   *  - EACCES: treated as "socket directory does not exist"; the directory is
   *    created and listening is retried
   *
   * @returns {Promise<undefined>} resolves once the server is actually listening
   * @throws the listen error (or the file delete / mkdir error) when start-up can not be recovered
   */
  async create () {
    // couple ways to kill socket process when needed
    _ON_DEATH( async () => {
      log.info('\nhe\'s dead jim')
      await this._destroy()
    })
    process.once('SIGUSR2', () => {
      // _destroy() is deliberately not used here: it ends with process.exit(),
      // which would make the re-raise below unreachable
      this.close()
      process.kill(process.pid, 'SIGUSR2')
    })

    let staleSocketRemoved = false
    let directoryCreated = false
    for (;;) {
      const [err] = await btc(this._listen)(this.opts)
      if (!err) return

      // recover from socket file that was not removed (a TCP socket should already be dead)
      if (err.code === 'EADDRINUSE' && this.opts.path && !staleSocketRemoved) {
        staleSocketRemoved = true
        const [delErr, res] = await btc(promisify(fileDelete))(this.opts.path)
        if (delErr) {
          log.fatal({err:delErr},'error deleting socket. Can not establish a socket')
          throw delErr
        }
        log.info({res:res, socket: this.opts.path}, 'socket already exists.....deleted')
        continue
      }

      if (err.code === 'EACCES' && this.opts.path && !directoryCreated) {
        directoryCreated = true
        log.warn({socket: this.opts.path}, 'directory does not exist...creating')
        await mkdir(path.dirname(this.opts.path))
        log.info({socket: this.opts.path}, 'created')
        continue
      }

      // otherwise fatally exit
      log.info(err, 'creating socket')
      throw err
    }
  } // end create

  /**
   * Replaces the packet processor used for incoming consumer packets.
   * @param {Function} func - called with a clone of the packet; its (awaited) result is sent back as the reply
   */
  registerPacketProcessor (func) {
    this._packetProcess = func
  }

  /**
   * Makes one listen attempt.
   * @param {Object} opts - options handed to net.Server#listen
   * @returns {Promise<undefined>} resolves on 'listening', rejects with the
   *   emitted 'error' (or with an error thrown synchronously by listen)
   */
  async _listen (opts) {
    // attach the connection handler exactly once however many attempts are made
    this.removeListener('connection', this._onConnection)
    this.on('connection', this._onConnection)

    return new Promise((resolve, reject) => {
      const onListening = () => {
        this.removeListener('error', onError)
        log.info({opts: this.opts},'socket created')
        resolve()
      }
      const onError = (err) => {
        // a failed attempt must not leave its 'listening' listener behind for the retry
        this.removeListener('listening', onListening)
        reject(err)
      }
      this.once('listening', onListening)
      this.once('error', onError)
      try {
        super.listen(opts)
      } catch (err) {
        this.removeListener('listening', onListening)
        this.removeListener('error', onError)
        reject(err)
      }
    })
  } // end listen

  /**
   * Sets up a newly connected consumer: gives it a JSONStream, tracks it,
   * sends the handshake (and `opts.conPacket` when set) and starts processing
   * its incoming packets.  Called once per client connection.
   * @param {net.Socket} socket - the consumer's socket
   */
  async _onConnection (socket) {
    const stream = new JSONStream()
    socket.stream = stream // need this to track clients
    const send = this._send.bind(socket)

    // without a listener a consumer socket error is thrown as an uncaught exception
    socket.on('error', (err) => {
      log.warn({err:err, client:socket.ID},'consumer socket error')
    })

    if (this.clientTracking) {
      this.clients.push(socket)
      // forget the consumer once its socket is closed
      socket.once('close', () => {
        const index = this.clients.indexOf(socket)
        if (index !== -1) this.clients.splice(index, 1)
      })
    }

    try {
      log.info('new consumer connecting')
      log.info(await send(await stream.serialize({'_handshake':true})))
      if (this.opts.conPacket) {
        // send a copy so the configured packet itself is left untouched
        const conPacket = Object.assign({}, this.opts.conPacket, { _header: { id:'pushed'} })
        log.info({conPacket:conPacket},'pushing a preset command to just connected consumer')
        send(await stream.serialize(conPacket)) // send a packet command on to consumer on connection
      }
      socket.on('data', stream.onData)
      // TODO need to start error listener for stream so errors can be processed
      stream.on('message', (packet) => {
        this._processMessage(socket, packet).catch((err) => {
          log.warn({err:err},'error processing incoming packet')
        })
      })
    } catch (err) {
      log.warn({err:err},'error setting up consumer connection')
    }
  } // end connection consumer

  /**
   * Handles one incoming packet from a consumer and sends the reply.
   *
   * With client tracking on, a packet carrying `clientID` only records the id on
   * the client and is answered with cmd 'ackID'; every other packet is passed
   * (cloned) to the packet processor.  The reply header carries the request's
   * header fields, the request itself, the responder and the server address.
   *
   * @param {net.Socket} client - consumer socket the packet arrived on (must carry `.stream`)
   * @param {Object} packet - the parsed incoming packet
   */
  async _processMessage (client, packet) {
    log.info({packet:packet},'incoming packet on socket side')
    let res = {}
    if (this.clientTracking && packet && packet.clientID) {
      client.ID = packet.clientID
      res.cmd='ackID'
    } else {
      try {
        res = await this._packetProcess(clone(packet)) || {}
      } catch (err) {
        // answer with an error packet rather than leaving the consumer without a reply
        res = { error: 'socket packet command function threw an error', err:err, packet:packet }
      }
      if (Object.keys(res).length === 0) res = { error: 'socket packet command function likely did not return a promise', packet:packet}
    }
    if (packet) {
      res._header = clone(packet._header,false) || {} //make sure return packet has header with id in case it was removed in processing
      delete packet._header // remove before adding to response header as request
    } else res._header = {}
    res._header.request = clone(packet,false)
    // NOTE(review): this.name is not assigned anywhere in this file - confirm where it is expected to come from
    res._header.responder = {name:this.name,instanceID:this.id}
    res._header.socket = this.address()
    if (!res.cmd) res.cmd = 'reply' // by default return command is 'reply'
    let [err, ser] = await btc(client.stream.serialize)(res)
    if (err) ser = await client.stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:res._header.id}})
    log.info(await this._send.bind(client)(ser))
  } // end process message

  /**
   * Closes the server and exits the process.
   */
  async _destroy () {
    log.info('closing down socket')
    // NOTE: net.Server#close() does not return a promise, so this await does not
    // wait for open connections to end
    await this.close()
    log.info('all connections closed....exiting')
    process.exit()
  }

  /**
   * Default packet processor, just a simple echo.
   * @param {Object} packet
   * @returns {Promise<Object>} the packet it was given
   */
  async _packetProcess (packet) {
    return packet
  }

  /**
   * Writes an already serialized packet to a consumer socket.
   * Must be called with a consumer socket bound as `this`.
   * @param {string|Buffer} packet - serialized packet
   * @returns {Promise<string>} a status message; resolved right away when the
   *   write was accepted, otherwise once the socket drains or closes
   */
  async _send (packet) {
    return new Promise(resolve => {
      if (this.destroyed) {
        // neither 'drain' nor 'close' would fire any more
        resolve('socket closed before packet could be written')
        return
      }
      if (this.write(packet)) {
        process.nextTick(resolve, 'packet written to socket stream')
        return
      }
      const onDrain = () => {
        this.removeListener('close', onClose)
        resolve('packet written to socket stream')
      }
      const onClose = () => {
        // do not wait forever for a 'drain' that can no longer happen
        this.removeListener('drain', onDrain)
        resolve('socket closed before packet could be written')
      }
      this.once('drain', onDrain)
      this.once('close', onClose)
    })
  }

  /**
   * Pushes a packet to connected consumers.
   * @param {Object} packet - packet to push; it is sent with `_header` set to `{id:'pushed'}`
   *   (a copy is sent, the passed object is not modified)
   * @param {*} [id] - when given, only consumers whose registered client ID equals it receive the packet
   * @returns {Promise<undefined>} resolves after the packet has been handed to every targeted consumer
   */
  async push (packet,id) {
    const pushed = Object.assign({}, packet, { _header: { id:'pushed'} })
    log.info({opts:this.opts,packet:pushed},'pushing a packet to all connected consumers')
    const targets = this.clients.filter(client => client.writable && (!id || id === client.ID))
    // map + Promise.all (rather than forEach) so the sends are actually awaited
    await Promise.all(targets.map(async (client) => {
      let [err, ser] = await btc(client.stream.serialize)(pushed)
      if (err) ser = await client.stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:pushed._header.id}})
      await this._send.bind(client)(ser)
    }))
  }
} // end class