# -*- coding: utf-8 -*-
import socket
from threading import Thread
from time import sleep
from wsgiref.simple_server import make_server
import dash
from dash import dcc, html
from multiprocess.context import Process
from .Dashboard_Control import _Dashboard_Control
[docs]
class AgentDashboard:
"""
Class for the web dashboard which runs with the AgentNetwork object, which by default are on the same IP.
Optional to run the dashboard on a separate IP by providing the right parameters. See example for an implementation of a separate run of dashboard to connect to an existing agent network. If there is no existing agent network, error will show up.
An internal _Dashboard_Control object is instantiated inside this object, which manages access to the AgentNetwork.
"""
def __init__(
self,
dashboard_modules=[],
dashboard_layouts=[],
dashboard_update_interval=3,
max_monitors=10,
ip_addr="0.0.0.0",
port=8050,
agentNetwork="127.0.0.1",
agent_ip_addr=3333,
agent_port=None,
network_stylesheet=[],
hide_default_edge=True,
**kwargs,
):
"""
Parameters
----------
dashboard_modules : modules
Modules which are separate files, and contain classes of agents to be imported into the dashboard's interactive "Add Agent" function
dashboard_update_interval : int
Auto refresh rate which the dashboard queries the states of Agent Network to update the graphs and display
max_monitors : int
Due to complexity in managing and instantiating dynamic figures, a maximum number of monitors is specified first and only the
each Monitor Agent will occupy one of these figures. It is not ideal, but will undergo changes for the better.
ip_addr : str
IP Address of the dashboard to be instantiated on.
port : int
Port number of the dashboard to be instantianted on.
agentNetwork: AgentNetwork
AgentNetwork object. If `agent_ip_addr` or `agent_port` is provided, this argument will be ignored and try to connect to a new AgentNetwork specified by those arguments.
agent_ip_addr : str
IP Address of the Agent Network address to connect to. By default, it will match that of the ip_addr, assuming the Dashboard and Agent Network are run on the same machine with same IP address.
If they are meant to be separated, then this argument will point to the IP address of where the Agent Network is running.
agent_port : int
Port of the Agent Network port. The rationale is similar to that of the argument `agent_ip_addr`.
"""
super(AgentDashboard, self).__init__()
if self.is_port_at_ip_available(ip_addr, port):
if dashboard_modules is not None and dashboard_modules is not False:
# initialise the dashboard layout and its control here
self.ip_addr = ip_addr
self.port = port
self.external_stylesheets = [
"https://fonts.googleapis.com/icon?family=Material+Icons",
"https://cdnjs.cloudflare.com/ajax/libs/materialize/1.0.0/css/materialize.min.css",
]
self.external_scripts = [
"https://cdnjs.cloudflare.com/ajax/libs/materialize/1.0.0/js/materialize.min.js"
]
self.app = self.init_app_layout(
update_interval_seconds=dashboard_update_interval,
max_monitors=max_monitors,
dashboard_layouts=dashboard_layouts,
network_stylesheet=network_stylesheet,
hide_default_edge=hide_default_edge,
**kwargs,
)
self.app.dashboard_ctrl = _Dashboard_Control(
modules=dashboard_modules,
agent_ip_addr=agent_ip_addr,
agent_port=agent_port,
agentNetwork=agentNetwork,
)
# Spawn a very simple WSGI server.
self._server = make_server(
host=self.ip_addr, port=self.port, app=self.app.server
)
else:
print(
f"Dashboard or something else is running on: {ip_addr}:{port}. If "
f"you cannot access the dashboard in your browser, try initializing "
f"your agent network with any other port AgentNetwork([...], "
f"port=<OTHER_PORT_THAN_{port}>)."
)
[docs]
def run(self):
"""This is actually executed on calling start() and brings up the server"""
if hasattr(self, "_server"):
self._show_startup_message()
self._server.serve_forever()
[docs]
def _show_startup_message(self):
"""This method prints the startup message of the webserver/dashboard"""
ip_to_print = "127.0.0.1" if self.ip_addr == "0.0.0.0" else self.ip_addr
crucial_line = (
f"\n| visit the agentMET4FOF dashboard on http:/"
f"/{ip_to_print}:{self.port}/ |"
)
crucial_line_len = len(crucial_line)
print(
f"\n|-".ljust(crucial_line_len - 1, "-"),
"|\n|".ljust(crucial_line_len, " "),
"|\n"
f"| Your agent network is starting up. Open your browser and".ljust(
crucial_line_len, " "
),
"|",
crucial_line,
"\n|".ljust(crucial_line_len - 1, " "),
f"|" f"\n|-".ljust(crucial_line_len, "-"),
"|\n",
sep="",
)
[docs]
def init_app_layout(
self,
update_interval_seconds=3,
max_monitors=10,
dashboard_layouts=[],
network_stylesheet=[],
hide_default_edge=True,
**kwargs,
):
"""
Initialises the overall dash app "layout" which has two sub-pages (Agent network and ML experiment)
Parameters
----------
update_interval_seconds : float or int
Auto refresh rate which the app queries the states of Agent Network to update the graphs and display
max_monitors : int
Due to complexity in managing and instantiating dynamic figures, a maximum number of monitors is specified first and only the
each Monitor Agent will occupy one of these figures. It is not ideal, but will undergo changes for the better.
Returns
-------
app : Dash app object
"""
app = dash.Dash(
__name__,
external_stylesheets=self.external_stylesheets,
external_scripts=self.external_scripts,
)
app.network_stylesheet = network_stylesheet
app.update_interval_seconds = update_interval_seconds
app.num_monitors = max_monitors
app.hide_default_edge = hide_default_edge
for key in kwargs.keys():
setattr(app, key, kwargs[key])
# initialise dashboard layout objects
self.dashboard_layouts = [
dashboard_layout(app) for dashboard_layout in dashboard_layouts
]
app.layout = html.Div(
children=[
# header
html.Nav(
[
html.Div(
[
html.A(
"Met4FoF Agent Testbed",
className="brand-logo center",
),
html.Ul([], className="right hide-on-med-and-down"),
],
className="nav-wrapper container",
)
],
className="light-blue lighten-1",
),
dcc.Tabs(
id="main-tabs",
value="agt-net",
children=[
dashboard_layout.dcc_tab
for dashboard_layout in self.dashboard_layouts
],
),
html.Div(
id="page-div",
children=[
dashboard_layout.get_layout()
for dashboard_layout in self.dashboard_layouts
],
),
]
)
for dashboard_layout in self.dashboard_layouts:
dashboard_layout.prepare_callbacks(app)
@app.callback(
[dash.dependencies.Output("page-div", "children")],
[dash.dependencies.Input("main-tabs", "value")],
)
def render_content(tab):
for dashboard_layout in self.dashboard_layouts:
if dashboard_layout.id == tab:
return [dashboard_layout.get_layout()]
return app
[docs]
def is_port_at_ip_available(self, ip_addr: str, _port: int) -> bool:
"""Check if desired port at ip is available"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Set timeout to wait for response on ip:port.
sock.settimeout(1)
# Check if connection is possible and shutdown checking connection.
# Presumably this means, that a dashboard is running.
if sock.connect_ex((ip_addr, _port)) == 0:
sock.shutdown(socket.SHUT_RDWR)
return False
# Seems as if, we can actually start our dashboard server.
return True
[docs]
class AgentDashboardProcess(AgentDashboard, Process):
"""Represents an agent dashboard for the osBrain backend"""
[docs]
def terminate(self):
"""This is shutting down the application server serving the web interface"""
super(AgentDashboardProcess, self).terminate()
self._server.server_close()
[docs]
class AgentDashboardThread(AgentDashboard, Thread):
"""Represents an agent dashboard for the Mesa backend"""
def __init__(
self,
dashboard_modules=[],
dashboard_layouts=[],
dashboard_update_interval=3,
max_monitors=10,
ip_addr="127.0.0.1",
port=8050,
agentNetwork="127.0.0.1",
agent_ip_addr=3333,
agent_port=None,
**kwargs,
):
super(AgentDashboardThread, self).__init__(
dashboard_modules=dashboard_modules,
dashboard_layouts=dashboard_layouts,
dashboard_update_interval=dashboard_update_interval,
max_monitors=max_monitors,
ip_addr=ip_addr,
port=port,
agentNetwork=agentNetwork,
agent_ip_addr=agent_ip_addr,
agent_port=agent_port,
**kwargs,
)
# Make sure, we are actually able to stop the server running from outside by
# calling terminate() later.
self._supposed_to_run = True
[docs]
def run(self):
"""This is actually executed on calling start() and brings up the server"""
if hasattr(self, "_server"):
super().run()
# Make sure, we are actually able to stop the server running from outside.
while not self._supposed_to_run:
sleep(9)
return 0
return 1
[docs]
def terminate(self):
"""This is shutting down the application server serving the web interface"""
try:
self._server.shutdown()
self._server.server_close()
except AttributeError:
# In this case the dashboard has in fact already been shutdown earlier.
pass
self._supposed_to_run = False