from typing import Dict, Union
import plotly.graph_objs as go
from time_series_buffer import TimeSeriesBuffer
from time_series_metadata.scheme import MetaData
from .base_agents import AgentMET4FOF
__all__ = ["MetrologicalAgent", "MetrologicalMonitorAgent"]
[docs]class MetrologicalAgent(AgentMET4FOF):
_input_data: Dict[str, Dict[str, Union[TimeSeriesBuffer, Dict]]]
"""Input dictionary of all incoming data including metadata::
dict like {
<from>: {
"buffer": TimeSeriesBuffer(maxlen=buffer_size),
"metadata": MetaData(**kwargs).metadata,
}
"""
_input_data_maxlen: int
_output_data: Dict[str, Dict[str, Union[TimeSeriesBuffer, MetaData]]]
"""Output dictionary of all outgoing data including metadata::
dict like {
<from>: {
"buffer": TimeSeriesBuffer(maxlen=buffer_size),
"metadata": MetaData(**kwargs).metadata,
}
"""
_output_data_maxlen: int
[docs] def init_parameters(self, input_data_maxlen=25, output_data_maxlen=25):
super(MetrologicalAgent, self).init_parameters()
self._input_data = {}
self._input_data_maxlen = input_data_maxlen
self._output_data = {}
self._output_data_maxlen = output_data_maxlen
[docs] def on_received_message(self, message):
channel = message["channel"]
sender = message["from"]
if channel == "default":
data = message["data"]
metadata = None
if "metadata" in message.keys():
metadata = message["metadata"]
self._set_input_data(sender, data, metadata)
def _set_input_data(self, sender, data=None, metadata=None):
# create storage for new senders
if sender not in self._input_data.keys():
self._input_data[sender] = {
"metadata": metadata,
"buffer": TimeSeriesBuffer(maxlen=self._input_data_maxlen),
}
if metadata is not None:
# update received metadata
self._input_data[sender]["metadata"] = metadata
if data is not None:
# append received data
self._input_data[sender]["buffer"].add(data=data)
def set_output_data(self, channel, data=None, metadata=None):
# create storage for new output channels
if channel not in self._output_data.keys():
self._output_data[channel] = {
"metadata": metadata,
"buffer": TimeSeriesBuffer(maxlen=self._output_data_maxlen),
}
if metadata is not None:
# update received metadata
self._output_data[channel]["metadata"] = metadata
if data is not None:
# append received data
self._output_data[channel]["buffer"].add(data=data)
[docs] def agent_loop(self):
if self.current_state == "Running":
for channel, channel_dict in self._output_data.items():
# short names
metadata = channel_dict["metadata"]
buffer = channel_dict["buffer"]
# if there is something in the buffer, send it all
buffer_len = len(buffer)
if buffer_len > 0:
data = buffer.pop(n_samples=buffer_len)
# send data+metadata
self.send_output([data, metadata], channel=channel)
[docs] def pack_data(self, data, channel="default"):
# include metadata in the packed data
packed_data = {
"from": self.name,
"data": data[0],
"metadata": data[1],
"senderType": type(self).__name__,
"channel": channel,
}
return packed_data
[docs]class MetrologicalMonitorAgent(MetrologicalAgent):
[docs] def init_parameters(self, *args, **kwargs):
super(MetrologicalMonitorAgent, self).init_parameters(*args, **kwargs)
# create alias/dummies to match dashboard expectations
self.plot_filter = []
self.plots = {}
self.custom_plot_parameters = {}
[docs] def on_received_message(self, message):
"""Handles incoming data from ``default`` and ``plot`` channels
Feeds ``default`` data into the buffer as a dictionary::
dict like {
"data": message["data"],
"metadata": message["metadata"],
}
and hands over 'plot' data to plot memory.
Parameters
----------
message : dict
Acceptable ``channel`` values are ``default`` or ``plot``
"""
if message["channel"] == "default":
if self.plot_filter:
message["data"] = {
key: message["data"][key] for key in self.plot_filter
}
message["metadata"] = {
key: message["metadata"][key] for key in self.plot_filter
}
self.buffer_store(
agent_from=message["from"],
data={"data": message["data"], "metadata": message["metadata"]},
)
elif message["channel"] == "plot":
self.update_plot_memory(message)
return 0
[docs] def update_plot_memory(self, message):
"""
Updates plot figures stored in `self.plots` with the received message
Parameters
----------
message : dict
Standard message format specified by AgentMET4FOF class
Message['data'] needs to be base64 image string and can be nested in dictionary for multiple plots
Only the latest plot will be shown kept and does not keep a history of the plots.
"""
if type(message["data"]) != dict or message["from"] not in self.plots.keys():
self.plots[message["from"]] = message["data"]
elif type(message["data"]) == dict:
for key in message["data"].keys():
self.plots[message["from"]].update({key: message["data"][key]})
self.log_info("PLOTS: " + str(self.plots))
[docs] def reset(self):
super(MetrologicalMonitorAgent, self).reset()
del self.plots
self.plots = {}
def custom_plot_function(self, data, sender_agent, **kwargs):
# TODO: cannot set the label of the xaxis within this method
# data display
if "data" in data.keys():
if len(data["data"]):
# values = data["buffer"].show(n_samples=-1) # -1 --> all
values = data["data"]
t = values[:, 0]
ut = values[:, 1]
v = values[:, 2]
uv = values[:, 3]
# use description
desc = data["metadata"][0]
t_name, t_unit = desc.time.values()
v_name, v_unit = desc.get_quantity().values()
x_label = f"{t_name} [{t_unit}]"
y_label = f"{v_name} [{v_unit}]"
trace = go.Scatter(
x=t,
y=v,
error_x=dict(type="data", array=ut, visible=True),
error_y=dict(type="data", array=uv, visible=True),
mode="lines",
name=f"{y_label} ({sender_agent})",
)
else:
trace = go.Scatter()
else:
trace = go.Scatter()
return trace