Source code for agentMET4FOF.utils.buffer

"""This module contains the buffer classes utilized by the agents

It contains the following classes:

- :class:`AgentBuffer`: Buffer class which is instantiated in every agent to store data
  incrementally
- :class:`MetrologicalAgentBuffer`: Buffer class which is instantiated in every
  metrological agent to store data
"""

import copy
from typing import Dict, Iterable, List, Optional, Sized, Tuple, Union

import numpy as np
import pandas as pd
from pandas import DataFrame

__all__ = ["AgentBuffer", "MetrologicalAgentBuffer"]

from time_series_buffer import TimeSeriesBuffer


class AgentBuffer:
    """Buffer class which is instantiated in every agent to store data incrementally

    This buffer is necessary to handle multiple inputs coming from agents. We can
    access the buffer like a dict with exposed functions such as .values(), .keys()
    and .items(). The actual dict object is stored in the attribute :attr:`buffer`.

    Attributes
    ----------
    buffer : dict of iterables or dict of dicts of iterables
        The buffer can be a dict of iterables, or a dict of dicts of iterables for
        nested named data. The keys are the names of the sending agents.
    buffer_size : int
        The total number of elements to be stored in the agent :attr:`buffer`
    supported_datatypes : list of types
        List of all types supported and thus properly handled by the buffer.
        Defaults to :class:`np.ndarray <NumPy:numpy.ndarray>`, list and Pandas
        :class:`DataFrame <Pandas:pandas.DataFrame>`
    """

    def __init__(self, buffer_size: Optional[int] = 1000):
        """Initialise a new agent buffer object

        Parameters
        ----------
        buffer_size : int, optional
            Maximum length of the buffer. Defaults to 1000.
        """
        self.buffer = {}
        self.buffer_size = buffer_size
        self.supported_datatypes = [list, pd.DataFrame, np.ndarray]

    def __getitem__(self, key):
        return self.buffer[key]

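    # A minimal usage sketch (illustrative, not part of the library) of the
    # dict-like access provided by ``__getitem__`` and the ``keys``/``values``/
    # ``items`` interfaces further below; the agent name "Agent_1" is made up:
    #
    #     buffer = AgentBuffer(buffer_size=10)
    #     buffer.update("Agent_1", [1, 2, 3])
    #     buffer["Agent_1"]    # -> [1, 2, 3]
    #     list(buffer.keys())  # -> ["Agent_1"]
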
    def check_supported_datatype(self, obj: object) -> bool:
        """Checks whether ``obj`` is an object of one of the supported data types

        Parameters
        ----------
        obj : object
            Value to be checked

        Returns
        -------
        result : bool
            True if ``obj`` is an object of one of the supported data types,
            False if not
        """
        for supported_datatype in self.supported_datatypes:
            if isinstance(obj, supported_datatype):
                return True
        return False

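    # For illustration (assuming the default ``supported_datatypes``): lists,
    # arrays and DataFrames pass the check, single scalars do not.
    #
    #     buffer = AgentBuffer()
    #     buffer.check_supported_datatype([1, 2])         # -> True
    #     buffer.check_supported_datatype(np.array([1]))  # -> True
    #     buffer.check_supported_datatype(42)             # -> False
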
    def update(
        self,
        agent_from: Union[Dict[str, Union[np.ndarray, list, pd.DataFrame]], str],
        data: Union[np.ndarray, list, pd.DataFrame, float, int] = None,
    ):
        """Overrides data in the buffer dict keyed by ``agent_from`` with ``data``

        If ``data`` is a single value, this converts it into a list first before
        storing it in the buffer dict.

        Parameters
        ----------
        agent_from : str
            Name of agent sender
        data : np.ndarray, DataFrame, list, float or int
            New incoming data
        """
        # handle if data is nested in a dict
        if isinstance(data, dict):
            # check the datatype of each value
            for key, value in data.items():
                # if the value is not of a list type, turn it into a list of a
                # single value, i.e. [value]
                if not self.check_supported_datatype(value):
                    data[key] = [value]
        elif not self.check_supported_datatype(data):
            data = [data]

        self.buffer.update({agent_from: data})
        return self.buffer

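    # Sketch of the single-value handling described above (illustrative names
    # and values): scalars are wrapped into one-element lists before storing.
    #
    #     buffer = AgentBuffer()
    #     buffer.update("Agent_1", 0.5)  # -> {"Agent_1": [0.5]}
    #     buffer.update("Agent_1", {"t": 0.0, "v": 1.5})
    #     # -> {"Agent_1": {"t": [0.0], "v": [1.5]}}
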
    def _concatenate(
        self,
        iterable: Union[np.ndarray, list, pd.DataFrame],
        data: Union[np.ndarray, list, DataFrame],
        concat_axis: int = 0,
    ) -> Iterable:
        """Concatenate the given ``iterable`` with ``data``

        Handles the concatenation depending on the datatype and truncates the
        result if the buffer is filled beyond :attr:`buffer_size`.

        Parameters
        ----------
        iterable : any in :attr:`supported_datatypes`
            The current buffer to be concatenated with.
        data : np.ndarray, DataFrame, list
            New incoming data
        concat_axis : int, optional
            Axis to concatenate on for numpy arrays and DataFrames. Default is 0.

        Returns
        -------
        any in :attr:`supported_datatypes`
            the original buffer with the data appended
        """
        # handle list
        if isinstance(iterable, list):
            iterable += data
            # check if the memory buffer size is exceeded and if so, remove the
            # first n elements which exceeded the size
            if len(iterable) > self.buffer_size:
                truncated_element_index = len(iterable) - self.buffer_size
                iterable = iterable[truncated_element_index:]
        # handle if data type is np.ndarray
        elif isinstance(iterable, np.ndarray):
            iterable = np.concatenate((iterable, data), axis=concat_axis)
            if len(iterable) > self.buffer_size:
                truncated_element_index = len(iterable) - self.buffer_size
                iterable = iterable[truncated_element_index:]
        # handle if data type is pd.DataFrame
        elif isinstance(iterable, pd.DataFrame):
            iterable = pd.concat([iterable, data], ignore_index=True, axis=concat_axis)
            if len(iterable) > self.buffer_size:
                truncated_element_index = len(iterable) - self.buffer_size
                iterable = iterable.truncate(before=truncated_element_index)
        return iterable

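    # Sketch of the truncation behaviour (illustrative values): with
    # ``buffer_size=3``, appending two elements to a full list drops the two
    # oldest ones.
    #
    #     buffer = AgentBuffer(buffer_size=3)
    #     buffer._concatenate([1, 2, 3], [4, 5])  # -> [3, 4, 5]
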
    def buffer_filled(self, agent_from: Optional[str] = None) -> bool:
        """Checks whether the buffer is filled, compared against :attr:`buffer_size`

        For a nested dict, this returns True if any of its iterables has reached
        :attr:`buffer_size`.

        Parameters
        ----------
        agent_from : str, optional
            Name of the input agent in the buffer dict to be looked up. If
            ``agent_from`` is not provided, all iterables in the buffer are
            checked (default).

        Returns
        -------
        bool or None
            True if any of the checked iterables has reached :attr:`buffer_size`,
            False if all of them can take at least one more element, or None in
            case none of the values is of one of the supported datatypes.
        """
        if agent_from is None:
            return any([self._iterable_filled(iterable) for iterable in self.values()])
        elif isinstance(self[agent_from], dict):
            return any(
                [
                    self._iterable_filled(iterable)
                    for iterable in self[agent_from].values()
                ]
            )
        else:
            return self._iterable_filled(self[agent_from])

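    # Sketch (illustrative): with ``buffer_size=2`` the buffer reports filled
    # as soon as the stored iterable holds two elements.
    #
    #     buffer = AgentBuffer(buffer_size=2)
    #     buffer.store("Agent_1", [1.0])
    #     buffer.buffer_filled("Agent_1")  # -> False
    #     buffer.store("Agent_1", [2.0])
    #     buffer.buffer_filled("Agent_1")  # -> True
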
    def _iterable_filled(self, iterable: Sized) -> Union[bool, None]:
        """Internal method for checking the length of iterables of supported types

        Parameters
        ----------
        iterable : Any
            Expected to be an iterable of one of the supported datatypes but could
            be any.

        Returns
        -------
        bool or None
            True if the iterable is of one of the supported datatypes and has
            reached :attr:`buffer_size` in length, False if it has not, or None in
            case it is not of one of the supported datatypes.
        """
        if self.check_supported_datatype(iterable):
            if len(iterable) >= self.buffer_size:
                return True
            return False

    def popleft(
        self, n: Optional[int] = 1
    ) -> Union[Dict, np.ndarray, list, DataFrame]:
        """Pops the first n entries in the buffer

        Parameters
        ----------
        n : int
            Number of elements to retrieve from the buffer

        Returns
        -------
        dict, :class:`np.ndarray <NumPy:numpy.ndarray>`, list or Pandas
        :class:`DataFrame <Pandas:pandas.DataFrame>`
            The retrieved elements
        """
        popped_buffer = copy.copy(self.buffer)
        remaining_buffer = copy.copy(self.buffer)
        if isinstance(popped_buffer, dict):
            for key, value in popped_buffer.items():
                popped_buffer[key], remaining_buffer[key] = self._popleft(value, n)
        else:
            popped_buffer, remaining_buffer = self._popleft(popped_buffer, n)

        self.buffer = remaining_buffer
        return popped_buffer

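    # Sketch (illustrative): popping returns the oldest n elements per agent
    # and keeps the rest in the buffer.
    #
    #     buffer = AgentBuffer()
    #     buffer.store("Agent_1", [1, 2, 3])
    #     buffer.popleft(2)  # -> {"Agent_1": [1, 2]}
    #     buffer.buffer      # -> {"Agent_1": [3]}
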
    @staticmethod
    def _popleft(
        iterable: Union[np.ndarray, list, DataFrame], n: Optional[int] = 1
    ) -> Tuple[Union[np.ndarray, list, DataFrame], Union[np.ndarray, list, DataFrame]]:
        """Internal handler of the actual popping mechanism based on iterable's type

        Parameters
        ----------
        iterable : any in :attr:`supported_datatypes`
            The current buffer to retrieve from.
        n : int
            Number of elements to retrieve from the buffer.

        Returns
        -------
        2-tuple of each either one of :class:`np.ndarray <NumPy:numpy.ndarray>`,
        list or Pandas :class:`DataFrame <Pandas:pandas.DataFrame>`
            The retrieved elements and the residual items in the buffer
        """
        popped_item = 0
        if isinstance(iterable, (list, np.ndarray)):
            popped_item = iterable[:n]
            iterable = iterable[n:]
        elif isinstance(iterable, pd.DataFrame):
            popped_item = iterable.iloc[:n]
            iterable = iterable.iloc[n:]
        return popped_item, iterable

    def clear(self, agent_from: Optional[str] = None):
        """Clears the data in the buffer

        Parameters
        ----------
        agent_from : str, optional
            Name of the agent whose data is to be cleared. If ``agent_from`` is
            not given, the entire buffer is flushed (default).
        """
        if agent_from is None:
            self.buffer = {}
        elif agent_from in self.buffer:
            del self.buffer[agent_from]

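    # Sketch (illustrative): clearing a single agent's entry versus flushing
    # the whole buffer.
    #
    #     buffer.clear("Agent_1")  # removes only the "Agent_1" entry
    #     buffer.clear()           # empties the entire buffer dict
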
    def store(
        self,
        agent_from: Union[Dict[str, Union[np.ndarray, list, pd.DataFrame]], str],
        data: Union[np.ndarray, list, pd.DataFrame, float, int] = None,
        concat_axis: Optional[int] = 0,
    ):
        """Stores data into :attr:`buffer` with the received message

        Checks if the sender agent has sent any message before. If it did, the new
        data is appended, otherwise a new entry is created for the sender.

        Parameters
        ----------
        agent_from : dict or str
            if type is dict, we expect it to be the agentMET4FOF dict message to be
            compliant with older code (keys ``from`` and ``data`` present), otherwise
            we expect it to be the name of the agent sender and ``data`` will need
            to be passed as parameter
        data : np.ndarray, DataFrame, list, float or int
            Not used if ``agent_from`` is a dict. Otherwise ``data`` is compulsory.
        concat_axis : int, optional
            axis to concatenate on with the buffering for numpy arrays.
            Default is 0.
        """
        # Store into separate variables, as they will be used frequently later for
        # the type checks. If the first argument is the agentMET4FOF dict message
        # in the old format ...
        if isinstance(agent_from, dict):
            message_from = agent_from["from"]
            message_data = agent_from["data"]
        # ... otherwise, we expect the name of the sender agent and the data to be
        # passed separately.
        else:
            message_from = agent_from
            message_data = data

        # check if the sender agent has sent any message before:
        # if it did not, create a new entry for the input agent
        if message_from not in self.buffer:
            self.update(message_from, message_data)
            return 0

        # otherwise the sender exists in memory, so handle appending
        # acceptable data types: list, dict, ndarray, dataframe, single values

        # handle nested data in dict
        if isinstance(message_data, dict):
            for key, value in message_data.items():
                # if it is a single value, convert it into a single-element list
                if not self.check_supported_datatype(value):
                    value = [value]
                # check if the key exists; if it does, append ...
                if key in self.buffer[message_from].keys():
                    self.buffer[message_from][key] = self._concatenate(
                        self.buffer[message_from][key], value, concat_axis
                    )
                # ... otherwise, create a new entry
                else:
                    self.buffer[message_from].update({key: value})
        else:
            if not self.check_supported_datatype(message_data):
                message_data = [message_data]
            self.buffer[message_from] = self._concatenate(
                self.buffer[message_from], message_data, concat_axis
            )

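    # Sketch of the appending behaviour (illustrative names and values): nested
    # dict data is concatenated key by key on repeated calls.
    #
    #     buffer = AgentBuffer(buffer_size=100)
    #     buffer.store("Agent_1", {"time": [0.0], "value": [1.5]})
    #     buffer.store("Agent_1", {"time": [0.1], "value": [1.7]})
    #     buffer["Agent_1"]  # -> {"time": [0.0, 0.1], "value": [1.5, 1.7]}
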
    def values(self):
        """Interface to access the internal dict's values()"""
        return self.buffer.values()

    def items(self):
        """Interface to access the internal dict's items()"""
        return self.buffer.items()

    def keys(self):
        """Interface to access the internal dict's keys()"""
        return self.buffer.keys()

class MetrologicalAgentBuffer(AgentBuffer):
    """Buffer class which is instantiated in every metrological agent to store data

    This buffer is necessary to handle multiple inputs coming from agents. We can
    access the buffer like a dict with exposed functions such as .values(), .keys()
    and .items(). The actual dict object is stored in the attribute
    :attr:`buffer <agentMET4FOF.agents.AgentBuffer.buffer>`.

    The list in :attr:`supported_datatypes
    <agentMET4FOF.agents.AgentBuffer.supported_datatypes>` contains one more
    element for metrological agents, namely :class:`TimeSeriesBuffer
    <time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>`.
    """

    def __init__(self, buffer_size: int = 1000):
        """Initialise a new agent buffer object

        Parameters
        ----------
        buffer_size : int
            Maximum length of the buffer. Defaults to 1000.
        """
        super(MetrologicalAgentBuffer, self).__init__(buffer_size)
        self.supported_datatypes.append(TimeSeriesBuffer)

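    # Sketch (illustrative): the metrological buffer additionally accepts
    # TimeSeriesBuffer objects as stored values.
    #
    #     metro_buffer = MetrologicalAgentBuffer(buffer_size=10)
    #     metro_buffer.check_supported_datatype(
    #         TimeSeriesBuffer(maxlen=10)
    #     )  # -> True
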
    def convert_single_to_tsbuffer(self, single_data: Union[List, Tuple, np.ndarray]):
        """Convert common data in agentMET4FOF to :class:`TimeSeriesBuffer
        <time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>`

        Parameters
        ----------
        single_data : iterable of iterables (list, tuple, np.ndarray), shape (N, M)

            * M==2 (pairs): assumed to be like (time, value)
            * M==3 (triples): assumed to be like (time, value, value_unc)
            * M==4 (4-tuples): assumed to be like (time, time_unc, value, value_unc)

        Returns
        -------
        TimeSeriesBuffer
            the new :class:`TimeSeriesBuffer
            <time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>` object
        """
        ts = TimeSeriesBuffer(maxlen=self.buffer_size)
        ts.add(single_data)
        return ts

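    # Sketch (illustrative values): converting (time, value) pairs into a
    # TimeSeriesBuffer holding at most ``buffer_size`` elements.
    #
    #     metro_buffer = MetrologicalAgentBuffer(buffer_size=10)
    #     ts = metro_buffer.convert_single_to_tsbuffer([(0.0, 1.5), (0.1, 1.7)])
    #     # ts is a TimeSeriesBuffer containing both pairs
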
    def update(
        self,
        agent_from: str,
        data: Union[Dict, List, Tuple, np.ndarray],
    ) -> Dict:
        """Overrides data in the buffer dict keyed by ``agent_from`` with ``data``

        Parameters
        ----------
        agent_from : str
            Name of agent sender
        data : dict or iterable of iterables (list, tuple, np.ndarray), shape (N, M)
            the data to be stored in the metrological buffer

        Returns
        -------
        dict
            the updated buffer dict, in which each value is a
            :class:`TimeSeriesBuffer
            <time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>` object
        """
        # handle if data is nested in a dict
        if isinstance(data, dict):
            # convert each value into a TimeSeriesBuffer
            for key, value in data.items():
                data[key] = self.convert_single_to_tsbuffer(value)
        else:
            data = self.convert_single_to_tsbuffer(data)
        self.buffer.update({agent_from: data})
        return self.buffer

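    # Sketch (illustrative): after ``update`` each stored value is a
    # TimeSeriesBuffer, also for nested dict data.
    #
    #     metro_buffer = MetrologicalAgentBuffer()
    #     metro_buffer.update("Agent_1", [(0.0, 1.5), (0.1, 1.7)])
    #     # metro_buffer["Agent_1"] is now a TimeSeriesBuffer
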
    def _concatenate(
        self,
        iterable: TimeSeriesBuffer,
        data: Union[np.ndarray, list, pd.DataFrame],
        concat_axis: int = 0,
    ) -> TimeSeriesBuffer:
        """Concatenate the given ``TimeSeriesBuffer`` with ``data``

        Add ``data`` to the :class:`TimeSeriesBuffer
        <time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>` object.

        Parameters
        ----------
        iterable : TimeSeriesBuffer
            The current buffer to be concatenated with.
        data : np.ndarray, DataFrame, list
            New incoming data

        Returns
        -------
        TimeSeriesBuffer
            the original buffer with the data appended
        """
        iterable.add(data)
        return iterable