Tutorial 3 - An advanced pipeline with multichannel signals.

Agents can send data on different named channels, and the receiver can handle each channel separately. This is useful, for example, for splitting train and test data in machine learning. The user then needs to implement the channel-specific handling in the receiving agent.
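As a rough illustration of the train/test use case, the following library-free sketch routes messages into separate buffers by channel name. The message layout (a dict with "channel" and "data" keys) mirrors what the agents log later in this tutorial. The helper name `route_by_channel` and the channel names "train" and "test" are assumptions made for this example only.

```python
from collections import defaultdict


def route_by_channel(messages):
    """Group message payloads by their channel name (hypothetical helper)."""
    buffers = defaultdict(list)
    for message in messages:
        # Messages without an explicit channel fall back to "default".
        buffers[message.get("channel", "default")].append(message["data"])
    return dict(buffers)


messages = [
    {"channel": "train", "data": [0.1, 0.2]},
    {"channel": "test", "data": [0.3]},
    {"channel": "train", "data": [0.4]},
]
buffers = route_by_channel(messages)
print(buffers["train"])
print(buffers["test"])
```

Inside an agent, the same grouping would happen one message at a time in the receiving callback rather than over a list.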

In this example, the MultiGeneratorAgent sends two different types of data, one from a sine generator and one from a cosine generator. It does so by passing channel="sine" and channel="cosine" to send_output().
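The sending side can be pictured without the agent framework as follows. This is only a sketch: `tag_samples` is a hypothetical helper, and the samples are computed with Python's `math` module rather than with SineGenerator and CosineGenerator, whose sampling parameters are not shown in this tutorial.

```python
import math


def tag_samples(t):
    """Return one sine and one cosine sample at time t, each tagged with a channel."""
    return [
        {"channel": "sine", "data": math.sin(t)},
        {"channel": "cosine", "data": math.cos(t)},
    ]


outgoing = tag_samples(0.0)
for message in outgoing:
    print(message["channel"], message["data"])
```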

On the receiving end, the on_received_message() function checks message['channel'] and handles each channel separately.
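One way to organise such per-channel handling is a lookup table instead of an if/elif chain. The sketch below is an assumption-laden alternative, not the tutorial's implementation: `HANDLERS` and `handle_message` are hypothetical names, and plain floats stand in for the arrays the agents exchange.

```python
def plus(data, plus_val):
    return data + plus_val


def minus(data, minus_val):
    return data - minus_val


# Map each accepted channel to (output key, operation); hypothetical layout.
HANDLERS = {
    "sine": ("sine_plus", plus),
    "cosine": ("cosine_minus", minus),
}


def handle_message(message, param=0.5):
    """Return the processed payload, or None for channels we do not handle."""
    handler = HANDLERS.get(message["channel"])
    if handler is None:
        return None
    key, operation = handler
    return {key: operation(message["data"], param)}


print(handle_message({"channel": "sine", "data": 0.0}))
print(handle_message({"channel": "cosine", "data": 1.0}))
print(handle_message({"channel": "default", "data": 1.0}))
```

A table like this keeps the set of accepted channels in one place, which may be easier to extend than a growing chain of comparisons.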

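Note that by default, MonitorAgent is only subscribed to the "default" channel. Hence it will not respond to messages on the "cosine" and "sine" channels. In the log output below, it receives those messages, but its memory holds only the "default"-channel data sent by MultiOutputMathAgent_1.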
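The effect can be imitated with a small stand-in class. `ToyMonitor` below is not MonitorAgent's implementation. It is a hypothetical sketch that stores only messages arriving on the "default" channel and ignores the rest.

```python
class ToyMonitor:
    """Hypothetical stand-in that keeps only "default"-channel messages."""

    def __init__(self):
        self.memory = {}

    def on_received_message(self, message):
        if message["channel"] != "default":
            return  # ignore "sine", "cosine", and any other channel
        # Store payloads per sender, similar in spirit to the logged "Memory".
        self.memory.setdefault(message["from"], []).append(message["data"])


monitor = ToyMonitor()
monitor.on_received_message({"from": "gen", "channel": "sine", "data": 0.0})
monitor.on_received_message({"from": "math", "channel": "default", "data": 0.5})
print(monitor.memory)
```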

[1]:
# %load tutorial_3_multi_channel.py
from agentMET4FOF.agents import AgentMET4FOF, AgentNetwork, MonitorAgent
from agentMET4FOF.streams import SineGenerator, CosineGenerator


class MultiGeneratorAgent(AgentMET4FOF):
    """Sends generated samples on the "sine" and "cosine" channels."""

    _sine_stream: SineGenerator
    _cos_stream: CosineGenerator

    def init_parameters(self):
        self._sine_stream = SineGenerator()
        self._cos_stream = CosineGenerator()

    def agent_loop(self):
        if self.current_state == "Running":
            sine_data = self._sine_stream.next_sample()  # dictionary
            cosine_data = self._cos_stream.next_sample()  # dictionary
            self.send_output(sine_data["x"], channel="sine")
            self.send_output(cosine_data["x"], channel="cosine")


class MultiOutputMathAgent(AgentMET4FOF):
    """Adds a constant to "sine" data and subtracts one from "cosine" data."""

    _minus_param: float
    _plus_param: float

    def init_parameters(self, minus_param=0.5, plus_param=0.5):
        self._minus_param = minus_param
        self._plus_param = plus_param

    def on_received_message(self, message):
        """
        Checks for message['channel'] and handles them separately
        Acceptable channels are "cosine" and "sine"
        """
        if message["channel"] == "cosine":
            minus_data = self.minus(message["data"], self._minus_param)
            self.send_output({"cosine_minus": minus_data})
        elif message["channel"] == "sine":
            plus_data = self.plus(message["data"], self._plus_param)
            self.send_output({"sine_plus": plus_data})

    @staticmethod
    def minus(data, minus_val):
        return data - minus_val

    @staticmethod
    def plus(data, plus_val):
        return data + plus_val


def main():
    # start agent network server
    agentNetwork = AgentNetwork()
    # init agents
    gen_agent = agentNetwork.add_agent(agentType=MultiGeneratorAgent)
    multi_math_agent = agentNetwork.add_agent(agentType=MultiOutputMathAgent)
    monitor_agent = agentNetwork.add_agent(agentType=MonitorAgent)
    # connect agents : We can connect multiple agents to any particular agent
    # However the agent needs to implement handling multiple inputs
    agentNetwork.bind_agents(gen_agent, multi_math_agent)
    agentNetwork.bind_agents(gen_agent, monitor_agent)
    agentNetwork.bind_agents(multi_math_agent, monitor_agent)
    # set all agents states to "Running"
    agentNetwork.set_running_state()

    # allow for shutting down the network after execution
    return agentNetwork


if __name__ == "__main__":
    main()
Starting NameServer...
Broadcast server running on 0.0.0.0:9091
NS running on 127.0.0.1:3333 (127.0.0.1)
URI = PYRO:Pyro.NameServer@127.0.0.1:3333
INFO [2020-07-08 20:15:03.999193] (AgentController): INITIALIZED
INFO [2020-07-08 20:15:04.160762] (MultiGeneratorAgent_1): INITIALIZED
Dash is running on http://127.0.0.1:8050/

 Warning: This is a development server. Do not use app.run_server
 in production, use a production WSGI server like gunicorn instead.

 * Serving Flask app "agentMET4FOF.dashboard.Dashboard" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
INFO [2020-07-08 20:15:04.238194] (MultiOutputMathAgent_1): INITIALIZED
INFO [2020-07-08 20:15:04.307723] (MonitorAgent_1): INITIALIZED
[2020-07-08 20:15:04.364692] (MultiGeneratorAgent_1): Connected output module: MultiOutputMathAgent_1
[2020-07-08 20:15:04.370956] (MultiGeneratorAgent_1): Connected output module: MonitorAgent_1
[2020-07-08 20:15:04.379114] (MultiOutputMathAgent_1): Connected output module: MonitorAgent_1
SET STATE:   Running
[2020-07-08 20:15:05.171751] (MultiGeneratorAgent_1): Pack time: 0.000315
[2020-07-08 20:15:05.172512] (MultiGeneratorAgent_1): Sending: [0.]
[2020-07-08 20:15:05.173200] (MonitorAgent_1): Received: {'from': 'MultiGeneratorAgent_1', 'data': array([0.]), 'senderType': 'MultiGeneratorAgent', 'channel': 'sine'}
[2020-07-08 20:15:05.175533] (MultiOutputMathAgent_1): Received: {'from': 'MultiGeneratorAgent_1', 'data': array([0.]), 'senderType': 'MultiGeneratorAgent', 'channel': 'sine'}
[2020-07-08 20:15:05.172748] (MultiGeneratorAgent_1): Pack time: 0.000102
[2020-07-08 20:15:05.173377] (MonitorAgent_1): Tproc: 5e-06
[2020-07-08 20:15:05.175921] (MultiOutputMathAgent_1): Pack time: 0.00023
[2020-07-08 20:15:05.173245] (MultiGeneratorAgent_1): Sending: [0.47942554]
[2020-07-08 20:15:05.174524] (MonitorAgent_1): Received: {'from': 'MultiGeneratorAgent_1', 'data': array([0.47942554]), 'senderType': 'MultiGeneratorAgent', 'channel': 'cosine'}
[2020-07-08 20:15:05.176514] (MultiOutputMathAgent_1): Sending: {'sine_plus': array([0.5])}
[2020-07-08 20:15:05.176643] (MultiOutputMathAgent_1): Tproc: 0.000988
[2020-07-08 20:15:05.174650] (MonitorAgent_1): Tproc: 5e-06
[2020-07-08 20:15:05.177175] (MultiOutputMathAgent_1): Received: {'from': 'MultiGeneratorAgent_1', 'data': array([0.47942554]), 'senderType': 'MultiGeneratorAgent', 'channel': 'cosine'}
[2020-07-08 20:15:05.178346] (MonitorAgent_1): Received: {'from': 'MultiOutputMathAgent_1', 'data': {'sine_plus': array([0.5])}, 'senderType': 'MultiOutputMathAgent', 'channel': 'default'}
[2020-07-08 20:15:05.177369] (MultiOutputMathAgent_1): Pack time: 9.9e-05
[2020-07-08 20:15:05.178461] (MonitorAgent_1): Tproc: 1.9e-05
[2020-07-08 20:15:05.177650] (MultiOutputMathAgent_1): Sending: {'cosine_minus': array([-0.02057446])}
[2020-07-08 20:15:05.178849] (MonitorAgent_1): Received: {'from': 'MultiOutputMathAgent_1', 'data': {'cosine_minus': array([-0.02057446])}, 'senderType': 'MultiOutputMathAgent', 'channel': 'default'}
[2020-07-08 20:15:05.177710] (MultiOutputMathAgent_1): Tproc: 0.00046
[2020-07-08 20:15:05.179303] (MonitorAgent_1): Memory: {'MultiOutputMathAgent_1': {'sine_plus': array([0.5]), 'cosine_minus': array([-0.02057446])}}
[2020-07-08 20:15:05.179361] (MonitorAgent_1): Tproc: 0.000437
 * Running on http://127.0.0.1:8050/ (Press CTRL+C to quit)
127.0.0.1 - - [08/Jul/2020 22:15:05] "POST /_dash-update-component HTTP/1.1" 204 -
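
The numbers in the log can be checked by hand. The sketch below uses only Python's `math` module and the default values minus_param=0.5 and plus_param=0.5 from the listing. It also shows that the value 0.47942554 logged on the "cosine" channel equals sin(0.5), so in this recorded run the "cosine" sample appears to have been drawn from the sine stream.

```python
import math

sine_sample = 0.0            # first value logged on the "sine" channel
cosine_sample = 0.47942554   # first value logged on the "cosine" channel

sine_plus = sine_sample + 0.5        # plus_param default
cosine_minus = cosine_sample - 0.5   # minus_param default

print(sine_plus)
print(round(cosine_minus, 8))
# Compare the logged "cosine" value with sin(0.5), allowing for print rounding.
print(math.isclose(cosine_sample, math.sin(0.5), abs_tol=1e-8))
```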