commit 605b8c2c4a308c31f8b54bc46c5a328016cd8610 Author: Simon Milvert Date: Fri Mar 31 18:07:37 2017 +0200 Init emonhub diff --git a/emonhub.py b/emonhub.py new file mode 100755 index 0000000..7e10469 --- /dev/null +++ b/emonhub.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python + +""" + + This code is released under the GNU Affero General Public License. + + OpenEnergyMonitor project: + http://openenergymonitor.org + +""" + +import sys +import time +import logging +import logging.handlers +import signal +import argparse +import pprint +import Queue + +import emonhub_setup as ehs +import emonhub_reporter as ehr +import emonhub_interfacer as ehi +import emonhub_coder as ehc + +"""class EmonHub + +Monitors data inputs through EmonHubInterfacer instances, and sends data to +target servers through EmonHubEmoncmsReporter instances. + +Communicates with the user through an EmonHubSetup + +""" + + +class EmonHub(object): + + __version__ = 'Pre-Release Development Version (rc1.2)' + + def __init__(self, setup): + """Setup an OpenEnergyMonitor emonHub. + + Interface (EmonHubSetup): User interface to the hub. + + """ + + # Initialize exit request flag + self._exit = False + + # Initialize setup and get settings + self._setup = setup + settings = self._setup.settings + + # Initialize logging + self._log = logging.getLogger("EmonHub") + self._set_logging_level('INFO', False) + self._log.info("EmonHub %s" % self.__version__) + self._log.info("Opening hub...") + + # Initialize Reporters and Interfacers + self._reporters = {} + self._interfacers = {} + self._queue = {} + self._update_settings(settings) + + def run(self): + """Launch the hub. + + Monitor the COM port and process data. + Check settings on a regular basis. + + """ + + # Set signal handler to catch SIGINT and shutdown gracefully + signal.signal(signal.SIGINT, self._sigint_handler) + + # Until asked to stop + while not self._exit: + + # Run setup and update settings if modified + self._setup.run() + if self._setup.check_settings(): + self._update_settings(self._setup.settings) + + # For all Interfacers + for I in self._interfacers.itervalues(): + # Execute run method + I.run() + # Read socket + values = I.read() + # If complete and valid data was received + if values is not None: + # Place a copy of the values in a queue for each reporter + for name in self._reporters: + # discard if reporter 'pause' set to 'all' or 'in' + if 'pause' in self._reporters[name]._settings \ + and str(self._reporters[name]._settings['pause']).lower() in \ + ['all', 'in']: + continue + self._log.debug("first") + self._queue[name].put(values) + self._log.debug("after") + + # Sleep until next iteration + time.sleep(0.2) + + def close(self): + """Close hub. Do some cleanup before leaving.""" + + self._log.info("Exiting hub...") + + for I in self._interfacers.itervalues(): + I.close() + + for R in self._reporters.itervalues(): + R.stop = True + R.join() + + self._log.info("Exit completed") + logging.shutdown() + + def _sigint_handler(self, signal, frame): + """Catch SIGINT (Ctrl+C).""" + + self._log.debug("SIGINT received.") + # hub should exit at the end of current iteration. + self._exit = True + + def _update_settings(self, settings): + """Check settings and update if needed.""" + + # EmonHub Logging level + if 'loglevel' in settings['hub']: + self._set_logging_level(settings['hub']['loglevel']) + else: + self._set_logging_level() + + # Create a place to hold buffer contents whilst a deletion & rebuild occurs + self.temp_buffer = {} + + # Reporters + for name in self._reporters.keys(): + # Delete reporters if not listed or have no 'Type' in the settings without further checks + # (This also provides an ability to delete & rebuild by commenting 'Type' in conf) + if not name in settings['reporters'] or not 'Type' in settings['reporters'][name]: + pass + else: + try: + # test for 'init_settings' and 'runtime_setting' sections + settings['reporters'][name]['init_settings'] + settings['reporters'][name]['runtimesettings'] + except Exception as e: + # If reporter's settings are incomplete, continue without updating + self._log.error("Unable to update '" + name + "' configuration: " + str(e)) + continue + else: + # check init_settings against the file copy, if they are the same move on to the next + if self._reporters[name].init_settings == settings['reporters'][name]['init_settings']: + continue + else: + if self._reporters[name].buffer._data_buffer: + self.temp_buffer[name]= self._reporters[name].buffer._data_buffer + # Delete reporters if setting changed or name is unlisted or Type is missing + self._log.info("Deleting reporter '%s'", name) + self._reporters[name].stop = True + del(self._reporters[name]) + for name, R in settings['reporters'].iteritems(): + # If reporter does not exist, create it + if name not in self._reporters: + try: + if not 'Type' in R: + continue + self._log.info("Creating " + R['Type'] + " '%s' ", name) + # Create the queue for this reporter + self._queue[name] = Queue.Queue(0) + # This gets the class from the 'Type' string + reporter = getattr(ehr, R['Type'])(name, self._queue[name], **R['init_settings']) + reporter.set(**R['runtimesettings']) + reporter.init_settings = R['init_settings'] + # If a memory buffer back-up exists copy it over and remove the back-up + if name in self.temp_buffer: + reporter.buffer._data_buffer = self.temp_buffer[name] + del self.temp_buffer[name] + except ehr.EmonHubReporterInitError as e: + # If reporter can't be created, log error and skip to next + self._log.error("Failed to create '" + name + "' reporter: " + str(e)) + continue + except Exception as e: + # If reporter can't be created, log error and skip to next + self._log.error("Unable to create '" + name + "' reporter: " + str(e)) + continue + else: + self._reporters[name] = reporter + else: + # Otherwise just update the runtime settings if possible + if 'runtimesettings' in R: + self._reporters[name].set(**R['runtimesettings']) + + # Interfacers + for name in self._interfacers.keys(): + # Delete interfacers if not listed or have no 'Type' in the settings without further checks + # (This also provides an ability to delete & rebuild by commenting 'Type' in conf) + if not name in settings['interfacers'] or not 'Type' in settings['interfacers'][name]: + pass + else: + try: + # test for 'init_settings' and 'runtime_setting' sections + settings['interfacers'][name]['init_settings'] + settings['interfacers'][name]['runtimesettings'] + except Exception as e: + # If interfacer's settings are incomplete, continue without updating + self._log.error("Unable to update '" + name + "' configuration: " + str(e)) + continue + else: + # check init_settings against the file copy, if they are the same move on to the next + if self._interfacers[name].init_settings == settings['interfacers'][name]['init_settings']: + continue + self._interfacers[name].close() + self._log.info("Deleting interfacer '%s' ", name) + del(self._interfacers[name]) + for name, I in settings['interfacers'].iteritems(): + # If interfacer does not exist, create it + if name not in self._interfacers: + try: + if not 'Type' in I: + continue + self._log.info("Creating " + I['Type'] + " '%s' ", name) + # This gets the class from the 'Type' string + interfacer = getattr(ehi, I['Type'])(name, **I['init_settings']) + interfacer.set(**I['runtimesettings']) + interfacer.init_settings = I['init_settings'] + except ehi.EmonHubInterfacerInitError as e: + # If interfacer can't be created, log error and skip to next + self._log.error("Failed to create '" + name + "' interfacer: " + str(e)) + continue + except Exception as e: + # If interfacer can't be created, log error and skip to next + self._log.error("Unable to create '" + name + "' interfacer: " + str(e)) + continue + else: + self._interfacers[name] = interfacer + else: + # Otherwise just update the runtime settings if possible + if 'runtimesettings' in I: + self._interfacers[name].set(**I['runtimesettings']) + + if 'nodes' in settings: + ehc.nodelist = settings['nodes'] + + def _set_logging_level(self, level='WARNING', log=True): + """Set logging level. + + level (string): log level name in + ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL') + + """ + + # Ensure "level" is all upper case + level = level.upper() + + # Check level argument is valid + try: + loglevel = getattr(logging, level) + except AttributeError: + self._log.error('Logging level %s invalid' % level) + return False + except Exception as e: + self._log.error('Logging level %s ' % str(e)) + return False + + # Change level if different from current level + if loglevel != self._log.getEffectiveLevel(): + self._log.setLevel(level) + if log: + self._log.info('Logging level set to %s' % level) + + +if __name__ == "__main__": + + # Command line arguments parser + parser = argparse.ArgumentParser(description='OpenEnergyMonitor emonHub') + + # Configuration file + parser.add_argument("--config-file", action="store", + help='Configuration file', default=sys.path[0]+'/../conf/emonhub.conf') + # Log file + parser.add_argument('--logfile', action='store', type=argparse.FileType('a'), + help='Log file (default: log to Standard error stream STDERR)') + # Show settings + parser.add_argument('--show-settings', action='store_true', + help='show settings and exit (for debugging purposes)') + # Show version + parser.add_argument('--version', action='store_true', + help='display version number and exit') + # Parse arguments + args = parser.parse_args() + + # Display version number and exit + if args.version: + print('emonHub %s' % EmonHub.__version__) + sys.exit() + + # Logging configuration + logger = logging.getLogger("EmonHub") + if args.logfile is None: + # If no path was specified, everything goes to sys.stderr + loghandler = logging.StreamHandler() + else: + # Otherwise, rotating logging over two 5 MB files + # If logfile is supplied, argparse opens the file in append mode, + # this ensures it is writable + # Close the file for now and get its path + args.logfile.close() + loghandler = logging.handlers.RotatingFileHandler(args.logfile.name, + 'a', 5000 * 1024, 1) + # Format log strings + loghandler.setFormatter(logging.Formatter( + '%(asctime)s %(levelname)s %(message)s')) + logger.addHandler(loghandler) + + # Initialize hub setup + try: + setup = ehs.EmonHubFileSetup(args.config_file) + except ehs.EmonHubSetupInitError as e: + logger.critical(e) + sys.exit("Unable to load configuration file: " + args.config_file) + + # If in "Show settings" mode, print settings and exit + if args.show_settings: + setup.check_settings() + pprint.pprint(setup.settings) + + # Otherwise, create, run, and close EmonHub instance + else: + try: + hub = EmonHub(setup) + except Exception as e: + sys.exit("Could not start EmonHub: " + str(e)) + else: + hub.run() + # When done, close hub + hub.close() diff --git a/emonhub_buffer.py b/emonhub_buffer.py new file mode 100755 index 0000000..43d34d4 --- /dev/null +++ b/emonhub_buffer.py @@ -0,0 +1,114 @@ +""" + + This code is released under the GNU Affero General Public License. + + OpenEnergyMonitor project: + http://openenergymonitor.org + +""" + +import logging + +"""class AbstractBuffer + +Represents the actual buffer being used. +""" + + +class AbstractBuffer(): + + def storeItem(self, data): + raise NotImplementedError + + def retrieveItems(self, number): + raise NotImplementedError + + def retrieveItem(self): + raise NotImplementedError + + def discardLastRetrievedItem(self): + raise NotImplementedError + + def discardLastRetrievedItems(self, number): + raise NotImplementedError + + def hasItems(self): + raise NotImplementedError + +""" +This implementation of the AbstractBuffer just uses an in-memory data structure. +It's basically identical to the previous (inline) buffer. +""" + + +class InMemoryBuffer(AbstractBuffer): + + def __init__(self, bufferName, buffer_size): + self._bufferName = str(bufferName) + self._buffer_type = "memory" + self._maximumEntriesInBuffer = int(buffer_size) + self._data_buffer = [] + self._log = logging.getLogger("EmonHub") + + def hasItems(self): + return self.size() > 0 + + def isFull(self): + return self.size() >= self._maximumEntriesInBuffer + + def getMaxEntrySliceIndex(self): + return max(0, + self.size() - self._maximumEntriesInBuffer - 1) + + def discardOldestItems(self): + self._data_buffer = self._data_buffer[self.getMaxEntrySliceIndex():] + + def discardOldestItemsIfFull(self): + if self.isFull(): + self._log.warning( + "In-memory buffer (%s) reached limit of %d items, deleting oldest" + % (self._bufferName, self._maximumEntriesInBuffer)) + self.discardOldestItems() + + def storeItem(self, data): + self.discardOldestItemsIfFull() + self._data_buffer.append(data) + + def retrieveItem(self): + return self._data_buffer[0] + + def retrieveItems(self, number): + blen = len(self._data_buffer) + if number > blen: + number = blen + return self._data_buffer[:number] + + def discardLastRetrievedItem(self): + del self._data_buffer[0] + + def discardLastRetrievedItems(self, number): + blen = len(self._data_buffer) + if number > blen: + number = blen + self._data_buffer = self._data_buffer[number:] + + def size(self): + return len(self._data_buffer) + + +""" +The getBuffer function returns the buffer class corresponding to a +buffering method passed as argument. +""" +bufferMethodMap = { + 'memory': InMemoryBuffer + } + + +def getBuffer(method): + """Returns the buffer class corresponding to the method + + method (string): buffering method + + """ + return bufferMethodMap[method] diff --git a/emonhub_coder.py b/emonhub_coder.py new file mode 100755 index 0000000..38aaf85 --- /dev/null +++ b/emonhub_coder.py @@ -0,0 +1,32 @@ +import struct + +# Initialize nodes data +nodelist = {} + + +def check_datacode(datacode): + + # Data types & sizes (number of bytes) + datacodes = {'b': '1', 'h': '2', 'i': '4', 'l': '4', 'q': '8', 'f': '4', 'd': '8', + 'B': '1', 'H': '2', 'I': '4', 'L': '4', 'Q': '8', 'c': '1', '?': '1'} + + # if datacode is valid return the data size in bytes + if datacode in datacodes: + return int(datacodes[datacode]) + # if not valid return False + else: + return False + + +def decode(datacode, frame): + # Ensure little-endian & standard sizes used + e = '<' + + # set the base data type to bytes + b = 'B' + + # get data size from data code + s = int(check_datacode(datacode)) + + result = struct.unpack(e + datacode[0], struct.pack(e + b*s, *frame)) + return result[0] diff --git a/emonhub_interfacer.py b/emonhub_interfacer.py new file mode 100755 index 0000000..6df6000 --- /dev/null +++ b/emonhub_interfacer.py @@ -0,0 +1,698 @@ +""" + + This code is released under the GNU Affero General Public License. + + OpenEnergyMonitor project: + http://openenergymonitor.org + +""" + +import serial +import time +import datetime +import logging +import socket +import select + +import emonhub_coder as ehc + +"""class EmonHubInterfacer + +Monitors a data source. + +This almost empty class is meant to be inherited by subclasses specific to +their data source. + +""" + + +class EmonHubInterfacer(object): + + def __init__(self, name): + + # Initialize logger + self._log = logging.getLogger("EmonHub") + + # Initialise settings + self.name = name + self.init_settings = {} + self._defaults = {'pause': 'off', 'interval': 0, 'datacode': '0', 'timestamped': 'False'} + self._settings = {} + self._packet_counter = 0 + + # This line will stop the default values printing to logfile at start-up + # unless they have been overwritten by emonhub.conf entries + # comment out if diagnosing a startup value issue + self._settings.update(self._defaults) + + # Initialize interval timer's "started at" timestamp + self._interval_timestamp = 0 + + def close(self): + """Close socket.""" + pass + + def read(self): + """Read data from socket and process if complete line received. + + Return data as a list: [NodeID, val1, val2] + + """ + pass + + def _process_frame(self, frame, timestamp=0.0): + """Process a frame of data + + f (string): 'NodeID val1 val2 ...' + + This function splits the string into numbers and check its validity. + + 'NodeID val1 val2 ...' is the generic data format. If the source uses + a different format, override this method. + + Return data as a list: [NodeID, val1, val2] + + """ + + # Discard the frame if 'pause' set to 'all' or 'in' + if 'pause' in self._settings and \ + str.lower(self._settings['pause']) in ['all', 'in']: + return + + # Add timestamp if not done already + + if not timestamp: + timestamp = round(time.time(), 2) + + # Assign a "Packet" reference number + self._packet_counter +=1 + ref = self._packet_counter + + # Log data + self._log.debug(str(ref) + " NEW FRAME : " + str(timestamp) + " " + frame) + + # Get an array out of the space separated string + frame = frame.strip().split(' ') + + # create a RSSI variable + self.rssi = False + + # Validate frame + validated = self._validate_frame(ref, frame) + if not validated: + #self._log.debug('Discard RX Frame "Failed validation"') + return + else: + frame = self._decode_frame(ref, validated) + + if frame: + self._log.debug(str(ref) + " Timestamp : " + str(timestamp)) + self._log.debug(str(ref) + " Node : " + str(frame[0])) + self._log.debug(str(ref) + " Values : " + str(frame[1:])) + frame = [timestamp] + frame + # Append RSSI only if value is not 'False' + if self.rssi: + self._log.debug(str(ref) + " RSSI : " + str(self.rssi)) + frame += [self.rssi] + frame += [ref] + else: + return + + # pause output if 'pause' set to 'all' or 'out' + if 'pause' in self._settings \ + and str(self._settings['pause']).lower() in ['all', 'out']: + return + + return frame + + def _validate_frame(self, ref, received): + """Validate a frame of data + + This function performs logical tests to filter unsuitable data. + Each test discards frame with a log entry if False + + Returns True if data frame passes tests. + + """ + + # Discard if frame not of the form [node, val1, ...] + # with number of elements at least 2 + if len(received) < 2: + self._log.warning(str(ref) + " Discarded RX frame 'string too short' : " + str(received)) + return False + + # Discard if anything non-numerical found + try: + [float(val) for val in received] + except Exception: + self._log.warning(str(ref) + " Discarded RX frame 'non-numerical content' : " + str(received)) + return False + + # Discard if first value is not a valid node id + n = float(received[0]) + if n % 1 != 0 or n < 0 or n > 31: + self._log.warning(str(ref) + " Discarded RX frame 'node id outside scope' : " + str(received)) + return False + + # If it passes all the checks return + return received + + def _decode_frame(self, ref, data): + """Decodes a frame of data + + Performs decoding of data types + + Returns decoded string of data. + + """ + + node = data[0] + data = data[1:] + decoded = [] + + # check if node is listed and has individual datacodes for each value + if node in ehc.nodelist and 'datacodes' in ehc.nodelist[node]: + # fetch the string of datacodes + datacodes = ehc.nodelist[node]['datacodes'] + # fetch a string of data sizes based on the string of datacodes + datasizes = [] + for code in datacodes: + datasizes.append(ehc.check_datacode(code)) + # Discard the frame & return 'False' if it doesn't match the summed datasizes + if len(data) != sum(datasizes): + self._log.warning(str(ref) + " RX data length: " + str(len(data)) + + " is not valid for datacodes " + str(datacodes)) + return False + else: + # Determine the expected number of values to be decoded + count = len(datacodes) + # Set decoder to "Per value" decoding using datacode 'False' as flag + datacode = False + else: + # if node is listed, but has only a single default datacode for all values + if node in ehc.nodelist and 'datacode' in ehc.nodelist[node]: + datacode = ehc.nodelist[node]['datacode'] + else: + # when node not listed or has no datacode(s) use the interfacers default if specified + datacode = self._settings['datacode'] + # Ensure only int 0 is passed not str 0 + if datacode == '0': + datacode = 0 + # when no (default)datacode(s) specified, pass string values back as numerical values + if not datacode: + for val in data: + if float(val) % 1 != 0: + val = float(val) + else: + val = int(float(val)) + decoded.append(val) + # Discard frame if total size is not an exact multiple of the specified datacode size. + elif len(data) % ehc.check_datacode(datacode) != 0: + self._log.warning(str(ref) + " RX data length: " + str(len(data)) + + " is not valid for datacode " + str(datacode)) + return False + else: + # Determine the number of values in the frame of the specified code & size + count = len(data) / ehc.check_datacode(datacode) + + # Decode the string of data one value at a time into "decoded" + if not decoded: + bytepos = int(0) + for i in range(0, count, 1): + # Use single datacode unless datacode = False then use datacodes + dc = datacode + if not datacode: + dc = datacodes[i] + # Determine the number of bytes to use for each value by it's datacode + size = int(ehc.check_datacode(dc)) + try: + value = ehc.decode(dc, [int(v) for v in data[bytepos:bytepos+size]]) + except: + self._log.warning(str(ref) + " Unable to decode as values incorrect for datacode(s)") + return False + bytepos += size + decoded.append(value) + + # Insert node ID before data + decoded.insert(0, int(node)) + return decoded + + def set(self, **kwargs): + """Set configuration parameters. + + **kwargs (dict): settings to be sent. Example: + {'setting_1': 'value_1', 'setting_2': 'value_2'} + + pause (string): pause status + 'pause' = all pause Interfacer fully, nothing read, processed or posted. + 'pause' = in pauses the input only, no input read performed + 'pause' = out pauses output only, input is read, processed but not posted to buffer + 'pause' = off pause is off and Interfacer is fully operational (default) + + """ + + for key, setting in self._defaults.iteritems(): + if key in kwargs.keys(): + setting = kwargs[key] + else: + setting = self._defaults[key] + if key in self._settings and self._settings[key] == setting: + continue + elif key == 'pause' and str(setting).lower() in ['all', 'in', 'out', 'off']: + pass + elif key == 'interval' and str(setting).isdigit(): + pass + elif key == 'datacode' and str(setting) in ['0', 'b', 'B', 'h', 'H', 'L', 'l', 'f']: + pass + elif key == 'timestamped' and str(setting).lower() in ['true', 'false']: + pass + else: + self._log.warning("'%s' is not a valid setting for %s: %s" % (str(setting), self.name, key)) + continue + self._settings[key] = setting + self._log.debug("Setting " + self.name + " " + key + ": " + str(setting)) + + def run(self): + """Placeholder for background tasks. + + Allows subclasses to specify actions that need to be done on a + regular basis. This should be called in main loop by instantiater. + + """ + pass + + def _open_serial_port(self, com_port, com_baud): + """Open serial port + + com_port (string): path to COM port + + """ + + #if not int(com_baud) in [75, 110, 300, 1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200]: + # self._log.debug("Invalid 'com_baud': " + str(com_baud) + " | Default of 9600 used") + # com_baud = 9600 + + try: + s = serial.Serial(com_port, com_baud, timeout=0) + self._log.debug("Opening serial port: " + str(com_port) + " @ "+ str(com_baud) + " bits/s") + except serial.SerialException as e: + self._log.error(e) + raise EmonHubInterfacerInitError('Could not open COM port %s' % + com_port) + else: + return s + + def _open_socket(self, port_nb): + """Open a socket + + port_nb (string): port number on which to open the socket + + """ + + self._log.debug('Opening socket on port %s', port_nb) + + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(('', int(port_nb))) + s.listen(1) + except socket.error as e: + self._log.error(e) + raise EmonHubInterfacerInitError('Could not open port %s' % + port_nb) + else: + return s + +"""class EmonhubSerialInterfacer + +Monitors the serial port for data + +""" + + +class EmonHubSerialInterfacer(EmonHubInterfacer): + + def __init__(self, name, com_port='', com_baud=9600): + """Initialize interfacer + + com_port (string): path to COM port + + """ + + # Initialization + super(EmonHubSerialInterfacer, self).__init__(name) + + # Open serial port + self._ser = self._open_serial_port(com_port, com_baud) + + # Initialize RX buffer + self._rx_buf = '' + + def close(self): + """Close serial port""" + + # Close serial port + if self._ser is not None: + self._log.debug("Closing serial port") + self._ser.close() + + def read(self): + """Read data from serial port and process if complete line received. + + Return data as a list: [NodeID, val1, val2] + + """ + + # Read serial RX + self._rx_buf = self._rx_buf + self._ser.readline() + + # If line incomplete, exit + if '\r\n' not in self._rx_buf: + return + + # Remove CR,LF + f = self._rx_buf[:-2] + + # Reset buffer + self._rx_buf = '' + + # Discard empty frames + if not f: + self._log.warning("Discarded empty frame") + return + + # unix timestamp + t = round(time.time(), 2) + + # Process data frame + return self._process_frame(f, t) + +"""class EmonHubJeeInterfacer + +Monitors the serial port for data from "Jee" type device + +""" + + +class EmonHubJeeInterfacer(EmonHubSerialInterfacer): + + def __init__(self, name, com_port='/dev/ttyAMA0', com_baud=0): + """Initialize Interfacer + + com_port (string): path to COM port + + """ + + # Initialization + if com_baud != 0: + super(EmonHubJeeInterfacer, self).__init__(name, com_port, com_baud) + else: + for com_baud in (38400, 9600): + super(EmonHubJeeInterfacer, self).__init__(name, com_port, com_baud) + self._ser.write("?") + time.sleep(2) + self._rx_buf = self._rx_buf + self._ser.readline() + if '\r\n' in self._rx_buf or '\x00' in self._rx_buf: + self._ser.flushInput() + self._rx_buf="" + break + elif self._ser is not None: + self._ser.close() + continue + + # Display device firmware version and current settings + self.info = ["",""] + if self._ser is not None: + self._ser.write("v") + time.sleep(2) + self._rx_buf = self._rx_buf + self._ser.readline() + if '\r\n' in self._rx_buf: + self._rx_buf="" + info = self._rx_buf + self._ser.readline()[:-2] + if info != "": + # Split the returned "info" string into firmware version & current settings + self.info[0] = info.strip().split(' ')[0] + self.info[1] = info.replace(str(self.info[0]), "") + self._log.info( self.name + " device firmware version: " + self.info[0]) + self._log.info( self.name + " device current settings: " + str(self.info[1])) + else: + # since "v" command only v11> recommend firmware update ? + #self._log.info( self.name + " device firmware is pre-version RFM12demo.11") + self._log.info( self.name + " device firmware version & configuration: not available") + else: + self._log.warning("Device communication error - check settings") + self._rx_buf="" + self._ser.flushInput() + + # Initialize settings + self._defaults.update({'pause': 'off', 'interval': 0, 'datacode': 'h'}) + + # This line will stop the default values printing to logfile at start-up + # unless they have been overwritten by emonhub.conf entries + # comment out if diagnosing a startup value issue + self._settings.update(self._defaults) + + # Jee specific settings to be picked up as changes not defaults to initialise "Jee" device + self._jee_settings = ({'baseid': '15', 'frequency': '433', 'group': '210', 'quiet': 'True'}) + self._jee_prefix = ({'baseid': 'i', 'frequency': '', 'group': 'g', 'quiet': 'q'}) + + # Pre-load Jee settings only if info string available for checks + if all(i in self.info[1] for i in (" i", " g", " @ ", " MHz")): + self._settings.update(self._jee_settings) + + def read(self): + """Read data from serial port and process if complete line received. + + Return data as a list: [NodeID, val1, val2] + + """ + + # Read serial RX + self._rx_buf = self._rx_buf + self._ser.readline() + + # If line incomplete, exit + if '\r\n' not in self._rx_buf: + return + + # Remove CR,LF + f = self._rx_buf[:-2] + + # Reset buffer + self._rx_buf = '' + + # Discard empty frames + if not f: + self._log.warning("Discarded empty frame") + return + + # Discard information messages + if (f[0] == '>'): + self._log.debug(self.name + " acknowledged command: " + str(f)) + return + + if (len(f)>2 and f[0:3] == ' ->'): + self._log.debug(self.name + " confirmed sent packet size: " + str(f)) + return + + if f[0] == '\x01': + #self._log.debug("Ignoring frame consisting of SOH character" + str(f)) + return + + if " i" and " g" and " @ " and " MHz" in f: + self.info[1] = f + self._log.debug( self.name + " device settings updated: " + str(self.info[1])) + return + + # unix timestamp + t = round(time.time(), 2) + + # Process data frame + return self._process_frame(f, t) + + def _validate_frame(self, ref, received): + """Validate a frame of data + + This function performs logical tests to filter unsuitable data. + Each test discards frame with a log entry if False + + Returns True if data frame passes tests. + + """ + + if received[0] == '?'and str(received[-1])[0]=='(' and str(received[-1])[-1]==')': + self._log.info(str(ref) + " Discard RX frame 'unreliable content' : RSSI " + str(received[-1])) + return False + + # Strip 'OK' from frame if needed + if received[0]=='OK': + received = received[1:] + + # extract RSSI if packet is from RFM69 type Jee Device + if str(received[-1])[0]=='(' and str(received[-1])[-1]==')': + self.rssi = int(received[-1][1:-1]) + received = received[:-1] + return received + else: + # set RSSI false for standard frames so RSSI is not re-appended later + self.rssi = False + + # include checks from parent + if not super(EmonHubJeeInterfacer, self)._validate_frame(ref, received): + return False + + return received + + def set(self, **kwargs): + """Send configuration parameters to the "Jee" type device through COM port + + **kwargs (dict): settings to be modified. Available settings are + 'baseid', 'frequency', 'group'. Example: + {'baseid': '15', 'frequency': '4', 'group': '210'} + + """ + + for key, setting in self._jee_settings.iteritems(): + # Decide which setting value to use + if key in kwargs.keys(): + setting = kwargs[key] + else: + setting = self._jee_settings[key] + # convert bools to ints + if str.capitalize(str(setting)) in ['True', 'False']: + setting = int(setting == "True") + # confirmation string always contains baseid, group anf freq + if " i" and " g" and " @ " and " MHz" in self.info[1]: + # If setting confirmed as already set, continue without changing + if (self._jee_prefix[key] + str(setting)) in self.info[1].split(): + continue + elif key in self._settings and self._settings[key] == setting: + continue + if key == 'baseid' and int(setting) >=1 and int(setting) <=26: + command = setting + 'i' + elif key == 'frequency' and setting in ['433','868','915']: + command = setting[:1] + 'b' + elif key == 'group'and int(setting) >=0 and int(setting) <=212: + command = setting + 'g' + elif key == 'quiet' and int(setting) >=0 and int(setting) <2: + command = str(setting) + 'q' + else: + self._log.warning("'%s' is not a valid setting for %s: %s" % (str(setting), self.name, key)) + continue + self._settings[key] = setting + self._log.info("Setting " + self.name + " %s: %s" % (key, setting) + " (" + command + ")") + self._ser.write(command) + # Wait a sec between two settings + time.sleep(1) + + # include kwargs from parent + super(EmonHubJeeInterfacer, self).set(**kwargs) + + def run(self): + """Actions that need to be done on a regular basis. + + This should be called in main loop by instantiater. + + """ + + now = time.time() + + # Broadcast time to synchronize emonGLCD + interval = int(self._settings['interval']) + if interval: # A value of 0 means don't do anything + if now - self._interval_timestamp > interval: + self._send_time() + self._interval_timestamp = now + + def _send_time(self): + """Send time over radio link to synchronize emonGLCD + + The radio module can be used to broadcast time, which is useful + to synchronize emonGLCD in particular. + Beware, this is know to garble the serial link on RFM2Piv1 + sendtimeinterval defines the interval in seconds between two time + broadcasts. 0 means never. + + """ + + now = datetime.datetime.now() + + self._log.debug(self.name + " broadcasting time: %02d:%02d" % (now.hour, now.minute)) + + self._ser.write("00,%02d,%02d,00,s" % (now.hour, now.minute)) + +"""class EmonHubSocketInterfacer + +Monitors a socket for data, typically from ethernet link + +""" + + +class EmonHubSocketInterfacer(EmonHubInterfacer): + + def __init__(self, name, port_nb=50011): + """Initialize Interfacer + + port_nb (string): port number on which to open the socket + + """ + + # Initialization + super(EmonHubSocketInterfacer, self).__init__(name) + + # Open socket + self._socket = self._open_socket(port_nb) + + # Initialize RX buffer for socket + self._sock_rx_buf = '' + + def close(self): + """Close socket.""" + + # Close socket + if self._socket is not None: + self._log.debug('Closing socket') + self._socket.close() + + def read(self): + """Read data from socket and process if complete line received. + + Return data as a list: [NodeID, val1, val2] + + """ + + # Check if data received + ready_to_read, ready_to_write, in_error = \ + select.select([self._socket], [], [], 0) + + # If data received, add it to socket RX buffer + if self._socket in ready_to_read: + + # Accept connection + conn, addr = self._socket.accept() + + # Read data + self._sock_rx_buf = self._sock_rx_buf + conn.recv(1024) + + # Close connection + conn.close() + + # If there is at least one complete frame in the buffer + if '\r\n' in self._sock_rx_buf: + # Process and return first frame in buffer: + f, self._sock_rx_buf = self._sock_rx_buf.split('\r\n', 1) + if str(self._settings['timestamped']).lower() == "true": + f = f.split(" ") + t = float(f[0]) + f = ' '.join(map(str, f[1:])) + return self._process_frame(f, t) + else: + return self._process_frame(f) + +"""class EmonHubInterfacerInitError + +Raise this when init fails. + +""" + + +class EmonHubInterfacerInitError(Exception): + pass diff --git a/emonhub_reporter.py b/emonhub_reporter.py new file mode 100755 index 0000000..d087aa7 --- /dev/null +++ b/emonhub_reporter.py @@ -0,0 +1,482 @@ +""" + + This code is released under the GNU Affero General Public License. + + OpenEnergyMonitor project: + http://openenergymonitor.org + +""" + +import urllib2 +import httplib +import time +import logging +import json +import threading +import Queue +import ssl +import base64 + +import emonhub_buffer as ehb + +"""class EmonHubReporter + +Stores server parameters and buffers the data between two HTTP requests + +This class is meant to be inherited by subclasses specific to their +destination server. + +""" + + +class EmonHubReporter(threading.Thread): + + def __init__(self, reporterName, queue, buffer_type="memory", buffer_size=1000, **kwargs): + """Create a server data buffer initialized with server settings.""" + + # Initialize logger + self._log = logging.getLogger("EmonHub") + + # Initialise thread + threading.Thread.__init__(self) + + # Initialise settings + self.name = reporterName + self.init_settings = {} + self._defaults = {'pause': 'off', 'interval': '0', 'batchsize': '1'} + self._settings = {} + self._queue = queue + + # This line will stop the default values printing to logfile at start-up + # unless they have been overwritten by emonhub.conf entries + # comment out if diagnosing a startup value issue + self._settings.update(self._defaults) + + # Initialize interval timer's "started at" timestamp + self._interval_timestamp = 0 + + # Create underlying buffer implementation + self.buffer = ehb.getBuffer(buffer_type)(reporterName, buffer_size, **kwargs) + + # set an absolute upper limit for number of items to process per post + # number of items posted is the lower of this item limit, buffer_size, or the + # batchsize, as set in reporter settings or by the default value. + self._item_limit = buffer_size + + self._log.info("Set up reporter '%s' (buffer: %s | size: %s)" + % (reporterName, buffer_type, buffer_size)) + + # Init ctx + self._ctx = ssl.create_default_context() + self._ctx.check_hostname = False + self._ctx.verify_mode = ssl.CERT_NONE + # Initialise a thread and start the reporter + self.stop = False + self.start() + + def set(self, **kwargs): + """Update settings. + + **kwargs (dict): runtime settings to be modified. + + url (string): eg: 'http://localhost/emoncms' or 'http://emoncms.org' (trailing slash optional) + apikey (string): API key with write access + pause (string): pause status + 'pause' = all pause fully, nothing posted to buffer or sent (buffer retained) + 'pause' = in pauses the input only, no add to buffer but flush still functional + 'pause' = out pauses output only, no flush but data can accumulate in buffer + 'pause' = off pause is off and reporter is fully operational + + """ + + for key, setting in self._defaults.iteritems(): + if key in kwargs.keys(): + setting = kwargs[key] + else: + setting = self._defaults[key] + if key in self._settings and self._settings[key] == setting: + continue + elif key == 'pause' and str(setting).lower() in ['all', 'in', 'out', 'off']: + pass + elif key in ['interval', 'batchsize'] and setting.isdigit(): + pass + else: + self._log.warning("'%s' is not a valid setting for %s: %s" % (setting, self.name, key)) + self._settings[key] = setting + self._log.debug("Setting " + self.name + " " + key + ": " + str(setting)) + + for key, setting in self._defaults.iteritems(): + valid = False + if key in kwargs.keys(): + setting = kwargs[key] + else: + setting = self._defaults[key] + if key in self._settings and self._settings[key] == setting: + continue + elif key == 'pause': + if str(setting).lower() in ['all', 'in', 'out', 'off']: + valid = True + elif key == 'interval' or 'batchsize': + if setting.isdigit(): + valid = True + else: + continue + if valid: + self._settings[key] = setting + self._log.debug("Setting " + self.name + " " + key + ": " + str(setting)) + else: + self._log.warning("'%s' is not a valid setting for %s: %s" % (setting, self.name, key)) + + + def add(self, data): + """Append data to buffer. + + data (list): node and values (eg: '[node,val1,val2,...]') + + """ + + self._log.debug(str(data[-1]) + " Append to '" + self.name + + "' buffer => time: " + str(data[0]) + + ", data: " + str(data[1:-1]) + # TODO "ref" temporarily left on end of data string for info + + ", ref: " + str(data[-1])) + # TODO "ref" removed from end of data string here so not sent to emoncms + data = data[:-1] + + # databuffer is of format: + # [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]] + # [[1399980731, 10, 150, 3450 ...]] + self.buffer.storeItem(data) + + def run(self): + """ + Run the reporter thread. + Any regularly performed tasks actioned here along with flushing the buffer + + """ + while not self.stop: + # If there are frames in the queue + while not self._queue.empty(): + # Add each frame to the buffer + frame = self._queue.get() + self.add(frame) + # Don't loop to fast + time.sleep(0.1) + # Action reporter tasks + self.action() + + def action(self): + """ + + :return: + """ + + # pause output if 'pause' set to 'all' or 'out' + if 'pause' in self._settings \ + and str(self._settings['pause']).lower() in ['all', 'out']: + return + + # If an interval is set, check if that time has passed since last post + if int(self._settings['interval']) \ + and time.time() - self._interval_timestamp < int(self._settings['interval']): + return + else: + # Then attempt to flush the buffer + self.flush() + + def flush(self): + """Send oldest data in buffer, if any.""" + + # Buffer management + # If data buffer not empty, send a set of values + if self.buffer.hasItems(): + max_items = int(self._settings['batchsize']) + if max_items > self._item_limit: + max_items = self._item_limit + elif max_items <= 0: + return + + databuffer = self.buffer.retrieveItems(max_items) + retrievedlength = len(databuffer) + if self._process_post(databuffer): + # In case of success, delete sample set from buffer + self.buffer.discardLastRetrievedItems(retrievedlength) + # log the time of last succesful post + self._interval_timestamp = time.time() + + def _process_post(self, data): + """ + To be implemented in subclass. + + :return: True if data posted successfully and can be discarded + """ + pass + + def _send_post(self, post_url, post_body=None, post_header=None): + """ + + :param post_url: + :param post_body: + :param post_header: + :return: the received reply if request is successful + """ + """Send data to server. + + data (list): node and values (eg: '[node,val1,val2,...]') + time (int): timestamp, time when sample was recorded + + return True if data sent correctly + + """ + + reply = "" + request = urllib2.Request(post_url, post_body) + if post_header is not None: + base64string = base64.encodestring('simon:bajsa').replace('\n', '') + request.add_header("Authorization", "Basic %s" % base64string) + + try: + response = urllib2.urlopen(request, timeout=60, context=self._ctx) + except urllib2.HTTPError as e: + self._log.warning(self.name + " couldn't send to server, HTTPError: " + + str(e.code)) + except urllib2.URLError as e: + self._log.warning(self.name + " couldn't send to server, URLError: " + + str(e.reason)) + except httplib.HTTPException: + self._log.warning(self.name + " couldn't send to server, HTTPException") + except Exception: + import traceback + self._log.warning(self.name + " couldn't send to server, Exception: " + + traceback.format_exc()) + else: + reply = response.read() + finally: + return reply + +"""class EmonHubDomoticzReporter + +Stores server parameters and buffers the data between two HTTP requests + +""" + + +class EmonHubDomoticzReporter(EmonHubReporter): + + def __init__(self, reporterName, queue, **kwargs): + """Initialize reporter + + """ + + # Initialization + super(EmonHubDomoticzReporter, self).__init__(reporterName, queue, **kwargs) + + # add or alter any default settings for this reporter + self._defaults.update({'batchsize': 100}) + self._cms_settings = {'url': 'http://emoncms.org'} + + # This line will stop the default values printing to logfile at start-up + self._settings.update(self._defaults) + + # set an absolute upper limit for number of items to process per post + self._item_limit = 250 + + def set(self, **kwargs): + """ + + :param kwargs: + :return: + """ + + super (EmonHubDomoticzReporter, self).set(**kwargs) + + self._log.info(self.name + " Set: ") + for key, setting in self._cms_settings.iteritems(): + #valid = False + if not key in kwargs.keys(): + setting = self._cms_settings[key] + else: + setting = kwargs[key] + + if key == 'url' and setting[:4] == "http": + self._log.info("Setting " + self.name + " url: " + setting) + self._settings[key] = setting + continue + else: + self._log.warning("'%s' is not valid for %s: %s" % (setting, self.name, key)) + + + def _process_post(self, databuffer): + """Send data to server.""" + + # databuffer is of format: + # [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]] + # [[1399980731, 10, 150, 250 ...]] + + self._log.debug(self.name + " databuffer %s" %databuffer) + status = False + post_body = None + post_header = "%s:%s" %('simon', 'bajsa') + + + for data in databuffer: + data = data[2:] + #valid = False + #[ext_temp, temp, hum, batteri, pulse, rssi] + self._log.debug(self.name + "data %s" %data) + ext_temp = data[0] * 0.1 + temp = data[1] * 0.1 + hum = data[2] * 0.1 + batteri = data[3] + pulse = data[4] + rssi = abs(data[5]) + + domoAddParam = '&battery=%s' %(batteri) + if temp < 84: + domoTempHum = '/json.htm?type=command¶m=udevice&idx=56&nvalue=0&svalue=%s;%s;%s' %(temp,hum,0) + post_url_tempHum = self._settings['url'] + domoTempHum + domoAddParam + self._log.info(self.name + " sending tempHum: " + post_url_tempHum) + reply = self._send_post(post_url_tempHum, post_body, post_header) + if reply == 'ok': + self._log.debug(self.name + " acknowledged receipt with '" + reply + "' from " + self._settings['url']) + + if ext_temp < 84: + domoTemp = '/json.htm?type=command¶m=udevice&idx=55&nvalue=0&svalue=%s' %(ext_temp) + post_url_temp = self._settings['url'] + domoTemp + domoAddParam + self._log.info(self.name + " sending: " + post_url_temp) + reply = self._send_post(post_url_temp, post_body, post_header) + + domoPulse = '/json.htm?type=command¶m=udevice&idx=54&svalue=%s' %(pulse) + post_url_pulse = self._settings['url'] + domoPulse + domoAddParam + self._log.info(self.name + " sending: " + post_url_pulse) + + + # The Develop branch of emoncms allows for the sending of the apikey in the post + # body, this should be moved from the url to the body as soon as this is widely + # adopted + + reply = self._send_post(post_url_pulse, post_body, post_header) + + status = True + + self._log.debug(self.name + " Next return" ) + if status: + return True + self._log.debug(self.name + " After return" ) + +"""class EmonHubEmoncmsReporter + +Stores server parameters and buffers the data between two HTTP requests + +""" + + +class EmonHubEmoncmsReporter(EmonHubReporter): + + def __init__(self, reporterName, queue, **kwargs): + """Initialize reporter + + """ + + # Initialization + super(EmonHubEmoncmsReporter, self).__init__(reporterName, queue, **kwargs) + + # add or alter any default settings for this reporter + self._defaults.update({'batchsize': 100}) + self._cms_settings = {'apikey': "", 'url': 'http://emoncms.org'} + + # This line will stop the default values printing to logfile at start-up + self._settings.update(self._defaults) + + # set an absolute upper limit for number of items to process per post + self._item_limit = 250 + + def set(self, **kwargs): + """ + + :param kwargs: + :return: + """ + + super (EmonHubEmoncmsReporter, self).set(**kwargs) + + for key, setting in self._cms_settings.iteritems(): + #valid = False + if not key in kwargs.keys(): + setting = self._cms_settings[key] + else: + setting = kwargs[key] + if key in self._settings and self._settings[key] == setting: + continue + elif key == 'apikey': + if str.lower(setting[:4]) == 'xxxx': + self._log.warning("Setting " + self.name + " apikey: obscured") + pass + elif str.__len__(setting) == 32 : + self._log.info("Setting " + self.name + " apikey: set") + pass + elif setting == "": + self._log.info("Setting " + self.name + " apikey: null") + pass + else: + self._log.warning("Setting " + self.name + " apikey: invalid format") + continue + self._settings[key] = setting + # Next line will log apikey if uncommented (privacy ?) + #self._log.debug(self.name + " apikey: " + str(setting)) + continue + elif key == 'url' and setting[:4] == "http": + self._log.info("Setting " + self.name + " url: " + setting) + self._settings[key] = setting + continue + else: + self._log.warning("'%s' is not valid for %s: %s" % (setting, self.name, key)) + + def _process_post(self, databuffer): + """Send data to server.""" + + # databuffer is of format: + # [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]] + # [[1399980731, 10, 150, 250 ...]] + + if not 'apikey' in self._settings.keys() or str.__len__(self._settings['apikey']) != 32 \ + or str.lower(self._settings['apikey']) == 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx': + return + + data_string = json.dumps(databuffer, separators=(',', ':')) + + # Prepare URL string of the form + # http://domain.tld/emoncms/input/bulk.json?apikey=12345 + # &data=[[0,10,82,23],[5,10,82,23],[10,10,82,23]] + # &time=0 + + # Construct post_url (without apikey) + post_url = self._settings['url']+'/input/bulk'+'.json?time=0&apikey=' + post_body = "data="+data_string + + # logged before apikey added for security + self._log.info(self.name + " sending: " + post_url + "E-M-O-N-C-M-S-A-P-I-K-E-Y&" + post_body) + + # Add apikey to post_url + post_url = post_url + self._settings['apikey'] + + # The Develop branch of emoncms allows for the sending of the apikey in the post + # body, this should be moved from the url to the body as soon as this is widely + # adopted + + reply = self._send_post(post_url, post_body) + if reply == 'ok': + self._log.debug(self.name + " acknowledged receipt with '" + reply + "' from " + self._settings['url']) + return True + else: + self._log.warning(self.name + " send failure: wanted 'ok' but got '" +reply+ "'") + +"""class EmonHubReporterInitError + +Raise this when init fails. + +""" + + +class EmonHubReporterInitError(Exception): + pass diff --git a/emonhub_setup.py b/emonhub_setup.py new file mode 100755 index 0000000..f8d9d9c --- /dev/null +++ b/emonhub_setup.py @@ -0,0 +1,163 @@ +""" + + This code is released under the GNU Affero General Public License. + + OpenEnergyMonitor project: + http://openenergymonitor.org + +""" + +import time +import logging +from configobj import ConfigObj + +"""class EmonHubSetup + +User interface to setup the hub. + +The settings attribute stores the settings of the hub. It is a +dictionary with the following keys: + + 'hub': a dictionary containing the hub settings + 'interfacers': a dictionary containing the interfacers + 'reporters': a dictionary containing the reporters + + The hub settings are: + 'loglevel': the logging level + + interfacers and reporters are dictionaries with the following keys: + 'Type': class name + 'init_settings': dictionary with initialization settings + 'runtimesettings': dictionary with runtime settings + Initialization and runtime settings depend on the interfacer and + reporter type. + +The run() method is supposed to be run regularly by the instantiater, to +perform regular communication tasks. + +The check_settings() method is run regularly as well. It checks the settings +and returns True is settings were changed. + +This almost empty class is meant to be inherited by subclasses specific to +each setup. + +""" + + +class EmonHubSetup(object): + + def __init__(self): + + # Initialize logger + self._log = logging.getLogger("EmonHub") + + # Initialize settings + self.settings = None + + def run(self): + """Run in background. + + To be implemented in child class. + + """ + pass + + def check_settings(self): + """Check settings + + Update attribute settings and return True if modified. + + To be implemented in child class. + + """ + + +class EmonHubFileSetup(EmonHubSetup): + + def __init__(self, filename): + + # Initialization + super(EmonHubFileSetup, self).__init__() + + # Initialize update timestamp + self._settings_update_timestamp = 0 + self._retry_time_interval = 5 + + # create a timeout message if time out is set (>0) + if self._retry_time_interval > 0: + self.retry_msg = " Retry in " + str(self._retry_time_interval) + " seconds" + else: + self.retry_msg = "" + + # Initialize attribute settings as a ConfigObj instance + try: + self.settings = ConfigObj(filename, file_error=True) + # Check the settings file sections + self.settings['hub'] + self.settings['interfacers'] + self.settings['reporters'] + except IOError as e: + raise EmonHubSetupInitError(e) + except SyntaxError as e: + raise EmonHubSetupInitError( + 'Error parsing config file \"%s\": ' % filename + str(e)) + except KeyError as e: + raise EmonHubSetupInitError( + 'Configuration file error - section: ' + str(e)) + + def check_settings(self): + """Check settings + + Update attribute settings and return True if modified. + + """ + + # Check settings only once per second + now = time.time() + if now - self._settings_update_timestamp < 0: + return + # Update timestamp + self._settings_update_timestamp = now + + # Backup settings + settings = dict(self.settings) + + # Get settings from file + try: + self.settings.reload() + except IOError as e: + self._log.warning('Could not get settings: ' + str(e) + self.retry_msg) + self._settings_update_timestamp = now + self._retry_time_interval + return + except SyntaxError as e: + self._log.warning('Could not get settings: ' + + 'Error parsing config file: ' + str(e) + self.retry_msg) + self._settings_update_timestamp = now + self._retry_time_interval + return + except Exception: + import traceback + self._log.warning("Couldn't get settings, Exception: " + + traceback.format_exc() + self.retry_msg) + self._settings_update_timestamp = now + self._retry_time_interval + return + + if self.settings != settings: + # Check the settings file sections + try: + self.settings['hub'] + self.settings['interfacers'] + self.settings['reporters'] + except KeyError as e: + self._log.warning("Configuration file missing section: " + str(e)) + else: + return True + +"""class EmonHubSetupInitError + +Raise this when init fails. + +""" + + +class EmonHubSetupInitError(Exception): + pass