agentMET4FOF utilities¶
Buffering for agents¶
This module contains the buffer classes utilized by the agents
It contains the following classes:
AgentBuffer
: Buffer class which is instantiated in every agent to store data incrementallyMetrologicalAgentBuffer
: Buffer class which is instantiated in every metrological agent to store data
-
class
agentMET4FOF.utils.buffer.
AgentBuffer
(buffer_size: Optional[int] = 1000)[source]¶ Buffer class which is instantiated in every agent to store data incrementally
This buffer is necessary to handle multiple inputs coming from agents.
We can access the buffer like a dict with exposed functions such as .values(), .keys() and .items(). The actual dict object is stored in the variable
buffer
.-
buffer
¶ The buffer can be a dict of iterables, or a dict of dict of iterables for nested named data. The keys are the names of agents.
Type: dict of iterables or dict of dicts of iterables
-
supported_datatypes
¶ List of all types supported and thus properly handled by the buffer. Defaults to
np.ndarray
, list and PandasDataFrame
Type: list of types
-
_concatenate
(iterable: Union[numpy.ndarray, list, pandas.core.frame.DataFrame], data: Union[numpy.ndarray, list, pandas.core.frame.DataFrame], concat_axis: int = 0) → Iterable[T_co][source]¶ Concatenate the given
iterable
withdata
Handles the concatenation function depending on the datatype, and truncates it if the buffer is filled to buffer_size.
Parameters: - iterable (any in supported_datatype) – The current buffer to be concatenated with.
- data (np.ndarray, DataFrame, list) – New incoming data
Returns: the original buffer with the data appended
Return type: any in supported_datatype
-
_iterable_filled
(iterable: Sized) → Optional[bool][source]¶ Internal method for checking on length of iterables of supported types
Parameters: iterable (Any) – Expected to be an iterable of one of the supported datatypes but could be any. Returns: True if the iterable is of one of the supported datatypes and has reached buffer_size
in length or False if not or None in case it is not of one of the supported datatypes.Return type: bool or None
-
static
_popleft
(iterable: Union[numpy.ndarray, list, pandas.core.frame.DataFrame], n: Optional[int] = 1) → Tuple[Union[numpy.ndarray, list, pandas.core.frame.DataFrame], Union[numpy.ndarray, list, pandas.core.frame.DataFrame]][source]¶ Internal handler of the actual popping mechanism based on type of iterable
Parameters: - n (int) – Number of elements to retrieve from buffer.
- iterable (any in
supported_datatypes
) – The current buffer to retrieve from.
Returns: - 2-tuple of each either one of
np.ndarray
, - list or Pandas
DataFrame
– The retrieved elements and the residual items in the buffer
-
buffer_filled
(agent_from: Optional[str] = None) → bool[source]¶ Checks whether buffer is filled, by comparing against the
buffer_size
For nested dict, this returns True if any of the iterables is beyond the
buffer_size
. For any of the dict values , which is not one ofsupported_datatypes
this returns None.Parameters: agent_from (str, optional) – Name of input agent in the buffer dict to be looked up. If agent_from
is not provided, we check for all iterables in the buffer (default).Returns: True if either the or any of the iterables has reached buffer_size
or None in case none of the values is of one of the supported datatypes. False if all present iterable can take at least one more element according tobuffer_size
.Return type: bool or None
-
check_supported_datatype
(obj: object) → bool[source]¶ Checks whether value is an object of one of the supported data types
Parameters: obj (object) – Value to be checked Returns: result – True if value is an object of one of the supported data types, False if not Return type: boolean
-
clear
(agent_from: Optional[str] = None)[source]¶ Clears the data in the buffer
Parameters: agent_from (str, optional) – Name of agent, if agent_from
is not given, the entire buffer is flushed. (default)
-
popleft
(n: Optional[int] = 1) → Union[Dict[KT, VT], numpy.ndarray, list, pandas.core.frame.DataFrame][source]¶ Pops the first n entries in the buffer
Parameters: n (int) – Number of elements to retrieve from buffer Returns: - dict,
np.ndarray
, list or Pandas DataFrame
– The retrieved elements
- dict,
-
store
(agent_from: Union[Dict[str, Union[numpy.ndarray, list, pandas.core.frame.DataFrame]], str], data: Union[numpy.ndarray, list, pandas.core.frame.DataFrame, float, int] = None, concat_axis: Optional[int] = 0)[source]¶ Stores data into
buffer
with the received messageChecks if sender agent has sent any message before. If it did, then append, otherwise create new entry for it.
Parameters: - agent_from (dict | str) – if type is dict, we expect it to be the agentMET4FOF dict message to be
compliant with older code (keys
from
anddata
present’), otherwise we expect it to be name of agent sender anddata
will need to be passed as parameter - data (np.ndarray, DataFrame, list, float or int) – Not used if
agent_from
is a dict. Otherwisedata
is compulsory. - concat_axis (int, optional) – axis to concatenate on with the buffering for numpy arrays. Default is 0.
- agent_from (dict | str) – if type is dict, we expect it to be the agentMET4FOF dict message to be
compliant with older code (keys
-
update
(agent_from: Union[Dict[str, Union[numpy.ndarray, list, pandas.core.frame.DataFrame]], str], data: Union[numpy.ndarray, list, pandas.core.frame.DataFrame, float, int] = None)[source]¶ Overrides data in the buffer dict keyed by
agent_from
with valuedata
If
data
is a single value, this converts it into a list first before storing in the buffer dict.Parameters:
-
-
class
agentMET4FOF.utils.buffer.
MetrologicalAgentBuffer
(buffer_size: int = 1000)[source]¶ Buffer class which is instantiated in every metrological agent to store data
This buffer is necessary to handle multiple inputs coming from agents.
We can access the buffer like a dict with exposed functions such as .values(), .keys() and .items(). The actual dict object is stored in the attribute
buffer
. The list insupported_datatypes
contains one more element for metrological agents, namelyTimeSeriesBuffer
.-
_concatenate
(iterable: time_series_buffer.buffer.TimeSeriesBuffer, data: Union[numpy.ndarray, list, pandas.core.frame.DataFrame], concat_axis: int = 0) → time_series_buffer.buffer.TimeSeriesBuffer[source]¶ Concatenate the given
TimeSeriesBuffer
withdata
Add
data
to theTimeSeriesBuffer
object.Parameters: - iterable (TimeSeriesBuffer) – The current buffer to be concatenated with.
- data (np.ndarray, DataFrame, list) – New incoming data
Returns: the original buffer with the data appended
Return type: TimeSeriesBuffer
-
convert_single_to_tsbuffer
(single_data: Union[List[T], Tuple, numpy.ndarray])[source]¶ Convert common data in agentMET4FOF to
TimeSeriesBuffer
Parameters: single_data (iterable of iterables (list, tuple, np.ndarray) with shape (N, M)) – - M==2 (pairs): assumed to be like (time, value)
- M==3 (triple): assumed to be like (time, value, value_unc)
- M==4 (4-tuple): assumed to be like (time, time_unc, value, value_unc)
Returns: the new TimeSeriesBuffer
objectReturn type: TimeSeriesBuffer
-
update
(agent_from: str, data: Union[Dict[KT, VT], List[T], Tuple, numpy.ndarray]) → time_series_buffer.buffer.TimeSeriesBuffer[source]¶ Overrides data in the buffer dict keyed by agent_from with value data
Parameters: Returns: the updated
TimeSeriesBuffer
objectReturn type: TimeSeriesBuffer
-