Source code for agentMET4FOF.agents.base_agents

import base64
import datetime
import time
import warnings
from collections import deque
from io import BytesIO
from typing import Any, Callable, Dict, List, Optional, Union

import matplotlib.figure
import matplotlib.pyplot as plt
import mpld3
import numpy as np
from mesa import Agent as MesaAgent, Model as MesaModel
from osbrain import Agent as osBrainAgent
from plotly import tools as tls
from plotly.graph_objs import Scatter

from ..streams.base_streams import DataStreamMET4FOF
from ..utils.buffer import AgentBuffer

__all__ = [
    "AgentMET4FOF",
    "DataStreamAgent",
    "MonitorAgent",
]

from ..utils import Backend


[docs] class AgentMET4FOF(MesaAgent, osBrainAgent): """ Base class for all agents with specific functions to be overridden/supplied by user. Behavioral functions for users to provide are init_parameters, agent_loop and on_received_message. Communicative functions are bind_output, unbind_output and send_output. To learn more about the idea behind multi-agent systems (MAS) see [Bang2019]_. """ def __init__( self, name="", host=None, serializer=None, transport=None, attributes=None, backend=Backend.OSBRAIN, mesa_model=MesaModel, ): self.backend = self.validate_backend(backend) if self.backend == Backend.OSBRAIN: self._remove_methods(MesaAgent) osBrainAgent.__init__( self, name=name, host=host, serializer=serializer, transport=transport, attributes=attributes, ) elif self.backend == Backend.MESA: MesaAgent.__init__(self, name, mesa_model) self._remove_methods(osBrainAgent) self.init_mesa(name) self.unique_id = name self.name = name self.mesa_model = mesa_model @staticmethod def validate_backend(backend: Union[str, Backend]) -> Backend: if isinstance(backend, str): if backend.lower() == "osbrain": actual_backend = Backend.OSBRAIN elif backend.lower() == "mesa": actual_backend = Backend.MESA else: raise AgentMET4FOF.raise_not_implemented_backend() warnings.warn( f"The backend was specified using the string '{backend}'" f"but should be specified as one of {tuple(Backend)}. These " f"constants can and should be imported using " f"'from agentMET4FOF.utils import Backend'. The " f"string-based initialization might be removed any time. Please switch " f"instantly to {actual_backend}.", DeprecationWarning, ) return actual_backend if backend not in Backend: raise AgentMET4FOF.raise_not_implemented_backend() return backend @staticmethod def raise_not_implemented_backend(): return NotImplementedError( f"Backend has not been implemented. Valid choices are {tuple(Backend)}." ) def init_mesa(self, name): # MESA Specific parameters self.mesa_message_queue = deque([]) self.unique_id = name self.name = name
[docs] def step(self): """ Used for MESA backend only. Behaviour on every update step. """ # check if there's message in queue while len(self.mesa_message_queue) > 0: self.handle_process_data(self.mesa_message_queue.popleft()) # proceed with user-defined agent-loop self.agent_loop()
[docs] def _remove_methods(self, cls): """Remove methods from the other backends base class from the current agent""" for name in list(vars(cls)): if not name.startswith("__"): try: delattr(self, name) except AttributeError: # This situation only occurs when we start and stop agent # networks of differing backends in one sequence. Normally # ignoring these errors should be no problem. pass
[docs] def set_attr(self, **kwargs): for key, val in kwargs.items(): return setattr(self, key, val)
[docs] def get_attr(self, attr): return getattr(self, attr)
[docs] def init_agent(self, buffer_size=1000, log_mode=True): """Internal initialization to set up the agent This mainly sets the dictionaries of inputs, outputs and the public address. Attributes ---------- Inputs : dict Dictionary of Agents connected to its input channels. Messages will arrive from agents in this dictionary. Automatically updated when `bind_output()` function is called Outputs : dict Dictionary of Agents connected to its output channels. Messages will be sent to agents in this dictionary. Automatically updated when `bind_output()` function is called PubAddr_alias : str Name of Publish address socket PubAddr : str Publish address socket handle AgentType : str Name of class current_state : str Current state of agent. Can be used to define different states of operation such as "Running", "Idle, "Stop", etc.. Users will need to define their own flow of handling each type of `self.current_state` in the `agent_loop` loop_wait : int The interval to wait between loop. Call `init_agent_loop` to restart the timer or set the value of loop_wait in `init_parameters` when necessary. buffer_size : int The total number of elements to be stored in the agent :attr:`buffer` When total elements exceeds this number, the latest elements will be replaced with the incoming data elements """ self.Inputs = {} self.Outputs = {} self.Outputs_agent_channels = {} # keep track of agent subscription channels self.AgentType = type(self).__name__ self.log_mode = log_mode self.log_info("INITIALIZED") # These are the available states to change the agents' behavior in # agent_loop. self.states = {0: "Idle", 1: "Running", 2: "Pause", 3: "Stop", 4: "Reset"} self.current_state = self.states[0] self.loop_wait = None if not hasattr(self, "stylesheet"): self.stylesheet = "" self.output_channels_info = {} self.buffer_size = buffer_size self.buffer = self.init_buffer(self.buffer_size) if self.backend == Backend.OSBRAIN: self.PubAddr_alias = self.name + "_PUB" self.PubAddr = self.bind("PUB", alias=self.PubAddr_alias, transport="tcp")
[docs] def init_buffer(self, buffer_size): """ A method to initialise the buffer. By overriding this method, user can provide a custom buffer, instead of the regular AgentBuffer. This can be used, for example, to provide a MetrologicalAgentBuffer in the metrological agents. """ buffer = AgentBuffer(buffer_size) return buffer
[docs] def reset(self): """Reset the agent's states and parameters User can override this method to reset the specific parameters. """ self.log_info("RESET AGENT STATE") self.buffer.clear()
[docs] def init_parameters(self, **kwargs): """ User provided function to initialize parameters of choice. """ return 0
def log_ML(self, message): self.send("_logger", message, topic="ML_EXP")
[docs] def log_info(self, message): """ Prints logs to be saved into logfile with Logger Agent Parameters ---------- message : str Message to be logged to the internal Logger Agent """ try: if self.log_mode: if self.backend == Backend.OSBRAIN: super().log_info(message) elif self.backend == Backend.MESA: message = "[%s] (%s): %s" % ( datetime.datetime.utcnow(), self.name, message, ) print(message) except Exception as e: print(e) return 1
[docs] def init_agent_loop(self, loop_wait: Optional[float] = None): """Initiates the agent loop, which iterates every `loop_wait` seconds Stops every timers and initiate a new loop. Parameters ---------- loop_wait : float, optional The wait between each iteration of the loop, defaults to 1.0 """ if loop_wait is None: if self.loop_wait is None: self.loop_wait = 1.0 else: self.loop_wait = loop_wait if self.backend == Backend.OSBRAIN: self.stop_all_timers() # check if agent_loop is overridden by user if self.__class__.agent_loop == AgentMET4FOF.agent_loop: return 0 if self.backend == Backend.OSBRAIN: self.each(self.loop_wait, self.__class__.agent_loop) return 0
[docs] def stop_agent_loop(self): """Stops agent_loop from running Note that the agent will still be responding to messages. """ self.stop_all_timers()
[docs] def agent_loop(self): """ User defined method for the agent to execute for `loop_wait` seconds specified either in `self.loop_wait` or explicitly via `init_agent_loop( loop_wait)` To start a new loop, call `init_agent_loop(loop_wait)` on the agent. Example of usage is to check the `current_state` of the agent and send data periodically. """ return 0
[docs] def on_received_message(self, message): """ User-defined method and is triggered to handle the message passed by Input. Parameters ---------- message : Dictionary The message received is in form {'from':agent_name, 'data': data, 'senderType': agent_class, 'channel':channel_name}. agent_name is the name of the Input agent which sent the message data is the actual content of the message. """ return message
[docs] def buffer_filled(self, agent_name=None): """ Checks whether the internal buffer has been filled to the maximum allowed specified by self.buffer_size Parameters ---------- agent_name : str Index of the buffer which is the name of input agent. Returns ------- status of buffer filled : boolean """ return self.buffer.buffer_filled(agent_name)
[docs] def buffer_clear(self, agent_name: Optional[str] = None): """ Empties buffer which is a dict indexed by the `agent_name`. Parameters ---------- agent_name : str, optional Key of the memory dict, which can be the name of input agent, or self.name. If not supplied (default), we assume to clear the entire memory. """ self.buffer.clear(agent_name)
[docs] def buffer_store(self, agent_from: str, data=None, concat_axis=0): """ Updates data stored in ``self.buffer`` with the received message Checks if sender agent has sent any message before If it did,then append, otherwise create new entry for it Parameters ---------- agent_from : str Name of agent sender data Any supported data which can be stored in dict as buffer. See :class:`AgentBuffer` for details for more information. concat_axis : int, optional axis to concatenate on with the buffering for numpy arrays. Default is 0. """ self.buffer.store(agent_from=agent_from, data=data, concat_axis=concat_axis) self.log_info("Buffer: " + str(self.buffer.buffer))
[docs] def pack_data(self, data, channel="default"): """ Internal method to pack the data content into a dictionary before sending out. Special case : if the `data` is already a `message`, then the `from` and `senderType` will be altered to this agent, without altering the `data` and `channel` within the message this is used for more succinct data processing and passing. Parameters ---------- data : argument Data content to be packed before sending out to agents. channel : str Key of dictionary which stores data Returns ------- Packed message data : dict of the form {'from':agent_name, 'data': data, 'senderType': agent_class, 'channel':channel_name}. """ # if is a message type, override the `from` and `senderType` fields only if self._is_type_message(data): new_data = data new_data["from"] = self.name new_data["senderType"] = type(self).__name__ return new_data return { "from": self.name, "data": data, "senderType": type(self).__name__, "channel": channel, }
[docs] @staticmethod def _is_type_message(data: Any) -> bool: """Internal method to check if the data carries signature of an agent message Parameters ---------- data Data to be checked for type Returns ------- result : boolean """ if type(data) == dict: dict_keys = data.keys() if ( "from" in dict_keys and "data" in dict_keys and "senderType" in dict_keys ): return True return False
[docs] def send_output(self, data, channel="default"): """ Sends message data to all connected agents in self.Outputs. Output connection can first be formed by calling bind_output. By default calls pack_data(data) before sending out. Can specify specific channel as opposed to 'default' channel. Parameters ---------- data : argument Data content to be sent out channel : str Key of `message` dictionary which stores data Returns ------- message : dict {'from':agent_name, 'data': data, 'senderType': agent_class, 'channel':channel_name}. """ start_time_pack = time.time() packed_data = self.pack_data(data, channel=channel) if self.backend == Backend.OSBRAIN: self.send(self.PubAddr, packed_data, topic=channel) elif self.backend == Backend.MESA: for key, value in self.Outputs.items(): # if output agent has subscribed to a list of channels, # we check whether `channel` is subscribed in that list # if it is, then we append to that agent's message queue if isinstance(self.Outputs_agent_channels[key], list): if channel in self.Outputs_agent_channels[key]: value.mesa_message_queue.append(packed_data) elif channel == self.Outputs_agent_channels[key]: value.mesa_message_queue.append(packed_data) duration_time_pack = round(time.time() - start_time_pack, 6) # LOGGING try: if self.log_mode: self.log_info("Pack time: " + str(duration_time_pack)) self.log_info("Sending: " + str(data)) except Exception as e: print(e) # Add info of channel self._update_output_channels_info(packed_data["data"], packed_data["channel"]) return packed_data
[docs] def _update_output_channels_info(self, data, channel): """ Internal method to update the dict of output_channels_info. This is used in conjunction with send_output(). Checks and records data type & dimension and channel name If the data is nested within dict, then it will search deeper and subsequently record the info of each inner hierarchy Parameters ---------- data data to be checked for type & dimension channel : str name of channel to be recorded """ if channel not in self.output_channels_info.keys(): if type(data) == dict: nested_metadata = { key: ( { nested_dict_key: self._get_metadata(nested_dict_val) for nested_dict_key, nested_dict_val in data[key].items() } if isinstance(data[key], dict) else self._get_metadata(data[key]) ) for key in data.keys() } self.output_channels_info.update({channel: nested_metadata}) else: self.output_channels_info.update({channel: self._get_metadata(data)})
[docs] def _get_metadata(self, data): """ Internal helper function for getting the data type & dimensions of data. This is for update_output_channels_info() """ data_info = {} if type(data) == np.ndarray or type(data).__name__ == "DataFrame": data_info.update({"type": type(data).__name__, "shape": data.shape}) elif type(data) == list: data_info.update({"type": type(data).__name__, "len": len(data)}) else: data_info.update({"type": type(data).__name__}) return data_info
[docs] def handle_process_data(self, message): """Internal method to handle incoming message before calling on_received_message If current_state is either Stop or Reset, it will terminate early before entering on_received_message. """ if self.current_state == "Stop" or self.current_state == "Reset": return 0 # LOGGING try: self.log_info("Received: " + str(message)) except Exception as e: print(e) # process the received data here start_time_pack = time.time() if message["channel"] == "request-attr": self.respond_request_attr_(message["data"]) if message["channel"] == "request-method": self.respond_request_method_(message["data"]) elif ( message["channel"] == "reply-attr" or message["channel"] == "set-attr" ) and message["data"] != "NULL": self.respond_reply_attr_(message["data"]) else: self.on_received_message(message) end_time_pack = time.time() self.log_info("Tproc: " + str(round(end_time_pack - start_time_pack, 6)))
[docs] def send_request_attribute(self, attribute: str): """ Send a `request` of `attribute` to output agents. Output agents will reply with the requested `attribute` if they have. """ self.send_output(data=attribute, channel="request-attr")
[docs] def send_request_method(self, method: str, **method_params): """ Send a `request` of executing methods to output agents. Output agents will respond by calling the method. """ message = {"name": method} message.update(method_params) self.send_output(data=message, channel="request-method")
[docs] def send_set_attr(self, attr: str, value): """ Sends a message to set the `attr` of another agent to that of `value`. Parameters ---------- attr : str The variable name of the output agent to be set. value The value of the variable to be set """ self.send_output(data={attr: value}, channel="set-attr")
[docs] def respond_reply_attr_(self, message_data): """ Response to a `reply` of setting attribute """ if isinstance(message_data, str) and message_data == "NULL": return 0 else: key = next(iter(message_data)) setattr(self, key, message_data[key])
[docs] def respond_request_attr_(self, attribute: str): """ Response to a `request` of `attribute` from input agents. This agent reply with the requested `attribute` if it has it. """ if hasattr(self, attribute): self.send_output( data={attribute: self.get_attr(attribute)}, channel="reply-attr" ) else: self.log_info("'" + attribute + "' not available for reply.") self.send_output(data="NULL", channel="reply-attr")
[docs] def respond_request_method_(self, message_data: dict): """ Response to a `request` of executing `method` from input agents. This agent will execute the method with the provided parameters of the method. """ method_name = message_data["name"] data_params = {key: val for key, val in message_data.items() if key != "name"} if hasattr(self, method_name): self.get_attr(method_name)(**data_params)
[docs] def on_connect_output(self, output_agent): """This method is called whenever an agent is connected to its output This can be for example, to send `metadata` or `ping` to the output agent. """ return NotImplemented
[docs] def bind_output(self, output_agent, channel="default"): """Forms Output connection with another agent Any call on send_output will reach this newly binded agent. Adds the agent to its list of Outputs. Parameters ---------- output_agent : AgentMET4FOF or list Agent(s) to be binded to this agent's output channel channel : str or list of str Specific name of the channel(s) to be subscribed to. (Default = "data") """ if isinstance(output_agent, list): for agent in output_agent: self._bind_output(output_agent=agent, channel=channel) else: self._bind_output(output_agent=output_agent, channel=channel)
[docs] def _bind_output(self, output_agent, channel="default"): """ Internal method which implements the logic for connecting this agent, to the `output_agent`. """ if type(output_agent) == str: output_agent_id = output_agent else: output_agent_id = output_agent.get_attr("name") # if output_agent_id not in self.Outputs and output_agent_id != self.name: if output_agent_id not in self.Outputs and output_agent_id != self.name: # update self.Outputs list and Inputs list of output_module self.Outputs.update({output_agent.get_attr("name"): output_agent}) temp_updated_inputs = output_agent.get_attr("Inputs") temp_updated_inputs.update({self.name: self}) output_agent.set_attr(Inputs=temp_updated_inputs) # connect socket for osbrain if self.backend == Backend.OSBRAIN: self.Outputs_agent_channels.update( {output_agent.get_attr("name"): channel} ) # bind to the address if output_agent.has_socket(self.PubAddr_alias): if isinstance(channel, list): output_agent.connect( self.PubAddr, alias=self.PubAddr_alias, handler={ channel_name: AgentMET4FOF.handle_process_data for channel_name in channel }, ) else: output_agent.subscribe( self.PubAddr_alias, handler={channel: AgentMET4FOF.handle_process_data}, ) else: if isinstance(channel, list): output_agent.connect( self.PubAddr, alias=self.PubAddr_alias, handler={ channel_name: AgentMET4FOF.handle_process_data for channel_name in channel }, ) else: output_agent.connect( self.PubAddr, alias=self.PubAddr_alias, handler={channel: AgentMET4FOF.handle_process_data}, ) # update channels subscription information for mesa else: self.Outputs_agent_channels.update( {output_agent.get_attr("name"): channel} ) # calls on connect output method self.on_connect_output(output_agent) # LOGGING if self.log_mode: self.log_info("Connected output module: " + output_agent_id)
[docs] def unbind_output(self, output_agent): """Remove existing output connection with another agent This reverses the bind_output method. Parameters ---------- output_agent : AgentMET4FOF Agent binded to this agent's output channel """ if type(output_agent) == str: output_agent_id = output_agent else: output_agent_id = output_agent.get_attr("name") if output_agent_id in self.Outputs and output_agent_id != self.name: self.Outputs.pop(output_agent_id, None) new_inputs = output_agent.get_attr("Inputs") new_inputs.pop(self.name, None) output_agent.set_attr(Inputs=new_inputs) if self.backend == Backend.OSBRAIN: output_agent.unsubscribe( self.PubAddr_alias, topic=self.Outputs_agent_channels[output_agent_id], ) self.Outputs_agent_channels.pop(output_agent_id, None) # LOGGING if self.log_mode: self.log_info("Disconnected output module: " + output_agent_id)
[docs] def _convert_to_plotly(self, matplotlib_fig: matplotlib.figure.Figure): """ Internal method to convert matplotlib figure to plotly figure Parameters ---------- matplotlib_fig: plt.Figure Matplotlib figure to be converted """ # convert to plotly format matplotlib_fig.tight_layout() plotly_fig = tls.mpl_to_plotly(matplotlib_fig) plotly_fig["layout"]["showlegend"] = True return plotly_fig
[docs] def _fig_to_uri(self, matplotlib_fig: matplotlib.figure.Figure): """ Internal method to convert matplotlib figure to base64 uri image for display Parameters ---------- matplotlib_fig : plt.Figure Matplotlib figure to be converted """ out_img = BytesIO() matplotlib_fig.savefig(out_img, format="png") matplotlib_fig.clf() plt.close(matplotlib_fig) out_img.seek(0) # rewind file encoded = base64.b64encode(out_img.read()).decode("ascii").replace("\n", "") return "data:image/png;base64,{}".format(encoded)
[docs] def _convert_matplotlib_fig( self, fig: matplotlib.figure.Figure, mode: str = "image" ): """Convert matplotlib figure to be rendered by the dashboard""" error_msg = "Conversion mode " + mode + " is not implemented." if mode == "plotly": fig = self._convert_to_plotly(fig) elif mode == "image": fig = self._fig_to_uri(fig) elif mode == "mpld3": fig = mpld3.fig_to_dict(fig) else: raise NotImplementedError(error_msg) return fig
[docs] def send_plot( self, fig: Union[matplotlib.figure.Figure, Dict[str, matplotlib.figure.Figure]], mode: str = "image", ): """ Sends plot to agents connected to this agent's Output channel. This method is different from send_output which will be sent to through the 'plot' channel to be handled. Tradeoffs between "image" and "plotly" modes are that "image" are more stable and "plotly" are interactive. Note not all (complicated) matplotlib figures can be converted into a plotly figure. Parameters ---------- fig : matplotlib.figure.Figure or dict of matplotlib.figure.Figure Alternatively, multiple figures can be nested in a dict (with any preferred keys) e.g {"Temperature":matplotlib.Figure, "Acceleration":matplotlib.Figure} mode : str "image" - converts into image via encoding at base64 string. "plotly" - converts into plotly figure using `mpl_to_plotly` Default: "image" Returns ------- graph : str or plotly figure or dict of one of those converted figure(s) """ if isinstance(fig, matplotlib.figure.Figure): graph = {"mode": mode, "fig": self._convert_matplotlib_fig(fig, mode)} elif isinstance(fig, dict): # nested for key in fig.keys(): fig[key] = self._convert_matplotlib_fig(fig[key], mode) graph = {"mode": mode, "fig": list(fig.values())} elif isinstance(fig, list): graph = { "mode": mode, "fig": [self._convert_matplotlib_fig(fig_, mode) for fig_ in fig], } else: graph = {"mode": mode, "fig": fig} self.send_output(graph, channel="plot") return graph
def get_all_attr(self): _all_attr = self.__dict__ excludes = [ "Inputs", "Outputs", "buffer", "PubAddr_alias", "PubAddr", "states", "log_mode", "get_all_attr", "plots", "name", "agent_loop", ] filtered_attr = { key: val for key, val in _all_attr.items() if key.startswith("_") is False } filtered_attr = { key: val for key, val in filtered_attr.items() if key not in excludes and type(val).__name__ != "function" } filtered_attr = { key: ( val if ( isinstance(val, float) or isinstance(val, int) or isinstance(val, str) or key == "output_channels_info" ) else str(val) ) for key, val in filtered_attr.items() } filtered_attr = { key: val for key, val in filtered_attr.items() if "object" not in str(val) } return filtered_attr
[docs] def shutdown(self): if self.backend == Backend.OSBRAIN: osBrainAgent.shutdown(self) else: # self.backend == Backend.MESA: self.mesa_model.schedule.remove(self) del self
[docs] class DataStreamAgent(AgentMET4FOF): """Able to simulate generation of datastream by loading a given DataStreamMET4FOF Can be used in incremental training or batch training mode. To simulate batch training mode, set `pretrain_size=-1` , otherwise, set pretrain_size and batch_size for the respective. See `DataStreamMET4FOF` on loading your own data set as a data stream. """
[docs] def init_parameters( self, stream=DataStreamMET4FOF(), pretrain_size=None, batch_size=1, loop_wait=1, randomize=False, ): """ Parameters ---------- stream : DataStreamMET4FOF A DataStreamMET4FOF object which provides the sample data pretrain_size : int The number of sample data to send through in the first loop cycle, and subsequently, the batch_size will be used batch_size : int The number of sample data to send in every loop cycle loop_wait : int The duration to wait (seconds) at the end of each loop cycle before going into the next cycle randomize : bool Determines if the dataset should be shuffled before streaming """ self.stream = stream self.stream.prepare_for_use() if randomize: self.stream.randomize_data() self.batch_size = batch_size if pretrain_size is None: self.pretrain_size = batch_size else: self.pretrain_size = pretrain_size self.pretrain_done = False self.loop_wait = loop_wait
[docs] def agent_loop(self): if self.current_state == "Running": if self.pretrain_size is None: self.send_next_sample(self.batch_size) elif self.pretrain_size == -1 or self.batch_size == -1: self.send_all_sample() self.pretrain_done = True else: # handle pre-training mode if self.pretrain_done: self.send_next_sample(self.batch_size) else: self.send_next_sample(self.pretrain_size) self.pretrain_done = True
def send_next_sample(self, num_samples=1): if self.stream.has_more_samples(): data = self.stream.next_sample(num_samples) self.log_info("DATA SAMPLE ID: " + str(self.stream.sample_idx)) self.send_output(data)
[docs] def reset(self): super(DataStreamAgent, self).reset() self.stream.reset()
def send_all_sample(self): self.send_next_sample(-1)
[docs] class MonitorAgent(AgentMET4FOF): """ Unique Agent for storing plots and data from messages received from input agents. The dashboard searches for Monitor Agents' `buffer` and `plots` to draw the graphs "plot" channel is used to receive base64 images from agents to plot on dashboard Attributes ---------- plots : dict Dictionary of format `{agent1_name : agent1_plot, agent2_name : agent2_plot}` plot_filter : list of str List of keys to filter the 'data' upon receiving message to be saved into memory Used to specifically select only a few keys to be plotted custom_plot_function : callable a custom plot function that can be provided to handle the data in the monitor agents buffer (see :class:`AgentMET4FOF` for details). The function gets provided with the content (value) of the buffer and with the string of the sender agent's name as stored in the buffer's keys. Additionally any other parameters can be provided as a dict in custom_plot_parameters. custom_plot_parameters : dict a custom dictionary of parameters that shall be provided to each call of the custom_plot_function """
[docs] def init_parameters( self, plot_filter: Optional[List[str]] = None, custom_plot_function: Optional[Callable[..., Scatter]] = None, **kwargs, ): """Initialize the monitor agent's parameters Parameters ---------- plot_filter : list of str, optional List of keys to filter the 'data' upon receiving message to be saved into memory. Used to specifically select only a few keys to be plotted custom_plot_function : callable, optional a custom plot function that can be provided to handle the data in the monitor agents buffer (see :class:`AgentMET4FOF` for details). The function gets provided with the content (value) of the buffer and with the string of the sender agent's name as stored in the buffer's keys. Additionally any other parameters can be provided as a dict in custom_plot_parameters. By default the data gets plotted as shown in the various tutorials. kwargs : Any custom key word parameters that shall be provided to each call of the :attr:`custom_plot_function` """ self.plots = {} self.plot_filter = plot_filter self.custom_plot_function = custom_plot_function self.custom_plot_parameters = kwargs
[docs] def on_received_message(self, message): """ Handles incoming data from 'default' and 'plot' channels. Stores 'default' data into :attr:`buffer` and 'plot' data into :attr:`plots` Parameters ---------- message : dict Acceptable channel values are 'default' or 'plot' """ if message["channel"] == "default": if self.plot_filter: message["data"] = { key: message["data"][key] for key in self.plot_filter } self.buffer_store(agent_from=message["from"], data=message["data"]) elif message["channel"] == "plot": self.update_plot_memory(message) return 0
[docs] def update_plot_memory(self, message: Dict[str, Any]): """ Updates plot figures stored in `self.plots` with the received message Parameters ---------- message : dict Standard message format specified by AgentMET4FOF class Message['data'] needs to be base64 image string and can be nested in dictionary for multiple plots. Only the latest plot will be shown kept and does not keep a history of the plots. """ if type(message["data"]) != dict or message["from"] not in self.plots.keys(): self.plots[message["from"]] = message["data"] elif type(message["data"]) == dict: for key in message["data"].keys(): self.plots[message["from"]].update({key: message["data"][key]}) self.log_info("PLOTS: " + str(self.plots))
[docs] def reset(self): super(MonitorAgent, self).reset() del self.plots self.plots = {}