# Agent dependencies
import base64
import copy
import csv
import datetime
import re
import sys
import time
from collections import deque
from io import BytesIO
from threading import Timer
from typing import Dict, Optional, Union
import matplotlib.figure
import matplotlib.pyplot as plt
import mpld3
import networkx as nx
import numpy as np
import pandas as pd
from mesa import Agent as MesaAgent, Model
from mesa.time import BaseScheduler
from osbrain import Agent as osBrainAgent, NSProxy, run_agent, run_nameserver
from plotly import tools as tls
from .dashboard.Dashboard_agt_net import Dashboard_agt_net
from .streams import DataStreamMET4FOF, SineGenerator
[docs]class AgentMET4FOF(MesaAgent, osBrainAgent):
"""
Base class for all agents with specific functions to be overridden/supplied by user.
Behavioral functions for users to provide are init_parameters, agent_loop and on_received_message.
Communicative functions are bind_output, unbind_output and send_output.
"""
def __init__(self, name='', host=None, serializer=None, transport=None, attributes=None, backend="osbrain",
mesa_model=None):
self.backend = backend.lower()
if self.backend == "osbrain":
self._remove_methods(MesaAgent)
osBrainAgent.__init__(self, name=name, host=host, serializer=serializer, transport=transport,
attributes=attributes)
elif self.backend == "mesa":
MesaAgent.__init__(self, name, mesa_model)
self._remove_methods(osBrainAgent)
self.init_mesa(name)
self.unique_id = name
self.name = name
self.mesa_model = mesa_model
else:
raise NotImplementedError("Backend has not been implemented. Valid choices are 'osbrain' and 'mesa'.")
def init_mesa(self, name):
# MESA Specific parameters
self.mesa_message_queue = deque([])
self.unique_id = name
self.name = name
[docs] def step(self):
"""
Used for MESA backend only. Behaviour on every update step.
"""
# check if there's message in queue
while len(self.mesa_message_queue) > 0:
self.handle_process_data(self.mesa_message_queue.popleft())
# proceed with user-defined agent-loop
self.agent_loop()
def _remove_methods(self, cls):
"""Remove methods from the other backends base class from the current agent"""
for name in list(vars(cls)):
if not name.startswith("__"):
try:
delattr(self, name)
except AttributeError:
# This situation only occurs when we start and stop agent
# networks of differing backends in one sequence. Normally
# ignoring these errors should be no problem.
pass
[docs] def set_attr(self, **kwargs):
for key, val in kwargs.items():
return setattr(self, key, val)
[docs] def get_attr(self, attr):
return getattr(self, attr)
[docs] def init_agent(self, buffer_size=1000, log_mode=True):
"""
Internal initialization to setup the agent: mainly on setting the dictionary of Inputs, Outputs, PubAddr.
Calls user-defined `init_parameters()` upon finishing.
Attributes
----------
Inputs : dict
Dictionary of Agents connected to its input channels. Messages will arrive from agents in this dictionary.
Automatically updated when `bind_output()` function is called
Outputs : dict
Dictionary of Agents connected to its output channels. Messages will be sent to agents in this dictionary.
Automatically updated when `bind_output()` function is called
PubAddr_alias : str
Name of Publish address socket
PubAddr : str
Publish address socket handle
AgentType : str
Name of class
current_state : str
Current state of agent. Can be used to define different states of operation such as "Running", "Idle, "Stop", etc..
Users will need to define their own flow of handling each type of `self.current_state` in the `agent_loop`
loop_wait : int
The interval to wait between loop.
Call `init_agent_loop` to restart the timer or set the value of loop_wait in `init_parameters` when necessary.
buffer_size : int
The total number of elements to be stored in the agent `buffer`
When total elements exceeds this number, the latest elements will be replaced with the incoming data elements
"""
self.Inputs = {}
self.Outputs = {}
self.AgentType = type(self).__name__
self.log_mode = log_mode
self.log_info("INITIALIZED")
# These are the available states to change the agents' behavior in
# agent_loop.
self.states = {0: "Idle", 1: "Running", 2: "Pause", 3: "Stop", 4: "Reset"}
self.current_state = self.states[0]
self.loop_wait = None
self.stylesheet = ""
self.output_channels_info = {}
self.buffer_size = buffer_size
self.buffer = AgentBuffer(self.buffer_size)
if self.backend == 'osbrain':
self.PubAddr_alias = self.name + "_PUB"
self.PubAddr = self.bind('PUB', alias=self.PubAddr_alias, transport='tcp')
[docs] def reset(self):
"""
This method will be called on all agents when the global `reset_agents` is called by the AgentNetwork and when the
Reset button is clicked on the dashboard.
Method to reset the agent's states and parameters. User can override this method to reset the specific parameters.
"""
self.log_info("RESET AGENT STATE")
self.memory = {}
[docs] def init_parameters(self):
"""
User provided function to initialize parameters of choice.
"""
return 0
def log_ML(self, message):
self.send("_logger", message, topic="ML_EXP")
[docs] def log_info(self, message):
"""
Prints logs to be saved into logfile with Logger Agent
Parameters
----------
message : str
Message to be logged to the internal Logger Agent
"""
try:
if self.log_mode:
if self.backend == "osbrain":
super().log_info(message)
elif self.backend == "mesa":
message = '[%s] (%s): %s' % (datetime.datetime.utcnow(), self.name, message)
print(message)
except Exception as e:
print(e)
return 1
[docs] def init_agent_loop(self, loop_wait: Optional[int] = None):
"""
Initiates the agent loop, which iterates every `loop_wait` seconds
Stops every timers and initiate a new loop.
Parameters
----------
loop_wait : int, optional
The wait between each iteration of the loop
"""
# most default: loop wait has not been set in init_parameters() not init_agent_loop()
if self.loop_wait is None and loop_wait is None:
set_loop_wait = 1.0
# init_agent_loop overrides loop_wait parameter
elif loop_wait is not None:
set_loop_wait = loop_wait
# otherwise assume init_parameters() have set loop_wait
elif self.loop_wait is not None:
set_loop_wait = self.loop_wait
self.loop_wait = set_loop_wait
if self.backend == "osbrain":
self.stop_all_timers()
# check if agent_loop is overridden by user
if self.__class__.agent_loop == AgentMET4FOF.agent_loop:
return 0
else:
if self.backend == "osbrain":
self.each(self.loop_wait, self.__class__.agent_loop)
return 0
[docs] def stop_agent_loop(self):
"""
Stops agent_loop from running. Note that the agent will still be responding to messages
"""
self.stop_all_timers()
[docs] def agent_loop(self):
"""
User defined method for the agent to execute for `loop_wait` seconds specified either in `self.loop_wait` or explicitly via`init_agent_loop(loop_wait)`
To start a new loop, call `init_agent_loop(loop_wait)` on the agent
Example of usage is to check the `current_state` of the agent and send data periodically
"""
return 0
[docs] def on_received_message(self, message):
"""
User-defined method and is triggered to handle the message passed by Input.
Parameters
----------
message : Dictionary
The message received is in form {'from':agent_name, 'data': data, 'senderType': agent_class, 'channel':channel_name}
agent_name is the name of the Input agent which sent the message
data is the actual content of the message
"""
return message
[docs] def buffer_filled(self, agent_name=None):
"""
Checks whether the internal buffer has been filled to the maximum allowed specified by self.buffer_size
Parameters
----------
agent_name : str
Index of the buffer which is the name of input agent.
Returns
-------
status of buffer filled : boolean
"""
return self.buffer.buffer_filled(agent_name)
[docs] def buffer_clear(self, agent_name=None):
"""
Empties buffer which is a dict indexed by the `agent_name`.
Parameters
----------
agent_name : str
Key of the memory dict, which can be the name of input agent, or self.name. If one is not supplied, we assume to clear the entire memory.
"""
self.buffer.clear(agent_name)
[docs] def buffer_store(self, agent_from: str, data=None, concat_axis=0):
"""
Updates data stored in `self.buffer` with the received message
Checks if sender agent has sent any message before
If it did,then append, otherwise create new entry for it
Parameters
----------
agent_from : str
Name of agent sender
data
Any supported data which can be stored in dict as buffer. See AgentBuffer for more information.
"""
self.buffer.store(agent_from=agent_from, data=data, concat_axis=concat_axis)
self.log_info("Buffer: " + str(self.buffer.buffer))
[docs] def pack_data(self, data, channel='default'):
"""
Internal method to pack the data content into a dictionary before sending out.
Special case : if the `data` is already a `message`, then the `from` and `senderType` will be altered to this agent,
without altering the `data` and `channel` within the message this is used for more succinct data processing and passing.
Parameters
----------
data : argument
Data content to be packed before sending out to agents.
channel : str
Key of dictionary which stores data
Returns
-------
Packed message data : dict of the form {'from':agent_name, 'data': data, 'senderType': agent_class, 'channel':channel_name}.
"""
# if is a message type, override the `from` and `senderType` fields only
if self._is_type_message(data):
new_data = data
new_data['from'] = self.name
new_data['senderType'] = type(self).__name__
return new_data
return {'from': self.name, 'data': data, 'senderType': type(self).__name__, 'channel': channel}
def _is_type_message(self, data):
"""
Internal method to check if the data carries signature of an agent message type
Parameters
----------
data
Data to be checked for type
Returns
-------
result : boolean
"""
if type(data) == dict:
dict_keys = data.keys()
if 'from' in dict_keys and 'data' in dict_keys and 'senderType' in dict_keys:
return True
return False
[docs] def send_output(self, data, channel='default'):
"""
Sends message data to all connected agents in self.Outputs.
Output connection can first be formed by calling bind_output.
By default calls pack_data(data) before sending out.
Can specify specific channel as opposed to 'default' channel.
Parameters
----------
data : argument
Data content to be sent out
channel : str
Key of `message` dictionary which stores data
Returns
-------
message : dict of the form {'from':agent_name, 'data': data, 'senderType': agent_class, 'channel':channel_name}.
"""
start_time_pack = time.time()
packed_data = self.pack_data(data, channel=channel)
if self.backend == "osbrain":
self.send(self.PubAddr, packed_data, topic='data')
elif self.backend == "mesa":
for key, value in self.Outputs.items():
value.mesa_message_queue.append(packed_data)
duration_time_pack = round(time.time() - start_time_pack, 6)
# LOGGING
try:
if self.log_mode:
self.log_info("Pack time: " + str(duration_time_pack))
self.log_info("Sending: " + str(data))
except Exception as e:
print(e)
# Add info of channel
self._update_output_channels_info(packed_data['data'], packed_data['channel'])
return packed_data
def _update_output_channels_info(self, data, channel):
"""
Internal method to update the dict of output_channels_info. This is used in conjunction with send_output().
Checks and records data type & dimension and channel name
If the data is nested within dict, then it will search deeper and subsequently record the info of each
inner hierarchy
Parameters
----------
data
data to be checked for type & dimension
channel : str
name of channel to be recorded
"""
if channel not in self.output_channels_info.keys():
if type(data) == dict:
nested_metadata = {key: self._get_metadata(data[key]) for key in data.keys()}
self.output_channels_info.update({channel: nested_metadata})
else:
self.output_channels_info.update({channel: self._get_metadata(data)})
def _get_metadata(self, data):
"""
Internal helper function for getting the data type & dimensions of data.
This is for update_output_channels_info()
"""
data_info = {}
if type(data) == np.ndarray or type(data).__name__ == "DataFrame":
data_info.update({'type': type(data).__name__, 'shape': data.shape})
elif type(data) == list:
data_info.update({'type': type(data).__name__, 'len': len(data)})
else:
data_info.update({'type': type(data).__name__})
return data_info
[docs] def handle_process_data(self, message):
"""
Internal method to handle incoming message before calling user-defined on_received_message method.
If current_state is either Stop or Reset, it will terminate early before entering on_received_message
"""
if self.current_state == "Stop" or self.current_state == "Reset":
return 0
# LOGGING
try:
self.log_info("Received: " + str(message))
except Exception as e:
print(e)
# process the received data here
start_time_pack = time.time()
self.on_received_message(message)
end_time_pack = time.time()
self.log_info("Tproc: " + str(round(end_time_pack - start_time_pack, 6)))
[docs] def bind_output(self, output_agent):
"""
Forms Output connection with another agent. Any call on send_output will reach this newly binded agent
Adds the agent to its list of Outputs.
Parameters
----------
output_agent : AgentMET4FOF or list
Agent(s) to be binded to this agent's output channel
"""
if isinstance(output_agent, list):
for agent in output_agent:
self._bind_output(agent)
else:
self._bind_output(output_agent)
def _bind_output(self, output_agent):
"""
Internal method which implements the logic for connecting this agent, to the `output_agent`.
"""
if type(output_agent) == str:
output_module_id = output_agent
else:
output_module_id = output_agent.get_attr('name')
if output_module_id not in self.Outputs and output_module_id != self.name:
# update self.Outputs list and Inputs list of output_module
self.Outputs.update({output_agent.get_attr('name'): output_agent})
temp_updated_inputs = output_agent.get_attr('Inputs')
temp_updated_inputs.update({self.name: self})
output_agent.set_attr(Inputs=temp_updated_inputs)
if self.backend == "osbrain":
# bind to the address
if output_agent.has_socket(self.PubAddr_alias):
output_agent.subscribe(self.PubAddr_alias, handler={'data': AgentMET4FOF.handle_process_data})
else:
output_agent.connect(self.PubAddr, alias=self.PubAddr_alias,
handler={'data': AgentMET4FOF.handle_process_data})
# LOGGING
if self.log_mode:
self.log_info("Connected output module: " + output_module_id)
[docs] def unbind_output(self, output_agent):
"""
Remove existing output connection with another agent. This reverses the bind_output method
Parameters
----------
output_agent : AgentMET4FOF
Agent binded to this agent's output channel
"""
if type(output_agent) == str:
module_id = output_agent
else:
module_id = output_agent.get_attr('name')
if module_id in self.Outputs and module_id != self.name:
self.Outputs.pop(module_id, None)
new_inputs = output_agent.get_attr('Inputs')
new_inputs.pop(self.name, None)
output_agent.set_attr(Inputs=new_inputs)
if self.backend == "osbrain":
output_agent.unsubscribe(self.PubAddr_alias, 'data')
# LOGGING
if self.log_mode:
self.log_info("Disconnected output module: " + module_id)
def _convert_to_plotly(self, matplotlib_fig: matplotlib.figure.Figure):
"""
Internal method to convert matplotlib figure to plotly figure
Parameters
----------
matplotlib_fig: plt.Figure
Matplotlib figure to be converted
"""
# convert to plotly format
matplotlib_fig.tight_layout()
plotly_fig = tls.mpl_to_plotly(matplotlib_fig)
plotly_fig['layout']['showlegend'] = True
return plotly_fig
def _fig_to_uri(self, matplotlib_fig: matplotlib.figure.Figure):
"""
Internal method to convert matplotlib figure to base64 uri image for display
Parameters
----------
matplotlib_fig : plt.Figure
Matplotlib figure to be converted
"""
out_img = BytesIO()
matplotlib_fig.savefig(out_img, format='png')
matplotlib_fig.clf()
plt.close(matplotlib_fig)
out_img.seek(0) # rewind file
encoded = base64.b64encode(out_img.read()).decode("ascii").replace("\n", "")
return "data:image/png;base64,{}".format(encoded)
def _convert_matplotlib_fig(self, fig: matplotlib.figure.Figure, mode: str = "image"):
"""
Internal method to convert matplotlib figure which can be rendered by the dashboard.
"""
error_msg = "Conversion mode " + mode + " is not implemented."
if mode == "plotly":
fig = self._convert_to_plotly(fig)
elif mode == "image":
fig = self._fig_to_uri(fig)
elif mode == "mpld3":
fig = mpld3.fig_to_dict(fig)
else:
raise NotImplementedError(error_msg)
return fig
[docs] def send_plot(self, fig: Union[matplotlib.figure.Figure, Dict[str, matplotlib.figure.Figure]], mode: str = "image"):
"""
Sends plot to agents connected to this agent's Output channel.
This method is different from send_output which will be sent to through the
'plot' channel to be handled.
Tradeoffs between "image" and "plotly" modes are that "image" are more stable and "plotly" are interactive.
Note not all (complicated) matplotlib figures can be converted into a plotly figure.
Parameters
----------
fig : matplotlib.figure.Figure or dict of matplotlib.figure.Figure
Alternatively, multiple figures can be nested in a dict (with any preferred keys) e.g {"Temperature":matplotlib.Figure, "Acceleration":matplotlib.Figure}
mode : str
"image" - converts into image via encoding at base64 string.
"plotly" - converts into plotly figure using `mpl_to_plotly`
Default: "image"
Returns
-------
graph : str or plotly figure or dict of one of those converted figure(s)
"""
if isinstance(fig, matplotlib.figure.Figure):
graph = {"mode": mode, "fig": self._convert_matplotlib_fig(fig, mode)}
elif isinstance(fig, dict): # nested
for key in fig.keys():
fig[key] = self._convert_matplotlib_fig(fig[key], mode)
graph = {"mode": mode, "fig": list(fig.values())}
elif isinstance(fig, list):
graph = {"mode": mode, "fig": [self._convert_matplotlib_fig(fig_, mode) for fig_ in fig]}
else:
graph = {"mode": mode, "fig": fig}
self.send_output(graph, channel="plot")
return graph
def get_all_attr(self):
_all_attr = self.__dict__
excludes = ["Inputs", "Outputs", "memory", "PubAddr_alias", "PubAddr", "states", "log_mode", "get_all_attr",
"plots", "name", "agent_loop"]
filtered_attr = {key: val for key, val in _all_attr.items() if key.startswith('_') is False}
filtered_attr = {key: val for key, val in filtered_attr.items() if
key not in excludes and type(val).__name__ != 'function'}
filtered_attr = {key: val if (type(val) == float or type(val) == int or type(
val) == str or key == 'output_channels_info') else str(val) for key, val in filtered_attr.items()}
filtered_attr = {key: val for key, val in filtered_attr.items() if "object" not in str(val)}
return filtered_attr
[docs] def shutdown(self):
if self.backend == "osbrain":
osBrainAgent.shutdown(self)
elif self.backend == "mesa":
self.mesa_model.schedule.remove(self)
del self
[docs]class AgentBuffer():
"""
Buffer class which is instantiated in every agent to store data incrementally.
This buffer is necessary to handle multiple inputs coming from agents.
The buffer can be a dict of iterables, or a dict of dict of iterables for nested named data.
The keys are the names of agents.
We can access the buffer like a dict with exposed functions such as .values(), .keys() and .items(),
The actual dict object is stored in the variable `self.buffer`
"""
def __init__(self, buffer_size=1000):
"""
Parameters
----------
buffer_size: int
Length of buffer allowed.
"""
self.buffer = {}
self.buffer_size = buffer_size
self.supported_datatype = (list, pd.DataFrame, np.ndarray)
def __getitem__(self, key):
return self.buffer[key]
[docs] def check_supported_datatype(self, value):
"""
Checks whether `value` is one of the supported data types.
Parameters
----------
value : iterable
Value to be checked.
Returns
------
result : boolean
"""
for supported_datatype in self.supported_datatype:
if isinstance(value, supported_datatype):
return True
return False
[docs] def update(self, agent_from: str, data):
"""
Overrides data in the buffer dict keyed by `agent_from` with value `data`
If `data` is a single value, this converts it into a list first before storing in the buffer dict.
"""
# handle if data type nested in dict
if isinstance(data, dict):
# check for each value datatype
for key, value in data.items():
# if the value is not list types, turn it into a list of single value i.e [value]
if not self.check_supported_datatype(value):
data[key] = [value]
elif not self.check_supported_datatype(data):
data = [data]
self.buffer.update({agent_from: data})
return self.buffer
def _concatenate(self, iterable, data, concat_axis=0):
"""
Concatenate the given `iterable`, with `data`.
Handles the concatenation function depending on the datatype, and truncates it if the buffer is filled to `buffer_size`.
Parameters
----------
iterable : any in supported_datatype
The current buffer to be concatenated with.
data : any in supported_datatype
New incoming data
"""
# handle list
if isinstance(iterable, list):
iterable += data
# check if exceed memory buffer size, remove the first n elements which exceeded the size
if len(iterable) > self.buffer_size:
truncated_element_index = len(iterable) - self.buffer_size
iterable = iterable[truncated_element_index:]
# handle if data type is np.ndarray
elif isinstance(iterable, np.ndarray):
iterable = np.concatenate((iterable, data),axis=concat_axis)
if len(iterable) > self.buffer_size:
truncated_element_index = len(iterable) - self.buffer_size
iterable = iterable[truncated_element_index:]
# handle if data type is pd.DataFrame
elif isinstance(iterable, pd.DataFrame):
iterable = pd.concat([iterable,data], ignore_index=True, axis=concat_axis)
if len(iterable) > self.buffer_size:
truncated_element_index = len(iterable) - self.buffer_size
iterable = iterable.truncate(before=truncated_element_index)
return iterable
[docs] def buffer_filled(self, agent_from=None):
"""
Checks whether buffer is filled, by comparing against the `buffer_size`.
Parameters
----------
agent_from : str
Name of input agent in the buffer dict to be looked up for.
If `agent_from` is not provided, we check for all iterables in the buffer.
For nested dict, this returns true for any iterable which is beyond the `buffer_size`.
"""
if agent_from is None:
return any([self._iterable_filled(iterable) for iterable in self.buffer.values()])
elif isinstance(self.buffer[agent_from], dict):
return any([self._iterable_filled(iterable) for iterable in self.buffer[agent_from].values()])
else:
return self._iterable_filled(self.buffer[agent_from])
def _iterable_filled(self, iterable):
"""
Internal method for checking on length of iterable.
"""
if self.check_supported_datatype(iterable):
if len(iterable) >= self.buffer_size:
return True
else:
return False
[docs] def popleft(self, n=1):
"""
Pops the first n entries in the buffer.
"""
popped_buffer = copy.copy(self.buffer)
remaining_buffer = copy.copy(self.buffer)
if isinstance(popped_buffer, dict):
for key in popped_buffer.keys():
popped_buffer[key], remaining_buffer[key] = self._popleft(popped_buffer[key], n)
else:
popped_buffer, remaining_buffer = self._popleft(popped_buffer, n)
self.buffer = remaining_buffer
return popped_buffer
def _popleft(self, iterable, n=1):
"""
Internal method to handle the actual popping mechanism based on the type of iterable.
"""
popped_item = 0
if isinstance(iterable, list):
popped_item = iterable[:n]
iterable = iterable[n:]
elif isinstance(iterable, np.ndarray):
popped_item = iterable[:n]
iterable = iterable[n:]
elif isinstance(iterable, pd.DataFrame):
popped_item = iterable.iloc[:n]
iterable = iterable.iloc[n:]
return popped_item, iterable
[docs] def clear(self, agent_from=None):
"""
Clears the data in the buffer. if `agent_from` is not given, the entire buffer is removed.
agent_from : str
Name of agent
"""
if agent_from is None:
del self.buffer
self.buffer = {}
else:
del self.buffer[agent_from]
[docs] def store(self, agent_from, data=None, concat_axis=0):
"""
Stores data into `self.buffer` with the received message
Checks if sender agent has sent any message before
If it did, then append, otherwise create new entry for it
Parameters
----------
agent_from : dict | str
if type is dict, we expect it to be the agentMET4FOF dict message to be
compliant with older code otherwise, we expect it to be name of agent
sender and `data` will need to be passed as parameter
data
optional if agent_from is a dict. Otherwise this parameter is compulsory.
Any supported data which can be stored in dict as buffering.
concat_axis : int
optional axis to concatenate on with the buffering for numpy arrays.
Default is 0.
"""
# if first argument is the agentMET4FOF dict message
if isinstance(agent_from, dict):
message = agent_from
# otherwise, we expect the name of agent_sender and the data to be passed
else:
message = {"from": agent_from, "data": data}
# store into a separate variables, it will be used frequently later for the type checks
message_from = message["from"]
message_data = message["data"]
# check if sender agent has sent any message before:
# if it did,then append, otherwise create new entry for the input agent
if message_from not in self.buffer:
self.update(message_from, message_data)
return 0
# otherwise 'sender' exists in memory, handle appending
# acceptable data types : list, dict, ndarray, dataframe, single values
# handle nested data in dict
if isinstance(message_data, dict):
for key, value in message_data.items():
# if it is a single value, then we convert it into a single element list
if not self.check_supported_datatype(value):
value = [value]
# check if the key exist
# if it does, then append
if key in self.buffer[agent_from].keys():
self.buffer[agent_from][key] = self._concatenate(self.buffer[agent_from][key], value,concat_axis)
# otherwise, create new entry
else:
self.buffer[agent_from].update({key: value})
else:
if not self.check_supported_datatype(message_data):
message_data = [message_data]
self.buffer[agent_from] = self._concatenate(self.buffer[agent_from], message_data,concat_axis)
[docs] def values(self):
"""
Interface to access the internal dict's values()
"""
return self.buffer.values()
[docs] def items(self):
"""
Interface to access the internal dict's items()
"""
return self.buffer.items()
[docs] def keys(self):
"""
Interface to access the internal dict's keys()
"""
return self.buffer.keys()
class _AgentController(AgentMET4FOF):
"""
Unique internal agent to provide control to other agents. Automatically instantiated when starting server.
Provides global control to all agents in network.
"""
def init_parameters(self, ns=None, backend='osbrain', mesa_model=""):
self.backend = backend
self.states = {0: "Idle", 1: "Running", 2: "Pause", 3: "Stop"}
self.current_state = "Idle"
self.ns = ns
self.G = nx.DiGraph()
self._logger = None
self.coalitions = []
if backend == "mesa":
self.mesa_model = mesa_model
def start_mesa_timer(self, mesa_update_interval):
class RepeatTimer():
def __init__(self, t, repeat_function):
self.t = t
self.repeat_function = repeat_function
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.repeat_function()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
self.mesa_update_interval = mesa_update_interval
self.mesa_timer = RepeatTimer(t=mesa_update_interval, repeat_function=self.mesa_model.step)
self.mesa_timer.start()
def stop_mesa_timer(self):
if self.mesa_timer:
self.mesa_timer.cancel()
del self.mesa_timer
def step_mesa_model(self):
self.mesa_model.step()
def get_mesa_model(self):
return self.mesa_model
def get_agent(self, agentName=""):
if self.backend == "osbrain":
return self.ns.proxy(agentName)
elif self.backend == "mesa":
return self.mesa_model.get_agent(agentName)
def get_agentType_count(self, agentType):
num_count = 1
agentType_name = str(agentType.__name__)
agent_names = self.agents()
if len(agent_names) != 0:
for agentName in agent_names:
current_agent_type = self.get_agent(agentName).get_attr('AgentType')
if current_agent_type == agentType_name:
num_count += 1
return num_count
def get_agent_name_count(self, new_agent_name):
num_count = 1
agent_names = self.agents()
if len(agent_names) != 0:
for agentName in agent_names:
if new_agent_name in agentName:
num_count += 1
return num_count
def generate_module_name_byType(self, agentType):
name = agentType.__name__
name += "_" + str(self.get_agentType_count(agentType))
return name
def generate_module_name_byUnique(self, agent_name):
name = agent_name
agent_copy_count = self.get_agent_name_count(agent_name) # number of agents with same name
if agent_copy_count > 1:
name += "(" + str(self.get_agent_name_count(agent_name)) + ")"
return name
def add_agent(self, name=" ", agentType=AgentMET4FOF, log_mode=True, buffer_size=1000, ip_addr=None, loop_wait=None,
**kwargs):
try:
if ip_addr is None:
ip_addr = 'localhost'
if name == " ":
new_name = self.generate_module_name_byType(agentType)
else:
new_name = self.generate_module_name_byUnique(name)
# actual instantiation of agent, depending on backend
if self.backend == "osbrain":
new_agent = self._add_osbrain_agent(name=new_name, agentType=agentType, log_mode=log_mode,
buffer_size=buffer_size, ip_addr=ip_addr, loop_wait=loop_wait,
**kwargs)
elif self.backend == "mesa":
# handle osbrain and mesa here
new_agent = self._add_mesa_agent(name=new_name, agentType=agentType, buffer_size=buffer_size,
log_mode=log_mode, **kwargs)
return new_agent
except Exception as e:
self.log_info("ERROR:" + str(e))
def _add_osbrain_agent(self, name=" ", agentType=AgentMET4FOF, log_mode=True, buffer_size=1000, ip_addr=None,
loop_wait=None, **kwargs):
new_agent = run_agent(name, base=agentType, attributes=dict(log_mode=log_mode, buffer_size=buffer_size),
nsaddr=self.ns.addr(), addr=ip_addr)
new_agent.init_parameters(**kwargs)
new_agent.init_agent(buffer_size=buffer_size, log_mode=log_mode)
new_agent.init_agent_loop(loop_wait)
if log_mode:
new_agent.set_logger(self._get_logger())
return new_agent
def _add_mesa_agent(self, name=" ", agentType=AgentMET4FOF, log_mode=True, buffer_size=1000, **kwargs):
new_agent = agentType(name=name, backend=self.backend, mesa_model=self.mesa_model)
new_agent.init_parameters(**kwargs)
new_agent.init_agent(buffer_size=buffer_size, log_mode=log_mode)
new_agent = self.mesa_model.add_agent(new_agent)
return new_agent
def get_agents_stylesheets(self, agent_names):
# for customising display purposes in dashboard
agents_stylesheets = []
for agent in agent_names:
try:
stylesheet = self.get_agent(agent).get_attr("stylesheet")
agents_stylesheets.append({"stylesheet": stylesheet})
except Exception as e:
self.log_info("Error:" + str(e))
return agents_stylesheets
def agents(self, exclude_names=["AgentController", "Logger"]):
if self.backend == "osbrain":
agent_names = [name for name in self.ns.agents() if name not in exclude_names]
else:
agent_names = self.mesa_model.agents()
return agent_names
def update_networkx(self):
agent_names = self.agents()
edges = self.get_latest_edges(agent_names)
if len(agent_names) != self.G.number_of_nodes() or len(edges) != self.G.number_of_edges():
agent_stylesheets = self.get_agents_stylesheets(agent_names)
new_G = nx.DiGraph()
new_G.add_nodes_from(list(zip(agent_names, agent_stylesheets)))
new_G.add_edges_from(edges)
self.G = new_G
def get_networkx(self):
return (self.G)
def get_latest_edges(self, agent_names):
edges = []
for agent_name in agent_names:
temp_agent = self.get_agent(agent_name)
temp_output_connections = list(temp_agent.get_attr('Outputs').keys())
for output_connection in temp_output_connections:
edges += [(agent_name, output_connection)]
return edges
def _get_logger(self):
"""
Internal method to access the Logger relative to the nameserver
"""
if self._logger is None:
self._logger = self.ns.proxy('Logger')
return self._logger
def add_coalition(self, new_coalition):
"""
Instantiates a coalition of agents.
"""
self.coalitions.append(new_coalition)
return new_coalition
[docs]class MesaModel(Model):
"""A MESA Model"""
def __init__(self):
self.schedule = BaseScheduler(self)
def add_agent(self, agent: MesaAgent):
self.schedule.add(agent)
return agent
def get_agent(self, agentName: str):
agent = next((x for x in self.schedule.agents if x.name == agentName), None)
return agent
[docs] def step(self):
'''Advance the model by one step.'''
self.schedule.step()
def agents(self):
return [agent.name for agent in self.schedule.agents]
[docs] def shutdown(self):
"""Shutdown entire MESA model with all agents and schedulers"""
for agent in self.agents():
agent_obj = self.get_agent(agent)
agent_obj.shutdown()
[docs]class AgentNetwork:
"""
Object for starting a new Agent Network or connect to an existing Agent Network specified by ip & port
Provides function to add agents, (un)bind agents, query agent network state, set global agent states
Interfaces with an internal _AgentController which is hidden from user
"""
def __init__(self, ip_addr="127.0.0.1", port=3333, connect=False, log_filename="log_file.csv",
dashboard_modules=True, dashboard_extensions=[], dashboard_update_interval=3,
dashboard_max_monitors=10, dashboard_port=8050, backend="osbrain", mesa_update_interval=0.1):
"""
Parameters
----------
ip_addr: str
Ip address of server to connect/start
port: int
Port of server to connect/start
connect: bool
False sets Agent network to connect mode and will connect to specified address
True (Default) sets Agent network to initially try to connect and if it cant find one, it will start a new server at specified address
log_filename: str
Name of log file, acceptable csv format. It will be saved locally, in the same folder as the python script in which this AgentNetwork is instantiated on.
If set to None or False, then will not save in a file. Note that the overhead of updating the log file can be huge, especially for high number of agents and large data transmission.
dashboard_modules : list of modules , modules or bool
Accepts list of modules which contains the AgentMET4FOF and DataStreamMET4FOF derived classes
If set to True, will initiate the dashboard with default agents in AgentMET4FOF
dashboard_update_interval : int
Regular interval (seconds) to update the dashboard graphs
dashboard_max_monitors : int
Due to complexity in managing and instantiating dynamic figures, a maximum number of monitors is specified first and only the each Monitor Agent will occupy one of these figures.
dashboard_port: int
Port of the dashboard to be hosted on. By default is port 8050.
"""
self.backend = backend
self.ip_addr = ip_addr
self.port = port
self._controller = None
self._logger = None
self.log_filename = log_filename
self.mesa_update_interval = mesa_update_interval
if connect:
self.is_parent_mesa = False
else:
self.is_parent_mesa = True
if type(self.log_filename) == str and '.csv' in self.log_filename:
self.save_logfile = True
else:
self.save_logfile = False
# handle different choices of backends
if self.backend == "osbrain":
if connect:
self.connect(ip_addr, port, verbose=False)
else:
self.connect(ip_addr, port, verbose=False)
if self.ns == 0:
self.start_server_osbrain(ip_addr, port)
elif self.backend == "mesa":
self.start_server_mesa()
else:
raise NotImplementedError("Backend has not been implemented. Valid choices are 'osbrain' and 'mesa'.")
if isinstance(dashboard_extensions, list) == False:
dashboard_extensions = [dashboard_extensions]
# handle instantiating the dashboard
# if dashboard_modules is False, the dashboard will not be launched
if dashboard_modules is not False:
# Initialize common dashboard parameters for both types of dashboards
# corresponding to different backends.
dashboard_params = {
"dashboard_modules": dashboard_modules,
"dashboard_layouts": [Dashboard_agt_net] + dashboard_extensions,
"dashboard_update_interval": dashboard_update_interval,
"max_monitors": dashboard_max_monitors,
"ip_addr": ip_addr,
"port": dashboard_port,
"agentNetwork": self,
}
# Initialize dashboard process/thread.
if self.backend == "osbrain":
from .dashboard.Dashboard import AgentDashboardProcess
self.dashboard_proc = AgentDashboardProcess(**dashboard_params)
elif self.backend == "mesa":
from .dashboard.Dashboard import AgentDashboardThread
self.dashboard_proc = AgentDashboardThread(**dashboard_params)
self.dashboard_proc.start()
else:
self.dashboard_proc = None
[docs] def connect(self, ip_addr="127.0.0.1", port=3333, verbose=True):
"""
Only for osbrain backend. Connects to an existing AgentNetwork.
Parameters
----------
ip_addr: str
IP Address of server to connect to
port: int
Port of server to connect to
"""
try:
self.ns = NSProxy(nsaddr=ip_addr + ':' + str(port))
except:
if verbose:
print("Unable to connect to existing NameServer...")
self.ns = 0
[docs] def start_server_osbrain(self, ip_addr="127.0.0.1", port=3333):
"""
Only for osbrain backend. Starts a new AgentNetwork.
Parameters
----------
ip_addr: str
IP Address of server to start
port: int
Port of server to start
"""
print("Starting NameServer...")
self.ns = run_nameserver(addr=ip_addr + ':' + str(port))
if len(self.ns.agents()) != 0:
self.ns.shutdown()
self.ns = run_nameserver(addr=ip_addr + ':' + str(port))
self.controller = run_agent("AgentController", base=_AgentController, attributes=dict(log_mode=True),
nsaddr=self.ns.addr(), addr=ip_addr)
self.logger = run_agent("Logger", base=_Logger, nsaddr=self.ns.addr())
self.controller.init_parameters(ns=self.ns, backend=self.backend)
self.logger.init_parameters(log_filename=self.log_filename, save_logfile=self.save_logfile)
[docs] def start_server_mesa(self):
"""
Handles the initialisation for backend == "mesa".
Involves spawning two nested objects : MesaModel and AgentController
"""
self.mesa_model = MesaModel()
self._controller = _AgentController(name="AgentController", backend=self.backend)
self._controller.init_parameters(backend=self.backend, mesa_model=self.mesa_model)
self.start_mesa_timer(self.mesa_update_interval)
def _set_mode(self, state):
"""
Internal method to set mode of Agent Controller
Parameters
----------
state: str
State of AgentController to set.
"""
self._get_controller().set_attr(current_state=state)
def _get_mode(self):
"""
Returns
-------
state: str
State of Agent Network
"""
return self._get_controller().get_attr('current_state')
[docs] def get_mode(self):
"""
Returns
-------
state: str
State of Agent Network
"""
return self._get_controller().get_attr('current_state')
[docs] def set_running_state(self, filter_agent=None):
"""
Blanket operation on all agents to set their `current_state` attribute to "Running"
Users will need to define their own flow of handling each type of `self.current_state` in the `agent_loop`
Parameters
----------
filter_agent : str
(Optional) Filter name of agents to set the states
"""
self.set_agents_state(filter_agent=filter_agent, state="Running")
def update_networkx(self):
self._get_controller().update_networkx()
def get_networkx(self):
return self._get_controller().get_attr('G')
def get_nodes_edges(self):
G = self.get_networkx()
return G.nodes, G.edges
def get_nodes(self):
G = self.get_networkx()
return G.nodes
def get_edges(self):
G = self.get_networkx()
return G.edges
[docs] def set_stop_state(self, filter_agent=None):
"""
Blanket operation on all agents to set their `current_state` attribute to "Stop"
Users will need to define their own flow of handling each type of `self.current_state` in the `agent_loop`
Parameters
----------
filter_agent : str
(Optional) Filter name of agents to set the states
"""
self.set_agents_state(filter_agent=filter_agent, state="Stop")
[docs] def set_agents_state(self, filter_agent=None, state="Idle"):
"""
Blanket operation on all agents to set their `current_state` attribute to given state
Can be used to define different states of operation such as "Running", "Idle, "Stop", etc..
Users will need to define their own flow of handling each type of `self.current_state` in the `agent_loop`
Parameters
----------
filter_agent : str
(Optional) Filter name of agents to set the states
state : str
State of agents to set
"""
self._set_mode(state)
for agent_name in self.agents():
if (filter_agent is not None and filter_agent in agent_name) or (filter_agent is None):
agent = self.get_agent(agent_name)
try:
agent.set_attr(current_state=state)
except Exception as e:
print(e)
print("SET STATE: ", state)
return 0
def reset_agents(self):
for agent_name in self.agents():
agent = self.get_agent(agent_name)
agent.reset()
agent.set_attr(current_state="Reset")
self._set_mode("Reset")
return 0
def remove_agent(self, agent):
if type(agent) == str:
agent_proxy = self.get_agent(agent)
else:
agent_proxy = agent
for input_agent in agent_proxy.get_attr("Inputs"):
self.get_agent(input_agent).unbind_output(agent_proxy)
for output_agent in agent_proxy.get_attr("Outputs"):
agent_proxy.unbind_output(self.get_agent(output_agent))
agent_proxy.shutdown()
[docs] def bind_agents(self, source, target):
"""
Binds two agents communication channel in a unidirectional manner from `source` Agent to `target` Agent
Any subsequent calls of `source.send_output()` will reach `target` Agent's message queue.
Parameters
----------
source : AgentMET4FOF
Source agent whose Output channel will be binded to `target`
target : AgentMET4FOF
Target agent whose Input channel will be binded to `source`
"""
source.bind_output(target)
return 0
[docs] def unbind_agents(self, source, target):
"""
Unbinds two agents communication channel in a unidirectional manner from `source` Agent to `target` Agent
This is the reverse of `bind_agents()`
Parameters
----------
source : AgentMET4FOF
Source agent whose Output channel will be unbinded from `target`
target : AgentMET4FOF
Target agent whose Input channel will be unbinded from `source`
"""
source.unbind_output(target)
return 0
def _get_controller(self):
"""
Internal method to access the AgentController relative to the nameserver
"""
if self.backend == "osbrain":
if self._controller is None:
self._controller = self.ns.proxy('AgentController')
return self._controller
def _get_logger(self):
"""
Internal method to access the Logger relative to the nameserver
"""
if self._logger is None:
self._logger = self.ns.proxy('Logger')
return self._logger
[docs] def get_agent(self, agent_name):
"""
Returns a particular agent connected to Agent Network.
Parameters
----------
agent_name : str
Name of agent to search for in the network
"""
return self._get_controller().get_agent(agent_name)
[docs] def agents(self, filter_agent=None):
"""
Returns all agent names connected to Agent Network.
Returns
-------
list : names of all agents
"""
agent_names = self._get_controller().agents()
if filter_agent is not None:
agent_names = [agent_name for agent_name in agent_names if filter_agent in agent_name]
return agent_names
[docs] def add_agent(self, name=" ", agentType=AgentMET4FOF, log_mode=True, buffer_size=1000, ip_addr=None, loop_wait=None,
**kwargs):
"""
Instantiates a new agent in the network.
Parameters
----------
name str : (Optional) Unique name of agent. here cannot be more than one agent
with the same name. Defaults to the agent's class name.
agentType AgentMET4FOF : (Optional) Agent class to be instantiated in the
network. Defaults to :py:class:`AgentMET4FOF`
log_mode bool : (Optional) Determines if messages will be logged to background
Logger Agent. Defaults to `True`.
Returns
-------
AgentMET4FOF : Newly instantiated agent
"""
if ip_addr is None:
ip_addr = self.ip_addr
agent = self._get_controller().add_agent(name=name, agentType=agentType, log_mode=log_mode,
buffer_size=buffer_size, ip_addr=ip_addr, loop_wait=loop_wait,
**kwargs)
return agent
[docs] def add_coalition(self, name="Coalition_1", agents=[]):
"""
Instantiates a coalition of agents.
"""
new_coalition = Coalition(name, agents)
self._get_controller().add_coalition(new_coalition)
return new_coalition
@property
def coalitions(self):
return self._get_controller().get_attr("coalitions")
def get_mesa_model(self):
return self.mesa_model
[docs] def shutdown(self):
"""Shuts down the entire agent network and all agents"""
# Shutdown the nameserver.
# This leaves some process clutter in the process list, but the actual
# processes are ended.
if self.backend == "osbrain":
self._get_controller().get_attr('ns').shutdown()
elif self.backend == "mesa":
self._get_controller().stop_mesa_timer()
self.mesa_model.shutdown()
# Shutdown the dashboard if present.
if self.dashboard_proc is not None:
# This calls either the provided method Process.terminate() which
# abruptly stops the running multiprocess.Process in case of the osBrain
# backend or the self-written method in the class AgentDashboardThread
# ensuring the proper termination of the dash.Dash app.
self.dashboard_proc.terminate()
# Then wait for the termination of the actual thread or at least finish the
# execution of the join method in case of the "Mesa" backend. See #163
# for the search for a proper solution to this issue.
self.dashboard_proc.join(timeout=10)
return 0
def start_mesa_timer(self, update_interval):
self._get_controller().start_mesa_timer(update_interval)
def stop_mesa_timer(self):
self._get_controller().stop_mesa_timer()
def step_mesa_model(self):
self._get_controller().step_mesa_model()
class Coalition():
def __init__(self, name="Coalition", agents=[]):
self.agents = agents
self.name = name
def agent_names(self):
return [agent.get_attr("name") for agent in self.agents]
[docs]class DataStreamAgent(AgentMET4FOF):
"""
Able to simulate generation of datastream by loading a given DataStreamMET4FOF object.
Can be used in incremental training or batch training mode.
To simulate batch training mode, set `pretrain_size=-1` , otherwise, set pretrain_size and batch_size for the respective
See `DataStreamMET4FOF` on loading your own data set as a data stream.
"""
[docs] def init_parameters(self, stream=DataStreamMET4FOF(), pretrain_size=None, batch_size=1, loop_wait=1,
randomize=False):
"""
Parameters
----------
stream : DataStreamMET4FOF
A DataStreamMET4FOF object which provides the sample data
pretrain_size : int
The number of sample data to send through in the first loop cycle, and subsequently, the batch_size will be used
batch_size : int
The number of sample data to send in every loop cycle
loop_wait : int
The duration to wait (seconds) at the end of each loop cycle before going into the next cycle
randomize : bool
Determines if the dataset should be shuffled before streaming
"""
self.stream = stream
self.stream.prepare_for_use()
if randomize:
self.stream.randomize_data()
self.batch_size = batch_size
if pretrain_size is None:
self.pretrain_size = batch_size
else:
self.pretrain_size = pretrain_size
self.pretrain_done = False
self.loop_wait = loop_wait
[docs] def agent_loop(self):
if self.current_state == "Running":
if self.pretrain_size is None:
self.send_next_sample(self.batch_size)
elif self.pretrain_size == -1 or self.batch_size == -1:
self.send_all_sample()
self.pretrain_done = True
else:
# handle pre-training mode
if self.pretrain_done:
self.send_next_sample(self.batch_size)
else:
self.send_next_sample(self.pretrain_size)
self.pretrain_done = True
def send_next_sample(self, num_samples=1):
if self.stream.has_more_samples():
data = self.stream.next_sample(num_samples)
self.log_info("DATA SAMPLE ID: " + str(self.stream.sample_idx))
self.send_output(data)
[docs] def reset(self):
super(DataStreamAgent, self).reset()
self.stream.reset()
def send_all_sample(self):
self.send_next_sample(-1)
[docs]class MonitorAgent(AgentMET4FOF):
"""
Unique Agent for storing plots and data from messages received from input agents.
The dashboard searches for Monitor Agents' `memory` and `plots` to draw the graphs
"plot" channel is used to receive base64 images from agents to plot on dashboard
Attributes
----------
memory : dict
Dictionary of format `{agent1_name : agent1_data, agent2_name : agent2_data}`
plots : dict
Dictionary of format `{agent1_name : agent1_plot, agent2_name : agent2_plot}`
plot_filter : list of str
List of keys to filter the 'data' upon receiving message to be saved into memory
Used to specifically select only a few keys to be plotted
"""
[docs] def init_parameters(self, plot_filter=[], custom_plot_function=-1, *args, **kwargs):
self.memory = {}
self.plots = {}
self.plot_filter = plot_filter
self.custom_plot_function = custom_plot_function
self.custom_plot_parameters = kwargs
[docs] def on_received_message(self, message):
"""
Handles incoming data from 'default' and 'plot' channels.
Stores 'default' data into `self.memory` and 'plot' data into `self.plots`
Parameters
----------
message : dict
Acceptable channel values are 'default' or 'plot'
"""
if message['channel'] == 'default':
if self.plot_filter != []:
message['data'] = {key: message['data'][key] for key in self.plot_filter}
self.buffer_store(agent_from=message["from"], data=message["data"])
elif message['channel'] == 'plot':
self.update_plot_memory(message)
return 0
[docs] def update_plot_memory(self, message):
"""
Updates plot figures stored in `self.plots` with the received message
Parameters
----------
message : dict
Standard message format specified by AgentMET4FOF class
Message['data'] needs to be base64 image string and can be nested in dictionary for multiple plots
Only the latest plot will be shown kept and does not keep a history of the plots.
"""
if type(message['data']) != dict or message['from'] not in self.plots.keys():
self.plots[message['from']] = message['data']
elif type(message['data']) == dict:
for key in message['data'].keys():
self.plots[message['from']].update({key: message['data'][key]})
self.log_info("PLOTS: " + str(self.plots))
[docs] def reset(self):
super(MonitorAgent, self).reset()
del self.plots
self.plots = {}
class _Logger(AgentMET4FOF):
"""
An internal logger agent which are instantiated immediately with each AgentNetwork.
It collects all the logs which are sent to it, and print them and optionally save them into a csv log file.
Since the user is not expected to directly access the logger agent, its initialisation option and interface are provided via the AgentNetwork object.
When log_info of any agent is called, the agent will send the data to the logger agent.
"""
def init_parameters(self, log_filename="log_file.csv", save_logfile=True):
self.current_log_handlers = {"INFO": self.log_handler}
self.bind('SUB', 'sub', {"INFO": self.log_handler})
self.log_filename = log_filename
self.save_logfile = save_logfile
if self.save_logfile:
try:
# writes a new file
self.writeFile = open(self.log_filename, 'w', newline='')
writer = csv.writer(self.writeFile)
writer.writerow(['Time', 'Name', 'Topic', 'Data'])
# set to append mode
self.writeFile = open(self.log_filename, 'a', newline='')
except:
raise Exception
self.save_cycles = 0
@property
def subscribed_topics(self):
return list(self.current_log_handlers.keys())
def bind_log_handler(self, log_handler_functions):
for topic in self.subscribed_topics:
self.unsubscribe('sub', topic)
self.current_log_handlers.update(log_handler_functions)
self.subscribe('sub', self.current_log_handlers)
def log_handler(self, message, topic):
sys.stdout.write(message + '\n')
sys.stdout.flush()
self.save_log_info(str(message))
def save_log_info(self, log_msg):
re_sq = r'\[(.*?)\]'
re_rd = r'\((.*?)\)'
date = re.findall(re_sq, log_msg)[0]
date = "[" + date + "]"
agent_name = re.findall(re_rd, log_msg)[0]
contents = log_msg.split(':')
if len(contents) > 4:
topic = contents[3]
data = str(contents[4:])
else:
topic = contents[3]
data = " "
if self.save_logfile:
try:
# append new row
writer = csv.writer(self.writeFile)
writer.writerow([str(date), agent_name, topic, data])
if self.save_cycles % 15 == 0:
self.writeFile.close()
self.writeFile = open(self.log_filename, 'a', newline='')
self.save_cycles += 1
except:
raise Exception
[docs]class SineGeneratorAgent(AgentMET4FOF):
"""An agent streaming a sine signal
Takes samples from the :py:mod:`SineGenerator` and pushes them sample by sample
to connected agents via its output channel.
"""
[docs] def init_parameters(self, sfreq=500, sine_freq=5):
"""Initialize the input data
Initialize the input data stream as an instance of the :class:`SineGenerator`
class.
Parameters
----------
sfreq : int
sampling frequency for the underlying signal
sine_freq : float
frequency of the generated sine wave
"""
self._sine_stream = SineGenerator(sfreq=sfreq, sine_freq=sine_freq)
[docs] def agent_loop(self):
"""Model the agent's behaviour
On state *Running* the agent will extract sample by sample the input data
streams content and push it via invoking :meth:`AgentMET4FOF.send_output`.
"""
if self.current_state == "Running":
sine_data = self._sine_stream.next_sample() # dictionary
self.send_output(sine_data["quantities"])