agentMET4FOF streams

class agentMET4FOF.streams.CosineGenerator(sfreq=500, cosine_freq=5)[source]

Built-in cosine wave generator class which inherits all methods and attributes from DataStreamMET4FOF. cosine_wave_function() is a custom-defined function which takes time as a required argument and any number of optional additional arguments (e.g. cosine_freq), which are supplied through DataStreamMET4FOF.set_generator_function().

Parameters:
  • sfreq (int) – sampling frequency which determines the time step when next_sample() is called
  • cosine_freq (int) – frequency of wave function
cosine_wave_function(time, cosine_freq=50)[source]

A simple cosine wave generator
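
A generator function of this kind can be sketched without the library as follows. Only the signature mirrors the one documented above; the formula cos(2π·f·t) is the conventional one and is an assumption here, as is the handling of both scalar and list inputs.

```python
import math


def cosine_wave_function(time, cosine_freq=50):
    """Return cos(2*pi*cosine_freq*t) for a scalar t or for each t in a list.

    Assumed formula; the library's own implementation may differ.
    """
    if isinstance(time, (int, float)):
        return math.cos(2 * math.pi * cosine_freq * time)
    return [math.cos(2 * math.pi * cosine_freq * t) for t in time]


# Sample a 5 Hz wave at 500 Hz (the class defaults documented above).
sfreq = 500
times = [n / sfreq for n in range(100)]
values = cosine_wave_function(times, cosine_freq=5)
```

With these defaults the wave starts at its maximum and reaches its minimum half a period later, at t = 0.1 s.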

class agentMET4FOF.streams.DataStreamMET4FOF[source]

Abstract class for creating datastreams.

Data can be fetched sequentially using next_sample() or all at once using all_samples(). Fetching samples increments the internal sample index _sample_idx.

For sensor data, we assume:

  • The format shape for 2D data stream (timesteps, n_sensors)
  • The format shape for 3D data stream (num_cycles, timesteps, n_sensors)
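
The two layouts can be illustrated with plain nested lists. This is a minimal sketch; the helper shape() is hypothetical and only serves to make the dimension order visible.

```python
# 2D stream: 4 timesteps from 3 sensors -> (timesteps, n_sensors)
stream_2d = [[0.0 for _sensor in range(3)] for _step in range(4)]

# 3D stream: 2 cycles of 4 timesteps from 3 sensors
# -> (num_cycles, timesteps, n_sensors)
stream_3d = [
    [[0.0 for _sensor in range(3)] for _step in range(4)]
    for _cycle in range(2)
]


def shape(nested):
    """Return the shape of a regular (non-ragged) nested list as a tuple."""
    dims = []
    while isinstance(nested, list):
        dims.append(len(nested))
        nested = nested[0]
    return tuple(dims)


shape_2d = shape(stream_2d)
shape_3d = shape(stream_3d)
```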

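To create a new DataStreamMET4FOF class, inherit this class and call set_metadata() in the constructor. Choose one of two types of datastreams to be created:

  • a dataset, set with set_data_source(), or
  • a generator function, set with set_generator_function().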

Alternatively, override the next_sample() function if neither option suits the application. For generator functions, sfreq must be set on initialisation; it sets the sampling frequency and therefore the time step that elapses each time next_sample() is called.

For an example implementation using a generator function, see the built-in SineGenerator class. See the tutorials for more implementations.

_quantities

Measured quantities such as sensor readings

Type:Union[List, DataFrame, np.ndarray]
_target

Target label in the context of machine learning. This can be the Remaining Useful Life in a predictive maintenance application. Note that this can be a variable which is unobservable in real time and applies only for validation during offline analysis.

Type:Union[List, DataFrame, np.ndarray]
_time

Time at which the _quantities were measured. The dtype can be either float or datetime64.

Type:Union[List, DataFrame, np.ndarray]
_current_sample_quantities

Last returned measured quantities from a call to next_sample()

Type:Union[List, DataFrame, np.ndarray]
_current_sample_target

Last returned target labels from a call to next_sample()

Type:Union[List, DataFrame, np.ndarray]
_current_sample_time

Time at which the _current_sample_quantities were measured. The dtype can be either float or datetime64.

Type:Union[List, DataFrame, np.ndarray]
_sample_idx

Current sample index

Type:int
_n_samples

Total number of samples

Type:int
_data_source_type

Explicitly account for the data source type: either “function” or “dataset”

Type:str
_generator_function

A generator function which takes in at least one argument time which will be used in next_sample()

Type:Callable
_generator_parameters

Any additional keyword arguments to be supplied to the generator function. The generator function call for every sample will be supplied with the **generator_parameters.

Type:Dict
sfreq

Sampling frequency

Type:int
_metadata

The quantities metadata as time_series_metadata.scheme.MetaData

Type:MetaData
_default_generator_function(time)[source]

This is the default generator function, used if none was specified

Parameters:time (Union[List, DataFrame, np.ndarray]) –
_next_sample_data_source(batch_size: int = 1) → Dict[str, Union[List[T], pandas.core.frame.DataFrame, numpy.ndarray]][source]

Internal method for fetching latest samples from a dataset.

Parameters:batch_size (int) – number of samples to fetch from the data stream
Returns:samples – {'quantities':current_sample_quantities, 'target':current_sample_target}
Return type:Dict
_next_sample_generator(batch_size: int = 1) → Dict[str, numpy.ndarray][source]

Internal method for generating a batch of samples from the generator function.

_set_data_source_type(dt_type: str = 'function')[source]

To explicitly account for the type of data source: either from dataset, or a generator function.

Parameters:dt_type (str) – Either “function” or “dataset”
all_samples() → Dict[str, Union[List[T], pandas.core.frame.DataFrame, numpy.ndarray]][source]

Returns all the samples in the data stream

Returns:samples – {'x': current_sample_x, 'y': current_sample_y}
Return type:Dict
next_sample(batch_size: int = 1)[source]

Fetches the latest batch_size samples from the iterables: quantities, time and target. This advances the internal pointer _sample_idx by batch_size.

Parameters:batch_size (int) – number of samples to fetch from the data stream
Returns:samples – {'time':current_sample_time, 'quantities':current_sample_quantities, 'target':current_sample_target}
Return type:Dict
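
The pointer behaviour described for next_sample() can be sketched with a small stand-in class. DatasetStreamSketch is hypothetical and is not the library class; it only mimics the documented attributes _sample_idx and _n_samples and the documented return keys. What the library does once the end of the dataset is reached is not stated here, so the sketch simply returns shorter or empty slices.

```python
class DatasetStreamSketch:
    """Hypothetical stand-in for a dataset-backed stream (not the library class)."""

    def __init__(self, quantities, target=None, time=None):
        # The three iterables are assumed to be aligned sample by sample.
        self._quantities = quantities
        self._target = target
        self._time = time
        self._sample_idx = 0
        self._n_samples = len(quantities)

    def next_sample(self, batch_size=1):
        start = self._sample_idx
        stop = start + batch_size
        self._sample_idx = stop  # advance the internal pointer by batch_size

        def window(seq):
            # Optional iterables that were not provided stay None.
            return None if seq is None else seq[start:stop]

        return {
            "time": window(self._time),
            "quantities": window(self._quantities),
            "target": window(self._target),
        }


stream = DatasetStreamSketch(
    quantities=[[1, 2], [3, 4], [5, 6], [7, 8]],  # (timesteps, n_sensors)
    target=[0, 1, 0, 1],
    time=[0.0, 0.1, 0.2, 0.3],
)
first = stream.next_sample(batch_size=2)
second = stream.next_sample(batch_size=2)
```

Each call returns the next batch_size aligned entries, so two calls with batch_size=2 walk through all four samples.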
randomize_data()[source]

Randomizes the provided quantities, useful in machine learning contexts

set_data_source(quantities: Union[List[T], pandas.core.frame.DataFrame, numpy.ndarray] = None, target: Union[List[T], pandas.core.frame.DataFrame, numpy.ndarray, None] = None, time: Union[List[T], pandas.core.frame.DataFrame, numpy.ndarray, None] = None)[source]

This sets the data source by providing up to three iterables: quantities, time and target, which are assumed to be aligned.

For sensor data, we assume:

  • The format shape for 2D data stream (timesteps, n_sensors)
  • The format shape for 3D data stream (num_cycles, timesteps, n_sensors)

Parameters:
  • quantities (Union[List, DataFrame, np.ndarray]) – Measured quantities such as sensor readings.
  • target (Optional[Union[List, DataFrame, np.ndarray]]) – Target label in the context of machine learning. This can be the Remaining Useful Life in a predictive maintenance application. Note that this can be a variable which is unobservable in real time and applies only for validation during offline analysis.
  • time (Optional[Union[List, DataFrame, np.ndarray]]) – Time at which the quantities were measured. The dtype can be either float or datetime64.
set_generator_function(generator_function: Callable = None, sfreq: int = None, **kwargs)[source]

Sets the data source to a generator function. By default, this function resorts to a sine wave generator function. Initialisation of the generator’s parameters should be done here such as setting the sampling frequency and wave frequency. For setting it with a dataset instead, see set_data_source().

Parameters:
  • generator_function (Callable) – A generator function which takes in at least one argument time which will be used in next_sample(). Parameters of the function can be fixed by providing additional arguments such as the wave frequency.
  • sfreq (int) – Sampling frequency.
  • **kwargs (Any) – Any additional keyword arguments to be supplied to the generator function. The **kwargs will be saved as _generator_parameters. The generator function call for every sample will be supplied with the **generator_parameters.
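
How the stored keyword arguments and sfreq interact can be sketched with a stand-in class. GeneratorStreamSketch is hypothetical and is not the library class; the assumption that sample n is evaluated at time n / sfreq, and the sin(2π·f·t) formula, are illustrative choices rather than statements about the library's internals.

```python
import math


class GeneratorStreamSketch:
    """Hypothetical stand-in for a generator-backed stream (not the library class)."""

    def __init__(self):
        self._sample_idx = 0
        self.sfreq = None
        self._generator_function = None
        self._generator_parameters = {}

    def set_generator_function(self, generator_function, sfreq, **kwargs):
        self._generator_function = generator_function
        self.sfreq = sfreq
        # Extra keyword arguments are kept and reused for every sample.
        self._generator_parameters = kwargs

    def next_sample(self, batch_size=1):
        # Assumed time base: sample n is taken at n / sfreq seconds.
        times = [(self._sample_idx + n) / self.sfreq for n in range(batch_size)]
        self._sample_idx += batch_size
        quantities = [
            self._generator_function(t, **self._generator_parameters)
            for t in times
        ]
        return {"time": times, "quantities": quantities}


def sine_wave_function(time, sine_freq=50):
    """Assumed conventional sine wave, sin(2*pi*f*t)."""
    return math.sin(2 * math.pi * sine_freq * time)


stream = GeneratorStreamSketch()
stream.set_generator_function(sine_wave_function, sfreq=500, sine_freq=5)
batch = stream.next_sample(batch_size=3)
```

Fixing sine_freq once in set_generator_function() keeps next_sample() free of wave parameters, which matches the description of **kwargs above.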
set_metadata(device_id: str, time_name: str, time_unit: str, quantity_names: Union[str, Tuple[str, ...]], quantity_units: Union[str, Tuple[str, ...]], misc: Optional[Any] = None)[source]

Set the quantities metadata as a MetaData object

For details, see the time_series_metadata.scheme.MetaData documentation.

Parameters:
  • device_id (str) – Name of the represented generator
  • time_name (str) – Name for the time dimension
  • time_unit (str) – Unit for the time
  • quantity_names (iterable of str or str) – A string or an iterable of names of the represented quantities’ values
  • quantity_units (iterable of str or str) – A string or an iterable of units for the quantities’ values
  • misc (Any, optional) – This parameter can take any additional metadata which will be handed over to the corresponding attribute of the created MetaData object
class agentMET4FOF.streams.SineGenerator(sfreq=500, sine_freq=5)[source]

Built-in sine wave generator class which inherits all methods and attributes from DataStreamMET4FOF. sine_wave_function() is a custom-defined function which takes time as a required argument and any number of optional additional arguments (e.g. sine_freq), which are supplied through DataStreamMET4FOF.set_generator_function().

Parameters:
  • sfreq (int) – sampling frequency which determines the time step when next_sample() is called
  • sine_freq (float) – frequency of wave function
sine_wave_function(time, sine_freq=50)[source]

A simple sine wave generator

agentMET4FOF.streams.extract_x_y(message)[source]

Extracts features & target from message['data'] with expected structure such as:

  1. tuple - (x,y)
  2. dict - {'x':x_data,'y':y_data}

Handle data structures of dictionary to extract features & target
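
The two accepted structures can be handled as in the following sketch. extract_x_y_sketch is a hypothetical re-implementation for illustration, not the library function; raising TypeError for any other structure is a choice made here, since the behaviour for other inputs is not described above.

```python
def extract_x_y_sketch(message):
    """Return (x, y) from message['data'] given as a tuple or a dict."""
    data = message["data"]
    if isinstance(data, tuple):
        # Structure 1: (x, y)
        x, y = data
        return x, y
    if isinstance(data, dict):
        # Structure 2: {'x': x_data, 'y': y_data}
        return data["x"], data["y"]
    # Assumption of this sketch: reject anything else explicitly.
    raise TypeError("message['data'] must be a tuple (x, y) or a dict with 'x' and 'y'")


from_tuple = extract_x_y_sketch({"data": ([1, 2], [0, 1])})
from_dict = extract_x_y_sketch({"data": {"x": [1, 2], "y": [0, 1]}})
```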