Source code for agentMET4FOF.metrological_agents

from typing import Dict, List, Tuple, Union

import numpy as np
import pandas as pd
import plotly.graph_objs as go
from time_series_buffer import TimeSeriesBuffer
from time_series_metadata.scheme import MetaData

from agentMET4FOF.agents import AgentBuffer, AgentMET4FOF


class MetrologicalAgent(AgentMET4FOF):

    _input_data: Dict[str, Dict[str, Union[TimeSeriesBuffer, Dict]]]
    """Input dictionary of all incoming data including metadata::

        dict like {
            <from>: {
                "buffer": TimeSeriesBuffer(maxlen=buffer_size),
                "metadata": MetaData(**kwargs).metadata,
            }
        }
    """

    _input_data_maxlen: int

    _output_data: Dict[str, Dict[str, Union[TimeSeriesBuffer, MetaData]]]
    """Output dictionary of all outgoing data including metadata::

        dict like {
            <channel>: {
                "buffer": TimeSeriesBuffer(maxlen=buffer_size),
                "metadata": MetaData(**kwargs).metadata,
            }
        }
    """

    _output_data_maxlen: int
    def init_parameters(self, input_data_maxlen=25, output_data_maxlen=25):
        super(MetrologicalAgent, self).init_parameters()
        self._input_data = {}
        self._input_data_maxlen = input_data_maxlen
        self._output_data = {}
        self._output_data_maxlen = output_data_maxlen
    def on_received_message(self, message):
        channel = message["channel"]
        sender = message["from"]
        if channel == "default":
            data = message["data"]
            metadata = None
            if "metadata" in message.keys():
                metadata = message["metadata"]

            self._set_input_data(sender, data, metadata)
    def _set_input_data(self, sender, data=None, metadata=None):
        # create storage for new senders
        if sender not in self._input_data.keys():
            self._input_data[sender] = {
                "metadata": metadata,
                "buffer": TimeSeriesBuffer(maxlen=self._input_data_maxlen),
            }

        if metadata is not None:
            # update received metadata
            self._input_data[sender]["metadata"] = metadata

        if data is not None:
            # append received data
            self._input_data[sender]["buffer"].add(data=data)

    def set_output_data(self, channel, data=None, metadata=None):
        # create storage for new output channels
        if channel not in self._output_data.keys():
            self._output_data[channel] = {
                "metadata": metadata,
                "buffer": TimeSeriesBuffer(maxlen=self._output_data_maxlen),
            }

        if metadata is not None:
            # update the channel's metadata
            self._output_data[channel]["metadata"] = metadata

        if data is not None:
            # append the new outgoing data
            self._output_data[channel]["buffer"].add(data=data)
    def agent_loop(self):
        if self.current_state == "Running":
            for channel, channel_dict in self._output_data.items():
                # short names
                metadata = channel_dict["metadata"]
                buffer = channel_dict["buffer"]

                # if there is something in the buffer, send it all
                buffer_len = len(buffer)
                if buffer_len > 0:
                    data = buffer.pop(n_samples=buffer_len)

                    # send data + metadata
                    self.send_output([data, metadata], channel=channel)
    def pack_data(self, data, channel="default"):
        # include metadata in the packed data
        packed_data = {
            "from": self.name,
            "data": data[0],
            "metadata": data[1],
            "senderType": type(self).__name__,
            "channel": channel,
        }
        return packed_data
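
# Usage sketch (hedged): the class below is a hypothetical illustration, not
# part of the shipped library, showing how ``set_output_data`` and the
# inherited ``agent_loop`` interact. The agent fills the "default" output
# channel with rows of (time, time_unc, value, value_unc); the parent
# ``MetrologicalAgent.agent_loop`` then pops the buffer and sends
# ``[data, metadata]`` via ``send_output``.
class SketchedSineSourceAgent(MetrologicalAgent):
    def init_parameters(self, metadata=None, **kwargs):
        super().init_parameters(**kwargs)
        self._counter = 0
        # announce the channel and its metadata once, before any data arrives
        self.set_output_data(channel="default", metadata=metadata)

    def agent_loop(self):
        if self.current_state == "Running":
            time = float(self._counter)
            value = np.sin(2 * np.pi * 0.1 * time)
            # one row of (time, time_unc, value, value_unc); uncertainties are made up
            self.set_output_data(
                channel="default", data=np.array([[time, 0.0, value, 0.01]])
            )
            self._counter += 1
        # the parent class flushes the output buffer and sends [data, metadata]
        super().agent_loop()
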
class MetrologicalMonitorAgent(MetrologicalAgent):
    def init_parameters(self, *args, **kwargs):
        super(MetrologicalMonitorAgent, self).init_parameters(*args, **kwargs)
        # create aliases and dummy attributes to match the dashboard's expectations
        self.memory = self._input_data
        self.plot_filter = []
        self.plots = {}
        self.custom_plot_parameters = {}
    def on_received_message(self, message):
        """Handle incoming data from the 'default' and 'plot' channels

        Stores 'default' data in ``self.memory`` and 'plot' data in ``self.plots``.

        Parameters
        ----------
        message : dict
            acceptable channel values are 'default' or 'plot'
        """
        if message["channel"] == "default":
            if self.plot_filter:
                message["data"] = {
                    key: message["data"][key] for key in self.plot_filter
                }
                message["metadata"] = {
                    key: message["metadata"][key] for key in self.plot_filter
                }
            self.buffer_store(
                agent_from=message["from"],
                data={"data": message["data"], "metadata": message["metadata"]},
            )
        elif message["channel"] == "plot":
            self.update_plot_memory(message)
        return 0
    def update_plot_memory(self, message):
        """Update the plot figures stored in ``self.plots`` with the received message

        Parameters
        ----------
        message : dict
            Standard message format as specified by the AgentMET4FOF class.
            ``message['data']`` needs to be a base64 image string and can be nested
            in a dictionary for multiple plots. Only the latest plot is kept; no
            history of previous plots is stored.
        """
        if (
            not isinstance(message["data"], dict)
            or message["from"] not in self.plots.keys()
        ):
            self.plots[message["from"]] = message["data"]
        elif isinstance(message["data"], dict):
            for key in message["data"].keys():
                self.plots[message["from"]].update({key: message["data"][key]})
        self.log_info("PLOTS: " + str(self.plots))
    def reset(self):
        super(MetrologicalMonitorAgent, self).reset()
        del self.plots
        self.plots = {}
    def custom_plot_function(self, data, sender_agent, **kwargs):
        # TODO: cannot set the label of the xaxis within this method
        # data display
        if "data" in data.keys():
            if len(data["data"]):
                # values = data["buffer"].show(n_samples=-1)  # -1 --> all
                values = data["data"]
                t = values[:, 0]
                ut = values[:, 1]
                v = values[:, 2]
                uv = values[:, 3]

                # use description
                desc = data["metadata"][0]
                t_name, t_unit = desc.time.values()
                v_name, v_unit = desc.get_quantity().values()

                x_label = f"{t_name} [{t_unit}]"
                y_label = f"{v_name} [{v_unit}]"

                trace = go.Scatter(
                    x=t,
                    y=v,
                    error_x=dict(type="data", array=ut, visible=True),
                    error_y=dict(type="data", array=uv, visible=True),
                    mode="lines",
                    name=f"{y_label} ({sender_agent})",
                )
            else:
                trace = go.Scatter()
        else:
            trace = go.Scatter()
        return trace
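
# Usage sketch (hedged): one way to wire the classes above into an agent
# network. ``SketchedSineSourceAgent`` is the hypothetical example class from
# further up; ``AgentNetwork`` keyword names such as ``agentType`` and
# ``dashboard_modules`` may differ between agentMET4FOF releases.
def demonstrate_metrological_monitoring():
    from agentMET4FOF.agents import AgentNetwork

    # start the agent network server (with the web dashboard enabled)
    agent_network = AgentNetwork(dashboard_modules=True)

    # add a metrological source and a metrological monitor
    source_agent = agent_network.add_agent(
        name="SketchedSineSource", agentType=SketchedSineSourceAgent
    )
    source_agent.init_parameters()
    monitor_agent = agent_network.add_agent(
        name="MetrologicalMonitor", agentType=MetrologicalMonitorAgent
    )

    # route the source's "default" channel into the monitor and start all agents
    source_agent.bind_output(monitor_agent)
    agent_network.set_running_state()

    return agent_network
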
class MetrologicalAgentBuffer(AgentBuffer):
    """Buffer class which is instantiated in every metrological agent to store data

    This buffer is necessary to handle multiple inputs coming from agents.

    We can access the buffer like a dict with exposed functions such as .values(),
    .keys() and .items(). The actual dict object is stored in the attribute
    :attr:`buffer <agentMET4FOF.agents.AgentBuffer.buffer>`. The list in
    :attr:`supported_datatypes <agentMET4FOF.agents.AgentBuffer.supported_datatypes>`
    contains one more element for metrological agents, namely
    :class:`TimeSeriesBuffer
    <time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>`.
    """

    def __init__(self, buffer_size: int = 1000):
        """Initialise a new agent buffer object

        Parameters
        ----------
        buffer_size: int
            Length of buffer allowed.
        """
        super(MetrologicalAgentBuffer, self).__init__(buffer_size)
        self.supported_datatypes.append(TimeSeriesBuffer)
    def convert_single_to_tsbuffer(self, single_data: Union[List, Tuple, np.ndarray]):
        """Convert common data in agentMET4FOF to :class:`TimeSeriesBuffer
        <time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>`

        Parameters
        ----------
        single_data : iterable of iterables (list, tuple, np.ndarray) with shape (N, M)

            * M==2 (pairs): assumed to be like (time, value)
            * M==3 (triple): assumed to be like (time, value, value_unc)
            * M==4 (4-tuple): assumed to be like (time, time_unc, value, value_unc)

        Returns
        -------
        TimeSeriesBuffer
            the new :class:`TimeSeriesBuffer
            <time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>` object
        """
        ts = TimeSeriesBuffer(maxlen=self.buffer_size)
        ts.add(single_data)
        return ts
    def update(
        self,
        agent_from: str,
        data: Union[Dict, List, Tuple, np.ndarray],
    ) -> TimeSeriesBuffer:
        """Overwrite the buffer entry keyed by ``agent_from`` with value ``data``

        Parameters
        ----------
        agent_from : str
            Name of agent sender
        data : dict or iterable of iterables (list, tuple, np.ndarray) with shape (N, M)
            the data to be stored in the metrological buffer

        Returns
        -------
        TimeSeriesBuffer
            the updated :class:`TimeSeriesBuffer
            <time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>` object
        """
        # handle data that is nested in a dict
        if isinstance(data, dict):
            # convert each value to a TimeSeriesBuffer
            for key, value in data.items():
                data[key] = self.convert_single_to_tsbuffer(value)
        else:
            data = self.convert_single_to_tsbuffer(data)
        self.buffer.update({agent_from: data})
        return self.buffer
    def _concatenate(
        self,
        iterable: TimeSeriesBuffer,
        data: Union[np.ndarray, list, pd.DataFrame],
        concat_axis: int = 0,
    ) -> TimeSeriesBuffer:
        """Concatenate the given ``TimeSeriesBuffer`` with ``data``

        Add ``data`` to the :class:`TimeSeriesBuffer
        <time-series-buffer:time_series_buffer.buffer.TimeSeriesBuffer>` object.

        Parameters
        ----------
        iterable : TimeSeriesBuffer
            The current buffer to be concatenated with.
        data : np.ndarray, DataFrame, list
            New incoming data

        Returns
        -------
        TimeSeriesBuffer
            the original buffer with the data appended
        """
        iterable.add(data)
        return iterable
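
# Usage sketch (hedged): how ``MetrologicalAgentBuffer.update`` wraps incoming
# samples into ``TimeSeriesBuffer`` objects, following the shape convention
# documented above: (N, 2) ~ (time, value), (N, 3) ~ (time, value, value_unc),
# (N, 4) ~ (time, time_unc, value, value_unc). All sample values are made up.
def demonstrate_metrological_buffer():
    buffer = MetrologicalAgentBuffer(buffer_size=10)

    # three rows of (time, time_unc, value, value_unc)
    rows = np.array(
        [
            [0.0, 0.01, 0.00, 0.05],
            [1.0, 0.01, 0.59, 0.05],
            [2.0, 0.01, 0.95, 0.05],
        ]
    )

    # a plain array is wrapped into a single TimeSeriesBuffer ...
    buffer.update(agent_from="SketchedSineSource", data=rows)

    # ... and each value of a dict of arrays is wrapped individually;
    # the second entry uses the (time, value) pair layout
    buffer.update(
        agent_from="AnotherSource",
        data={"channel_1": rows, "channel_2": rows[:, [0, 2]]},
    )

    return buffer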