emonhub/emonhub.py

343 lines
13 KiB
Python
Executable File

#!/usr/bin/env python
"""
This code is released under the GNU Affero General Public License.
OpenEnergyMonitor project:
http://openenergymonitor.org
"""
import sys
import time
import logging
import logging.handlers
import signal
import argparse
import pprint
import Queue
import emonhub_setup as ehs
import emonhub_reporter as ehr
import emonhub_interfacer as ehi
import emonhub_coder as ehc
"""class EmonHub
Monitors data inputs through EmonHubInterfacer instances, and sends data to
target servers through EmonHubEmoncmsReporter instances.
Communicates with the user through an EmonHubSetup
"""
class EmonHub(object):
__version__ = 'Pre-Release Development Version (rc1.2)'
def __init__(self, setup):
"""Setup an OpenEnergyMonitor emonHub.
Interface (EmonHubSetup): User interface to the hub.
"""
# Initialize exit request flag
self._exit = False
# Initialize setup and get settings
self._setup = setup
settings = self._setup.settings
# Initialize logging
self._log = logging.getLogger("EmonHub")
self._set_logging_level('INFO', False)
self._log.info("EmonHub %s" % self.__version__)
self._log.info("Opening hub...")
# Initialize Reporters and Interfacers
self._reporters = {}
self._interfacers = {}
self._queue = {}
self._update_settings(settings)
def run(self):
"""Launch the hub.
Monitor the COM port and process data.
Check settings on a regular basis.
"""
# Set signal handler to catch SIGINT and shutdown gracefully
signal.signal(signal.SIGINT, self._sigint_handler)
# Until asked to stop
while not self._exit:
# Run setup and update settings if modified
self._setup.run()
if self._setup.check_settings():
self._update_settings(self._setup.settings)
# For all Interfacers
for I in self._interfacers.itervalues():
# Execute run method
I.run()
# Read socket
values = I.read()
# If complete and valid data was received
if values is not None:
# Place a copy of the values in a queue for each reporter
for name in self._reporters:
# discard if reporter 'pause' set to 'all' or 'in'
if 'pause' in self._reporters[name]._settings \
and str(self._reporters[name]._settings['pause']).lower() in \
['all', 'in']:
continue
self._log.debug("first")
self._queue[name].put(values)
self._log.debug("after")
# Sleep until next iteration
time.sleep(0.2)
def close(self):
"""Close hub. Do some cleanup before leaving."""
self._log.info("Exiting hub...")
for I in self._interfacers.itervalues():
I.close()
for R in self._reporters.itervalues():
R.stop = True
R.join()
self._log.info("Exit completed")
logging.shutdown()
def _sigint_handler(self, signal, frame):
"""Catch SIGINT (Ctrl+C)."""
self._log.debug("SIGINT received.")
# hub should exit at the end of current iteration.
self._exit = True
def _update_settings(self, settings):
"""Check settings and update if needed."""
# EmonHub Logging level
if 'loglevel' in settings['hub']:
self._set_logging_level(settings['hub']['loglevel'])
else:
self._set_logging_level()
# Create a place to hold buffer contents whilst a deletion & rebuild occurs
self.temp_buffer = {}
# Reporters
for name in self._reporters.keys():
# Delete reporters if not listed or have no 'Type' in the settings without further checks
# (This also provides an ability to delete & rebuild by commenting 'Type' in conf)
if not name in settings['reporters'] or not 'Type' in settings['reporters'][name]:
pass
else:
try:
# test for 'init_settings' and 'runtime_setting' sections
settings['reporters'][name]['init_settings']
settings['reporters'][name]['runtimesettings']
except Exception as e:
# If reporter's settings are incomplete, continue without updating
self._log.error("Unable to update '" + name + "' configuration: " + str(e))
continue
else:
# check init_settings against the file copy, if they are the same move on to the next
if self._reporters[name].init_settings == settings['reporters'][name]['init_settings']:
continue
else:
if self._reporters[name].buffer._data_buffer:
self.temp_buffer[name]= self._reporters[name].buffer._data_buffer
# Delete reporters if setting changed or name is unlisted or Type is missing
self._log.info("Deleting reporter '%s'", name)
self._reporters[name].stop = True
del(self._reporters[name])
for name, R in settings['reporters'].iteritems():
# If reporter does not exist, create it
if name not in self._reporters:
try:
if not 'Type' in R:
continue
self._log.info("Creating " + R['Type'] + " '%s' ", name)
# Create the queue for this reporter
self._queue[name] = Queue.Queue(0)
# This gets the class from the 'Type' string
reporter = getattr(ehr, R['Type'])(name, self._queue[name], **R['init_settings'])
reporter.set(**R['runtimesettings'])
reporter.init_settings = R['init_settings']
# If a memory buffer back-up exists copy it over and remove the back-up
if name in self.temp_buffer:
reporter.buffer._data_buffer = self.temp_buffer[name]
del self.temp_buffer[name]
except ehr.EmonHubReporterInitError as e:
# If reporter can't be created, log error and skip to next
self._log.error("Failed to create '" + name + "' reporter: " + str(e))
continue
except Exception as e:
# If reporter can't be created, log error and skip to next
self._log.error("Unable to create '" + name + "' reporter: " + str(e))
continue
else:
self._reporters[name] = reporter
else:
# Otherwise just update the runtime settings if possible
if 'runtimesettings' in R:
self._reporters[name].set(**R['runtimesettings'])
# Interfacers
for name in self._interfacers.keys():
# Delete interfacers if not listed or have no 'Type' in the settings without further checks
# (This also provides an ability to delete & rebuild by commenting 'Type' in conf)
if not name in settings['interfacers'] or not 'Type' in settings['interfacers'][name]:
pass
else:
try:
# test for 'init_settings' and 'runtime_setting' sections
settings['interfacers'][name]['init_settings']
settings['interfacers'][name]['runtimesettings']
except Exception as e:
# If interfacer's settings are incomplete, continue without updating
self._log.error("Unable to update '" + name + "' configuration: " + str(e))
continue
else:
# check init_settings against the file copy, if they are the same move on to the next
if self._interfacers[name].init_settings == settings['interfacers'][name]['init_settings']:
continue
self._interfacers[name].close()
self._log.info("Deleting interfacer '%s' ", name)
del(self._interfacers[name])
for name, I in settings['interfacers'].iteritems():
# If interfacer does not exist, create it
if name not in self._interfacers:
try:
if not 'Type' in I:
continue
self._log.info("Creating " + I['Type'] + " '%s' ", name)
# This gets the class from the 'Type' string
interfacer = getattr(ehi, I['Type'])(name, **I['init_settings'])
interfacer.set(**I['runtimesettings'])
interfacer.init_settings = I['init_settings']
except ehi.EmonHubInterfacerInitError as e:
# If interfacer can't be created, log error and skip to next
self._log.error("Failed to create '" + name + "' interfacer: " + str(e))
continue
except Exception as e:
# If interfacer can't be created, log error and skip to next
self._log.error("Unable to create '" + name + "' interfacer: " + str(e))
continue
else:
self._interfacers[name] = interfacer
else:
# Otherwise just update the runtime settings if possible
if 'runtimesettings' in I:
self._interfacers[name].set(**I['runtimesettings'])
if 'nodes' in settings:
ehc.nodelist = settings['nodes']
def _set_logging_level(self, level='WARNING', log=True):
"""Set logging level.
level (string): log level name in
('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
"""
# Ensure "level" is all upper case
level = level.upper()
# Check level argument is valid
try:
loglevel = getattr(logging, level)
except AttributeError:
self._log.error('Logging level %s invalid' % level)
return False
except Exception as e:
self._log.error('Logging level %s ' % str(e))
return False
# Change level if different from current level
if loglevel != self._log.getEffectiveLevel():
self._log.setLevel(level)
if log:
self._log.info('Logging level set to %s' % level)
if __name__ == "__main__":
# Command line arguments parser
parser = argparse.ArgumentParser(description='OpenEnergyMonitor emonHub')
# Configuration file
parser.add_argument("--config-file", action="store",
help='Configuration file', default=sys.path[0]+'/../conf/emonhub.conf')
# Log file
parser.add_argument('--logfile', action='store', type=argparse.FileType('a'),
help='Log file (default: log to Standard error stream STDERR)')
# Show settings
parser.add_argument('--show-settings', action='store_true',
help='show settings and exit (for debugging purposes)')
# Show version
parser.add_argument('--version', action='store_true',
help='display version number and exit')
# Parse arguments
args = parser.parse_args()
# Display version number and exit
if args.version:
print('emonHub %s' % EmonHub.__version__)
sys.exit()
# Logging configuration
logger = logging.getLogger("EmonHub")
if args.logfile is None:
# If no path was specified, everything goes to sys.stderr
loghandler = logging.StreamHandler()
else:
# Otherwise, rotating logging over two 5 MB files
# If logfile is supplied, argparse opens the file in append mode,
# this ensures it is writable
# Close the file for now and get its path
args.logfile.close()
loghandler = logging.handlers.RotatingFileHandler(args.logfile.name,
'a', 5000 * 1024, 1)
# Format log strings
loghandler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s %(message)s'))
logger.addHandler(loghandler)
# Initialize hub setup
try:
setup = ehs.EmonHubFileSetup(args.config_file)
except ehs.EmonHubSetupInitError as e:
logger.critical(e)
sys.exit("Unable to load configuration file: " + args.config_file)
# If in "Show settings" mode, print settings and exit
if args.show_settings:
setup.check_settings()
pprint.pprint(setup.settings)
# Otherwise, create, run, and close EmonHub instance
else:
try:
hub = EmonHub(setup)
except Exception as e:
sys.exit("Could not start EmonHub: " + str(e))
else:
hub.run()
# When done, close hub
hub.close()