• Jump To … +
    ./src/consumer.js ./src/index.js ./src/json-stream.js ./src/socket.js
  • socket.js

  • ¶
    import { Server } from 'net'
    import { unlink as fileDelete } from 'fs'
    import { promisify } from 'util'
    import path from 'path'
    import mkdir from 'make-dir'
    import btc from 'better-try-catch'
    import _ON_DEATH from 'death' //this is intentionally ugly
    import JSONStream from './json-stream'
    import clone from 'clone'
  • ¶

    Alternative for local development: import logger from '../../uci-logger/src/logger'

    import logger from '@uci/logger'
    // module-level logger handle shared by everything in this file; starts as an
    // empty object and is replaced with a configured logger in the Socket constructor
    let log = {}
  • ¶

    TODO: change the default pipe directory for Windows and macOS

    // directory used for named pipes when the caller gives no directory;
    // the SOCKETS_DIR environment variable takes precedence over the built-in default
    const DEFAULT_PIPE_DIR = process.env.SOCKETS_DIR || '/tmp/UCI'
    // file name used when the caller asks for a pipe without naming it
    const DEFAULT_SOCKET_NAME = 'uci-sock'
    
    /**
     * Socket server that exchanges JSON packets with connected consumers.
     *
     * Listens on a TCP host/port, or on a named pipe (unix socket) when
     * `opts.path` is given. Every connection gets its own JSONStream which is
     * used to serialize outgoing packets and to parse incoming data.
     */
    export default class Socket extends Server {
      /**
       * NOTE: `opts` is completed in place (host/port/path defaults) and kept as `this.opts`.
       *
       * @param {Object} [opts] listen options
       * @param {string} [opts.id] instance id; falls back to opts.name, then to a timestamp based id
       * @param {string|boolean} [opts.path] pipe path; `true` selects the default pipe and a
       *   bare file name is placed in the default pipe directory
       * @param {string} [opts.host='0.0.0.0'] TCP host, only applied when no path is given
       * @param {number} [opts.port=8080] TCP port, only applied when no path is given
       * @param {boolean} [opts.clientTracking=true] keep connected consumers in `this.clients`
       * @param {Object} [opts.conPacket] packet sent to every consumer right after it connects
       */
      constructor (opts={}) {
        super()
        this.id = opts.id || opts.name || 'socket:'+ new Date().getTime()
        if (!opts.path) {
          opts.host = opts.host || '0.0.0.0'
          opts.port = opts.port || 8080
        } else {
          if (typeof opts.path === 'boolean') opts.path = path.join(DEFAULT_PIPE_DIR,DEFAULT_SOCKET_NAME )
          if (path.dirname(opts.path)==='.') opts.path = path.join(DEFAULT_PIPE_DIR,opts.path )
        }
        // default to true only when the option is absent (`|| true` could never be switched off)
        this.clientTracking = opts.clientTracking == null ? true : Boolean(opts.clientTracking)
        this.clients = []  // track consumers (i.e. clients)
        this._openSockets = new Set()  // every open connection, needed to shut down cleanly
        this.opts = opts  // for use to recover from selected errors

        // self bindings
        this._listen = this._listen.bind(this)
        this.create = this.create.bind(this)
        this._handleConsumer = this._handleConsumer.bind(this)
        log = logger({file:'src/socket.js',class:'Socket',name:'socket',id:this.id})
      } // end constructor

      /**
       * Registers the process shutdown hooks and starts listening with `this.opts`.
       * One recovery attempt is made for a stale pipe file or a missing pipe directory.
       *
       * @returns {Promise<*>} resolves with the server address once the socket is listening
       * @throws the listen (or recovery) error when the socket can not be established
       */
      async create () {
        // couple ways to kill socket process when needed
        _ON_DEATH( async () => {
          log.info('\nhe\'s dead jim')
          await this._destroy()
        })
        process.once('SIGUSR2', async () => {
          // close first, then re-raise the signal; _destroy() would exit before the re-raise
          await this._shutdown()
          process.kill(process.pid, 'SIGUSR2')
        })

        let [err, res] = await btc(this._listen)(this.opts)
        if (err && await this._recover(err)) {
          const retried = await btc(this._listen)(this.opts)
          err = retried[0]
          res = retried[1]
        }
        if (err) {
          // otherwise give up and reject
          log.info(err, 'creating socket')
          throw err
        }
        // keep a listener in place so a later server error is logged instead of thrown
        this.on('error', (runtimeErr) => log.warn({err:runtimeErr}, 'socket server error'))
        return res
      } // end create

      /**
       * Tries to repair the cause of a failed listen on a pipe path.
       *
       * @param {Error} err error reported by the failed listen
       * @returns {Promise<boolean>} true when listening is worth retrying
       */
      async _recover (err) {
        if (!this.opts.path) return false // if TCP socket should already be dead
        if (err.code === 'EADDRINUSE') {
          // recover from socket file that was not removed
          const [delErr, delRes] = await btc(promisify(fileDelete))(this.opts.path)
          if (delErr) {
            log.fatal({err:delErr},'error deleting socket.  Can not establish a socket')
            return false
          }
          log.info({res:delRes, socket: this.opts.path}, 'socket already exists.....deleted')
          return true
        }
        if (err.code === 'EACCES') {
          log.warn({socket: this.opts.path}, 'directory does not exist...creating')
          await mkdir(path.dirname(this.opts.path))
          log.info({socket: this.opts.path}, 'created')
          return true
        }
        return false
      }

      /**
       * Replaces the packet processor used for incoming packets.
       *
       * @param {Function} func receives a copy of the packet and returns (a promise of) the reply packet
       */
      registerPacketProcessor (func) {
        this._packetProcess = func
      }

      /**
       * Starts listening and reports the outcome through the returned promise.
       *
       * @param {Object} opts options handed to net.Server#listen
       * @returns {Promise<*>} resolves with `this.address()` on 'listening', rejects on 'error'
       */
      async _listen (opts) {
        // attach only once; this method runs again when create() retries after a recovery
        if (!this.listeners('connection').includes(this._handleConsumer)) {
          this.on('connection', this._handleConsumer)
        }
        return new Promise((resolve, reject) => {
          const onListening = () => {
            this.removeListener('error', onError)
            log.info({opts: this.opts},'socket created')
            resolve(this.address())
          }
          const onError = (listenErr) => {
            this.removeListener('listening', onListening)
            reject(listenErr)
          }
          this.once('listening', onListening)
          this.once('error', onError)
          try {
            super.listen(opts)
          } catch (thrown) {
            // synchronous failure (e.g. already listening): do not leave the listeners behind
            this.removeListener('listening', onListening)
            this.removeListener('error', onError)
            reject(thrown)
          }
        })
      } // end listen

      /**
       * Runs for each consumer connection: sets up its stream, sends the handshake
       * (and the optional connection packet) and starts processing its messages.
       *
       * @param {Object} socket the connection handed over by the 'connection' event
       */
      async _handleConsumer (socket) {
        const stream = new JSONStream()
        socket.stream = stream // need this to track clients
        const send = this._send.bind(socket)
        this._openSockets.add(socket)
        if (this.clientTracking) this.clients.push(socket)
        socket.once('close', () => {
          // forget the consumer so this.clients does not grow without bound
          this._openSockets.delete(socket)
          const at = this.clients.indexOf(socket)
          if (at !== -1) this.clients.splice(at, 1)
        })
        // without a listener an 'error' event on the connection would be thrown
        socket.on('error', (socketErr) => log.warn({err:socketErr}, 'consumer socket error'))
        log.info('new consumer connecting')
        log.info(await send(await stream.serialize({'_handshake':true})))
        if (this.opts.conPacket) {
          this.opts.conPacket._header = { id:'pushed'}
          log.info({conPacket:this.opts.conPacket},'pushing a preset command to just connected consumer')
          await send(await stream.serialize(this.opts.conPacket)) // send a packet command on to consumer on connection
        }
        // TODO need to start error listener for stream so errors can be processed
        stream.on('message', (packet) => this._processMessage(socket, packet))
        socket.on('data', stream.onData)
      } // end consumer connection

      /**
       * Builds and sends the reply for one incoming packet.
       *
       * @param {Object} client consumer connection the packet arrived on (must carry `stream`)
       * @param {Object} packet parsed incoming packet
       */
      async _processMessage (client, packet) {
        log.info({packet:packet},'incoming packet on socket side')
        let res = {}
        if (this.clientTracking && packet && packet.clientID) {
          client.ID = packet.clientID
          res.cmd='ackID'
        }
        else {
          res = await this._packetProcess(clone(packet)) || {}
          if (Object.keys(res).length === 0) res = { error: 'socket packet command function likely did not return a promise', packet:packet}
        }
        if (packet) {
          res._header = clone(packet._header,false) || {} //make sure return packet has header with id in case it was removed in processing
          delete packet._header  // remove before adding to response header as request
        } else res._header = {}
        res._header.request = clone(packet,false)
        res._header.responder = {name:this.name,instanceID:this.id}
        res._header.socket = this.address()
        if (!res.cmd) res.cmd = 'reply'  // by default return command is 'reply'
        let [err, ser] = await btc(client.stream.serialize)(res)
        if (err) ser = await client.stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:res._header.id}})
        log.info(await this._send.bind(client)(ser))
      } // end process message

      /**
       * Stops the server: drops every open connection and waits for close to finish.
       *
       * @returns {Promise<void>}
       */
      async _shutdown () {
        log.info('closing down socket')
        // close() only calls back after every connection has ended, so end them here
        this._openSockets.forEach((socket) => socket.destroy())
        // close() is callback based; awaiting its return value would not wait for anything
        await new Promise((resolve) => this.close(resolve))
        log.info('all connections closed....exiting')
      }

      /**
       * Stops the server and exits the process.
       */
      async _destroy () {
        await this._shutdown()
        process.exit()
      }

      /**
       * Default packet process, just a simple echo.
       *
       * @param {Object} packet incoming packet
       * @returns {Promise<Object>} the same packet
       */
      async _packetProcess (packet) {
        return new Promise(resolve => {
          resolve(packet)
        })
      }

      /**
       * Writes a serialized packet. Must have a consumer socket bound as `this` to use.
       *
       * @param {*} packet serialized packet to write
       * @returns {Promise<string>} status text, resolved after the write was accepted, the
       *   socket drained, or the socket closed; never rejects
       */
      async _send(packet) {
        return new Promise(resolve => {
          if (this.destroyed) {
            resolve('socket closed before packet was written')
            return
          }
          if (this.write(packet)) {
            process.nextTick(() => resolve('packet written to socket stream'))
            return
          }
          // buffer is full: settle on whichever comes first so callers can not hang forever
          const onDrain = () => {
            this.removeListener('close', onClose)
            resolve('packet written to socket stream')
          }
          const onClose = () => {
            this.removeListener('drain', onDrain)
            resolve('socket closed before packet was written')
          }
          this.once('drain', onDrain)
          this.once('close', onClose)
        })
      }

      /**
       * Pushes a packet to the tracked consumers. The packet's `_header` is overwritten.
       *
       * @param {Object} packet packet to push
       * @param {*} [id] when given, only the consumer with this client ID receives the packet
       * @returns {Promise<void>} resolves after every targeted consumer has been written to
       */
      async push (packet,id) {
        packet._header = { id:'pushed'}
        log.info({opts:this.opts,packet:packet},'pushing a packet to all connected consumers')
        await Promise.all(this.clients.map(async (client) => {
          if (!client.writable) return
          if (id && id !== client.ID) return
          let [err, ser] = await btc(client.stream.serialize)(packet)
          if (err) ser = await client.stream.serialize({ error: 'was not able to serialze the res packet', err:err, _header:{id:packet._header.id}})
          await this._send.bind(client)(ser)
        }))
      }

    } // end class