agentMET4FOF agents

class agentMET4FOF.agents.AgentBuffer(buffer_size: int = 1000)[source]

Buffer class which is instantiated in every agent to store data incrementally

This buffer is necessary to handle multiple inputs coming from agents.

We can access the buffer like a dict with exposed functions such as .values(), .keys() and .items(). The actual dict object is stored in the variable buffer.

buffer

The buffer can be a dict of iterables, or a dict of dict of iterables for nested named data. The keys are the names of agents.

Type:dict of iterables or dict of dicts of iterables
buffer_size

The total number of elements to be stored in the agent buffer

supported_datatypes

List of all types supported and thus properly handled by the buffer. Defaults to np.ndarray, list and Pandas DataFrame

Type:list of types
buffer_filled(agent_from: Optional[str] = None) → bool[source]

Checks whether buffer is filled, by comparing against the buffer_size

For nested dict, this returns True if any of the iterables is beyond the buffer_size. For any of the dict values , which is not one of supported_datatypes this returns None.

Parameters:agent_from (str, optional) – Name of input agent in the buffer dict to be looked up. If agent_from is not provided, we check for all iterables in the buffer (default).
Returns:True if either the or any of the iterables has reached buffer_size or None in case none of the values is of one of the supported datatypes. False if all present iterable can take at least one more element according to buffer_size.
Return type:bool or None
check_supported_datatype(obj: object) → bool[source]

Checks whether value is an object of one of the supported data types

Parameters:obj (object) – Value to be checked
Returns:result – True if value is an object of one of the supported data types, False if not
Return type:boolean
clear(agent_from: Optional[str] = None)[source]

Clears the data in the buffer

Parameters:agent_from (str, optional) – Name of agent, if agent_from is not given, the entire buffer is flushed. (default)
items()[source]

Interface to access the internal dict’s items()

keys()[source]

Interface to access the internal dict’s keys()

popleft(n: Optional[int] = 1) → Union[Dict[KT, VT], numpy.ndarray, list, pandas.core.frame.DataFrame][source]

Pops the first n entries in the buffer

Parameters:n (int) – Number of elements to retrieve from buffer
Returns:
store(agent_from: Union[Dict[str, Union[numpy.ndarray, list, pandas.core.frame.DataFrame]], str], data: Union[numpy.ndarray, list, pandas.core.frame.DataFrame, float, int] = None, concat_axis: Optional[int] = 0)[source]

Stores data into buffer with the received message

Checks if sender agent has sent any message before. If it did, then append, otherwise create new entry for it.

Parameters:
  • agent_from (dict | str) – if type is dict, we expect it to be the agentMET4FOF dict message to be compliant with older code (keys from and data present’), otherwise we expect it to be name of agent sender and data will need to be passed as parameter
  • data (np.ndarray, DataFrame, list, float or int) – Not used if agent_from is a dict. Otherwise data is compulsory.
  • concat_axis (int, optional) – axis to concatenate on with the buffering for numpy arrays. Default is 0.
update(agent_from: Union[Dict[str, Union[numpy.ndarray, list, pandas.core.frame.DataFrame]], str], data: Union[numpy.ndarray, list, pandas.core.frame.DataFrame, float, int] = None)[source]

Overrides data in the buffer dict keyed by agent_from with value data

If data is a single value, this converts it into a list first before storing in the buffer dict.

Parameters:
  • agent_from (str) – Name of agent sender
  • data (np.ndarray, DataFrame, list, float or int) – New incoming data
values()[source]

Interface to access the internal dict’s values()

class agentMET4FOF.agents.AgentMET4FOF(name='', host=None, serializer=None, transport=None, attributes=None, backend='osbrain', mesa_model=None)[source]

Base class for all agents with specific functions to be overridden/supplied by user.

Behavioral functions for users to provide are init_parameters, agent_loop and on_received_message. Communicative functions are bind_output, unbind_output and send_output.

agent_loop()[source]

User defined method for the agent to execute for loop_wait seconds specified either in self.loop_wait or explicitly via`init_agent_loop(loop_wait)`

To start a new loop, call init_agent_loop(loop_wait) on the agent Example of usage is to check the current_state of the agent and send data periodically

bind_output(output_agent, channel='default')[source]

Forms Output connection with another agent. Any call on send_output will reach this newly binded agent

Adds the agent to its list of Outputs.

Parameters:
  • output_agent (AgentMET4FOF or list) – Agent(s) to be binded to this agent’s output channel
  • channel (str or list of str) – Specific name of the channel(s) to be subscribed to. (Default = “data”)
buffer_clear(agent_name: Optional[str] = None)[source]

Empties buffer which is a dict indexed by the agent_name.

Parameters:agent_name (str, optional) – Key of the memory dict, which can be the name of input agent, or self.name. If not supplied (default), we assume to clear the entire memory.
buffer_filled(agent_name=None)[source]

Checks whether the internal buffer has been filled to the maximum allowed specified by self.buffer_size

Parameters:agent_name (str) – Index of the buffer which is the name of input agent.
Returns:status of buffer filled
Return type:boolean
buffer_store(agent_from: str, data=None, concat_axis=0)[source]

Updates data stored in self.buffer with the received message

Checks if sender agent has sent any message before If it did,then append, otherwise create new entry for it

Parameters:
  • agent_from (str) – Name of agent sender
  • data – Any supported data which can be stored in dict as buffer. See AgentBuffer for more information.
get_attr(attr)[source]

Return the specified attribute of the agent.

Parameters:name – Name of the attribute to be retrieved.
handle_process_data(message)[source]

Internal method to handle incoming message before calling user-defined on_received_message method.

If current_state is either Stop or Reset, it will terminate early before entering on_received_message

init_agent(buffer_size=1000, log_mode=True)[source]

Internal initialization to setup the agent: mainly on setting the dictionary of Inputs, Outputs, PubAddr.

Calls user-defined init_parameters() upon finishing.

Inputs

Dictionary of Agents connected to its input channels. Messages will arrive from agents in this dictionary. Automatically updated when bind_output() function is called

Type:dict
Outputs

Dictionary of Agents connected to its output channels. Messages will be sent to agents in this dictionary. Automatically updated when bind_output() function is called

Type:dict
PubAddr_alias

Name of Publish address socket

Type:str
PubAddr

Publish address socket handle

Type:str
AgentType

Name of class

Type:str
current_state

Current state of agent. Can be used to define different states of operation such as “Running”, “Idle, “Stop”, etc.. Users will need to define their own flow of handling each type of self.current_state in the agent_loop

Type:str
loop_wait

The interval to wait between loop. Call init_agent_loop to restart the timer or set the value of loop_wait in init_parameters when necessary.

Type:int
buffer_size

The total number of elements to be stored in the agent buffer When total elements exceeds this number, the latest elements will be replaced with the incoming data elements

Type:int
init_agent_loop(loop_wait: Optional[int] = None)[source]

Initiates the agent loop, which iterates every loop_wait seconds

Stops every timers and initiate a new loop.

Parameters:loop_wait (int, optional) – The wait between each iteration of the loop
init_buffer(buffer_size)[source]

A method to initialise the buffer. By overriding this method, user can provide a custom buffer, instead of the regular AgentBuffer. This can be used, for example, to provide a MetrologicalAgentBuffer in the metrological agents.

init_parameters()[source]

User provided function to initialize parameters of choice.

log_info(message)[source]

Prints logs to be saved into logfile with Logger Agent

Parameters:message (str) – Message to be logged to the internal Logger Agent
on_connect_output(output_agent)[source]

This user provided method is called whenever an agent is connected to its output.

This can be for example, to send metadata or ping to the output agent.

on_received_message(message)[source]

User-defined method and is triggered to handle the message passed by Input.

Parameters:message (Dictionary) – The message received is in form {‘from’:agent_name, ‘data’: data, ‘senderType’: agent_class, ‘channel’:channel_name} agent_name is the name of the Input agent which sent the message data is the actual content of the message
pack_data(data, channel='default')[source]

Internal method to pack the data content into a dictionary before sending out.

Special case : if the data is already a message, then the from and senderType will be altered to this agent, without altering the data and channel within the message this is used for more succinct data processing and passing.

Parameters:
  • data (argument) – Data content to be packed before sending out to agents.
  • channel (str) – Key of dictionary which stores data
Returns:

Packed message data

Return type:

dict of the form {‘from’:agent_name, ‘data’: data, ‘senderType’: agent_class, ‘channel’:channel_name}.

reset()[source]

This method will be called on all agents when the global reset_agents is called by the AgentNetwork and when the Reset button is clicked on the dashboard.

Method to reset the agent’s states and parameters. User can override this method to reset the specific parameters.

respond_reply_attr_(message_data)[source]

Response to a reply of setting attribute

respond_request_attr_(attribute: str)[source]

Response to a request of attribute from input agents.

This agent reply with the requested attribute if it has it.

respond_request_method_(message_data: dict)[source]

Response to a request of executing method from input agents.

This agent will execute the method with the provided parameters of the method.

send_output(data, channel='default')[source]

Sends message data to all connected agents in self.Outputs.

Output connection can first be formed by calling bind_output. By default calls pack_data(data) before sending out. Can specify specific channel as opposed to ‘default’ channel.

Parameters:
  • data (argument) – Data content to be sent out
  • channel (str) – Key of message dictionary which stores data
Returns:

message

Return type:

dict of the form {‘from’:agent_name, ‘data’: data, ‘senderType’: agent_class, ‘channel’:channel_name}.

send_plot(fig: Union[matplotlib.figure.Figure, Dict[str, matplotlib.figure.Figure]], mode: str = 'image')[source]

Sends plot to agents connected to this agent’s Output channel.

This method is different from send_output which will be sent to through the ‘plot’ channel to be handled.

Tradeoffs between “image” and “plotly” modes are that “image” are more stable and “plotly” are interactive. Note not all (complicated) matplotlib figures can be converted into a plotly figure.

Parameters:
  • fig (matplotlib.figure.Figure or dict of matplotlib.figure.Figure) – Alternatively, multiple figures can be nested in a dict (with any preferred keys) e.g {“Temperature”:matplotlib.Figure, “Acceleration”:matplotlib.Figure}
  • mode (str) – “image” - converts into image via encoding at base64 string. “plotly” - converts into plotly figure using mpl_to_plotly Default: “image”
Returns:

graph

Return type:

str or plotly figure or dict of one of those converted figure(s)

send_request_attribute(attribute: str)[source]

Send a request of attribute to output agents.

Output agents will reply with the requested attribute if they have.

send_request_method(method: str, **method_params)[source]

Send a request of executing methods to output agents.

Output agents will respond by calling the method.

send_set_attr(attr: str, value)[source]

Sends a message to set the attr of another agent to that of value.

Parameters:
  • attr (str) – The variable name of the output agent to be set.
  • value – The value of the variable to be set
set_attr(**kwargs)[source]

Set object attributes.

Parameters:kwargs ([name, value]) – Keyword arguments will be used to set the object attributes.
shutdown()[source]

Cleanly stop and shut down the agent assuming the agent is running.

Will let the main thread do the tear down.

step()[source]

Used for MESA backend only. Behaviour on every update step.

stop_agent_loop()[source]

Stops agent_loop from running. Note that the agent will still be responding to messages

unbind_output(output_agent)[source]

Remove existing output connection with another agent. This reverses the bind_output method

Parameters:output_agent (AgentMET4FOF) – Agent binded to this agent’s output channel
class agentMET4FOF.agents.AgentNetwork(ip_addr='127.0.0.1', port=3333, connect=False, log_filename='log_file.csv', dashboard_modules=True, dashboard_extensions=[], dashboard_update_interval=3, dashboard_max_monitors=10, dashboard_port=8050, backend='osbrain', mesa_update_interval=0.1, network_stylesheet=[{'selector': 'node', 'style': {'label': 'data(id)', 'shape': 'rectangle', 'text-valign': 'center', 'text-halign': 'center', 'color': '#FFF', 'text-outline-width': 1.5, 'text-outline-color': '#000232'}}, {'selector': 'edge', 'style': {'curve-style': 'unbundled-bezier', 'mid-target-arrow-shape': 'triangle', 'arrow-scale': 2, 'line-color': '#4287f5', 'mid-target-arrow-color': '#4287f5', 'label': 'data(channel)', 'text-outline-width': 1.5, 'text-outline-color': '#000232', 'color': '#FFF', 'text-margin-x': '10px', 'text-margin-y': '20px'}}, {'selector': '.rectangle', 'style': {'shape': 'rectangle'}}, {'selector': '.triangle', 'style': {'shape': 'triangle'}}, {'selector': '.octagon', 'style': {'shape': 'octagon'}}, {'selector': '.ellipse', 'style': {'shape': 'ellipse'}}, {'selector': '.bluebackground', 'style': {'background-color': '#c4fdff'}}, {'selector': '.blue', 'style': {'background-color': '#006db5'}}, {'selector': '.coalition', 'style': {'background-color': '#c4fdff', 'text-valign': 'top', 'text-halign': 'center'}}, {'selector': '.coalition-edge', 'style': {'line-style': 'dashed'}}, {'selector': '.outline', 'style': {'color': '#fff', 'text-outline-color': '#888', 'text-outline-width': 2}}], **dashboard_kwargs)[source]

Object for starting a new Agent Network or connect to an existing Agent Network specified by ip & port

Provides function to add agents, (un)bind agents, query agent network state, set global agent states Interfaces with an internal _AgentController which is hidden from user

add_agent(name=' ', agentType=<class 'agentMET4FOF.agents.AgentMET4FOF'>, log_mode=True, buffer_size=1000, ip_addr=None, loop_wait=None, **kwargs)[source]

Instantiates a new agent in the network.

Parameters:
  • str (name) – with the same name. Defaults to the agent’s class name.
  • AgentMET4FOF (agentType) – network. Defaults to AgentMET4FOF
  • bool (log_mode) – Logger Agent. Defaults to True.
Returns:

AgentMET4FOF

Return type:

Newly instantiated agent

add_coalition(name='Coalition_1', agents=[])[source]

Instantiates a coalition of agents.

add_coalition_agent(name='Coalition_1', agents=[])[source]

Add agents into the coalition

agents(filter_agent: Optional[str] = None) → List[str][source]

Returns all or subset of agents’ names connected to agent network

Parameters:filter_agent (str, optional) – if present, only those names are returned which contain filter_agent’s value
Returns:requested names of agents
Return type:list[str]
bind_agents(source, target, channel='default')[source]

Binds two agents communication channel in a unidirectional manner from source Agent to target Agent

Any subsequent calls of source.send_output() will reach target Agent’s message queue.

Parameters:
  • source (AgentMET4FOF) – Source agent whose Output channel will be binded to target
  • target (AgentMET4FOF) – Target agent whose Input channel will be binded to source
connect(ip_addr='127.0.0.1', port=3333, verbose=True)[source]

Only for osbrain backend. Connects to an existing AgentNetwork.

Parameters:
  • ip_addr (str) – IP Address of server to connect to
  • port (int) – Port of server to connect to
get_agent(agent_name)[source]

Returns a particular agent connected to Agent Network.

Parameters:agent_name (str) – Name of agent to search for in the network
get_coalition(name)[source]

Returns the coalition with the provided name

get_mode()[source]
Returns:state – State of Agent Network
Return type:str
remove_coalition_agent(coalition_name, agent_name='')[source]

Remove agent from coalition

set_agents_state(filter_agent=None, state='Idle')[source]

Blanket operation on all agents to set their current_state attribute to given state

Can be used to define different states of operation such as “Running”, “Idle, “Stop”, etc.. Users will need to define their own flow of handling each type of self.current_state in the agent_loop

Parameters:
  • filter_agent (str) – (Optional) Filter name of agents to set the states
  • state (str) – State of agents to set
set_running_state(filter_agent=None)[source]

Blanket operation on all agents to set their current_state attribute to “Running”

Users will need to define their own flow of handling each type of self.current_state in the agent_loop

Parameters:filter_agent (str) – (Optional) Filter name of agents to set the states
set_stop_state(filter_agent=None)[source]

Blanket operation on all agents to set their current_state attribute to “Stop”

Users will need to define their own flow of handling each type of self.current_state in the agent_loop

Parameters:filter_agent (str) – (Optional) Filter name of agents to set the states
shutdown()[source]

Shuts down the entire agent network and all agents

start_server_mesa()[source]

Starts a new AgentNetwork for Mesa and initializes _controller

Handles the initialisation for backend == "mesa". Involves spawning two nested objects mesa_model and _controller and calls start_mesa_timer().

start_server_osbrain(ip_addr: str = '127.0.0.1', port: int = 3333)[source]

Starts a new AgentNetwork for osBrain and initializes _controller

Parameters:
  • ip_addr (str) – IP Address of server to start
  • port (int) – Port of server to start
unbind_agents(source, target)[source]

Unbinds two agents communication channel in a unidirectional manner from source Agent to target Agent

This is the reverse of bind_agents()

Parameters:
  • source (AgentMET4FOF) – Source agent whose Output channel will be unbinded from target
  • target (AgentMET4FOF) – Target agent whose Input channel will be unbinded from source
class agentMET4FOF.agents.Coalition(name='Coalition', agents=[])[source]

A special class for grouping agents.

It is rendered as a parent group on the dashboard, along with its member agents.

class agentMET4FOF.agents.DataStreamAgent(name='', host=None, serializer=None, transport=None, attributes=None, backend='osbrain', mesa_model=None)[source]

Able to simulate generation of datastream by loading a given DataStreamMET4FOF object.

Can be used in incremental training or batch training mode. To simulate batch training mode, set pretrain_size=-1 , otherwise, set pretrain_size and batch_size for the respective See DataStreamMET4FOF on loading your own data set as a data stream.

agent_loop()[source]

User defined method for the agent to execute for loop_wait seconds specified either in self.loop_wait or explicitly via`init_agent_loop(loop_wait)`

To start a new loop, call init_agent_loop(loop_wait) on the agent Example of usage is to check the current_state of the agent and send data periodically

init_parameters(stream=<agentMET4FOF.streams.DataStreamMET4FOF object>, pretrain_size=None, batch_size=1, loop_wait=1, randomize=False)[source]
Parameters:
  • stream (DataStreamMET4FOF) – A DataStreamMET4FOF object which provides the sample data
  • pretrain_size (int) – The number of sample data to send through in the first loop cycle, and subsequently, the batch_size will be used
  • batch_size (int) – The number of sample data to send in every loop cycle
  • loop_wait (int) – The duration to wait (seconds) at the end of each loop cycle before going into the next cycle
  • randomize (bool) – Determines if the dataset should be shuffled before streaming
reset()[source]

This method will be called on all agents when the global reset_agents is called by the AgentNetwork and when the Reset button is clicked on the dashboard.

Method to reset the agent’s states and parameters. User can override this method to reset the specific parameters.

class agentMET4FOF.agents.MesaModel[source]

A MESA Model

shutdown()[source]

Shutdown entire MESA model with all agents and schedulers

step()[source]

Advance the model by one step.

class agentMET4FOF.agents.MonitorAgent(name='', host=None, serializer=None, transport=None, attributes=None, backend='osbrain', mesa_model=None)[source]

Unique Agent for storing plots and data from messages received from input agents.

The dashboard searches for Monitor Agents’ buffer and plots to draw the graphs “plot” channel is used to receive base64 images from agents to plot on dashboard

plots

Dictionary of format {agent1_name : agent1_plot, agent2_name : agent2_plot}

Type:dict
plot_filter

List of keys to filter the ‘data’ upon receiving message to be saved into memory Used to specifically select only a few keys to be plotted

Type:list of str
custom_plot_function

a custom plot function that can be provided to handle the data in the monitor agents buffer (see AgentMET4FOF for details). The function gets provided with the content (value) of the buffer and with the string of the sender agent’s name as stored in the buffer’s keys. Additionally any other parameters can be provided as a dict in custom_plot_parameters.

Type:callable
custom_plot_parameters

a custom dictionary of parameters that shall be provided to each call of the custom_plot_function

Type:dict
init_parameters(plot_filter: Optional[List[str]] = None, custom_plot_function: Optional[Callable[[...], plotly.graph_objs._scatter.Scatter]] = None, **kwargs)[source]

Initialize the monitor agent’s parameters

Parameters:
  • plot_filter (list of str, optional) – List of keys to filter the ‘data’ upon receiving message to be saved into memory. Used to specifically select only a few keys to be plotted
  • custom_plot_function (callable, optional) – a custom plot function that can be provided to handle the data in the monitor agents buffer (see AgentMET4FOF for details). The function gets provided with the content (value) of the buffer and with the string of the sender agent’s name as stored in the buffer’s keys. Additionally any other parameters can be provided as a dict in custom_plot_parameters. By default the data gets plotted as shown in the various tutorials.
  • kwargs (Any) – custom key word parameters that shall be provided to each call of the custom_plot_function
on_received_message(message)[source]

Handles incoming data from ‘default’ and ‘plot’ channels.

Stores ‘default’ data into buffer and ‘plot’ data into plots

Parameters:message (dict) – Acceptable channel values are ‘default’ or ‘plot’
reset()[source]

This method will be called on all agents when the global reset_agents is called by the AgentNetwork and when the Reset button is clicked on the dashboard.

Method to reset the agent’s states and parameters. User can override this method to reset the specific parameters.

update_plot_memory(message: Dict[str, Any])[source]

Updates plot figures stored in self.plots with the received message

Parameters:message (dict) – Standard message format specified by AgentMET4FOF class Message[‘data’] needs to be base64 image string and can be nested in dictionary for multiple plots Only the latest plot will be shown kept and does not keep a history of the plots.
class agentMET4FOF.agents.SineGeneratorAgent(name='', host=None, serializer=None, transport=None, attributes=None, backend='osbrain', mesa_model=None)[source]

An agent streaming a sine signal

Takes samples from the SineGenerator and pushes them sample by sample to connected agents via its output channel.

agent_loop()[source]

Model the agent’s behaviour

On state Running the agent will extract sample by sample the input data streams content and push it via invoking AgentMET4FOF.send_output().

init_parameters(sfreq=500, sine_freq=5)[source]

Initialize the input data

Initialize the input data stream as an instance of the SineGenerator class.

Parameters:
  • sfreq (int) – sampling frequency for the underlying signal
  • sine_freq (float) – frequency of the generated sine wave