diff --git a/package.json b/package.json index 88e0971..56449f2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@uci/mqtt", - "version": "0.1.9", + "version": "0.1.10", "description": "mqtt client with json payloads and mqtt custom broker", "main": "src", "scripts": { @@ -25,9 +25,9 @@ }, "homepage": "https://github.com/uCOMmandIt/uci-changeme#readme", "dependencies": { - "@uci-utils/logger": "0.0.13", - "async-mqtt": "^1.0", - "better-try-catch": "^0.6.2", + "@uci-utils/logger": "^0.0.14", + "async-mqtt": "1.0.1", + "better-try-catch": "0.6.2", "is-plain-object": "^2.0.4", "lodash.merge": "^4.6.1", "lodash.union": "^4.6.0", diff --git a/src/client.js b/src/client.js index 4c4fbd1..fc9b478 100644 --- a/src/client.js +++ b/src/client.js @@ -25,12 +25,13 @@ class MQTTClient { this.id = opts.id || opts.name || 'mqtt:' + new Date().getTime() log = logger({ file: 'src/client.js', - class: 'Client', + package:'@uci/mqttjs', + class: 'MQTTClient', name: 'mqtt', id: this.id }) this.url = opts.url || null - // subscription topics can be string of commna delimited or array of strings see object see mqtt.js docs + // subscription topics can be string of commna delimited or array of strings. If none they by default topic is id this.topics = Array.isArray(opts.topics) ? opts.topics : opts.topics @@ -67,26 +68,17 @@ class MQTTClient { this._listen() this.on('reconnect', () => { - log.info('mqtt client reconnected to broker') + log.debug({method:'connect', line:71, msg:'mqtt client reconnected to broker'}) this.subscribe(this.topics) }) this.on('error', err => { - log.fatal({ err: err }, 'connection error to broker') + log.fatal({method:'connect', line:76, err: err, msg:'connection error to broker'}) }) this.subscribe(this.topics) - log.info( - { options: this._client.options }, - `mqtt client connected to broker at ${ - this._client.options.hostname - }:${this._client.options.port}` - ) - resolve( - `mqtt client connected to broker at ${ - this._client.options.hostname - }:${this._client.options.port}` - ) + log.debug({method:'connect', line:80, options: this._client.options, msg:`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`}) + resolve(`mqtt client connected to broker at ${this._client.options.hostname}:${this._client.options.port}`) }) }) //end promise } @@ -96,7 +88,7 @@ class MQTTClient { if (typeof topic === 'string') topic = topic.split(',') this.topics = union(this.topics, topic) } - log.info(`subscription for topic ${topic} added`) + log.debug({method:'subscribe', line:91, msg:`subscription for topic ${topic} added`}) this._subscribe(topic, options) } @@ -110,7 +102,7 @@ class MQTTClient { async send(packet, topics, options) { if (!this.hasOwnProperty('publish')) { - log.warn({url:this.url, opts:this.opts, msg:'connect method never called, initialization needed, aborting send'}) + log.error({method:'send', line:105, url:this.url, opts:this.opts, msg:'connect method never called, initialization needed, aborting send'}) return } if (typeof topics !== 'string' && !Array.isArray(topics)) { @@ -119,16 +111,20 @@ class MQTTClient { } else { if (typeof topics === 'string') topics = topics.split(',') } - let payload = {} + + let payload if (typeof packet !== 'string') { - payload = this._serialize(packet) - if (!payload) - payload = this._serialize({ err: 'could not serialze packet' }) - topics = [packet.cmd] || topics // if payload has a cmd use that as topic - } else payload = this._serialize({ data: packet }) + if (packet.payload) payload = packet.payload.toString() + else { + payload = this._serialize(packet) + if (!payload) payload = this._serialize({ err: 'could not serialze packet' }) + } + topics = packet.cmd ? [packet.cmd.replace(/\./gi,'/') ] : topics // if payload has a cmd use that as topic, convert any . to slash + } else payload = packet // packet was just a string + let pubs = [] topics.forEach(async topic => { - log.info(`sending ${payload} to topic ${topic} with options ${options}`) + log.debug({method:'send', line:127, msg:'mqtt publishing', payload:payload, topic:topic}) pubs.push(this.publish(topic, payload, options)) }) return await Promise.all(pubs) @@ -141,35 +137,33 @@ class MQTTClient { _serialize(json) { let [err, payload] = btc(JSON.stringify)(json) if (err) { - log.warn(`error unable to stringify json:${json}`) + log.warn({method:'_serialize', line:140, msg:`unable to stringify json for payload:${json}`}) return null } return payload } _listen() { - log.info( - `listening for incoming packets from broker on topics ${this.topics}` - ) + log.info({method:'_listen', line:147, msg:`listening for incoming mqtt packets from broker on topics ${this.topics}`}) this.on('message', messageProcess.bind(this)) async function messageProcess(topic, payload) { - log.info('incoming message on topic', topic) + log.debug({method:'_listen', line:153 ,msg:'incoming mqtt message to process', topic:topic, payload:payload}) let packet = this._handlePayload(payload) if (packet) packet.cmd = packet.cmd || topic // payload had no command that use topic as cmd else packet = { cmd: topic } let res = (await this._packetProcess(packet, topic)) || {} + log.debug({method:'_listen', line:159, msg:'processed packet back to broker - cmd: should NOT be a subscribed topic or infinite loop!', res:res, topics:this.topics}) + // TODO!!!!!!!!!! make sure cmd/topic is not the same as subscribed topics or can make an infinite loop this.send(res) } } _handlePayload(payload) { let [err, packet] = btc(JSON.parse)(payload.toString()) - if (err) { - log.info('payload not json returning as prop data:') - } - if (!isPlainObject(packet)) packet = { data: payload.toString() } + if (err) log.debug({method:'_handlePayload', line:201, msg:'payload is not json returning as prop data:'}) + if (!isPlainObject(packet)) return { data: payload.toString() } return packet } @@ -194,19 +188,19 @@ class MQTTClient { }) this.on('close', () => { - log.info('connection to broker was closed') + log.debug({method:'_log', line:193, msg:'connection to broker was closed'}) }) this.on('offline', () => { - log.info('this client has gone offline from broker') + log.warn({method:'_log', line:197, msg:'this client has gone offline from broker'}) }) this.on('packetsend', packet => { - log.info({ packet: packet }, 'outgoing packet to mqtt broker') + log.debug({method:'_log', line:201, packet: packet, msg:'outgoing packet to mqtt broker'}) }) this.on('packetreceive', packet => { - log.info({ packet: packet }, 'incoming packet from mqtt broker') + log.debug({method:'_log', line:205, packet: packet, msg:'incoming packet from mqtt broker'}) }) }