From e5167c39950bdc39d73456a9d85e928b59165c2e Mon Sep 17 00:00:00 2001 From: David Kebler Date: Mon, 8 Jan 2018 13:06:01 -0800 Subject: [PATCH] basic working socket-consumer modules with no handshaking but passing json objects --- examples/client.mjs | 30 +++++++++++++++++ examples/server.mjs | 36 ++++++++++++++++++++ package.json | 22 ++++++++---- readme.md | 6 +--- src/consumer.mjs | 43 ++++++++++++++++++++++++ src/index.mjs | 2 ++ src/socket.mjs | 82 +++++++++++++++++++++++++++++++++++++++++++++ test.mjs | 12 ------- 8 files changed, 210 insertions(+), 23 deletions(-) create mode 100644 examples/client.mjs create mode 100644 examples/server.mjs create mode 100644 src/consumer.mjs create mode 100644 src/index.mjs create mode 100644 src/socket.mjs delete mode 100644 test.mjs diff --git a/examples/client.mjs b/examples/client.mjs new file mode 100644 index 0000000..c00d8b4 --- /dev/null +++ b/examples/client.mjs @@ -0,0 +1,30 @@ +import Consumer from '../src/consumer' + +const USOCKET = '/opt/sockets/samplecs.sock' + +const socket1 = new Consumer(USOCKET) +const socket2 = new Consumer(USOCKET) + +let packet1 = {name: 'socket1', cmd:'doit', data:'data sent by socket1'} +let packet2 = {name: 'socket2', cmd:'doit', data:'data sent by socket2'} + +; +(async () => { + + await socket1.connect() + await socket2.connect() + await socket1.listen(app) + await socket2.listen(app) + socket1.send(packet1) + socket2.send(packet2) + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) + + +// This is your socket handler waiting on a message to do something +function app (packet) { + console.log('incoming packet from socket to process') + console.dir(packet) +} diff --git a/examples/server.mjs b/examples/server.mjs new file mode 100644 index 0000000..b764b78 --- /dev/null +++ b/examples/server.mjs @@ -0,0 +1,36 @@ +import { Socket } from '../src' + +const USOCKET = '/opt/sockets/samplecs.sock' +; +(async () => { + + class Test { + constructor() { + this.socket = new Socket(USOCKET) + } + + async processPacket(packet) { + console.log('packet being processed') + console.dir(packet) + return await this[packet.cmd](packet.data,packet.name) + } + + async doit(data,name) { + let res = {} + console.log('data:', data) + res.status ='success' + res.name = name + res.data = 'this would be response from device' + return(res) + } + + init() { this.socket.create(this)} + } + + + let test = new Test() + await test.init() + +})().catch(err => { + console.error('FATAL: UNABLE TO START SYSTEM!\n',err) +}) diff --git a/package.json b/package.json index bbe6b25..66ebc4a 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,17 @@ { - "name": "@uci/esmtesting", + "name": "@uci/unix-socket", "version": "0.1.0", - "description": "Test for esm", - "main": "index.js", + "description": "Bare bones intra Host Unix Socket for basic IPC on same machine", + "main": "src", "scripts": { - "start": "node -r @std/esm test" + "test": "node -r @std/esm test", + "testw": "./node_modules/.bin/nodemon -r @std/esm test", + "s": "node -r @std/esm examples/server", + "devs": "./node_modules/.bin/nodemon -r @std/esm -e mjs examples/server", + "c": "node -r @std/esm examples/client", + "cs": "node -r @std/esm client-server", + "ms": "node -r @std/esm mqtt-server", + "mc": "node -r @std/esm mqtt-client" }, "author": "David Kebler", "license": "MIT", @@ -24,9 +31,12 @@ "homepage": "https://github.com/uCOMmandIt/message#readme", "@std/esm": "cjs", "devDependencies": { - "@std/esm": "^0.18.0" + "@std/esm": "^0.18.0", + "nodemon": "^1.14.3" }, "dependencies": { - "json-ipc-lib": "^1.0.2" + "better-try-catch": "^0.6.2", + "death": "^1.1.0", + "simple-node-logger": "^0.93.33" } } diff --git a/readme.md b/readme.md index 5a743a3..98b2ce9 100644 --- a/readme.md +++ b/readme.md @@ -1,5 +1 @@ -## Native ESM Testing - -`npm start` - - esm module loader testing for module written with es modules but using .js as it was meant to be transpiled +## Various Communication Protocol/Transport Testing diff --git a/src/consumer.mjs b/src/consumer.mjs new file mode 100644 index 0000000..c3ede26 --- /dev/null +++ b/src/consumer.mjs @@ -0,0 +1,43 @@ +import { Socket } from 'net' +import btc from 'better-try-catch' + +export default class Consumer extends Socket { + constructor (path, opts={}) { + super() + this.path = path + this.keepAlive = opts.keepAlive ? opts.keepAlive : true + } + + async connect () { + + console.log('attempting to connect to socket: ', this.path) + + await super.connect({ path: this.path }) + console.log(`connected to ${this.path}`) + this.setKeepAlive(this.keepAlive) + + this.on('error', (error) => { + 'client socket error \n ', error.code + }) + + } + + async send(packet) { + let [err, strbuf] = btc(JSON.stringify)(packet) + if (!err) { + this.write(strbuf) + } + else { console.log(`bad packet JSON syntax \n ${packet}`)} + } + + async listen (app) { + this.on('data', async (buf) => { + let [err, packet] = btc(JSON.parse)(buf.toString()) + if (!err) { + app(packet) + } + else { console.log(`bad packet JSON syntax \n ${buf.toString()}`)} + }) + } + +} // end class diff --git a/src/index.mjs b/src/index.mjs new file mode 100644 index 0000000..55b503b --- /dev/null +++ b/src/index.mjs @@ -0,0 +1,2 @@ +export { default as Socket } from './socket' +export { default as Consumer } from './consumer' diff --git a/src/socket.mjs b/src/socket.mjs new file mode 100644 index 0000000..88c568b --- /dev/null +++ b/src/socket.mjs @@ -0,0 +1,82 @@ +import { Server } from 'net' +import { unlink as fileDelete } from 'fs' +import btc from 'better-try-catch' +import Logger from 'simple-node-logger' +import ON_DEATH from 'death' //this is intentionally ugly + +let logger = { + logFilePath:'logfile.log', + timestampFormat:'YYYY-MM-DD HH:mm:ss.SSS' +} + +export default class Socket extends Server { + constructor (path, opts={}) { + super() + this.path = path + this.logger = Logger.createSimpleLogger(opts.logger ? opts.logger: logger) + } // end constructor + + + async create ( app ) { + + this.on('error', async (err) => { + // recover from socket file that was not removed + if (err.code === 'EADDRINUSE') { + console.log(`socket path ${this.path} already exists...deleting`) + await fileDelete(this.path) + await this.listen(this.path) + return Promise.resolve(err.code) + } + // otherwise fatally exit + console.log('error creating socket: ',err.code) + return Promise.reject(err.code) + }) + + // + this.on('listening', async () => { + console.log(`socket created at ${this.path}`) + + // this gets called for each client connection and is unique to each + this.on('connection', (socket) => { + console.log('new consumer connected') + + socket.on('data', async (buf) => { + let [err, packet] = btc(JSON.parse)(buf.toString()) + if (!err) { + this.logger.info(`data packet received to socket \n ${packet}`) + socket.write(JSON.stringify(await app.processPacket.bind(app)(packet))) + } + else { console.log(`bad packet JSON syntax \n ${buf.toString()}`)} + + }) // end incoming data listerner + + }) // end connected consumer + }) // end socket listening listener + + // start it + await this.listen(this.path) + + // if socket is terminated then shutdown gracefully + ON_DEATH( async () => { + await this.destroy() + }) + + process.once('SIGUSR2', async () => { + await this.destroy + process.kill(process.pid, 'SIGUSR2') + + }) + + } // end create + + async destroy () { + + console.log('\nclosing down socket') + await this.close() + console.log('\n all connections closed....exiting') + process.exit() + + } // end destroy + + +} // end class diff --git a/test.mjs b/test.mjs deleted file mode 100644 index 7c7414f..0000000 --- a/test.mjs +++ /dev/null @@ -1,12 +0,0 @@ -import ipc from 'json-ipc-lib/src' - -console.log('clinet\n',[...getAllMethodNames(new ipc.client )]) - -function getAllMethodNames(obj) { - let methods = new Set() - while (obj = Reflect.getPrototypeOf(obj)) { - let keys = Reflect.ownKeys(obj) - keys.forEach((k) => methods.add(k)) - } - return methods -}