158 lines
4.8 KiB
JavaScript
158 lines
4.8 KiB
JavaScript
// import { Socket, Consumer } from '@uci/socket'
|
|
import UCISocket from '../../uci-socket/src'
|
|
import packet from './packet.mjs'
|
|
import EventEmitter from 'events'
|
|
|
|
const USOCKET = __dirname + '/unix.sock'
|
|
|
|
export default class Base extends EventEmitter {
|
|
constructor(opts={}) {
|
|
super()
|
|
opts.path = opts.path || USOCKET
|
|
this.id = opts.id || opts.name || 'uci:'+ Math.random()*100
|
|
this.desc = opts.desc // additional details for humans
|
|
this.socket={}
|
|
opts.sockets.split(/[,|\s]+/).forEach( opts_socket => {
|
|
let socket = {}
|
|
opts_socket.split(/[>#]+/).map(function(prop,index) {
|
|
socket[SOCKET_INFO_KEYS[index]] = prop
|
|
})
|
|
let args =[]
|
|
if (!opts[socket.name]) opts[socket.name] = {}
|
|
let path = opts[socket.name].path || opts.path
|
|
if (socket.transport ==='n') args.push(path)
|
|
opts.name = this.id +':'+ socket.name
|
|
args.push(opts)
|
|
this.socket[socket.name] = new UCISocket[TRANSLATIONS[socket.type]](...args)
|
|
Object.assign(this.socket[socket.name],socket)
|
|
// this will be removed when moving to emit
|
|
this.socket[socket.name].packet = Object.assign({},packet[TRANSLATIONS[socket.type]])
|
|
console.log(socket.name,'=========\n', this.socket[socket.name])
|
|
})
|
|
|
|
|
|
// opts.id = this.id +':'+sock
|
|
// if (!opts[sock]) opts[sock] = {}
|
|
// switch (sock) {
|
|
// case
|
|
// 'us':
|
|
// opts.us.path = opts.us.path || opts.path
|
|
//
|
|
// this.socket[sock].info = {transport:'np', id:sock, }
|
|
// this.socket[sock].packet._process = socketEmit.bind(this.socket[sock],this.emit)
|
|
// break
|
|
// case 'uc':
|
|
// opts.uc.path = opts.uc.path || opts.path
|
|
// this.socket[sock] = new Consumer(opts.uc.path,opts)
|
|
// this.socket[sock].packet = Object.assign({},packet.consumer)
|
|
// break
|
|
// case 'ts':
|
|
// this.socket[sock]= new Socket(opts.ts, opts)
|
|
// this.socket[sock].packet = Object.assign({},packet.socket)
|
|
// break
|
|
// case 'tc':
|
|
// this.socket[sock]= new Consumer(opts.tc,opts)
|
|
// this.socket[sock].packet = Object.assign({},packet.consumer)
|
|
// }
|
|
// })
|
|
|
|
|
|
} // end constructor
|
|
|
|
async init (context) {
|
|
let sockets = []
|
|
context = context || this // additional context is Base instance (or callsite) if none supplied
|
|
this.registerPacketContext(context)
|
|
for(let type of Object.keys(this.socket)){
|
|
if (type.indexOf('s')!==-1) {
|
|
sockets.push(this.socket[type].create())
|
|
// setTimeout(this.socket[type].create,200) // test for consumer retry for delayed socket creation
|
|
}
|
|
else {
|
|
sockets.push(this.socket[type].connect())
|
|
}
|
|
}
|
|
await Promise.all(sockets)
|
|
|
|
this.on('packet', packet => {
|
|
console.log('==========>',packet.socket)
|
|
})
|
|
} // init
|
|
|
|
registerPacketContext(type, context) {
|
|
if (typeof type === 'string') {
|
|
this.socket[type].packet.context = context
|
|
} else {
|
|
context=type
|
|
for(let type of Object.keys(this.socket)){
|
|
this.socket[type].packet.context = context
|
|
}
|
|
}
|
|
}
|
|
|
|
amendPacketContext(type, context) {
|
|
if (typeof type === 'string') {
|
|
Object.assign(this.socket[type].packet.context,context)
|
|
} else {
|
|
context=type
|
|
for(let type of Object.keys(this.socket)){
|
|
Object.assign(this.socket[type].packet.context,context)
|
|
}
|
|
}
|
|
}
|
|
|
|
amendPacketProcessing(type,funcs) {
|
|
if (typeof type === 'string') {
|
|
this.socket[type].packet = Object.assign({},this.socket[type].packet,funcs)
|
|
} else {
|
|
funcs=type
|
|
for(let type of Object.keys(this.socket)){
|
|
this.socket[type].packet = Object.assign(this.socket[type].packet,funcs)
|
|
}
|
|
}
|
|
}
|
|
|
|
registerPacketProcessor(type,func) {
|
|
if (typeof type === 'string') {
|
|
this.socket[type].packet._process = func
|
|
} else {
|
|
func = type
|
|
for(let type of Object.keys(this.socket)){
|
|
this.socket[type].registerPacketProcessor(func)
|
|
}
|
|
}
|
|
}
|
|
|
|
async sendTCP(packet) {
|
|
await this.socket.tc.send(packet)
|
|
}
|
|
|
|
async sendIPC(packet) {
|
|
await this.socket.uc.send(packet)
|
|
}
|
|
|
|
async send (transport, packet) {
|
|
if (typeof transport === 'string') {
|
|
if (this.socket[transport]) await this.socket.uc.send(packet)
|
|
}
|
|
else {
|
|
packet = transport
|
|
for(let type of Object.keys(this.socket)){
|
|
if (type.indexOf('c')!==-1) {
|
|
await this.socket[type].send(packet)
|
|
}
|
|
}
|
|
}
|
|
this.emit('packet', packet) // intra process, need to set a 'packet' listener to process it
|
|
}
|
|
|
|
} // end class
|
|
|
|
const SOCKET_INFO_KEYS = ['name','type','transport']
|
|
const TRANSLATIONS = {t:'TCP',s:'Socket',c:'Consumer',n:'Named Pipe',}
|
|
|
|
const socketEmit = function (emit,packet) {
|
|
packet.socket = this.info
|
|
emit('packet', packet)
|
|
}
|