Source code for agentMET4FOF.agents

# Agent dependencies
import base64
import copy
import csv
import datetime
import re
import sys
import time
from collections import deque
from io import BytesIO
from threading import Timer
from typing import Any, Callable, Dict, Iterable, List, Optional, Sized, Tuple, Union

import matplotlib.figure
import matplotlib.pyplot as plt
import mpld3
import networkx as nx
import numpy as np
import pandas as pd
from mesa import Agent as MesaAgent, Model
from mesa.time import BaseScheduler
from osbrain import Agent as osBrainAgent, NSProxy, run_agent, run_nameserver
from pandas import DataFrame
from plotly import tools as tls
from plotly.graph_objs import Scatter

from .dashboard.Dashboard_agt_net import Dashboard_agt_net
from .dashboard.default_network_stylesheet import default_agent_network_stylesheet
from .streams import DataStreamMET4FOF, SineGenerator


class AgentMET4FOF(MesaAgent, osBrainAgent):
    """
    Base class for all agents with specific functions to be overridden/supplied by user.

    Behavioral functions for users to provide are init_parameters, agent_loop and
    on_received_message. Communicative functions are bind_output, unbind_output and
    send_output.
    """

    def __init__(self, name='', host=None, serializer=None, transport=None,
                 attributes=None, backend="osbrain", mesa_model=None):
        self.backend = backend.lower()

        if self.backend == "osbrain":
            self._remove_methods(MesaAgent)
            osBrainAgent.__init__(self, name=name, host=host, serializer=serializer,
                                  transport=transport, attributes=attributes)
        elif self.backend == "mesa":
            MesaAgent.__init__(self, name, mesa_model)
            self._remove_methods(osBrainAgent)
            self.init_mesa(name)
            self.unique_id = name
            self.name = name
            self.mesa_model = mesa_model
        else:
            raise NotImplementedError(
                "Backend has not been implemented. Valid choices are 'osbrain' and 'mesa'."
            )

    def init_mesa(self, name):
        # MESA-specific parameters
        self.mesa_message_queue = deque([])
        self.unique_id = name
        self.name = name

    def step(self):
        """
        Used for MESA backend only. Behaviour on every update step.
        """
        # check if there is a message in the queue
        while len(self.mesa_message_queue) > 0:
            self.handle_process_data(self.mesa_message_queue.popleft())

        # proceed with the user-defined agent loop
        self.agent_loop()

    def _remove_methods(self, cls):
        """Remove the methods of the other backend's base class from the current agent"""
        for name in list(vars(cls)):
            if not name.startswith("__"):
                try:
                    delattr(self, name)
                except AttributeError:
                    # This situation only occurs when we start and stop agent
                    # networks of differing backends in one sequence. Normally
                    # ignoring these errors should be no problem.
                    pass

    def set_attr(self, **kwargs):
        # set each supplied keyword argument as an attribute on the agent
        for key, val in kwargs.items():
            setattr(self, key, val)

    def get_attr(self, attr):
        return getattr(self, attr)

    def init_agent(self, buffer_size=1000, log_mode=True):
        """
        Internal initialization to set up the agent: mainly setting the dictionaries of
        Inputs, Outputs and the PubAddr. Calls the user-defined `init_parameters()` upon
        finishing.

        Attributes
        ----------
        Inputs : dict
            Dictionary of Agents connected to its input channels. Messages will arrive from
            agents in this dictionary. Automatically updated when the `bind_output()`
            function is called.
        Outputs : dict
            Dictionary of Agents connected to its output channels. Messages will be sent to
            agents in this dictionary. Automatically updated when the `bind_output()`
            function is called.
        PubAddr_alias : str
            Name of the publish address socket
        PubAddr : str
            Publish address socket handle
        AgentType : str
            Name of the class
        current_state : str
            Current state of the agent. Can be used to define different states of operation
            such as "Running", "Idle", "Stop", etc. Users will need to define their own flow
            of handling each value of `self.current_state` in the `agent_loop`.
        loop_wait : int
            The interval to wait between loop iterations. Call `init_agent_loop` to restart
            the timer or set the value of loop_wait in `init_parameters` when necessary.
        buffer_size : int
            The total number of elements to be stored in the agent :attr:`buffer`.
            When the total number of elements exceeds this number, the oldest elements are
            replaced by the incoming data elements.
        """
        self.Inputs = {}
        self.Outputs = {}
        self.Outputs_agent_channels = {}  # keep track of agent subscription channels
        self.AgentType = type(self).__name__
        self.log_mode = log_mode
        self.log_info("INITIALIZED")
        # These are the available states to change the agents' behavior in agent_loop.
        self.states = {0: "Idle", 1: "Running", 2: "Pause", 3: "Stop", 4: "Reset"}
        self.current_state = self.states[0]
        self.loop_wait = None
        if not hasattr(self, "stylesheet"):
            self.stylesheet = ""

        self.output_channels_info = {}

        self.buffer_size = buffer_size
        self.buffer = self.init_buffer(self.buffer_size)

        if self.backend == 'osbrain':
            self.PubAddr_alias = self.name + "_PUB"
            self.PubAddr = self.bind('PUB', alias=self.PubAddr_alias, transport='tcp')

    def init_buffer(self, buffer_size):
        """
        A method to initialise the buffer. By overriding this method, the user can provide
        a custom buffer instead of the regular AgentBuffer. This can be used, for example,
        to provide a MetrologicalAgentBuffer in the metrological agents.
        """
        buffer = AgentBuffer(buffer_size)
        return buffer

    def reset(self):
        """
        This method will be called on all agents when the global `reset_agents` is called
        by the AgentNetwork and when the Reset button is clicked on the dashboard.

        Method to reset the agent's states and parameters. The user can override this
        method to reset the specific parameters.
        """
        self.log_info("RESET AGENT STATE")
        self.buffer.clear()

    def init_parameters(self):
        """
        User provided function to initialize parameters of choice.
        """
        return 0

    def log_ML(self, message):
        self.send("_logger", message, topic="ML_EXP")

    def log_info(self, message):
        """
        Prints logs to be saved into a logfile by the Logger Agent

        Parameters
        ----------
        message : str
            Message to be logged to the internal Logger Agent
        """
        try:
            if self.log_mode:
                if self.backend == "osbrain":
                    super().log_info(message)
                elif self.backend == "mesa":
                    message = '[%s] (%s): %s' % (datetime.datetime.utcnow(), self.name, message)
                    print(message)
        except Exception as e:
            print(e)
            return 1

    def init_agent_loop(self, loop_wait: Optional[int] = None):
        """
        Initiates the agent loop, which iterates every `loop_wait` seconds

        Stops all timers and initiates a new loop.

        Parameters
        ----------
        loop_wait : int, optional
            The wait between each iteration of the loop
        """
        # most common default: loop_wait has been set neither in init_parameters() nor in init_agent_loop()
        if self.loop_wait is None and loop_wait is None:
            set_loop_wait = 1.0
        # init_agent_loop overrides the loop_wait parameter
        elif loop_wait is not None:
            set_loop_wait = loop_wait
        # otherwise assume init_parameters() has set loop_wait
        elif self.loop_wait is not None:
            set_loop_wait = self.loop_wait
        self.loop_wait = set_loop_wait

        if self.backend == "osbrain":
            self.stop_all_timers()

        # check if agent_loop is overridden by the user
        if self.__class__.agent_loop == AgentMET4FOF.agent_loop:
            return 0
        else:
            if self.backend == "osbrain":
                self.each(self.loop_wait, self.__class__.agent_loop)
            return 0

    def stop_agent_loop(self):
        """
        Stops the agent_loop from running. Note that the agent will still be responding
        to messages.
        """
        self.stop_all_timers()

    def agent_loop(self):
        """
        User defined method for the agent to execute every `loop_wait` seconds, specified
        either in `self.loop_wait` or explicitly via `init_agent_loop(loop_wait)`.

        To start a new loop, call `init_agent_loop(loop_wait)` on the agent.

        An example of usage is to check the `current_state` of the agent and send data
        periodically.
        """
        return 0

    def on_received_message(self, message):
        """
        User-defined method which is triggered to handle the message passed by an input agent.

        Parameters
        ----------
        message : dict
            The message received is of the form
            {'from': agent_name, 'data': data, 'senderType': agent_class, 'channel': channel_name}.
            `agent_name` is the name of the input agent which sent the message and
            `data` is the actual content of the message.
        """
        return message

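    # The three methods above (init_parameters, agent_loop and on_received_message) are the
    # usual extension points for user-defined agents. A minimal sketch of such a subclass is
    # shown below; the class name and the `multiplier` parameter are illustrative only and
    # not part of this module.
    #
    #     class MultiplyAgent(AgentMET4FOF):
    #         def init_parameters(self, multiplier=2):
    #             # store a user-supplied parameter on the agent
    #             self.multiplier = multiplier
    #
    #         def agent_loop(self):
    #             # only emit data while the network state is "Running"
    #             if self.current_state == "Running":
    #                 self.send_output(self.multiplier * np.arange(5))
    #
    #         def on_received_message(self, message):
    #             # `message` is the dict produced by pack_data(); forward the scaled payload
    #             self.send_output(np.multiply(message["data"], self.multiplier))
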
    def buffer_filled(self, agent_name=None):
        """
        Checks whether the internal buffer has been filled to the maximum allowed
        specified by self.buffer_size

        Parameters
        ----------
        agent_name : str
            Index of the buffer, which is the name of the input agent.

        Returns
        -------
        status of buffer filled : boolean
        """
        return self.buffer.buffer_filled(agent_name)

    def buffer_clear(self, agent_name: Optional[str] = None):
        """
        Empties the buffer, which is a dict indexed by the `agent_name`.

        Parameters
        ----------
        agent_name : str, optional
            Key of the memory dict, which can be the name of the input agent, or self.name.
            If not supplied (default), the entire memory is cleared.
        """
        self.buffer.clear(agent_name)

    def buffer_store(self, agent_from: str, data=None, concat_axis=0):
        """
        Updates data stored in `self.buffer` with the received message

        Checks if the sender agent has sent any message before.
        If it did, then append; otherwise create a new entry for it.

        Parameters
        ----------
        agent_from : str
            Name of the sender agent
        data
            Any supported data which can be stored in a dict as a buffer.
            See AgentBuffer for more information.
        """
        self.buffer.store(agent_from=agent_from, data=data, concat_axis=concat_axis)
        self.log_info("Buffer: " + str(self.buffer.buffer))

    def pack_data(self, data, channel='default'):
        """
        Internal method to pack the data content into a dictionary before sending it out.

        Special case: if the `data` is already a `message`, then the `from` and `senderType`
        will be altered to this agent, without altering the `data` and `channel` within the
        message. This is used for more succinct data processing and passing.

        Parameters
        ----------
        data : argument
            Data content to be packed before sending out to agents.
        channel : str
            Key of the dictionary which stores the data

        Returns
        -------
        Packed message data : dict of the form
            {'from': agent_name, 'data': data, 'senderType': agent_class, 'channel': channel_name}.
        """
        # if it is a message type, override the `from` and `senderType` fields only
        if self._is_type_message(data):
            new_data = data
            new_data['from'] = self.name
            new_data['senderType'] = type(self).__name__
            return new_data

        return {'from': self.name, 'data': data, 'senderType': type(self).__name__, 'channel': channel}

    def _is_type_message(self, data):
        """
        Internal method to check if the data carries the signature of an agent message type

        Parameters
        ----------
        data
            Data to be checked for type

        Returns
        -------
        result : boolean
        """
        if type(data) == dict:
            dict_keys = data.keys()
            if 'from' in dict_keys and 'data' in dict_keys and 'senderType' in dict_keys:
                return True
        return False

    def send_output(self, data, channel='default'):
        """
        Sends message data to all connected agents in self.Outputs.

        Output connections can first be formed by calling bind_output.
        By default calls pack_data(data) before sending out.
        A specific channel can be supplied as opposed to the 'default' channel.

        Parameters
        ----------
        data : argument
            Data content to be sent out
        channel : str
            Key of the `message` dictionary which stores the data

        Returns
        -------
        message : dict of the form
            {'from': agent_name, 'data': data, 'senderType': agent_class, 'channel': channel_name}.
        """
        start_time_pack = time.time()
        packed_data = self.pack_data(data, channel=channel)

        if self.backend == "osbrain":
            self.send(self.PubAddr, packed_data, topic=channel)
        elif self.backend == "mesa":
            for key, value in self.Outputs.items():
                # if the output agent has subscribed to a list of channels,
                # we check whether `channel` is subscribed in that list;
                # if it is, then we append to that agent's message queue
                if isinstance(self.Outputs_agent_channels[key], list):
                    if channel in self.Outputs_agent_channels[key]:
                        value.mesa_message_queue.append(packed_data)
                elif channel == self.Outputs_agent_channels[key]:
                    value.mesa_message_queue.append(packed_data)

        duration_time_pack = round(time.time() - start_time_pack, 6)

        # LOGGING
        try:
            if self.log_mode:
                self.log_info("Pack time: " + str(duration_time_pack))
                self.log_info("Sending: " + str(data))
        except Exception as e:
            print(e)

        # Add info of channel
        self._update_output_channels_info(packed_data['data'], packed_data['channel'])

        return packed_data

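    # For illustration, a send_output call on a named channel and the resulting message dict
    # as assembled by pack_data(); the agent and channel names below are made up:
    #
    #     self.send_output({"x": np.zeros(3)}, channel="raw")
    #     # each subscribed output agent then receives:
    #     # {"from": "SensorAgent_1", "data": {"x": array([0., 0., 0.])},
    #     #  "senderType": "SensorAgent", "channel": "raw"}
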
def _update_output_channels_info(self, data, channel): """ Internal method to update the dict of output_channels_info. This is used in conjunction with send_output(). Checks and records data type & dimension and channel name If the data is nested within dict, then it will search deeper and subsequently record the info of each inner hierarchy Parameters ---------- data data to be checked for type & dimension channel : str name of channel to be recorded """ if channel not in self.output_channels_info.keys(): if type(data) == dict: nested_metadata = {key: {nested_dict_key:self._get_metadata(nested_dict_val) for nested_dict_key,nested_dict_val in data[key].items()} if isinstance(data[key],dict) else self._get_metadata(data[key]) for key in data.keys()} self.output_channels_info.update({channel: nested_metadata}) else: self.output_channels_info.update({channel: self._get_metadata(data)}) def _get_metadata(self, data): """ Internal helper function for getting the data type & dimensions of data. This is for update_output_channels_info() """ data_info = {} if type(data) == np.ndarray or type(data).__name__ == "DataFrame": data_info.update({'type': type(data).__name__, 'shape': data.shape}) elif type(data) == list: data_info.update({'type': type(data).__name__, 'len': len(data)}) else: data_info.update({'type': type(data).__name__}) return data_info
    def handle_process_data(self, message):
        """
        Internal method to handle the incoming message before calling the user-defined
        on_received_message method.

        If current_state is either Stop or Reset, it will terminate early before entering
        on_received_message.
        """
        if self.current_state == "Stop" or self.current_state == "Reset":
            return 0

        # LOGGING
        try:
            self.log_info("Received: " + str(message))
        except Exception as e:
            print(e)

        # process the received data here
        start_time_pack = time.time()
        if message["channel"] == "request-attr":
            self.respond_request_attr_(message["data"])
        if message["channel"] == "request-method":
            self.respond_request_method_(message["data"])
        elif (message["channel"] == "reply-attr" or message["channel"] == "set-attr") and message["data"] != "NULL":
            self.respond_reply_attr_(message["data"])
        else:
            self.on_received_message(message)
        end_time_pack = time.time()
        self.log_info("Tproc: " + str(round(end_time_pack - start_time_pack, 6)))

    def send_request_attribute(self, attribute: str):
        """
        Send a `request` for an `attribute` to the output agents.
        The output agents will reply with the requested `attribute` if they have it.
        """
        self.send_output(data=attribute, channel="request-attr")

    def send_request_method(self, method: str, **method_params):
        """
        Send a `request` to execute a method to the output agents.
        The output agents will respond by calling the method.
        """
        message = {"name": method}
        message.update(method_params)
        self.send_output(data=message, channel="request-method")

    def send_set_attr(self, attr: str, value):
        """
        Sends a message to set the `attr` of another agent to that of `value`.

        Parameters
        ----------
        attr : str
            The variable name of the output agent to be set.
        value
            The value of the variable to be set
        """
        self.send_output(data={attr: value}, channel="set-attr")

    def respond_reply_attr_(self, message_data):
        """
        Response to a `reply` of setting an attribute
        """
        if isinstance(message_data, str) and message_data == "NULL":
            return 0
        else:
            key = next(iter(message_data))
            setattr(self, key, message_data[key])

    def respond_request_attr_(self, attribute: str):
        """
        Response to a `request` for an `attribute` from input agents.
        This agent replies with the requested `attribute` if it has it.
        """
        if hasattr(self, attribute):
            self.send_output(data={attribute: self.get_attr(attribute)}, channel="reply-attr")
        else:
            self.log_info("'" + attribute + "' not available for reply.")
            self.send_output(data="NULL", channel="reply-attr")

    def respond_request_method_(self, message_data: dict):
        """
        Response to a `request` to execute a `method` from input agents.
        This agent will execute the method with the provided parameters of the method.
        """
        method_name = message_data["name"]
        data_params = {key: val for key, val in message_data.items() if key != "name"}
        if hasattr(self, method_name):
            self.get_attr(method_name)(**data_params)

    def on_connect_output(self, output_agent):
        """
        This user provided method is called whenever an agent is connected to its output.

        This can be used, for example, to send `metadata` or `ping` to the output agent.
        """
        return NotImplemented

    def bind_output(self, output_agent, channel="default"):
        """
        Forms an output connection with another agent. Any call to send_output will reach
        this newly bound agent.

        Adds the agent to its list of Outputs.

        Parameters
        ----------
        output_agent : AgentMET4FOF or list
            Agent(s) to be bound to this agent's output channel
        channel : str or list of str
            Specific name of the channel(s) to be subscribed to. (Default = "default")
        """
        if isinstance(output_agent, list):
            for agent in output_agent:
                self._bind_output(output_agent=agent, channel=channel)
        else:
            self._bind_output(output_agent=output_agent, channel=channel)

    def _bind_output(self, output_agent, channel="default"):
        """
        Internal method which implements the logic for connecting this agent to the
        `output_agent`.
        """
        if type(output_agent) == str:
            output_agent_id = output_agent
        else:
            output_agent_id = output_agent.get_attr('name')

        if output_agent_id not in self.Outputs and output_agent_id != self.name:
            # update self.Outputs list and Inputs list of output_module
            self.Outputs.update({output_agent.get_attr('name'): output_agent})
            temp_updated_inputs = output_agent.get_attr('Inputs')
            temp_updated_inputs.update({self.name: self})
            output_agent.set_attr(Inputs=temp_updated_inputs)

            # connect socket for osbrain
            if self.backend == "osbrain":
                self.Outputs_agent_channels.update({output_agent.get_attr('name'): channel})
                # bind to the address
                if output_agent.has_socket(self.PubAddr_alias):
                    if isinstance(channel, list):
                        output_agent.connect(self.PubAddr, alias=self.PubAddr_alias,
                                             handler={channel_name: AgentMET4FOF.handle_process_data
                                                      for channel_name in channel})
                    else:
                        output_agent.subscribe(self.PubAddr_alias,
                                               handler={channel: AgentMET4FOF.handle_process_data})
                else:
                    if isinstance(channel, list):
                        output_agent.connect(self.PubAddr, alias=self.PubAddr_alias,
                                             handler={channel_name: AgentMET4FOF.handle_process_data
                                                      for channel_name in channel})
                    else:
                        output_agent.connect(self.PubAddr, alias=self.PubAddr_alias,
                                             handler={channel: AgentMET4FOF.handle_process_data})
            # update channels subscription information for mesa
            else:
                self.Outputs_agent_channels.update({output_agent.get_attr('name'): channel})

            # call the on_connect_output method
            self.on_connect_output(output_agent)

            # LOGGING
            if self.log_mode:
                self.log_info("Connected output module: " + output_agent_id)

    def unbind_output(self, output_agent):
        """
        Removes an existing output connection with another agent. This reverses the
        bind_output method.

        Parameters
        ----------
        output_agent : AgentMET4FOF
            Agent bound to this agent's output channel
        """
        if type(output_agent) == str:
            output_agent_id = output_agent
        else:
            output_agent_id = output_agent.get_attr('name')

        if output_agent_id in self.Outputs and output_agent_id != self.name:
            self.Outputs.pop(output_agent_id, None)

            new_inputs = output_agent.get_attr('Inputs')
            new_inputs.pop(self.name, None)
            output_agent.set_attr(Inputs=new_inputs)

            if self.backend == "osbrain":
                output_agent.unsubscribe(self.PubAddr_alias,
                                         topic=self.Outputs_agent_channels[output_agent_id])

            self.Outputs_agent_channels.pop(output_agent_id, None)
            # LOGGING
            if self.log_mode:
                self.log_info("Disconnected output module: " + output_agent_id)

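    # Typical wiring of agents, assuming an AgentNetwork (defined further below in this
    # module) has already been started; the agent variable names are illustrative:
    #
    #     source = agent_network.add_agent(agentType=AgentMET4FOF)
    #     sink = agent_network.add_agent(agentType=MonitorAgent)
    #     source.bind_output(sink)                                # subscribe to "default"
    #     # alternatively, subscribe to a list of channels at once:
    #     # source.bind_output(sink, channel=["default", "plot"])
    #     source.unbind_output(sink)                              # reverse the connection
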
def _convert_to_plotly(self, matplotlib_fig: matplotlib.figure.Figure): """ Internal method to convert matplotlib figure to plotly figure Parameters ---------- matplotlib_fig: plt.Figure Matplotlib figure to be converted """ # convert to plotly format matplotlib_fig.tight_layout() plotly_fig = tls.mpl_to_plotly(matplotlib_fig) plotly_fig['layout']['showlegend'] = True return plotly_fig def _fig_to_uri(self, matplotlib_fig: matplotlib.figure.Figure): """ Internal method to convert matplotlib figure to base64 uri image for display Parameters ---------- matplotlib_fig : plt.Figure Matplotlib figure to be converted """ out_img = BytesIO() matplotlib_fig.savefig(out_img, format='png') matplotlib_fig.clf() plt.close(matplotlib_fig) out_img.seek(0) # rewind file encoded = base64.b64encode(out_img.read()).decode("ascii").replace("\n", "") return "data:image/png;base64,{}".format(encoded) def _convert_matplotlib_fig(self, fig: matplotlib.figure.Figure, mode: str = "image"): """ Internal method to convert matplotlib figure which can be rendered by the dashboard. """ error_msg = "Conversion mode " + mode + " is not implemented." if mode == "plotly": fig = self._convert_to_plotly(fig) elif mode == "image": fig = self._fig_to_uri(fig) elif mode == "mpld3": fig = mpld3.fig_to_dict(fig) else: raise NotImplementedError(error_msg) return fig
[docs] def send_plot(self, fig: Union[matplotlib.figure.Figure, Dict[str, matplotlib.figure.Figure]], mode: str = "image"): """ Sends plot to agents connected to this agent's Output channel. This method is different from send_output which will be sent to through the 'plot' channel to be handled. Tradeoffs between "image" and "plotly" modes are that "image" are more stable and "plotly" are interactive. Note not all (complicated) matplotlib figures can be converted into a plotly figure. Parameters ---------- fig : matplotlib.figure.Figure or dict of matplotlib.figure.Figure Alternatively, multiple figures can be nested in a dict (with any preferred keys) e.g {"Temperature":matplotlib.Figure, "Acceleration":matplotlib.Figure} mode : str "image" - converts into image via encoding at base64 string. "plotly" - converts into plotly figure using `mpl_to_plotly` Default: "image" Returns ------- graph : str or plotly figure or dict of one of those converted figure(s) """ if isinstance(fig, matplotlib.figure.Figure): graph = {"mode": mode, "fig": self._convert_matplotlib_fig(fig, mode)} elif isinstance(fig, dict): # nested for key in fig.keys(): fig[key] = self._convert_matplotlib_fig(fig[key], mode) graph = {"mode": mode, "fig": list(fig.values())} elif isinstance(fig, list): graph = {"mode": mode, "fig": [self._convert_matplotlib_fig(fig_, mode) for fig_ in fig]} else: graph = {"mode": mode, "fig": fig} self.send_output(graph, channel="plot") return graph
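    # A sketch of producing a figure inside an agent's loop and pushing it to Monitor agents
    # via the "plot" channel handled by send_plot(); the plotted data is illustrative only:
    #
    #     def agent_loop(self):
    #         fig = plt.figure()
    #         plt.plot(np.sin(np.linspace(0, 2 * np.pi, 100)))
    #         self.send_plot(fig, mode="image")   # or mode="plotly" for interactive figures
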
def get_all_attr(self): _all_attr = self.__dict__ excludes = [ "Inputs", "Outputs", "buffer", "PubAddr_alias", "PubAddr", "states", "log_mode", "get_all_attr", "plots", "name", "agent_loop" ] filtered_attr = {key: val for key, val in _all_attr.items() if key.startswith('_') is False} filtered_attr = {key: val for key, val in filtered_attr.items() if key not in excludes and type(val).__name__ != 'function'} filtered_attr = {key: val if (type(val) == float or type(val) == int or type( val) == str or key == 'output_channels_info') else str(val) for key, val in filtered_attr.items()} filtered_attr = {key: val for key, val in filtered_attr.items() if "object" not in str(val)} return filtered_attr
    def shutdown(self):
        if self.backend == "osbrain":
            osBrainAgent.shutdown(self)
        elif self.backend == "mesa":
            self.mesa_model.schedule.remove(self)
            del self

class AgentBuffer:
    """Buffer class which is instantiated in every agent to store data incrementally

    This buffer is necessary to handle multiple inputs coming from agents.

    We can access the buffer like a dict with exposed functions such as .values(), .keys()
    and .items(). The actual dict object is stored in the variable :attr:`buffer`.

    Attributes
    ----------
    buffer : dict of iterables or dict of dicts of iterables
        The buffer can be a dict of iterables, or a dict of dicts of iterables for nested
        named data. The keys are the names of agents.
    buffer_size : int
        The total number of elements to be stored in the agent :attr:`buffer`
    supported_datatypes : list of types
        List of all types supported and thus properly handled by the buffer. Defaults to
        :class:`np.ndarray <NumPy:numpy.ndarray>`, list and Pandas
        :class:`DataFrame <Pandas:pandas.DataFrame>`
    """

    def __init__(self, buffer_size: int = 1000):
        """Initialise a new agent buffer object

        Parameters
        ----------
        buffer_size : int
            Length of buffer allowed.
        """
        self.buffer = {}
        self.buffer_size = buffer_size
        self.supported_datatypes = [list, pd.DataFrame, np.ndarray]

    def __getitem__(self, key):
        return self.buffer[key]

    def check_supported_datatype(self, obj: object) -> bool:
        """Checks whether `obj` is an object of one of the supported data types

        Parameters
        ----------
        obj : object
            Value to be checked

        Returns
        -------
        result : boolean
            True if `obj` is an object of one of the supported data types, False if not
        """
        for supported_datatype in self.supported_datatypes:
            if isinstance(obj, supported_datatype):
                return True
        return False

[docs] def update( self, agent_from: Union[Dict[str, Union[np.ndarray, list, pd.DataFrame]], str], data: Union[np.ndarray, list, pd.DataFrame, float, int] = None, ): """Overrides data in the buffer dict keyed by ``agent_from`` with value ``data`` If ``data`` is a single value, this converts it into a list first before storing in the buffer dict. Parameters ---------- agent_from : str Name of agent sender data : np.ndarray, DataFrame, list, float or int New incoming data """ # handle if data type nested in dict if isinstance(data, dict): # check for each value datatype for key, value in data.items(): # if the value is not list types, turn it into a list of single value i.e [value] if not self.check_supported_datatype(value): data[key] = [value] elif not self.check_supported_datatype(data): data = [data] self.buffer.update({agent_from: data}) return self.buffer
def _concatenate( self, iterable: Union[np.ndarray, list, pd.DataFrame], data: Union[np.ndarray, list, DataFrame], concat_axis: int = 0 ) -> Iterable: """Concatenate the given ``iterable`` with ``data`` Handles the concatenation function depending on the datatype, and truncates it if the buffer is filled to `buffer_size`. Parameters ---------- iterable : any in supported_datatype The current buffer to be concatenated with. data : np.ndarray, DataFrame, list New incoming data Returns any in supported_datatype the original buffer with the data appended """ # handle list if isinstance(iterable, list): iterable += data # check if exceed memory buffer size, remove the first n elements which exceeded the size if len(iterable) > self.buffer_size: truncated_element_index = len(iterable) - self.buffer_size iterable = iterable[truncated_element_index:] # handle if data type is np.ndarray elif isinstance(iterable, np.ndarray): iterable = np.concatenate((iterable, data),axis=concat_axis) if len(iterable) > self.buffer_size: truncated_element_index = len(iterable) - self.buffer_size iterable = iterable[truncated_element_index:] # handle if data type is pd.DataFrame elif isinstance(iterable, pd.DataFrame): iterable = pd.concat([iterable,data], ignore_index=True, axis=concat_axis) if len(iterable) > self.buffer_size: truncated_element_index = len(iterable) - self.buffer_size iterable = iterable.truncate(before=truncated_element_index) return iterable
    def buffer_filled(self, agent_from: Optional[str] = None) -> bool:
        """Checks whether the buffer is filled, by comparing against the :attr:`buffer_size`

        For a nested dict, this returns True if any of the iterables is beyond the
        :attr:`buffer_size`. For dict values which are not one of the
        :attr:`supported_datatypes` this returns None.

        Parameters
        ----------
        agent_from : str, optional
            Name of the input agent in the buffer dict to be looked up. If ``agent_from``
            is not provided, we check all iterables in the buffer (default).

        Returns
        -------
        bool or None
            True if any of the iterables has reached :attr:`buffer_size`, or None in case
            none of the values is of one of the supported datatypes. False if every present
            iterable can take at least one more element according to :attr:`buffer_size`.
        """
        if agent_from is None:
            return any([self._iterable_filled(iterable) for iterable in self.values()])
        elif isinstance(self[agent_from], dict):
            return any([self._iterable_filled(iterable) for iterable in self[agent_from].values()])
        else:
            return self._iterable_filled(self[agent_from])

    def _iterable_filled(self, iterable: Sized) -> Union[bool, None]:
        """Internal method for checking the length of iterables of supported types

        Parameters
        ----------
        iterable : Any
            Expected to be an iterable of one of the supported datatypes but could be any.

        Returns
        -------
        bool or None
            True if the iterable is of one of the supported datatypes and has reached
            :attr:`buffer_size` in length, False if not, or None in case it is not of one
            of the supported datatypes.
        """
        if self.check_supported_datatype(iterable):
            if len(iterable) >= self.buffer_size:
                return True
            return False

[docs] def popleft(self, n: Optional[int] = 1) -> Union[Dict, np.ndarray, list, DataFrame]: """Pops the first n entries in the buffer Parameters --------- n : int Number of elements to retrieve from buffer Returns ------- dict, :class:`np.ndarray <NumPy:numpy.ndarray>`, list or Pandas :class:`DataFrame <Pandas:pandas.DataFrame>` The retrieved elements """ popped_buffer = copy.copy(self.buffer) remaining_buffer = copy.copy(self.buffer) if isinstance(popped_buffer, dict): for key, value in popped_buffer.items(): value, remaining_buffer[key] = self._popleft(value, n) else: popped_buffer, remaining_buffer = self._popleft(popped_buffer, n) self.buffer = remaining_buffer return popped_buffer
def _popleft( self, iterable: Union[np.ndarray, list, DataFrame], n: Optional[int] = 1 ) -> Tuple[Union[np.ndarray, list, DataFrame], Union[np.ndarray, list, DataFrame]]: """Internal handler of the actual popping mechanism based on type of iterable Parameters --------- n : int Number of elements to retrieve from buffer. iterable : any in :attr:`supported_datatypes` The current buffer to retrieve from. Returns ------- 2-tuple of each either one of :class:`np.ndarray <NumPy:numpy.ndarray>`, list or Pandas :class:`DataFrame <Pandas:pandas.DataFrame>` The retrieved elements and the residual items in the buffer """ popped_item = 0 if isinstance(iterable, list): popped_item = iterable[:n] iterable = iterable[n:] elif isinstance(iterable, np.ndarray): popped_item = iterable[:n] iterable = iterable[n:] elif isinstance(iterable, pd.DataFrame): popped_item = iterable.iloc[:n] iterable = iterable.iloc[n:] return popped_item, iterable
    def clear(self, agent_from: Optional[str] = None):
        """Clears the data in the buffer

        Parameters
        ----------
        agent_from : str, optional
            Name of agent; if ``agent_from`` is not given, the entire buffer is flushed
            (default).
        """
        if agent_from is None:
            self.buffer = {}
        elif agent_from in self.buffer:
            del self.buffer[agent_from]

[docs] def store( self, agent_from: Union[Dict[str, Union[np.ndarray, list, pd.DataFrame]], str], data: Union[np.ndarray, list, pd.DataFrame, float, int] = None, concat_axis: Optional[int] = 0, ): """Stores data into :attr:`buffer` with the received message Checks if sender agent has sent any message before. If it did, then append, otherwise create new entry for it. Parameters ---------- agent_from : dict | str if type is dict, we expect it to be the agentMET4FOF dict message to be compliant with older code (keys ``from`` and ``data`` present'), otherwise we expect it to be name of agent sender and ``data`` will need to be passed as parameter data : np.ndarray, DataFrame, list, float or int Not used if ``agent_from`` is a dict. Otherwise ``data`` is compulsory. concat_axis : int, optional axis to concatenate on with the buffering for numpy arrays. Default is 0. """ # Store into a separate variables, it will be used frequently later for the # type checks. If first argument is the agentMET4FOF dict message in old format if isinstance(agent_from, dict): message_from = agent_from["from"] message_data = agent_from["data"] # ... otherwise, we expect the name of agent_sender and the data to be passed. else: message_from = agent_from message_data = data # check if sender agent has sent any message before: # if it did,then append, otherwise create new entry for the input agent if message_from not in self.buffer: self.update(message_from, message_data) return 0 # otherwise 'sender' exists in memory, handle appending # acceptable data types : list, dict, ndarray, dataframe, single values # handle nested data in dict if isinstance(message_data, dict): for key, value in message_data.items(): # if it is a single value, then we convert it into a single element list if not self.check_supported_datatype(value): value = [value] # check if the key exist # if it does, then append if key in self.buffer[agent_from].keys(): self.buffer[agent_from][key] = self._concatenate(self.buffer[agent_from][key], value,concat_axis) # otherwise, create new entry else: self.buffer[agent_from].update({key: value}) else: if not self.check_supported_datatype(message_data): message_data = [message_data] self.buffer[agent_from] = self._concatenate(self.buffer[agent_from], message_data,concat_axis)
    def values(self):
        """Interface to access the internal dict's values()"""
        return self.buffer.values()

    def items(self):
        """Interface to access the internal dict's items()"""
        return self.buffer.items()

    def keys(self):
        """Interface to access the internal dict's keys()"""
        return self.buffer.keys()

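# The buffer can also be exercised on its own, which illustrates the store/popleft/clear
# cycle used internally by the agents (the agent name "Sensor_1" is only an example):
#
#     buffer = AgentBuffer(buffer_size=5)
#     buffer.store(agent_from="Sensor_1", data=[1, 2, 3])
#     buffer.store(agent_from="Sensor_1", data=[4, 5, 6])  # concatenates, then truncates to 5
#     buffer.buffer_filled("Sensor_1")                     # True, length reached buffer_size
#     oldest = buffer.popleft(n=2)                         # {"Sensor_1": [2, 3]}
#     buffer.clear("Sensor_1")
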
class _AgentController(AgentMET4FOF): """ Unique internal agent to provide control to other agents. Automatically instantiated when starting server. Provides global control to all agents in network. """ def init_parameters(self, ns=None, backend='osbrain', mesa_model="", log_mode=True): self.backend = backend self.states = {0: "Idle", 1: "Running", 2: "Pause", 3: "Stop"} self.current_state = "Idle" self.ns = ns self.G = nx.DiGraph() self._logger = None self.coalitions = [] self.log_mode = log_mode if backend == "mesa": self.mesa_model = mesa_model def start_mesa_timer(self, mesa_update_interval): class RepeatTimer(): def __init__(self, t, repeat_function): self.t = t self.repeat_function = repeat_function self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.repeat_function() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() self.mesa_update_interval = mesa_update_interval self.mesa_timer = RepeatTimer(t=mesa_update_interval, repeat_function=self.mesa_model.step) self.mesa_timer.start() def stop_mesa_timer(self): if self.mesa_timer: self.mesa_timer.cancel() del self.mesa_timer def step_mesa_model(self): self.mesa_model.step() def get_mesa_model(self): return self.mesa_model def get_agent(self, agentName=""): if self.backend == "osbrain": return self.ns.proxy(agentName) elif self.backend == "mesa": return self.mesa_model.get_agent(agentName) def get_agentType_count(self, agentType): num_count = 1 agentType_name = str(agentType.__name__) agent_names = self.agents() if len(agent_names) != 0: for agentName in agent_names: current_agent_type = self.get_agent(agentName).get_attr('AgentType') if current_agent_type == agentType_name: num_count += 1 return num_count def get_agent_name_count(self, new_agent_name): num_count = 1 agent_names = self.agents() if len(agent_names) != 0: for agentName in agent_names: if new_agent_name in agentName: num_count += 1 return num_count def generate_module_name_byType(self, agentType): # handle agent type if isinstance(agentType, str): name = agentType else: name = agentType.__name__ name += "_" + str(self.get_agentType_count(agentType)) return name def generate_module_name_byUnique(self, agent_name): name = agent_name agent_copy_count = self.get_agent_name_count(agent_name) # number of agents with same name if agent_copy_count > 1: name += "(" + str(self.get_agent_name_count(agent_name)) + ")" return name def add_agent(self, name=" ", agentType=AgentMET4FOF, log_mode=True, buffer_size=1000, ip_addr=None, loop_wait=None, **kwargs): try: if ip_addr is None: ip_addr = 'localhost' if name == " ": new_name = self.generate_module_name_byType(agentType) else: new_name = self.generate_module_name_byUnique(name) # actual instantiation of agent, depending on backend if self.backend == "osbrain": new_agent = self._add_osbrain_agent(name=new_name, agentType=agentType, log_mode=log_mode, buffer_size=buffer_size, ip_addr=ip_addr, loop_wait=loop_wait, **kwargs) elif self.backend == "mesa": # handle osbrain and mesa here new_agent = self._add_mesa_agent(name=new_name, agentType=agentType, buffer_size=buffer_size, log_mode=log_mode, **kwargs) return new_agent except Exception as e: self.log_info("ERROR:" + str(e)) def _add_osbrain_agent(self, name=" ", agentType=AgentMET4FOF, log_mode=True, buffer_size=1000, ip_addr=None, loop_wait=None, **kwargs): new_agent = run_agent(name, base=agentType, attributes=dict(log_mode=log_mode, buffer_size=buffer_size), 
nsaddr=self.ns.addr(), addr=ip_addr) new_agent.init_parameters(**kwargs) new_agent.init_agent(buffer_size=buffer_size, log_mode=log_mode) new_agent.init_agent_loop(loop_wait) if log_mode: new_agent.set_logger(self._get_logger()) return new_agent def _add_mesa_agent(self, name=" ", agentType=AgentMET4FOF, log_mode=True, buffer_size=1000, **kwargs): new_agent = agentType(name=name, backend=self.backend, mesa_model=self.mesa_model) new_agent.init_parameters(**kwargs) new_agent.init_agent(buffer_size=buffer_size, log_mode=log_mode) new_agent = self.mesa_model.add_agent(new_agent) return new_agent def get_agents_stylesheets(self, agent_names): # for customising display purposes in dashboard agents_stylesheets = [] for agent in agent_names: try: stylesheet = self.get_agent(agent).get_attr("stylesheet") agents_stylesheets.append({"stylesheet": stylesheet}) except Exception as e: self.log_info("Error:" + str(e)) return agents_stylesheets def agents(self, exclude_names=["AgentController", "Logger"]): if self.backend == "osbrain": agent_names = [name for name in self.ns.agents() if name not in exclude_names] else: agent_names = self.mesa_model.agents() return agent_names def update_networkx(self): agent_names = self.agents() edges = self.get_latest_edges(agent_names) if len(agent_names) != self.G.number_of_nodes() or len(edges) != self.G.number_of_edges(): agent_stylesheets = self.get_agents_stylesheets(agent_names) new_G = nx.DiGraph() new_G.add_nodes_from(list(zip(agent_names, agent_stylesheets))) new_G.add_edges_from(edges) self.G = new_G def get_networkx(self): return (self.G) def get_latest_edges(self, agent_names): edges = [] for agent_name in agent_names: temp_agent = self.get_agent(agent_name) output_agent_channels = temp_agent.get_attr('Outputs_agent_channels') temp_output_agents = list(output_agent_channels.keys()) temp_output_channels = list(output_agent_channels.values()) for output_agent_name, output_agent_channel in zip(temp_output_agents,temp_output_channels): edges += [(agent_name, output_agent_name, {"channel":str(output_agent_channel)})] return edges def _get_logger(self): """ Internal method to access the Logger relative to the nameserver """ if self._logger is None: self._logger = self.ns.proxy('Logger') return self._logger def add_coalition(self, new_coalition): """ Instantiates a coalition of agents. """ self.coalitions.append(new_coalition) return new_coalition def del_coalition(self): self.coalitions = [] def add_coalition_agent(self, name, agents=[]): """ Add agents into the coalition """ # update coalition for coalition_i, coalition in enumerate(self.coalitions): if coalition.name == name: for agent in agents: self.coalitions[coalition_i].add_agent(agent) def remove_coalition_agent(self, coalition_name, agent_name=""): """ Remove agent from coalition """ # update coalition for coalition_i, coalition in enumerate(self.coalitions): if coalition.name == coalition_name: self.coalitions[coalition_i].remove_agent(agent_name) def get_coalition(self, name): """ Gets the coalition based on provided name """ for coalition_i, coalition in enumerate(self.coalitions): if coalition.name == name: return coalition return -1
class MesaModel(Model):
    """A MESA Model"""

    def __init__(self):
        self.schedule = BaseScheduler(self)

    def add_agent(self, agent: MesaAgent):
        self.schedule.add(agent)
        return agent

    def get_agent(self, agentName: str):
        agent = next((x for x in self.schedule.agents if x.name == agentName), None)
        return agent

    def step(self):
        """Advance the model by one step."""
        self.schedule.step()

    def agents(self):
        return [agent.name for agent in self.schedule.agents]

    def shutdown(self):
        """Shutdown the entire MESA model with all agents and schedulers"""
        for agent in self.agents():
            agent_obj = self.get_agent(agent)
            agent_obj.shutdown()

[docs]class AgentNetwork: """ Object for starting a new Agent Network or connect to an existing Agent Network specified by ip & port Provides function to add agents, (un)bind agents, query agent network state, set global agent states Interfaces with an internal _AgentController which is hidden from user """ def __init__(self, ip_addr="127.0.0.1", port=3333, connect=False, log_filename="log_file.csv", dashboard_modules=True, dashboard_extensions=[], dashboard_update_interval=3, dashboard_max_monitors=10, dashboard_port=8050, backend="osbrain", mesa_update_interval=0.1, network_stylesheet = default_agent_network_stylesheet, **dashboard_kwargs): """ Parameters ---------- ip_addr: str Ip address of server to connect/start port: int Port of server to connect/start connect: bool False sets Agent network to connect mode and will connect to specified address True (Default) sets Agent network to initially try to connect and if it cant find one, it will start a new server at specified address log_filename: str Name of log file, acceptable csv format. It will be saved locally, in the same folder as the python script in which this AgentNetwork is instantiated on. If set to None or False, then will not save in a file. Note that the overhead of updating the log file can be huge, especially for high number of agents and large data transmission. dashboard_modules : list of modules , modules or bool Accepts list of modules which contains the AgentMET4FOF and DataStreamMET4FOF derived classes If set to True, will initiate the dashboard with default agents in AgentMET4FOF dashboard_update_interval : int Regular interval (seconds) to update the dashboard graphs dashboard_max_monitors : int Due to complexity in managing and instantiating dynamic figures, a maximum number of monitors is specified first and only the each Monitor Agent will occupy one of these figures. dashboard_port: int Port of the dashboard to be hosted on. By default is port 8050. **dashboard_kwargs Additional key words to be passed in initialising the dashboard """ self.backend = backend self.ip_addr = ip_addr self.port = port self._controller = None self._logger = None self.log_filename = log_filename self.mesa_update_interval = mesa_update_interval if connect: self.is_parent_mesa = False else: self.is_parent_mesa = True if type(self.log_filename) == str and '.csv' in self.log_filename: self.save_logfile = True else: self.save_logfile = False # handle different choices of backends if self.backend == "osbrain": if connect: self.connect(ip_addr, port, verbose=False) else: self.connect(ip_addr, port, verbose=False) if self.ns == 0: self.start_server_osbrain(ip_addr, port) elif self.backend == "mesa": self.start_server_mesa() else: raise NotImplementedError("Backend has not been implemented. Valid choices are 'osbrain' and 'mesa'.") if isinstance(dashboard_extensions, list) == False: dashboard_extensions = [dashboard_extensions] # handle instantiating the dashboard # if dashboard_modules is False, the dashboard will not be launched if dashboard_modules is not False: from .dashboard.Dashboard_agt_net import Dashboard_agt_net # Initialize common dashboard parameters for both types of dashboards # corresponding to different backends. 
dashboard_params = { "dashboard_modules": dashboard_modules, "dashboard_layouts": [Dashboard_agt_net] + dashboard_extensions, "dashboard_update_interval": dashboard_update_interval, "max_monitors": dashboard_max_monitors, "ip_addr": ip_addr, "port": dashboard_port, "agentNetwork": self, "network_stylesheet":network_stylesheet } dashboard_params.update(dashboard_kwargs) # Initialize dashboard process/thread. if self.backend == "osbrain": from .dashboard.Dashboard import AgentDashboardThread self.dashboard_proc = AgentDashboardThread(**dashboard_params) elif self.backend == "mesa": from .dashboard.Dashboard import AgentDashboardThread self.dashboard_proc = AgentDashboardThread(**dashboard_params) self.dashboard_proc.start() else: self.dashboard_proc = None
[docs] def connect(self, ip_addr="127.0.0.1", port=3333, verbose=True): """ Only for osbrain backend. Connects to an existing AgentNetwork. Parameters ---------- ip_addr: str IP Address of server to connect to port: int Port of server to connect to """ try: self.ns = NSProxy(nsaddr=ip_addr + ':' + str(port)) except: if verbose: print("Unable to connect to existing NameServer...") self.ns = 0
[docs] def start_server_osbrain(self, ip_addr: str = "127.0.0.1", port: int = 3333): """Starts a new AgentNetwork for osBrain and initializes :attr:`_controller` Parameters ---------- ip_addr: str IP Address of server to start port: int Port of server to start """ print("Starting NameServer...") self.ns = run_nameserver(addr=ip_addr + ':' + str(port)) if len(self.ns.agents()) != 0: self.ns.shutdown() self.ns = run_nameserver(addr=ip_addr + ':' + str(port)) self._controller = run_agent( "AgentController", base=_AgentController, attributes=dict(log_mode=True), nsaddr=self.ns.addr(), addr=ip_addr, ) self._logger = run_agent("Logger", base=_Logger, nsaddr=self.ns.addr()) self._controller.init_parameters(ns=self.ns, backend=self.backend) self._logger.init_parameters( log_filename=self.log_filename, save_logfile=self.save_logfile )
[docs] def start_server_mesa(self): """Starts a new AgentNetwork for Mesa and initializes :attr:`_controller` Handles the initialisation for :attr:`backend` ``== "mesa"``. Involves spawning two nested objects :attr:`mesa_model` and :attr:`_controller` and calls :meth:`start_mesa_timer`. """ self.mesa_model = MesaModel() self._controller = _AgentController(name="AgentController", backend=self.backend) self._controller.init_parameters(backend=self.backend, mesa_model=self.mesa_model) self.start_mesa_timer(self.mesa_update_interval)
def _set_mode(self, state): """ Internal method to set mode of Agent Controller Parameters ---------- state: str State of AgentController to set. """ self._get_controller().set_attr(current_state=state) def _get_mode(self): """ Returns ------- state: str State of Agent Network """ return self._get_controller().get_attr('current_state')
[docs] def get_mode(self): """ Returns ------- state: str State of Agent Network """ return self._get_controller().get_attr('current_state')
[docs] def set_running_state(self, filter_agent=None): """ Blanket operation on all agents to set their `current_state` attribute to "Running" Users will need to define their own flow of handling each type of `self.current_state` in the `agent_loop` Parameters ---------- filter_agent : str (Optional) Filter name of agents to set the states """ self.set_agents_state(filter_agent=filter_agent, state="Running")
def update_networkx(self): self._get_controller().update_networkx() def get_networkx(self): return self._get_controller().get_attr('G') def get_nodes_edges(self): G = self.get_networkx() return G.nodes, G.edges(data=True) def get_nodes(self): G = self.get_networkx() return G.nodes def get_edges(self): G = self.get_networkx() return G.edges
[docs] def set_stop_state(self, filter_agent=None): """ Blanket operation on all agents to set their `current_state` attribute to "Stop" Users will need to define their own flow of handling each type of `self.current_state` in the `agent_loop` Parameters ---------- filter_agent : str (Optional) Filter name of agents to set the states """ self.set_agents_state(filter_agent=filter_agent, state="Stop")
[docs] def set_agents_state(self, filter_agent=None, state="Idle"): """ Blanket operation on all agents to set their `current_state` attribute to given state Can be used to define different states of operation such as "Running", "Idle, "Stop", etc.. Users will need to define their own flow of handling each type of `self.current_state` in the `agent_loop` Parameters ---------- filter_agent : str (Optional) Filter name of agents to set the states state : str State of agents to set """ self._set_mode(state) for agent_name in self.agents(): if (filter_agent is not None and filter_agent in agent_name) or (filter_agent is None): agent = self.get_agent(agent_name) try: agent.set_attr(current_state=state) except Exception as e: print(e) print("SET STATE: ", state) return 0
def reset_agents(self): for agent_name in self.agents(): agent = self.get_agent(agent_name) agent.reset() agent.set_attr(current_state="Reset") self._set_mode("Reset") return 0 def remove_agent(self, agent): if type(agent) == str: agent_proxy = self.get_agent(agent) else: agent_proxy = agent for input_agent in agent_proxy.get_attr("Inputs"): self.get_agent(input_agent).unbind_output(agent_proxy) for output_agent in agent_proxy.get_attr("Outputs"): agent_proxy.unbind_output(self.get_agent(output_agent)) agent_proxy.shutdown()
    def bind_agents(self, source, target, channel="default"):
        """
        Binds two agents' communication channel in a unidirectional manner from the
        `source` agent to the `target` agent.

        Any subsequent call of `source.send_output()` will reach the `target` agent's
        message queue.

        Parameters
        ----------
        source : AgentMET4FOF
            Source agent whose output channel will be bound to `target`
        target : AgentMET4FOF
            Target agent whose input channel will be bound to `source`
        """
        source.bind_output(target, channel=channel)
        return 0

    def unbind_agents(self, source, target):
        """
        Unbinds two agents' communication channel in a unidirectional manner from the
        `source` agent to the `target` agent.

        This is the reverse of `bind_agents()`.

        Parameters
        ----------
        source : AgentMET4FOF
            Source agent whose output channel will be unbound from `target`
        target : AgentMET4FOF
            Target agent whose input channel will be unbound from `source`
        """
        source.unbind_output(target)
        return 0

def _get_controller(self): """Internal method to access the AgentController relative to the nameserver""" return self._controller def _get_logger(self): """Internal method to access the Logger relative to the nameserver""" return self._logger
[docs] def get_agent(self, agent_name): """ Returns a particular agent connected to Agent Network. Parameters ---------- agent_name : str Name of agent to search for in the network """ return self._get_controller().get_agent(agent_name)
[docs] def agents(self, filter_agent: Optional[str] = None) -> List[str]: """Returns all or subset of agents' names connected to agent network Parameters ---------- filter_agent : str, optional if present, only those names are returned which contain ``filter_agent``'s value Returns ------- list[str] requested names of agents """ all_agent_names = self._get_controller().agents() if filter_agent is not None: filtered_agent_names = [ agent_name for agent_name in all_agent_names if filter_agent in agent_name ] return filtered_agent_names return all_agent_names
def generate_module_name_byType(self,agentType): return (self._get_controller().generate_module_name_byType(agentType))
[docs] def add_agent(self, name=" ", agentType=AgentMET4FOF, log_mode=True, buffer_size=1000, ip_addr=None, loop_wait=None, **kwargs): """ Instantiates a new agent in the network. Parameters ---------- name str : (Optional) Unique name of agent. here cannot be more than one agent with the same name. Defaults to the agent's class name. agentType AgentMET4FOF : (Optional) Agent class to be instantiated in the network. Defaults to :py:class:`AgentMET4FOF` log_mode bool : (Optional) Determines if messages will be logged to background Logger Agent. Defaults to `True`. Returns ------- AgentMET4FOF : Newly instantiated agent """ if ip_addr is None: ip_addr = self.ip_addr agent = self._get_controller().add_agent(name=name, agentType=agentType, log_mode=log_mode, buffer_size=buffer_size, ip_addr=ip_addr, loop_wait=loop_wait, **kwargs) return agent
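    # A typical session with the network API of this class, assuming the osbrain backend
    # and default ports; the agent variable names are illustrative:
    #
    #     agent_network = AgentNetwork(backend="osbrain", dashboard_modules=True)
    #     gen_agent = agent_network.add_agent(agentType=DataStreamAgent, stream=SineGenerator())
    #     monitor_agent = agent_network.add_agent(agentType=MonitorAgent)
    #     agent_network.bind_agents(gen_agent, monitor_agent)
    #     agent_network.set_running_state()   # flips every agent's current_state to "Running"
    #     # ... later
    #     agent_network.shutdown()
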
[docs] def add_coalition(self, name="Coalition_1", agents=[]): """ Instantiates a coalition of agents. """ new_coalition = Coalition(name, agents) self._get_controller().add_coalition(new_coalition) return new_coalition
[docs] def add_coalition_agent(self, name="Coalition_1", agents=[]): """ Add agents into the coalition """ self._get_controller().add_coalition_agent(name, agents)
[docs] def remove_coalition_agent(self, coalition_name, agent_name=""): """ Remove agent from coalition """ self._get_controller().remove_coalition_agent(coalition_name, agent_name)
[docs] def get_coalition(self, name): """ Returns the coalition with the provided name """ return self._get_controller().get_coalition(name)
def del_coalition(self): self._get_controller().del_coalition() @property def coalitions(self): return self._get_controller().get_attr("coalitions") def get_mesa_model(self): return self.mesa_model
[docs] def shutdown(self): """Shuts down the entire agent network and all agents""" # Shutdown the nameserver. # This leaves some process clutter in the process list, but the actual # processes are ended. if self.backend == "osbrain": self._get_controller().get_attr('ns').shutdown() elif self.backend == "mesa": self._get_controller().stop_mesa_timer() self.mesa_model.shutdown() # Shutdown the dashboard if present. if self.dashboard_proc is not None: # This calls either the provided method Process.terminate() which # abruptly stops the running multiprocess.Process in case of the osBrain # backend or the self-written method in the class AgentDashboardThread # ensuring the proper termination of the dash.Dash app. self.dashboard_proc.terminate() # Then wait for the termination of the actual thread or at least finish the # execution of the join method in case of the "Mesa" backend. See #163 # for the search for a proper solution to this issue. self.dashboard_proc.join(timeout=10) return 0
def start_mesa_timer(self, update_interval): self._get_controller().start_mesa_timer(update_interval) def stop_mesa_timer(self): self._get_controller().stop_mesa_timer() def step_mesa_model(self): self._get_controller().step_mesa_model()
[docs]class Coalition(): """ A special class for grouping agents. It is rendered as a parent group on the dashboard, along with its member agents. """ def __init__(self, name="Coalition", agents=[]): self.agents = agents self.name = name def agent_names(self): return [agent.get_attr("name") for agent in self.agents] def add_agent(self, agent): self.agents.append(agent) def remove_agent(self, agent): if isinstance(agent, str): self.agents = [agent_i for agent_i in self.agents if agent_i.name != agent] elif isinstance(agent, AgentMET4FOF): self.agents = [agent_i for agent_i in self.agents if agent_i.name != agent.name]
class DataStreamAgent(AgentMET4FOF):
    """
    Able to simulate the generation of a datastream by loading a given DataStreamMET4FOF
    object.

    Can be used in incremental training or batch training mode. To simulate batch training
    mode, set `pretrain_size=-1`; otherwise, set `pretrain_size` and `batch_size` for the
    respective number of samples to be sent. See `DataStreamMET4FOF` on loading your own
    data set as a data stream.
    """

    def init_parameters(self, stream=DataStreamMET4FOF(), pretrain_size=None, batch_size=1,
                        loop_wait=1, randomize=False):
        """
        Parameters
        ----------
        stream : DataStreamMET4FOF
            A DataStreamMET4FOF object which provides the sample data
        pretrain_size : int
            The number of sample data to send through in the first loop cycle;
            subsequently, the batch_size will be used
        batch_size : int
            The number of sample data to send in every loop cycle
        loop_wait : int
            The duration to wait (seconds) at the end of each loop cycle before going into
            the next cycle
        randomize : bool
            Determines if the dataset should be shuffled before streaming
        """
        self.stream = stream
        self.stream.prepare_for_use()

        if randomize:
            self.stream.randomize_data()
        self.batch_size = batch_size
        if pretrain_size is None:
            self.pretrain_size = batch_size
        else:
            self.pretrain_size = pretrain_size
        self.pretrain_done = False
        self.loop_wait = loop_wait

[docs] def agent_loop(self): if self.current_state == "Running": if self.pretrain_size is None: self.send_next_sample(self.batch_size) elif self.pretrain_size == -1 or self.batch_size == -1: self.send_all_sample() self.pretrain_done = True else: # handle pre-training mode if self.pretrain_done: self.send_next_sample(self.batch_size) else: self.send_next_sample(self.pretrain_size) self.pretrain_done = True
    def send_next_sample(self, num_samples=1):
        """Send the next `num_samples` samples from the stream to the output channel"""
        if self.stream.has_more_samples():
            data = self.stream.next_sample(num_samples)
            self.log_info("DATA SAMPLE ID: " + str(self.stream.sample_idx))
            self.send_output(data)
[docs] def reset(self): super(DataStreamAgent, self).reset() self.stream.reset()
    def send_all_sample(self):
        """Send all remaining samples of the stream at once"""
        self.send_next_sample(-1)
[docs]class MonitorAgent(AgentMET4FOF):
    """
    Unique agent for storing plots and data from messages received from input agents.

    The dashboard searches for Monitor Agents' `buffer` and `plots` to draw the graphs.
    The "plot" channel is used to receive base64 images from agents to plot on the dashboard.

    Attributes
    ----------
    plots : dict
        Dictionary of format `{agent1_name : agent1_plot, agent2_name : agent2_plot}`
    plot_filter : list of str
        List of keys to filter the 'data' of received messages before they are saved
        into memory. Used to specifically select only a few keys to be plotted.
    custom_plot_function : callable
        a custom plot function that can be provided to handle the data in the
        monitor agent's buffer (see :class:`AgentMET4FOF` for details). The function
        gets provided with the content (value) of the buffer and with the string of
        the sender agent's name as stored in the buffer's keys. Additionally, any
        other parameters can be provided as a dict in custom_plot_parameters.
    custom_plot_parameters : dict
        a custom dictionary of parameters that shall be provided to each call of
        the custom_plot_function
    """
[docs]    def init_parameters(
        self,
        plot_filter: Optional[List[str]] = None,
        custom_plot_function: Optional[Callable[..., Scatter]] = None,
        **kwargs
    ):
        """Initialize the monitor agent's parameters

        Parameters
        ----------
        plot_filter : list of str, optional
            List of keys to filter the 'data' of received messages before they are
            saved into memory. Used to specifically select only a few keys to be plotted
        custom_plot_function : callable, optional
            a custom plot function that can be provided to handle the data in the
            monitor agent's buffer (see :class:`AgentMET4FOF` for details). The
            function gets provided with the content (value) of the buffer and with
            the string of the sender agent's name as stored in the buffer's keys.
            Additionally, any other parameters can be provided as a dict in
            custom_plot_parameters. By default the data gets plotted as shown in
            the various tutorials.
        kwargs : Any
            custom keyword parameters that shall be provided to each call of
            the :attr:`custom_plot_function`
        """
        self.plots = {}
        self.plot_filter = plot_filter
        self.custom_plot_function = custom_plot_function
        self.custom_plot_parameters = kwargs
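As a sketch of the documented callback interface: the function receives the buffer content and the sender agent's name and returns a plotly trace, while extra keyword arguments passed to `init_parameters` are forwarded on every call. The buffer keys "time" and "voltage" used here are assumptions for illustration.

    >>> def custom_xy_plot(data, sender_agent, xname="x", yname="y"):
    ...     # 'data' is the monitor agent's buffer entry for 'sender_agent'
    ...     return Scatter(x=data[xname], y=data[yname], name=sender_agent)
    >>> monitor = network.add_agent(agentType=MonitorAgent)
    >>> monitor.init_parameters(custom_plot_function=custom_xy_plot, xname="time", yname="voltage")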
[docs] def on_received_message(self, message): """ Handles incoming data from 'default' and 'plot' channels. Stores 'default' data into :attr:`buffer` and 'plot' data into :attr:`plots` Parameters ---------- message : dict Acceptable channel values are 'default' or 'plot' """ if message['channel'] == 'default': if self.plot_filter: message['data'] = {key: message['data'][key] for key in self.plot_filter} self.buffer_store(agent_from=message["from"], data=message["data"]) elif message['channel'] == 'plot': self.update_plot_memory(message) return 0
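For orientation, a sketch of the message layout this handler expects; only the fields used above are shown, further fields may be present, and the values are illustrative.

    >>> message = {
    ...     "from": "SineGeneratorAgent_1",
    ...     "data": {"time": [0.0, 0.002], "quantities": [0.0, 0.063]},
    ...     "channel": "default",
    ... }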
[docs]    def update_plot_memory(self, message: Dict[str, Any]):
        """
        Updates plot figures stored in `self.plots` with the received message

        Parameters
        ----------
        message : dict
            Standard message format specified by the AgentMET4FOF class.
            `message['data']` needs to be a base64 image string and can be nested in
            a dictionary for multiple plots.
            Only the latest plot is kept; no history of previous plots is stored.
        """
        if type(message['data']) != dict or message['from'] not in self.plots.keys():
            self.plots[message['from']] = message['data']
        elif type(message['data']) == dict:
            for key in message['data'].keys():
                self.plots[message['from']].update({key: message['data'][key]})
        self.log_info("PLOTS: " + str(self.plots))
[docs] def reset(self): super(MonitorAgent, self).reset() del self.plots self.plots = {}
class _Logger(AgentMET4FOF):
    """
    An internal logger agent which is instantiated immediately with each AgentNetwork.

    It collects all the logs which are sent to it, prints them, and optionally saves
    them into a csv log file. Since the user is not expected to directly access the
    logger agent, its initialisation options and interface are provided via the
    AgentNetwork object. When log_info of any agent is called, the agent will send
    the data to the logger agent.
    """

    def init_parameters(self, log_filename="log_file.csv", save_logfile=True):
        self.current_log_handlers = {"INFO": self.log_handler}
        self.bind('SUB', 'sub', {"INFO": self.log_handler})
        self.log_filename = log_filename
        self.save_logfile = save_logfile
        if self.save_logfile:
            try:
                # writes a new file
                self.writeFile = open(self.log_filename, 'w', newline='')
                writer = csv.writer(self.writeFile)
                writer.writerow(['Time', 'Name', 'Topic', 'Data'])
                # set to append mode
                self.writeFile = open(self.log_filename, 'a', newline='')
            except Exception:
                # re-raise the original error instead of masking it with a bare Exception
                raise
        self.save_cycles = 0

    @property
    def subscribed_topics(self):
        return list(self.current_log_handlers.keys())

    def bind_log_handler(self, log_handler_functions):
        for topic in self.subscribed_topics:
            self.unsubscribe('sub', topic)
        self.current_log_handlers.update(log_handler_functions)
        self.subscribe('sub', self.current_log_handlers)

    def log_handler(self, message, topic):
        sys.stdout.write(message + '\n')
        sys.stdout.flush()
        self.save_log_info(str(message))

    def save_log_info(self, log_msg):
        re_sq = r'\[(.*?)\]'
        re_rd = r'\((.*?)\)'

        date = re.findall(re_sq, log_msg)[0]
        date = "[" + date + "]"

        agent_name = re.findall(re_rd, log_msg)[0]

        contents = log_msg.split(':')
        if len(contents) > 4:
            topic = contents[3]
            data = str(contents[4:])
        else:
            topic = contents[3]
            data = " "

        if self.save_logfile:
            try:
                # append new row
                writer = csv.writer(self.writeFile)
                writer.writerow([str(date), agent_name, topic, data])

                if self.save_cycles % 15 == 0:
                    self.writeFile.close()
                    self.writeFile = open(self.log_filename, 'a', newline='')
                self.save_cycles += 1
            except Exception:
                # re-raise the original error instead of masking it with a bare Exception
                raise
[docs]class SineGeneratorAgent(AgentMET4FOF):
    """An agent streaming a sine signal

    Takes samples from the :class:`SineGenerator` and pushes them sample by sample
    to connected agents via its output channel.
    """
[docs] def init_parameters(self, sfreq=500, sine_freq=5): """Initialize the input data Initialize the input data stream as an instance of the :class:`SineGenerator` class. Parameters ---------- sfreq : int sampling frequency for the underlying signal sine_freq : float frequency of the generated sine wave """ self._sine_stream = SineGenerator(sfreq=sfreq, sine_freq=sine_freq)
[docs]    def agent_loop(self):
        """Model the agent's behaviour

        On state *Running* the agent will extract the input data stream's content
        sample by sample and push it via invoking :meth:`AgentMET4FOF.send_output`.
        """
        if self.current_state == "Running":
            sine_data = self._sine_stream.next_sample()  # dictionary
            self.send_output(sine_data["quantities"])
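A minimal end-to-end sketch wiring the generator to a monitor; the AgentNetwork calls follow its usual interface, but treat the exact signatures as assumptions.

    >>> network = AgentNetwork(backend="mesa")
    >>> gen = network.add_agent(agentType=SineGeneratorAgent)
    >>> gen.init_parameters(sfreq=500, sine_freq=5)
    >>> monitor = network.add_agent(agentType=MonitorAgent)
    >>> gen.bind_output(monitor)      # sine samples now flow into the monitor's buffer
    >>> network.set_running_state()   # switch all agents to the "Running" state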