Source code for agentMET4FOF.agents.signal_agents

from .base_agents import AgentMET4FOF
from ..streams.signal_streams import SineGenerator

__all__ = ["SineGeneratorAgent"]

[docs]class SineGeneratorAgent(AgentMET4FOF): """An agent streaming a sine signal Takes samples from the :py:mod:`SineGenerator` and pushes them sample by sample to connected agents via its output channel. """
[docs] def init_parameters(self, sfreq=500, sine_freq=5, amplitude=1, initial_phase=0): """Initialize the input data Initialize the input data stream as an instance of the :class:`SineGenerator` class. Parameters ---------- sfreq : int sampling frequency for the underlying signal sine_freq : float frequency of the generated sine wave amplitude : float amplitude of the generated sine wave initial_phase : float initial phase (at t=0) of the generated sine wave """ self._sine_stream = SineGenerator( sfreq=sfreq, sine_freq=sine_freq, amplitude=amplitude, initial_phase=initial_phase, )
[docs] def agent_loop(self): """Model the agent's behaviour On state *Running* the agent will extract sample by sample the input data streams content and push it via invoking :meth:`AgentMET4FOF.send_output`. """ if self.current_state == "Running": sine_data = self._sine_stream.next_sample() # dictionary self.send_output(sine_data["quantities"])