Source code for agentMET4FOF.agents.signal_agents

from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd

from .base_agents import AgentMET4FOF
from ..streams.signal_streams import (
    SineGenerator,
    SineWithJitterGenerator,
    StaticSineWithJitterGenerator,
)

# Public API of this module: the names exported by a star-import.
__all__ = [
    "SineGeneratorAgent",
    "SineWithJitterGeneratorAgent",
    "StaticSineWithJitterGeneratorAgent",
    "NoiseAgent",
]


class SineGeneratorAgent(AgentMET4FOF):
    """Agent that emits a sine signal one sample at a time

    Samples are drawn from a :class:`SineGenerator` instance and forwarded
    sample by sample to connected agents via the agent's output channel.
    """

    _sine_stream: SineGenerator

    def init_parameters(
        self, sfreq=100, sine_freq=2 * np.pi, amplitude=1.0, initial_phase=0.0
    ):
        """Set up the sine signal source

        Creates the :class:`SineGenerator` instance this agent streams from
        and stores it on the agent.

        Parameters
        ----------
        sfreq : int
            sampling frequency for the underlying signal, defaults to 100
        sine_freq : float
            frequency of the generated sine wave, defaults to ``2 * np.pi``
        amplitude : float
            amplitude of the generated sine wave, defaults to 1.0
        initial_phase : float
            initial phase (at t=0) of the generated sine wave, defaults to 0.0
        """
        generator_settings = {
            "sfreq": sfreq,
            "sine_freq": sine_freq,
            "amplitude": amplitude,
            "initial_phase": initial_phase,
        }
        self._sine_stream = SineGenerator(**generator_settings)

    def agent_loop(self):
        """Model the agent's behaviour

        While the agent is in state *Running*, each invocation pulls the next
        sample from the sine stream and hands it to
        :meth:`AgentMET4FOF.send_output`. In any other state nothing happens.
        """
        if self.current_state != "Running":
            return
        # The original author notes that next_sample() yields a dictionary.
        self.send_output(self._sine_stream.next_sample())
class StaticSineWithJitterGeneratorAgent(AgentMET4FOF):
    """Agent that emits a pre-generated, fixed-length sine signal with jitter

    Samples are drawn from a :class:`StaticSineWithJitterGenerator` instance
    and forwarded sample by sample to connected agents via the agent's output
    channel.
    """

    _sine_stream: StaticSineWithJitterGenerator

    def init_parameters(
        self, num_cycles: Optional[int] = 1000, jitter_std: Optional[float] = 0.02
    ):
        r"""Set up the pre-generated sine signal of fixed length with jitter

        Creates the :class:`StaticSineWithJitterGenerator` instance this agent
        streams from, using the provided parameters.

        Parameters
        ----------
        num_cycles : int, optional
            number of cycles, determines the signal length by
            :math:`\pi \cdot num\_cycles`, defaults to 1000
        jitter_std : float, optional
            the standard deviation of the distribution to randomly draw jitter
            from, defaults to 0.02
        """
        jittered_sine = StaticSineWithJitterGenerator(
            num_cycles=num_cycles, jitter_std=jitter_std
        )
        self._sine_stream = jittered_sine

    def agent_loop(self):
        """Push the input data stream's next sample while the agent is running"""
        if self.current_state != "Running":
            return
        # The original author notes that next_sample() yields a dictionary.
        next_sample = self._sine_stream.next_sample()
        self.send_output(next_sample)
class NoiseAgent(AgentMET4FOF):
    """An agent adding white noise to the incoming signal"""

    _noise_std: float

    @property
    def noise_std(self) -> float:
        """Standard deviation of the distribution to randomly draw noise from"""
        return self._noise_std

    def init_parameters(self, noise_std: Optional[float] = 0.05):
        """Initialize the noise's standard deviation

        Parameters
        ----------
        noise_std : float, optional
            the standard deviation of the distribution to randomly draw noise
            from, defaults to 0.05
        """
        self._noise_std = noise_std

    def _compute_noisy_signal_from_clean_signal(
        self, clean_signal: Union[List, np.ndarray, pd.DataFrame]
    ):
        """Draw normally distributed values centred on the clean signal

        Parameters
        ----------
        clean_signal : list, np.ndarray or pd.DataFrame
            the values used as the mean (``loc``) of the normal distribution

        Returns
        -------
        The result of :func:`numpy.random.normal` with ``loc=clean_signal``
        and ``scale`` set to the agent's noise standard deviation.
        """
        return np.random.normal(loc=clean_signal, scale=self._noise_std)

    def on_received_message(self, message: Dict[str, Any]):
        """Add noise to the received message's data and send the result

        Nothing happens unless the agent is in state *Running*. Payloads of
        type list, np.ndarray or pd.DataFrame are replaced by their noisy
        counterpart. For dict payloads only the ``"quantities"`` entry is
        replaced, all other entries are forwarded unchanged. Payloads of any
        other type are not forwarded.

        Parameters
        ----------
        message : Dictionary
            the received message in the expected form::

                dict like {
                    "from": "<valid agent name>"
                    "data": <time series data as a list, np.ndarray or
                        pd.Dataframe> or dict like {
                            "quantities": <time series data as a list,
                                np.ndarray or pd.Dataframe>,
                            "target": <target labels as a list, np.ndarray
                                or pd.Dataframe>,
                            "time": <time stamps as a list, np.ndarray or
                                pd.Dataframe of float or np.datetime64>
                        }
                    "senderType": <any subclass of AgentMet4FoF>,
                    "channel": "<channel name>"
                }

        Raises
        ------
        AttributeError
            if ``message["data"]`` does not provide a ``copy()`` method
        KeyError
            if a dict payload has no ``"quantities"`` entry
        """
        if self.current_state != "Running":
            return

        # Work on a copy so the payload object inside the received message is
        # never modified. A single copy is sufficient for both branches below.
        data_in_message = message["data"].copy()

        if isinstance(data_in_message, (list, np.ndarray, pd.DataFrame)):
            self.send_output(
                self._compute_noisy_signal_from_clean_signal(data_in_message)
            )
        elif isinstance(data_in_message, dict):
            # Only the measured quantities get noise; entries such as
            # "target" and "time" are passed through untouched.
            data_in_message[
                "quantities"
            ] = self._compute_noisy_signal_from_clean_signal(
                data_in_message["quantities"]
            )
            self.send_output(data_in_message)
class SineWithJitterGeneratorAgent(SineGeneratorAgent):
    """Agent that emits a sine signal with jitter one sample at a time

    Samples are drawn from a :class:`SineWithJitterGenerator` instance and
    forwarded sample by sample to connected agents via the agent's output
    channel.
    """

    def init_parameters(
        self,
        sfreq: Optional[int] = 10,
        sine_freq: Optional[float] = np.reciprocal(2 * np.pi),
        amplitude: Optional[float] = 1.0,
        initial_phase: Optional[float] = 0.0,
        jitter_std: Optional[float] = 0.02,
    ):
        r"""Set up the jittered sine signal source

        Creates the :class:`SineWithJitterGenerator` instance this agent
        streams from and stores it on the agent.

        Parameters
        ----------
        sfreq : int, optional
            sampling frequency which determines the time step when
            :meth:`.next_sample` is called, defaults to 10
        sine_freq : float, optional
            frequency of the generated sine wave, defaults to
            :math:`\frac{1}{2 \pi}`
        amplitude : float, optional
            amplitude of the generated sine wave, defaults to 1.0
        initial_phase : float, optional
            initial phase (at t=0) of the generated sine wave, defaults to 0.0
        jitter_std : float, optional
            the standard deviation of the distribution to randomly draw jitter
            from, defaults to 0.02
        """
        jittered_sine = SineWithJitterGenerator(
            sfreq=sfreq,
            sine_freq=sine_freq,
            amplitude=amplitude,
            initial_phase=initial_phase,
            jitter_std=jitter_std,
        )
        self._sine_stream = jittered_sine

    def agent_loop(self):
        """Model the agent's behaviour

        While the agent is in state *Running*, each invocation pulls the next
        sample from the input data stream and hands it to
        :meth:`AgentMET4FOF.send_output`. In any other state nothing happens.
        """
        if self.current_state == "Running":
            self.send_output(self._sine_stream.next_sample())