Tutorial 4 - A metrologically enabled pipeline.
In this tutorial we introduce the new metrologically enabled agents. We initialize an agent that generates an infinite sine signal. The signal is generated by an external class Signal,
which delivers one timestamp and one value on each call, each with an associated uncertainty.
The MetrologicalSineGeneratorAgent is based on the new class agentMET4FOF.metrological_agents.MetrologicalAgent. We only adapt the methods init_parameters()
and agent_loop(): the former to hand over an instance of the signal-generating class, the latter to generate the actual samples. The remaining buffering and plotting logic is encapsulated in the new base classes.
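To make the layout of a single sample concrete, the following is a minimal sketch that uses only NumPy and the standard library. It mirrors the formulas of the Signal class in the listing below, but the function name sample_datapoint and the fixed timestamp are assumptions made for illustration and are not part of agentMET4FOF.

```python
import time

import numpy as np


def sample_datapoint(timestamp: float) -> np.ndarray:
    """Return one sample as (time, time uncertainty, value, value uncertainty).

    Hypothetical helper for illustration only; the tutorial itself wraps the
    same formulas in the ``Signal`` class.
    """
    # The resolution of the system clock serves as the time uncertainty.
    time_unc = time.get_clock_info("time").resolution
    # Sine around standard atmospheric pressure (in Pa) with amplitude 10.
    value = 1013.25 + 10 * np.sin(timestamp)
    # Constant uncertainty assigned to every value.
    value_unc = 0.5
    return np.array((timestamp, time_unc, value, value_unc))


# A fixed timestamp keeps the example reproducible.
point = sample_datapoint(0.0)
t, ut, v, uv = point
print(point.shape, v, uv)
```

Each sample is a flat array of four floats, so a consumer can unpack it positionally as shown in the last lines.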
[1]:
# %load tutorial_4_metrological_agents.py
import time
from typing import Dict

import numpy as np
from time_series_metadata.scheme import MetaData

from agentMET4FOF.agents import AgentMET4FOF, AgentNetwork
from agentMET4FOF.metrological_agents import MetrologicalAgent, MetrologicalMonitorAgent


class Signal:
    """
    Simple class to request time-series datapoints of a signal
    """

    def __init__(self):
        # Describe the virtual sensor and the units of time and quantity.
        self._description = MetaData(
            device_id="my_virtual_sensor",
            time_name="time",
            time_unit="s",
            quantity_names="pressure",
            quantity_units="Pa",
        )

    @staticmethod
    def _time():
        return time.time()

    @staticmethod
    def _time_unc():
        # Use the resolution of the system clock as the time uncertainty.
        return time.get_clock_info("time").resolution

    @staticmethod
    def _value(timestamp):
        # Sine with amplitude 10 around 1013.25.
        return 1013.25 + 10 * np.sin(timestamp)

    @staticmethod
    def _value_unc():
        return 0.5

    @property
    def current_datapoint(self):
        # One sample: time, time uncertainty, value, value uncertainty.
        t = self._time()
        ut = self._time_unc()
        v = self._value(t)
        uv = self._value_unc()
        return np.array((t, ut, v, uv))

    @property
    def metadata(self) -> MetaData:
        return self._description


class MetrologicalSineGeneratorAgent(MetrologicalAgent):
    """An agent streaming a sine signal

    Takes samples from an instance of :py:class:`Signal` and pushes them sample
    by sample to connected agents via its output channel.
    """

    # The datatype of the stream will be Signal.
    _sine_stream: Signal

    def init_parameters(self, signal: Signal = Signal(), **kwargs):
        """Initialize the input data

        Initialize the input data stream as an instance of the
        :py:class:`Signal` class

        Parameters
        ----------
        signal : Signal
            the underlying signal for the generator
        """
        self._sine_stream = signal
        super().init_parameters()

    def agent_loop(self):
        """Model the agent's behaviour

        On state *Running* the agent extracts the input data stream's content
        sample by sample and hands it to the default output channel, from where
        the base class sends it on.
        """
        if self.current_state == "Running":
            self.set_output_data(
                channel="default", data=[self._sine_stream.current_datapoint]
            )
            super().agent_loop()

    @property
    def metadata(self) -> Dict:
        return self._sine_stream.metadata.metadata


def main():
    # start agent network server
    agent_network = AgentNetwork(dashboard_modules=True)

    # create and init agent
    signal = Signal()
    t_name, t_unit = signal.metadata.time.values()
    v_name, v_unit = signal.metadata.get_quantity().values()
    source_name = signal.metadata.metadata["device_id"]
    source_agent = agent_network.add_agent(
        name=source_name, agentType=MetrologicalSineGeneratorAgent
    )
    source_agent.init_parameters(signal)
    source_agent.set_output_data(channel="default", metadata=signal.metadata)
    monitor_agent = agent_network.add_agent(
        "MonitorAgent", agentType=MetrologicalMonitorAgent
    )

    # bind it to the monitor and activate it
    source_agent.bind_output(monitor_agent)

    # set all agents states to "Running"
    agent_network.set_running_state()

    # allow for shutting down the network after execution
    return agent_network


if __name__ == "__main__":
    main()
Starting NameServer...
Broadcast server running on 0.0.0.0:9091
NS running on 127.0.0.1:3333 (127.0.0.1)
URI = PYRO:Pyro.NameServer@127.0.0.1:3333
INFO [2020-07-08 19:57:12.604560] (AgentController): INITIALIZED
Dash is running on http://127.0.0.1:8050/
Warning: This is a development server. Do not use app.run_server
in production, use a production WSGI server like gunicorn instead.
INFO [2020-07-08 19:57:12.886800] (my_virtual_sensor_1): INITIALIZED
* Serving Flask app "agentMET4FOF.dashboard.Dashboard" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
INFO [2020-07-08 19:57:12.974322] (MonitorAgent_1): INITIALIZED
[2020-07-08 19:57:13.032609] (my_virtual_sensor_1): Connected output module: MonitorAgent_1
SET STATE: Running
[2020-07-08 19:57:13.918868] (my_virtual_sensor_1): Pack time: 0.001148
[2020-07-08 19:57:13.922292] (my_virtual_sensor_1): Sending: [array([[1.59423823e+09, 1.00000000e-09, 1.01943921e+03, 5.00000000e-01]]), <time_series_metadata.scheme.MetaData object at 0x7f3c99d062e0>]
[2020-07-08 19:57:13.926203] (MonitorAgent_1): Received: {'from': 'my_virtual_sensor_1', 'data': array([[1.59423823e+09, 1.00000000e-09, 1.01943921e+03, 5.00000000e-01]]), 'metadata': <time_series_metadata.scheme.MetaData object at 0x7f3c99d111f0>, 'senderType': 'MetrologicalSineGeneratorAgent', 'channel': 'default'}
[2020-07-08 19:57:13.927291] (MonitorAgent_1): Tproc: 0.000397
[2020-07-08 19:57:14.916511] (my_virtual_sensor_1): Pack time: 0.00048
[2020-07-08 19:57:14.917917] (my_virtual_sensor_1): Sending: [array([[1.59423823e+09, 1.00000000e-09, 1.00998696e+03, 5.00000000e-01]]), <time_series_metadata.scheme.MetaData object at 0x7f3c99d062e0>]
[2020-07-08 19:57:14.919316] (MonitorAgent_1): Received: {'from': 'my_virtual_sensor_1', 'data': array([[1.59423823e+09, 1.00000000e-09, 1.00998696e+03, 5.00000000e-01]]), 'metadata': <time_series_metadata.scheme.MetaData object at 0x7f3c99d111c0>, 'senderType': 'MetrologicalSineGeneratorAgent', 'channel': 'default'}
[2020-07-08 19:57:14.920310] (MonitorAgent_1): Tproc: 0.000119
* Running on http://127.0.0.1:8050/ (Press CTRL+C to quit)
127.0.0.1 - - [08/Jul/2020 21:57:15] "POST /_dash-update-component HTTP/1.1" 204 -
127.0.0.1 - - [08/Jul/2020 21:57:15] "POST /_dash-update-component HTTP/1.1" 200 -
127.0.0.1 - - [08/Jul/2020 21:57:15] "POST /_dash-update-component HTTP/1.1" 200 -
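The arrays in the "Sending" and "Received" log lines above each hold one sample with four columns: time, time uncertainty, value and value uncertainty. As a rough sketch of how a consumer might read such messages, the snippet below copies the two logged arrays, stacks them and splits them into columns. The variable names are illustrative assumptions, and the snippet works on plain NumPy arrays rather than on the agents' internal buffers.

```python
import numpy as np

# The two messages copied from the log output; each has shape (1, 4).
first = np.array([[1.59423823e+09, 1.00000000e-09, 1.01943921e+03, 5.00000000e-01]])
second = np.array([[1.59423823e+09, 1.00000000e-09, 1.00998696e+03, 5.00000000e-01]])

# Stack the messages row-wise into one array of shape (2, 4).
samples = np.vstack((first, second))

# Transposing yields one row per column of the original layout.
times, time_uncs, values, value_uncs = samples.T

# Interval spanned by each value and its associated uncertainty.
lower = values - value_uncs
upper = values + value_uncs

for low, value, high in zip(lower, values, upper):
    print(f"{low:.3f} <= {value:.3f} <= {high:.3f}")
```

The two timestamps look identical only because the log prints nine significant digits; the log timestamps show that the samples were sent about one second apart.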