Init emonhub
This commit is contained in:
commit
605b8c2c4a
|
|
@ -0,0 +1,342 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
|
||||
This code is released under the GNU Affero General Public License.
|
||||
|
||||
OpenEnergyMonitor project:
|
||||
http://openenergymonitor.org
|
||||
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
import logging.handlers
|
||||
import signal
|
||||
import argparse
|
||||
import pprint
|
||||
import Queue
|
||||
|
||||
import emonhub_setup as ehs
|
||||
import emonhub_reporter as ehr
|
||||
import emonhub_interfacer as ehi
|
||||
import emonhub_coder as ehc
|
||||
|
||||
"""class EmonHub
|
||||
|
||||
Monitors data inputs through EmonHubInterfacer instances, and sends data to
|
||||
target servers through EmonHubEmoncmsReporter instances.
|
||||
|
||||
Communicates with the user through an EmonHubSetup
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHub(object):
|
||||
|
||||
__version__ = 'Pre-Release Development Version (rc1.2)'
|
||||
|
||||
def __init__(self, setup):
|
||||
"""Setup an OpenEnergyMonitor emonHub.
|
||||
|
||||
Interface (EmonHubSetup): User interface to the hub.
|
||||
|
||||
"""
|
||||
|
||||
# Initialize exit request flag
|
||||
self._exit = False
|
||||
|
||||
# Initialize setup and get settings
|
||||
self._setup = setup
|
||||
settings = self._setup.settings
|
||||
|
||||
# Initialize logging
|
||||
self._log = logging.getLogger("EmonHub")
|
||||
self._set_logging_level('INFO', False)
|
||||
self._log.info("EmonHub %s" % self.__version__)
|
||||
self._log.info("Opening hub...")
|
||||
|
||||
# Initialize Reporters and Interfacers
|
||||
self._reporters = {}
|
||||
self._interfacers = {}
|
||||
self._queue = {}
|
||||
self._update_settings(settings)
|
||||
|
||||
def run(self):
|
||||
"""Launch the hub.
|
||||
|
||||
Monitor the COM port and process data.
|
||||
Check settings on a regular basis.
|
||||
|
||||
"""
|
||||
|
||||
# Set signal handler to catch SIGINT and shutdown gracefully
|
||||
signal.signal(signal.SIGINT, self._sigint_handler)
|
||||
|
||||
# Until asked to stop
|
||||
while not self._exit:
|
||||
|
||||
# Run setup and update settings if modified
|
||||
self._setup.run()
|
||||
if self._setup.check_settings():
|
||||
self._update_settings(self._setup.settings)
|
||||
|
||||
# For all Interfacers
|
||||
for I in self._interfacers.itervalues():
|
||||
# Execute run method
|
||||
I.run()
|
||||
# Read socket
|
||||
values = I.read()
|
||||
# If complete and valid data was received
|
||||
if values is not None:
|
||||
# Place a copy of the values in a queue for each reporter
|
||||
for name in self._reporters:
|
||||
# discard if reporter 'pause' set to 'all' or 'in'
|
||||
if 'pause' in self._reporters[name]._settings \
|
||||
and str(self._reporters[name]._settings['pause']).lower() in \
|
||||
['all', 'in']:
|
||||
continue
|
||||
self._log.debug("first")
|
||||
self._queue[name].put(values)
|
||||
self._log.debug("after")
|
||||
|
||||
# Sleep until next iteration
|
||||
time.sleep(0.2)
|
||||
|
||||
def close(self):
|
||||
"""Close hub. Do some cleanup before leaving."""
|
||||
|
||||
self._log.info("Exiting hub...")
|
||||
|
||||
for I in self._interfacers.itervalues():
|
||||
I.close()
|
||||
|
||||
for R in self._reporters.itervalues():
|
||||
R.stop = True
|
||||
R.join()
|
||||
|
||||
self._log.info("Exit completed")
|
||||
logging.shutdown()
|
||||
|
||||
def _sigint_handler(self, signal, frame):
|
||||
"""Catch SIGINT (Ctrl+C)."""
|
||||
|
||||
self._log.debug("SIGINT received.")
|
||||
# hub should exit at the end of current iteration.
|
||||
self._exit = True
|
||||
|
||||
def _update_settings(self, settings):
|
||||
"""Check settings and update if needed."""
|
||||
|
||||
# EmonHub Logging level
|
||||
if 'loglevel' in settings['hub']:
|
||||
self._set_logging_level(settings['hub']['loglevel'])
|
||||
else:
|
||||
self._set_logging_level()
|
||||
|
||||
# Create a place to hold buffer contents whilst a deletion & rebuild occurs
|
||||
self.temp_buffer = {}
|
||||
|
||||
# Reporters
|
||||
for name in self._reporters.keys():
|
||||
# Delete reporters if not listed or have no 'Type' in the settings without further checks
|
||||
# (This also provides an ability to delete & rebuild by commenting 'Type' in conf)
|
||||
if not name in settings['reporters'] or not 'Type' in settings['reporters'][name]:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
# test for 'init_settings' and 'runtime_setting' sections
|
||||
settings['reporters'][name]['init_settings']
|
||||
settings['reporters'][name]['runtimesettings']
|
||||
except Exception as e:
|
||||
# If reporter's settings are incomplete, continue without updating
|
||||
self._log.error("Unable to update '" + name + "' configuration: " + str(e))
|
||||
continue
|
||||
else:
|
||||
# check init_settings against the file copy, if they are the same move on to the next
|
||||
if self._reporters[name].init_settings == settings['reporters'][name]['init_settings']:
|
||||
continue
|
||||
else:
|
||||
if self._reporters[name].buffer._data_buffer:
|
||||
self.temp_buffer[name]= self._reporters[name].buffer._data_buffer
|
||||
# Delete reporters if setting changed or name is unlisted or Type is missing
|
||||
self._log.info("Deleting reporter '%s'", name)
|
||||
self._reporters[name].stop = True
|
||||
del(self._reporters[name])
|
||||
for name, R in settings['reporters'].iteritems():
|
||||
# If reporter does not exist, create it
|
||||
if name not in self._reporters:
|
||||
try:
|
||||
if not 'Type' in R:
|
||||
continue
|
||||
self._log.info("Creating " + R['Type'] + " '%s' ", name)
|
||||
# Create the queue for this reporter
|
||||
self._queue[name] = Queue.Queue(0)
|
||||
# This gets the class from the 'Type' string
|
||||
reporter = getattr(ehr, R['Type'])(name, self._queue[name], **R['init_settings'])
|
||||
reporter.set(**R['runtimesettings'])
|
||||
reporter.init_settings = R['init_settings']
|
||||
# If a memory buffer back-up exists copy it over and remove the back-up
|
||||
if name in self.temp_buffer:
|
||||
reporter.buffer._data_buffer = self.temp_buffer[name]
|
||||
del self.temp_buffer[name]
|
||||
except ehr.EmonHubReporterInitError as e:
|
||||
# If reporter can't be created, log error and skip to next
|
||||
self._log.error("Failed to create '" + name + "' reporter: " + str(e))
|
||||
continue
|
||||
except Exception as e:
|
||||
# If reporter can't be created, log error and skip to next
|
||||
self._log.error("Unable to create '" + name + "' reporter: " + str(e))
|
||||
continue
|
||||
else:
|
||||
self._reporters[name] = reporter
|
||||
else:
|
||||
# Otherwise just update the runtime settings if possible
|
||||
if 'runtimesettings' in R:
|
||||
self._reporters[name].set(**R['runtimesettings'])
|
||||
|
||||
# Interfacers
|
||||
for name in self._interfacers.keys():
|
||||
# Delete interfacers if not listed or have no 'Type' in the settings without further checks
|
||||
# (This also provides an ability to delete & rebuild by commenting 'Type' in conf)
|
||||
if not name in settings['interfacers'] or not 'Type' in settings['interfacers'][name]:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
# test for 'init_settings' and 'runtime_setting' sections
|
||||
settings['interfacers'][name]['init_settings']
|
||||
settings['interfacers'][name]['runtimesettings']
|
||||
except Exception as e:
|
||||
# If interfacer's settings are incomplete, continue without updating
|
||||
self._log.error("Unable to update '" + name + "' configuration: " + str(e))
|
||||
continue
|
||||
else:
|
||||
# check init_settings against the file copy, if they are the same move on to the next
|
||||
if self._interfacers[name].init_settings == settings['interfacers'][name]['init_settings']:
|
||||
continue
|
||||
self._interfacers[name].close()
|
||||
self._log.info("Deleting interfacer '%s' ", name)
|
||||
del(self._interfacers[name])
|
||||
for name, I in settings['interfacers'].iteritems():
|
||||
# If interfacer does not exist, create it
|
||||
if name not in self._interfacers:
|
||||
try:
|
||||
if not 'Type' in I:
|
||||
continue
|
||||
self._log.info("Creating " + I['Type'] + " '%s' ", name)
|
||||
# This gets the class from the 'Type' string
|
||||
interfacer = getattr(ehi, I['Type'])(name, **I['init_settings'])
|
||||
interfacer.set(**I['runtimesettings'])
|
||||
interfacer.init_settings = I['init_settings']
|
||||
except ehi.EmonHubInterfacerInitError as e:
|
||||
# If interfacer can't be created, log error and skip to next
|
||||
self._log.error("Failed to create '" + name + "' interfacer: " + str(e))
|
||||
continue
|
||||
except Exception as e:
|
||||
# If interfacer can't be created, log error and skip to next
|
||||
self._log.error("Unable to create '" + name + "' interfacer: " + str(e))
|
||||
continue
|
||||
else:
|
||||
self._interfacers[name] = interfacer
|
||||
else:
|
||||
# Otherwise just update the runtime settings if possible
|
||||
if 'runtimesettings' in I:
|
||||
self._interfacers[name].set(**I['runtimesettings'])
|
||||
|
||||
if 'nodes' in settings:
|
||||
ehc.nodelist = settings['nodes']
|
||||
|
||||
def _set_logging_level(self, level='WARNING', log=True):
|
||||
"""Set logging level.
|
||||
|
||||
level (string): log level name in
|
||||
('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
|
||||
|
||||
"""
|
||||
|
||||
# Ensure "level" is all upper case
|
||||
level = level.upper()
|
||||
|
||||
# Check level argument is valid
|
||||
try:
|
||||
loglevel = getattr(logging, level)
|
||||
except AttributeError:
|
||||
self._log.error('Logging level %s invalid' % level)
|
||||
return False
|
||||
except Exception as e:
|
||||
self._log.error('Logging level %s ' % str(e))
|
||||
return False
|
||||
|
||||
# Change level if different from current level
|
||||
if loglevel != self._log.getEffectiveLevel():
|
||||
self._log.setLevel(level)
|
||||
if log:
|
||||
self._log.info('Logging level set to %s' % level)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
# Command line arguments parser
|
||||
parser = argparse.ArgumentParser(description='OpenEnergyMonitor emonHub')
|
||||
|
||||
# Configuration file
|
||||
parser.add_argument("--config-file", action="store",
|
||||
help='Configuration file', default=sys.path[0]+'/../conf/emonhub.conf')
|
||||
# Log file
|
||||
parser.add_argument('--logfile', action='store', type=argparse.FileType('a'),
|
||||
help='Log file (default: log to Standard error stream STDERR)')
|
||||
# Show settings
|
||||
parser.add_argument('--show-settings', action='store_true',
|
||||
help='show settings and exit (for debugging purposes)')
|
||||
# Show version
|
||||
parser.add_argument('--version', action='store_true',
|
||||
help='display version number and exit')
|
||||
# Parse arguments
|
||||
args = parser.parse_args()
|
||||
|
||||
# Display version number and exit
|
||||
if args.version:
|
||||
print('emonHub %s' % EmonHub.__version__)
|
||||
sys.exit()
|
||||
|
||||
# Logging configuration
|
||||
logger = logging.getLogger("EmonHub")
|
||||
if args.logfile is None:
|
||||
# If no path was specified, everything goes to sys.stderr
|
||||
loghandler = logging.StreamHandler()
|
||||
else:
|
||||
# Otherwise, rotating logging over two 5 MB files
|
||||
# If logfile is supplied, argparse opens the file in append mode,
|
||||
# this ensures it is writable
|
||||
# Close the file for now and get its path
|
||||
args.logfile.close()
|
||||
loghandler = logging.handlers.RotatingFileHandler(args.logfile.name,
|
||||
'a', 5000 * 1024, 1)
|
||||
# Format log strings
|
||||
loghandler.setFormatter(logging.Formatter(
|
||||
'%(asctime)s %(levelname)s %(message)s'))
|
||||
logger.addHandler(loghandler)
|
||||
|
||||
# Initialize hub setup
|
||||
try:
|
||||
setup = ehs.EmonHubFileSetup(args.config_file)
|
||||
except ehs.EmonHubSetupInitError as e:
|
||||
logger.critical(e)
|
||||
sys.exit("Unable to load configuration file: " + args.config_file)
|
||||
|
||||
# If in "Show settings" mode, print settings and exit
|
||||
if args.show_settings:
|
||||
setup.check_settings()
|
||||
pprint.pprint(setup.settings)
|
||||
|
||||
# Otherwise, create, run, and close EmonHub instance
|
||||
else:
|
||||
try:
|
||||
hub = EmonHub(setup)
|
||||
except Exception as e:
|
||||
sys.exit("Could not start EmonHub: " + str(e))
|
||||
else:
|
||||
hub.run()
|
||||
# When done, close hub
|
||||
hub.close()
|
||||
|
|
@ -0,0 +1,114 @@
|
|||
"""
|
||||
|
||||
This code is released under the GNU Affero General Public License.
|
||||
|
||||
OpenEnergyMonitor project:
|
||||
http://openenergymonitor.org
|
||||
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
"""class AbstractBuffer
|
||||
|
||||
Represents the actual buffer being used.
|
||||
"""
|
||||
|
||||
|
||||
class AbstractBuffer():
|
||||
|
||||
def storeItem(self, data):
|
||||
raise NotImplementedError
|
||||
|
||||
def retrieveItems(self, number):
|
||||
raise NotImplementedError
|
||||
|
||||
def retrieveItem(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def discardLastRetrievedItem(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def discardLastRetrievedItems(self, number):
|
||||
raise NotImplementedError
|
||||
|
||||
def hasItems(self):
|
||||
raise NotImplementedError
|
||||
|
||||
"""
|
||||
This implementation of the AbstractBuffer just uses an in-memory data structure.
|
||||
It's basically identical to the previous (inline) buffer.
|
||||
"""
|
||||
|
||||
|
||||
class InMemoryBuffer(AbstractBuffer):
|
||||
|
||||
def __init__(self, bufferName, buffer_size):
|
||||
self._bufferName = str(bufferName)
|
||||
self._buffer_type = "memory"
|
||||
self._maximumEntriesInBuffer = int(buffer_size)
|
||||
self._data_buffer = []
|
||||
self._log = logging.getLogger("EmonHub")
|
||||
|
||||
def hasItems(self):
|
||||
return self.size() > 0
|
||||
|
||||
def isFull(self):
|
||||
return self.size() >= self._maximumEntriesInBuffer
|
||||
|
||||
def getMaxEntrySliceIndex(self):
|
||||
return max(0,
|
||||
self.size() - self._maximumEntriesInBuffer - 1)
|
||||
|
||||
def discardOldestItems(self):
|
||||
self._data_buffer = self._data_buffer[self.getMaxEntrySliceIndex():]
|
||||
|
||||
def discardOldestItemsIfFull(self):
|
||||
if self.isFull():
|
||||
self._log.warning(
|
||||
"In-memory buffer (%s) reached limit of %d items, deleting oldest"
|
||||
% (self._bufferName, self._maximumEntriesInBuffer))
|
||||
self.discardOldestItems()
|
||||
|
||||
def storeItem(self, data):
|
||||
self.discardOldestItemsIfFull()
|
||||
self._data_buffer.append(data)
|
||||
|
||||
def retrieveItem(self):
|
||||
return self._data_buffer[0]
|
||||
|
||||
def retrieveItems(self, number):
|
||||
blen = len(self._data_buffer)
|
||||
if number > blen:
|
||||
number = blen
|
||||
return self._data_buffer[:number]
|
||||
|
||||
def discardLastRetrievedItem(self):
|
||||
del self._data_buffer[0]
|
||||
|
||||
def discardLastRetrievedItems(self, number):
|
||||
blen = len(self._data_buffer)
|
||||
if number > blen:
|
||||
number = blen
|
||||
self._data_buffer = self._data_buffer[number:]
|
||||
|
||||
def size(self):
|
||||
return len(self._data_buffer)
|
||||
|
||||
|
||||
"""
|
||||
The getBuffer function returns the buffer class corresponding to a
|
||||
buffering method passed as argument.
|
||||
"""
|
||||
bufferMethodMap = {
|
||||
'memory': InMemoryBuffer
|
||||
}
|
||||
|
||||
|
||||
def getBuffer(method):
|
||||
"""Returns the buffer class corresponding to the method
|
||||
|
||||
method (string): buffering method
|
||||
|
||||
"""
|
||||
return bufferMethodMap[method]
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
import struct
|
||||
|
||||
# Initialize nodes data
|
||||
nodelist = {}
|
||||
|
||||
|
||||
def check_datacode(datacode):
|
||||
|
||||
# Data types & sizes (number of bytes)
|
||||
datacodes = {'b': '1', 'h': '2', 'i': '4', 'l': '4', 'q': '8', 'f': '4', 'd': '8',
|
||||
'B': '1', 'H': '2', 'I': '4', 'L': '4', 'Q': '8', 'c': '1', '?': '1'}
|
||||
|
||||
# if datacode is valid return the data size in bytes
|
||||
if datacode in datacodes:
|
||||
return int(datacodes[datacode])
|
||||
# if not valid return False
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def decode(datacode, frame):
|
||||
# Ensure little-endian & standard sizes used
|
||||
e = '<'
|
||||
|
||||
# set the base data type to bytes
|
||||
b = 'B'
|
||||
|
||||
# get data size from data code
|
||||
s = int(check_datacode(datacode))
|
||||
|
||||
result = struct.unpack(e + datacode[0], struct.pack(e + b*s, *frame))
|
||||
return result[0]
|
||||
|
|
@ -0,0 +1,698 @@
|
|||
"""
|
||||
|
||||
This code is released under the GNU Affero General Public License.
|
||||
|
||||
OpenEnergyMonitor project:
|
||||
http://openenergymonitor.org
|
||||
|
||||
"""
|
||||
|
||||
import serial
|
||||
import time
|
||||
import datetime
|
||||
import logging
|
||||
import socket
|
||||
import select
|
||||
|
||||
import emonhub_coder as ehc
|
||||
|
||||
"""class EmonHubInterfacer
|
||||
|
||||
Monitors a data source.
|
||||
|
||||
This almost empty class is meant to be inherited by subclasses specific to
|
||||
their data source.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubInterfacer(object):
|
||||
|
||||
def __init__(self, name):
|
||||
|
||||
# Initialize logger
|
||||
self._log = logging.getLogger("EmonHub")
|
||||
|
||||
# Initialise settings
|
||||
self.name = name
|
||||
self.init_settings = {}
|
||||
self._defaults = {'pause': 'off', 'interval': 0, 'datacode': '0', 'timestamped': 'False'}
|
||||
self._settings = {}
|
||||
self._packet_counter = 0
|
||||
|
||||
# This line will stop the default values printing to logfile at start-up
|
||||
# unless they have been overwritten by emonhub.conf entries
|
||||
# comment out if diagnosing a startup value issue
|
||||
self._settings.update(self._defaults)
|
||||
|
||||
# Initialize interval timer's "started at" timestamp
|
||||
self._interval_timestamp = 0
|
||||
|
||||
def close(self):
|
||||
"""Close socket."""
|
||||
pass
|
||||
|
||||
def read(self):
|
||||
"""Read data from socket and process if complete line received.
|
||||
|
||||
Return data as a list: [NodeID, val1, val2]
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
def _process_frame(self, frame, timestamp=0.0):
|
||||
"""Process a frame of data
|
||||
|
||||
f (string): 'NodeID val1 val2 ...'
|
||||
|
||||
This function splits the string into numbers and check its validity.
|
||||
|
||||
'NodeID val1 val2 ...' is the generic data format. If the source uses
|
||||
a different format, override this method.
|
||||
|
||||
Return data as a list: [NodeID, val1, val2]
|
||||
|
||||
"""
|
||||
|
||||
# Discard the frame if 'pause' set to 'all' or 'in'
|
||||
if 'pause' in self._settings and \
|
||||
str.lower(self._settings['pause']) in ['all', 'in']:
|
||||
return
|
||||
|
||||
# Add timestamp if not done already
|
||||
|
||||
if not timestamp:
|
||||
timestamp = round(time.time(), 2)
|
||||
|
||||
# Assign a "Packet" reference number
|
||||
self._packet_counter +=1
|
||||
ref = self._packet_counter
|
||||
|
||||
# Log data
|
||||
self._log.debug(str(ref) + " NEW FRAME : " + str(timestamp) + " " + frame)
|
||||
|
||||
# Get an array out of the space separated string
|
||||
frame = frame.strip().split(' ')
|
||||
|
||||
# create a RSSI variable
|
||||
self.rssi = False
|
||||
|
||||
# Validate frame
|
||||
validated = self._validate_frame(ref, frame)
|
||||
if not validated:
|
||||
#self._log.debug('Discard RX Frame "Failed validation"')
|
||||
return
|
||||
else:
|
||||
frame = self._decode_frame(ref, validated)
|
||||
|
||||
if frame:
|
||||
self._log.debug(str(ref) + " Timestamp : " + str(timestamp))
|
||||
self._log.debug(str(ref) + " Node : " + str(frame[0]))
|
||||
self._log.debug(str(ref) + " Values : " + str(frame[1:]))
|
||||
frame = [timestamp] + frame
|
||||
# Append RSSI only if value is not 'False'
|
||||
if self.rssi:
|
||||
self._log.debug(str(ref) + " RSSI : " + str(self.rssi))
|
||||
frame += [self.rssi]
|
||||
frame += [ref]
|
||||
else:
|
||||
return
|
||||
|
||||
# pause output if 'pause' set to 'all' or 'out'
|
||||
if 'pause' in self._settings \
|
||||
and str(self._settings['pause']).lower() in ['all', 'out']:
|
||||
return
|
||||
|
||||
return frame
|
||||
|
||||
def _validate_frame(self, ref, received):
|
||||
"""Validate a frame of data
|
||||
|
||||
This function performs logical tests to filter unsuitable data.
|
||||
Each test discards frame with a log entry if False
|
||||
|
||||
Returns True if data frame passes tests.
|
||||
|
||||
"""
|
||||
|
||||
# Discard if frame not of the form [node, val1, ...]
|
||||
# with number of elements at least 2
|
||||
if len(received) < 2:
|
||||
self._log.warning(str(ref) + " Discarded RX frame 'string too short' : " + str(received))
|
||||
return False
|
||||
|
||||
# Discard if anything non-numerical found
|
||||
try:
|
||||
[float(val) for val in received]
|
||||
except Exception:
|
||||
self._log.warning(str(ref) + " Discarded RX frame 'non-numerical content' : " + str(received))
|
||||
return False
|
||||
|
||||
# Discard if first value is not a valid node id
|
||||
n = float(received[0])
|
||||
if n % 1 != 0 or n < 0 or n > 31:
|
||||
self._log.warning(str(ref) + " Discarded RX frame 'node id outside scope' : " + str(received))
|
||||
return False
|
||||
|
||||
# If it passes all the checks return
|
||||
return received
|
||||
|
||||
def _decode_frame(self, ref, data):
|
||||
"""Decodes a frame of data
|
||||
|
||||
Performs decoding of data types
|
||||
|
||||
Returns decoded string of data.
|
||||
|
||||
"""
|
||||
|
||||
node = data[0]
|
||||
data = data[1:]
|
||||
decoded = []
|
||||
|
||||
# check if node is listed and has individual datacodes for each value
|
||||
if node in ehc.nodelist and 'datacodes' in ehc.nodelist[node]:
|
||||
# fetch the string of datacodes
|
||||
datacodes = ehc.nodelist[node]['datacodes']
|
||||
# fetch a string of data sizes based on the string of datacodes
|
||||
datasizes = []
|
||||
for code in datacodes:
|
||||
datasizes.append(ehc.check_datacode(code))
|
||||
# Discard the frame & return 'False' if it doesn't match the summed datasizes
|
||||
if len(data) != sum(datasizes):
|
||||
self._log.warning(str(ref) + " RX data length: " + str(len(data)) +
|
||||
" is not valid for datacodes " + str(datacodes))
|
||||
return False
|
||||
else:
|
||||
# Determine the expected number of values to be decoded
|
||||
count = len(datacodes)
|
||||
# Set decoder to "Per value" decoding using datacode 'False' as flag
|
||||
datacode = False
|
||||
else:
|
||||
# if node is listed, but has only a single default datacode for all values
|
||||
if node in ehc.nodelist and 'datacode' in ehc.nodelist[node]:
|
||||
datacode = ehc.nodelist[node]['datacode']
|
||||
else:
|
||||
# when node not listed or has no datacode(s) use the interfacers default if specified
|
||||
datacode = self._settings['datacode']
|
||||
# Ensure only int 0 is passed not str 0
|
||||
if datacode == '0':
|
||||
datacode = 0
|
||||
# when no (default)datacode(s) specified, pass string values back as numerical values
|
||||
if not datacode:
|
||||
for val in data:
|
||||
if float(val) % 1 != 0:
|
||||
val = float(val)
|
||||
else:
|
||||
val = int(float(val))
|
||||
decoded.append(val)
|
||||
# Discard frame if total size is not an exact multiple of the specified datacode size.
|
||||
elif len(data) % ehc.check_datacode(datacode) != 0:
|
||||
self._log.warning(str(ref) + " RX data length: " + str(len(data)) +
|
||||
" is not valid for datacode " + str(datacode))
|
||||
return False
|
||||
else:
|
||||
# Determine the number of values in the frame of the specified code & size
|
||||
count = len(data) / ehc.check_datacode(datacode)
|
||||
|
||||
# Decode the string of data one value at a time into "decoded"
|
||||
if not decoded:
|
||||
bytepos = int(0)
|
||||
for i in range(0, count, 1):
|
||||
# Use single datacode unless datacode = False then use datacodes
|
||||
dc = datacode
|
||||
if not datacode:
|
||||
dc = datacodes[i]
|
||||
# Determine the number of bytes to use for each value by it's datacode
|
||||
size = int(ehc.check_datacode(dc))
|
||||
try:
|
||||
value = ehc.decode(dc, [int(v) for v in data[bytepos:bytepos+size]])
|
||||
except:
|
||||
self._log.warning(str(ref) + " Unable to decode as values incorrect for datacode(s)")
|
||||
return False
|
||||
bytepos += size
|
||||
decoded.append(value)
|
||||
|
||||
# Insert node ID before data
|
||||
decoded.insert(0, int(node))
|
||||
return decoded
|
||||
|
||||
def set(self, **kwargs):
|
||||
"""Set configuration parameters.
|
||||
|
||||
**kwargs (dict): settings to be sent. Example:
|
||||
{'setting_1': 'value_1', 'setting_2': 'value_2'}
|
||||
|
||||
pause (string): pause status
|
||||
'pause' = all pause Interfacer fully, nothing read, processed or posted.
|
||||
'pause' = in pauses the input only, no input read performed
|
||||
'pause' = out pauses output only, input is read, processed but not posted to buffer
|
||||
'pause' = off pause is off and Interfacer is fully operational (default)
|
||||
|
||||
"""
|
||||
|
||||
for key, setting in self._defaults.iteritems():
|
||||
if key in kwargs.keys():
|
||||
setting = kwargs[key]
|
||||
else:
|
||||
setting = self._defaults[key]
|
||||
if key in self._settings and self._settings[key] == setting:
|
||||
continue
|
||||
elif key == 'pause' and str(setting).lower() in ['all', 'in', 'out', 'off']:
|
||||
pass
|
||||
elif key == 'interval' and str(setting).isdigit():
|
||||
pass
|
||||
elif key == 'datacode' and str(setting) in ['0', 'b', 'B', 'h', 'H', 'L', 'l', 'f']:
|
||||
pass
|
||||
elif key == 'timestamped' and str(setting).lower() in ['true', 'false']:
|
||||
pass
|
||||
else:
|
||||
self._log.warning("'%s' is not a valid setting for %s: %s" % (str(setting), self.name, key))
|
||||
continue
|
||||
self._settings[key] = setting
|
||||
self._log.debug("Setting " + self.name + " " + key + ": " + str(setting))
|
||||
|
||||
def run(self):
|
||||
"""Placeholder for background tasks.
|
||||
|
||||
Allows subclasses to specify actions that need to be done on a
|
||||
regular basis. This should be called in main loop by instantiater.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
def _open_serial_port(self, com_port, com_baud):
|
||||
"""Open serial port
|
||||
|
||||
com_port (string): path to COM port
|
||||
|
||||
"""
|
||||
|
||||
#if not int(com_baud) in [75, 110, 300, 1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200]:
|
||||
# self._log.debug("Invalid 'com_baud': " + str(com_baud) + " | Default of 9600 used")
|
||||
# com_baud = 9600
|
||||
|
||||
try:
|
||||
s = serial.Serial(com_port, com_baud, timeout=0)
|
||||
self._log.debug("Opening serial port: " + str(com_port) + " @ "+ str(com_baud) + " bits/s")
|
||||
except serial.SerialException as e:
|
||||
self._log.error(e)
|
||||
raise EmonHubInterfacerInitError('Could not open COM port %s' %
|
||||
com_port)
|
||||
else:
|
||||
return s
|
||||
|
||||
def _open_socket(self, port_nb):
|
||||
"""Open a socket
|
||||
|
||||
port_nb (string): port number on which to open the socket
|
||||
|
||||
"""
|
||||
|
||||
self._log.debug('Opening socket on port %s', port_nb)
|
||||
|
||||
try:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.bind(('', int(port_nb)))
|
||||
s.listen(1)
|
||||
except socket.error as e:
|
||||
self._log.error(e)
|
||||
raise EmonHubInterfacerInitError('Could not open port %s' %
|
||||
port_nb)
|
||||
else:
|
||||
return s
|
||||
|
||||
"""class EmonhubSerialInterfacer
|
||||
|
||||
Monitors the serial port for data
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubSerialInterfacer(EmonHubInterfacer):
|
||||
|
||||
def __init__(self, name, com_port='', com_baud=9600):
|
||||
"""Initialize interfacer
|
||||
|
||||
com_port (string): path to COM port
|
||||
|
||||
"""
|
||||
|
||||
# Initialization
|
||||
super(EmonHubSerialInterfacer, self).__init__(name)
|
||||
|
||||
# Open serial port
|
||||
self._ser = self._open_serial_port(com_port, com_baud)
|
||||
|
||||
# Initialize RX buffer
|
||||
self._rx_buf = ''
|
||||
|
||||
def close(self):
|
||||
"""Close serial port"""
|
||||
|
||||
# Close serial port
|
||||
if self._ser is not None:
|
||||
self._log.debug("Closing serial port")
|
||||
self._ser.close()
|
||||
|
||||
def read(self):
|
||||
"""Read data from serial port and process if complete line received.
|
||||
|
||||
Return data as a list: [NodeID, val1, val2]
|
||||
|
||||
"""
|
||||
|
||||
# Read serial RX
|
||||
self._rx_buf = self._rx_buf + self._ser.readline()
|
||||
|
||||
# If line incomplete, exit
|
||||
if '\r\n' not in self._rx_buf:
|
||||
return
|
||||
|
||||
# Remove CR,LF
|
||||
f = self._rx_buf[:-2]
|
||||
|
||||
# Reset buffer
|
||||
self._rx_buf = ''
|
||||
|
||||
# Discard empty frames
|
||||
if not f:
|
||||
self._log.warning("Discarded empty frame")
|
||||
return
|
||||
|
||||
# unix timestamp
|
||||
t = round(time.time(), 2)
|
||||
|
||||
# Process data frame
|
||||
return self._process_frame(f, t)
|
||||
|
||||
"""class EmonHubJeeInterfacer
|
||||
|
||||
Monitors the serial port for data from "Jee" type device
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubJeeInterfacer(EmonHubSerialInterfacer):
|
||||
|
||||
def __init__(self, name, com_port='/dev/ttyAMA0', com_baud=0):
|
||||
"""Initialize Interfacer
|
||||
|
||||
com_port (string): path to COM port
|
||||
|
||||
"""
|
||||
|
||||
# Initialization
|
||||
if com_baud != 0:
|
||||
super(EmonHubJeeInterfacer, self).__init__(name, com_port, com_baud)
|
||||
else:
|
||||
for com_baud in (38400, 9600):
|
||||
super(EmonHubJeeInterfacer, self).__init__(name, com_port, com_baud)
|
||||
self._ser.write("?")
|
||||
time.sleep(2)
|
||||
self._rx_buf = self._rx_buf + self._ser.readline()
|
||||
if '\r\n' in self._rx_buf or '\x00' in self._rx_buf:
|
||||
self._ser.flushInput()
|
||||
self._rx_buf=""
|
||||
break
|
||||
elif self._ser is not None:
|
||||
self._ser.close()
|
||||
continue
|
||||
|
||||
# Display device firmware version and current settings
|
||||
self.info = ["",""]
|
||||
if self._ser is not None:
|
||||
self._ser.write("v")
|
||||
time.sleep(2)
|
||||
self._rx_buf = self._rx_buf + self._ser.readline()
|
||||
if '\r\n' in self._rx_buf:
|
||||
self._rx_buf=""
|
||||
info = self._rx_buf + self._ser.readline()[:-2]
|
||||
if info != "":
|
||||
# Split the returned "info" string into firmware version & current settings
|
||||
self.info[0] = info.strip().split(' ')[0]
|
||||
self.info[1] = info.replace(str(self.info[0]), "")
|
||||
self._log.info( self.name + " device firmware version: " + self.info[0])
|
||||
self._log.info( self.name + " device current settings: " + str(self.info[1]))
|
||||
else:
|
||||
# since "v" command only v11> recommend firmware update ?
|
||||
#self._log.info( self.name + " device firmware is pre-version RFM12demo.11")
|
||||
self._log.info( self.name + " device firmware version & configuration: not available")
|
||||
else:
|
||||
self._log.warning("Device communication error - check settings")
|
||||
self._rx_buf=""
|
||||
self._ser.flushInput()
|
||||
|
||||
# Initialize settings
|
||||
self._defaults.update({'pause': 'off', 'interval': 0, 'datacode': 'h'})
|
||||
|
||||
# This line will stop the default values printing to logfile at start-up
|
||||
# unless they have been overwritten by emonhub.conf entries
|
||||
# comment out if diagnosing a startup value issue
|
||||
self._settings.update(self._defaults)
|
||||
|
||||
# Jee specific settings to be picked up as changes not defaults to initialise "Jee" device
|
||||
self._jee_settings = ({'baseid': '15', 'frequency': '433', 'group': '210', 'quiet': 'True'})
|
||||
self._jee_prefix = ({'baseid': 'i', 'frequency': '', 'group': 'g', 'quiet': 'q'})
|
||||
|
||||
# Pre-load Jee settings only if info string available for checks
|
||||
if all(i in self.info[1] for i in (" i", " g", " @ ", " MHz")):
|
||||
self._settings.update(self._jee_settings)
|
||||
|
||||
def read(self):
|
||||
"""Read data from serial port and process if complete line received.
|
||||
|
||||
Return data as a list: [NodeID, val1, val2]
|
||||
|
||||
"""
|
||||
|
||||
# Read serial RX
|
||||
self._rx_buf = self._rx_buf + self._ser.readline()
|
||||
|
||||
# If line incomplete, exit
|
||||
if '\r\n' not in self._rx_buf:
|
||||
return
|
||||
|
||||
# Remove CR,LF
|
||||
f = self._rx_buf[:-2]
|
||||
|
||||
# Reset buffer
|
||||
self._rx_buf = ''
|
||||
|
||||
# Discard empty frames
|
||||
if not f:
|
||||
self._log.warning("Discarded empty frame")
|
||||
return
|
||||
|
||||
# Discard information messages
|
||||
if (f[0] == '>'):
|
||||
self._log.debug(self.name + " acknowledged command: " + str(f))
|
||||
return
|
||||
|
||||
if (len(f)>2 and f[0:3] == ' ->'):
|
||||
self._log.debug(self.name + " confirmed sent packet size: " + str(f))
|
||||
return
|
||||
|
||||
if f[0] == '\x01':
|
||||
#self._log.debug("Ignoring frame consisting of SOH character" + str(f))
|
||||
return
|
||||
|
||||
if " i" and " g" and " @ " and " MHz" in f:
|
||||
self.info[1] = f
|
||||
self._log.debug( self.name + " device settings updated: " + str(self.info[1]))
|
||||
return
|
||||
|
||||
# unix timestamp
|
||||
t = round(time.time(), 2)
|
||||
|
||||
# Process data frame
|
||||
return self._process_frame(f, t)
|
||||
|
||||
def _validate_frame(self, ref, received):
|
||||
"""Validate a frame of data
|
||||
|
||||
This function performs logical tests to filter unsuitable data.
|
||||
Each test discards frame with a log entry if False
|
||||
|
||||
Returns True if data frame passes tests.
|
||||
|
||||
"""
|
||||
|
||||
if received[0] == '?'and str(received[-1])[0]=='(' and str(received[-1])[-1]==')':
|
||||
self._log.info(str(ref) + " Discard RX frame 'unreliable content' : RSSI " + str(received[-1]))
|
||||
return False
|
||||
|
||||
# Strip 'OK' from frame if needed
|
||||
if received[0]=='OK':
|
||||
received = received[1:]
|
||||
|
||||
# extract RSSI if packet is from RFM69 type Jee Device
|
||||
if str(received[-1])[0]=='(' and str(received[-1])[-1]==')':
|
||||
self.rssi = int(received[-1][1:-1])
|
||||
received = received[:-1]
|
||||
return received
|
||||
else:
|
||||
# set RSSI false for standard frames so RSSI is not re-appended later
|
||||
self.rssi = False
|
||||
|
||||
# include checks from parent
|
||||
if not super(EmonHubJeeInterfacer, self)._validate_frame(ref, received):
|
||||
return False
|
||||
|
||||
return received
|
||||
|
||||
def set(self, **kwargs):
|
||||
"""Send configuration parameters to the "Jee" type device through COM port
|
||||
|
||||
**kwargs (dict): settings to be modified. Available settings are
|
||||
'baseid', 'frequency', 'group'. Example:
|
||||
{'baseid': '15', 'frequency': '4', 'group': '210'}
|
||||
|
||||
"""
|
||||
|
||||
for key, setting in self._jee_settings.iteritems():
|
||||
# Decide which setting value to use
|
||||
if key in kwargs.keys():
|
||||
setting = kwargs[key]
|
||||
else:
|
||||
setting = self._jee_settings[key]
|
||||
# convert bools to ints
|
||||
if str.capitalize(str(setting)) in ['True', 'False']:
|
||||
setting = int(setting == "True")
|
||||
# confirmation string always contains baseid, group anf freq
|
||||
if " i" and " g" and " @ " and " MHz" in self.info[1]:
|
||||
# If setting confirmed as already set, continue without changing
|
||||
if (self._jee_prefix[key] + str(setting)) in self.info[1].split():
|
||||
continue
|
||||
elif key in self._settings and self._settings[key] == setting:
|
||||
continue
|
||||
if key == 'baseid' and int(setting) >=1 and int(setting) <=26:
|
||||
command = setting + 'i'
|
||||
elif key == 'frequency' and setting in ['433','868','915']:
|
||||
command = setting[:1] + 'b'
|
||||
elif key == 'group'and int(setting) >=0 and int(setting) <=212:
|
||||
command = setting + 'g'
|
||||
elif key == 'quiet' and int(setting) >=0 and int(setting) <2:
|
||||
command = str(setting) + 'q'
|
||||
else:
|
||||
self._log.warning("'%s' is not a valid setting for %s: %s" % (str(setting), self.name, key))
|
||||
continue
|
||||
self._settings[key] = setting
|
||||
self._log.info("Setting " + self.name + " %s: %s" % (key, setting) + " (" + command + ")")
|
||||
self._ser.write(command)
|
||||
# Wait a sec between two settings
|
||||
time.sleep(1)
|
||||
|
||||
# include kwargs from parent
|
||||
super(EmonHubJeeInterfacer, self).set(**kwargs)
|
||||
|
||||
def run(self):
|
||||
"""Actions that need to be done on a regular basis.
|
||||
|
||||
This should be called in main loop by instantiater.
|
||||
|
||||
"""
|
||||
|
||||
now = time.time()
|
||||
|
||||
# Broadcast time to synchronize emonGLCD
|
||||
interval = int(self._settings['interval'])
|
||||
if interval: # A value of 0 means don't do anything
|
||||
if now - self._interval_timestamp > interval:
|
||||
self._send_time()
|
||||
self._interval_timestamp = now
|
||||
|
||||
def _send_time(self):
|
||||
"""Send time over radio link to synchronize emonGLCD
|
||||
|
||||
The radio module can be used to broadcast time, which is useful
|
||||
to synchronize emonGLCD in particular.
|
||||
Beware, this is know to garble the serial link on RFM2Piv1
|
||||
sendtimeinterval defines the interval in seconds between two time
|
||||
broadcasts. 0 means never.
|
||||
|
||||
"""
|
||||
|
||||
now = datetime.datetime.now()
|
||||
|
||||
self._log.debug(self.name + " broadcasting time: %02d:%02d" % (now.hour, now.minute))
|
||||
|
||||
self._ser.write("00,%02d,%02d,00,s" % (now.hour, now.minute))
|
||||
|
||||
"""class EmonHubSocketInterfacer
|
||||
|
||||
Monitors a socket for data, typically from ethernet link
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubSocketInterfacer(EmonHubInterfacer):
|
||||
|
||||
def __init__(self, name, port_nb=50011):
|
||||
"""Initialize Interfacer
|
||||
|
||||
port_nb (string): port number on which to open the socket
|
||||
|
||||
"""
|
||||
|
||||
# Initialization
|
||||
super(EmonHubSocketInterfacer, self).__init__(name)
|
||||
|
||||
# Open socket
|
||||
self._socket = self._open_socket(port_nb)
|
||||
|
||||
# Initialize RX buffer for socket
|
||||
self._sock_rx_buf = ''
|
||||
|
||||
def close(self):
|
||||
"""Close socket."""
|
||||
|
||||
# Close socket
|
||||
if self._socket is not None:
|
||||
self._log.debug('Closing socket')
|
||||
self._socket.close()
|
||||
|
||||
def read(self):
|
||||
"""Read data from socket and process if complete line received.
|
||||
|
||||
Return data as a list: [NodeID, val1, val2]
|
||||
|
||||
"""
|
||||
|
||||
# Check if data received
|
||||
ready_to_read, ready_to_write, in_error = \
|
||||
select.select([self._socket], [], [], 0)
|
||||
|
||||
# If data received, add it to socket RX buffer
|
||||
if self._socket in ready_to_read:
|
||||
|
||||
# Accept connection
|
||||
conn, addr = self._socket.accept()
|
||||
|
||||
# Read data
|
||||
self._sock_rx_buf = self._sock_rx_buf + conn.recv(1024)
|
||||
|
||||
# Close connection
|
||||
conn.close()
|
||||
|
||||
# If there is at least one complete frame in the buffer
|
||||
if '\r\n' in self._sock_rx_buf:
|
||||
# Process and return first frame in buffer:
|
||||
f, self._sock_rx_buf = self._sock_rx_buf.split('\r\n', 1)
|
||||
if str(self._settings['timestamped']).lower() == "true":
|
||||
f = f.split(" ")
|
||||
t = float(f[0])
|
||||
f = ' '.join(map(str, f[1:]))
|
||||
return self._process_frame(f, t)
|
||||
else:
|
||||
return self._process_frame(f)
|
||||
|
||||
"""class EmonHubInterfacerInitError
|
||||
|
||||
Raise this when init fails.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubInterfacerInitError(Exception):
|
||||
pass
|
||||
|
|
@ -0,0 +1,482 @@
|
|||
"""
|
||||
|
||||
This code is released under the GNU Affero General Public License.
|
||||
|
||||
OpenEnergyMonitor project:
|
||||
http://openenergymonitor.org
|
||||
|
||||
"""
|
||||
|
||||
import urllib2
|
||||
import httplib
|
||||
import time
|
||||
import logging
|
||||
import json
|
||||
import threading
|
||||
import Queue
|
||||
import ssl
|
||||
import base64
|
||||
|
||||
import emonhub_buffer as ehb
|
||||
|
||||
"""class EmonHubReporter
|
||||
|
||||
Stores server parameters and buffers the data between two HTTP requests
|
||||
|
||||
This class is meant to be inherited by subclasses specific to their
|
||||
destination server.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubReporter(threading.Thread):
|
||||
|
||||
def __init__(self, reporterName, queue, buffer_type="memory", buffer_size=1000, **kwargs):
|
||||
"""Create a server data buffer initialized with server settings."""
|
||||
|
||||
# Initialize logger
|
||||
self._log = logging.getLogger("EmonHub")
|
||||
|
||||
# Initialise thread
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
# Initialise settings
|
||||
self.name = reporterName
|
||||
self.init_settings = {}
|
||||
self._defaults = {'pause': 'off', 'interval': '0', 'batchsize': '1'}
|
||||
self._settings = {}
|
||||
self._queue = queue
|
||||
|
||||
# This line will stop the default values printing to logfile at start-up
|
||||
# unless they have been overwritten by emonhub.conf entries
|
||||
# comment out if diagnosing a startup value issue
|
||||
self._settings.update(self._defaults)
|
||||
|
||||
# Initialize interval timer's "started at" timestamp
|
||||
self._interval_timestamp = 0
|
||||
|
||||
# Create underlying buffer implementation
|
||||
self.buffer = ehb.getBuffer(buffer_type)(reporterName, buffer_size, **kwargs)
|
||||
|
||||
# set an absolute upper limit for number of items to process per post
|
||||
# number of items posted is the lower of this item limit, buffer_size, or the
|
||||
# batchsize, as set in reporter settings or by the default value.
|
||||
self._item_limit = buffer_size
|
||||
|
||||
self._log.info("Set up reporter '%s' (buffer: %s | size: %s)"
|
||||
% (reporterName, buffer_type, buffer_size))
|
||||
|
||||
# Init ctx
|
||||
self._ctx = ssl.create_default_context()
|
||||
self._ctx.check_hostname = False
|
||||
self._ctx.verify_mode = ssl.CERT_NONE
|
||||
# Initialise a thread and start the reporter
|
||||
self.stop = False
|
||||
self.start()
|
||||
|
||||
def set(self, **kwargs):
|
||||
"""Update settings.
|
||||
|
||||
**kwargs (dict): runtime settings to be modified.
|
||||
|
||||
url (string): eg: 'http://localhost/emoncms' or 'http://emoncms.org' (trailing slash optional)
|
||||
apikey (string): API key with write access
|
||||
pause (string): pause status
|
||||
'pause' = all pause fully, nothing posted to buffer or sent (buffer retained)
|
||||
'pause' = in pauses the input only, no add to buffer but flush still functional
|
||||
'pause' = out pauses output only, no flush but data can accumulate in buffer
|
||||
'pause' = off pause is off and reporter is fully operational
|
||||
|
||||
"""
|
||||
|
||||
for key, setting in self._defaults.iteritems():
|
||||
if key in kwargs.keys():
|
||||
setting = kwargs[key]
|
||||
else:
|
||||
setting = self._defaults[key]
|
||||
if key in self._settings and self._settings[key] == setting:
|
||||
continue
|
||||
elif key == 'pause' and str(setting).lower() in ['all', 'in', 'out', 'off']:
|
||||
pass
|
||||
elif key in ['interval', 'batchsize'] and setting.isdigit():
|
||||
pass
|
||||
else:
|
||||
self._log.warning("'%s' is not a valid setting for %s: %s" % (setting, self.name, key))
|
||||
self._settings[key] = setting
|
||||
self._log.debug("Setting " + self.name + " " + key + ": " + str(setting))
|
||||
|
||||
for key, setting in self._defaults.iteritems():
|
||||
valid = False
|
||||
if key in kwargs.keys():
|
||||
setting = kwargs[key]
|
||||
else:
|
||||
setting = self._defaults[key]
|
||||
if key in self._settings and self._settings[key] == setting:
|
||||
continue
|
||||
elif key == 'pause':
|
||||
if str(setting).lower() in ['all', 'in', 'out', 'off']:
|
||||
valid = True
|
||||
elif key == 'interval' or 'batchsize':
|
||||
if setting.isdigit():
|
||||
valid = True
|
||||
else:
|
||||
continue
|
||||
if valid:
|
||||
self._settings[key] = setting
|
||||
self._log.debug("Setting " + self.name + " " + key + ": " + str(setting))
|
||||
else:
|
||||
self._log.warning("'%s' is not a valid setting for %s: %s" % (setting, self.name, key))
|
||||
|
||||
|
||||
def add(self, data):
|
||||
"""Append data to buffer.
|
||||
|
||||
data (list): node and values (eg: '[node,val1,val2,...]')
|
||||
|
||||
"""
|
||||
|
||||
self._log.debug(str(data[-1]) + " Append to '" + self.name +
|
||||
"' buffer => time: " + str(data[0])
|
||||
+ ", data: " + str(data[1:-1])
|
||||
# TODO "ref" temporarily left on end of data string for info
|
||||
+ ", ref: " + str(data[-1]))
|
||||
# TODO "ref" removed from end of data string here so not sent to emoncms
|
||||
data = data[:-1]
|
||||
|
||||
# databuffer is of format:
|
||||
# [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]]
|
||||
# [[1399980731, 10, 150, 3450 ...]]
|
||||
self.buffer.storeItem(data)
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Run the reporter thread.
|
||||
Any regularly performed tasks actioned here along with flushing the buffer
|
||||
|
||||
"""
|
||||
while not self.stop:
|
||||
# If there are frames in the queue
|
||||
while not self._queue.empty():
|
||||
# Add each frame to the buffer
|
||||
frame = self._queue.get()
|
||||
self.add(frame)
|
||||
# Don't loop to fast
|
||||
time.sleep(0.1)
|
||||
# Action reporter tasks
|
||||
self.action()
|
||||
|
||||
def action(self):
|
||||
"""
|
||||
|
||||
:return:
|
||||
"""
|
||||
|
||||
# pause output if 'pause' set to 'all' or 'out'
|
||||
if 'pause' in self._settings \
|
||||
and str(self._settings['pause']).lower() in ['all', 'out']:
|
||||
return
|
||||
|
||||
# If an interval is set, check if that time has passed since last post
|
||||
if int(self._settings['interval']) \
|
||||
and time.time() - self._interval_timestamp < int(self._settings['interval']):
|
||||
return
|
||||
else:
|
||||
# Then attempt to flush the buffer
|
||||
self.flush()
|
||||
|
||||
def flush(self):
|
||||
"""Send oldest data in buffer, if any."""
|
||||
|
||||
# Buffer management
|
||||
# If data buffer not empty, send a set of values
|
||||
if self.buffer.hasItems():
|
||||
max_items = int(self._settings['batchsize'])
|
||||
if max_items > self._item_limit:
|
||||
max_items = self._item_limit
|
||||
elif max_items <= 0:
|
||||
return
|
||||
|
||||
databuffer = self.buffer.retrieveItems(max_items)
|
||||
retrievedlength = len(databuffer)
|
||||
if self._process_post(databuffer):
|
||||
# In case of success, delete sample set from buffer
|
||||
self.buffer.discardLastRetrievedItems(retrievedlength)
|
||||
# log the time of last succesful post
|
||||
self._interval_timestamp = time.time()
|
||||
|
||||
def _process_post(self, data):
|
||||
"""
|
||||
To be implemented in subclass.
|
||||
|
||||
:return: True if data posted successfully and can be discarded
|
||||
"""
|
||||
pass
|
||||
|
||||
def _send_post(self, post_url, post_body=None, post_header=None):
|
||||
"""
|
||||
|
||||
:param post_url:
|
||||
:param post_body:
|
||||
:param post_header:
|
||||
:return: the received reply if request is successful
|
||||
"""
|
||||
"""Send data to server.
|
||||
|
||||
data (list): node and values (eg: '[node,val1,val2,...]')
|
||||
time (int): timestamp, time when sample was recorded
|
||||
|
||||
return True if data sent correctly
|
||||
|
||||
"""
|
||||
|
||||
reply = ""
|
||||
request = urllib2.Request(post_url, post_body)
|
||||
if post_header is not None:
|
||||
base64string = base64.encodestring('simon:bajsa').replace('\n', '')
|
||||
request.add_header("Authorization", "Basic %s" % base64string)
|
||||
|
||||
try:
|
||||
response = urllib2.urlopen(request, timeout=60, context=self._ctx)
|
||||
except urllib2.HTTPError as e:
|
||||
self._log.warning(self.name + " couldn't send to server, HTTPError: " +
|
||||
str(e.code))
|
||||
except urllib2.URLError as e:
|
||||
self._log.warning(self.name + " couldn't send to server, URLError: " +
|
||||
str(e.reason))
|
||||
except httplib.HTTPException:
|
||||
self._log.warning(self.name + " couldn't send to server, HTTPException")
|
||||
except Exception:
|
||||
import traceback
|
||||
self._log.warning(self.name + " couldn't send to server, Exception: " +
|
||||
traceback.format_exc())
|
||||
else:
|
||||
reply = response.read()
|
||||
finally:
|
||||
return reply
|
||||
|
||||
"""class EmonHubDomoticzReporter
|
||||
|
||||
Stores server parameters and buffers the data between two HTTP requests
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubDomoticzReporter(EmonHubReporter):
|
||||
|
||||
def __init__(self, reporterName, queue, **kwargs):
|
||||
"""Initialize reporter
|
||||
|
||||
"""
|
||||
|
||||
# Initialization
|
||||
super(EmonHubDomoticzReporter, self).__init__(reporterName, queue, **kwargs)
|
||||
|
||||
# add or alter any default settings for this reporter
|
||||
self._defaults.update({'batchsize': 100})
|
||||
self._cms_settings = {'url': 'http://emoncms.org'}
|
||||
|
||||
# This line will stop the default values printing to logfile at start-up
|
||||
self._settings.update(self._defaults)
|
||||
|
||||
# set an absolute upper limit for number of items to process per post
|
||||
self._item_limit = 250
|
||||
|
||||
def set(self, **kwargs):
|
||||
"""
|
||||
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
|
||||
super (EmonHubDomoticzReporter, self).set(**kwargs)
|
||||
|
||||
self._log.info(self.name + " Set: ")
|
||||
for key, setting in self._cms_settings.iteritems():
|
||||
#valid = False
|
||||
if not key in kwargs.keys():
|
||||
setting = self._cms_settings[key]
|
||||
else:
|
||||
setting = kwargs[key]
|
||||
|
||||
if key == 'url' and setting[:4] == "http":
|
||||
self._log.info("Setting " + self.name + " url: " + setting)
|
||||
self._settings[key] = setting
|
||||
continue
|
||||
else:
|
||||
self._log.warning("'%s' is not valid for %s: %s" % (setting, self.name, key))
|
||||
|
||||
|
||||
def _process_post(self, databuffer):
|
||||
"""Send data to server."""
|
||||
|
||||
# databuffer is of format:
|
||||
# [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]]
|
||||
# [[1399980731, 10, 150, 250 ...]]
|
||||
|
||||
self._log.debug(self.name + " databuffer %s" %databuffer)
|
||||
status = False
|
||||
post_body = None
|
||||
post_header = "%s:%s" %('simon', 'bajsa')
|
||||
|
||||
|
||||
for data in databuffer:
|
||||
data = data[2:]
|
||||
#valid = False
|
||||
#[ext_temp, temp, hum, batteri, pulse, rssi]
|
||||
self._log.debug(self.name + "data %s" %data)
|
||||
ext_temp = data[0] * 0.1
|
||||
temp = data[1] * 0.1
|
||||
hum = data[2] * 0.1
|
||||
batteri = data[3]
|
||||
pulse = data[4]
|
||||
rssi = abs(data[5])
|
||||
|
||||
domoAddParam = '&battery=%s' %(batteri)
|
||||
if temp < 84:
|
||||
domoTempHum = '/json.htm?type=command¶m=udevice&idx=56&nvalue=0&svalue=%s;%s;%s' %(temp,hum,0)
|
||||
post_url_tempHum = self._settings['url'] + domoTempHum + domoAddParam
|
||||
self._log.info(self.name + " sending tempHum: " + post_url_tempHum)
|
||||
reply = self._send_post(post_url_tempHum, post_body, post_header)
|
||||
if reply == 'ok':
|
||||
self._log.debug(self.name + " acknowledged receipt with '" + reply + "' from " + self._settings['url'])
|
||||
|
||||
if ext_temp < 84:
|
||||
domoTemp = '/json.htm?type=command¶m=udevice&idx=55&nvalue=0&svalue=%s' %(ext_temp)
|
||||
post_url_temp = self._settings['url'] + domoTemp + domoAddParam
|
||||
self._log.info(self.name + " sending: " + post_url_temp)
|
||||
reply = self._send_post(post_url_temp, post_body, post_header)
|
||||
|
||||
domoPulse = '/json.htm?type=command¶m=udevice&idx=54&svalue=%s' %(pulse)
|
||||
post_url_pulse = self._settings['url'] + domoPulse + domoAddParam
|
||||
self._log.info(self.name + " sending: " + post_url_pulse)
|
||||
|
||||
|
||||
# The Develop branch of emoncms allows for the sending of the apikey in the post
|
||||
# body, this should be moved from the url to the body as soon as this is widely
|
||||
# adopted
|
||||
|
||||
reply = self._send_post(post_url_pulse, post_body, post_header)
|
||||
|
||||
status = True
|
||||
|
||||
self._log.debug(self.name + " Next return" )
|
||||
if status:
|
||||
return True
|
||||
self._log.debug(self.name + " After return" )
|
||||
|
||||
"""class EmonHubEmoncmsReporter
|
||||
|
||||
Stores server parameters and buffers the data between two HTTP requests
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubEmoncmsReporter(EmonHubReporter):
|
||||
|
||||
def __init__(self, reporterName, queue, **kwargs):
|
||||
"""Initialize reporter
|
||||
|
||||
"""
|
||||
|
||||
# Initialization
|
||||
super(EmonHubEmoncmsReporter, self).__init__(reporterName, queue, **kwargs)
|
||||
|
||||
# add or alter any default settings for this reporter
|
||||
self._defaults.update({'batchsize': 100})
|
||||
self._cms_settings = {'apikey': "", 'url': 'http://emoncms.org'}
|
||||
|
||||
# This line will stop the default values printing to logfile at start-up
|
||||
self._settings.update(self._defaults)
|
||||
|
||||
# set an absolute upper limit for number of items to process per post
|
||||
self._item_limit = 250
|
||||
|
||||
def set(self, **kwargs):
|
||||
"""
|
||||
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
|
||||
super (EmonHubEmoncmsReporter, self).set(**kwargs)
|
||||
|
||||
for key, setting in self._cms_settings.iteritems():
|
||||
#valid = False
|
||||
if not key in kwargs.keys():
|
||||
setting = self._cms_settings[key]
|
||||
else:
|
||||
setting = kwargs[key]
|
||||
if key in self._settings and self._settings[key] == setting:
|
||||
continue
|
||||
elif key == 'apikey':
|
||||
if str.lower(setting[:4]) == 'xxxx':
|
||||
self._log.warning("Setting " + self.name + " apikey: obscured")
|
||||
pass
|
||||
elif str.__len__(setting) == 32 :
|
||||
self._log.info("Setting " + self.name + " apikey: set")
|
||||
pass
|
||||
elif setting == "":
|
||||
self._log.info("Setting " + self.name + " apikey: null")
|
||||
pass
|
||||
else:
|
||||
self._log.warning("Setting " + self.name + " apikey: invalid format")
|
||||
continue
|
||||
self._settings[key] = setting
|
||||
# Next line will log apikey if uncommented (privacy ?)
|
||||
#self._log.debug(self.name + " apikey: " + str(setting))
|
||||
continue
|
||||
elif key == 'url' and setting[:4] == "http":
|
||||
self._log.info("Setting " + self.name + " url: " + setting)
|
||||
self._settings[key] = setting
|
||||
continue
|
||||
else:
|
||||
self._log.warning("'%s' is not valid for %s: %s" % (setting, self.name, key))
|
||||
|
||||
def _process_post(self, databuffer):
|
||||
"""Send data to server."""
|
||||
|
||||
# databuffer is of format:
|
||||
# [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]]
|
||||
# [[1399980731, 10, 150, 250 ...]]
|
||||
|
||||
if not 'apikey' in self._settings.keys() or str.__len__(self._settings['apikey']) != 32 \
|
||||
or str.lower(self._settings['apikey']) == 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx':
|
||||
return
|
||||
|
||||
data_string = json.dumps(databuffer, separators=(',', ':'))
|
||||
|
||||
# Prepare URL string of the form
|
||||
# http://domain.tld/emoncms/input/bulk.json?apikey=12345
|
||||
# &data=[[0,10,82,23],[5,10,82,23],[10,10,82,23]]
|
||||
# &time=0
|
||||
|
||||
# Construct post_url (without apikey)
|
||||
post_url = self._settings['url']+'/input/bulk'+'.json?time=0&apikey='
|
||||
post_body = "data="+data_string
|
||||
|
||||
# logged before apikey added for security
|
||||
self._log.info(self.name + " sending: " + post_url + "E-M-O-N-C-M-S-A-P-I-K-E-Y&" + post_body)
|
||||
|
||||
# Add apikey to post_url
|
||||
post_url = post_url + self._settings['apikey']
|
||||
|
||||
# The Develop branch of emoncms allows for the sending of the apikey in the post
|
||||
# body, this should be moved from the url to the body as soon as this is widely
|
||||
# adopted
|
||||
|
||||
reply = self._send_post(post_url, post_body)
|
||||
if reply == 'ok':
|
||||
self._log.debug(self.name + " acknowledged receipt with '" + reply + "' from " + self._settings['url'])
|
||||
return True
|
||||
else:
|
||||
self._log.warning(self.name + " send failure: wanted 'ok' but got '" +reply+ "'")
|
||||
|
||||
"""class EmonHubReporterInitError
|
||||
|
||||
Raise this when init fails.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubReporterInitError(Exception):
|
||||
pass
|
||||
|
|
@ -0,0 +1,163 @@
|
|||
"""
|
||||
|
||||
This code is released under the GNU Affero General Public License.
|
||||
|
||||
OpenEnergyMonitor project:
|
||||
http://openenergymonitor.org
|
||||
|
||||
"""
|
||||
|
||||
import time
|
||||
import logging
|
||||
from configobj import ConfigObj
|
||||
|
||||
"""class EmonHubSetup
|
||||
|
||||
User interface to setup the hub.
|
||||
|
||||
The settings attribute stores the settings of the hub. It is a
|
||||
dictionary with the following keys:
|
||||
|
||||
'hub': a dictionary containing the hub settings
|
||||
'interfacers': a dictionary containing the interfacers
|
||||
'reporters': a dictionary containing the reporters
|
||||
|
||||
The hub settings are:
|
||||
'loglevel': the logging level
|
||||
|
||||
interfacers and reporters are dictionaries with the following keys:
|
||||
'Type': class name
|
||||
'init_settings': dictionary with initialization settings
|
||||
'runtimesettings': dictionary with runtime settings
|
||||
Initialization and runtime settings depend on the interfacer and
|
||||
reporter type.
|
||||
|
||||
The run() method is supposed to be run regularly by the instantiater, to
|
||||
perform regular communication tasks.
|
||||
|
||||
The check_settings() method is run regularly as well. It checks the settings
|
||||
and returns True is settings were changed.
|
||||
|
||||
This almost empty class is meant to be inherited by subclasses specific to
|
||||
each setup.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubSetup(object):
|
||||
|
||||
def __init__(self):
|
||||
|
||||
# Initialize logger
|
||||
self._log = logging.getLogger("EmonHub")
|
||||
|
||||
# Initialize settings
|
||||
self.settings = None
|
||||
|
||||
def run(self):
|
||||
"""Run in background.
|
||||
|
||||
To be implemented in child class.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
def check_settings(self):
|
||||
"""Check settings
|
||||
|
||||
Update attribute settings and return True if modified.
|
||||
|
||||
To be implemented in child class.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubFileSetup(EmonHubSetup):
|
||||
|
||||
def __init__(self, filename):
|
||||
|
||||
# Initialization
|
||||
super(EmonHubFileSetup, self).__init__()
|
||||
|
||||
# Initialize update timestamp
|
||||
self._settings_update_timestamp = 0
|
||||
self._retry_time_interval = 5
|
||||
|
||||
# create a timeout message if time out is set (>0)
|
||||
if self._retry_time_interval > 0:
|
||||
self.retry_msg = " Retry in " + str(self._retry_time_interval) + " seconds"
|
||||
else:
|
||||
self.retry_msg = ""
|
||||
|
||||
# Initialize attribute settings as a ConfigObj instance
|
||||
try:
|
||||
self.settings = ConfigObj(filename, file_error=True)
|
||||
# Check the settings file sections
|
||||
self.settings['hub']
|
||||
self.settings['interfacers']
|
||||
self.settings['reporters']
|
||||
except IOError as e:
|
||||
raise EmonHubSetupInitError(e)
|
||||
except SyntaxError as e:
|
||||
raise EmonHubSetupInitError(
|
||||
'Error parsing config file \"%s\": ' % filename + str(e))
|
||||
except KeyError as e:
|
||||
raise EmonHubSetupInitError(
|
||||
'Configuration file error - section: ' + str(e))
|
||||
|
||||
def check_settings(self):
|
||||
"""Check settings
|
||||
|
||||
Update attribute settings and return True if modified.
|
||||
|
||||
"""
|
||||
|
||||
# Check settings only once per second
|
||||
now = time.time()
|
||||
if now - self._settings_update_timestamp < 0:
|
||||
return
|
||||
# Update timestamp
|
||||
self._settings_update_timestamp = now
|
||||
|
||||
# Backup settings
|
||||
settings = dict(self.settings)
|
||||
|
||||
# Get settings from file
|
||||
try:
|
||||
self.settings.reload()
|
||||
except IOError as e:
|
||||
self._log.warning('Could not get settings: ' + str(e) + self.retry_msg)
|
||||
self._settings_update_timestamp = now + self._retry_time_interval
|
||||
return
|
||||
except SyntaxError as e:
|
||||
self._log.warning('Could not get settings: ' +
|
||||
'Error parsing config file: ' + str(e) + self.retry_msg)
|
||||
self._settings_update_timestamp = now + self._retry_time_interval
|
||||
return
|
||||
except Exception:
|
||||
import traceback
|
||||
self._log.warning("Couldn't get settings, Exception: " +
|
||||
traceback.format_exc() + self.retry_msg)
|
||||
self._settings_update_timestamp = now + self._retry_time_interval
|
||||
return
|
||||
|
||||
if self.settings != settings:
|
||||
# Check the settings file sections
|
||||
try:
|
||||
self.settings['hub']
|
||||
self.settings['interfacers']
|
||||
self.settings['reporters']
|
||||
except KeyError as e:
|
||||
self._log.warning("Configuration file missing section: " + str(e))
|
||||
else:
|
||||
return True
|
||||
|
||||
"""class EmonHubSetupInitError
|
||||
|
||||
Raise this when init fails.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class EmonHubSetupInitError(Exception):
|
||||
pass
|
||||
Loading…
Reference in New Issue