agentMET4FOF utilities

Buffering for agents

This module contains the buffer classes used by the agents.

It contains the following classes:

  • AgentBuffer: Buffer class which is instantiated in every agent to store data incrementally
  • MetrologicalAgentBuffer: Buffer class which is instantiated in every metrological agent to store data
class agentMET4FOF.utils.buffer.AgentBuffer(buffer_size: Optional[int] = 1000)

Buffer class which is instantiated in every agent to store data incrementally

This buffer is necessary to handle multiple inputs coming from agents.

The buffer can be accessed like a dict through the exposed methods .values(), .keys() and .items(). The underlying dict object is stored in the attribute buffer.

buffer

The buffer can be a dict of iterables, or a dict of dict of iterables for nested named data. The keys are the names of agents.

Type:dict of iterables or dict of dicts of iterables
buffer_size

The total number of elements to be stored in the agent buffer

Type:int
supported_datatypes

List of all types supported and thus properly handled by the buffer. Defaults to np.ndarray, list and Pandas DataFrame.

Type:list of types
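
The dict-like access described above can be pictured with a small stand-in class. This is a minimal sketch rather than the library's implementation: SketchBuffer is a hypothetical name, the agent names are made up, and only plain lists are used so that the example runs without NumPy or pandas.

```python
class SketchBuffer:
    """Hypothetical stand-in that mimics the dict-like interface described above."""

    def __init__(self, buffer_size=1000):
        self.buffer_size = buffer_size
        self.buffer = {}  # keys are agent names, values are iterables

    def keys(self):
        return self.buffer.keys()

    def values(self):
        return self.buffer.values()

    def items(self):
        return self.buffer.items()


buf = SketchBuffer(buffer_size=5)
# Flat case: one iterable per agent.
buf.buffer["Sensor_1"] = [1, 2, 3]
# Nested case: a dict of named iterables per agent.
buf.buffer["Sensor_2"] = {"x": [0.1, 0.2], "y": [10, 20]}

agent_names = list(buf.keys())
nested_channels = list(buf.buffer["Sensor_2"].keys())
```

Here agent_names holds the two agent names and nested_channels holds the names of the nested data for the second agent.
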
_concatenate(iterable: Union[numpy.ndarray, list, pandas.core.frame.DataFrame], data: Union[numpy.ndarray, list, pandas.core.frame.DataFrame], concat_axis: int = 0) → Iterable[T_co]

Concatenate the given iterable with data

Chooses the concatenation method according to the datatype and truncates the result once the buffer has been filled to buffer_size.

Parameters:
  • iterable (any in supported_datatypes) – The current buffer to be concatenated with.
  • data (np.ndarray, DataFrame, list) – New incoming data
  • concat_axis (int, optional) – Axis to concatenate on for numpy arrays. Default is 0.
Returns:

the original buffer with the data appended

Return type:

any in supported_datatypes
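
The append-then-truncate behaviour can be sketched for plain lists as follows. The function name concatenate_and_truncate is hypothetical, and the sketch assumes that the oldest elements are the ones dropped when the limit is exceeded; the text above does not state which end is truncated.

```python
def concatenate_and_truncate(iterable, data, buffer_size):
    """Append data to a list and keep at most buffer_size elements.

    Assumption: on overflow, the oldest (leftmost) elements are discarded.
    """
    combined = iterable + data
    overflow = len(combined) - buffer_size
    if overflow > 0:
        combined = combined[overflow:]
    return combined


current = [1, 2, 3, 4]
updated = concatenate_and_truncate(current, [5, 6, 7], buffer_size=5)
# updated == [3, 4, 5, 6, 7]
```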

_iterable_filled(iterable: Sized) → Optional[bool]

Internal method for checking the length of iterables of supported types

Parameters:iterable (Any) – Expected to be an iterable of one of the supported datatypes but could be any.
Returns:True if the iterable is of one of the supported datatypes and has reached buffer_size in length, False if it is of a supported datatype but has not yet reached buffer_size, or None if it is not of one of the supported datatypes.
Return type:bool or None
static _popleft(iterable: Union[numpy.ndarray, list, pandas.core.frame.DataFrame], n: Optional[int] = 1) → Tuple[Union[numpy.ndarray, list, pandas.core.frame.DataFrame], Union[numpy.ndarray, list, pandas.core.frame.DataFrame]]

Internal handler of the actual popping mechanism based on type of iterable

Parameters:
  • n (int) – Number of elements to retrieve from buffer.
  • iterable (any in supported_datatypes) – The current buffer to retrieve from.
Returns:

  • 2-tuple, each element being one of np.ndarray, list or Pandas DataFrame – The retrieved elements and the residual items in the buffer
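
For a plain list, the split into retrieved and residual elements can be sketched with slicing. The name popleft_list is hypothetical, and this is only an illustration of the returned 2-tuple, not the library's type-dependent handler.

```python
def popleft_list(iterable, n=1):
    """Return (first n elements, remaining elements) of a list."""
    popped = iterable[:n]
    remaining = iterable[n:]
    return popped, remaining


popped, remaining = popleft_list([1, 2, 3, 4], n=2)
# popped == [1, 2], remaining == [3, 4]
```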

buffer_filled(agent_from: Optional[str] = None) → bool

Checks whether buffer is filled, by comparing against the buffer_size

For a nested dict, this returns True if any of the iterables has reached buffer_size. For any dict value that is not one of supported_datatypes, this returns None.

Parameters:agent_from (str, optional) – Name of input agent in the buffer dict to be looked up. If agent_from is not provided, we check for all iterables in the buffer (default).
Returns:True if the iterable belonging to agent_from or, if agent_from is not provided, any of the iterables has reached buffer_size. None if none of the values is of one of the supported datatypes. False if all present iterables can take at least one more element according to buffer_size.
Return type:bool or None
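
The three possible results can be illustrated with a small sketch over a dict of lists. Both function names below are hypothetical stand-ins, only list is treated as a supported datatype, and the handling of nested dicts is an assumption based on the description above.

```python
def iterable_filled(iterable, buffer_size):
    """True/False for lists depending on length, None for unsupported types."""
    if not isinstance(iterable, list):
        return None
    return len(iterable) >= buffer_size


def buffer_filled(buffer, buffer_size, agent_from=None):
    """Check one agent's entry, or every entry if no agent name is given."""
    if agent_from is not None:
        entries = [buffer[agent_from]]
    else:
        entries = list(buffer.values())
    results = []
    for entry in entries:
        # Nested named data: check each inner iterable separately.
        inner = entry.values() if isinstance(entry, dict) else [entry]
        results.extend(iterable_filled(item, buffer_size) for item in inner)
    if any(result is True for result in results):
        return True
    if all(result is None for result in results):
        return None
    return False


buffer = {"Sensor_1": [1, 2], "Sensor_2": {"x": [1, 2, 3], "y": [1]}}
any_filled = buffer_filled(buffer, buffer_size=3)
first_filled = buffer_filled(buffer, buffer_size=3, agent_from="Sensor_1")
unsupported = buffer_filled({"Sensor_3": "text"}, buffer_size=3)
```

In this sketch any_filled is True because one nested iterable has reached the size limit, first_filled is False, and unsupported is None.
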
check_supported_datatype(obj: object) → bool

Checks whether value is an object of one of the supported data types

Parameters:obj (object) – Value to be checked
Returns:result – True if value is an object of one of the supported data types, False if not
Return type:boolean
clear(agent_from: Optional[str] = None)

Clears the data in the buffer

Parameters:agent_from (str, optional) – Name of the agent whose data is cleared. If agent_from is not given, the entire buffer is flushed (default).
items()

Interface to access the internal dict’s items()

keys()

Interface to access the internal dict’s keys()

popleft(n: Optional[int] = 1) → Union[Dict[KT, VT], numpy.ndarray, list, pandas.core.frame.DataFrame]

Pops the first n entries in the buffer

Parameters:n (int) – Number of elements to retrieve from buffer
Returns:
  • dict, np.ndarray, list or Pandas DataFrame – The retrieved elements
store(agent_from: Union[Dict[str, Union[numpy.ndarray, list, pandas.core.frame.DataFrame]], str], data: Union[numpy.ndarray, list, pandas.core.frame.DataFrame, float, int] = None, concat_axis: Optional[int] = 0)

Stores data into buffer with the received message

Checks whether the sender agent has sent any message before. If it has, the data is appended to its entry; otherwise a new entry is created for it.

Parameters:
  • agent_from (dict | str) – If a dict, it is expected to be the agentMET4FOF dict message, for compatibility with older code (keys 'from' and 'data' present). Otherwise it is expected to be the name of the sender agent, and data must be passed as a parameter.
  • data (np.ndarray, DataFrame, list, float or int) – Not used if agent_from is a dict. Otherwise data is compulsory.
  • concat_axis (int, optional) – axis to concatenate on with the buffering for numpy arrays. Default is 0.
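
The two accepted call forms and the append-or-create logic can be sketched for list data as below. The free function store and the agent names are hypothetical, truncation to buffer_size is left out, and wrapping single values in a list is an assumption carried over from the description of update().

```python
def store(buffer, agent_from, data=None):
    """Append data to the sender's entry, creating the entry on first contact."""
    if isinstance(agent_from, dict):
        # Legacy message form: sender under "from", payload under "data".
        agent_from, data = agent_from["from"], agent_from["data"]
    if not isinstance(data, list):
        data = [data]  # assumption: single values are wrapped in a list
    if agent_from in buffer:
        buffer[agent_from] = buffer[agent_from] + data
    else:
        buffer[agent_from] = list(data)
    return buffer


buffer = {}
store(buffer, "Sensor_1", [1, 2])                 # first message: new entry
store(buffer, {"from": "Sensor_1", "data": [3]})  # dict message: appended
store(buffer, "Sensor_2", 4.5)                    # single value from a new sender
# buffer == {"Sensor_1": [1, 2, 3], "Sensor_2": [4.5]}
```
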
update(agent_from: Union[Dict[str, Union[numpy.ndarray, list, pandas.core.frame.DataFrame]], str], data: Union[numpy.ndarray, list, pandas.core.frame.DataFrame, float, int] = None)

Overrides data in the buffer dict keyed by agent_from with value data

If data is a single value, this converts it into a list first before storing in the buffer dict.

Parameters:
  • agent_from (str) – Name of agent sender
  • data (np.ndarray, DataFrame, list, float or int) – New incoming data
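
In contrast to appending, an update replaces whatever was stored for the agent. A minimal sketch with a plain dict, where the free function update is a hypothetical stand-in for the method:

```python
def update(buffer, agent_from, data):
    """Replace the entry for agent_from, wrapping single numbers in a list."""
    if isinstance(data, (int, float)):
        data = [data]
    buffer[agent_from] = data
    return buffer


buffer = {"Sensor_1": [1, 2, 3]}
update(buffer, "Sensor_1", 7.0)
# buffer == {"Sensor_1": [7.0]}: the previous values are gone
```
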
values()

Interface to access the internal dict’s values()

class agentMET4FOF.utils.buffer.MetrologicalAgentBuffer(buffer_size: int = 1000)

Buffer class which is instantiated in every metrological agent to store data

This buffer is necessary to handle multiple inputs coming from agents.

We can access the buffer like a dict with exposed functions such as .values(), .keys() and .items(). The actual dict object is stored in the attribute buffer. The list in supported_datatypes contains one more element for metrological agents, namely TimeSeriesBuffer.

_concatenate(iterable: time_series_buffer.buffer.TimeSeriesBuffer, data: Union[numpy.ndarray, list, pandas.core.frame.DataFrame], concat_axis: int = 0) → time_series_buffer.buffer.TimeSeriesBuffer

Concatenate the given TimeSeriesBuffer with data

Add data to the TimeSeriesBuffer object.

Parameters:
  • iterable (TimeSeriesBuffer) – The current buffer to be concatenated with.
  • data (np.ndarray, DataFrame, list) – New incoming data
Returns:

the original buffer with the data appended

Return type:

TimeSeriesBuffer

convert_single_to_tsbuffer(single_data: Union[List[T], Tuple, numpy.ndarray])

Convert common data in agentMET4FOF to TimeSeriesBuffer

Parameters:single_data (iterable of iterables (list, tuple, np.ndarray) with shape (N, M)) – The data to convert. The number of columns M determines how each row is interpreted:
  • M==2 (pairs): assumed to be like (time, value)
  • M==3 (triple): assumed to be like (time, value, value_unc)
  • M==4 (4-tuple): assumed to be like (time, time_unc, value, value_unc)
Returns:the new TimeSeriesBuffer object
Return type:TimeSeriesBuffer
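
The column conventions listed above can be made concrete with a small helper that labels each row by its width. This sketch does not construct a TimeSeriesBuffer; label_columns is a hypothetical name, and rejecting other widths with a ValueError is an assumption of the sketch.

```python
def label_columns(single_data):
    """Map rows of width 2, 3 or 4 to named fields following the listed conventions."""
    layouts = {
        2: ("time", "value"),
        3: ("time", "value", "value_unc"),
        4: ("time", "time_unc", "value", "value_unc"),
    }
    rows = [tuple(row) for row in single_data]
    if not rows:
        raise ValueError("expected at least one row")
    width = len(rows[0])
    if width not in layouts or any(len(row) != width for row in rows):
        raise ValueError("expected shape (N, M) with M in {2, 3, 4}")
    return [dict(zip(layouts[width], row)) for row in rows]


pairs = label_columns([(0.0, 1.5), (1.0, 1.7)])
triples = label_columns([(0.0, 1.5, 0.1)])
quads = label_columns([[0.0, 0.01, 1.5, 0.1]])
```

Note that the position of the value column shifts from index 1 to index 2 once a time uncertainty is present, which is why the row width has to be inspected before the columns are read.
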
update(agent_from: str, data: Union[Dict[KT, VT], List[T], Tuple, numpy.ndarray]) → time_series_buffer.buffer.TimeSeriesBuffer

Overrides data in the buffer dict keyed by agent_from with value data

Parameters:
  • agent_from (str) – Name of agent sender
  • data (dict or iterable of iterables (list, tuple, np.ndarray) with shape (N, M)) – The data to be stored in the metrological buffer
Returns:

the updated TimeSeriesBuffer object

Return type:

TimeSeriesBuffer