"""

  This code is released under the GNU Affero General Public License.

  OpenEnergyMonitor project:
  http://openenergymonitor.org

"""

import urllib2
import httplib
import time
import logging
import json
import threading
import Queue
import ssl
import base64

import emonhub_buffer as ehb


class EmonHubReporter(threading.Thread):
    """Base reporter thread.

    Stores server parameters and buffers the data between two HTTP requests.

    This class is meant to be inherited by subclasses specific to their
    destination server. Subclasses implement ``_process_post``.
    """

    def __init__(self, reporterName, queue, buffer_type="memory", buffer_size=1000, **kwargs):
        """Create a server data buffer initialized with server settings.

        reporterName (string): name of the reporter, also used as thread name
        queue: queue the frames to report are read from (see ``run``)
        buffer_type (string): buffer implementation requested from
            ``emonhub_buffer.getBuffer``
        buffer_size (int): size handed to the buffer, also the initial upper
            limit of items processed per post
        **kwargs: passed through to the buffer constructor

        The thread is started at the end of this constructor.
        """

        # Initialize logger
        self._log = logging.getLogger("EmonHub")

        # Initialise thread
        threading.Thread.__init__(self)

        # Initialise settings
        self.name = reporterName
        self.init_settings = {}
        self._defaults = {'pause': 'off', 'interval': '0', 'batchsize': '1'}
        self._settings = {}
        self._queue = queue

        # This line will stop the default values printing to logfile at start-up
        # unless they have been overwritten by emonhub.conf entries
        # comment out if diagnosing a startup value issue
        self._settings.update(self._defaults)

        # Initialize interval timer's "started at" timestamp
        self._interval_timestamp = 0

        # Create underlying buffer implementation
        self.buffer = ehb.getBuffer(buffer_type)(reporterName, buffer_size, **kwargs)

        # set an absolute upper limit for number of items to process per post
        # number of items posted is the lower of this item limit, buffer_size, or the
        # batchsize, as set in reporter settings or by the default value.
        self._item_limit = buffer_size

        self._log.info("Set up reporter '%s' (buffer: %s | size: %s)"
                       % (reporterName, buffer_type, buffer_size))

        # Init ctx
        # SECURITY NOTE: hostname checking and certificate verification are
        # both switched off, so HTTPS peers are not authenticated.
        self._ctx = ssl.create_default_context()
        self._ctx.check_hostname = False
        self._ctx.verify_mode = ssl.CERT_NONE

        # Initialise a thread and start the reporter
        self.stop = False
        self.start()

    def set(self, **kwargs):
        """Update settings.

        **kwargs (dict): runtime settings to be modified.

        url (string): eg: 'http://localhost/emoncms' or 'http://emoncms.org' (trailing slash optional)
        apikey (string): API key with write access
        pause (string): pause status
            'pause' = all  pause fully, nothing posted to buffer or sent (buffer retained)
            'pause' = in   pauses the input only, no add to buffer but flush still functional
            'pause' = out  pauses output only, no flush but data can accumulate in buffer
            'pause' = off  pause is off and reporter is fully operational

        Only the keys present in ``self._defaults`` are handled here. A key
        missing from kwargs falls back to its default value. A value that
        fails validation is logged and NOT stored, so the previous value
        stays in effect.

        NOTE(review): within this module only the 'all' and 'out' pause
        values are acted upon (see ``action``); input pausing appears to be
        handled elsewhere - confirm.
        """

        for key, default in self._defaults.iteritems():
            setting = kwargs.get(key, default)

            # Nothing to do when the value is already in effect
            if key in self._settings and self._settings[key] == setting:
                continue

            if key == 'pause':
                valid = str(setting).lower() in ('all', 'in', 'out', 'off')
            elif key in ('interval', 'batchsize'):
                # str() because subclasses install integer defaults
                valid = str(setting).isdigit()
            else:
                # No validation rule is known for this key
                continue

            if valid:
                self._settings[key] = setting
                self._log.debug("Setting " + self.name + " " + key + ": " + str(setting))
            else:
                self._log.warning("'%s' is not a valid setting for %s: %s"
                                  % (setting, self.name, key))

    def add(self, data):
        """Append data to buffer.

        data (list): timestamp, node and values, followed by a trailing
            "ref" item that is logged and then stripped before storing

        """
        self._log.debug(str(data[-1]) + " Append to '" + self.name +
                        "' buffer => time: " + str(data[0])
                        + ", data: " + str(data[1:-1])
                        # TODO "ref" temporarily left on end of data string for info
                        + ", ref: " + str(data[-1]))
        # TODO "ref" removed from end of data string here so not sent to emoncms
        data = data[:-1]

        # databuffer is of format:
        # [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]]
        # [[1399980731, 10, 150, 3450 ...]]
        self.buffer.storeItem(data)

    def run(self):
        """Run the reporter thread.

        Any regularly performed tasks actioned here along with flushing the buffer.
        Loops until ``self.stop`` is set.

        """
        while not self.stop:
            # If there are frames in the queue
            while not self._queue.empty():
                # Add each frame to the buffer
                frame = self._queue.get()
                self.add(frame)

            # Don't loop too fast
            time.sleep(0.1)
            # Action reporter tasks
            self.action()

    def action(self):
        """Flush the buffer unless output is paused or the interval has not elapsed.

        :return: None
        """

        # pause output if 'pause' set to 'all' or 'out'
        if 'pause' in self._settings \
                and str(self._settings['pause']).lower() in ['all', 'out']:
            return

        # If an interval is set, check if that time has passed since last post
        if int(self._settings['interval']) \
                and time.time() - self._interval_timestamp < int(self._settings['interval']):
            return
        else:
            # Then attempt to flush the buffer
            self.flush()

    def flush(self):
        """Send oldest data in buffer, if any."""

        # Buffer management
        # If data buffer not empty, send a set of values
        if self.buffer.hasItems():
            max_items = int(self._settings['batchsize'])
            if max_items > self._item_limit:
                max_items = self._item_limit
            elif max_items <= 0:
                return

            databuffer = self.buffer.retrieveItems(max_items)
            retrievedlength = len(databuffer)
            if self._process_post(databuffer):
                # In case of success, delete sample set from buffer
                self.buffer.discardLastRetrievedItems(retrievedlength)
                # log the time of last successful post
                self._interval_timestamp = time.time()

    def _process_post(self, data):
        """To be implemented in subclass.

        :param data: list of frames retrieved from the buffer
        :return: True if data posted successfully and can be discarded
        """
        pass

    def _send_post(self, post_url, post_body=None, post_header=None):
        """Send a request to the server and return its reply.

        :param post_url: full URL of the request
        :param post_body: request body, or None for a request without body
        :param post_header: optional "user:password" string; when given it
            is sent base64 encoded as an HTTP Basic ``Authorization`` header
        :return: the received reply if request is successful, else ""

        Failures are logged as warnings and never raised to the caller.
        """

        reply = ""
        request = urllib2.Request(post_url, post_body)
        if post_header is not None:
            request.add_header("Authorization",
                               "Basic %s" % base64.b64encode(post_header))
        try:
            response = urllib2.urlopen(request, timeout=60, context=self._ctx)
            # Read inside the handled section so that a failing read is
            # logged below, and always release the connection.
            try:
                reply = response.read()
            finally:
                response.close()
        except urllib2.HTTPError as e:
            self._log.warning(self.name + " couldn't send to server, HTTPError: " +
                              str(e.code))
        except urllib2.URLError as e:
            self._log.warning(self.name + " couldn't send to server, URLError: " +
                              str(e.reason))
        except httplib.HTTPException:
            self._log.warning(self.name + " couldn't send to server, HTTPException")
        except Exception:
            import traceback
            self._log.warning(self.name + " couldn't send to server, Exception: " +
                              traceback.format_exc())
        return reply


class EmonHubDomoticzReporter(EmonHubReporter):
    """Reporter sending frames to a Domoticz server.

    Stores server parameters and buffers the data between two HTTP requests.
    """

    # SECURITY NOTE: credentials are hard coded in the source; they should be
    # moved to the configuration. Values kept unchanged for compatibility.
    _BASIC_AUTH = "%s:%s" % ('simon', 'bajsa')

    # Domoticz device indexes the readings are sent to
    _IDX_TEMP_HUM = 56
    _IDX_EXT_TEMP = 55
    _IDX_PULSE = 54

    # Number of values read from a frame after timestamp and node id
    _FRAME_VALUES = 6

    def __init__(self, reporterName, queue, **kwargs):
        """Initialize reporter

        reporterName, queue and **kwargs are handed to EmonHubReporter.
        """

        # Initialization
        super(EmonHubDomoticzReporter, self).__init__(reporterName, queue, **kwargs)

        # add or alter any default settings for this reporter
        self._defaults.update({'batchsize': 100})
        self._cms_settings = {'url': 'http://emoncms.org'}

        # This line will stop the default values printing to logfile at start-up
        self._settings.update(self._defaults)

        # set an absolute upper limit for number of items to process per post
        self._item_limit = 250

    def set(self, **kwargs):
        """Update the common settings, then the server url.

        :param kwargs: runtime settings; 'url' must start with "http"
        :return: None
        """

        super(EmonHubDomoticzReporter, self).set(**kwargs)
        self._log.info(self.name + " Set: ")
        for key, default in self._cms_settings.iteritems():
            setting = kwargs.get(key, default)
            if key == 'url' and setting[:4] == "http":
                self._log.info("Setting " + self.name + " url: " + setting)
                self._settings[key] = setting
                continue
            else:
                self._log.warning("'%s' is not valid for %s: %s" % (setting, self.name, key))

    def _process_post(self, databuffer):
        """Send data to server.

        Each frame is expected to hold, after timestamp and node id:
        [ext_temp, temp, hum, batteri, pulse, rssi]
        Temperatures and humidity are scaled by 0.1 before sending.

        :param databuffer: list of frames retrieved from the buffer
        :return: True once at least one frame has been handled, else False.
            The replies of the server are not evaluated.
        """

        # databuffer is of format:
        # [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]]
        # [[1399980731, 10, 150, 250 ...]]
        self._log.debug(self.name + " databuffer %s" % databuffer)
        status = False
        post_body = None
        post_header = self._BASIC_AUTH
        base_url = self._settings['url']

        for frame in databuffer:
            data = frame[2:]
            self._log.debug(self.name + "data %s" % data)

            if len(data) < self._FRAME_VALUES:
                # A short frame can never be sent; count it as handled so it
                # is discarded instead of stopping the reporter thread.
                self._log.warning(self.name + " discarding malformed frame: %s" % (data,))
                status = True
                continue

            ext_temp = data[0] * 0.1
            temp = data[1] * 0.1
            hum = data[2] * 0.1
            batteri = data[3]
            pulse = data[4]

            domoAddParam = '&battery=%s' % (batteri)

            # presumably 84 and above marks a sensor error reading - TODO confirm
            if temp < 84:
                domoTempHum = ('/json.htm?type=command&param=udevice&idx=%s&nvalue=0&svalue=%s;%s;%s'
                               % (self._IDX_TEMP_HUM, temp, hum, 0))
                post_url_tempHum = base_url + domoTempHum + domoAddParam
                self._log.info(self.name + " sending tempHum: " + post_url_tempHum)
                reply = self._send_post(post_url_tempHum, post_body, post_header)
                if reply == 'ok':
                    self._log.debug(self.name + " acknowledged receipt with '" + reply + "' from " +
                                    base_url)

            if ext_temp < 84:
                domoTemp = ('/json.htm?type=command&param=udevice&idx=%s&nvalue=0&svalue=%s'
                            % (self._IDX_EXT_TEMP, ext_temp))
                post_url_temp = base_url + domoTemp + domoAddParam
                self._log.info(self.name + " sending: " + post_url_temp)
                self._send_post(post_url_temp, post_body, post_header)

            self._log.debug(self.name + " Pulse: " + str(pulse))
            if pulse > 100:
                domoPulse = ('/json.htm?type=command&param=udevice&idx=%s&svalue=%s'
                             % (self._IDX_PULSE, pulse))
                post_url_pulse = base_url + domoPulse + domoAddParam
                self._log.info(self.name + " sending: " + post_url_pulse + " pulse: " + str(pulse))
                self._send_post(post_url_pulse, post_body, post_header)

            status = True
            self._log.debug(self.name + " Next return")

        if status:
            return True
        self._log.debug(self.name + " After return")
        return False


class EmonHubEmoncmsReporter(EmonHubReporter):
    """Reporter sending frames to an emoncms server.

    Stores server parameters and buffers the data between two HTTP requests.
    """

    def __init__(self, reporterName, queue, **kwargs):
        """Initialize reporter

        reporterName, queue and **kwargs are handed to EmonHubReporter.
        """

        # Initialization
        super(EmonHubEmoncmsReporter, self).__init__(reporterName, queue, **kwargs)

        # add or alter any default settings for this reporter
        self._defaults.update({'batchsize': 100})
        self._cms_settings = {'apikey': "", 'url': 'http://emoncms.org'}

        # This line will stop the default values printing to logfile at start-up
        self._settings.update(self._defaults)

        # set an absolute upper limit for number of items to process per post
        self._item_limit = 250

    def set(self, **kwargs):
        """Update the common settings, then apikey and url.

        :param kwargs: runtime settings; 'apikey' is stored when it is
            obscured ('xxxx...'), 32 characters long or empty, 'url' is
            stored when it starts with "http"
        :return: None
        """

        super(EmonHubEmoncmsReporter, self).set(**kwargs)

        for key, default in self._cms_settings.iteritems():
            setting = kwargs.get(key, default)

            if key in self._settings and self._settings[key] == setting:
                continue
            elif key == 'apikey':
                if setting[:4].lower() == 'xxxx':
                    self._log.warning("Setting " + self.name + " apikey: obscured")
                elif len(setting) == 32:
                    self._log.info("Setting " + self.name + " apikey: set")
                elif setting == "":
                    self._log.info("Setting " + self.name + " apikey: null")
                else:
                    self._log.warning("Setting " + self.name + " apikey: invalid format")
                    continue
                # The apikey itself is deliberately never logged (privacy)
                self._settings[key] = setting
                continue
            elif key == 'url' and setting[:4] == "http":
                self._log.info("Setting " + self.name + " url: " + setting)
                self._settings[key] = setting
                continue
            else:
                self._log.warning("'%s' is not valid for %s: %s" % (setting, self.name, key))

    def _process_post(self, databuffer):
        """Send data to server.

        :param databuffer: list of frames retrieved from the buffer
        :return: True if the server replied 'ok', else False. Nothing is
            sent unless a 32 character, non obscured apikey is set.
        """

        # databuffer is of format:
        # [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]]
        # [[1399980731, 10, 150, 250 ...]]

        apikey = self._settings.get('apikey')
        if apikey is None or len(apikey) != 32 \
                or apikey.lower() == 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx':
            return False

        data_string = json.dumps(databuffer, separators=(',', ':'))

        # Prepare URL string of the form
        # http://domain.tld/emoncms/input/bulk.json?apikey=12345
        # &data=[[0,10,82,23],[5,10,82,23],[10,10,82,23]]
        # &time=0

        # Construct post_url (without apikey)
        post_url = self._settings['url'] + '/input/bulk' + '.json?time=0&apikey='
        post_body = "data=" + data_string

        # logged before apikey added for security
        self._log.info(self.name + " sending: " + post_url + "E-M-O-N-C-M-S-A-P-I-K-E-Y&" + post_body)

        # Add apikey to post_url
        post_url = post_url + apikey

        # The Develop branch of emoncms allows for the sending of the apikey in the post
        # body, this should be moved from the url to the body as soon as this is widely
        # adopted

        reply = self._send_post(post_url, post_body)
        if reply == 'ok':
            self._log.debug(self.name + " acknowledged receipt with '" + reply + "' from " +
                            self._settings['url'])
            return True
        else:
            self._log.warning(self.name + " send failure: wanted 'ok' but got '" + reply + "'")
            return False


class EmonHubReporterInitError(Exception):
    """Raise this when init fails."""
    pass