import createSocket from './createSocket.js'
import to from 'await-to-js'
import btc from 'better-try-catch'
import isPlainObject from 'is-plain-object'
import { EventEmitter } from 'events'

import logger from '@uci-utils/logger'
let log = {} // module-wide logger, assigned when a HomeAssistant instance is constructed

// Default connection options. Frozen and always copied before use so that
// per-instance options can never leak into other instances.
const DEFAULT_HASS_OPTS = Object.freeze({
  host: 'localhost',
  serverPath: 'api/websocket',
  protocol: 'ws',
  retryTimeout: 5000,
  timeout: 5000,
  retryCount: -1,
  port: 8123,
  connectionMonitor: true
})

// How long send() waits for a reply, and how long the monitor waits between pings.
const REPLY_TIMEOUT_MS = 5000
const PING_INTERVAL_MS = 5000

/**
 * Websocket client for a Home Assistant server.
 *
 * Events emitted by an instance (all visible in this file):
 *  - 'connection'    : 'connected' | 'ready' | 'disconnected'
 *  - 'error'         : plain object with at least a `msg` property
 *  - 'state_changed' : event data of a watched entity
 *  - '<entity_id>'   : new state of a watched entity (unless silenced)
 *  - 'pong'          : the raw pong packet
 *  - <packet id>     : reply to the command sent with that id (used by send())
 */
class HomeAssistant extends EventEmitter {
  /**
   * @param {object} [opts] overrides for DEFAULT_HASS_OPTS. If `opts.url` is set it is used
   *   as the base url instead of `protocol://host:port`.
   */
  constructor(opts) {
    super()
    // NOTE(review): this.id is never assigned in this file, so `id` is presumably undefined here
    log = logger({ name: 'HomeAssistant', id: this.id })
    // copy into a fresh object, the defaults must never be mutated
    this.opts = Object.assign({}, DEFAULT_HASS_OPTS, opts)
    const base = this.opts.url
      ? this.opts.url
      : `${this.opts.protocol}://${this.opts.host}:${this.opts.port}`
    this.url = `${base}/${this.opts.serverPath}`
    log.debug({msg:'url for websocket', url:this.url})
    this.cmdId = 1
    this.eventBusId = null
    this._watchList = []
    this._silence = []
    this._pingTimer = null   // handle of the pending "next ping" timeout
    this._pingSession = null // identity token of the running connection monitor
    this.connected = false
    this.ready = false
    // a registered 'error' listener keeps emit('error') from throwing
    this.on('error', msg => log.error(msg))
  }

  /**
   * Start watching an entity: state changes for it are emitted under its id.
   * @param {string} ent entity id
   * @param {Function} handler called with the entity's new state
   */
  watch (ent, handler) {
    if (!this._watchList.includes(ent)) this._watchList.push(ent)
    this.on(ent, handler)
  }

  /**
   * Stop watching an entity.
   * @param {string} ent entity id
   * @param {Function} [handler] when given only this handler is removed and the entity
   *   stays on the watch list while other handlers remain; otherwise all handlers are removed
   */
  unWatch(ent, handler) {
    if (typeof handler === 'function') {
      this.removeListener(ent, handler)
      if (this.listenerCount(ent) > 0) return
    } else {
      // removeListener() requires a listener argument, so drop them all instead
      this.removeAllListeners(ent)
    }
    this._watchList = this._watchList.filter(e => e !== ent)
  }

  /**
   * Create the socket, start listening and (optionally) start the connection monitor.
   * @returns {Promise<string|object>} 'success', or the error object when no socket
   *   could be established (the same object is emitted as 'error')
   */
  async connect () {
    this.opts.retriesLeft = this.opts.retryCount
    const [err, socket] = await to(createSocket(this.url, this.opts))
    if (err) {
      const error = {msg:'error in connection, unable to establish socket', error:err}
      log.debug(error)
      this.emit('error', error)
      return error
    }
    this.socket = socket
    // set the flag first so 'connection' listeners observe a consistent state
    this.connected = true
    this.emit('connection', 'connected')
    log.info('Successfuly connected to Home Assistant')
    await this._listen()
    this.monitorConnection(this.opts.connectionMonitor)
    return 'success'
  } // end connect

  /**
   * Stop the connection monitor, detach the message listener and drop the socket.
   * Safe to call when no socket was ever created.
   * NOTE(review): the socket itself is not closed here; whether createSocket's
   * return value should be closed explicitly needs confirming.
   */
  async disconnect() {
    this._stopPing()
    if (this.socket && typeof this.socket.removeAllListeners === 'function') {
      this.socket.removeAllListeners('message') // cancels _listen
    }
    this.socket = {}
    this.connected = false
    this.ready = false
    this.emit('connection', 'disconnected')
  }

  /** Disconnect and terminate the process with exit code 0. */
  async exit() {
    await this.disconnect()
    // also do any other cleanup and saving
    log.debug('exiting per request')
    process.exit(0)
  }

  /** @returns {number} the next command id (pre-incremented, so the first id is 2) */
  nextId () {
    return ++this.cmdId
  }

  // Silencing: while an entity id is silenced its '<entity_id>' event is not emitted.
  // Useful for service calls where the response may result in unwanted state change events.

  /** @returns {boolean} true when the id is currently silenced */
  isSilent(id) {
    return this._silence.includes(id)
  }

  /** Silence an entity id (ignored when id is falsy). */
  silence(id) {
    if (id) this._silence.push(id)
  }

  /** Un-silence an entity id (ignored when id is falsy). */
  hear(id) {
    if (id) this._silence = this._silence.filter(ent => ent !== id)
  }

  /**
   * Attach the socket 'message' handler and subscribe to the event bus.
   * Never rejects because of the subscription: a failure is emitted as a
   * 'fatal' level 'error' and `ready` stays false.
   */
  async _listen() {
    this.socket.on('message', (ev) => {
      const [err, packet] = btc(JSON.parse)(ev.data)
      if (err) {
        this.emit('error', {msg:'failed json parse of event data', event:ev.data, error:err})
        return
      }
      // valid JSON may still be null or a primitive, which has nothing to dispatch
      if (packet) this._dispatch(packet)
    })
    // subscribe to complete event bus
    const [err, res] = await to(this.send('subscribe_events'))
    if (err || res.error) {
      const error = {msg:'subscription to event bus failed!', level:'fatal', error: err || res.error}
      this.emit('error', error)
      return
    }
    log.info({msg:'connection to Home Assitant ready for communication'})
    this.eventBusId = res.id
    this.ready = true
    this.emit('connection', 'ready')
    // resubscribe to any specific events that are stored on disk or in memory
  } // end listen

  /**
   * Route one parsed server packet to the matching emitter events.
   * @param {object} packet parsed message with a `type` of 'event', 'result' or 'pong'
   */
  _dispatch(packet) {
    if (packet.type === 'event') {
      const event = packet.event || {}
      const data = event.data || {}
      // only state changes of watched entities are forwarded
      if (!this._watchList.includes(data.entity_id)) return
      if (event.event_type !== 'state_changed') return
      this.emit('state_changed', data)
      if (!this.isSilent(data.entity_id)) this.emit(data.entity_id, data.new_state)
      return
    }
    if (packet.type === 'result') {
      if (!packet.success) {
        this.emit('error', {msg:'failed result', packet:packet})
        this.emit(packet.id, {error:packet.error})
      } else {
        this.emit(packet.id, {id:packet.id, result:packet.result || packet.success})
      }
      return
    }
    if (packet.type === 'pong') {
      this.emit('pong', packet) // so anyone can listen to pong
      this.emit(packet.id, 'pong') // for send command
    }
  }

  /**
   * Send a command and wait for the reply carrying the same id.
   * @param {string|object} cmd command type, or a complete packet (then `options` is ignored)
   * @param {object} [options] extra packet properties used when `cmd` is a string
   * @returns {Promise<*>} resolves with whatever is emitted under the packet id
   *   ({id, result}, {error} or 'pong'); rejects with an error object when the packet
   *   cannot be serialized, there is no usable socket, or no reply arrives in 5 seconds
   */
  send (cmd, options={}) {
    return new Promise((resolve, reject) => {
      // work on a copy so the caller's object is never modified
      const packet = isPlainObject(cmd)
        ? Object.assign({}, cmd)
        : Object.assign({}, options, { type: cmd })
      packet.id = packet.id || this.nextId()
      const [err, message] = btc(JSON.stringify)(packet)
      if (err) {
        const error = {msg:'failed to parse message', packet:packet, error:err}
        this.emit('error', error)
        reject(error)
        return // nothing valid to put on the wire
      }
      if (!this.socket || typeof this.socket.send !== 'function') {
        const error = {msg:'socket is not connected, unable to send', packet:packet}
        this.emit('error', error)
        reject(error)
        return
      }
      this.socket.send(message)
      let timer = null
      const onReply = (res) => {
        log.debug({msg:'reply packet from send', packet:packet, response:res})
        clearTimeout(timer)
        resolve(res)
      }
      timer = setTimeout(() => {
        // drop the reply listener, otherwise one listener per timed-out command leaks
        this.removeListener(packet.id, onReply)
        const error = {msg:'failed to get a response in 5 seconds', packet:packet}
        reject(error)
        this.emit('error', error)
      }, REPLY_TIMEOUT_MS)
      // once(): the listener removes itself after the single expected reply
      this.once(packet.id, onReply)
    })
  }

  /**
   * Fetch entity states with a `get_states` command.
   * @param {string|string[]} [ents='all'] 'all' or 'array' for every entity, a single
   *   entity id, or a list of entity ids (matched exactly)
   * @param {string} [type='obj'] accepted for backward compatibility; it has no effect,
   *   successful results are always arrays.
   *   NOTE(review): earlier code hinted at an object keyed by entity_id for 'obj' and a
   *   bare entity for a single id, but those paths were never reachable; the array shape
   *   is kept so existing callers do not break.
   * @returns {Promise<object[]|object>} array of matching states, or the error object
   *   (also emitted as 'error') when the states could not be retrieved
   */
  async getEntities (ents='all', type='obj') {
    const everything = ents === 'all' || ents === 'array'
    const [err, res] = await to(this.send('get_states'))
    if (err || !res || res.error || !Array.isArray(res.result)) {
      const cause = err || (res && res.error) || res
      const error = {msg:'unable to get entities', entities:ents, error:cause}
      this.emit('error', error)
      return error
    }
    if (everything) return res.result
    const wanted = Array.isArray(ents) ? ents : [ents]
    return res.result.filter(item => wanted.includes(item.entity_id))
  }

  // todo need to autobind
  /**
   * Send a `call_service` command. The entity in `data.entity_id` (if any) is
   * silenced while waiting for the reply.
   * @param {string} domain
   * @param {string} service
   * @param {object} [data] sent as `service_data`
   * @returns {Promise<object>} the reply from send(), or the error object
   *   (also emitted as 'error') when send() rejected
   */
  async callService(domain, service, data) {
    const serviceData = data || {}
    const packet = {
      type: 'call_service',
      domain: domain,
      service: service,
      service_data: serviceData
    }
    this.silence(serviceData.entity_id)
    const [err, res] = await to(this.send(packet))
    this.hear(serviceData.entity_id)
    if (err) {
      const error = {msg:'service call failed', level:'error', error:err}
      this.emit('error', error)
      return error
    }
    return res
  }

  /** Calls the `python_script.state_change` service with the entity id and state. */
  async updateEntity(entity, state) {
    return this.callService('python_script', 'state_change', {entity_id:entity, state:state})
  }

  /** Calls the `variable.set_variable` service with the variable id and value. */
  async setVariable(eid, value) {
    return this.callService('variable', 'set_variable', {variable:eid, value:value})
  }

  /**
   * Turn an entity on or off, or toggle it. The domain is the part of the id before the first '.'.
   * @param {string} eid entity id
   * @param {boolean|string} [mode='toggle'] true/'on', false/'off'; anything else toggles
   */
  async switch(eid, mode='toggle') {
    const wanted = typeof mode === 'boolean' ? (mode ? 'on' : 'off') : mode.toLowerCase()
    const domain = eid.split('.').shift()
    const service = (wanted === 'on' || wanted === 'off') ? `turn_${wanted}` : 'toggle'
    return this.callService(domain, service, {entity_id:eid})
  }

  /** Calls `<domain>.set_value` for the entity with the given value. */
  async setNumber(eid, value) {
    const domain = eid.split('.').shift()
    return this.callService(domain, 'set_value', {entity_id:eid, value:value})
  }

  /**
   * Calls `<domain>.select_option` with `option`, or when `value` is an array
   * `<domain>.set_options` with `options`.
   */
  async setSelect(eid, value) {
    const many = Array.isArray(value)
    const domain = eid.split('.').shift()
    const service = many ? 'set_options' : 'select_option'
    const key = many ? 'options' : 'option'
    return this.callService(domain, service, {entity_id:eid, [key]:value})
  }

  /** Cancel the pending ping (if any) and invalidate the running monitor loop. */
  _stopPing() {
    if (this._pingTimer) clearTimeout(this._pingTimer)
    this._pingTimer = null
    this._pingSession = null
  }

  /**
   * Start or stop the ping/pong connection monitor. Any previously running monitor
   * is stopped first. When a ping gets no reply the client disconnects and connects again.
   * @param {boolean} enabled
   */
  monitorConnection(enabled) {
    this._stopPing()
    if (!enabled) {
      console.log('WARNING: connection monitoring is disabled, system will not know if HA disconnected')
      log.debug('ping pong monitor disabled')
      return
    }
    log.debug('enabling ping pong monitor')
    // the token identifies this monitor run; a stop or restart replaces it
    const session = {}
    this._pingSession = session

    const pingOnce = async () => {
      log.debug(`sending ping id ${this.cmdId+1}, setting pong timeout`)
      const [err, res] = await to(this.send('ping'))
      // monitor was stopped or restarted while the ping was in flight
      if (this._pingSession !== session) return
      if (err) {
        const msg = 'no pong received in 5 seconds, notifiy and attempt new connection'
        log.debug({msg:msg})
        await this.disconnect() // removes socket and clears message listener
        // connect() starts a fresh monitor itself when it succeeds
        const [connErr] = await to(this.connect())
        if (connErr) this.emit('error', {msg:'reconnect attempt failed', error:connErr})
        return
      }
      if (res !== 'pong') {
        const error = {msg:'something major wrong, message was not a pong for this id', response:res}
        this.emit('error', error)
        return
      }
      log.debug('pong received, waiting 5 secs before sending next ping')
      this._pingTimer = setTimeout(run, PING_INTERVAL_MS)
    }
    // never leave the loop's promise floating
    const run = () => {
      pingOnce().catch(error => this.emit('error', {msg:'connection monitor failed', error:error}))
    }
    run()
  }
} // end class

export default HomeAssistant