agentMET4FOF utilities
Buffering for agents
This module contains the buffer classes utilized by the agents
It contains the following classes:
AgentBuffer
: Buffer class which is instantiated in every agent to store data incrementallyMetrologicalAgentBuffer
: Buffer class which is instantiated in every metrological agent to store data
- class agentMET4FOF.utils.buffer.AgentBuffer(buffer_size: int | None = 1000)[source]
Buffer class which is instantiated in every agent to store data incrementally
This buffer is necessary to handle multiple inputs coming from agents.
We can access the buffer like a dict with exposed functions such as .values(), .keys() and .items(). The actual dict object is stored in the variable
buffer
.- buffer
The buffer can be a dict of iterables, or a dict of dict of iterables for nested named data. The keys are the names of agents.
- supported_datatypes
List of all types supported and thus properly handled by the buffer. Defaults to
np.ndarray
, list and PandasDataFrame
- Type:
list of types
- _concatenate(iterable: ndarray | list | DataFrame, data: ndarray | list | DataFrame, concat_axis: int = 0) Iterable [source]
Concatenate the given
iterable
withdata
Handles the concatenation function depending on the datatype, and truncates it if the buffer is filled to buffer_size.
- Parameters:
iterable (any in supported_datatype) – The current buffer to be concatenated with.
data (np.ndarray, DataFrame, list) – New incoming data
- Returns:
the original buffer with the data appended
- Return type:
any in supported_datatype
- _iterable_filled(iterable: Sized) bool | None [source]
Internal method for checking on length of iterables of supported types
- Parameters:
iterable (Any) – Expected to be an iterable of one of the supported datatypes but could be any.
- Returns:
True if the iterable is of one of the supported datatypes and has reached
buffer_size
in length or False if not or None in case it is not of one of the supported datatypes.- Return type:
bool or None
- static _popleft(iterable: ndarray | list | DataFrame, n: int | None = 1) Tuple[ndarray | list | DataFrame, ndarray | list | DataFrame] [source]
Internal handler of the actual popping mechanism based on type of iterable
- Parameters:
n (int) – Number of elements to retrieve from buffer.
iterable (any in
supported_datatypes
) – The current buffer to retrieve from.
- Returns:
2-tuple of each either one of
np.ndarray
,list or Pandas
DataFrame
– The retrieved elements and the residual items in the buffer
- buffer_filled(agent_from: str | None = None) bool [source]
Checks whether buffer is filled, by comparing against the
buffer_size
For nested dict, this returns True if any of the iterables is beyond the
buffer_size
. For any of the dict values , which is not one ofsupported_datatypes
this returns None.- Parameters:
agent_from (str, optional) – Name of input agent in the buffer dict to be looked up. If
agent_from
is not provided, we check for all iterables in the buffer (default).- Returns:
True if either the or any of the iterables has reached
buffer_size
or None in case none of the values is of one of the supported datatypes. False if all present iterable can take at least one more element according tobuffer_size
.- Return type:
bool or None
- check_supported_datatype(obj: object) bool [source]
Checks whether value is an object of one of the supported data types
- Parameters:
obj (object) – Value to be checked
- Returns:
result – True if value is an object of one of the supported data types, False if not
- Return type:
boolean
- clear(agent_from: str | None = None)[source]
Clears the data in the buffer
- Parameters:
agent_from (str, optional) – Name of agent, if
agent_from
is not given, the entire buffer is flushed. (default)
- popleft(n: int | None = 1) Dict | ndarray | list | DataFrame [source]
Pops the first n entries in the buffer
- Parameters:
n (int) – Number of elements to retrieve from buffer
- Returns:
dict,
np.ndarray
, list or PandasDataFrame
– The retrieved elements
- store(agent_from: Dict[str, ndarray | list | DataFrame] | str, data: ndarray | list | DataFrame | float | int = None, concat_axis: int | None = 0)[source]
Stores data into
buffer
with the received messageChecks if sender agent has sent any message before. If it did, then append, otherwise create new entry for it.
- Parameters:
agent_from (dict | str) – if type is dict, we expect it to be the agentMET4FOF dict message to be compliant with older code (keys
from
anddata
present’), otherwise we expect it to be name of agent sender anddata
will need to be passed as parameterdata (np.ndarray, DataFrame, list, float or int) – Not used if
agent_from
is a dict. Otherwisedata
is compulsory.concat_axis (int, optional) – axis to concatenate on with the buffering for numpy arrays. Default is 0.
- class agentMET4FOF.utils.buffer.MetrologicalAgentBuffer(buffer_size: int = 1000)[source]
Buffer class which is instantiated in every metrological agent to store data
This buffer is necessary to handle multiple inputs coming from agents.
We can access the buffer like a dict with exposed functions such as .values(), .keys() and .items(). The actual dict object is stored in the attribute
buffer
. The list insupported_datatypes
contains one more element for metrological agents, namelyTimeSeriesBuffer
.- _concatenate(iterable: TimeSeriesBuffer, data: ndarray | list | DataFrame, concat_axis: int = 0) TimeSeriesBuffer [source]
Concatenate the given
TimeSeriesBuffer
withdata
Add
data
to theTimeSeriesBuffer
object.- Parameters:
iterable (TimeSeriesBuffer) – The current buffer to be concatenated with.
data (np.ndarray, DataFrame, list) – New incoming data
- Returns:
the original buffer with the data appended
- Return type:
TimeSeriesBuffer
- convert_single_to_tsbuffer(single_data: List | Tuple | ndarray)[source]
Convert common data in agentMET4FOF to
TimeSeriesBuffer
- Parameters:
single_data (iterable of iterables (list, tuple, np.ndarray) with shape (N, M)) –
M==2 (pairs): assumed to be like (time, value)
M==3 (triple): assumed to be like (time, value, value_unc)
M==4 (4-tuple): assumed to be like (time, time_unc, value, value_unc)
- Returns:
the new
TimeSeriesBuffer
object- Return type:
TimeSeriesBuffer
- update(agent_from: str, data: Dict | List | Tuple | ndarray) TimeSeriesBuffer [source]
Overrides data in the buffer dict keyed by agent_from with value data
- Parameters:
- Returns:
the updated
TimeSeriesBuffer
object- Return type:
TimeSeriesBuffer