Tutorial 8 - A simple pipeline to plot a self-made signal

Here we define a simple pipeline of two agents around a custom signal. The first agent generates a sine signal from a self-made SineGenerator data stream. The second agent plots the signal on the dashboard.

We define a SineGeneratorAgent, for which we have to override the functions init_parameters() and agent_loop() to define the new agent’s behaviour.

  • init_parameters() is used to set up the input data stream and potentially other necessary parameters.

  • agent_loop() is executed repeatedly until the agent is stopped. It extracts the input data stream’s content sample by sample and pushes it to all agents connected to SineGeneratorAgent’s output channel by invoking send_output().

The MonitorAgent is connected to the SineGeneratorAgent’s output channel and by default plots the data it receives.
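The data flow described above can be illustrated without the library. The following is a minimal sketch in plain Python, not agentMET4FOF's implementation: the classes ToyGenerator and ToyMonitor and the method receive() are hypothetical stand-ins, and only the names bind_output(), send_output() and agent_loop() are borrowed from the tutorial.

```python
class ToyMonitor:
    """Hypothetical stand-in for a monitoring agent: buffers data per sender."""

    def __init__(self):
        self.buffer = {}

    def receive(self, sender, data):
        # Append the incoming sample to the buffer kept for this sender.
        self.buffer.setdefault(sender, []).append(data)


class ToyGenerator:
    """Hypothetical stand-in for a generating agent with one output channel."""

    def __init__(self, name, samples):
        self.name = name
        self._samples = iter(samples)
        self._outputs = []

    def bind_output(self, target):
        # Register a receiver once, even if it is bound repeatedly.
        if target not in self._outputs:
            self._outputs.append(target)

    def send_output(self, data):
        # Push the data to every agent bound to the output channel.
        for target in self._outputs:
            target.receive(self.name, data)

    def agent_loop(self):
        # One loop iteration: take the next sample and forward it.
        self.send_output(next(self._samples))


monitor = ToyMonitor()
generator = ToyGenerator("ToyGenerator_1", [0.0, 0.5, 1.0])
generator.bind_output(monitor)
for _ in range(3):
    generator.agent_loop()
print(monitor.buffer)  # {'ToyGenerator_1': [0.0, 0.5, 1.0]}
```

In the real framework the agents run as separate processes and exchange messages over the network, so this sketch only mirrors the logical direction of the data, not the transport.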

Each agent has an internal current_state which can be used as a switch to change the behaviour of the agent. The available states are listed here.
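As a rough illustration of using the state as a switch, the sketch below gates a loop body on current_state. It is a toy model under stated assumptions: ToyStatefulAgent is hypothetical, and the label "Idle" is a placeholder for any state other than "Running", not a confirmed agentMET4FOF state name.

```python
class ToyStatefulAgent:
    """Hypothetical agent whose loop only acts in the "Running" state."""

    def __init__(self):
        self.current_state = "Idle"  # placeholder label, an assumption
        self.sent = []
        self._counter = 0

    def agent_loop(self):
        # The loop is always called, but does work only while "Running".
        if self.current_state == "Running":
            self.sent.append(self._counter)
            self._counter += 1


agent = ToyStatefulAgent()
agent.agent_loop()               # ignored, agent is not running yet
agent.current_state = "Running"
agent.agent_loop()               # emits 0
agent.agent_loop()               # emits 1
agent.current_state = "Idle"
agent.agent_loop()               # ignored again
print(agent.sent)  # [0, 1]
```

The same check appears in the agent_loop() of the tutorial's SineGeneratorAgent, which is why no samples are sent before the network is set to "Running".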

As soon as all agents are initialized and the connections are set up, the agent network is started by setting all agents’ states to "Running" simultaneously.

# %load simple_generator.py
from agentMET4FOF.agents import AgentMET4FOF, AgentNetwork, MonitorAgent
from agentMET4FOF.streams import DataStreamMET4FOF
import numpy as np


class SineGenerator(DataStreamMET4FOF):
    """Copy of the built-in class of a streaming sine wave generator

    `sfreq` is the sampling frequency, which determines the time step when
    `next_sample` is called.
    `F` is the frequency of the wave function.
    `sine_wave_function` is a custom function to be supplied to
    `set_generator_function`. It requires the keyword argument `time` and accepts
    any number of optional additional arguments (e.g. `F`).

    """

    def __init__(self, sfreq=500, F=5):
        super().__init__()
        self.set_metadata(
            "SineGenerator",
            "time",
            "s",
            ("Voltage"),
            ("V"),
            "Simple sine wave generator",
        )
        self.set_generator_function(
            generator_function=self.sine_wave_function, sfreq=sfreq, F=F
        )

    def sine_wave_function(self, time, F=50):
        value = np.sin(2 * np.pi * F * time)
        return value


class SineGeneratorAgent(AgentMET4FOF):
    """An agent streaming a sine signal

    Takes samples from the :py:class:`SineGenerator` and pushes them sample by sample
    to connected agents via its output channel.
    """

    # The datatype of the stream will be SineGenerator.
    sine_stream: SineGenerator

    def init_parameters(self):
        """Initialize the input data

        Initialize the input data stream as an instance of the
        :py:class:`SineGenerator` class
        """
        self.sine_stream = SineGenerator()

    def agent_loop(self):
        """Model the agent's behaviour

        In state *Running* the agent extracts the input data stream's content
        sample by sample and pushes it via :py:meth:`AgentMET4FOF.send_output`.
        """
        if self.current_state == "Running":
            sine_data = self.sine_stream.next_sample()  # dictionary
            self.send_output(sine_data["quantities"])


def demonstrate_generator_agent_use():
    # Start agent network server.
    agent_network = AgentNetwork()

    # Initialize agents by adding them to the agent network.
    gen_agent = agent_network.add_agent(agentType=SineGeneratorAgent)
    gen_agent.init_parameters()
    monitor_agent = agent_network.add_agent(agentType=MonitorAgent)

    # Interconnect agents in either of the following two ways:
    # 1) by agent_network.bind_agents(source, target).
    agent_network.bind_agents(gen_agent, monitor_agent)

    # 2) by agent.bind_output(target).
    gen_agent.bind_output(monitor_agent)

    # Set all agents' states to "Running".
    agent_network.set_running_state()

    # Allow for shutting down the network after execution
    return agent_network


if __name__ == "__main__":
    demonstrate_generator_agent_use()
Error on connecting to existing name server at http://0.0.0.0:3333: Could not locate the name server!
Starting NameServer...
Broadcast server running on 0.0.0.0:9091
NS running on 0.0.0.0:3333 (0.0.0.0)
URI = PYRO:Pyro.NameServer@0.0.0.0:3333
INFO [2021-07-30 11:30:13.395180] (SineGeneratorAgent_1): INITIALIZED
INFO [2021-07-30 11:30:13.437144] (MonitorAgent_1): INITIALIZED
[2021-07-30 11:30:13.453633] (SineGeneratorAgent_1): Connected output module: MonitorAgent_1

|----------------------------------------------------------|
|                                                          |
| Your agent network is starting up. Open your browser and |
| visit the agentMET4FOF dashboard on http://0.0.0.0:8050/ |
|                                                          |
|----------------------------------------------------------|

SET STATE:   Running
[2021-07-30 11:30:14.400304] (SineGeneratorAgent_1): Pack time: 0.00028
[2021-07-30 11:30:14.400995] (SineGeneratorAgent_1): Sending: [0.]
[2021-07-30 11:30:14.401661] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:14.402003] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.])}
[2021-07-30 11:30:14.402087] (MonitorAgent_1): Tproc: 0.000288
[2021-07-30 11:30:15.402374] (SineGeneratorAgent_1): Pack time: 0.000744
[2021-07-30 11:30:15.415579] (SineGeneratorAgent_1): Sending: [0.06279052]
[2021-07-30 11:30:15.411719] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.06279052]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:15.416224] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052])}
[2021-07-30 11:30:15.416788] (MonitorAgent_1): Tproc: 0.004702
[2021-07-30 11:30:16.407970] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.12533323]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:16.412750] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323])}
[2021-07-30 11:30:16.402722] (SineGeneratorAgent_1): Pack time: 0.000737
[2021-07-30 11:30:16.413619] (MonitorAgent_1): Tproc: 0.005057
[2021-07-30 11:30:16.411102] (SineGeneratorAgent_1): Sending: [0.12533323]
[2021-07-30 11:30:17.399958] (SineGeneratorAgent_1): Pack time: 0.000132
[2021-07-30 11:30:17.400477] (SineGeneratorAgent_1): Sending: [0.18738131]
[2021-07-30 11:30:17.400948] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.18738131]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:17.401294] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131])}
[2021-07-30 11:30:17.401350] (MonitorAgent_1): Tproc: 0.000304
[2021-07-30 11:30:18.399971] (SineGeneratorAgent_1): Pack time: 0.000173
[2021-07-30 11:30:18.400371] (SineGeneratorAgent_1): Sending: [0.24868989]
[2021-07-30 11:30:18.401405] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.24868989]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:18.401999] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989])}
[2021-07-30 11:30:18.406055] (MonitorAgent_1): Tproc: 0.004479
[2021-07-30 11:30:19.400467] (SineGeneratorAgent_1): Pack time: 0.000204
[2021-07-30 11:30:19.401485] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.30901699]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:19.401203] (SineGeneratorAgent_1): Sending: [0.30901699]
[2021-07-30 11:30:19.402583] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989,
       0.30901699])}
[2021-07-30 11:30:19.402992] (MonitorAgent_1): Tproc: 0.00108
[2021-07-30 11:30:20.400489] (SineGeneratorAgent_1): Pack time: 0.000249
[2021-07-30 11:30:20.401135] (SineGeneratorAgent_1): Sending: [0.36812455]
[2021-07-30 11:30:20.401488] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.36812455]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:20.402401] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989,
       0.30901699, 0.36812455])}
[2021-07-30 11:30:20.402531] (MonitorAgent_1): Tproc: 0.000869
[2021-07-30 11:30:21.403602] (SineGeneratorAgent_1): Pack time: 0.000866
[2021-07-30 11:30:21.411816] (SineGeneratorAgent_1): Sending: [0.42577929]
[2021-07-30 11:30:21.416702] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.42577929]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:21.453530] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989,
       0.30901699, 0.36812455, 0.42577929])}
[2021-07-30 11:30:21.454438] (MonitorAgent_1): Tproc: 0.031729
[2021-07-30 11:30:22.400592] (SineGeneratorAgent_1): Pack time: 0.000147
[2021-07-30 11:30:22.401208] (SineGeneratorAgent_1): Sending: [0.48175367]
[2021-07-30 11:30:22.402775] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.48175367]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:22.403261] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989,
       0.30901699, 0.36812455, 0.42577929, 0.48175367])}
[2021-07-30 11:30:22.403356] (MonitorAgent_1): Tproc: 0.000474
[2021-07-30 11:30:23.403115] (SineGeneratorAgent_1): Pack time: 0.000741
[2021-07-30 11:30:23.405271] (SineGeneratorAgent_1): Sending: [0.53582679]
[2021-07-30 11:30:23.411827] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.53582679]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:23.415936] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989,
       0.30901699, 0.36812455, 0.42577929, 0.48175367, 0.53582679])}
[2021-07-30 11:30:23.416486] (MonitorAgent_1): Tproc: 0.00388
NS shut down.
[2021-07-30 11:30:24.401433] (SineGeneratorAgent_1): Pack time: 0.000457
[2021-07-30 11:30:24.402590] (SineGeneratorAgent_1): Sending: [0.58778525]
[2021-07-30 11:30:24.403865] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.58778525]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:24.406800] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989,
       0.30901699, 0.36812455, 0.42577929, 0.48175367, 0.53582679,
       0.58778525])}
[2021-07-30 11:30:24.409730] (MonitorAgent_1): Tproc: 0.00543
[2021-07-30 11:30:25.400245] (SineGeneratorAgent_1): Pack time: 0.00014
[2021-07-30 11:30:25.400745] (SineGeneratorAgent_1): Sending: [0.63742399]
[2021-07-30 11:30:25.400892] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.63742399]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:25.401745] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989,
       0.30901699, 0.36812455, 0.42577929, 0.48175367, 0.53582679,
       0.58778525, 0.63742399])}
[2021-07-30 11:30:25.401901] (MonitorAgent_1): Tproc: 0.000871
[2021-07-30 11:30:26.399903] (SineGeneratorAgent_1): Pack time: 0.000146
[2021-07-30 11:30:26.400291] (SineGeneratorAgent_1): Sending: [0.68454711]
[2021-07-30 11:30:26.400498] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.68454711]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:26.401123] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989,
       0.30901699, 0.36812455, 0.42577929, 0.48175367, 0.53582679,
       0.58778525, 0.63742399, 0.68454711])}
[2021-07-30 11:30:26.401469] (MonitorAgent_1): Tproc: 0.00083
[2021-07-30 11:30:27.400900] (SineGeneratorAgent_1): Pack time: 0.00037
[2021-07-30 11:30:27.401206] (SineGeneratorAgent_1): Sending: [0.72896863]
[2021-07-30 11:30:27.401722] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.72896863]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:27.402221] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989,
       0.30901699, 0.36812455, 0.42577929, 0.48175367, 0.53582679,
       0.58778525, 0.63742399, 0.68454711, 0.72896863])}
[2021-07-30 11:30:27.402318] (MonitorAgent_1): Tproc: 0.000505
[2021-07-30 11:30:28.400319] (SineGeneratorAgent_1): Pack time: 0.000219
[2021-07-30 11:30:28.400931] (SineGeneratorAgent_1): Sending: [0.77051324]
[2021-07-30 11:30:28.401338] (MonitorAgent_1): Received: {'from': 'SineGeneratorAgent_1', 'data': array([0.77051324]), 'senderType': 'SineGeneratorAgent', 'channel': 'default'}
[2021-07-30 11:30:28.402537] (MonitorAgent_1): Buffer: {'SineGeneratorAgent_1': array([0.        , 0.06279052, 0.12533323, 0.18738131, 0.24868989,
       0.30901699, 0.36812455, 0.42577929, 0.48175367, 0.53582679,
       0.58778525, 0.63742399, 0.68454711, 0.72896863, 0.77051324])}
[2021-07-30 11:30:28.402762] (MonitorAgent_1): Tproc: 0.001223
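
The values in the log can be checked by hand. The sketch below assumes that the stream starts at time 0 and advances by 1/sfreq per call to next_sample(), which is consistent with the logged samples for the defaults sfreq=500 and F=5. The helper sine_samples() is hypothetical and uses only the standard library.

```python
import math


def sine_samples(n_samples, sfreq=500, F=5):
    """Return the first n_samples of sin(2*pi*F*t), sampled at t = k / sfreq."""
    return [math.sin(2 * math.pi * F * k / sfreq) for k in range(n_samples)]


# Round to eight decimals to match the precision shown in the log.
first_samples = [round(value, 8) for value in sine_samples(4)]
print(first_samples)  # [0.0, 0.06279052, 0.12533323, 0.18738131]
```

With these defaults one full period spans sfreq / F = 100 samples. Since the log shows roughly one sample per second, the plotted curve on the dashboard rises slowly.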