"""Source code for agentMET4FOF.streams.base_streams."""

import warnings
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
from pandas import DataFrame
from time_series_metadata.scheme import MetaData

__all__ = ["DataStreamMET4FOF"]


class DataStreamMET4FOF:
    """Abstract class for creating datastreams

    Data can be fetched sequentially using :func:`next_sample` or all at once
    :func:`all_samples`. This increments the internal sample index
    :attr:`_sample_idx`.

    For sensors data, we assume:

    - The format shape for 2D data stream (timesteps, n_sensors)
    - The format shape for 3D data stream (num_cycles, timesteps , n_sensors)

    To create a new DataStreamMET4FOF class, inherit this class and call
    :func:`set_metadata` in the constructor. Choose one of two types of
    datastreams to be created:

    - from dataset file (:func:`set_data_source`), or
    - a waveform generator function (:func:`set_generator_function`).

    Alternatively, override the :func:`next_sample` function if neither option
    suits the application. For generator functions, :attr:`sfreq` is a required
    variable to be set on `init` which sets the sampling frequency and the
    time-step which occurs when :func:`next_sample` is called.

    For an example implementation of using generator function, see the
    built-in :class:`SineGenerator` class. See tutorials for more
    implementations.

    Attributes
    ----------
    _quantities : Union[List, DataFrame, np.ndarray]
        Measured quantities such as sensors readings
    _target : Union[List, DataFrame, np.ndarray]
        Target label in the context of machine learning. This can be Remaining
        Useful Life in predictive maintenance application. Note this can be an
        unobservable variable in real-time and applies only for validation
        during offline analysis.
    _time : Union[List, DataFrame, np.ndarray]
        ``dtype`` can be either ``float`` or ``datetime64`` to indicate the
        time when the :attr:`_quantities` were measured.
    _current_sample_quantities : Union[List, DataFrame, np.ndarray]
        Last returned measured quantities from a call to :func:`next_sample`
    _current_sample_target : Union[List, DataFrame, np.ndarray]
        Last returned target labels from a call to :func:`next_sample`
    _current_sample_time : Union[List, DataFrame, np.ndarray]
        ``dtype`` can be either ``float`` or ``datetime64`` to indicate the
        time when the :attr:`_current_sample_quantities` were measured.
    _sample_idx : int
        Current sample index
    _n_samples : int
        Total number of samples
    _data_source_type : str
        Explicitly account for the data source type: either "function" or
        "dataset"
    _generator_function : Callable
        A generator function which takes in at least one argument ``time``
        which will be used in :func:`next_sample`
    _generator_parameters : Dict
        Any additional keyword arguments to be supplied to the generator
        function. The generator function call for every sample will be
        supplied with the ``**generator_parameters``.
    sfreq : int
        Sampling frequency
    _metadata : MetaData
        The quantities metadata as
        :class:`time_series_metadata.scheme.MetaData`
    """

    def __init__(self):
        """Initialize a DataStreamMET4FOF object"""
        super().__init__()
        # The bare annotations below only declare the attributes' expected
        # types; actual values are assigned later by set_data_source(),
        # set_generator_function(), set_metadata() or next_sample().
        self._quantities: Union[List, DataFrame, np.ndarray]
        self._target: Union[List, DataFrame, np.ndarray]
        self._time: Union[List, DataFrame, np.ndarray]
        self._current_sample_quantities: Union[List, DataFrame, np.ndarray]
        self._current_sample_target: Union[List, DataFrame, np.ndarray]
        self._current_sample_time: Union[List, DataFrame, np.ndarray]
        self._sample_idx: int = 0  # current sample index
        self._n_samples: int = 0  # total number of samples
        self._data_source_type: str = "function"
        self._generator_function: Callable
        self._generator_parameters: Dict = {}
        self.sfreq: int = 1
        self._metadata: MetaData
[docs] def _set_data_source_type(self, dt_type: str = "function"): """ To explicitly account for the type of data source: either from dataset, or a generator function. Parameters ---------- dt_type : str Either "function" or "dataset" """ self._data_source_type = dt_type
[docs] def randomize_data(self): """Randomizes the provided quantities, useful in machine learning contexts""" random_index = np.arange(self._quantities.shape[0]) np.random.shuffle(random_index) self._quantities = self._quantities[random_index] if ( type(self._target).__name__ == "ndarray" or type(self._target).__name__ == "list" ): self._target = self._target[random_index] elif type(self._target).__name__ == "DataFrame": self._target = self._target.iloc[random_index]
    @property
    def metadata(self):
        """MetaData: the quantities metadata as set via :func:`set_metadata`"""
        return self._metadata

    @property
    def sample_idx(self):
        """int: the current sample index, advanced by :func:`next_sample`"""
        return self._sample_idx
[docs] def set_metadata( self, device_id: str, time_name: str, time_unit: str, quantity_names: Union[str, Tuple[str, ...]], quantity_units: Union[str, Tuple[str, ...]], misc: Optional[Any] = None, ): """Set the quantities metadata as a ``MetaData`` object Details you find in the :class:`time_series_metadata.scheme.MetaData` documentation. Parameters ---------- device_id : str Name of the represented generator time_name : str Name for the time dimension time_unit : str Unit for the time quantity_names : iterable of str or str A string or an iterable of names of the represented quantities' values quantity_units : iterable of str or str An iterable of units for the quantities' values misc : Any, optional This parameter can take any additional metadata which will be handed over to the corresponding attribute of the created :class:`Metadata` object """ self._metadata = MetaData( device_id=device_id, time_name=time_name, time_unit=time_unit, quantity_names=quantity_names, quantity_units=quantity_units, misc=misc, )
[docs] def _default_generator_function(self, time): """This is the default generator function used, if non was specified Parameters ---------- time : Union[List, DataFrame, np.ndarray] the time stamps at which to evaluate the function Returns ------- np.ndarray :math:`f(x) = \sin (2 \pi \cdot \text{self.sfreq} \cdot x)` evaluated at ``time`` """ return np.sin(2 * np.pi * self.sfreq * time)
[docs] def set_generator_function( self, generator_function: Optional[Callable] = None, sfreq: Optional[int] = 50, **kwargs: Any ): """ Sets the data source to a generator function. By default, this function resorts to a sine wave generator function. Initialisation of the generator's parameters should be done here such as setting the sampling frequency and wave frequency. For setting it with a dataset instead, see :func:`set_data_source`. Parameters ---------- generator_function : Callable, optional A generator function which takes in at least one argument ``time`` which will be used in :func:`next_sample`. Parameters of the function can be fixed by providing additional arguments such as the wave frequency. sfreq : int, optional Sampling frequency. **kwargs : Any Any additional keyword arguments to be supplied to the generator function. The ``**kwargs`` will be saved as :attr:`_generator_parameters`. The generator function call for every sample will be supplied with the ``**generator_parameters``. """ # save the kwargs into generator_parameters self._generator_parameters = kwargs if sfreq is not None: self.sfreq = sfreq self._set_data_source_type("function") # resort to default wave generator if one is not supplied if generator_function is None: warnings.warn( "No generator function specified. Setting to default (sine wave)." ) self._generator_function = self._default_generator_function else: self._generator_function = generator_function return self._generator_function
[docs] def _next_sample_generator( self, batch_size: Optional[int] = 1 ) -> Dict[str, np.ndarray]: """Internal method to generate a batch of samples from the generator function Parameters ---------- batch_size : int, optional number of batches to get from data stream, defaults to 1 Returns ------- Dict[str, Union[List, DataFrame, np.ndarray]] latest samples in the form:: dict like { "quantities": <time series data as a list, np.ndarray or pd.Dataframe>, "time": <time stamps as a list, np.ndarray or pd.Dataframe of float or np.datetime64> } """ time: np.ndarray = ( np.arange(self._sample_idx, self._sample_idx + batch_size, 1) / self.sfreq ) self._sample_idx += batch_size value: np.ndarray = self._generator_function(time, **self._generator_parameters) return {"quantities": value, "time": time}
[docs] def set_data_source( self, quantities: Union[List, DataFrame, np.ndarray] = None, target: Optional[Union[List, DataFrame, np.ndarray]] = None, time: Optional[Union[List, DataFrame, np.ndarray]] = None, ): """ This sets the data source by providing up to three iterables: ``quantities`` , ``time`` and ``target`` which are assumed to be aligned. For sensors data, we assume: The format shape for 2D data stream (timesteps, n_sensors) The format shape for 3D data stream (num_cycles, timesteps , n_sensors) Parameters ---------- quantities : Union[List, DataFrame, np.ndarray] Measured quantities such as sensors readings. target : Optional[Union[List, DataFrame, np.ndarray]] Target label in the context of machine learning. This can be Remaining Useful Life in predictive maintenance application. Note this can be an unobservable variable in real-time and applies only for validation during offline analysis. time : Optional[Union[List, DataFrame, np.ndarray]] ``dtype`` can be either ``float`` or ``datetime64`` to indicate the time when the ``quantities`` were measured. """ self._sample_idx = 0 self._current_sample_quantities = None self._current_sample_target = None self._current_sample_time = None if quantities is None and target is None: self._quantities = list(np.arange(10)) self._target = list(np.arange(10)) self._time = list(np.arange(10)) self._target.reverse() else: self._quantities = quantities self._target = target self._time = time # infer number of samples if type(self._quantities).__name__ == "list": self._n_samples = len(self._quantities) elif type(self._quantities).__name__ == "DataFrame": # dataframe or numpy self._quantities = self._quantities.to_numpy() self._n_samples = self._quantities.shape[0] elif type(self._quantities).__name__ == "ndarray": self._n_samples = self._quantities.shape[0] self._set_data_source_type("dataset")
def prepare_for_use(self): self.reset()
    def all_samples(self) -> Dict[str, Union[List, DataFrame, np.ndarray]]:
        """Return all the samples in the data stream

        Returns
        -------
        Dict[str, Union[List, DataFrame, np.ndarray]]
            all samples in the form::

                dict like {
                    "quantities": <time series data as a list, np.ndarray or
                        pd.Dataframe>,
                    "target": <target labels as a list, np.ndarray or
                        pd.Dataframe>,
                    "time": <time stamps as a list, np.ndarray or pd.Dataframe
                        of float or np.datetime64>
                }
        """
        # a negative batch size makes _next_sample_data_source() fetch the
        # whole dataset in a single batch
        return self.next_sample(-1)
[docs] def next_sample( self, batch_size: Optional[int] = 1 ) -> Dict[str, Union[List, DataFrame, np.ndarray]]: """Fetch the latest samples from the ``quantities``, ``time`` and ``target`` Parameters ---------- batch_size : int, optional number of batches to get from data stream, defaults to 1 Returns ------- Dict[str, Union[List, DataFrame, np.ndarray]] latest samples in the form:: dict like { "quantities": <time series data as a list, np.ndarray or pd.Dataframe>, "target": <target labels as a list, np.ndarray or pd.Dataframe>, "time": <time stamps as a list, np.ndarray or pd.Dataframe of float or np.datetime64> } """ if self._data_source_type == "function": return self._next_sample_generator(batch_size) elif self._data_source_type == "dataset": return self._next_sample_data_source(batch_size)
    def _next_sample_data_source(
        self, batch_size: Optional[int] = 1
    ) -> Dict[str, Union[List, DataFrame, np.ndarray]]:
        """Internal method for fetching latest samples from a dataset

        Parameters
        ----------
        batch_size : int, optional
            number of batches to get from data stream, defaults to 1

        Returns
        -------
        Dict[str, Union[List, DataFrame, np.ndarray]]
            latest samples in the form::

                dict like {
                    "quantities": <time series data as a list, np.ndarray or
                        pd.Dataframe>,
                    "target": <target labels as a list, np.ndarray or
                        pd.Dataframe>,
                    "time": <time stamps as a list, np.ndarray or pd.Dataframe
                        of float or np.datetime64>
                }
        """
        if batch_size < 0:
            # negative batch size (see all_samples()) means "fetch everything"
            # NOTE(review): relies on _quantities exposing .shape, i.e. being
            # an np.ndarray/DataFrame rather than a plain list — confirm
            batch_size = self._quantities.shape[0]
        # NOTE(review): the index is advanced *before* slicing and is never
        # rolled back, so it can run past _n_samples on the final batch
        self._sample_idx += batch_size
        try:
            self._current_sample_quantities = self._quantities[
                self._sample_idx - batch_size : self._sample_idx
            ]
            # if target is available
            if self._target is not None:
                self._current_sample_target = self._target[
                    self._sample_idx - batch_size : self._sample_idx
                ]
            else:
                self._current_sample_target = None
            # if time is available
            if self._time is not None:
                self._current_sample_time = self._time[
                    self._sample_idx - batch_size : self._sample_idx
                ]
            else:
                self._current_sample_time = None
        except IndexError:
            # NOTE(review): out-of-range *slices* on lists and np.ndarrays do
            # not raise IndexError (they return empty/truncated results), so
            # this fallback only fires for containers that validate indices
            self._current_sample_quantities = None
            self._current_sample_target = None
            self._current_sample_time = None
        return {
            "time": self._current_sample_time,
            "quantities": self._current_sample_quantities,
            "target": self._current_sample_target,
        }
[docs] def reset(self): """Set the sample count to zero to prepare for new extractions""" self._sample_idx = 0
[docs] def has_more_samples(self) -> bool: """Tell if there are more samples to extract""" return self._sample_idx < self._n_samples